Cancel a job
We can cancel a job that has not yet been processed by a worker. We can also mark a job that is currently being processed for abortion, but this request has to be handled by the task itself.
Cancel a job (that is not being processed yet)
# by using the sync method
app.job_manager.cancel_job_by_id(33)
# or by using the async method
await app.job_manager.cancel_job_by_id_async(33)
Delete the cancelled job
A cancelled job can also be deleted from the database.
# by using the sync method
app.job_manager.cancel_job_by_id(33, delete_job=True)
# or by using the async method
await app.job_manager.cancel_job_by_id_async(33, delete_job=True)
Mark a running job for abortion
If a worker has not picked up the job yet, the command below behaves like the
command without the abort option. If the job is already running, the abort option marks it for abortion (see below
for how to handle this request).
# by using the sync method
app.job_manager.cancel_job_by_id(33, abort=True)
# or by using the async method
await app.job_manager.cancel_job_by_id_async(33, abort=True)
Behind the scenes, the worker receives a Postgres notification every time a job is requested to abort (unless listen_notify=False).
The worker also polls the database for abortion requests (respecting fetch_job_polling_interval), as long as it is running at least one job. When no job is running, there is nothing to abort.
Note
When a job is requested to abort and that job fails, it will not be retried (regardless of the retry strategy).
Handle an abortion request inside the task
Sync tasks
In a sync task, we can check (for example, periodically) if the task should be
aborted. If we want to respect that abortion request (we don’t have to), we raise a
JobAborted error. Any message passed to JobAborted (e.g.
raise JobAborted("custom message")) will end up in the logs.
from procrastinate import exceptions

@app.task(pass_context=True)
def my_task(context):
    for i in range(100):
        # check for an abortion request before each expensive step
        if context.should_abort():
            raise exceptions.JobAborted
        do_something_expensive()
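The cooperative check above can be tried without a database or a worker. The following is a minimal sketch, not Procrastinate code: `FakeContext` and the local `JobAborted` class are hypothetical stand-ins for the real job context and `exceptions.JobAborted`, and the abort request is simulated after a fixed number of checks.

```python
class JobAborted(Exception):
    """Hypothetical stand-in for procrastinate.exceptions.JobAborted."""


class FakeContext:
    """Hypothetical stand-in for the job context.

    Reports an abort request once should_abort() has been called
    more than `abort_after` times.
    """

    def __init__(self, abort_after):
        self.abort_after = abort_after
        self.checks = 0

    def should_abort(self):
        self.checks += 1
        return self.checks > self.abort_after


def my_task(context, steps=100):
    done = 0
    for _ in range(steps):
        # Check between units of work, so an abort request is honored
        # at the next iteration boundary rather than mid-step.
        if context.should_abort():
            raise JobAborted(f"aborted after {done} steps")
        done += 1  # placeholder for do_something_expensive()
    return done


def run(context):
    """Run the task and return either its result or the abort message."""
    try:
        return my_task(context)
    except JobAborted as exc:
        return str(exc)
```

Checking once per iteration keeps the cost of the check small compared to the expensive step, but it also means an abort request is only noticed between steps. How often to check is a trade-off left to the task author.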
Async tasks
Async tasks (coroutines) are cancelled through the asyncio cancellation mechanism.
@app.task()
async def my_task():
    do_something_synchronous()
    # if the job is aborted while it waits for do_something to complete,
    # asyncio.CancelledError will be raised here
    await do_something()
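To see where the cancellation lands, here is a minimal sketch that uses only the standard library. It assumes that an abort request reaches the coroutine as an ordinary `Task.cancel()` call; the `main` function plays the role of the worker, and the `events` list exists only to record what happened.

```python
import asyncio

events = []


async def my_task():
    events.append("started")
    try:
        # Cancellation is delivered at this await point.
        await asyncio.sleep(10)
        events.append("finished")
    except asyncio.CancelledError:
        events.append("cancelled")
        raise  # re-raise so the caller sees the cancellation


async def main():
    task = asyncio.create_task(my_task())
    await asyncio.sleep(0)  # let my_task run up to its first await
    task.cancel()  # stands in for the worker handling an abort request
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


was_cancelled = asyncio.run(main())
```

Synchronous code between two awaits is never interrupted: the error only surfaces the next time the coroutine yields control to the event loop.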
For custom behavior at cancellation time, combine shielding with catching asyncio.CancelledError.
import asyncio

@app.task()
async def my_task():
    try:
        important_task = asyncio.create_task(something_important())
        # shield something_important from being cancelled
        await asyncio.shield(important_task)
    except asyncio.CancelledError:
        # capture the error and wait for something_important to complete
        await important_task
        # re-raise if the job should be marked as aborted, or swallow
        # CancelledError if the job should be marked as succeeded
        raise
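The shielding pattern can be checked in isolation with plain asyncio. The sketch below is an illustration under the same assumption as before (the abort arrives as a `Task.cancel()` call); the `log` list and the sleep durations are made up for the demonstration.

```python
import asyncio

log = []


async def something_important():
    await asyncio.sleep(0.05)
    log.append("important work done")


async def my_task():
    try:
        important_task = asyncio.create_task(something_important())
        # shield() keeps the inner task running if my_task is cancelled
        await asyncio.shield(important_task)
    except asyncio.CancelledError:
        log.append("cancellation received")
        # wait for the shielded work to finish before giving up
        await important_task
        raise


async def main():
    task = asyncio.create_task(my_task())
    await asyncio.sleep(0.01)  # cancel while the important work is running
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("task ended as cancelled")


asyncio.run(main())
```

The shielded work finishes even though the outer task was cancelled, and the final `raise` still lets the cancellation propagate. Replacing that `raise` with a normal return would make the task end without an error instead.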