Defining Tasks¶
Tasks are async functions decorated with @app.task(...). The decorator registers the function in the app and returns a TaskDef object that exposes an .enqueue() method.
Basic task¶
from aioq import Aarq
from aioq.backends import RedisBroker
app = Aarq(broker=RedisBroker())
@app.task(queue="default")
async def my_task(ctx, message: str) -> str:
print(f"Running: {message}")
return message.upper()
Task options¶
@app.task(
queue="default", # (str) queue to publish to
retries=0, # (int) max retry attempts on failure
retry_delay=5.0, # (float) seconds to wait before each retry
save_result=False, # (bool) persist return value in the broker
result_ttl=3600, # (int) seconds to keep the result (Redis only)
)
async def my_task(ctx, ...):
...
| Option | Type | Default | Description |
|---|---|---|---|
queue |
str |
"default" |
Queue name |
retries |
int |
0 |
Max retry attempts |
retry_delay |
float |
5.0 |
Seconds between retries |
save_result |
bool |
False |
Persist return value |
result_ttl |
int |
3600 |
Result TTL in seconds (Redis) |
The ctx argument¶
Every task receives a context dict as its first argument:
async def my_task(ctx, arg1, arg2):
worker_id = ctx["worker_id"] # str — UUID of the executing worker
job_id = ctx["job_id"] # str — UUID of the current job
broker = ctx["broker"] # BaseBroker — broker instance
You can use ctx["broker"] to enqueue follow-up jobs from within a task:
@app.task(queue="default")
async def parent_task(ctx, n: int):
for i in range(n):
await child_task.enqueue(i)
Retry behaviour¶
When a task raises an exception and retries > 0, the worker:
- Increments
job.retries - Sets
job.status = "retrying" - Waits
retry_delayseconds - Re-enqueues the job as
pending
Once job.retries == job.max_retries and the task fails again, the job is marked failed permanently.
@app.task(queue="default", retries=5, retry_delay=30.0)
async def flaky_api_call(ctx, url: str):
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10)
response.raise_for_status()
return response.json()
Result storage¶
Enable save_result=True to persist the return value. Retrieve it via the broker:
@app.task(queue="default", save_result=True, result_ttl=7200)
async def compute(ctx, n: int) -> int:
return n * n
# After the job completes:
job = await broker.get_job(job_id)
print(job.result) # 25 (if n=5)
Note
result_ttl is only honoured by RedisBroker. PostgresBroker stores results indefinitely until the row is deleted.
Task name¶
The task name is derived automatically as {module}.{qualname}:
# In module "myapp.tasks"
@app.task()
async def send_email(ctx, to: str): ...
# task name: "myapp.tasks.send_email"
print(send_email.name)
The worker uses this name to look up the function at execution time, so tasks.py must be importable from the worker process.