saq.worker

Workers

Module Contents

class saq.worker.Worker(queue, functions, *, id=None, concurrency=10, cron_jobs=None, cron_tz=timezone.utc, startup=None, shutdown=None, before_process=None, after_process=None, timers=None, dequeue_timeout=0.0, burst=False, max_burst_jobs=None, shutdown_grace_period_s=None, cancellation_hard_deadline_s=1.0, metadata=None, poll_interval=0.0)

Bases: Generic[saq.types.CtxType]

Worker is used to process and monitor jobs.

Parameters:
  • id (Optional[str]) – optional override for the worker id; if not provided, a UUID is used

  • queue (saq.queue.Queue) – instance of saq.queue.Queue

  • functions (saq.types.FunctionsType[saq.types.CtxType]) – list of async functions

  • concurrency (int) – number of jobs to process concurrently

  • cron_jobs (collections.abc.Collection[saq.job.CronJob[saq.types.CtxType]] | None) – List of CronJob instances.

  • cron_tz (datetime.tzinfo) – timezone for cron scheduler

  • startup (saq.types.LifecycleFunctionsType[saq.types.CtxType] | None) – async function to call on startup

  • shutdown (saq.types.LifecycleFunctionsType[saq.types.CtxType] | None) – async function to call on shutdown

  • before_process (saq.types.LifecycleFunctionsType[saq.types.CtxType] | None) – async function to call before a job processes

  • after_process (saq.types.LifecycleFunctionsType[saq.types.CtxType] | None) – async function to call after a job processes

  • timers (saq.types.PartialTimersDict | None) – dict with various timer overrides in seconds:

      • schedule: how often we poll to schedule jobs
      • worker_info: how often to update worker info, stats and metadata
      • sweep: how often to clean up stuck jobs
      • abort: how often to check if a job is aborted

  • dequeue_timeout (float) – how long to wait for a job when dequeuing

  • burst (bool) – whether to stop the worker once all jobs have been processed

  • max_burst_jobs (int | None) – the maximum number of jobs to process in burst mode

  • shutdown_grace_period_s (int | None) – how long to wait, in seconds, for jobs to finish before sending cancellation signals.

  • cancellation_hard_deadline_s (float) – how long to wait, in seconds, for a job to finish after sending a cancellation signal.

  • metadata (Optional[JsonDict]) – arbitrary data to pass to the worker which it will register with saq

  • poll_interval (float) – If > 0.0, dequeue will use polling instead of listen/notify to trigger dequeues. This only affects Postgres. (default 0.0)
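The parameters above can be combined as in the following minimal sketch. It assumes a Redis server reachable at `redis://localhost` and uses `Queue.from_url` and `CronJob` from the `saq` package. The function names `add`, `tick`, `startup` and `build_worker`, the `greeting` context key, and all values passed to the constructor are illustrative assumptions, not recommendations.

```python
import asyncio
from datetime import timezone


async def add(ctx, *, a: int, b: int) -> int:
    """Example job: receives the context first, then keyword arguments."""
    return a + b


async def tick(ctx) -> str:
    """Example function scheduled through a CronJob."""
    return "tick"


async def startup(ctx) -> None:
    """Startup hook: store shared state on the context (hypothetical key)."""
    ctx["greeting"] = "hello"


def build_worker(redis_url: str = "redis://localhost"):
    """Build a Worker; saq is imported lazily so this module loads without it."""
    from saq import CronJob, Queue, Worker

    queue = Queue.from_url(redis_url)
    return Worker(
        queue,
        functions=[add],
        concurrency=5,                      # process up to 5 jobs at once
        cron_jobs=[CronJob(tick, cron="* * * * *")],
        cron_tz=timezone.utc,               # same as the default
        startup=startup,
        timers={"sweep": 30},               # override only the sweep timer
        shutdown_grace_period_s=10,
        metadata={"region": "example"},     # arbitrary illustrative data
    )
```

Keeping the job and hook functions at module level, separate from worker construction, lets them be exercised directly (for example `asyncio.run(add({}, a=1, b=2))`) without a running queue backend.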

async start()

Start processing jobs and upkeep tasks.

Return type:

None

async stop()

Stop the worker and cleanup.

Return type:

None
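As a rough illustration of how `start()` and `stop()` fit together, the sketch below runs a worker in the background for a fixed time and then shuts it down. The helper name `run_for` is hypothetical, and the code relies only on the two coroutine methods documented here; whether `start()` returns on its own after `stop()` is not stated in this reference, so the helper cancels the background task defensively.

```python
import asyncio
import contextlib


async def run_for(worker, seconds: float) -> None:
    """Run worker.start() in the background, then stop it after `seconds`."""
    task = asyncio.create_task(worker.start())
    try:
        await asyncio.sleep(seconds)
    finally:
        await worker.stop()
        # stop() has run its cleanup; cancel start() in case it is still pending.
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task
```

In a long-running service one would more likely await `worker.start()` directly and let the process's signal handling trigger shutdown; the timed variant is mainly convenient in tests and scripts.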

async upkeep()

Start various upkeep tasks async.

Return type:

list[asyncio.Task[None]]
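Because `upkeep()` returns plain `asyncio.Task` objects, code that calls it directly becomes responsible for those tasks. The sketch below shows one generic way to cancel such a list and wait for the cancellations to settle. The helper name `cancel_tasks` is hypothetical, and `start()` may already manage these tasks itself, so this applies only when `upkeep()` is invoked by hand.

```python
import asyncio


async def cancel_tasks(tasks: "list[asyncio.Task[None]]") -> None:
    """Cancel every task, then wait until all of them have finished."""
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects each CancelledError instead of raising it.
    await asyncio.gather(*tasks, return_exceptions=True)
```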