Launch a worker

You can either go towards the CLI route with:

$ procrastinate --verbose --app=dotted.path.to.app worker [--name=worker-name] [queue [...]]

or, identically, use the code way:

app.run_worker(queues=["queue", ...], name="worker-name")
# or
await app.run_worker_async(queues=["queue", ...], name="worker-name")

In both cases, not specifying queues will tell Procrastinate to listen to every queue. Naming the worker is optional.

Note

App.run_worker() will take care of launching an event loop, opening the app, running the worker, and when it exits, closing the app and the event loop.

On the other hand, App.run_worker_async() needs to run while the app is open. The CLI takes care of opening the app.

… Inside an application

When running the worker inside a bigger application, you may want to use install_signal_handlers=False so that the worker doesn’t interfere with your application’s signal handlers.

For more information about stopping the worker, see Shutdown a worker.

Here is an example FastAPI application that does this:

import asyncio
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI

from procrastinate import App, PsycopgConnector

logging.basicConfig(level=logging.DEBUG)


task_queue = App(connector=PsycopgConnector())


@task_queue.task
async def sleep(length):
    await asyncio.sleep(length)


@asynccontextmanager
async def lifespan(app: FastAPI):
    async with task_queue.open_async():
        worker = asyncio.create_task(
            task_queue.run_worker_async(install_signal_handlers=False)
        )
        # Set to 100 to test the ungraceful shutdown
        await sleep.defer_async(length=5)

        print("STARTUP")
        yield
        print("SHUTDOWN")

        worker.cancel()
        try:
            await asyncio.wait_for(worker, timeout=10)
        except asyncio.TimeoutError:
            print("Ungraceful shutdown")
        except asyncio.CancelledError:
            print("Graceful shutdown")


app = FastAPI(lifespan=lifespan)


@app.get("/")
async def root():
    return {"Hello": "World"}