Source code for saq.types
"""
Types
"""
from __future__ import annotations
import typing as t
from collections.abc import Collection
if t.TYPE_CHECKING:
from asyncio import Task
from saq.job import CronJob, Job, Status
from saq.worker import Worker
from saq.queue import Queue
from typing_extensions import Required
[docs]
class Context(t.TypedDict, total=False):
"""
Task context.
Extra context fields are allowed.
"""
[docs]
worker: Required[Worker]
"Worker currently executing the task"
"Job() instance of the task"
"Queue the task is running on"
[docs]
exception: t.Optional[Exception]
"Exception raised by the task if any"
[docs]
class JobTaskContext(t.TypedDict, total=False):
"""
Jobs Task Context
"""
"asyncio Task of the Job"
[docs]
aborted: t.Optional[str]
"If this task has been aborted, this is the reason"
[docs]
class QueueInfo(t.TypedDict):
"""
Queue Info
"""
[docs]
workers: dict[str, dict[str, t.Any]]
"Worker information"
"Queue name"
"Number of jobs currently in the queue"
"Number of jobs currently active"
scheduled: int
[docs]
jobs: list[dict[str, t.Any]]
"A truncated list containing the jobs that are scheduled to execute soonest"
[docs]
class QueueStats(t.TypedDict):
"""
Queue Stats
"""
"Number of complete tasks"
"Number of failed tasks"
"Number of retries"
"Number of aborted tasks"
"Queue uptime in milliseconds"
[docs]
class TimersDict(t.TypedDict):
"""
Timers Dictionary
"""
"How often we poll to schedule jobs in seconds (default 1)"
"How often to update stats in seconds (default 10)"
"How often to clean up stuck jobs in seconds (default 60)"
"How often to check if a job is aborted in seconds (default 1)"
[docs]
class PartialTimersDict(TimersDict, total=False):
"""
For argument to `Worker`, all keys are not required
"""
[docs]
class SettingsDict(t.TypedDict, total=False):
"""
Settings
"""
queue: Queue
functions: Required[Collection[Function | tuple[str, Function]]]
concurrency: int
cron_jobs: Collection[CronJob]
startup: ReceivesContext
shutdown: ReceivesContext
before_process: ReceivesContext
after_process: ReceivesContext
timers: PartialTimersDict
dequeue_timeout: float
BeforeEnqueueType = t.Callable[["Job"], t.Awaitable[t.Any]]
CountKind = t.Literal["queued", "active", "incomplete"]
DumpType = t.Callable[[t.Mapping[t.Any, t.Any]], t.Union[bytes, str]]
DurationKind = t.Literal["process", "start", "total", "running"]
Function = t.Callable[..., t.Any]
ListenCallback = t.Callable[[str, "Status"], t.Any]
LoadType = t.Callable[[t.Union[bytes, str]], t.Any]
ReceivesContext = t.Callable[[Context], t.Any]
VersionTuple = t.Tuple[int, ...]