Worker¶
aioq.worker.Worker dequeues and executes jobs from one or more queues.
Constructor¶
from aioq.worker import Worker
worker = Worker(
app=app,
queues=["default", "email"],
concurrency=10,
heartbeat_interval=10.0,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
app |
Aarq |
— | Application instance |
queues |
list[str] \| None |
["default"] |
Queues to consume |
concurrency |
int |
10 |
Max concurrent jobs |
heartbeat_interval |
float |
10.0 |
Seconds between heartbeats |
await worker.run()¶
Start the worker. This method blocks until the worker stops.
Startup sequence:
broker.connect()broker.register_worker(worker_id, queues)- Install SIGTERM / SIGINT handlers
- Start heartbeat loop
- Start cron loop (if any cron tasks are registered)
- Enter the main dequeue loop
Shutdown sequence (on SIGTERM/SIGINT):
- Stop the dequeue loop
- Cancel heartbeat and cron tasks
- Wait for all in-flight jobs to finish (
asyncio.gather) broker.deregister_worker(worker_id)broker.disconnect()
worker.worker_id¶
A UUID string assigned on construction. Unique per process.
Internal loops¶
Heartbeat loop¶
Runs every heartbeat_interval seconds, calling broker.heartbeat_worker(). Failures are silently suppressed so a temporary broker outage doesn't crash the worker.
Cron loop¶
Runs every second, checking whether any cron task is due. When a task fires, it runs as an asyncio.Task and its next run time is scheduled immediately.