Skip to content

Commit 1262bb4

Browse files
committed
feat: named queues with concurrency caps, backpressure, and live config updates
- QueueConfig(concurrency, max_size) for per-queue limits - queue= on task decorator and add_task() for routing - QueueFullError when max_size is reached - REJECTED status for tasks blocked by queue policy - queue_stats() and update_queue_config() on TaskManager - Fan-out peer endpoint for multi-instance task aggregation - Dashboard: Queues tab, Queue column, Queue filter, dark mode - Docs and README updated for all new features
1 parent f3c8746 commit 1262bb4

40 files changed

Lines changed: 3936 additions & 871 deletions

README.md

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ The route signature does not change. Tasks that fail are retried. If the server
7979
## Features
8080

8181
- Automatic retries with configurable delay and exponential backoff
82-
- Task IDs and full lifecycle tracking: `PENDING`, `RUNNING`, `SUCCESS`, `FAILED`, `INTERRUPTED`
82+
- Task IDs and full lifecycle tracking: `PENDING`, `RUNNING`, `SUCCESS`, `FAILED`, `INTERRUPTED`, `REJECTED`
8383
- Live admin dashboard over SSE at `/tasks/dashboard`
8484
- SQLite persistence out of the box; Redis, PostgreSQL, and MySQL as optional extras
8585
- Pending task requeue: unfinished tasks at shutdown are re-dispatched on startup
@@ -94,6 +94,7 @@ The route signature does not change. Tasks that fail are retried. If the server
9494
- Trace context propagation: OpenTelemetry spans flow from the request into background execution (Python 3.11+)
9595
- Process executor: `executor='process'` routes CPU-bound tasks through a `ProcessPoolExecutor`, bypassing the GIL with true parallel workers
9696
- Concurrency controls: opt-in semaphore for async tasks, dedicated thread pool for sync tasks, and configurable process worker count
97+
- Named queues: `QueueConfig(concurrency, max_size)` for independent concurrency caps and backpressure per queue, with `QueueFullError` when a queue is full
9798
- Priority queues: `priority=` on `@task_manager.task()` or `add_task()`, higher-priority tasks run first, equal-priority tasks are FIFO
9899
- Eager dispatch: `eager=True` starts a task immediately via `asyncio.create_task` before the HTTP response is sent
99100
- Scheduled tasks: `@task_manager.schedule(every=)` and `cron=` with distributed lock for multi-instance
@@ -166,6 +167,8 @@ def route(tasks=Depends(task_manager.get_tasks)):
166167
| `requeue_on_interrupt` | `bool` | `False` | Requeue this task if it was mid-execution at shutdown. Only set for idempotent tasks. |
167168
| `eager` | `bool` | `False` | Start the task via `asyncio.create_task` immediately when `add_task()` is called, before the response is sent. Per-call `eager` on `add_task()` overrides this. |
168169
| `priority` | `int \| None` | `None` | Route through the priority queue. Higher integers run first. Conventional range 1 (lowest) to 10 (highest). Per-call `priority` on `add_task()` overrides this. |
170+
| `queue` | `str \| None` | `None` | Route this task to a named queue. Per-call `queue` on `add_task()` overrides this. |
171+
| `executor` | `str \| None` | `None` | Force a specific executor: `"async"`, `"thread"`, or `"process"`. When `None`, the executor is inferred from the function signature. |
169172

170173
## Idempotency keys
171174

@@ -332,6 +335,51 @@ task_id = tasks.add_task(process_item, item_id) # use decorator de
332335

333336
Tasks with no priority route through Starlette's normal background task list unchanged.
334337

338+
## Named queues
339+
340+
Define named queues with independent concurrency caps and backpressure limits using `QueueConfig`. Tasks route to a queue via `queue=` on the decorator or per call on `add_task()`.
341+
342+
```python
343+
from fastapi_taskflow import TaskManager
344+
from fastapi_taskflow.models import QueueConfig
345+
346+
task_manager = TaskManager(
347+
queues={
348+
"email": QueueConfig(concurrency=30, max_size=500),
349+
"reports": QueueConfig(concurrency=4, max_size=50),
350+
"default": QueueConfig(concurrency=20),
351+
},
352+
)
353+
354+
355+
@task_manager.task(retries=3, queue="email")
356+
async def send_email(address: str) -> None:
357+
...
358+
359+
360+
@task_manager.task(queue="reports")
361+
def generate_report(user_id: int) -> None:
362+
...
363+
```
364+
365+
When a queue is at its `max_size` limit, `add_task()` raises `QueueFullError` so callers can return a 429 rather than silently growing memory. Unknown queue names fall back to `default` with a warning log.
366+
367+
```python
368+
from fastapi_taskflow import QueueFullError
369+
370+
try:
371+
task_id = tasks.add_task(generate_report, user_id)
372+
except QueueFullError:
373+
raise HTTPException(status_code=429, detail="Report queue is full")
374+
```
375+
376+
`queue_stats()` returns per-queue pending, running, and finished counts. `update_queue_config()` changes concurrency and max_size at runtime without a restart.
377+
378+
```python
379+
task_manager.queue_stats()
380+
task_manager.update_queue_config("email", concurrency=50, max_size=1000)
381+
```
382+
335383
## Eager dispatch
336384

337385
Set `eager=True` to start a task via `asyncio.create_task` the moment `add_task()` is called, before FastAPI sends the response. Useful for batch endpoints where multiple tasks are added in a single request handler and you want them to run concurrently rather than queued sequentially.

docs/api/managed-background-tasks.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def add_task(
4040
tags: dict[str, str] | None = None,
4141
eager: bool | None = None,
4242
priority: int | None = None,
43+
queue: str | None = None,
4344
**kwargs: Any,
4445
) -> str
4546
```
@@ -55,7 +56,8 @@ This overrides `BackgroundTasks.add_task()`, which returns `None`. If you are al
5556
| `idempotency_key` | `str \| None` | `None` | Deduplication key. If a non-failed task with the same key already exists in the store or backend, its `task_id` is returned and `func` is not enqueued again. |
5657
| `tags` | `dict[str, str] \| None` | `None` | Key/value labels attached to this task. Forwarded to every `LogEvent` and `LifecycleEvent` emitted for the task. |
5758
| `eager` | `bool \| None` | `None` | When `True`, dispatch via `asyncio.create_task` immediately rather than waiting for the response to be sent. Overrides the decorator-level `eager` setting for this call only. |
58-
| `priority` | `int \| None` | `None` | Route through the priority queue instead of Starlette's background task list. Higher values run first; the conventional range is 1 (lowest) to 10 (highest). Overrides the decorator-level `priority` for this call only. Mutually exclusive with `eager`: when `priority` is set, `eager` is ignored. |
59+
| `priority` | `int \| None` | `None` | Route through the priority queue instead of Starlette's background task list. Higher values run first; the conventional range is 1 (lowest) to 10 (highest). Overrides the decorator-level `priority` for this call only. When named queues are active, controls ordering within the target queue's heap. Mutually exclusive with `eager`: when `priority` is set, `eager` is ignored. |
60+
| `queue` | `str \| None` | `None` | Named queue to route this task into. Overrides the decorator-level `queue` for this call only. Only effective when the named queue system is active (any `queues=` or `max_size=` argument was passed to `TaskManager`). Raises `QueueFullError` if the target queue is at its `max_size` limit. |
5961
| `**kwargs` | `Any` | | Keyword arguments forwarded to `func`. |
6062

6163
**Examples:**

docs/api/models.md

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class TaskStatus(str, Enum):
2020
FAILED = "failed"
2121
INTERRUPTED = "interrupted"
2222
CANCELLED = "cancelled"
23+
REJECTED = "rejected"
2324
```
2425

2526
| Value | Meaning |
@@ -30,6 +31,7 @@ class TaskStatus(str, Enum):
3031
| `FAILED` | Task raised an exception on its final attempt after all retries were exhausted. Terminal. |
3132
| `INTERRUPTED` | Task was mid-execution when the app shut down and `requeue_on_interrupt` was not enabled. Saved to history, visible in the dashboard, and not re-executed automatically. Can be retried via `POST /tasks/{task_id}/retry`. Terminal. |
3233
| `CANCELLED` | Task was cancelled before execution via `POST /tasks/{task_id}/cancel`. Terminal; the task will not run. |
34+
| `REJECTED` | The target named queue was at its `max_size` limit when `add_task()` was called. The task record is created with this status and `QueueFullError` is raised. Only occurs when the named queue system is active with a `max_size` limit. Can be retried via `POST /tasks/{task_id}/retry`. Terminal. |
3335

3436
---
3537

@@ -64,6 +66,7 @@ class TaskRecord:
6466
source: str
6567
priority: int | None
6668
executor: str | None
69+
queue: str
6770
```
6871

6972
### Fields
@@ -88,8 +91,9 @@ Fields marked **auto** are set by the framework. Fields marked **caller** are pr
8891
| `tags` | `dict[str, str]` | caller | Key/value labels attached at enqueue time. Forwarded to every `LogEvent` and `LifecycleEvent`. Stored as part of the snapshot payload, not as a separate column. |
8992
| `encrypted_payload` | `bytes \| None` | auto | Fernet-encrypted `(args, kwargs)` when `encrypt_args_key` is configured on `TaskManager`. When present, `args` and `kwargs` are stored empty. Not included in `to_dict()` output or API responses. |
9093
| `source` | `str` | auto | How the task was created: `"manual"` for tasks enqueued via `add_task()`, `"scheduled"` for tasks fired by the periodic scheduler. |
91-
| `priority` | `int \| None` | caller | Priority level assigned at enqueue time. `None` when routed through the standard Starlette mechanism. Any integer when routed through the priority queue; higher values run first. |
94+
| `priority` | `int \| None` | caller | Priority level assigned at enqueue time. `None` when routed through the standard Starlette mechanism. Any integer when routed through the priority queue or named queue heap; higher values run first. |
9295
| `executor` | `str \| None` | auto | The executor that ran (or will run) this task: `"async"`, `"thread"`, or `"process"`. Reflects the effective executor after auto-detection. Shown in the dashboard detail panel. |
96+
| `queue` | `str` | auto | The named queue this task was routed into. Always `"default"` when the named queue system is not active. Set from the per-call `queue=` argument, then the decorator-level `queue=`, then `"default"`. |
9397

9498
### Properties
9599

@@ -179,11 +183,52 @@ class TaskConfig:
179183
| `name` | `str \| None` | `None` | Display name in logs and the dashboard. Defaults to the function's `__name__`. |
180184
| `requeue_on_interrupt` | `bool` | `False` | When `True`, a task interrupted at shutdown is reset to `PENDING` and re-dispatched on next startup. Only safe for idempotent tasks. |
181185
| `eager` | `bool` | `False` | Dispatch via `asyncio.create_task` immediately when `add_task()` is called rather than after the response is sent. |
182-
| `priority` | `int \| None` | `None` | Execution priority. `None` routes through the standard Starlette mechanism. Any integer routes through the priority queue; higher values run first. |
186+
| `priority` | `int \| None` | `None` | Execution priority. `None` routes through the standard Starlette mechanism. Any integer routes through the priority queue or named queue heap; higher values run first. |
187+
| `queue` | `str \| None` | `None` | Named queue this function is routed into by default. `None` routes to `"default"` when the named queue system is active. |
183188
| `executor` | `str \| None` | `None` | Configured executor. `"async"`, `"thread"`, or `"process"`. `None` means auto-detect from the function signature at dispatch time. |
184189

185190
---
186191

192+
## QueueConfig
193+
194+
`QueueConfig` holds the configuration for a single named queue. Pass instances of this in the `queues` dict when constructing `TaskManager`.
195+
196+
```python
197+
from fastapi_taskflow.models import QueueConfig
198+
```
199+
200+
```python
201+
@dataclass
202+
class QueueConfig:
203+
concurrency: int | None = None
204+
max_size: int | None = None
205+
```
206+
207+
| Field | Type | Default | Description |
208+
|-------|------|---------|-------------|
209+
| `concurrency` | `int \| None` | `None` | Maximum number of tasks from this queue that may run concurrently. When the limit is reached, the queue drainer blocks until a slot is released. `None` removes the limit. |
210+
| `max_size` | `int \| None` | `None` | Maximum number of tasks allowed to wait in this queue's heap. When the heap is full, `add_task()` creates a `REJECTED` task record and raises `QueueFullError`. `None` removes the limit. |
211+
212+
`QueueConfig` fields can be updated at runtime via `PATCH /tasks/queues/{name}` or `TaskManager.update_queue_config()`. Changes take effect immediately for new tasks; in-flight tasks are not affected. Updated values are persisted to the backend and restored on the next startup.
213+
214+
**Example:**
215+
216+
```python
217+
from fastapi_taskflow import TaskManager
218+
from fastapi_taskflow.models import QueueConfig
219+
220+
task_manager = TaskManager(
221+
snapshot_db="tasks.db",
222+
queues={
223+
"email": QueueConfig(concurrency=30, max_size=500),
224+
"reports": QueueConfig(concurrency=4, max_size=50),
225+
"default": QueueConfig(concurrency=20),
226+
},
227+
)
228+
```
229+
230+
---
231+
187232
## AuditEntry
188233

189234
`AuditEntry` is a single record in the in-memory audit log, written whenever a retry or cancel action is performed via the API.

docs/api/task-manager.md

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@ TaskManager(
3737
max_process_workers: int | None = None,
3838
process_shutdown_timeout: float = 30.0,
3939
retention_days: float | None = None,
40+
retry_replaces_original: bool = True,
41+
queues: dict[str, QueueConfig] | None = None,
42+
max_size: int | None = None,
43+
instance_url: str | None = None,
44+
instance_tasks_prefix: str = "",
45+
registry_ttl: int = 90,
46+
registry_heartbeat: int = 30,
4047
)
4148
```
4249

@@ -77,6 +84,28 @@ TaskManager(
7784
|-----------|------|---------|-------------|
7885
| `encrypt_args_key` | `bytes \| str \| None` | `None` | A Fernet key for encrypting task args and kwargs at rest. When set, arguments are encrypted at enqueue time and decrypted only when the executor is about to call the function. Accepts a URL-safe base64 string or raw bytes from `Fernet.generate_key()`. Requires `pip install "fastapi-taskflow[encryption]"`. |
7986

87+
### Retry parameter
88+
89+
| Parameter | Type | Default | Description |
90+
|-----------|------|---------|-------------|
91+
| `retry_replaces_original` | `bool` | `True` | When `True`, retrying a task via the API (single retry, bulk retry, or timed retry) removes the original record from the in-memory store and backend after the new task is dispatched. The dashboard and history show only the new run. When `False`, both the original and the new task record are kept. Applies to all retry paths regardless of whether named queues are active. |
92+
93+
### Named queue parameters
94+
95+
| Parameter | Type | Default | Description |
96+
|-----------|------|---------|-------------|
97+
| `queues` | `dict[str, QueueConfig] \| None` | `None` | Named queues with individual concurrency and backpressure settings. When provided, tasks are routed through the named queue system. A `"default"` queue is created automatically if not included. |
98+
| `max_size` | `int \| None` | `None` | Backpressure limit for the implicit `"default"` queue. When the default queue reaches this many tasks pending, `add_task()` raises `QueueFullError`. Activates the named queue system even when `queues` is not provided. |
99+
100+
### Multi-instance parameters
101+
102+
| Parameter | Type | Default | Description |
103+
|-----------|------|---------|-------------|
104+
| `instance_url` | `str \| None` | `None` | Public base URL of this instance (for example `"http://10.0.0.1:8000"`). When set alongside a backend that supports `save_metadata`/`load_metadata` (SQLite, Redis, Postgres, MySQL), this instance registers itself so peers can fan out to it. No registration or fan-out occurs when `None`. |
105+
| `instance_tasks_prefix` | `str` | `""` | URL prefix where the tasks router is mounted on this instance (for example `"/api/tasks"`). Peers append this to `instance_url` when building the fan-out URL. Must match the prefix used when mounting `TaskAdmin` or the router. |
106+
| `registry_ttl` | `int` | `90` | Seconds before a peer registry entry is considered stale and excluded from fan-out. Should be at least `2 * registry_heartbeat`. |
107+
| `registry_heartbeat` | `int` | `30` | Seconds between heartbeat writes that keep this instance's registry entry fresh. |
108+
80109
---
81110

82111
## Decorators
@@ -95,6 +124,7 @@ Registers a function as a managed background task. The decorated function is ret
95124
requeue_on_interrupt=False,
96125
eager=False,
97126
priority=None,
127+
queue=None,
98128
executor=None,
99129
)
100130
def my_task(user_id: int) -> None:
@@ -110,7 +140,8 @@ def my_task(user_id: int) -> None:
110140
| `name` | `str \| None` | function name | Override the display name in logs and the dashboard. |
111141
| `requeue_on_interrupt` | `bool` | `False` | When `True` and `requeue_pending=True` on the manager, a task interrupted at shutdown is reset to `PENDING` and re-dispatched on next startup. Only set this for idempotent functions that are safe to restart from scratch. |
112142
| `eager` | `bool` | `False` | Dispatch via `asyncio.create_task` immediately when `add_task()` is called, before FastAPI sends the response. Per-call `eager` on `add_task()` overrides this value. |
113-
| `priority` | `int \| None` | `None` | Route through the priority queue instead of Starlette's background task list. Higher values run first. The conventional range is 1 (lowest) to 10 (highest). Per-call `priority` on `add_task()` overrides this value. |
143+
| `priority` | `int \| None` | `None` | Route through the priority queue instead of Starlette's background task list. Higher values run first. The conventional range is 1 (lowest) to 10 (highest). Per-call `priority` on `add_task()` overrides this value. When named queues are active, controls ordering within the target queue's heap. |
144+
| `queue` | `str \| None` | `None` | Named queue to route this function into. Requires `queues=` on `TaskManager`. Per-call `queue` on `add_task()` overrides this value. Tasks routed to an unknown queue fall back to `"default"` with a warning. |
114145
| `executor` | `"async" \| "thread" \| "process" \| None` | `None` | Force a specific executor. `"async"` requires a coroutine. `"thread"` requires a plain function. `"process"` routes to a `ProcessPoolExecutor` and requires a module-level function with picklable arguments. `None` auto-detects from the function signature. |
115146

116147
Raises `ValueError` at decoration time if the function is incompatible with the requested executor (for example, `executor='async'` on a sync function, or `executor='process'` on a lambda or nested function).
@@ -155,6 +186,7 @@ async def cleanup_expired_sessions() -> None:
155186
| `name` | `str \| None` | function name | Override the display name in logs and the dashboard. |
156187
| `run_on_startup` | `bool` | `False` | When `True`, fire the task on the first scheduler tick immediately after startup, rather than waiting for the first interval or cron slot. |
157188
| `timezone` | `str` | `"UTC"` | IANA timezone name used when evaluating `cron` expressions (for example `"America/New_York"`). Ignored when `every` is used. |
189+
| `queue` | `str \| None` | `None` | Named queue to route each periodic firing into. When the named queue system is active, the task is subject to that queue's concurrency limit and backpressure. Defaults to `"default"`. |
158190
| `executor` | `"async" \| "thread" \| "process" \| None` | `None` | Force a specific executor for each firing. Same constraints as on `@task()`. `None` auto-detects from the function signature. |
159191

160192
Raises `ValueError` if neither or both of `every` and `cron` are provided, or if `executor` is incompatible with the function. Raises `ImportError` if `cron` is used and `croniter` is not installed.

0 commit comments

Comments
 (0)