# Source code for saq.types

"""
Types
"""

from __future__ import annotations

import typing as t
from collections.abc import Collection


if t.TYPE_CHECKING:
    from asyncio import Task

    from saq.job import CronJob, Job, Status
    from saq.worker import Worker
    from saq.queue import Queue
    from typing_extensions import Required


class Context(t.TypedDict, total=False):
    """
    Task context.

    Extra context fields are allowed.
    """

    # ``total=False`` makes every key optional unless it is wrapped in
    # ``Required``; ``worker`` is therefore the only mandatory key.
    worker: Required[Worker]
    "Worker currently executing the task"
    job: Job
    "Job() instance of the task"
    queue: Queue
    "Queue the task is running on"
    exception: t.Optional[Exception]
    "Exception raised by the task if any"
class JobTaskContext(t.TypedDict, total=False):
    """
    Jobs Task Context

    Both keys are optional (``total=False``).
    """

    task: Task[t.Any]
    "asyncio Task of the Job"
    aborted: t.Optional[str]
    "If this task has been aborted, this is the reason"
class QueueInfo(t.TypedDict):
    """
    Queue Info
    """

    workers: dict[str, dict[str, t.Any]]
    "Worker information"
    name: str
    "Queue name"
    queued: int
    "Number of jobs currently in the queue"
    active: int
    "Number of jobs currently active"
    scheduled: int
    "Number of jobs currently scheduled"
    jobs: list[dict[str, t.Any]]
    "A truncated list containing the jobs that are scheduled to execute soonest"
class QueueStats(t.TypedDict):
    """
    Queue Stats
    """

    complete: int
    "Number of complete tasks"
    failed: int
    "Number of failed tasks"
    retried: int
    "Number of retries"
    aborted: int
    "Number of aborted tasks"
    uptime: int
    "Queue uptime in milliseconds"
class TimersDict(t.TypedDict):
    """
    Timers Dictionary

    All values are intervals expressed in seconds.
    """

    schedule: int
    "How often we poll to schedule jobs in seconds (default 1)"
    stats: int
    "How often to update stats in seconds (default 10)"
    sweep: int
    "How often to clean up stuck jobs in seconds (default 60)"
    abort: int
    "How often to check if a job is aborted in seconds (default 1)"
class PartialTimersDict(t.TypedDict, total=False):
    """
    For argument to `Worker`, all keys are not required

    The keys mirror ``TimersDict`` and must be kept in sync with it. They are
    declared here directly rather than inherited because ``total=False`` only
    applies to keys defined in the class body: keys inherited from a total
    TypedDict would remain required.
    """

    schedule: int
    "How often we poll to schedule jobs in seconds (default 1)"
    stats: int
    "How often to update stats in seconds (default 10)"
    sweep: int
    "How often to clean up stuck jobs in seconds (default 60)"
    abort: int
    "How often to check if a job is aborted in seconds (default 1)"
class SettingsDict(t.TypedDict, total=False):
    """
    Settings

    Declared with ``total=False``: only ``functions`` (wrapped in
    ``Required``) must be supplied; every other key may be omitted.
    """

    queue: Queue
    # Each entry is either a bare callable or a ``(name, callable)`` pair.
    functions: Required[Collection[Function | tuple[str, Function]]]
    concurrency: int
    cron_jobs: Collection[CronJob]
    # The four hooks below are all callables that receive a ``Context``.
    startup: ReceivesContext
    shutdown: ReceivesContext
    before_process: ReceivesContext
    after_process: ReceivesContext
    timers: PartialTimersDict
    dequeue_timeout: float
# Public type aliases. "Job" and "Status" are quoted forward references
# because those names are only imported under ``t.TYPE_CHECKING``.

# Callable that receives a Job and returns an awaitable.
BeforeEnqueueType = t.Callable[["Job"], t.Awaitable[t.Any]]
# Accepted job-count categories.
CountKind = t.Literal["queued", "active", "incomplete"]
# Callable that turns a mapping into bytes or str.
DumpType = t.Callable[[t.Mapping[t.Any, t.Any]], t.Union[bytes, str]]
# Accepted duration categories.
DurationKind = t.Literal["process", "start", "total", "running"]
# Any callable, with arbitrary arguments and return value.
Function = t.Callable[..., t.Any]
# Callable that receives a str and a Status.
ListenCallback = t.Callable[[str, "Status"], t.Any]
# Callable that receives bytes or str and returns the loaded value.
LoadType = t.Callable[[t.Union[bytes, str]], t.Any]
# Callable that receives a Context.
ReceivesContext = t.Callable[[Context], t.Any]
# Variable-length tuple of ints.
VersionTuple = t.Tuple[int, ...]