Using an external database connection¶
Procrastinate supports deferring jobs on an externally-managed database connection. This lets you insert the job within the same transaction as your own database operations, enabling patterns like the transactional outbox. Cancelling a job and reading its status can run on an external connection too.
When to use this¶
If you need the job to be created atomically with other database writes (e.g.
inserting a row into your own table and deferring a task that processes it), pass your
connection to configure(connection=...). The job INSERT will execute on your
connection, inside your transaction. You are responsible for committing.
Sync example (psycopg)¶
import psycopg
from procrastinate import App, SyncPsycopgConnector

app = App(connector=SyncPsycopgConnector(conninfo="..."))
app.open()

@app.task
def process_order(order_id):
    ...

with psycopg.connect("...") as conn:
    conn.autocommit = False
    conn.execute("INSERT INTO orders (id, ...) VALUES (%s, ...)", [42])
    process_order.configure(connection=conn).defer(order_id=42)
    conn.commit()  # both the order row and the job are committed together
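Because committing is your responsibility, so is rolling back when something fails. The following is a minimal sketch of one way to wrap the pattern in a helper, assuming a psycopg-style sync connection with autocommit off and a hypothetical orders table with a single id column. create_order_and_defer is an illustrative name, not part of Procrastinate.

```python
def create_order_and_defer(conn, task, order_id):
    """Insert an order row and defer its processing job in one transaction.

    Assumptions: `conn` is a psycopg.Connection with autocommit off, and
    `task` is a Procrastinate task whose connector accepts external
    connections. The helper name and table layout are illustrative only.
    """
    try:
        # Hypothetical table layout: a single `id` column.
        conn.execute("INSERT INTO orders (id) VALUES (%s)", [order_id])
        # The job INSERT runs on `conn`, inside the same transaction.
        job_id = task.configure(connection=conn).defer(order_id=order_id)
    except Exception:
        # On failure, neither the order row nor the job is kept.
        conn.rollback()
        raise
    # Committing publishes the order row and the job together.
    conn.commit()
    return job_id
```

Keeping the commit and rollback in one place makes it harder to end up with an order row that has no job, or a job that refers to a row that was never committed.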
Async example (psycopg)¶
import psycopg
from procrastinate import App, PsycopgConnector

app = App(connector=PsycopgConnector(conninfo="..."))
await app.open_async()

@app.task
def process_order(order_id):
    ...

async with await psycopg.AsyncConnection.connect("...") as conn:
    conn.autocommit = False
    await conn.execute("INSERT INTO orders (id, ...) VALUES (%s, ...)", [42])
    await process_order.configure(connection=conn).defer_async(order_id=42)
    await conn.commit()
SQLAlchemy example¶
from procrastinate import App
from procrastinate.contrib.sqlalchemy import SQLAlchemyPsycopg2Connector

connector = SQLAlchemyPsycopg2Connector(dsn="postgresql+psycopg2:///mydb")
app = App(connector=connector)
app.open()

@app.task
def process_order(order_id):
    ...

with connector.engine.connect() as conn:
    # exec_driver_sql expects positional parameters as a tuple (or a list of tuples)
    conn.exec_driver_sql("INSERT INTO orders (id) VALUES (%s)", (42,))
    process_order.configure(connection=conn).defer(order_id=42)
    conn.commit()  # the order row and the job are committed together
Cancelling jobs and reading status¶
cancel_job_by_id / cancel_job_by_id_async and get_job_status /
get_job_status_async accept the same connection argument. This is useful if you
update your own tables when cancelling a job and want both changes to commit (or
roll back) together:
job_id = 33

async with await psycopg.AsyncConnection.connect("...") as conn:
    await conn.execute("UPDATE orders SET state = 'cancelled' WHERE id = %s", [42])
    await app.job_manager.cancel_job_by_id_async(job_id, connection=conn)
    await conn.commit()  # the order update and the cancellation commit together
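Reading the status on the same connection can be combined with the cancellation. The sketch below assumes that get_job_status_async is reachable on app.job_manager in the same way as cancel_job_by_id_async, and that the orders table has id and state columns. cancel_order_and_job is a hypothetical helper name.

```python
async def cancel_order_and_job(conn, job_manager, order_id, job_id):
    """Cancel an order and its job atomically, returning the job status.

    Assumptions: `conn` is a psycopg.AsyncConnection with autocommit off and
    `job_manager` is `app.job_manager`. The helper name is illustrative.
    """
    await conn.execute(
        "UPDATE orders SET state = 'cancelled' WHERE id = %s", [order_id]
    )
    await job_manager.cancel_job_by_id_async(job_id, connection=conn)
    # Read on the same connection, so the query runs inside the same
    # transaction as the cancellation, before it is committed.
    status = await job_manager.get_job_status_async(job_id, connection=conn)
    await conn.commit()
    return status
```

Reading the status before the commit, rather than after it on a separate connection, keeps the check inside the transaction that made the change.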
Supported connectors¶
- SyncPsycopgConnector: pass a psycopg.Connection
- PsycopgConnector: pass a psycopg.AsyncConnection
- SQLAlchemyPsycopg2Connector: pass a sqlalchemy.engine.Connection
Other connectors will raise a ConnectorException if connection is provided.
Important notes¶
- NOTIFY is delayed: The database NOTIFY that wakes workers is only delivered when you commit. This is expected, since the job itself is not visible to workers before the commit. The same applies when cancelling a running job with abort=True: the worker only sees the abort request once you commit.
- Error handling: Errors from the job INSERT (e.g. queueing lock violations) are wrapped as usual in Procrastinate's exception hierarchy, but they may abort your transaction. Use savepoints if you need to isolate the defer from the rest of your transaction.
- No validation: Procrastinate does not validate the connection type. Passing the wrong type will produce a runtime error from the database driver.
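The savepoint advice could look like the following with psycopg 3, where conn.transaction() creates a savepoint when a transaction is already in progress. This is a sketch, not Procrastinate API: defer_in_savepoint and ignored_errors are hypothetical names. Which exception class to ignore is also an assumption to verify against your version (for a queueing lock violation, presumably procrastinate.exceptions.AlreadyEnqueued).

```python
def defer_in_savepoint(conn, task, ignored_errors, **task_kwargs):
    """Defer `task` on `conn` inside a savepoint.

    Returns the value of `defer()` on success, or None when one of
    `ignored_errors` was raised. In that case only the savepoint is rolled
    back and the surrounding transaction stays usable.

    Assumptions: `conn` is a psycopg.Connection that already has a
    transaction in progress, so `conn.transaction()` acts as a savepoint.
    """
    try:
        with conn.transaction():
            return task.configure(connection=conn).defer(**task_kwargs)
    except ignored_errors:
        # The failed job INSERT was undone by the savepoint rollback;
        # earlier statements in the outer transaction are untouched.
        return None
```

A caller would run its own statements first, call the helper, and then commit as usual. Errors not listed in ignored_errors still propagate, so the caller can roll back the whole transaction.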