API Reference

App

class procrastinate.App(*, connector, import_paths=None, worker_defaults=None, periodic_defaults=None)

The App is the main entry point for procrastinate integration.

Instantiate a single App in your code and use it to decorate your tasks with App.task.

You can run a worker with App.run_worker.

tasks

The mapping of all tasks known by the app. Only procrastinate is expected to make changes to this mapping.

Type

Dict[str, tasks.Task]

job_manager

The JobManager linked to the application

Type

manager.JobManager

Parameters
  • connector (BaseConnector) – Typically an AiopgConnector. It will be responsible for all communications with the database. Mandatory.

  • import_paths (Optional[Iterable[str]]) – List of python dotted paths of modules to import, to make sure that the workers know about all possible tasks. If there are tasks in a module that is neither imported as a side effect of importing the App, nor specified in this list, and a worker encounters a task defined in that module, the task will fail (TaskNotFound). While it is not mandatory to specify paths to modules that you know have already been imported, it’s a good idea to do so.

  • worker_defaults (Optional[Dict]) – All the values passed here will override the default values sent when launching a worker. See App.run_worker for details.

  • periodic_defaults (Optional[Dict]) –

    Parameters for fine tuning the periodic tasks deferrer. Available parameters are:

    • max_delay: float, in seconds. When a worker starts and there’s a periodic task that has not been deferred, the worker will defer the task if it’s been due for less that this amount of time. This avoids new periodic tasks to be immediately deferred just after their first deployment. (defaults to 10 minutes)

add_task_alias(task, alias)

Add an alias to a task. This can be useful if a task was in a given Blueprint and moves to a different blueprint.

Parameters
  • task (Task) – Task to alias

  • alias (str) – New alias (including potential namespace, separated with :)

Return type

None

add_tasks_from(blueprint, *, namespace)

Copies over all tasks from a different blueprint, prefixing their names with the given namespace (using : as namespace separator).

Parameters
  • blueprint (Blueprint) – Blueprint to copy tasks from

  • namespace (str) – All task names (but not aliases) will be prefixed by this name, uniqueness will be enforced.

Raises

TaskAlreadyRegistered: – When trying to use a namespace that has already been used before

Return type

None

configure_task(name, *, allow_unknown=True, **kwargs)

Configure a task for deferring, using its name

Parameters
  • name (str) – Name of the task. If not explicitly defined, this will be the dotted path to the task (my.module.my_task)

  • **kwargs (Any) – Parameters from Task.configure

Returns

Launch .defer(**task_kwargs) on this object to defer your job.

Return type

jobs.JobDeferrer

classmethod from_path(dotted_path)

Create an App object by dynamically loading the object at the given path.

Parameters

dotted_path (str) – Dotted path to the object to load (e.g. mymodule.submodule.procrastinate_app)

Return type

App

periodic(*, cron, periodic_id='', **kwargs)

Task decorator, marks task as being scheduled for periodic deferring (see Launch a task periodically).

Parameters
  • cron (str) – Cron-like string. Optionally add a 6th column for seconds.

  • periodic_id (str) – Task name suffix. Used to distinct periodic tasks with different kwargs.

  • **kwargs – Additional parameters are passed to Task.configure.

run_worker(**kwargs)

Run a worker. This worker will run in the foreground and execute the jobs in the provided queues. If wait is True, the function will not return until the worker stops (most probably when it receives a stop signal). The default values of all parameters presented here can be overridden at the App level.

Parameters
  • queues (Optional[Iterable[str]]) – List of queues to listen to, or None to listen to every queue (defaults to None).

  • wait (bool) – If False, the worker will terminate as soon as it has caught up with the queues. If True, the worker will work until it is stopped by a signal (ctrl+c, SIGINT, SIGTERM) (defaults to True).

  • concurrency (int) – Indicates how many asynchronous jobs the worker can run in parallel. Do not use concurrency if you have synchronous blocking tasks. See Execute multiple jobs at the same time (defaults to 1).

  • name (Optional[str]) – Name of the worker. Will be passed in the JobContext and used in the logs (defaults to None which will result in the worker named worker).

  • timeout (float) – Indicates the maximum duration (in seconds) the worker waits between each database job poll. Raising this parameter can lower the rate at which the worker makes queries to the database for requesting jobs. (defaults to 5.0)

  • listen_notify (bool) – If True, the worker will dedicate a connection from the pool to listening to database events, notifying of newly available jobs. If False, the worker will just poll the database periodically (see timeout). (defaults to True)

  • delete_jobs (str) – If always, the worker will automatically delete all jobs on completion. If successful the worker will only delete successful jobs. If never, the worker will keep the jobs in the database. (defaults to never)

  • additional_context (Optional[Dict[str, Any]]) – If set extend the context received by the tasks when pass_context is set to True in the task definition.

This method is the synchronous counterpart of run_worker_async.

Return type

None

async run_worker_async(**kwargs)

Run a worker. This worker will run in the foreground and execute the jobs in the provided queues. If wait is True, the function will not return until the worker stops (most probably when it receives a stop signal). The default values of all parameters presented here can be overridden at the App level.

Parameters
  • queues (Optional[Iterable[str]]) – List of queues to listen to, or None to listen to every queue (defaults to None).

  • wait (bool) – If False, the worker will terminate as soon as it has caught up with the queues. If True, the worker will work until it is stopped by a signal (ctrl+c, SIGINT, SIGTERM) (defaults to True).

  • concurrency (int) – Indicates how many asynchronous jobs the worker can run in parallel. Do not use concurrency if you have synchronous blocking tasks. See Execute multiple jobs at the same time (defaults to 1).

  • name (Optional[str]) – Name of the worker. Will be passed in the JobContext and used in the logs (defaults to None which will result in the worker named worker).

  • timeout (float) – Indicates the maximum duration (in seconds) the worker waits between each database job poll. Raising this parameter can lower the rate at which the worker makes queries to the database for requesting jobs. (defaults to 5.0)

  • listen_notify (bool) – If True, the worker will dedicate a connection from the pool to listening to database events, notifying of newly available jobs. If False, the worker will just poll the database periodically (see timeout). (defaults to True)

  • delete_jobs (str) – If always, the worker will automatically delete all jobs on completion. If successful the worker will only delete successful jobs. If never, the worker will keep the jobs in the database. (defaults to never)

  • additional_context (Optional[Dict[str, Any]]) – If set extend the context received by the tasks when pass_context is set to True in the task definition.

This method is the asynchronous counterpart of run_worker.

Return type

None

task(_func=None, *, name=None, aliases=None, retry=False, pass_context=False, queue='default', lock=None, queueing_lock=None)

Declare a function as a task. This method is meant to be used as a decorator:

@app.task(...)
def my_task(args):
    ...

or:

@app.task
def my_task(args):
    ...

The second form will use the default value for all parameters.

Parameters
  • _func (Optional[Callable]) – The decorated function

  • queue (str) – The name of the queue in which jobs from this task will be launched, if the queue is not overridden at launch. Default is "default". When a worker is launched, it can listen to specific queues, or to all queues.

  • lock (Optional[str]) – Default value for the lock (see Task.defer).

  • queueing_lock (Optional[str]) – Default value for the queueing_lock (see Task.defer).

  • name (Optional[str]) – Name of the task, by default the full dotted path to the decorated function. if the function is nested or dynamically defined, it is important to give it a unique name, and to make sure the module that defines this function is listed in the import_paths of the procrastinate.App.

  • aliases (Optional[List[str]]) – Additional names for the task. The main use case is to gracefully rename tasks by moving the old name to aliases (these tasks can have been scheduled in a distant future, so the aliases can remain for a long time).

  • retry (Union[bool, int, RetryStrategy]) –

    Details how to auto-retry the task if it fails. Can be:

    • A boolean: will either not retry or retry indefinitely

    • An int: the number of retries before it gives up

    • A procrastinate.RetryStrategy instance for complex cases

    Default is no retry.

  • pass_context (bool) – Passes the task execution context in the task as first

Return type

Any

with_connector(connector)

Create another app instance sychronized with this one, with a different connector. For all things regarding periodic tasks, the original app (and its original connector) will be used, even when the new app’s methods are used.

Returns

A new compatible app.

Return type

App

Connectors

class procrastinate.AiopgConnector(*, json_dumps=None, json_loads=None, **kwargs)

Create a PostgreSQL connector using aiopg. The connector uses an aiopg.Pool, which is created internally, or set into the connector by calling AiopgConnector.open_async.

The pool connection parameters can be provided here. Alternatively, an already existing aiopg.Pool can be provided in the App.open_async, via the pool parameter.

All other arguments than json_dumps and json_loads are passed to aiopg.create_pool() (see aiopg documentation), with default values that may differ from those of aiopg (see the list of parameters below).

Parameters
  • json_dumps (Optional[Callable]) – The JSON dumps function to use for serializing job arguments. Defaults to the function used by psycopg2. See the psycopg2 doc.

  • json_loads (Optional[Callable]) – The JSON loads function to use for deserializing job arguments. Defaults to the function used by psycopg2. See the psycopg2 doc. Unused if the pool is externally created and set into the connector through the App.open_async method.

  • dsn (Optional[str]) – Passed to aiopg. Default is “” instead of None, which means if no argument is passed, it will connect to localhost:5432 instead of a Unix-domain local socket file.

  • enable_json (bool) – Passed to aiopg. Default is False instead of True to avoid messing with the global state.

  • enable_hstore (bool) – Passed to aiopg. Default is False instead of True to avoid messing with the global state.

  • enable_uuid (bool) – Passed to aiopg. Default is False instead of True to avoid messing with the global state.

  • cursor_factory (psycopg2.extensions.cursor) – Passed to aiopg. Default is psycopg2.extras.RealDictCursor instead of standard cursor. There is no identified use case for changing this.

  • maxsize (int) – Passed to aiopg. If value is 1, then listen/notify feature will be deactivated.

  • minsize (int) – Passed to aiopg. Initial connections are not opened when the connector is created, but at first use of the pool.

class procrastinate.Psycopg2Connector(*, json_dumps=None, json_loads=None, **kwargs)

Synchronous connector based on a psycopg2.pool.ThreadedConnectionPool.

This is used if you want your .defer() calls to be purely synchronous, not asynchronous with a sync wrapper. You may need this if your program is multi-threaded and doesn’t handle async loops well (see Synchronous deferring).

All other arguments than json_dumps are passed to ThreadedConnectionPool() (see psycopg2 documentation), with default values that may differ from those of psycopg2 (see a partial list of parameters below).

Parameters
  • json_dumps (Optional[Callable]) – The JSON dumps function to use for serializing job arguments. Defaults to the function used by psycopg2. See the psycopg2 doc.

  • json_loads (Optional[Callable]) – The JSON loads function to use for deserializing job arguments. Defaults to the function used by psycopg2. See the psycopg2 doc. Unused if the pool is externally created and set into the connector through the App.open method.

  • minconn (int) – Passed to psycopg2, default set to 1 (same as aiopg).

  • maxconn (int) – Passed to psycopg2, default set to 10 (same as aiopg).

  • dsn (Optional[str]) – Passed to psycopg2. Default is “” instead of None, which means if no argument is passed, it will connect to localhost:5432 instead of a Unix-domain local socket file.

  • cursor_factory (psycopg2.extensions.cursor) – Passed to psycopg2. Default is psycopg2.extras.RealDictCursor instead of standard cursor. There is no identified use case for changing this.

class procrastinate.testing.InMemoryConnector

An InMemoryConnector may be used for testing only. Tasks are not persisted and will be lost when the process ends.

While implementing the Connector interface, it also adds a few methods and attributes to ease testing.

jobs

Mapping of {<job id>: <Job database row as a dictionary>}

Type

Dict[int, Dict]

reset()

Removes anything the in-memory pseudo-database contains, to ensure test independence.

Tasks

class procrastinate.tasks.Task(func, *, blueprint, name=None, aliases=None, retry=False, pass_context=False, queue, lock=None, queueing_lock=None)

A task is a function that should be executed later. It is linked to a default queue, and expects keyword arguments.

name

Name of the task, usually the dotted path of the decorated function.

Type

str

aliases

Additional names for the task.

Type

List[str]

retry_strategy

Value indicating the retry conditions in case of procrastinate.jobs.Job error.

Type

RetryStrategy

pass_context

If True, passes the task execution context as first positional argument on procrastinate.jobs.Job execution.

Type

bool

queue

Default queue to send deferred jobs to. The queue can be overridden when a job is deferred.

Type

str

lock

Default lock. The lock can be overridden when a job is deferred.

Type

Optional[str]

queueing_lock

Default queueing lock. The queuing lock can be overridden when a job is deferred.

Type

Optional[str]

configure(*, lock=None, queueing_lock=None, task_kwargs=None, schedule_at=None, schedule_in=None, queue=None)

Configure the job with all the specific settings, defining how the job should be launched.

You should call the defer method (see Task.defer) on the resulting object, with the job task parameters.

Parameters
  • lock (Optional[str]) – No two jobs with the same lock string can run simultaneously

  • queueing_lock (Optional[str]) – No two jobs with the same queueing lock can be waiting in the queue. Task.defer will raise an AlreadyEnqueued exception if there already is a job waiting in the queue with same queueing lock.

  • task_kwargs (Optional[Dict[str, Union[str, int, float, bool, None, Dict[str, Any], List[Any]]]]) – Arguments for the job task. You can also pass them to Task.defer. If you pass both, they will be updated (Task.defer has priority)

  • schedule_at (Optional[datetime]) – A datetime before which the job should not be launched (incompatible with schedule_in)

  • schedule_in (Optional[Dict[str, int]]) – A dict with kwargs for a python timedelta, for example {'minutes': 5}. Converted to schedule_at internally. See python timedelta documentation (incompatible with schedule_at)

  • queue (Optional[str]) – By setting a queue on the job launch, you override the task default queue

Returns

An object with a defer method, identical to Task.defer

Return type

jobs.JobDeferrer

Raises

ValueError – If you try to define both schedule_at and schedule_in

defer(**task_kwargs)

Create a job from this task and the given arguments. The job will be created with default parameters, if you want to better specify when and how to launch this job, see Task.configure.

Return type

int

async defer_async(**task_kwargs)

Create a job from this task and the given arguments. The job will be created with default parameters, if you want to better specify when and how to launch this job, see Task.configure.

Return type

int

When tasks are created with argument pass_context, they are provided a JobContext argument:

class procrastinate.JobContext(*, app=None, worker_name=None, worker_queues=None, worker_id=None, job=None, task=None, job_result=NOTHING, additional_context=NOTHING)

Execution context of a running job. In theory, all attributes are optional. In practice, in a task, they will always be set to their proper value.

app

Procrastinate App running this job

Type

App

worker_name

Name of the worker (may be useful for logging)

Type

str

worker_queues

Queues listened by this worker

Type

Optional[Iterable[str]]

worker_id

In case there are multiple async sub-workers, this is the id of the sub-worker.

Type

int`

job

Current Job instance

Type

Job

task

Current Task instance

Type

Task

Method generated by attrs for class JobContext.

Blueprints

class procrastinate.blueprints.Blueprint

A Blueprint provides a way to declare tasks that can be registered on an App later:

# Create blueprint for all tasks related to the cat
cat_blueprint = Blueprint()

# Declare tasks
@cat_blueprint.task(lock="...")
def feed_cat():
    ...

# Register blueprint (will register ``cat:path.to.feed_cat``)
app.add_tasks_from(cat_blueprint, namespace="cat")

A blueprint can add tasks from another blueprint:

blueprint_a, blueprint_b = Blueprint(), Blueprint()

@blueprint_b.task(lock="...")
def my_task():
    ...

blueprint_a.add_tasks_from(blueprint_b, namespace="b")

# Registers task "a:b:path.to.my_task"
app.add_tasks_from(blueprint_a, namespace="a")
Raises

UnboundTaskError: – Calling a blueprint task before the it is bound to an App will raise a UnboundTaskError error:: blueprint = Blueprint() # Declare tasks @blueprint.task def my_task(): … >>> my_task.defer() Traceback (most recent call last): File “…” UnboundTaskError: …

add_task_alias(task, alias)

Add an alias to a task. This can be useful if a task was in a given Blueprint and moves to a different blueprint.

Parameters
  • task (Task) – Task to alias

  • alias (str) – New alias (including potential namespace, separated with :)

Return type

None

add_tasks_from(blueprint, *, namespace)

Copies over all tasks from a different blueprint, prefixing their names with the given namespace (using : as namespace separator).

Parameters
  • blueprint (Blueprint) – Blueprint to copy tasks from

  • namespace (str) – All task names (but not aliases) will be prefixed by this name, uniqueness will be enforced.

Raises

TaskAlreadyRegistered: – When trying to use a namespace that has already been used before

Return type

None

task(_func=None, *, name=None, aliases=None, retry=False, pass_context=False, queue='default', lock=None, queueing_lock=None)

Declare a function as a task. This method is meant to be used as a decorator:

@app.task(...)
def my_task(args):
    ...

or:

@app.task
def my_task(args):
    ...

The second form will use the default value for all parameters.

Parameters
  • _func (Optional[Callable]) – The decorated function

  • queue (str) – The name of the queue in which jobs from this task will be launched, if the queue is not overridden at launch. Default is "default". When a worker is launched, it can listen to specific queues, or to all queues.

  • lock (Optional[str]) – Default value for the lock (see Task.defer).

  • queueing_lock (Optional[str]) – Default value for the queueing_lock (see Task.defer).

  • name (Optional[str]) – Name of the task, by default the full dotted path to the decorated function. if the function is nested or dynamically defined, it is important to give it a unique name, and to make sure the module that defines this function is listed in the import_paths of the procrastinate.App.

  • aliases (Optional[List[str]]) – Additional names for the task. The main use case is to gracefully rename tasks by moving the old name to aliases (these tasks can have been scheduled in a distant future, so the aliases can remain for a long time).

  • retry (Union[bool, int, RetryStrategy]) –

    Details how to auto-retry the task if it fails. Can be:

    • A boolean: will either not retry or retry indefinitely

    • An int: the number of retries before it gives up

    • A procrastinate.RetryStrategy instance for complex cases

    Default is no retry.

  • pass_context (bool) – Passes the task execution context in the task as first

Return type

Any

Builtin tasks

Procrastinate has builtin tasks that are all available from the CLI. For all tasks, the context argument will be passed automatically. The name of the tasks will be: builtin:procrastinate.builtin.<task_name>

Jobs

class procrastinate.jobs.Job(*, id=None, status=None, queue, lock, queueing_lock, task_name, task_kwargs=NOTHING, scheduled_at=None, attempts=0)

A job is the launching of a specific task with specific values for the keyword arguments.

id

Internal id uniquely identifying the job.

Type

Optional[int]

status

Status of the job.

Type

Optional[str]

queue

Queue name the job will be run in.

Type

str

lock

No two jobs with the same lock string can run simultaneously

Type

Optional[str]

queueing_lock

No two jobs with the same queueing lock can be waiting in the queue.

Type

Optional[str]

task_name

Name of the associated task.

Type

str

task_kwargs

Arguments used to call the task.

Type

Dict[str, Union[str, int, float, bool, None, Dict[str, Any], List[Any]]]

scheduled_at

Date and time after which the job is expected to run.

Type

Optional[datetime.datetime]

attempts

Number of times the job has been tried.

Type

int

Method generated by attrs for class Job.

Retry strategies

A retry strategy class lets procrastinate know what to do when a job fails: should it try again? And when?

class procrastinate.RetryStrategy(*, max_attempts=None, wait=0, linear_wait=0, exponential_wait=0, retry_exceptions=None)

The RetryStrategy class should handle classic retry strategies.

You can mix and match several waiting strategies. The formula is:

total_wait = wait + lineal_wait * attempts + exponential_wait ** (attempts + 1)
Parameters
  • max_attempts (Optional[int]) – The maximum number of attempts the job should be retried

  • wait (int) – Use this if you want to use a constant backoff. Give a number of seconds as argument, it will be used to compute the backoff. (e.g. if 3, then successive runs will wait 3, 3, 3, 3, 3 seconds)

  • linear_wait (int) – Use this if you want to use a linear backoff. Give a number of seconds as argument, it will be used to compute the backoff. (e.g. if 3, then successive runs will wait 0, 3, 6, 9, 12 seconds)

  • exponential_wait (int) – Use this if you want to use an exponential backoff. Give a number of seconds as argument, it will be used to compute the backoff. (e.g. if 3, then successive runs will wait 3, 9, 27, 81, 243 seconds)

  • retry_exceptions (Optional[Iterable[Type[Exception]]]) – Define the exception types you want to retry on. If you don’t, jobs will be retried on any type of exceptions

Method generated by attrs for class RetryStrategy.

class procrastinate.BaseRetryStrategy

If you want to implement your own retry strategy, you can inherit from this class. Child classes only need to implement get_schedule_in.

get_schedule_in(*, exception, attempts)
Parameters

attempts (int) – The number of previous attempts for the current job. The first time a job is run, attempts will be 0.

Returns

If a job should not be retried, this function should return None. Otherwise, it should return the duration after which to schedule the new job run, in seconds.

Return type

Optional[int]

Exceptions

exception procrastinate.exceptions.AlreadyEnqueued(message=None)

There is already a job waiting in the queue with the same queueing lock.

exception procrastinate.exceptions.AppNotOpen(message=None)

App was not open. Procrastinate App needs to be opened using:

  • app.open(),

  • await app.open_async(),

  • with app.open():,

  • async with app.open_async():.

exception procrastinate.exceptions.ConnectorException(message=None)

Database error.

exception procrastinate.exceptions.LoadFromPathError

App was not found at the provided path, or the loaded object is not an App.

exception procrastinate.exceptions.ProcrastinateException(message=None)

Unexpected Procrastinate error.

exception procrastinate.exceptions.TaskNotFound(message=None)

Task cannot be imported.

exception procrastinate.exceptions.UnboundTaskError(message=None)

The Task was used before it was bound to an App. If the task was defined on a Blueprint, ensure that you called App.add_tasks_from before deferring the task.

Job statuses

class procrastinate.jobs.Status(value)

An enumeration with all the possible job statuses.

DOING = 'doing'

A worker is running the job

FAILED = 'failed'

The job ended with an error

SUCCEEDED = 'succeeded'

The job ended successfully

TODO = 'todo'

The job is waiting in a queue

Accessing the jobs in the Database

class procrastinate.manager.JobManager(connector)
async check_connection()

Dummy query, check that the main Procrastinate SQL table exists. Raises if there’s a connection problem.

Returns

True if the table exists, False otherwise.

Return type

bool

defer_job(job)

Sync version of defer_job_async.

Return type

Job

async defer_job_async(job)

Add a job in its queue for later processing by a worker.

Parameters

job (jobs.Job) –

Returns

A copy of the job instance with the id set.

Return type

jobs.Job

async defer_periodic_job(job, periodic_id, defer_timestamp)

Defer a periodic job, ensuring that no other worker will defer a job for the same timestamp.

If the job was deferred, return its id. If the job was not deferred, return None.

Return type

Optional[int]

async delete_old_jobs(nb_hours, queue=None, include_error=False)

Delete jobs that have reached a final state (succeeded or failed).

Parameters
  • nb_hours (int) – Consider jobs that been in a final state for more than nb_hours

  • queue (Optional[str]) – Filter by job queue name

  • include_error (Optional[bool]) – If True, only succeeded jobs will be considered. If False, both succeeded and failed jobs will be considered, False by default

Return type

None

async fetch_job(queues)

Select a job in the queue, and mark it as doing. The worker selecting a job is then responsible for running it, and then to update the DB with the new status once it’s done.

Parameters

queues (Optional[Iterable[str]]) – Filter by job queue names

Returns

None if no suitable job was found. The job otherwise.

Return type

Optional[jobs.Job]

async finish_job(job, status, delete_job)

Set a job to its final state (succeeded or failed).

Parameters
Return type

None

finish_job_by_id(job_id, status, delete_job)

This method is the synchronous counterpart of finish_job_by_id_async.

Return type

None

async finish_job_by_id_async(job_id, status, delete_job)

This method is the asynchronous counterpart of finish_job_by_id.

Return type

None

async get_stalled_jobs(nb_seconds, queue=None, task_name=None)

Return all jobs that have been in doing state for more than a given time.

Parameters
  • nb_seconds (int) – Only jobs that have been in doing state for longer than this will be returned

  • queue (Optional[str]) – Filter by job queue name

  • task_name (Optional[str]) – Filter by job task name

Returns

Return type

Iterable[jobs.Job]

list_jobs(id=None, queue=None, task=None, status=None, lock=None, queueing_lock=None)

List all procrastinate jobs given query filters.

Parameters
  • id (int) – Filter by job ID

  • queue (str) – Filter by job queue name

  • task (str) – Filter by job task name

  • status (str) – Filter by job status (todo/doing/succeeded/failed)

  • lock (str) – Filter by job lock

  • queueing_lock (str) – Filter by job queueing_lock

Returns

Return type

Iterable[jobs.Job]

This method is the synchronous counterpart of list_jobs_async.

async list_jobs_async(id=None, queue=None, task=None, status=None, lock=None, queueing_lock=None)

List all procrastinate jobs given query filters.

Parameters
  • id (int) – Filter by job ID

  • queue (str) – Filter by job queue name

  • task (str) – Filter by job task name

  • status (str) – Filter by job status (todo/doing/succeeded/failed)

  • lock (str) – Filter by job lock

  • queueing_lock (str) – Filter by job queueing_lock

Returns

Return type

Iterable[jobs.Job]

This method is the asynchronous counterpart of list_jobs.

list_locks(queue=None, task=None, status=None, lock=None)

List all locks and number of jobs per lock for each lock value.

Parameters
  • queue (str) – Filter by job queue name

  • task (str) – Filter by job task name

  • status (str) – Filter by job status (todo/doing/succeeded/failed)

  • lock (str) – Filter by job lock

Returns

A list of dictionaries representing locks stats (name, jobs_count, todo, doing, succeeded, failed).

Return type

List[Dict[str, Any]]

This method is the synchronous counterpart of list_locks_async.

async list_locks_async(queue=None, task=None, status=None, lock=None)

List all locks and number of jobs per lock for each lock value.

Parameters
  • queue (str) – Filter by job queue name

  • task (str) – Filter by job task name

  • status (str) – Filter by job status (todo/doing/succeeded/failed)

  • lock (str) – Filter by job lock

Returns

A list of dictionaries representing locks stats (name, jobs_count, todo, doing, succeeded, failed).

Return type

List[Dict[str, Any]]

This method is the asynchronous counterpart of list_locks.

list_queues(queue=None, task=None, status=None, lock=None)

List all queues and number of jobs per status for each queue.

Parameters
  • queue (str) – Filter by job queue name

  • task (str) – Filter by job task name

  • status (str) – Filter by job status (todo/doing/succeeded/failed)

  • lock (str) – Filter by job lock

Returns

A list of dictionaries representing queues stats (name, jobs_count, todo, doing, succeeded, failed).

Return type

List[Dict[str, Any]]

This method is the synchronous counterpart of list_queues_async.

async list_queues_async(queue=None, task=None, status=None, lock=None)

List all queues and number of jobs per status for each queue.

Parameters
  • queue (str) – Filter by job queue name

  • task (str) – Filter by job task name

  • status (str) – Filter by job status (todo/doing/succeeded/failed)

  • lock (str) – Filter by job lock

Returns

A list of dictionaries representing queues stats (name, jobs_count, todo, doing, succeeded, failed).

Return type

List[Dict[str, Any]]

This method is the asynchronous counterpart of list_queues.

list_tasks(queue=None, task=None, status=None, lock=None)

List all tasks and number of jobs per status for each task.

Parameters
  • queue (str) – Filter by job queue name

  • task (str) – Filter by job task name

  • status (str) – Filter by job status (todo/doing/succeeded/failed)

  • lock (str) – Filter by job lock

Returns

A list of dictionaries representing tasks stats (name, jobs_count, todo, doing, succeeded, failed).

Return type

List[Dict[str, Any]]

This method is the synchronous counterpart of list_tasks_async.

async list_tasks_async(queue=None, task=None, status=None, lock=None)

List all tasks and number of jobs per status for each task.

Parameters
  • queue (str) – Filter by job queue name

  • task (str) – Filter by job task name

  • status (str) – Filter by job status (todo/doing/succeeded/failed)

  • lock (str) – Filter by job lock

Returns

A list of dictionaries representing tasks stats (name, jobs_count, todo, doing, succeeded, failed).

Return type

List[Dict[str, Any]]

This method is the asynchronous counterpart of list_tasks.

async listen_for_jobs(*, event, queues=None)

Listens to defer operation in the database, and raises the event each time an defer operation is seen.

This coroutine either returns None upon calling if it cannot start listening or does not return and needs to be canceled to end.

Parameters
  • event (asyncio.Event) – This event will be set each time a defer operation occurs

  • queues (Optional[Iterable[str]]) – If None, all defer operations will be considered. If an iterable of queue names is passed, only defer operations on those queues will be considered. Defaults to None

Return type

None

async retry_job(job, retry_at=None)

Indicates that a job should be retried later.

Parameters
  • job (jobs.Job) –

  • retry_at (Optional[datetime.datetime]) – If set at present time or in the past, the job may be retried immediately. Otherwise, the job will be retried no sooner than this date & time. Should be timezone-aware (even if UTC). Defaults to present time.

Return type

None

retry_job_by_id(job_id, retry_at)

This method is the synchronous counterpart of retry_job_by_id_async.

Return type

None

async retry_job_by_id_async(job_id, retry_at)

This method is the asynchronous counterpart of retry_job_by_id.

Return type

None

Django

procrastinate.contrib.django.connector_params(alias='default')

Returns parameters for in a format that is suitable to be passed to a connector constructor (see Use Procrastinate in a Django application).

Parameters

alias (str) – Alias of the database, to read in the keys of settings.DATABASES, by default default.

Returns

Provide these keyword arguments when instantiating your connector

Return type

Dict[str, Any]

SQLAlchemy

class procrastinate.contrib.sqlalchemy.SQLAlchemyPsycopg2Connector(*, dsn='postgresql://', json_dumps=None, json_loads=None, **kwargs)

Synchronous connector based on SQLAlchemy with Psycopg2.

This is used if you want your .defer() calls to be purely synchronous, not asynchronous with a sync wrapper. You may need this if your program is multi-threaded and doen’t handle async loops well (see Synchronous deferring).

All other arguments than dsn, json_dumps, and json_loads are passed to create_engine() (see SQLAlchemy documentation).

Parameters
  • dsn (The dsn string or URL object passed to SQLAlchemy’s create_engine) – function. Ignored if the engine is externally created and set into the connector through the App.open method.

  • json_dumps (Optional[Callable]) – The JSON dumps function to use for serializing job arguments. Defaults to the function used by psycopg2. See the psycopg2 doc.

  • json_loads (Optional[Callable]) – The JSON loads function to use for deserializing job arguments. Defaults Python’s json.loads function.