saq.queue#

Queues

Module Contents#

exception saq.queue.JobError(job)[source]#

Bases: Exception

Basic Job error

Parameters:

job (saq.job.Job) –

class saq.queue.Queue(redis, name='default', dump=None, load=None, max_concurrent_ops=20)[source]#

Queue is used to interact with Redis.

Parameters:
  • redis (redis.asyncio.client.Redis[bytes]) – an instance of a redis.asyncio client

  • name (str) – name of the queue (default “default”)

  • dump (saq.types.DumpType | None) – callable that takes a dictionary and outputs bytes (default json.dumps)

  • load (saq.types.LoadType | None) – callable that takes str or bytes and outputs a python dictionary (default json.loads)

  • max_concurrent_ops (int) – maximum concurrent operations (default 20). This throttles calls to enqueue, job, and abort to prevent the Queue from consuming too many Redis connections.

classmethod from_url(url, **kwargs)[source]#

Create a queue from a Redis URL and a name.

Parameters:
  • url (str) –

  • kwargs (Any) –

Return type:

QueueT
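
A minimal construction sketch; the URL and queue name below are placeholders for your own Redis deployment:

from saq import Queue

queue = Queue.from_url("redis://localhost:6379", name="default")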

__repr__()[source]#

Return repr(self).

Return type:

str

async info(jobs=False, offset=0, limit=10)[source]#

Returns info on the queue.

Parameters:
  • jobs (bool) – Include job info (default False)

  • offset (int) – Offset of job info for pagination (default 0)

  • limit (int) – Max length of job info (default 10)

Return type:

saq.types.QueueInfo

async stats(ttl=60)[source]#

Returns and updates stats on the queue.

Parameters:

ttl (int) – Time-to-live of stats saved in Redis

Return type:

saq.types.QueueStats

async count(kind)[source]#

Gets the count of jobs of the given kind.

Parameters:

kind (saq.types.CountKind) – The kind of count to return

Return type:

int
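
A quick monitoring sketch combining info, stats, and count; the "queued" argument is an assumed saq.types.CountKind value:

queue_info = await queue.info(jobs=True, limit=5)
queue_stats = await queue.stats()
queued = await queue.count("queued")  # "queued" is an assumed CountKind value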

async listen(job_keys, callback, timeout=10)[source]#

Listen to updates on jobs.

Parameters:
  • job_keys (collections.abc.Iterable[str]) – sequence of job keys

  • callback (saq.types.ListenCallback) – callback function; listening stops when it returns a truthy value

  • timeout (float | None) – if truthy, listen for at most this many seconds

Return type:

None
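
A sketch of waiting for a terminal status; the callback signature shown (job key, then status) is an assumption about saq.types.ListenCallback:

from saq import Status

def callback(job_key, status):
    # returning truthy stops the listener
    return status in (Status.COMPLETE, Status.FAILED, Status.ABORTED)

job = await queue.enqueue("add", a=1, b=2)
await queue.listen([job.key], callback, timeout=10)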

async enqueue(job_or_func, **kwargs)[source]#

Enqueue a job by instance or string.

Example

job = await queue.enqueue("add", a=1, b=2)
print(job.id)

Parameters:
  • job_or_func (str | saq.job.Job) – The job or function to enqueue. If a Job instance is passed in, its properties are overridden.

  • kwargs (Any) – Kwargs can be arguments of the function or properties of the job.

Returns:

The enqueued Job, or None if the job has already been enqueued

Return type:

saq.job.Job | None
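
A Job instance can be passed instead of a function name; a sketch, assuming saq.job.Job's function, kwargs, and retries fields:

from saq import Job

job = Job("add", kwargs={"a": 1, "b": 2}, retries=3)
enqueued = await queue.enqueue(job)
if enqueued is None:
    print("already enqueued")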

async apply(job_or_func, timeout=None, **kwargs)[source]#

Enqueue a job and wait for its result.

If the job is successful, this returns its result. If the job is unsuccessful, this raises a JobError.

Example

try:
    assert await queue.apply("add", a=1, b=2) == 3
except JobError:
    print("job failed")
Parameters:
  • job_or_func (str) – Same as Queue.enqueue

  • timeout (float | None) – If provided, seconds to wait for the result; otherwise wait indefinitely (default None)

  • kwargs (Any) – Same as Queue.enqueue

Return type:

Any

async map(job_or_func, iter_kwargs, timeout=None, return_exceptions=False, **kwargs)[source]#

Enqueue multiple jobs and collect all of their results.

Example

try:
    assert await queue.map(
        "add",
        [
            {"a": 1, "b": 2},
            {"a": 3, "b": 4},
        ]
    ) == [3, 7]
except JobError:
    print("any of the jobs failed")
Parameters:
  • job_or_func (str | saq.job.Job) – Same as Queue.enqueue

  • iter_kwargs (collections.abc.Sequence[dict[str, Any]]) – Enqueue a job for each item in this sequence. Each item is the same as kwargs for Queue.enqueue.

  • timeout (float | None) – Total seconds to wait for all jobs to complete. If None (default) or 0, wait forever.

  • return_exceptions (bool) – If False (default), an exception is raised as soon as any job fails. Other jobs won’t be cancelled and will continue to run. If True, exceptions are treated the same as successful results and aggregated in the result list.

  • kwargs (Any) – Default kwargs for all jobs. These will be overridden by those in iter_kwargs.

Return type:

list[Any]
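
With return_exceptions=True, failures are collected rather than raised; a sketch (the failing kwargs are illustrative):

results = await queue.map(
    "add",
    [{"a": 1, "b": 2}, {"a": "oops", "b": 4}],
    return_exceptions=True,
)
for result in results:
    if isinstance(result, Exception):
        print("job failed:", result)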

async batch()[source]#

Context manager to batch enqueue jobs.

This tracks all jobs enqueued within the context manager scope and ensures that all are aborted if any exception is raised.

Example

async with queue.batch():
    await queue.enqueue("test")  # This will get cancelled
    raise asyncio.CancelledError

Return type:

collections.abc.AsyncIterator[None]

class saq.queue.PubSubMultiplexer(pubsub, prefix)[source]#

Handle multiple in-process channels over a single Redis channel.

We use pubsub for realtime job updates. Each pubsub instance needs its own connection, and that connection can’t be reused once it’s done (an assumption, based on redis-py not releasing the connection and this comment: https://github.com/redis/go-redis/issues/785#issuecomment-394596158). So we use a single pubsub instance that listens to all job changes on the queue and handles message routing in-process.

Parameters:
  • pubsub (redis.asyncio.client.PubSub) –

  • prefix (str) –
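
A hypothetical sketch of the routing idea (the class and method names below are illustrative, not PubSubMultiplexer’s actual API): one pubsub connection receives every message for the queue, and per-key asyncio queues fan messages out in-process.

import asyncio

class IllustrativeMultiplexer:
    """Illustrative only: route one Redis channel to per-key asyncio queues."""

    def __init__(self, prefix: str) -> None:
        self.prefix = prefix
        self.queues: dict[str, asyncio.Queue] = {}

    def subscribe(self, key: str) -> asyncio.Queue:
        # each in-process listener gets its own queue, keyed by job key
        return self.queues.setdefault(key, asyncio.Queue())

    def route(self, channel: str, message: bytes) -> None:
        # strip the shared prefix and deliver to the matching listener
        key = channel.removeprefix(self.prefix)
        if key in self.queues:
            self.queues[key].put_nowait(message)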