Running Workers¶
Workers dequeue jobs and execute them concurrently. Each worker runs as a single process.
CLI¶
app_path is a module:attribute path to your Aarq instance, for example myapp.tasks:app.
Options¶
| Flag | Default | Description |
|---|---|---|
| -q / --queue | default | Queue(s) to consume. Repeatable. |
| -c / --concurrency | 10 | Max concurrent jobs |
| --log-level | info | Logging level (debug, info, warning, error) |
# Consume two queues with higher concurrency
aioq worker myapp.tasks:app -q default -q email --concurrency 20
# Verbose logging for debugging
aioq worker myapp.tasks:app --log-level debug
Programmatic start¶
import asyncio
from aioq.worker import Worker
from myapp.tasks import app
worker = Worker(app, queues=["default", "email"], concurrency=20)
asyncio.run(worker.run())
Concurrency model¶
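The worker uses asyncio.Semaphore(concurrency) to cap the number of jobs running at the same time. All jobs run as asyncio.Task objects in the same event loop, so CPU-bound or blocking work in one job stalls every other job on that worker. Use asyncio.to_thread() for blocking calls and for code that releases the GIL, or a ProcessPoolExecutor for CPU-intensive pure-Python work, which a thread cannot run in parallel with the event loop.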
import asyncio

@app.task(queue="cpu")
async def heavy_compute(ctx, n: int) -> int:
    # expensive_function is a placeholder for your own synchronous function.
    # Offload it to a thread to avoid blocking the event loop.
    return await asyncio.to_thread(expensive_function, n)
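The semaphore cap described above can be illustrated with a small standalone sketch. It is not aioq's actual implementation: run_capped, guarded, and sample_job are hypothetical names, and the sketch only shows how asyncio.Semaphore limits the number of tasks running at once on a single event loop.

```python
import asyncio


async def run_capped(jobs, concurrency):
    """Run coroutine functions with at most `concurrency` in flight at once."""
    semaphore = asyncio.Semaphore(concurrency)
    running = 0
    peak = 0
    results = []

    async def guarded(job):
        nonlocal running, peak
        async with semaphore:  # waits here while `concurrency` jobs are running
            running += 1
            peak = max(peak, running)
            try:
                results.append(await job())
            finally:
                running -= 1

    # Every job becomes a Task on the same event loop.
    await asyncio.gather(*(asyncio.create_task(guarded(job)) for job in jobs))
    return results, peak


async def sample_job():
    await asyncio.sleep(0.01)  # stands in for I/O-bound work
    return "done"


def demo():
    # Eight jobs, but never more than three running at the same time.
    return asyncio.run(run_capped([sample_job] * 8, concurrency=3))
```

Calling demo() returns the eight results together with the highest number of jobs that were running simultaneously, which never exceeds the concurrency value.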
Heartbeat¶
Workers send a heartbeat to the broker every 10 seconds. The dashboard uses this to show a liveness indicator. A worker is considered dead if its last heartbeat is older than 30 seconds.
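The liveness rule above amounts to a timestamp comparison. The following is a minimal sketch, assuming heartbeats are stored as Unix timestamps in seconds; is_alive and missed_heartbeats are hypothetical helpers and not part of aioq's API.

```python
import time

HEARTBEAT_INTERVAL = 10.0  # seconds between heartbeats, per the text above
DEAD_AFTER = 30.0          # a heartbeat older than this marks the worker as dead


def is_alive(last_heartbeat, now=None):
    """Return True if the last heartbeat is at most DEAD_AFTER seconds old."""
    if now is None:
        now = time.time()
    return (now - last_heartbeat) <= DEAD_AFTER


def missed_heartbeats(last_heartbeat, now):
    """Count the heartbeats that were expected since the last one but never arrived."""
    elapsed = max(0.0, now - last_heartbeat)
    return int(elapsed // HEARTBEAT_INTERVAL)
```

With these numbers, a worker can miss two consecutive heartbeats and still be shown as alive; it is marked dead once the last heartbeat is more than 30 seconds old.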
Graceful shutdown¶
When the worker receives SIGTERM or SIGINT (Ctrl-C), it:
- Stops pulling new jobs from the queue
- Waits for all in-flight jobs to finish
- Deregisters from the broker
- Exits cleanly
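This means a docker stop or kubectl rollout restart will not drop in-flight jobs, as long as they finish before the grace period expires and SIGKILL is sent (10 seconds by default for docker stop, 30 seconds by default for a Kubernetes pod's terminationGracePeriodSeconds).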
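The shutdown sequence above can be sketched with plain asyncio. This is an illustration under stated assumptions, not aioq's Worker: MiniWorker, request_shutdown, and the in-memory pending list (standing in for the broker queue) are all hypothetical.

```python
import asyncio
import signal


class MiniWorker:
    """Hypothetical stand-in for a worker; not the aioq Worker class."""

    def __init__(self, jobs):
        self.pending = list(jobs)  # stands in for the broker queue
        self.in_flight = set()
        self.completed = []
        self.registered = True
        self.stopping = False

    def request_shutdown(self):
        self.stopping = True

    async def _execute(self, job):
        self.completed.append(await job())

    async def run(self):
        # 1. Pull jobs only while no shutdown has been requested.
        while self.pending and not self.stopping:
            task = asyncio.create_task(self._execute(self.pending.pop(0)))
            self.in_flight.add(task)
            task.add_done_callback(self.in_flight.discard)
            await asyncio.sleep(0)  # yield so signals and jobs can make progress
        # 2. Wait for every job that has already started.
        if self.in_flight:
            await asyncio.gather(*self.in_flight)
        # 3. Deregister; returning from run() is the clean exit.
        self.registered = False


async def main(worker):
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        # add_signal_handler is available on Unix event loops only.
        loop.add_signal_handler(sig, worker.request_shutdown)
    await worker.run()


async def sample_job():
    await asyncio.sleep(0.01)
    return "ok"


async def demo():
    worker = MiniWorker([sample_job] * 5)
    runner = asyncio.create_task(worker.run())
    await asyncio.sleep(0)     # let the worker start its first job
    worker.request_shutdown()  # same effect as receiving SIGTERM in main()
    await runner
    return worker
```

In demo(), the job that had already started runs to completion, while jobs that were never pulled stay in pending for another worker to pick up.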
Running multiple workers¶
Run multiple worker processes against the same broker for horizontal scaling:
# Terminal 1
aioq worker myapp.tasks:app -q default
# Terminal 2
aioq worker myapp.tasks:app -q email --concurrency 5
Each worker gets its own UUID and registers independently.
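When several workers are started from a script rather than from separate terminals, the command lines can be generated in one place. The sketch below uses only the flags documented in the Options table; worker_command and fleet are hypothetical names, not part of aioq.

```python
def worker_command(app_path, queues=("default",), concurrency=10, log_level="info"):
    """Build the argv list for one `aioq worker` process."""
    cmd = ["aioq", "worker", app_path]
    for queue in queues:
        cmd += ["-q", queue]  # -q is repeatable, one flag per queue
    cmd += ["--concurrency", str(concurrency), "--log-level", log_level]
    return cmd


# Mirrors the two-terminal example above.
fleet = [
    worker_command("myapp.tasks:app", queues=["default"]),
    worker_command("myapp.tasks:app", queues=["email"], concurrency=5),
]
```

Each list in fleet can be passed to subprocess.Popen to start one worker process.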