Retry stalled jobs¶
When a worker gets a SIGINT or SIGTERM signal requesting it to terminate it
waits for running jobs to finish before actually exiting. But, if the worker gets
a second SIGINT or SIGTERM signal, or if it’s killed with SIGKILL, it will
terminate immediately, possibly leaving jobs with the doing status in the queue.
And, if no specific action is taken, these stalled jobs will remain in the queue
forever, and their execution will never resume.
To address this problem, Procrastinate workers update heartbeats at a regular
interval (every 10 seconds by default). If a worker is terminated without a regular
shutdown, the heartbeat of that worker will not be updated, and the worker will be
considered stalled. Jobs in the doing state of such stalled workers are considered
stalled as well and can be fetched by the with the JobManager.get_stalled_jobs()
method.
Note
Regular worker shutdowns delete the worker’s heartbeat from the database. Heartbeats of stalled worker are also pruned after a certain duration (30 seconds by default) to avoid having too many heartbeats of old worker runs in the database, but stalled jobs can still be detected.
Those stalled jobs can then be retried for example by a periodic task. To enable this add this task to your code:
@app.periodic(cron="*/10 * * * *")
@app.task(queueing_lock="retry_stalled_jobs", pass_context=True)
async def retry_stalled_jobs(context, timestamp):
stalled_jobs = await app.job_manager.get_stalled_jobs()
for job in stalled_jobs:
await app.job_manager.retry_job(job)
This defines a periodic task, configured to be deferred at every 10th minute. The task
retrieves all the jobs that have been in the doing status of workers that have not
received a heartbeat since the last (by default) 30 seconds. This duration can be
configured with the seconds_since_heartbeat parameter of the get_stalled_jobs method.
Note
If you change the seconds_since_heartbeat parameter, make sure to also check the
update_heartbeat_interval and stalled_worker_timeout parameters of the worker
and adjust them accordingly.
With this, if you have multiple workers, and, for some reason, one of them gets killed
while running jobs, then one of the remaining workers will run the
retry_stalled_jobs task, marking the stalled jobs for retry.
If you have specific rules for task retry (e.g. only some tasks should be retried, based
on specific parameters, or the duration before a task is considered stalled should
depend on the task), you’re free to make the periodic task function more complex and add
your logic to it. See JobManager.get_stalled_jobs() for details.
Warning
get_stalled_jobs also accepts a nb_seconds parameter, which if set fetches
stalled jobs that have been in the doing state for more than the specified seconds
without even considering the worker heartbeat. This parameter is deprecated and will be
removed in a next major release as it may lead to wrongly retrying jobs that are still
running.