API Reference

App

class procrastinate.App(*, connector: BaseConnector, import_paths: Iterable[str] | None = None, worker_defaults: WorkerOptions | None = None, periodic_defaults: dict | None = None)

The App is the main entry point for procrastinate integration.

Instantiate a single App in your code and use it to decorate your tasks with App.task.

You can run a worker with App.run_worker.

Parameters:
  • connector (connector_module.BaseConnector) – Typically an AiopgConnector. It will be responsible for all communications with the database. Mandatory.

  • import_paths (Iterable[str] | None) – List of python dotted paths of modules to import, to make sure that the workers know about all possible tasks. If there are tasks in a module that is neither imported as a side effect of importing the App, nor specified in this list, and a worker encounters a task defined in that module, the task will fail (TaskNotFound). While it is not mandatory to specify paths to modules that you know have already been imported, it’s a good idea to do so.

  • worker_defaults (WorkerOptions | None) – All the values passed here will override the default values sent when launching a worker. See App.run_worker for details.

  • periodic_defaults (dict | None) –

    Parameters for fine tuning the periodic tasks deferrer. Available parameters are:

    • max_delay: float, in seconds. When a worker starts and there’s a periodic task that hasn’t been deferred yet, it will try to defer the task only if it has been overdue for less time than specified by this parameter. If the task has been overdue for longer, the worker will wait until the next scheduled execution. This mechanism prevents newly added periodic tasks from being immediately deferred. It also ensures that periodic tasks that were not deferred because of a system outage are not deferred when the application recovers (provided that the outage lasted longer than max_delay). This is especially important for tasks intended to run during off-peak hours, such as intensive nightly tasks. (defaults to 10 minutes)
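
The max_delay rule above can be modelled in a few lines. This is only an illustrative sketch of the decision as described, not procrastinate's own code; the function name should_defer_at_startup and the constant DEFAULT_MAX_DELAY are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Default quoted from the text above: 10 minutes, expressed in seconds.
DEFAULT_MAX_DELAY = 10 * 60.0


def should_defer_at_startup(
    scheduled: datetime, now: datetime, max_delay: float = DEFAULT_MAX_DELAY
) -> bool:
    """Return True if an overdue periodic run is still recent enough to defer."""
    overdue = (now - scheduled).total_seconds()
    # Not yet due: there is nothing to catch up on.
    if overdue < 0:
        return False
    # Defer only when the run has been overdue for less than max_delay.
    return overdue < max_delay


now = datetime(2024, 1, 1, 4, 0, tzinfo=timezone.utc)
print(should_defer_at_startup(now - timedelta(minutes=3), now))  # recent run
print(should_defer_at_startup(now - timedelta(hours=2), now))  # stale run
```

With the default, a run missed three minutes ago is still deferred, while one missed two hours ago is left for the next scheduled execution.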

add_task_alias(task: Task, alias: str) None

Add an alias to a task. This can be useful if a task was in a given Blueprint and moves to a different blueprint.

Parameters:
  • task (Task) – Task to alias

  • alias (str) – New alias (including potential namespace, separated with :)

Return type:

None

add_tasks_from(blueprint: Blueprint, *, namespace: str) None

Copies over all tasks from a different blueprint, prefixing their names with the given namespace (using : as namespace separator).

Parameters:
  • blueprint (Blueprint) – Blueprint to copy tasks from

  • namespace (str) – All task names (but not aliases) will be prefixed by this name. Uniqueness will be enforced.

Raises:

TaskAlreadyRegistered – When trying to use a namespace that has already been used before

Return type:

None
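
As a rough model of the namespacing described above, the sketch below copies task names between plain dictionaries and prefixes them with the namespace and the : separator. It does not use procrastinate itself: the add_tasks_from function here is a hypothetical stand-in, and ValueError stands in for TaskAlreadyRegistered.

```python
NAMESPACE_SEPARATOR = ":"


def add_tasks_from(registry: dict, blueprint_tasks: dict, namespace: str) -> None:
    """Copy tasks into registry, prefixing each name with the namespace."""
    for name, task in blueprint_tasks.items():
        full_name = f"{namespace}{NAMESPACE_SEPARATOR}{name}"
        if full_name in registry:
            # Stand-in for TaskAlreadyRegistered in this sketch.
            raise ValueError(f"task already registered: {full_name}")
        registry[full_name] = task


inner = {"path.to.my_task": object()}  # tasks of a nested blueprint
outer: dict = {}  # tasks of the blueprint that includes it
app_tasks: dict = {}  # tasks known by the app

add_tasks_from(outer, inner, namespace="b")
add_tasks_from(app_tasks, outer, namespace="a")
print(sorted(app_tasks))
```

Nesting two levels yields a name of the form a:b:path.to.my_task, and reusing the same namespace for the same tasks is rejected.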

configure_task(name: str, *, allow_unknown: bool = True, **kwargs: Any) JobDeferrer

Configure a task for deferring, using its name.

Parameters:
  • name (str) – Name of the task. If not explicitly defined, this will be the dotted path to the task (my.module.my_task)

  • **kwargs (Any) – Parameters from Task.configure

  • allow_unknown (bool)

Returns:

Launch .defer(**task_kwargs) on this object to defer your job.

Return type:

JobDeferrer

classmethod from_path(dotted_path: str) App

Create an App object by dynamically loading the object at the given path.

Parameters:

dotted_path (str) – Dotted path to the object to load (e.g. mymodule.submodule.procrastinate_app)

Return type:

App

job_manager: manager.JobManager

The JobManager linked to the application

open(pool_or_engine: Any | None = None) App

Open the app synchronously.

Parameters:

pool_or_engine (Any | None) – Optional pool. Procrastinate can use an existing pool. Connection parameters passed in the constructor will be ignored. In case the SQLAlchemy connector is used, this can be an engine.

Return type:

App

open_async(pool: Any | None = None) AwaitableContext[App]

Open the app asynchronously.

Parameters:

pool (Any | None) – Optional pool. Procrastinate can use an existing pool. Connection parameters passed in the constructor will be ignored.

Return type:

AwaitableContext[App]

periodic(*, cron: str, periodic_id: str = '', **configure_kwargs: Unpack[ConfigureTaskOptions]) Callable[[Task[P, R, Concatenate[int, periodic.Args]]], Task[P, R, periodic.Args]]

Task decorator, marks task as being scheduled for periodic deferring (see howto/advanced/cron).

Parameters:
  • cron (str) – Cron-like string. Optionally add a 6th column for seconds.

  • periodic_id (str) – Task name suffix. Used to distinguish periodic tasks with different kwargs.

  • **configure_kwargs (Unpack[ConfigureTaskOptions]) – Additional parameters are passed to Task.configure.

Return type:

Callable[[Task[P, R, Concatenate[int, periodic.Args]]], Task[P, R, periodic.Args]]

run_worker(**kwargs: Any) None

Synchronous version of App.run_worker_async. Create the event loop and open the app, then run the worker. The app and the event loop are closed at the end of the function.

Parameters:

kwargs (Any)

Return type:

None

async run_worker_async(**kwargs: Unpack[WorkerOptions]) None

Run a worker. This worker will run in the foreground and execute the jobs in the provided queues. If wait is True, the function will not return until the worker stops (most probably when it receives a stop signal). The default values of all parameters presented here can be overridden at the App level.

Parameters:
  • queues (Optional[Iterable[str]]) – List of queues to listen to, or None to listen to every queue (defaults to None).

  • wait (bool) – If False, the worker will terminate as soon as it has caught up with the queues. If True, the worker will work until it is stopped by a signal (ctrl+c, SIGINT, SIGTERM) (defaults to True).

  • concurrency (int) – Indicates how many asynchronous jobs the worker can run in parallel. Do not use concurrency if you have synchronous blocking tasks. See howto/production/concurrency (defaults to 1).

  • name (Optional[str]) – Name of the worker. Will be passed in the JobContext and used in the logs (defaults to None, in which case the worker is named worker).

  • fetch_job_polling_interval (float) –

    Maximum time (in seconds) between database job polls.

    Controls the frequency of database queries for new jobs to start.

    When listen_notify is True, the polling interval acts as a fallback mechanism and can reasonably be set to a higher value.

    (defaults to 5.0)

  • abort_job_polling_interval (float) –

    Maximum time (in seconds) between database abort request polls.

    Controls the frequency of database queries for abort requests.

    When listen_notify is True, the polling interval acts as a fallback mechanism and can reasonably be set to a higher value.

    (defaults to 5.0)

  • shutdown_graceful_timeout (float) –

    Indicates the maximum duration (in seconds) the worker waits for jobs to complete when requested to stop. Jobs that have not been completed by that time are aborted. A value of None corresponds to no timeout.

    (defaults to None)

  • listen_notify (bool) –

    If True, allocates a connection from the pool to listen for:

    • new job availability

    • job abort requests

    Provides lower latency for job updates compared to polling alone.

    Note: Worker polls the database regardless of this setting. (defaults to True)

  • delete_jobs (str) – If always, the worker will automatically delete all jobs on completion. If successful, the worker will only delete successful jobs. If never, the worker will keep the jobs in the database. (defaults to never)

  • additional_context (Optional[Dict[str, Any]]) – If set, extends the context received by the tasks when pass_context is set to True in the task definition.

  • install_signal_handlers (bool) – If True, the worker will install signal handlers to gracefully stop the worker. Use False if you want to handle signals yourself (e.g. if you run the worker as an async task in a bigger application) (defaults to True)

  • update_heartbeat_interval (float) – Time in seconds between heartbeat updates of the worker. (defaults to 10)

  • stalled_worker_timeout (float) – Time in seconds after which a worker is considered stalled if no heartbeat has been received. A worker prunes stalled workers from the database at startup. (defaults to 30)

  • task_middleware (Optional[list[TaskMiddleware]]) – A list of middlewares wrapping every task this worker runs. Sync middlewares apply to sync tasks, async middlewares to async tasks. See howto/advanced/middleware. (defaults to no middleware)

  • worker_middleware (Optional[list[WorkerMiddleware]]) – A list of always-async middlewares wrapping every job this worker runs, on the event loop (both sync and async tasks). See howto/advanced/middleware. (defaults to no middleware)

  • kwargs (Unpack[WorkerOptions])

Return type:

None
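
The sketch below models two points from the parameter list above: how worker options could be layered, and how the delete_jobs policy applies to a finished job. It is not procrastinate code. The precedence (built-in defaults, then App-level worker_defaults, then arguments passed at call time) is an assumption, and the helper names resolve_worker_options and should_delete are hypothetical.

```python
from typing import Any

# A subset of the defaults quoted in the parameter list above.
BUILTIN_DEFAULTS: dict[str, Any] = {
    "queues": None,
    "wait": True,
    "concurrency": 1,
    "fetch_job_polling_interval": 5.0,
    "delete_jobs": "never",
}


def resolve_worker_options(
    app_defaults: dict[str, Any], call_kwargs: dict[str, Any]
) -> dict[str, Any]:
    """Later sources win: built-in < App-level worker_defaults < call kwargs."""
    return {**BUILTIN_DEFAULTS, **app_defaults, **call_kwargs}


def should_delete(delete_jobs: str, succeeded: bool) -> bool:
    """Apply the delete_jobs policy to a job that has just completed."""
    if delete_jobs == "always":
        return True
    if delete_jobs == "successful":
        return succeeded
    if delete_jobs == "never":
        return False
    raise ValueError(f"unknown delete_jobs value: {delete_jobs!r}")


options = resolve_worker_options(
    app_defaults={"concurrency": 4, "delete_jobs": "successful"},
    call_kwargs={"queues": ["emails"]},
)
print(options["concurrency"], options["queues"], options["wait"])
print(should_delete(options["delete_jobs"], succeeded=False))
```

Under this policy a failed job is kept when delete_jobs is successful, so it stays available for inspection.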

task(_func: Callable[P, R] | None = None, *, name: str | None = None, aliases: list[str] | None = None, retry: retry.RetryValue = False, pass_context: bool = False, queue: str = 'default', priority: int = 0, lock: str | None = None, queueing_lock: str | None = None, task_middleware: list[middleware.TaskMiddleware] | None = None)

Declare a function as a task. This method is meant to be used as a decorator:

@app.task(...)
def my_task(args):
    ...

or:

@app.task
def my_task(args):
    ...

The second form will use the default value for all parameters.

Parameters:
  • _func (Callable[P, R] | None) – The decorated function, when the decorator is used without parentheses (@app.task). Don’t pass it explicitly.

  • name (str | None) – Name of the task, by default the full dotted path to the decorated function. If the function is nested or dynamically defined, it is important to give it a unique name, and to make sure the module that defines this function is listed in the import_paths of the procrastinate.App.

  • aliases (list[str] | None) – Additional names for the task. The main use case is to gracefully rename tasks by moving the old name to aliases (these tasks can have been scheduled in a distant future, so the aliases can remain for a long time).

  • retry (retry.RetryValue) –

    Details how to auto-retry the task if it fails. Can be:

    • A boolean: will either not retry or retry indefinitely

    • An int: the number of retries before it gives up

    • A procrastinate.RetryStrategy instance for complex cases

    Default is no retry.

  • pass_context (bool) – Passes the task execution context in the task as first argument.

  • queue (str) – The name of the queue in which jobs from this task will be launched, if the queue is not overridden at launch. Default is "default". When a worker is launched, it can listen to specific queues, or to all queues.

  • priority (int) – Default priority (an integer) of jobs that are deferred from this task. Jobs with higher priority are run first. Priority can be positive or negative. If no default priority is set then the default priority is 0.

  • lock (str | None) – Default value for the lock (see Task.defer).

  • queueing_lock (str | None) – Default value for the queueing_lock (see Task.defer).

  • task_middleware (list[middleware.TaskMiddleware] | None) – A list of middlewares wrapping this task’s execution. Each must match the task’s nature: sync middleware (a plain function) for a sync task, async middleware (a coroutine function) for an async task. See howto/advanced/middleware.
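
The accepted forms of the retry parameter can be summarised with a small helper. This is a sketch of the wording above only; retries_allowed is a hypothetical name, and RetryStrategy instances are deliberately left out.

```python
from typing import Optional, Union


def retries_allowed(retry: Union[bool, int]) -> Optional[int]:
    """Return how many retries are allowed; None means no limit."""
    # Booleans are tested first: in Python, True and False are also ints.
    if retry is True:
        return None  # retry indefinitely
    if retry is False:
        return 0  # never retry
    if isinstance(retry, int):
        return retry  # retry this many times, then give up
    raise TypeError("this sketch does not model RetryStrategy instances")


print(retries_allowed(False))
print(retries_allowed(True))
print(retries_allowed(3))
```

Checking the boolean cases before the integer case matters, because isinstance(True, int) is true in Python.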

tasks: dict[str, Task]

The mapping of all tasks known by the app. Only procrastinate is expected to make changes to this mapping.

with_connector(connector: BaseConnector) App

Create another app instance synchronized with this one, with a different connector.

Deprecated since version 2.14.0: Use replace_connector instead. Because this method creates a new app that references the same tasks, and the tasks have a link back to the app, using this method can lead to unexpected behavior.

Parameters:

connector (BaseConnector) – The new connector to use.

Returns:

A new app with the same tasks.

Return type:

App

Connectors

class procrastinate.PsycopgConnector(*, json_dumps: ~collections.abc.Callable | None = None, json_loads: ~collections.abc.Callable | None = None, pool_factory: ~collections.abc.Callable[[...], ~psycopg_pool.pool_async.AsyncConnectionPool] = <class 'psycopg_pool.pool_async.AsyncConnectionPool'>, **kwargs: ~typing.Any)

Create a PostgreSQL connector using psycopg. The connector uses a psycopg_pool.AsyncConnectionPool, which is either created internally or set into the connector by calling App.open_async. You can also pass a custom callable that returns a psycopg_pool.AsyncConnectionPool instance as the pool_factory keyword argument.

All arguments other than pool_factory, json_dumps and json_loads are passed to the pool_factory callable (see psycopg documentation).

json_dumps and json_loads are used to configure new connections created by the pool with psycopg.types.json.set_json_dumps and psycopg.types.json.set_json_loads.

Parameters:
  • json_dumps (collections.abc.Callable | None) – A function to serialize JSON objects to a string. If not provided, JSON objects will be serialized using psycopg’s default JSON serializer.

  • json_loads (collections.abc.Callable | None) – A function to deserialize JSON objects from a string. If not provided, JSON objects will be deserialized using psycopg’s default JSON deserializer.

  • pool_factory (Callable[..., psycopg_pool.AsyncConnectionPool]) – A callable that returns a psycopg_pool.AsyncConnectionPool instance. kwargs will be passed to this callable as keyword arguments. Default is psycopg_pool.AsyncConnectionPool. You can set this to psycopg_pool.AsyncNullConnectionPool to disable pooling.

  • kwargs (Any)

class procrastinate.SyncPsycopgConnector(*, json_dumps: Callable | None = None, json_loads: Callable | None = None, **kwargs: Any)

Create a PostgreSQL connector using psycopg. The connector uses a psycopg_pool.ConnectionPool, which is either created internally or set into the connector by calling App.open.

Note that if you want to use a psycopg_pool.NullConnectionPool, you will need to initialize it yourself and pass it to the connector through the App.open method.

All arguments other than json_dumps and json_loads are passed to psycopg_pool.ConnectionPool (see psycopg documentation).

json_dumps and json_loads are used to configure new connections created by the pool with psycopg.types.json.set_json_dumps and psycopg.types.json.set_json_loads.

Parameters:
  • json_dumps (collections.abc.Callable | None) – A function to serialize JSON objects to a string. If not provided, JSON objects will be serialized using psycopg’s default JSON serializer.

  • json_loads (collections.abc.Callable | None) – A function to deserialize JSON objects from a string. If not provided, JSON objects will be deserialized using psycopg’s default JSON deserializer.

  • kwargs (Any)

class procrastinate.contrib.aiopg.AiopgConnector(*, json_dumps: Callable | None = None, json_loads: Callable | None = None, **kwargs: Any)

Create a PostgreSQL connector using aiopg. The connector uses an aiopg.Pool, which is created internally, or set into the connector by calling AiopgConnector.open_async.

The pool connection parameters can be provided here. Alternatively, an already existing aiopg.Pool can be provided in the App.open_async, via the pool parameter.

All arguments other than json_dumps and json_loads are passed to aiopg.create_pool() (see aiopg documentation), with default values that may differ from those of aiopg (see the list of parameters below).

Parameters:
  • json_dumps (collections.abc.Callable | None) – The JSON dumps function to use for serializing job arguments. Defaults to the function used by psycopg2. See the psycopg2 doc.

  • json_loads (collections.abc.Callable | None) – The JSON loads function to use for deserializing job arguments. Defaults to the function used by psycopg2. See the psycopg2 doc. Unused if the pool is externally created and set into the connector through the App.open_async method.

  • dsn (Optional[str]) – Passed to aiopg. Default is “” instead of None, which means if no argument is passed, it will connect to localhost:5432 instead of a Unix-domain local socket file.

  • enable_json (bool) – Passed to aiopg. Default is False instead of True to avoid messing with the global state.

  • enable_hstore (bool) – Passed to aiopg. Default is False instead of True to avoid messing with the global state.

  • enable_uuid (bool) – Passed to aiopg. Default is False instead of True to avoid messing with the global state.

  • cursor_factory (psycopg2.extensions.cursor) – Passed to aiopg. Default is psycopg2.extras.RealDictCursor instead of standard cursor. There is no identified use case for changing this.

  • maxsize (int) – Passed to aiopg. If value is 1, then listen/notify feature will be deactivated.

  • minsize (int) – Passed to aiopg. Initial connections are not opened when the connector is created, but at first use of the pool.

  • kwargs (Any)

class procrastinate.contrib.psycopg2.Psycopg2Connector(*, json_dumps: Callable | None = None, json_loads: Callable | None = None, **kwargs: Any)

Synchronous connector based on a psycopg2.pool.ThreadedConnectionPool.

All other arguments than json_dumps are passed to ThreadedConnectionPool() (see psycopg2 documentation), with default values that may differ from those of psycopg2 (see a partial list of parameters below).

Parameters:
  • json_dumps (collections.abc.Callable | None) – The JSON dumps function to use for serializing job arguments. Defaults to the function used by psycopg2. See the psycopg2 doc.

  • json_loads (collections.abc.Callable | None) – The JSON loads function to use for deserializing job arguments. Defaults to the function used by psycopg2. See the psycopg2 doc. Unused if the pool is externally created and set into the connector through the App.open method.

  • minconn (int) – Passed to psycopg2, default set to 1 (same as aiopg).

  • maxconn (int) – Passed to psycopg2, default set to 10 (same as aiopg).

  • dsn (Optional[str]) – Passed to psycopg2. Default is “” instead of None, which means if no argument is passed, it will connect to localhost:5432 instead of a Unix-domain local socket file.

  • cursor_factory (psycopg2.extensions.cursor) – Passed to psycopg2. Default is psycopg2.extras.RealDictCursor instead of standard cursor. There is no identified use case for changing this.

  • kwargs (Any)

class procrastinate.testing.InMemoryConnector

An InMemoryConnector may be used for testing only. Tasks are not persisted and will be lost when the process ends.

While implementing the Connector interface, it also adds a few methods and attributes to ease testing.

jobs: dict[int, JobRow]

Mapping of {<job id>: <Job database row as a dictionary>}

reset() None

Removes anything the in-memory pseudo-database contains, to ensure test independence.

Return type:

None

Tasks

class procrastinate.tasks.Task(func: Callable[P, R], *, blueprint: blueprints.Blueprint, name: str | None = None, aliases: list[str] | None = None, retry: retry_module.RetryValue = False, pass_context: bool = False, queue: str, priority: int = 0, lock: str | None = None, queueing_lock: str | None = None, task_middleware: list[middleware_module.TaskMiddleware] | None = None)

A task is a function that should be executed later. It is linked to a default queue, and expects keyword arguments.

Parameters:
  • func (Callable[P, R])

  • blueprint (blueprints.Blueprint)

  • name (str | None)

  • aliases (list[str] | None)

  • retry (retry_module.RetryValue)

  • pass_context (bool)

  • queue (str)

  • priority (int)

  • lock (str | None)

  • queueing_lock (str | None)

  • task_middleware (list[middleware_module.TaskMiddleware] | None)

aliases: list[str]

Additional names for the task.

configure(**options: Unpack[ConfigureTaskOptions]) JobDeferrer

Configure the job with all the specific settings, defining how the job should be launched.

You should call the defer method (see Task.defer) on the resulting object, with the job task parameters.

Parameters:
  • lock – No two jobs with the same lock string can run simultaneously

  • queueing_lock – No two jobs with the same queueing lock can be waiting in the queue. Task.defer will raise an AlreadyEnqueued exception if there already is a job waiting in the queue with same queueing lock.

  • task_kwargs – Arguments for the job task. You can also pass them to Task.defer. If you pass both, they will be updated (Task.defer has priority)

  • schedule_at – A datetime before which the job should not be launched (incompatible with schedule_in)

  • schedule_in – A dict with kwargs for a python timedelta, for example {'minutes': 5}. Converted to schedule_at internally. See python timedelta documentation (incompatible with schedule_at)

  • queue – By setting a queue on the job launch, you override the task default queue

  • priority – Set the priority of the job as an integer. Jobs with higher priority are run first. Priority can be positive or negative. The default priority is 0.

  • connection – An optional external database connection. When provided, the job INSERT will run on this connection instead of the connector’s pool, allowing atomic deferral within a user-managed transaction. The user is responsible for committing. Supported by SyncPsycopgConnector, PsycopgConnector, and SQLAlchemyPsycopg2Connector.

  • options (Unpack[ConfigureTaskOptions])

Returns:

An object with a defer method, identical to Task.defer

Raises:

ValueError – If you try to define both schedule_at and schedule_in

Return type:

JobDeferrer
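
The scheduling and argument rules above can be sketched as follows. This is a model of the documented behaviour, not the library's implementation: resolve_schedule and merge_task_kwargs are hypothetical helpers, and the use of the current UTC time as the reference point is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


def resolve_schedule(
    schedule_at: Optional[datetime] = None,
    schedule_in: Optional[dict] = None,
    now: Optional[datetime] = None,
) -> Optional[datetime]:
    """Turn schedule_in into an absolute datetime, rejecting ambiguous input."""
    if schedule_at is not None and schedule_in is not None:
        raise ValueError("schedule_at and schedule_in are mutually exclusive")
    if schedule_in is not None:
        # schedule_in holds keyword arguments for timedelta, e.g. {"minutes": 5}.
        reference = now or datetime.now(timezone.utc)
        return reference + timedelta(**schedule_in)
    return schedule_at


def merge_task_kwargs(configured: dict, deferred: dict) -> dict:
    """Arguments given at defer time take priority over configured ones."""
    return {**configured, **deferred}


start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(resolve_schedule(schedule_in={"minutes": 5}, now=start))
print(merge_task_kwargs({"a": 1, "b": 2}, {"b": 3}))
```

Passing both schedule_at and schedule_in raises ValueError, matching the Raises entry above.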

defer(*_: ~Args, **task_kwargs: ~Args) int

Create a job from this task and the given arguments. The job will be created with default parameters. If you want to better specify when and how to launch this job, see Task.configure.

Parameters:
  • _ (~Args)

  • task_kwargs (~Args)

Return type:

int

async defer_async(*_: ~Args, **task_kwargs: ~Args) int

Create a job from this task and the given arguments. The job will be created with default parameters. If you want to better specify when and how to launch this job, see Task.configure.

Parameters:
  • _ (~Args)

  • task_kwargs (~Args)

Return type:

int

lock: str | None

Default lock. The lock can be overridden when a job is deferred.

name: str

Name of the task, usually the dotted path of the decorated function.

pass_context: bool

If True, passes the task execution context as first positional argument on procrastinate.jobs.Job execution.

queue: str

Default queue to send deferred jobs to. The queue can be overridden when a job is deferred.

queueing_lock: str | None

Default queueing lock. The queueing lock can be overridden when a job is deferred.

retry_strategy: retry_module.BaseRetryStrategy | None

Value indicating the retry conditions in case of procrastinate.jobs.Job error.

When tasks are created with argument pass_context, they are provided a JobContext argument:

class procrastinate.JobContext(*, app: App, worker_name: str | None = None, worker_queues: Iterable[str] | None = None, job: Job, start_timestamp: float, additional_context: dict = NOTHING, abort_reason: Callable[[], AbortReason | None])

Execution context of a running job.

Method generated by attrs for class JobContext.

Parameters:
  • app (App)

  • worker_name (str | None)

  • worker_queues (Iterable[str] | None)

  • job (Job)

  • start_timestamp (float)

  • additional_context (dict)

  • abort_reason (Callable[[], AbortReason | None])

app: App

Procrastinate App running this job

job: Job

Corresponding Job

should_abort() bool

Returns True if the job should be aborted: in that case, the job should stop processing as soon as possible and raise JobAborted.

Return type:

bool
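
A long-running task would typically poll should_abort between units of work. The sketch below shows that pattern without importing procrastinate: FakeContext is a hypothetical test double that exposes only should_abort(), and the local JobAborted class stands in for the library's exception.

```python
class JobAborted(Exception):
    """Stand-in for procrastinate's JobAborted exception in this sketch."""


class FakeContext:
    """Hypothetical test double exposing only should_abort()."""

    def __init__(self, abort_after: int) -> None:
        self._calls = 0
        self._abort_after = abort_after

    def should_abort(self) -> bool:
        # Report an abort request once abort_after checks have passed.
        self._calls += 1
        return self._calls > self._abort_after


def process_items(context, items: list, done: list) -> None:
    for item in items:
        # Check before each unit of work so an abort request is honoured quickly.
        if context.should_abort():
            raise JobAborted
        done.append(item)


done: list = []
try:
    process_items(FakeContext(abort_after=2), ["a", "b", "c", "d"], done)
except JobAborted:
    print("aborted after", done)
```

In a real task, the context would be the JobContext received as first argument when pass_context is True.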

property task: Task

The Task associated to the job

worker_name: str | None

Name of the worker (may be useful for logging)

worker_queues: Iterable[str] | None

Queues listened to by this worker

Middleware

procrastinate.middleware.TaskMiddleware

A task middleware wraps the execution of a single task. It is a callable taking (call_next, context, worker) — where call_next runs the next middleware or the task itself, context is the JobContext and worker is the running worker — and must call (or await) call_next() and return the result. Sync middlewares (plain def) wrap sync tasks; async middlewares (async def) wrap async tasks. See Add middleware.

procrastinate.middleware.WorkerMiddleware

A worker middleware wraps the execution of every job a worker runs, on the event loop. It is a callable taking (call_next, context, worker) and must be async def. Unlike a task middleware it is always async and wraps both sync and async tasks. See Add middleware.

alias of Callable[[Callable[[], Awaitable[Any]], job_context.JobContext, worker.Worker], Awaitable[Any]]

Registering a task middleware whose sync/async nature doesn’t match the task, or a worker middleware that isn’t async, raises MiddlewareKindMismatch.
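
To make the (call_next, context, worker) contract concrete, here is a sketch of two sync middlewares wrapping a sync task. The composer run_with_middlewares is hypothetical and only illustrates how such a chain could be built; the ordering it uses (first middleware in the list is outermost) is an assumption, not a documented guarantee.

```python
from functools import partial
from typing import Any, Callable

calls: list = []


def logging_middleware(call_next: Callable[[], Any], context: Any, worker: Any) -> Any:
    """Record entry and exit around the wrapped call, and return its result."""
    calls.append("log:before")
    try:
        return call_next()
    finally:
        calls.append("log:after")


def tagging_middleware(call_next: Callable[[], Any], context: Any, worker: Any) -> Any:
    """Record that it ran, then hand over to the next callable."""
    calls.append("tag")
    return call_next()


def run_with_middlewares(task, middlewares, context=None, worker=None):
    """Hypothetical composer: the first middleware in the list is outermost."""
    call_next = task
    for middleware in reversed(middlewares):
        # Each layer receives the previous one as its zero-argument call_next.
        call_next = partial(middleware, call_next, context, worker)
    return call_next()


def task() -> int:
    calls.append("task")
    return 42


result = run_with_middlewares(task, [logging_middleware, tagging_middleware])
print(result, calls)
```

Each middleware returns whatever call_next() returns, so the task's result reaches the caller unchanged.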

Blueprints

class procrastinate.blueprints.Blueprint

A Blueprint provides a way to declare tasks that can be registered on an App later:

# Create blueprint for all tasks related to the cat
cat_blueprint = Blueprint()

# Declare tasks
@cat_blueprint.task(lock="...")
def feed_cat():
    ...

# Register blueprint (will register ``cat:path.to.feed_cat``)
app.add_tasks_from(cat_blueprint, namespace="cat")

A blueprint can add tasks from another blueprint:

blueprint_a, blueprint_b = Blueprint(), Blueprint()

@blueprint_b.task(lock="...")
def my_task():
    ...

blueprint_a.add_tasks_from(blueprint_b, namespace="b")

# Registers task "a:b:path.to.my_task"
app.add_tasks_from(blueprint_a, namespace="a")

Raises:

UnboundTaskError – Calling a blueprint task before it is bound to an App will raise an UnboundTaskError:

blueprint = Blueprint()

# Declare tasks
@blueprint.task
def my_task():
    ...

>>> my_task.defer()
Traceback (most recent call last):
    File "…"
UnboundTaskError: …

add_task_alias(task: Task, alias: str) None

Add an alias to a task. This can be useful if a task was in a given Blueprint and moves to a different blueprint.

Parameters:
  • task (Task) – Task to alias

  • alias (str) – New alias (including potential namespace, separated with :)

Return type:

None

add_tasks_from(blueprint: Blueprint, *, namespace: str) None

Copies over all tasks from a different blueprint, prefixing their names with the given namespace (using : as namespace separator).

Parameters:
  • blueprint (Blueprint) – Blueprint to copy tasks from

  • namespace (str) – All task names (but not aliases) will be prefixed by this name. Uniqueness will be enforced.

Raises:

TaskAlreadyRegistered – When trying to use a namespace that has already been used before

Return type:

None

periodic(*, cron: str, periodic_id: str = '', **configure_kwargs: Unpack[ConfigureTaskOptions]) Callable[[Task[P, R, Concatenate[int, periodic.Args]]], Task[P, R, periodic.Args]]

Task decorator, marks task as being scheduled for periodic deferring (see howto/advanced/cron).

Parameters:
  • cron (str) – Cron-like string. Optionally add a 6th column for seconds.

  • periodic_id (str) – Task name suffix. Used to distinguish periodic tasks with different kwargs.

  • **configure_kwargs (Unpack[ConfigureTaskOptions]) – Additional parameters are passed to Task.configure.

Return type:

Callable[[Task[P, R, Concatenate[int, periodic.Args]]], Task[P, R, periodic.Args]]

task(*, _func: None = None, name: str | None = None, aliases: list[str] | None = None, retry: bool | int | BaseRetryStrategy = False, pass_context: Literal[False] = False, queue: str = jobs.DEFAULT_QUEUE, priority: int = jobs.DEFAULT_PRIORITY, lock: str | None = None, queueing_lock: str | None = None, task_middleware: list[middleware.TaskMiddleware] | None = None) Callable[[Callable[P, R]], Task[P, R, P]]
task(*, _func: None = None, name: str | None = None, aliases: list[str] | None = None, retry: bool | int | BaseRetryStrategy = False, pass_context: Literal[True], queue: str = jobs.DEFAULT_QUEUE, priority: int = jobs.DEFAULT_PRIORITY, lock: str | None = None, queueing_lock: str | None = None, task_middleware: list[middleware.TaskMiddleware] | None = None) Callable[[Callable[Concatenate[JobContext, P], R]], Task[Concatenate[JobContext, P], R, P]]
task(_func: Callable[[P], R]) Task[P, R, P]

Declare a function as a task. This method is meant to be used as a decorator:

@app.task(...)
def my_task(args):
    ...

or:

@app.task
def my_task(args):
    ...

The second form will use the default value for all parameters.

Parameters:
  • _func – The decorated function, when the decorator is used without parentheses (@app.task). Don’t pass it explicitly.

  • name – Name of the task, by default the full dotted path to the decorated function. If the function is nested or dynamically defined, it is important to give it a unique name, and to make sure the module that defines this function is listed in the import_paths of the procrastinate.App.

  • aliases – Additional names for the task. The main use case is to gracefully rename tasks by moving the old name to aliases (these tasks can have been scheduled in a distant future, so the aliases can remain for a long time).

  • retry

    Details how to auto-retry the task if it fails. Can be:

    • A boolean: will either not retry or retry indefinitely

    • An int: the number of retries before it gives up

    • A procrastinate.RetryStrategy instance for complex cases

    Default is no retry.

  • pass_context – Passes the task execution context in the task as first argument.

  • queue – The name of the queue in which jobs from this task will be launched, if the queue is not overridden at launch. Default is "default". When a worker is launched, it can listen to specific queues, or to all queues.

  • priority – Default priority (an integer) of jobs that are deferred from this task. Jobs with higher priority are run first. Priority can be positive or negative. If no default priority is set then the default priority is 0.

  • lock – Default value for the lock (see Task.defer).

  • queueing_lock – Default value for the queueing_lock (see Task.defer).

  • task_middleware – A list of middlewares wrapping this task’s execution. Each must match the task’s nature: sync middleware (a plain function) for a sync task, async middleware (a coroutine function) for an async task. See howto/advanced/middleware.

Builtin tasks

Procrastinate has builtin tasks that are all available from the CLI. For all tasks, the context argument will be passed automatically. The name of the tasks will be: builtin:procrastinate.builtin.<task_name>

procrastinate.builtin_tasks.remove_old_jobs(context: JobContext, *, max_hours: int, queue: str | None = None, remove_failed: bool | None = False, remove_cancelled: bool | None = False, remove_aborted: bool | None = False) None

This task cleans your database by removing old jobs. Note that jobs and linked events will be irreversibly removed from the database when running this task.

Parameters:
  • max_hours (int) – Only jobs which were finished more than max_hours ago will be deleted.

  • queue (str | None) – The name of the queue in which jobs will be deleted. If not specified, the task will delete jobs from all queues.

  • remove_failed (bool | None) – By default only successful jobs will be removed. When this parameter is True failed jobs will also be deleted.

  • remove_cancelled (bool | None) – By default only successful jobs will be removed. When this parameter is True cancelled jobs will also be deleted.

  • remove_aborted (bool | None) – By default only successful jobs will be removed. When this parameter is True aborted jobs will also be deleted.

  • context (JobContext)

Return type:

None
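
The selection rule described by these parameters can be sketched in plain Python. This is an illustrative model only, not procrastinate's implementation (the real task queries the database); the helper names removable_statuses and is_removable are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def removable_statuses(remove_failed=False, remove_cancelled=False, remove_aborted=False):
    """Return the job statuses the cleanup considers, given the flags."""
    statuses = ["succeeded"]  # successful jobs are always eligible
    if remove_failed:
        statuses.append("failed")
    if remove_cancelled:
        statuses.append("cancelled")
    if remove_aborted:
        statuses.append("aborted")
    return statuses


def is_removable(status, finished_at, now, *, max_hours, **flags):
    """True if the job finished more than max_hours ago in an eligible status."""
    old_enough = now - finished_at > timedelta(hours=max_hours)
    return old_enough and status in removable_statuses(**flags)
```

With max_hours=24, a failed job that finished 25 hours ago is kept unless remove_failed=True is also passed.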

Jobs

class procrastinate.jobs.Job(*, id: int | None = None, status: str | None = None, queue: str, priority: int = 0, lock: str | None, queueing_lock: str | None, task_name: str, task_kwargs: types.JSONDict = NOTHING, scheduled_at: datetime.datetime | None = None, attempts: int = 0, abort_requested: bool = False, worker_id: int | None = None)

A job is the launching of a specific task with specific values for the keyword arguments.

Method generated by attrs for class Job.

Parameters:
  • id (int | None)

  • status (str | None)

  • queue (str)

  • priority (int)

  • lock (str | None)

  • queueing_lock (str | None)

  • task_name (str)

  • task_kwargs (types.JSONDict)

  • scheduled_at (datetime.datetime | None)

  • attempts (int)

  • abort_requested (bool)

  • worker_id (int | None)

abort_requested: bool

True if the job is requested to abort

attempts: int

Number of times the job has been tried.

id: int | None

Internal id uniquely identifying the job.

lock: str | None

No two jobs with the same lock string can run simultaneously

priority: int

Priority of the job.

queue: str

Queue name the job will be run in.

queueing_lock: str | None

No two jobs with the same queueing lock can be waiting in the queue.

scheduled_at: datetime.datetime | None

Date and time after which the job is expected to run.

status: str | None

Status of the job.

task_kwargs: types.JSONDict

Arguments used to call the task.

task_name: str

Name of the associated task.

worker_id: int | None

ID of the worker that is processing the job

Retry strategies

A retry strategy class lets procrastinate know what to do when a job fails: should it try again? And when?

class procrastinate.RetryStrategy(*, max_attempts: int | None = None, wait: int = 0, linear_wait: int = 0, exponential_wait: int = 0, retry_exceptions: Iterable[type[Exception]] | None = None)

The RetryStrategy class should handle classic retry strategies.

You can mix and match several waiting strategies. The formula is:

total_wait = wait + linear_wait * attempts + exponential_wait ** (attempts + 1)
Parameters:
  • max_attempts (int | None) – The maximum number of attempts, after which the job is no longer retried.

  • wait (int) – Use this if you want to use a constant backoff. Give a number of seconds as argument, it will be used to compute the backoff. (e.g. if 3, then successive runs will wait 3, 3, 3, 3, 3 seconds)

  • linear_wait (int) – Use this if you want to use a linear backoff. Give a number of seconds as argument, it will be used to compute the backoff. (e.g. if 3, then successive runs will wait 0, 3, 6, 9, 12 seconds)

  • exponential_wait (int) – Use this if you want to use an exponential backoff. Give a number of seconds as argument, it will be used to compute the backoff. (e.g. if 3, then successive runs will wait 3, 9, 27, 81, 243 seconds)

  • retry_exceptions (collections.abc.Iterable[type[Exception]] | None) – Define the exception types you want to retry on. If you don’t, jobs will be retried on any type of exception.

Method generated by attrs for class RetryStrategy.
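
As a worked example, the wait formula can be evaluated in plain Python. The sketch below only reproduces the arithmetic documented here, with attempts starting at 0 for the first retry; the function name total_wait is hypothetical and is not part of procrastinate.

```python
def total_wait(attempts, *, wait=0, linear_wait=0, exponential_wait=0):
    """Seconds to wait before the next run, following the documented formula."""
    return wait + linear_wait * attempts + exponential_wait ** (attempts + 1)


# Each strategy on its own, for the first five retries (attempts = 0..4).
constant = [total_wait(a, wait=3) for a in range(5)]
linear = [total_wait(a, linear_wait=3) for a in range(5)]
exponential = [total_wait(a, exponential_wait=3) for a in range(5)]

# Strategies can be combined: the three terms are simply added together.
mixed = [total_wait(a, wait=2, linear_wait=3, exponential_wait=2) for a in range(3)]
```

Here constant, linear and exponential match the sequences quoted in the parameter descriptions, and mixed evaluates to 4, 9 and 16 seconds.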

class procrastinate.BaseRetryStrategy

If you want to implement your own retry strategy, you can inherit from this class. Child classes only need to implement get_retry_decision.

get_retry_decision(*, exception: BaseException, job: Job) RetryDecision | None
Parameters:
  • exception (BaseException) – The exception raised by the job

  • job (Job) – The current job

Return type:

RetryDecision | None

get_schedule_in(*, exception: BaseException, attempts: int) int | None
Parameters:
  • attempts (int) – The number of previous attempts for the current job. The first time a job is run, attempts will be 0.

  • exception (BaseException)

Returns:

If a job should not be retried, this function should return None. Otherwise, it should return the duration after which to schedule the new job run, in seconds.

Return type:

int | None

Notes

This function is deprecated and will be removed in a future version. Use get_retry_decision instead.

Deprecated since version 2.9: The get_schedule_in method is deprecated.

class procrastinate.RetryDecision(*, retry_at: datetime | None = None, priority: int | None = None, queue: str | None = None, lock: str | None = None)
class procrastinate.RetryDecision(*, retry_in: TimeDeltaParams | None = None, priority: int | None = None, queue: str | None = None, lock: str | None = None)

Specifies when and how a job should be retried.

Parameters:
  • retry_at (datetime.datetime | None) – If set at present time or in the past, the job may be retried immediately. Otherwise, the job will be retried no sooner than this date & time. Should be timezone-aware (even if UTC). Defaults to present time.

  • retry_in (types.TimeDeltaParams | None) – If set, the job will be retried after this duration. If not set, the job will be retried immediately.

  • priority (int | None) – If set, the job will be retried with this priority. If not set, the priority remains unchanged.

  • queue (str | None) – If set, the job will be retried on this queue. If not set, the queue remains unchanged.

  • lock (str | None) – If set, the job will be retried with this lock. If not set, the lock remains unchanged.
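
Since retry_at should be timezone-aware, one way to build such a value with the standard library is sketched below. The helper name retry_at_from_now is hypothetical; the resulting datetime is the kind of value one would pass as retry_at. For retry_in, the assumption here is that types.TimeDeltaParams takes timedelta-style keys, for example {"seconds": 30}.

```python
from datetime import datetime, timedelta, timezone


def retry_at_from_now(seconds, now=None):
    """Return a timezone-aware (UTC) datetime `seconds` after `now`."""
    if now is None:
        # datetime.now(timezone.utc) is aware; datetime.utcnow() would be naive.
        now = datetime.now(timezone.utc)
    return now + timedelta(seconds=seconds)


retry_at = retry_at_from_now(30)
# An aware datetime carries tzinfo, so it is unambiguous for the database.
is_aware = retry_at.tzinfo is not None
```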

Exceptions

exception procrastinate.exceptions.AlreadyEnqueued(message: str | None = None)

There is already a job waiting in the queue with the same queueing lock.

Parameters:

message (str | None)
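
The queueing-lock rule behind this exception can be modelled in memory as follows. This is a simplified sketch, not procrastinate's implementation (the real check happens in the database); the AlreadyEnqueued class below is a local stand-in and the enqueue helper is hypothetical.

```python
class AlreadyEnqueued(Exception):
    """Local stand-in for procrastinate.exceptions.AlreadyEnqueued."""


def enqueue(waiting, task_name, queueing_lock=None):
    """Append a job to the waiting list unless its queueing lock is taken."""
    if queueing_lock is not None and any(
        job["queueing_lock"] == queueing_lock for job in waiting
    ):
        raise AlreadyEnqueued(f"queueing lock {queueing_lock!r} is already held")
    waiting.append({"task_name": task_name, "queueing_lock": queueing_lock})


waiting = []
enqueue(waiting, "send_report", queueing_lock="report")
enqueue(waiting, "send_email")  # jobs without a queueing lock never conflict
try:
    enqueue(waiting, "send_report", queueing_lock="report")
    rejected = False
except AlreadyEnqueued:
    rejected = True
```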

exception procrastinate.exceptions.AppNotOpen(message: str | None = None)

App was not open. Procrastinate App needs to be opened using:

  • app.open(),

  • await app.open_async(),

  • with app.open():,

  • async with app.open_async():.

Parameters:

message (str | None)

exception procrastinate.exceptions.ConnectorException(message: str | None = None)

Database error.

Parameters:

message (str | None)

exception procrastinate.exceptions.JobAborted(message: str | None = None)

Job was aborted.

Parameters:

message (str | None)

exception procrastinate.exceptions.LoadFromPathError(message: str | None = None)

App was not found at the provided path, or the loaded object is not an App.

Parameters:

message (str | None)

exception procrastinate.exceptions.MiddlewareKindMismatch(message: str | None = None)

A sync task was given an async middleware (or vice versa). Sync tasks require sync middleware; async tasks require async middleware.

Parameters:

message (str | None)

exception procrastinate.exceptions.ProcrastinateException(message: str | None = None)

Unexpected Procrastinate error.

Parameters:

message (str | None)

exception procrastinate.exceptions.TaskNotFound(message: str | None = None)

Task cannot be imported.

Parameters:

message (str | None)

exception procrastinate.exceptions.UnboundTaskError(message: str | None = None)

The Task was used before it was bound to an App. If the task was defined on a Blueprint, ensure that you called App.add_tasks_from before deferring the task.

Parameters:

message (str | None)

Job statuses

class procrastinate.jobs.Status(*values)

An enumeration with all the possible job statuses.

ABORTED = 'aborted'

The job was aborted

ABORTING = 'aborting'

legacy, not used anymore

CANCELLED = 'cancelled'

The job was cancelled

DOING = 'doing'

A worker is running the job

FAILED = 'failed'

The job ended with an error

SUCCEEDED = 'succeeded'

The job ended successfully

TODO = 'todo'

The job is waiting in a queue
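
The snippet below mirrors these values in a local enum so that it runs without procrastinate installed; in real code you would import procrastinate.jobs.Status instead. The FINAL_STATUSES set and the is_final helper are hypothetical names, based on the final states listed for JobManager.delete_old_jobs.

```python
from enum import Enum


class Status(Enum):
    """Local mirror of the documented job statuses (stand-in only)."""

    TODO = "todo"
    DOING = "doing"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    CANCELLED = "cancelled"
    ABORTING = "aborting"  # legacy, not used anymore
    ABORTED = "aborted"


# States in which a job will not be picked up again.
FINAL_STATUSES = frozenset(
    {Status.SUCCEEDED, Status.FAILED, Status.CANCELLED, Status.ABORTED}
)


def is_final(status):
    """Accept either a Status member or its string value."""
    return Status(status) in FINAL_STATUSES
```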

Accessing the jobs in the Database

class procrastinate.manager.JobManager(connector: BaseConnector)
Parameters:

connector (connector.BaseConnector)

batch_defer_jobs(jobs: list[Job], connection: Any | None = None) list[Job]

Sync version of batch_defer_jobs_async.

Parameters:
  • jobs (list[Job])

  • connection (Any | None)

Return type:

list[Job]

async batch_defer_jobs_async(jobs: list[Job], connection: Any | None = None) list[Job]

Add multiple jobs in their queue for later processing by a worker.

Parameters:
  • jobs (list[Job]) – The jobs to defer

  • connection (Any | None) – Optional external database connection to use for the query.

Returns:

A list of jobs with their id set.

Return type:

list[Job]

cancel_job_by_id(job_id: int, abort: bool = False, delete_job: bool = False, connection: Any | None = None) bool

Cancel a job by id.

Parameters:
  • job_id (int) – The id of the job to cancel

  • abort (bool) – If True, a job will be marked for abortion, but the task itself has to respect the abortion request. If False, only jobs in todo state will be set to cancelled and won’t be processed by a worker anymore.

  • delete_job (bool) – If True, the job will be deleted from the database after being cancelled. Does not affect the jobs that should be aborted.

  • connection (Any | None) – Optional external database connection to use for the query.

Returns:

If True, the job was cancelled (or its abortion was requested). If False, nothing was done: either there is no job with this id or it’s not in a state where it may be cancelled (i.e. todo or doing)

Return type:

bool

async cancel_job_by_id_async(job_id: int, abort: bool = False, delete_job: bool = False, connection: Any | None = None) bool

Cancel a job by id.

Parameters:
  • job_id (int) – The id of the job to cancel

  • abort (bool) – If True, a job will be marked for abortion, but the task itself has to respect the abortion request. If False, only jobs in todo state will be set to cancelled and won’t be processed by a worker anymore.

  • delete_job (bool) – If True, the job will be deleted from the database after being cancelled. Does not affect the jobs that should be aborted.

  • connection (Any | None) – Optional external database connection to use for the query.

Returns:

If True, the job was cancelled (or its abortion was requested). If False, nothing was done: either there is no job with this id or it’s not in a state where it may be cancelled (i.e. todo or doing)

Return type:

bool

check_connection() bool

Sync version of check_connection_async.

Return type:

bool

async check_connection_async() bool

Dummy query, check that the main Procrastinate SQL table exists. Raises if there’s a connection problem.

Returns:

True if the table exists, False otherwise.

Return type:

bool

defer_job(job: Job, connection: Any | None = None) Job

Sync version of defer_job_async.

Parameters:
  • job (Job)

  • connection (Any | None)

Return type:

Job

async defer_job_async(job: Job, connection: Any | None = None) Job

Add a job in its queue for later processing by a worker.

Parameters:
  • job (Job) – The job to defer

  • connection (Any | None) – Optional external database connection to use for the query.

Returns:

A copy of the job instance with the id set.

Return type:

Job

async defer_periodic_job(job: Job, periodic_id: str, defer_timestamp: int) int | None

Defer a periodic job, ensuring that no other worker will defer a job for the same timestamp.

If the job was deferred, return its id. If the job was not deferred, return None.

Parameters:
  • job (Job)

  • periodic_id (str)

  • defer_timestamp (int)

Return type:

int | None

async delete_old_jobs(nb_hours: int, queue: str | None = None, include_failed: bool | None = False, include_cancelled: bool | None = False, include_aborted: bool | None = False) None

Delete jobs that have reached a final state (succeeded, failed, cancelled, or aborted). By default, only considers jobs that have succeeded.

Parameters:
  • nb_hours (int) – Consider jobs that have been in a final state for more than nb_hours

  • queue (str | None) – Filter by job queue name

  • include_failed (bool | None) – If True, also consider failed jobs. False by default.

  • include_cancelled (bool | None) – If True, also consider cancelled jobs. False by default.

  • include_aborted (bool | None) – If True, also consider aborted jobs. False by default.

Return type:

None

async fetch_job(queues: Iterable[str] | None, worker_id: int) Job | None

Select a job in the queue, and mark it as doing. The worker selecting a job is then responsible for running it, and for updating the DB with the new status once it’s done.

Parameters:
  • queues (Iterable[str] | None) – Filter by job queue names

  • worker_id (int)

Returns:

None if no suitable job was found. The job otherwise.

Return type:

Job | None

async finish_job(job: Job, status: Status, delete_job: bool) None

Set a job to its final state (succeeded, failed or aborted).

Parameters:
  • job (Job)

  • status (Status) – succeeded, failed or aborted

  • delete_job (bool)

Return type:

None

get_job_status(job_id: int, connection: Any | None = None) Status

Get the status of a job by id.

Parameters:
  • job_id (int) – The id of the job to get the status of

  • connection (Any | None) – Optional external database connection to use for the query.

Return type:

Status

async get_job_status_async(job_id: int, connection: Any | None = None) Status

Get the status of a job by id.

Parameters:
  • job_id (int) – The id of the job to get the status of

  • connection (Any | None) – Optional external database connection to use for the query.

Return type:

Status

async get_stalled_jobs(nb_seconds: int | None = None, queue: str | None = None, task_name: str | None = None, seconds_since_heartbeat: float = 30) Iterable[Job]

Return all jobs that have been in doing state for more than a given time.

Parameters:
  • nb_seconds (int | None) – If set, jobs that have been in doing state for longer than this time in seconds will be returned, without considering stalled workers and heartbeats. This parameter is DEPRECATED and will be removed in the next major version. Call this method without it to get stalled jobs based on stalled workers and heartbeats.

  • queue (str | None) – Filter by job queue name

  • task_name (str | None) – Filter by job task name

  • seconds_since_heartbeat (float) – Get stalled jobs based on workers that have not sent a heartbeat for longer than this time in seconds. Only used if nb_seconds is not set. Defaults to 30 seconds. When changing it, also check the update_heartbeat_interval and stalled_worker_timeout parameters of the worker.

Return type:

Iterable[Job]
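
The heartbeat criterion can be illustrated with a small standalone model. This is a sketch of the rule as documented, not the query procrastinate runs; the function name stalled_worker_ids and the sample heartbeat data are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def stalled_worker_ids(last_heartbeats, now, seconds_since_heartbeat=30):
    """Return ids of workers whose last heartbeat is older than the threshold."""
    limit = timedelta(seconds=seconds_since_heartbeat)
    return sorted(
        worker_id
        for worker_id, heartbeat in last_heartbeats.items()
        if now - heartbeat > limit
    )


now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
last_heartbeats = {
    1: now - timedelta(seconds=5),   # recently seen
    2: now - timedelta(seconds=45),  # silent for longer than 30 seconds
    3: now - timedelta(minutes=10),  # silent for much longer
}
stalled = stalled_worker_ids(last_heartbeats, now)
```

Jobs in doing state attached to the workers in stalled are the ones this method would report; raising seconds_since_heartbeat to 60 would leave only worker 3.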

list_jobs(id: int | None = None, queue: str | None = None, task: str | None = None, status: str | None = None, lock: str | None = None, queueing_lock: str | None = None, worker_id: int | None = None) list[Job]

Sync version of list_jobs_async

Parameters:
  • id (int | None)

  • queue (str | None)

  • task (str | None)

  • status (str | None)

  • lock (str | None)

  • queueing_lock (str | None)

  • worker_id (int | None)

Return type:

list[Job]

async list_jobs_async(id: int | None = None, queue: str | None = None, task: str | None = None, status: str | None = None, lock: str | None = None, queueing_lock: str | None = None, worker_id: int | None = None) Iterable[Job]

List all procrastinate jobs given query filters.

Parameters:
  • id (int | None) – Filter by job ID

  • queue (str | None) – Filter by job queue name

  • task (str | None) – Filter by job task name

  • status (str | None) – Filter by job status (todo/doing/succeeded/failed)

  • lock (str | None) – Filter by job lock

  • queueing_lock (str | None) – Filter by job queueing_lock

  • worker_id (int | None) – Filter by worker ID

Return type:

Iterable[Job]

async list_jobs_to_abort_async(queue: str | None = None) Iterable[int]

List ids of running jobs to abort

Parameters:

queue (str | None)

Return type:

Iterable[int]

list_locks(queue: str | None = None, task: str | None = None, status: str | None = None, lock: str | None = None) Iterable[dict[str, Any]]

Sync version of list_locks_async

Parameters:
  • queue (str | None)

  • task (str | None)

  • status (str | None)

  • lock (str | None)

Return type:

Iterable[dict[str, Any]]

async list_locks_async(queue: str | None = None, task: str | None = None, status: str | None = None, lock: str | None = None) Iterable[dict[str, Any]]

List all locks and the number of jobs per status for each lock value.

Parameters:
  • queue (str | None) – Filter by job queue name

  • task (str | None) – Filter by job task name

  • status (str | None) – Filter by job status (todo/doing/succeeded/failed)

  • lock (str | None) – Filter by job lock

Returns:

A list of dictionaries representing locks stats (name, jobs_count, todo, doing, succeeded, failed, cancelled, aborted).

Return type:

Iterable[dict[str, Any]]

list_queues(queue: str | None = None, task: str | None = None, status: str | None = None, lock: str | None = None) Iterable[dict[str, Any]]

Sync version of list_queues_async

Parameters:
  • queue (str | None)

  • task (str | None)

  • status (str | None)

  • lock (str | None)

Return type:

Iterable[dict[str, Any]]

async list_queues_async(queue: str | None = None, task: str | None = None, status: str | None = None, lock: str | None = None) Iterable[dict[str, Any]]

List all queues and number of jobs per status for each queue.

Parameters:
  • queue (str | None) – Filter by job queue name

  • task (str | None) – Filter by job task name

  • status (str | None) – Filter by job status (todo/doing/succeeded/failed)

  • lock (str | None) – Filter by job lock

Returns:

A list of dictionaries representing queues stats (name, jobs_count, todo, doing, succeeded, failed, cancelled, aborted).

Return type:

Iterable[dict[str, Any]]
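
The returned dictionaries can be post-processed like any list of dicts. The sketch below works on hand-written sample rows that use the documented keys (the rows themselves are made up for illustration), and the helper name backlog_by_queue is hypothetical.

```python
def backlog_by_queue(stats):
    """Map each queue name to the number of jobs not yet finished."""
    return {row["name"]: row["todo"] + row["doing"] for row in stats}


# Sample rows shaped like the documented queue stats (illustrative values).
sample_stats = [
    {"name": "default", "jobs_count": 12, "todo": 3, "doing": 1,
     "succeeded": 7, "failed": 1, "cancelled": 0, "aborted": 0},
    {"name": "emails", "jobs_count": 5, "todo": 0, "doing": 0,
     "succeeded": 5, "failed": 0, "cancelled": 0, "aborted": 0},
]
backlog = backlog_by_queue(sample_stats)
```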

list_tasks(queue: str | None = None, task: str | None = None, status: str | None = None, lock: str | None = None) Iterable[dict[str, Any]]

Sync version of list_tasks_async

Parameters:
  • queue (str | None)

  • task (str | None)

  • status (str | None)

  • lock (str | None)

Return type:

Iterable[dict[str, Any]]

async list_tasks_async(queue: str | None = None, task: str | None = None, status: str | None = None, lock: str | None = None) Iterable[dict[str, Any]]

List all tasks and number of jobs per status for each task.

Parameters:
  • queue (str | None) – Filter by job queue name

  • task (str | None) – Filter by job task name

  • status (str | None) – Filter by job status (todo/doing/succeeded/failed)

  • lock (str | None) – Filter by job lock

Returns:

A list of dictionaries representing tasks stats (name, jobs_count, todo, doing, succeeded, failed, cancelled, aborted).

Return type:

Iterable[dict[str, Any]]

async listen_for_jobs(*, on_notification: NotificationCallback, queues: Iterable[str] | None = None) None

Listens to job notifications from the database, and invokes the callback each time a notification is received.

This coroutine either returns None upon calling if it cannot start listening or does not return and needs to be cancelled to end.

Parameters:
  • on_notification (connector.Notify) – A coroutine that will be called and awaited every time a notification is received

  • queues (Optional[Iterable[str]]) – If None, all notifications will be considered. If an iterable of queue names is passed, only defer operations on those queues will be considered. Defaults to None

Return type:

None
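
Because a running listener does not return on its own, it is usually wrapped in an asyncio task and cancelled when no longer needed. The sketch below shows that pattern with a stand-in coroutine, fake_listen, which is hypothetical and only imitates a never-ending listener; the real callback signature is defined by NotificationCallback and may take arguments that this stand-in omits.

```python
import asyncio


async def fake_listen(on_notification):
    """Stand-in for a listener that never returns on its own."""
    while True:
        await asyncio.sleep(0.01)
        await on_notification()


async def listen_briefly(duration=0.05):
    """Run the listener in a task, then cancel it after `duration` seconds."""
    received = []

    async def on_notification():
        received.append("notified")

    task = asyncio.create_task(fake_listen(on_notification))
    await asyncio.sleep(duration)
    task.cancel()  # the only way to end a listener that is running
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task was cancelled on purpose
    return received


received = asyncio.run(listen_briefly())
```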

async prune_stalled_workers(seconds_since_heartbeat: float) list[int]

Delete the workers that have not sent a heartbeat for more than a given time.

Parameters:

seconds_since_heartbeat (float) – Only workers that have not sent a heartbeat for longer than this will be deleted

Returns:

A list of worker IDs that have been deleted

Return type:

list[int]

async register_worker() int

Register a newly started worker (with an initial heartbeat) in the database.

Returns:

The ID of the registered worker

Return type:

int

async retry_job(job: Job, retry_at: datetime | None = None, priority: int | None = None, queue: str | None = None, lock: str | None = None) None

Indicates that a job should be retried later.

Parameters:
  • job (Job)

  • retry_at (datetime | None) – If set at present time or in the past, the job may be retried immediately. Otherwise, the job will be retried no sooner than this date & time. Should be timezone-aware (even if UTC). Defaults to present time.

  • priority (int | None) – If set, the job will be retried with this priority. If not set, the priority remains unchanged.

  • queue (str | None) – If set, the job will be retried on this queue. If not set, the queue remains unchanged.

  • lock (str | None) – If set, the job will be retried with this lock. If not set, the lock remains unchanged.

Return type:

None

retry_job_by_id(job_id: int, retry_at: datetime, priority: int | None = None, queue: str | None = None, lock: str | None = None) None

Sync version of retry_job_by_id_async.

Parameters:
  • job_id (int)

  • retry_at (datetime)

  • priority (int | None)

  • queue (str | None)

  • lock (str | None)

Return type:

None

async retry_job_by_id_async(job_id: int, retry_at: datetime, priority: int | None = None, queue: str | None = None, lock: str | None = None) None

Indicates that a job should be retried later.

Parameters:
  • job_id (int)

  • retry_at (datetime) – If set at present time or in the past, the job may be retried immediately. Otherwise, the job will be retried no sooner than this date & time. Should be timezone-aware (even if UTC).

  • priority (int | None) – If set, the job will be retried with this priority. If not set, the priority remains unchanged.

  • queue (str | None) – If set, the job will be retried on this queue. If not set, the queue remains unchanged.

  • lock (str | None) – If set, the job will be retried with this lock. If not set, the lock remains unchanged.

Return type:

None

async unregister_worker(worker_id: int) None

Unregister a shut down worker and also delete its heartbeat from the database.

Parameters:

worker_id (int) – The ID of the worker to delete

Return type:

None

async update_heartbeat(worker_id: int) None

Update the heartbeat of a worker.

Parameters:

worker_id (int) – The ID of the worker whose heartbeat should be updated

Return type:

None

Django

class procrastinate.contrib.django.DjangoApp(*, worker_defaults: WorkerOptions | None = None, **kwargs: Any)

A procrastinate.App preconfigured to manage Django’s per-thread database connections around each task (see close_db_connections()).

This is the app the Django contrib runs (procrastinate.contrib.django.app). It is also the app to use when building a throwaway app in a test — e.g. to avoid leaving a task registered on the global app — so that tasks run by a worker get the same connection cleanup and don’t leak connections at test database teardown:

from procrastinate.contrib.django import DjangoApp, app

new_app = DjangoApp(connector=app.connector)

The cleanup is configured by merging the DB-cleanup middlewares into worker_defaults (see with_db_cleanup()); any worker_defaults you pass are preserved.

Parameters:
  • connector – Typically an AiopgConnector. It will be responsible for all communications with the database. Mandatory.

  • import_paths – List of python dotted paths of modules to import, to make sure that the workers know about all possible tasks. If there are tasks in a module that is neither imported as a side effect of importing the App, nor specified in this list, and a worker encounters a task defined in that module, the task will fail (TaskNotFound). While it is not mandatory to specify paths to modules that you know have already been imported, it’s a good idea to do so.

  • worker_defaults (WorkerOptions | None) – All the values passed here will override the default values sent when launching a worker. See App.run_worker for details.

  • periodic_defaults

    Parameters for fine tuning the periodic tasks deferrer. Available parameters are:

    • max_delay: float, in seconds. When a worker starts and there’s a periodic task that hasn’t been deferred yet, it will try to defer the task only if it has been overdue for less time than specified by this parameter. If the task has been overdue for longer, the worker will wait until the next scheduled execution. This mechanism prevents newly added periodic tasks from being immediately deferred. Additionally, it ensures that periodic tasks, which were not deferred due to system outages, are not deferred upon application recovery (provided that the outage duration is longer than max_delay), that’s especially important for tasks intended to run during off-peak hours, such as intensive nightly tasks. (defaults to 10 minutes)

  • kwargs (Any)
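
The max_delay rule for periodic tasks boils down to a single comparison, sketched below. The function name should_defer_on_startup is hypothetical and the model ignores everything except how long the task has been overdue; it is not procrastinate's implementation.

```python
def should_defer_on_startup(overdue_seconds, max_delay=600.0):
    """Defer an overdue periodic task only if it is overdue by less than max_delay."""
    return overdue_seconds < max_delay


# Worker restarted 2 minutes after the scheduled time: still deferred.
short_outage = should_defer_on_startup(120)
# Worker was down for 3 hours: skipped until the next scheduled execution.
long_outage = should_defer_on_startup(3 * 3600)
```

The default of 600 seconds corresponds to the documented 10 minutes.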

SQLAlchemy

class procrastinate.contrib.sqlalchemy.SQLAlchemyPsycopg2Connector(*, dsn: str = 'postgresql://', json_dumps: Callable[[...], Any] | None = None, json_loads: Callable[[...], Any] | None = None, **kwargs: Any)

Synchronous connector based on SQLAlchemy with Psycopg2.

All arguments other than dsn, json_dumps, and json_loads are passed to create_engine() (see SQLAlchemy documentation).

Parameters:
  • dsn (str) – The dsn string or URL object passed to SQLAlchemy’s create_engine function. Ignored if the engine is externally created and set into the connector through the App.open method.

  • json_dumps (collections.abc.Callable | None) – The JSON dumps function to use for serializing job arguments. Defaults to the function used by psycopg2. See the psycopg2 doc.

  • json_loads (collections.abc.Callable | None) – The JSON loads function to use for deserializing job arguments. Defaults to Python’s json.loads function.

  • kwargs (Any)
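
A custom json_dumps is typically a thin wrapper around json.dumps. The sketch below builds one that also serializes dates and datetimes; the names encode_extra and custom_json_dumps are hypothetical, and passing the result as json_dumps to the connector is shown only as the intended use, not executed here.

```python
import datetime
import functools
import json


def encode_extra(obj):
    """Fallback encoder for types json.dumps cannot handle natively."""
    if isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


# A drop-in replacement for json.dumps, usable wherever a dumps callable is expected.
custom_json_dumps = functools.partial(json.dumps, default=encode_extra)

serialized = custom_json_dumps({"when": datetime.date(2024, 1, 2)})
```

Values serialized this way come back as plain strings unless a matching json_loads is supplied.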