Launch a worker
You can either launch a worker from the CLI:
$ procrastinate --verbose --app=dotted.path.to.app worker [--name=worker-name] [queue [...]]
or, equivalently, from Python code:
app.run_worker(queues=["queue", ...], name="worker-name")
# or
await app.run_worker_async(queues=["queue", ...], name="worker-name")
In both cases, if you do not specify any queues, the worker listens to every queue. Naming the worker is optional.
Note
App.run_worker() will take care of launching an event loop, opening the app,
running the worker, and when it exits, closing the app and the event loop.
On the other hand, App.run_worker_async() needs to run while the app is open.
The CLI takes care of opening the app.
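As a rough illustration of the note above, the lifecycle that App.run_worker() handles for you can be spelled out with asyncio. This is a sketch under assumptions, not Procrastinate's actual implementation: run_worker_in_open_app, run_worker_blocking, and FakeApp are hypothetical names introduced here, and only open_async() and run_worker_async() come from this page. FakeApp stands in for a real App so that the sketch runs without a database.

```python
import asyncio
from contextlib import asynccontextmanager


async def run_worker_in_open_app(app, **worker_kwargs):
    # run_worker_async() needs to run while the app is open,
    # so open the app first and keep it open until the worker exits.
    async with app.open_async():
        await app.run_worker_async(**worker_kwargs)


def run_worker_blocking(app, **worker_kwargs):
    # asyncio.run() creates the event loop and closes it when the worker exits.
    asyncio.run(run_worker_in_open_app(app, **worker_kwargs))


class FakeApp:
    """Hypothetical stand-in exposing the two methods used above."""

    def __init__(self):
        self.events = []

    @asynccontextmanager
    async def open_async(self):
        self.events.append("open")
        try:
            yield self
        finally:
            self.events.append("close")

    async def run_worker_async(self, **kwargs):
        # A real worker would process jobs here; the fake only records the call.
        self.events.append(("worker", kwargs))


fake = FakeApp()
run_worker_blocking(fake, queues=["queue"], name="worker-name")
print(fake.events)
```

With a real App, the object passed in would be your application instance instead of FakeApp. The helper only makes explicit the ordering described in the note: event loop, then app, then worker, and the reverse on exit.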
… Inside an application
When running the worker inside a bigger application, you may want to use
install_signal_handlers=False so that the worker doesn’t interfere with
your application’s signal handlers.
For more information about stopping the worker, see Shutdown a worker.
Here is an example FastAPI application that does this:
import asyncio
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI
from procrastinate import App, PsycopgConnector

logging.basicConfig(level=logging.DEBUG)

task_queue = App(connector=PsycopgConnector())


@task_queue.task
async def sleep(length):
    await asyncio.sleep(length)


@asynccontextmanager
async def lifespan(app: FastAPI):
    async with task_queue.open_async():
        worker = asyncio.create_task(
            task_queue.run_worker_async(install_signal_handlers=False)
        )
        # Set to 100 to test the ungraceful shutdown
        await sleep.defer_async(length=5)

        print("STARTUP")
        yield
        print("SHUTDOWN")

        worker.cancel()
        try:
            await asyncio.wait_for(worker, timeout=10)
        except asyncio.TimeoutError:
            print("Ungraceful shutdown")
        except asyncio.CancelledError:
            print("Graceful shutdown")


app = FastAPI(lifespan=lifespan)


@app.get("/")
async def root():
    return {"Hello": "World"}
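The cancel-then-wait pattern at the end of lifespan can be tried in isolation, without FastAPI or a database. The sketch below is only an illustration: fake_worker and stop_worker are hypothetical names, and fake_worker is an assumed stand-in that lets its in-flight job finish when first cancelled. It is not Procrastinate's worker implementation.

```python
import asyncio


async def fake_worker(job_seconds: float) -> None:
    """Hypothetical stand-in for a worker that is busy with one job."""
    job = asyncio.ensure_future(asyncio.sleep(job_seconds))
    try:
        # shield() keeps the job running when the worker task is cancelled.
        await asyncio.shield(job)
    except asyncio.CancelledError:
        # Stop requested: let the in-flight job finish, then report cancellation.
        await job
        raise


async def stop_worker(job_seconds: float, timeout: float) -> str:
    worker = asyncio.create_task(fake_worker(job_seconds))
    await asyncio.sleep(0.01)  # give the worker time to start its job

    # Same sequence as in lifespan: request a stop, then wait with a deadline.
    worker.cancel()
    try:
        await asyncio.wait_for(worker, timeout=timeout)
    except asyncio.TimeoutError:
        # The job outlived the deadline; wait_for() cancelled the worker again.
        return "ungraceful"
    except asyncio.CancelledError:
        # The worker finished its job and then exited as cancelled.
        return "graceful"
    return "finished"


print(asyncio.run(stop_worker(job_seconds=0.05, timeout=1.0)))
print(asyncio.run(stop_worker(job_seconds=5.0, timeout=0.05)))
```

A job shorter than the timeout ends in the CancelledError branch, while a job longer than the timeout ends in the TimeoutError branch. This mirrors what changing length from 5 to 100 is meant to demonstrate in the FastAPI example.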