Enqueueing Jobs
Once a task is defined, call .enqueue() on the decorated function to push a job onto the queue.
Basic enqueue
job = await my_task.enqueue(arg1, arg2, kwarg=value)
print(job.id)      # UUID string
print(job.status)  # "pending"
.enqueue() writes the job to the broker and then returns a Job object immediately; it does not wait for the job to run.
Deferred jobs
Run a job after a fixed delay:
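A minimal sketch of one way to express a fixed delay, assuming only the defer_until argument used in this section: compute the target time by adding a timedelta to the current time. The helper run_at_after is hypothetical and not part of the library, and the use of timezone-aware UTC values is an assumption; the library may also offer a dedicated delay argument.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def run_at_after(delay: timedelta, now: Optional[datetime] = None) -> datetime:
    """Return the absolute time that lies `delay` after `now`.

    Hypothetical helper: `now` defaults to the current time in UTC.
    """
    if delay < timedelta(0):
        raise ValueError("delay must not be negative")
    base = now if now is not None else datetime.now(timezone.utc)
    return base + delay


# Hypothetical usage with the defer_until argument (inside async code):
#   job = await send_email.enqueue(
#       to="user@example.com",
#       defer_until=run_at_after(timedelta(minutes=10)),
#   )
```

Passing `now` explicitly keeps the helper deterministic in tests.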
Run a job at a specific time:
from datetime import datetime

job = await send_email.enqueue(
    to="user@example.com",
    defer_until=datetime(2026, 1, 1, 9, 0),
)
Note
Deferred jobs are stored in a separate sorted set (Redis) or filtered by run_at (PostgreSQL). The worker promotes them to the active queue once their scheduled time passes.
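The promotion step described in the note can be illustrated with a small in-memory model. This is a sketch of the general idea only, not the library's Redis or PostgreSQL implementation: the class DeferredQueueModel and its method names are hypothetical, and a heap stands in for the sorted set. Treating a job whose time has already been reached as immediately active is also an assumption.

```python
import heapq
from collections import deque
from datetime import datetime, timedelta
from typing import Optional


class DeferredQueueModel:
    """Toy model: deferred jobs wait in a time-ordered heap until they are due."""

    def __init__(self) -> None:
        self._deferred = []    # heap of (run_at, seq, job_id), earliest first
        self._seq = 0          # tie-breaker: keeps insertion order for equal times
        self.active = deque()  # job ids that a worker may pick up now

    def enqueue(self, job_id: str, now: datetime,
                defer_until: Optional[datetime] = None) -> None:
        # Jobs without a future run time go straight to the active queue.
        if defer_until is None or defer_until <= now:
            self.active.append(job_id)
            return
        heapq.heappush(self._deferred, (defer_until, self._seq, job_id))
        self._seq += 1

    def promote_due(self, now: datetime) -> int:
        """Move every job whose run time has been reached to the active queue."""
        promoted = 0
        while self._deferred and self._deferred[0][0] <= now:
            _, _, job_id = heapq.heappop(self._deferred)
            self.active.append(job_id)
            promoted += 1
        return promoted


start = datetime(2026, 1, 1, 8, 59)
queue = DeferredQueueModel()
queue.enqueue("email-1", now=start, defer_until=datetime(2026, 1, 1, 9, 0))
queue.enqueue("email-2", now=start)

early = queue.promote_due(start)                        # nothing is due yet
late = queue.promote_due(start + timedelta(minutes=1))  # 09:00 has been reached
```

In the model, a worker would call promote_due on each loop iteration before taking work from the active queue.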
Enqueue from within a task
@app.task(queue="pipeline")
async def step_one(ctx, data: dict):
    result = process(data)
    await step_two.enqueue(result)
The broker is available as ctx["broker"] if you need it directly.
Checking job status
import asyncio

async with broker:
    job = await my_task.enqueue(value=42)

    # Poll until the job reaches a terminal status
    while True:
        job = await broker.get_job(job.id)
        if job.status in ("completed", "failed", "cancelled"):
            break
        await asyncio.sleep(1)

    print(job.status, job.result, job.error)
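The loop above polls forever if the job never reaches a terminal status. A hedged sketch of the same polling pattern with an upper time bound follows. The helper wait_for_job is hypothetical and not part of the library; it assumes only that the lookup coroutine passed in (for example broker.get_job) returns an object with a status attribute.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Terminal statuses as listed in the polling example in this section.
TERMINAL_STATUSES = ("completed", "failed", "cancelled")


async def wait_for_job(
    get_job: Callable[[str], Awaitable[Any]],
    job_id: str,
    timeout: float = 30.0,
    interval: float = 1.0,
) -> Any:
    """Poll get_job(job_id) until the job is terminal or `timeout` seconds pass."""

    async def poll() -> Any:
        while True:
            job = await get_job(job_id)
            if job.status in TERMINAL_STATUSES:
                return job
            await asyncio.sleep(interval)

    # On expiry, asyncio.wait_for cancels poll() and raises asyncio.TimeoutError.
    return await asyncio.wait_for(poll(), timeout=timeout)


# Hypothetical usage (inside async code):
#   job = await wait_for_job(broker.get_job, job.id, timeout=60)
```

Passing the lookup as a callable keeps the helper independent of any particular broker object and makes it easy to test with a stub.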
Cancelling a pending job
cancelled = await broker.cancel_job(job.id)
# True if the job was pending/retrying and is now cancelled
# False if the job was already running/completed
Retrying a failed job
retried = await broker.retry_job(job.id)
# True if the job was failed/cancelled and has been re-enqueued as pending
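The return values of cancel_job and retry_job described above can be summarised as status transitions. The following is a small model of those rules for illustration only, not the broker's code. The function names are hypothetical, and returning False for every status not listed as retryable is an assumption, since only the True case is documented for retry_job.

```python
from typing import Tuple

# Statuses from which each operation succeeds, per the comments above.
CANCELLABLE = frozenset({"pending", "retrying"})
RETRYABLE = frozenset({"failed", "cancelled"})


def cancel_transition(status: str) -> Tuple[bool, str]:
    """Return (changed, new_status) for a cancel request."""
    if status in CANCELLABLE:
        return True, "cancelled"
    return False, status  # e.g. already running or completed


def retry_transition(status: str) -> Tuple[bool, str]:
    """Return (changed, new_status) for a retry request."""
    if status in RETRYABLE:
        return True, "pending"
    return False, status  # assumption: other statuses are left untouched
```

Under this model a cancelled job can be brought back with a retry, while a running or completed job is affected by neither call.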