saq.job

Jobs

Module Contents

class saq.job.Status

Bases: str, enum.Enum

Queue Status

class saq.job.CronJob

Allows scheduling of repeated jobs with cron syntax.

Parameters:
  • function (saq.types.Function) – the async function to run

  • cron (str) – cron string for a job to be repeated, uses croniter

  • unique (bool) – if true, the job is enqueued only once per queue, defaults to true

The remaining kwargs are passed through to Job (see the equivalent fields in Job for details):

  • timeout

  • heartbeat

  • retries

  • ttl

class saq.job.Job

Main job class representing a run of a function.

User specified Arguments:

Parameters:
  • function (str) – the async function name to run

  • kwargs (dict[str, Any] | None) – kwargs to pass to the function

  • queue (saq.queue.Queue) – the saq.Queue object associated with the job

  • key (str) – unique identifier of a job, defaults to uuid1, can be passed in to avoid duplicate jobs

  • timeout (int) – the maximum amount of time a job can run for in seconds, defaults to 10 (0 means disabled)

  • heartbeat (int) – the maximum amount of time a job can survive without a heartbeat in seconds, defaults to 0 (disabled). A heartbeat can be triggered manually within a job by calling await job.update()

  • retries (int) – the maximum number of attempts to retry a job, defaults to 1

  • ttl (int) – the maximum time in seconds to store information about a job including results, defaults to 600 (0 means indefinitely, -1 means disabled)

  • retry_delay (float) – seconds to delay before retrying the job

  • retry_backoff (bool | float) – If true, use exponential backoff for retry delays. The first retry will have whatever retry_delay is. The second retry will have retry_delay*2. The third retry will have retry_delay*4. And so on. This always includes jitter, where the final retry delay is a random number between 0 and the calculated retry delay. If retry_backoff is set to a number, that number is the maximum retry delay, in seconds.

  • scheduled (int) – epoch seconds for when the job should be scheduled, defaults to 0 (schedule right away)

  • progress (float) – job progress 0.0..1.0

  • meta (dict) – arbitrary metadata to attach to the job
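
The retry_delay and retry_backoff rules above can be sketched in plain Python. This is a minimal illustration of the documented behaviour, not saq's actual implementation: the helper names backoff_ceiling and jittered_delay are hypothetical, and retry_number (1 for the first retry) is an assumed parameter.

```python
import random


def backoff_ceiling(retry_delay: float, retry_backoff, retry_number: int) -> float:
    """Upper bound of the delay before the given retry (hypothetical helper)."""
    if not retry_backoff:
        # No backoff: every retry waits the plain retry_delay.
        return retry_delay
    # Exponential growth: retry_delay, retry_delay*2, retry_delay*4, ...
    ceiling = retry_delay * 2 ** (retry_number - 1)
    if retry_backoff is not True:
        # A numeric retry_backoff is treated as the maximum delay in seconds.
        ceiling = min(ceiling, float(retry_backoff))
    return ceiling


def jittered_delay(retry_delay: float, retry_backoff, retry_number: int) -> float:
    """Delay actually waited: with backoff, a random value in [0, ceiling]."""
    ceiling = backoff_ceiling(retry_delay, retry_backoff, retry_number)
    if not retry_backoff:
        return ceiling
    return random.uniform(0, ceiling)
```

Separating the deterministic ceiling from the jitter keeps the exponential rule easy to check on its own.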

Framework Set Properties: Don’t set these, but you can read them.

Parameters:
  • attempts (int) – number of attempts a job has had

  • completed (int) – job completion time epoch seconds

  • queued (int) – job enqueued time epoch seconds

  • started (int) – job started time epoch seconds

  • touched (int) – job touched/updated time epoch seconds

  • result – payload containing the result; this is the return value of the provided function and must be serializable (JSON by default)

  • error (str | None) – stack trace if a runtime error occurs

  • status (Status) – Status enum, defaults to Status.NEW

property id: str

Full Job ID

Return type:

str

property stuck: bool

Checks if an active job is past its timeout or heartbeat.

Return type:

bool
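
The check described for stuck can be sketched as follows. This is only an illustration under stated assumptions: is_stuck is a hypothetical helper, timestamps are taken to be epoch seconds as documented above, and a timeout or heartbeat of 0 is treated as disabled.

```python
def is_stuck(
    active: bool,
    started: int,
    touched: int,
    timeout: int,
    heartbeat: int,
    now: int,
) -> bool:
    """Return True if an active job has exceeded its timeout or heartbeat."""
    if not active:
        # Only active jobs can be considered stuck.
        return False
    # 0 disables the corresponding limit.
    past_timeout = timeout > 0 and now - started > timeout
    past_heartbeat = heartbeat > 0 and now - touched > heartbeat
    return past_timeout or past_heartbeat
```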

info(full=False)

String with Job info

Parameters:

full (bool) – If true, will list the full kwargs for the Job, else an abridged version.

Return type:

str

__repr__()

Return repr(self).

Return type:

str

__hash__()

Return hash(self).

Return type:

int

classmethod key_from_id(job_id)

Key portion of Job ID

Parameters:

job_id (str) –

Return type:

str

to_dict()

Serialises the Job to dict

Return type:

dict[str, Any]

duration(kind)

Returns the duration of the job given kind.

Parameters:

kind (saq.types.DurationKind) – the kind of duration to return, one of: process (how long the job took to process), start (how long the job took to start), total, or running

Return type:

int | None
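
One plausible reading of the four duration kinds, expressed with the framework-set timestamps, is sketched below. This is an assumption-laden illustration rather than saq's code: sketch_duration is a hypothetical helper, a timestamp of 0 is taken to mean "not set", and the mapping of total and running to timestamp differences is inferred from their names.

```python
from typing import Optional


def _diff(end: int, start: int) -> Optional[int]:
    # A duration is only defined when both timestamps are set (non-zero).
    return end - start if end and start else None


def sketch_duration(
    kind: str, queued: int, started: int, completed: int, now: int
) -> Optional[int]:
    """Hypothetical mapping from duration kind to a timestamp difference."""
    if kind == "process":
        return _diff(completed, started)  # how long it took to process
    if kind == "start":
        return _diff(started, queued)  # how long it waited before starting
    if kind == "total":
        return _diff(completed, queued)  # enqueue to completion
    if kind == "running":
        return _diff(now, started)  # time spent running so far
    raise ValueError(f"Unknown duration kind: {kind}")
```

Returning None instead of a number when a timestamp is missing matches the documented int | None return type.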

async enqueue(queue=None)

Enqueues the job to its queue or a provided one.

A job that already has a queue cannot be re-enqueued. Job uniqueness is determined by its id. If a job has already been queued, it will update its properties to match what is stored in the db.

Parameters:

queue (saq.queue.Queue | None) –

Return type:

None

async abort(error, ttl=5)

Tries to abort the job.

Parameters:
  • error (str) –

  • ttl (int) –

Return type:

None

async finish(status, *, result=None, error=None)

Finishes the job with a Status, a result, and/or an error.

Parameters:
  • status (Status) –

  • result (Any) –

  • error (str | None) –

Return type:

None

async retry(error)

Retries the job by removing it from active and enqueueing it again.

Parameters:

error (str | None) –

Return type:

None

async update(**kwargs)

Updates the stored job in redis.

Set properties with passed in kwargs.

Parameters:

kwargs (Any) –

Return type:

None

async refresh(until_complete=None)

Refresh the current job with the latest data from the db.

Parameters:

until_complete (float | None) – None or a number of seconds. If None (default), don't wait; otherwise wait until the job is complete or the given number of seconds has elapsed. 0 means wait forever.

Return type:

None
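
The three until_complete modes (None, 0, and a positive number of seconds) can be illustrated with a small asyncio sketch. It does not use saq or reflect how saq waits internally: wait_until_complete, is_complete, and poll_interval are hypothetical names, polling is an assumed mechanism, and returning False on timeout (rather than raising) is an assumption made for the example.

```python
import asyncio
from typing import Callable, Optional


async def wait_until_complete(
    is_complete: Callable[[], bool],
    until_complete: Optional[float],
    poll_interval: float = 0.01,
) -> bool:
    """Wait for completion according to the until_complete semantics."""
    if until_complete is None:
        # Default: report the current state without waiting.
        return is_complete()

    async def _poll() -> None:
        while not is_complete():
            await asyncio.sleep(poll_interval)

    # 0 means wait forever, which asyncio.wait_for expresses as timeout=None.
    timeout = None if until_complete == 0 else until_complete
    try:
        await asyncio.wait_for(_poll(), timeout)
    except asyncio.TimeoutError:
        pass  # Assumption: give up quietly once the interval has elapsed.
    return is_complete()
```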

replace(job)

Replace current attributes with job attributes.

Parameters:

job (Job) –

Return type:

None