Jobs#

Jobs can be scheduled to run as:

Sample code:

# schedule a job normally
job = await queue.enqueue("test", a=1)

# wait 1 second for the job to complete
await job.refresh(1)
print(job.results)

# run a job and return the result
print(await queue.apply("test", a=2))

# schedule a job in 10 seconds
await queue.enqueue("test", a=1, scheduled=time.time() + 10)

Common#

Explicit vs Implicit job calling#

TODO: queue.enqueue(Job(…)) vs known-parameters

Job defaults#

TODO: Discuss example to subclass Queue to configure defaults for enqueue (e.g. retries)

Retries#

TODOL Discuss that retries ALWAYS jitter

Enqueue#

async saq.queue.Queue.enqueue(job_or_func, **kwargs)

Enqueue a job by instance or string.

Example

job = await queue.enqueue("add", a=1, b=2)
print(job.id)
Parameters:
  • job_or_func (str | saq.job.Job) – The job or function to enqueue. If a job instance is passed in, it’s properties are overriden.

  • kwargs (Any) – Kwargs can be arguments of the function or properties of the job.

Returns:

If the job has already been enqueued, this returns None, else Job

Return type:

saq.job.Job | None

Apply#

async saq.queue.Queue.apply(job_or_func, timeout=None, **kwargs)

Enqueue a job and wait for its result.

If the job is successful, this returns its result. If the job is unsuccessful, this raises a JobError.

Example

try:
    assert await queue.apply("add", a=1, b=2) == 3
except JobError:
    print("job failed")
Parameters:
  • job_or_func (str) – Same as Queue.enqueue

  • timeout (float | None) – If provided, how long to wait for result, else infinite (default None)

  • kwargs (Any) – Same as Queue.enqueue

Return type:

Any

Map#

async saq.queue.Queue.map(job_or_func, iter_kwargs, timeout=None, return_exceptions=False, **kwargs)

Enqueue multiple jobs and collect all of their results.

Example

try:
    assert await queue.map(
        "add",
        [
            {"a": 1, "b": 2},
            {"a": 3, "b": 4},
        ]
    ) == [3, 7]
except JobError:
    print("any of the jobs failed")
Parameters:
  • job_or_func (str | saq.job.Job) – Same as Queue.enqueue

  • iter_kwargs (collections.abc.Sequence[dict[str, Any]]) – Enqueue a job for each item in this sequence. Each item is the same as kwargs for Queue.enqueue.

  • timeout (float | None) – Total seconds to wait for all jobs to complete. If None (default) or 0, wait forever.

  • return_exceptions (bool) – If False (default), an exception is immediately raised as soon as any jobs fail. Other jobs won’t be cancelled and will continue to run. If True, exceptions are treated the same as successful results and aggregated in the result list.

  • kwargs (Any) – Default kwargs for all jobs. These will be overridden by those in iter_kwargs.

Return type:

list[Any]

Batch#

async saq.queue.Queue.batch()

Context manager to batch enqueue jobs.

This tracks all jobs enqueued within the context manager scope and ensures that all are aborted if any exception is raised.

Example

async with queue.batch():
    await queue.enqueue("test")  # This will get cancelled
    raise asyncio.CancelledError
Return type:

collections.abc.AsyncIterator[None]