Source code for saq.job

"""
Jobs
"""

from __future__ import annotations

import dataclasses
import enum
import typing as t

from saq.utils import exponential_backoff, now, seconds, uuid1


if t.TYPE_CHECKING:
    from saq.queue import Queue
    from saq.types import DurationKind, Function

ABORT_ID_PREFIX = "saq:abort:"


def get_default_job_key() -> str:
    return uuid1()


class Status(str, enum.Enum):
    """
    Queue Status
    """

    NEW = "new"
    QUEUED = "queued"
    ACTIVE = "active"
    ABORTING = "aborting"
    ABORTED = "aborted"
    FAILED = "failed"
    COMPLETE = "complete"


ACTIVE_STATUSES = {Status.NEW, Status.QUEUED, Status.ACTIVE}
TERMINAL_STATUSES = {Status.COMPLETE, Status.FAILED, Status.ABORTED}
UNSUCCESSFUL_TERMINAL_STATUSES = TERMINAL_STATUSES - {Status.COMPLETE}
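

# A minimal sketch (not part of the library): these sets make lifecycle checks
# a cheap membership test.
#
#     >>> Status.FAILED in TERMINAL_STATUSES
#     True
#     >>> Status.FAILED in UNSUCCESSFUL_TERMINAL_STATUSES
#     True
#     >>> Status.ACTIVE in TERMINAL_STATUSES
#     False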


@dataclasses.dataclass
class CronJob:
    """
    Allows scheduling of repeated jobs with cron syntax.

    Args:
        function (saq.types.Function): the async function to run
        cron (str): cron string for a job to be repeated, parsed by croniter
        unique (bool): whether the job should be unique, i.e. only one instance
            per queue at a time, defaults to True

    **The remaining kwargs are passed through to Job** (see the equivalent field in Job for more):

    * timeout
    * heartbeat
    * retries
    * ttl
    """

    function: Function
    cron: str
    unique: bool = True
    timeout: int | None = None
    heartbeat: int | None = None
    retries: int | None = None
    ttl: int | None = None
    kwargs: dict[str, t.Any] | None = None
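

# A minimal usage sketch (not part of the library): `tick` and the schedule
# string below are illustrative. The schedule is croniter syntax, and kwargs
# like `timeout` pass straight through to Job:
#
#     >>> async def tick(ctx):
#     ...     print("tick")
#     >>> CronJob(tick, cron="*/5 * * * *", timeout=30)  # run every 5 minutes

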
@dataclasses.dataclass
class Job:
    """
    Main job class representing a run of a function.

    **User specified Arguments:**

    Args:
        function (str): the async function name to run
        kwargs (dict[str, Any] | None): kwargs to pass to the function
        queue (saq.queue.Queue): the saq.Queue object associated with the job
        key (str): unique identifier of a job, defaults to uuid1, can be passed
            in to avoid duplicate jobs
        timeout (int): the maximum amount of time a job can run for in seconds,
            defaults to 10 (0 means disabled)
        heartbeat (int): the maximum amount of time a job can survive without a
            heartbeat in seconds, defaults to 0 (disabled). A heartbeat can be
            triggered manually within a job by calling ``await job.update()``
        retries (int): the maximum number of attempts to retry a job, defaults to 1
        ttl (int): the maximum time in seconds to store information about a job
            including results, defaults to 600 (0 means indefinitely, -1 means disabled)
        retry_delay (float): seconds to delay before retrying the job
        retry_backoff (bool | float): if True, use exponential backoff for retry
            delays: the first retry will have whatever retry_delay is, the second
            retry_delay * 2, the third retry_delay * 4, and so on. This always
            includes jitter, where the final retry delay is a random number between
            0 and the calculated retry delay. If retry_backoff is set to a number,
            that number is the maximum retry delay, in seconds.
        scheduled (int): epoch seconds for when the job should be scheduled,
            defaults to 0 (schedule right away)
        progress (float): job progress 0.0..1.0
        meta (dict): arbitrary metadata to attach to the job

    **Framework Set Properties:**

    Don't set these, but you can read them.

    Parameters:
        attempts: number of attempts a job has had
        completed: job completion time epoch seconds
        queued: job enqueued time epoch seconds
        started: job started time epoch seconds
        touched: job touched/updated time epoch seconds
        result: payload containing the results, this is the return of the function
            provided, must be serializable, defaults to json
        error: stack trace if a runtime error occurs
        status: Status enum, defaults to Status.NEW
        priority: the priority of a job, only available in postgres
        group_key: only one job per group can be active at any time, only
            available in postgres
    """

    function: str
    kwargs: dict[str, t.Any] | None = None
    queue: Queue | None = None
    key: str = dataclasses.field(default_factory=get_default_job_key)
    timeout: int = 10
    heartbeat: int = 0
    retries: int = 1
    ttl: int = 600
    retry_delay: float = 0.0
    retry_backoff: bool | float = False
    scheduled: int = 0
    progress: float = 0.0
    attempts: int = 0
    completed: int = 0
    queued: int = 0
    started: int = 0
    touched: int = 0
    result: t.Any = None
    error: str | None = None
    status: Status = Status.NEW
    priority: int = 0
    group_key: str | None = None
    meta: dict[t.Any, t.Any] = dataclasses.field(default_factory=dict)

    _EXCLUDE_NON_FULL = {
        "kwargs",
        "scheduled",
        "progress",
        "total_ms",
        "result",
        "error",
        "status",
        "meta",
    }
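
    # A minimal construction sketch (not part of the library): only `function`
    # is required; everything else falls back to the dataclass defaults above.
    # The function name and kwargs are illustrative:
    #
    #     >>> job = Job("send_email", kwargs={"to": "a@example.com"}, retries=3)
    #     >>> job.status
    #     <Status.NEW: 'new'>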

    def info(self, full: bool = False) -> str:
        """
        String with Job info

        Args:
            full: If true, will list the full kwargs for the Job, else an abridged version.
        """
        # Using an exclusion list preserves order for kwargs below
        excluded = set() if full else self._EXCLUDE_NON_FULL
        kwargs = ", ".join(
            f"{k}={v}"
            for k, v in {
                "function": self.function,
                "kwargs": self.kwargs,
                "queue": self.get_queue().name,
                "id": self.id,
                "scheduled": self.scheduled,
                "progress": self.progress,
                "process_ms": self.duration("process"),
                "start_ms": self.duration("start"),
                "total_ms": self.duration("total"),
                "attempts": self.attempts,
                "result": self.result,
                "error": self.error,
                "status": self.status,
                "meta": self.meta,
            }.items()
            if v is not None and k not in excluded
        )
        return f"Job<{kwargs}>"

    def __repr__(self) -> str:
        return self.info(True)

    def __hash__(self) -> int:
        return hash(self.key)

    def __eq__(self, other: t.Any) -> bool:
        return isinstance(other, Job) and self.key == other.key

    @property
    def id(self) -> str:
        """Full Job ID"""
        return self.get_queue().job_id(self.key)

    @property
    def abort_id(self) -> str:
        return f"{ABORT_ID_PREFIX}{self.key}"

    def to_dict(self) -> dict[str, t.Any]:
        """
        Serialises the Job to a dict, skipping fields that still hold their defaults.
        """
        result = {}
        for field in dataclasses.fields(self):
            key = field.name
            value = getattr(self, key)
            if value == field.default:
                continue
            if key == "meta" and not value:
                continue
            if key == "queue" and value:
                value = value.name
            result[key] = value
        return result
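
    # An illustrative sketch (not part of the library): because default-valued
    # fields are skipped, the serialised dict stays sparse. `send_email` is a
    # hypothetical function name; `key` appears because it is a generated uuid,
    # never equal to a static default:
    #
    #     >>> Job("send_email", retries=3).to_dict()
    #     {'function': 'send_email', 'key': '<generated uuid>', 'retries': 3}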

    def duration(self, kind: DurationKind) -> int | None:
        """
        Returns the duration of the job for the given kind.

        Args:
            kind (DurationKind): the kind of duration, one of:

                * `process` (how long it took to process)
                * `start` (how long it took to start)
                * `total`
                * `running`
        """
        if kind == "process":
            return self._duration(self.completed, self.started)
        if kind == "start":
            return self._duration(self.started, self.queued)
        if kind == "total":
            return self._duration(self.completed, self.queued)
        if kind == "running":
            return self._duration(now(), self.started)
        raise ValueError(f"Unknown duration type: {kind}")

    def _duration(self, a: int, b: int) -> int | None:
        return a - b if a and b else None

    @property
    def stuck(self) -> bool:
        """Checks if an active job is past its timeout or heartbeat."""
        current = now()
        return (self.status == Status.ACTIVE or self.status == Status.ABORTING) and bool(
            (self.timeout and seconds(current - self.started) > self.timeout)
            or (self.heartbeat and seconds(current - self.touched) > self.heartbeat)
        )

    @property
    def retryable(self) -> bool:
        return self.retries > self.attempts

    def next_retry_delay(self) -> float:
        if self.retry_backoff is not False:
            max_delay = None if self.retry_backoff is True else self.retry_backoff
            return exponential_backoff(
                attempts=self.attempts,
                base_delay=self.retry_delay,
                max_delay=max_delay,
                jitter=True,
            )
        return self.retry_delay
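
    # A hedged sketch (not part of the library) of the backoff documented on
    # retry_backoff: the pre-jitter bound doubles per attempt (retry_delay,
    # then *2, *4, ...) and jitter draws the final delay between 0 and that
    # bound, so the assertion below holds for the third attempt:
    #
    #     >>> job = Job("task", retry_delay=1.0, retry_backoff=True)
    #     >>> job.attempts = 3
    #     >>> 0 <= job.next_retry_delay() <= 4.0  # bound: 1.0 * 2 ** (3 - 1)
    #     True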

    async def enqueue(self, queue: Queue | None = None) -> None:
        """
        Enqueues the job to its queue or a provided one.

        A job that already has a queue cannot be re-enqueued. Job uniqueness is
        determined by its id. If a job has already been queued, it will update
        its properties to match what is stored in the db.
        """
        queue = queue or self.get_queue()
        if not await queue.enqueue(self):
            await self.refresh()

    async def abort(self, error: str, ttl: int = 5) -> None:
        """Tries to abort the job."""
        await self.get_queue().abort(self, error, ttl=ttl)

    async def finish(
        self, status: Status, *, result: t.Any = None, error: str | None = None
    ) -> None:
        """Finishes the job with a Status, result, and/or error."""
        await self.get_queue().finish(self, status, result=result, error=error)

    async def retry(self, error: str | None) -> None:
        """Retries the job by removing it from active and enqueueing it again."""
        await self.get_queue().retry(self, error)

    async def update(self, **kwargs: t.Any) -> None:
        """
        Updates the stored job in the db. Sets properties with the passed in kwargs.

        If status is not explicitly passed in, the status will not update, as
        this is usually controlled by the workers.
        """
        await self.get_queue().update(self, **kwargs)
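
    # A hedged sketch (not part of the library): inside a running job, update()
    # doubles as the manual heartbeat mentioned on the heartbeat field, and can
    # set arbitrary properties such as progress:
    #
    #     >>> await job.update(progress=0.5)  # inside an async function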

    async def refresh(self, until_complete: float | None = None) -> None:
        """
        Refresh the current job with the latest data from the db.

        Args:
            until_complete (float | None): None or numeric seconds. If None
                (the default), don't wait; else wait until the job is complete
                or the interval has been reached. 0 means wait forever.
        """
        job = await self.get_queue().job(self.key)

        if not job:
            raise RuntimeError(f"{self} doesn't exist")

        self.replace(job)

        if until_complete is not None and not self.completed:

            def callback(_id: str, status: Status) -> bool:
                return status in TERMINAL_STATUSES

            await self.get_queue().listen([self.key], callback, until_complete)
            await self.refresh()
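
    # A hedged usage sketch (not part of the library): enqueue a job and block
    # until it reaches a terminal status. `queue` is a hypothetical saq Queue:
    #
    #     >>> job = Job("send_email", queue=queue)
    #     >>> await job.enqueue()
    #     >>> await job.refresh(until_complete=0)  # 0 waits forever
    #     >>> job.status in TERMINAL_STATUSES
    #     True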

    def replace(self, job: Job) -> None:
        """Replace current attributes with job attributes."""
        for field in dataclasses.fields(job):
            setattr(self, field.name, getattr(job, field.name))

    def get_queue(self) -> Queue:
        if self.queue is None:
            raise TypeError(
                "`Job` must be associated with a `Queue` before this operation can proceed"
            )
        return self.queue