Module Contents

exception saq.queue.JobError(job)

Bases: Exception

Basic Job error


job (saq.job.Job) –

class saq.queue.Queue(redis, name='default', dump=None, load=None, max_concurrent_ops=20)

Queue is used to interact with redis.

  • redis (redis.asyncio.client.Redis[bytes]) – instance of redis.asyncio pool

  • name (str) – name of the queue (default “default”)

  • dump (saq.types.DumpType | None) – lambda that takes a dictionary and outputs bytes (default json.dumps)

  • load (saq.types.LoadType | None) – lambda that takes str or bytes and outputs a python dictionary (default json.loads)

  • max_concurrent_ops (int) – maximum number of concurrent operations (default 20). This throttles calls to enqueue, job, and abort to prevent the Queue from consuming too many Redis connections.
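The dump and load parameters accept any pair of callables with the shapes described above. The following is a minimal sketch using only the standard library; `dump_job`, `load_job`, and the example payload keys are illustrative names, not part of saq. It assumes job dictionaries may contain datetime values, which the default JSON encoder cannot handle.

```python
import json
from datetime import datetime
from typing import Any, Union


def dump_job(data: dict) -> bytes:
    """Serialize a dictionary to bytes, encoding datetimes as ISO 8601 strings."""

    def default(value: Any) -> str:
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(f"cannot serialize {type(value).__name__}")

    return json.dumps(data, default=default).encode("utf-8")


def load_job(raw: Union[str, bytes]) -> dict:
    """Deserialize str or bytes back into a dictionary."""
    return json.loads(raw)
```

A pair like this could then be passed as `Queue(redis, dump=dump_job, load=load_job)`. Note that in this sketch datetimes come back as strings, so the round trip is only lossless for plain JSON types.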

classmethod from_url(url, **kwargs)

Create a queue from a Redis URL and a name.

  • url (str) –

  • kwargs (Any) –

Return type:



Return repr(self).

Return type:


async info(jobs=False, offset=0, limit=10)

Returns info on the queue

  • jobs (bool) – Include job info (default False)

  • offset (int) – Offset of job info for pagination (default 0)

  • limit (int) – Max length of job info (default 10)

Return type:


async stats(ttl=60)

Returns & updates stats on the queue


ttl (int) – Time-to-live of stats saved in Redis

Return type:


async count(kind)

Gets count of the kind provided by CountKind


kind (saq.types.CountKind) – The kind of count to return

Return type:


async listen(job_keys, callback, timeout=10)

Listen to updates on jobs.

  • job_keys ([str]) – sequence of job keys

  • callback (saq.types.ListenCallback) – callback function, if it returns truthy, break

  • timeout (float | None) – if timeout is truthy, wait for timeout seconds
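The callback and timeout rules above can be illustrated with an in-process stand-in. This is a sketch, not saq's implementation: `listen_sketch` reads from an `asyncio.Queue` instead of Redis, and it assumes the callback receives a job key and a status string (the signature of saq.types.ListenCallback is not shown in this section). The job keys and statuses are hypothetical.

```python
import asyncio
from typing import Any, Callable, Iterable, List, Optional, Tuple


async def listen_sketch(
    updates: "asyncio.Queue[Tuple[str, str]]",
    job_keys: Iterable[str],
    callback: Callable[[str, str], Any],
    timeout: Optional[float] = 10,
) -> None:
    """Consume updates until the callback returns truthy or the timeout expires."""
    wanted = set(job_keys)

    async def consume() -> None:
        while True:
            job_key, status = await updates.get()
            # Updates for other jobs are ignored; a truthy result ends the loop.
            if job_key in wanted and callback(job_key, status):
                break

    if timeout:
        # In this sketch a truthy timeout bounds the whole wait and
        # asyncio.TimeoutError propagates to the caller.
        await asyncio.wait_for(consume(), timeout)
    else:
        await consume()


async def demo() -> List[Tuple[str, str]]:
    updates: "asyncio.Queue[Tuple[str, str]]" = asyncio.Queue()
    for item in [("job:1", "active"), ("job:2", "complete"), ("job:1", "complete")]:
        updates.put_nowait(item)

    seen: List[Tuple[str, str]] = []

    def on_update(job_key: str, status: str) -> bool:
        seen.append((job_key, status))
        return status == "complete"  # truthy: stop listening

    await listen_sketch(updates, ["job:1"], on_update, timeout=1)
    return seen
```

Running `asyncio.run(demo())` records only the two updates for the watched key and stops at the first one whose callback result is truthy.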

Return type:


async enqueue(job_or_func, **kwargs)

Enqueue a job by instance or string.


    job = await queue.enqueue("add", a=1, b=2)

  • job_or_func (str | saq.job.Job) – The job or function to enqueue. If a job instance is passed in, its properties are overridden.

  • kwargs (Any) – Kwargs can be arguments of the function or properties of the job.


If the job has already been enqueued, this returns None, else Job

Return type:

saq.job.Job | None
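Because enqueue may return None, callers usually need to handle both outcomes. The toy class below mimics only that documented behaviour in memory; `InMemoryQueue` is hypothetical, and treating a `key` argument as the job's identity is an assumption made for the sketch.

```python
from typing import Any, Dict, Optional


class InMemoryQueue:
    """Toy stand-in that returns None when a job was already enqueued."""

    def __init__(self) -> None:
        self._jobs: Dict[str, Dict[str, Any]] = {}

    def enqueue(self, function: str, key: str, **kwargs: Any) -> Optional[Dict[str, Any]]:
        if key in self._jobs:
            return None  # duplicate: nothing new was enqueued
        job = {"function": function, "key": key, "kwargs": kwargs}
        self._jobs[key] = job
        return job


queue = InMemoryQueue()
first = queue.enqueue("add", key="add:1:2", a=1, b=2)
second = queue.enqueue("add", key="add:1:2", a=1, b=2)
# Callers should branch on None rather than assume a job object came back.
was_duplicate = second is None
```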

async apply(job_or_func, timeout=None, **kwargs)

Enqueue a job and wait for its result.

If the job is successful, this returns its result. If the job is unsuccessful, this raises a JobError.


try:
    assert await queue.apply("add", a=1, b=2) == 3
except JobError:
    print("job failed")

  • job_or_func (str) – Same as Queue.enqueue

  • timeout (float | None) – If provided, how long to wait for result, else infinite (default None)

  • kwargs (Any) – Same as Queue.enqueue
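The "return the result or raise" contract of apply can be sketched without Redis. Here `apply_sketch` runs a local coroutine function directly instead of enqueueing it, and `JobErrorSketch` is a local stand-in for saq.queue.JobError; both names are hypothetical and the sketch says nothing about how saq implements the wait.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class JobErrorSketch(Exception):
    """Local stand-in for saq.queue.JobError."""


async def apply_sketch(
    func: Callable[..., Awaitable[Any]],
    timeout: Optional[float] = None,
    **kwargs: Any,
) -> Any:
    """Run func(**kwargs); return its result or raise JobErrorSketch on failure."""
    try:
        # timeout=None means wait indefinitely, matching the documented default.
        return await asyncio.wait_for(func(**kwargs), timeout)
    except asyncio.TimeoutError:
        raise  # timeouts are passed through unchanged in this sketch
    except Exception as exc:
        raise JobErrorSketch(str(exc)) from exc


async def add(a: int, b: int) -> int:
    return a + b


async def boom() -> None:
    raise ValueError("job failed")
```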

Return type:


async map(job_or_func, iter_kwargs, timeout=None, return_exceptions=False, **kwargs)

Enqueue multiple jobs and collect all of their results.


try:
    assert await queue.map(
        "add",
        [
            {"a": 1, "b": 2},
            {"a": 3, "b": 4},
        ],
    ) == [3, 7]
except JobError:
    print("any of the jobs failed")

  • job_or_func (str | saq.job.Job) – Same as Queue.enqueue

  • iter_kwargs ([dict[str, Any]]) – Enqueue a job for each item in this sequence. Each item is the same as kwargs for Queue.enqueue.

  • timeout (float | None) – Total seconds to wait for all jobs to complete. If None (default) or 0, wait forever.

  • return_exceptions (bool) – If False (default), an exception is raised as soon as any job fails. Other jobs won’t be cancelled and will continue to run. If True, exceptions are treated the same as successful results and aggregated in the result list.

  • kwargs (Any) – Default kwargs for all jobs. These will be overridden by those in iter_kwargs.
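Two of the rules above, per-item kwargs overriding the shared defaults and the effect of return_exceptions, behave much like `asyncio.gather`. The sketch below uses that analogy with local coroutines; `map_sketch` and `add` are hypothetical helpers, and nothing here is claimed about saq's internals.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Iterable, List


async def add(a: int, b: int) -> int:
    if a < 0:
        raise ValueError("negative input")
    return a + b


async def map_sketch(
    func: Callable[..., Awaitable[Any]],
    iter_kwargs: Iterable[Dict[str, Any]],
    return_exceptions: bool = False,
    **kwargs: Any,
) -> List[Any]:
    """Call func once per item, merging shared defaults with per-item kwargs."""
    # Items come last in the merge, so they override the shared defaults.
    calls = [func(**{**kwargs, **item}) for item in iter_kwargs]
    # With return_exceptions=False the first failure propagates immediately;
    # with True, exceptions are collected alongside successful results.
    return list(await asyncio.gather(*calls, return_exceptions=return_exceptions))
```

For example, `map_sketch(add, [{"a": 1}, {"a": 2, "b": 0}], b=10)` uses the default `b` for the first item and the overriding `b` for the second.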

Return type:


async batch()

Context manager to batch enqueue jobs.

This tracks all jobs enqueued within the context manager scope and ensures that all are aborted if any exception is raised.


async with queue.batch():
    await queue.enqueue("test")  # This will get cancelled
    raise asyncio.CancelledError
Return type:[None]
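The track-then-abort behaviour described for batch can be sketched with `contextlib.asynccontextmanager`. `BatchSketch` is a hypothetical in-memory class that records job names rather than talking to Redis; it shows one way such a context manager could be structured, not how saq does it.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List, Optional


class BatchSketch:
    """In-memory stand-in that aborts every job enqueued in a failed batch."""

    def __init__(self) -> None:
        self.aborted: List[str] = []
        self._tracked: Optional[List[str]] = None

    async def enqueue(self, name: str) -> str:
        if self._tracked is not None:
            self._tracked.append(name)  # remember jobs created inside the batch
        return name

    async def abort(self, name: str) -> None:
        self.aborted.append(name)

    @asynccontextmanager
    async def batch(self) -> AsyncIterator[None]:
        self._tracked = tracked = []
        try:
            yield
        except BaseException:
            # BaseException also covers asyncio.CancelledError.
            for name in tracked:
                await self.abort(name)
            raise
        finally:
            self._tracked = None


async def demo() -> List[str]:
    queue = BatchSketch()
    try:
        async with queue.batch():
            await queue.enqueue("test")
            raise RuntimeError("boom")
    except RuntimeError:
        pass
    return queue.aborted
```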

class saq.queue.PubSubMultiplexer(pubsub, prefix)

Handle multiple in-process channels over a single Redis channel.

We use pubsub for realtime job updates. Each pubsub instance needs a connection, and that connection can’t be reused when it’s done (that’s an assumption, based on redis-py not releasing the connection and on a referenced comment). So we use a single pubsub instance that listens to all job changes on the queue and handles message routing in-process.

  • pubsub (redis.asyncio.client.PubSub) –

  • prefix (str) –
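The routing idea described above, one shared subscription fanned out to many in-process listeners, can be sketched with plain `asyncio.Queue` objects. `MultiplexerSketch`, its methods, and the `saq:job:` prefix are hypothetical; the real class wraps a redis.asyncio PubSub instance, which this sketch replaces with direct `dispatch` calls.

```python
import asyncio
from collections import defaultdict
from typing import DefaultDict, List, Tuple


class MultiplexerSketch:
    """Route messages from one shared source to per-channel in-process queues."""

    def __init__(self, prefix: str) -> None:
        self.prefix = prefix
        self._subscribers: DefaultDict[str, List[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, channel: str) -> asyncio.Queue:
        """Register an in-process listener; no extra connection is needed."""
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers[channel].append(queue)
        return queue

    def dispatch(self, channel: str, message: bytes) -> None:
        """Deliver one message from the shared source to matching listeners."""
        if not channel.startswith(self.prefix):
            return  # not one of ours
        for queue in self._subscribers.get(channel, []):
            queue.put_nowait(message)


async def demo() -> Tuple[bytes, bytes, bool]:
    mux = MultiplexerSketch(prefix="saq:job:")
    first = mux.subscribe("saq:job:1")
    second = mux.subscribe("saq:job:2")
    mux.dispatch("saq:job:1", b"active")
    mux.dispatch("saq:job:2", b"complete")
    mux.dispatch("other:1", b"ignored")
    return await first.get(), await second.get(), first.empty()
```

Each listener only sees messages for its own channel, while the number of underlying connections stays at one regardless of how many listeners subscribe.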