saq.worker¶
Workers
Module Contents¶
- class saq.worker.Worker(queue, functions, *, id=None, concurrency=10, cron_jobs=None, cron_tz=timezone.utc, startup=None, shutdown=None, before_process=None, after_process=None, timers=None, dequeue_timeout=0.0, burst=False, max_burst_jobs=None, shutdown_grace_period_s=None, cancellation_hard_deadline_s=1.0, metadata=None, poll_interval=0.0)[source]¶
Bases:
Generic[saq.types.CtxType]Worker is used to process and monitor jobs.
- Parameters:
id (Optional[str]) – optional override for the worker id, if not provided, uuid will be used
queue (saq.queue.Queue) – instance of saq.queue.Queue
functions (saq.types.FunctionsType[saq.types.CtxType]) – list of async functions
concurrency (int) – number of jobs to process concurrently
cron_jobs (collections.abc.Collection[saq.job.CronJob[saq.types.CtxType]] | None) – List of CronJob instances.
cron_tz (datetime.tzinfo) – timezone for cron scheduler
startup (saq.types.LifecycleFunctionsType[saq.types.CtxType] | None) – async function to call on startup
shutdown (saq.types.LifecycleFunctionsType[saq.types.CtxType] | None) – async function to call on shutdown
before_process (saq.types.LifecycleFunctionsType[saq.types.CtxType] | None) – async function to call before a job processes
after_process (saq.types.LifecycleFunctionsType[saq.types.CtxType] | None) – async function to call after a job processes
timers (saq.types.PartialTimersDict | None) – dict with various timer overrides in seconds schedule: how often we poll to schedule jobs worker_info: how often to update worker info, stats and metadata sweep: how often to clean up stuck jobs abort: how often to check if a job is aborted
dequeue_timeout (float) – how long it will wait to dequeue
burst (bool) – whether to stop the worker once all jobs have been processed
max_burst_jobs (int | None) – the maximum number of jobs to process in burst mode
shutdown_grace_period_s (int | None) – how long to wait for jobs to finish before sending cancellation signals.
cancellation_hard_deadline_s (float) – how long to wait for a job to finish after sending a cancellation signal.
metadata (Optional[JsonDict]) – arbitrary data to pass to the worker which it will register with saq
poll_interval (float) – If > 0.0, dequeue will use polling instead of listen/notify to trigger dequeues. This only affects Postgres. (default 0.0)
- async upkeep()[source]¶
Start various upkeep tasks async.
- Return type:
list[asyncio.Task[None]]