saq.job

Jobs

Module Contents

class saq.job.Status[source]

Bases: str, enum.Enum

Queue Status

class saq.job.CronJob[source]

Bases: Generic[saq.types.CtxType]

Allows scheduling of repeated jobs with cron syntax.

Parameters:
  • function (saq.types.Function) – the async function to run

  • cron (str) – cron string for a job to be repeated, uses croniter

  • unique (bool) – unique jobs only one once per queue, defaults true

The remaining kwargs are pass through to Job: (see equivalent field in Job for more)

  • timeout

  • heartbeat

  • retries

  • ttl

class saq.job.Job[source]

Main job class representing a run of a function.

User specified Arguments:

Parameters:
  • function (str) – the async function name to run

  • kwargs (dict[str, Any] | None) – kwargs to pass to the function

  • queue (saq.queue.Queue) – the saq.Queue object associated with the job

  • key (str) – unique identifier of a job, defaults to uuid1, can be passed in to avoid duplicate jobs

  • timeout (int) – the maximum amount of time a job can run for in seconds, defaults to 10 (0 means disabled)

  • heartbeat (int) – the maximum amount of time a job can survive without a heartbeat in seconds, defaults to 0 (disabled) a heartbeat can be triggered manually within a job by calling await job.update()

  • retries (int) – the maximum number of attempts to retry a job, defaults to 1

  • ttl (int) – the maximum time in seconds to store information about a job including results, defaults to 600 (0 means indefinitely, -1 means disabled)

  • retry_delay (float) – seconds to delay before retrying the job

  • retry_backoff (bool | float) – If true, use exponential backoff for retry delays. The first retry will have whatever retry_delay is. The second retry will have retry_delay*2. The third retry will have retry_delay*4. And so on. This always includes jitter, where the final retry delay is a random number between 0 and the calculated retry delay. If retry_backoff is set to a number, that number is the maximum retry delay, in seconds.

  • scheduled (int) – epoch seconds for when the job should be scheduled, defaults to 0 (schedule right away)

  • progress (float) – job progress 0.0..1.0

  • meta (dict) – arbitrary metadata to attach to the job

Framework Set Properties: Don’t set these, but you can read them.

Parameters:
  • attempts – number of attempts a job has had

  • completed – job completion time epoch seconds

  • queued – job enqueued time epoch seconds

  • started – job started time epoch seconds

  • touched – job touched/updated time epoch seconds

  • result – payload containing the results, this is the return of the function provided, must be serializable, defaults to json

  • error – stack trace if a runtime error occurs

  • status – Status Enum, default to Status.New

  • priority – The priority of a job, only available in postgres.

  • group_key – Only one job per group can be active at any time, only available in postgres.

info(full=False)[source]

String with Job info

Parameters:

full (bool) – If true, will list the full kwargs for the Job, else an abridged version.

Return type:

str

property id: str[source]

Full Job ID

Return type:

str

to_dict()[source]

Serialises the Job to dict

Return type:

dict[str, Any]

duration(kind)[source]

Returns the duration of the job given kind.

Parameters:
  • Kind (DurationKind) – The kind of duration type, can be: * process (how long it took to process) * start (how long it took to start) * total * running

  • kind (saq.types.DurationKind)

Return type:

int | None

property stuck: bool[source]

Checks if an active job is passed its timeout or heartbeat.

Return type:

bool

async enqueue(queue=None)[source]

Enqueues the job to it’s queue or a provided one.

A job that already has a queue cannot be re-enqueued. Job uniqueness is determined by its id. If a job has already been queued, it will update its properties to match what is stored in the db.

Parameters:

queue (saq.queue.Queue | None)

Return type:

None

async abort(error, ttl=5)[source]

Tries to abort the job.

Parameters:
Return type:

None

async finish(status, *, result=None, error=None)[source]

Finishes the job with a Job.Status, result, and or error.

Parameters:
  • status (Status)

  • result (Any)

  • error (str | None)

Return type:

None

async retry(error)[source]

Retries the job by removing it from active and enqueueing it again.

Parameters:

error (str | None)

Return type:

None

async update(**kwargs)[source]

Updates the stored job in redis.

Set properties with passed in kwargs.

If status is not explicitly passed in, the status will not update as this is usually controlled by the workers.

Parameters:

kwargs (Any)

Return type:

None

async refresh(until_complete=None)[source]

Refresh the current job with the latest data from the db.

Parameters:

until_complete (float | None) – None or Numeric seconds. if None (default), don’t wait, else wait seconds until the job is complete or the interval has been reached. 0 means wait forever

Return type:

None

replace(job)[source]

Replace current attributes with job attributes.

Parameters:

job (Job)

Return type:

None