saq.job
Jobs
Module Contents
- class saq.job.CronJob[source]
Allows scheduling of repeated jobs with cron syntax.
- Parameters:
The remaining kwargs are passed through to Job (see the equivalent fields in Job for more):
timeout
heartbeat
retries
ttl
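As a hedged sketch, a CronJob is typically registered in the worker settings dict; the `cleanup` function, Redis URL, and cron expression below are illustrative only, and the settings layout is an assumption based on common saq worker usage:

```python
from saq import CronJob, Queue

queue = Queue.from_url("redis://localhost")

async def cleanup(ctx):
    # Hypothetical task body; the name and logic are illustrative only.
    ...

# Worker settings fragment: run `cleanup` every 5 minutes. The extra
# kwargs (timeout, retries, ...) are passed through to the resulting Job.
settings = {
    "queue": queue,
    "functions": [cleanup],
    "cron_jobs": [CronJob(cleanup, cron="*/5 * * * *", timeout=60, retries=2)],
}
```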
- class saq.job.Job[source]
Main job class representing a run of a function.
User-specified arguments:
- Parameters:
function (str) – the async function name to run
kwargs (dict[str, Any] | None) – kwargs to pass to the function
queue (saq.queue.Queue) – the saq.Queue object associated with the job
key (str) – unique identifier of a job, defaults to uuid1; can be passed in to avoid duplicate jobs
timeout (int) – the maximum amount of time a job can run for in seconds, defaults to 10 (0 means disabled)
heartbeat (int) – the maximum amount of time a job can survive without a heartbeat in seconds, defaults to 0 (disabled); a heartbeat can be triggered manually within a job by calling await job.update()
retries (int) – the maximum number of attempts to retry a job, defaults to 1
ttl (int) – the maximum time in seconds to store information about a job, including results, defaults to 600 (0 means indefinitely, -1 means disabled)
retry_delay (float) – seconds to delay before retrying the job
retry_backoff (bool | float) – if true, use exponential backoff for retry delays: the first retry has a delay of retry_delay, the second retry_delay * 2, the third retry_delay * 4, and so on. This always includes jitter, where the final retry delay is a random number between 0 and the calculated retry delay. If retry_backoff is set to a number, that number is the maximum retry delay, in seconds.
scheduled (int) – epoch seconds for when the job should be scheduled, defaults to 0 (schedule right away)
progress (float) – job progress 0.0..1.0
meta (dict) – arbitrary metadata to attach to the job
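The retry_backoff rule above can be sketched in plain Python; this is an illustrative reimplementation of the documented behaviour, not saq's actual code:

```python
import random

def backoff_delay(attempt: int, retry_delay: float, retry_backoff) -> float:
    """Illustrative sketch of the documented retry_backoff rule.

    attempt is 1 for the first retry. With backoff enabled, the base delay
    doubles each retry (retry_delay, retry_delay * 2, retry_delay * 4, ...);
    jitter then draws the final delay uniformly between 0 and that base.
    A numeric retry_backoff caps the base delay, in seconds.
    """
    if not retry_backoff:
        return retry_delay
    base = retry_delay * 2 ** (attempt - 1)
    if retry_backoff is not True:  # numeric value acts as a cap
        base = min(base, float(retry_backoff))
    return random.uniform(0.0, base)
```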
Framework Set Properties: Don’t set these, but you can read them.
- Parameters:
attempts (int) – number of attempts a job has had
completed (int) – job completion time, epoch seconds
queued (int) – job enqueued time, epoch seconds
started (int) – job started time, epoch seconds
touched (int) – job touched/updated time, epoch seconds
result – payload containing the results; this is the return of the function provided, must be serializable, defaults to json
error (str | None) – stack trace if a runtime error occurs
status (Status) – Status enum, defaults to Status.New
- duration(kind)[source]
Returns the duration of the job for the given kind.
- Parameters:
kind (saq.types.DurationKind) – the kind of duration; one of: process (how long it took to process), start (how long it took to start), total, running
- Return type:
int | None
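The duration kinds can be illustrated with plain timestamps. This is an illustrative sketch, not saq's implementation, and it assumes epoch-millisecond timestamps with 0 meaning unset:

```python
def duration(kind, *, queued, started, completed, now):
    """Illustrative sketch of the duration kinds.

    Returns None when a needed timestamp is missing (0 means unset),
    mirroring the int | None return type documented above.
    """
    if kind == "process":   # how long it took to process
        a, b = completed, started
    elif kind == "start":   # how long it took to start
        a, b = started, queued
    elif kind == "total":
        a, b = completed, queued
    elif kind == "running":
        a, b = now, started
    else:
        raise ValueError(f"unknown duration kind: {kind}")
    return a - b if a and b else None

# e.g. queued at t=0 ms, started at t=150 ms, completed at t=400 ms
assert duration("process", queued=0, started=150, completed=400, now=500) == 250
```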
- property stuck: bool[source]
Checks if an active job is past its timeout or heartbeat.
- Return type:
bool
- async enqueue(queue=None)[source]
Enqueues the job to its queue or a provided one.
A job that already has a queue cannot be re-enqueued. Job uniqueness is determined by its id. If a job has already been queued, it will update its properties to match what is stored in the db.
- Parameters:
queue (saq.queue.Queue | None)
- Return type:
None
- async finish(status, *, result=None, error=None)[source]
Finishes the job with a Job.Status, result, and/or error.
- async retry(error)[source]
Retries the job by removing it from active and enqueueing it again.
- Parameters:
error (str | None)
- Return type:
None
- async update(**kwargs)[source]
Updates the stored job in redis.
Sets properties from the passed-in kwargs.
- Parameters:
kwargs (Any)
- Return type:
None
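The progress/heartbeat pattern that update() enables can be sketched as follows. The _Job class here is a minimal stand-in so the sketch runs without a queue; in a real task, saq provides the running Job in the context (the ctx["job"] key is assumed from common saq usage):

```python
import asyncio

class _Job:
    """Minimal stand-in for saq.job.Job (illustration only)."""
    progress = 0.0

    async def update(self, **kwargs):
        # The real Job.update persists these properties to the store and
        # doubles as a heartbeat; here we just set attributes locally.
        for key, value in kwargs.items():
            setattr(self, key, value)

async def process(ctx, *, items):
    job = ctx["job"]  # the running Job, provided in the task context
    for i, _ in enumerate(items, start=1):
        await job.update(progress=i / len(items))  # heartbeat + progress
    return "done"

job = _Job()
result = asyncio.run(process({"job": job}, items=["a", "b", "c"]))
```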
- async refresh(until_complete=None)[source]
Refresh the current job with the latest data from the db.
- Parameters:
until_complete (float | None) – None or numeric seconds. If None (default), don’t wait; else wait until the job is complete or the interval has been reached. 0 means wait forever.
- Return type:
None