Launch a worker#

You can either go towards the CLI route with:

$ procrastinate --verbose --app=dotted.path.to.app worker [--name=worker-name] [queue [...]]

or, identically, use the code way:

app.run_worker(queues=["queue", ...], name="worker-name")
# or
await app.run_worker_async(queues=["queue", ...], name="worker-name")

In both cases, not specifying queues will tell Procrastinate to listen to every queue. Naming the worker is optional.

Note

App.run_worker() will take care of launching an event loop, opening the app, running the worker, and when it exists, closing the app and the event loop.

On the other hand, App.run_worker_async() needs to run while the app is open. The CLI takes care of opening the app.

… Inside an application#

When running the worker inside a bigger application, you may want to use install_signal_handlers=False so that the worker doesn’t interfere with your application’s signal handlers.

Note

When you run the worker as a task, at any point, you can call task.cancel() to request the worker to gracefully stop at the next opportunity. You may then wait for it to actually stop using await task if you’re ready to wait indefinitely, or asyncio.wait_for(task, timeout) if you want to set a timeout.

Here is an example FastAPI application that does this:

import asyncio
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI

from procrastinate import App, PsycopgConnector

logging.basicConfig(level=logging.DEBUG)


task_queue = App(connector=PsycopgConnector())


@task_queue.task
async def sleep(length):
    await asyncio.sleep(length)


@asynccontextmanager
async def lifespan(app: FastAPI):
    async with task_queue.open_async():
        worker = asyncio.create_task(
            task_queue.run_worker_async(install_signal_handlers=False)
        )
        # Set to 100 to test the ungraceful shutdown
        await sleep.defer_async(length=5)

        print("STARTUP")
        yield
        print("SHUTDOWN")

        worker.cancel()
        try:
            await asyncio.wait_for(worker, timeout=10)
        except asyncio.TimeoutError:
            print("Ungraceful shutdown")
        except asyncio.CancelledError:
            print("Graceful shutdown")


app = FastAPI(lifespan=lifespan)


@app.get("/")
async def root():
    return {"Hello": "World"}