saq.queue
Queues
Module Contents
- exception saq.queue.JobError(job)
Bases: Exception
Basic Job error
- Parameters:
job (saq.job.Job) –
- class saq.queue.Queue(redis, name='default', dump=None, load=None, max_concurrent_ops=20)
Queue is used to interact with redis.
- Parameters:
redis (redis.asyncio.client.Redis[bytes]) – instance of redis.asyncio pool
name (str) – name of the queue (default “default”)
dump (saq.types.DumpType | None) – lambda that takes a dictionary and outputs bytes (default json.dumps)
load (saq.types.LoadType | None) – lambda that takes str or bytes and outputs a python dictionary (default json.loads)
max_concurrent_ops (int) – maximum number of concurrent operations (default 20). This throttles calls to enqueue, job, and abort to prevent the Queue from consuming too many Redis connections.
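The throttling that max_concurrent_ops describes can be pictured as a semaphore wrapped around each operation. The following is a minimal sketch using only asyncio; it is not SAQ's implementation, and `ThrottledOps`, `fake_redis_call`, and `demo` are hypothetical names introduced for illustration.

```python
import asyncio


class ThrottledOps:
    """Hypothetical helper: caps how many operations run at the same time."""

    def __init__(self, max_concurrent_ops: int = 20) -> None:
        self._sem = asyncio.Semaphore(max_concurrent_ops)
        self.in_flight = 0
        self.peak = 0  # highest number of simultaneous operations observed

    async def run(self, op):
        # Wait for a free slot before starting the operation.
        async with self._sem:
            self.in_flight += 1
            self.peak = max(self.peak, self.in_flight)
            try:
                return await op()
            finally:
                self.in_flight -= 1


async def fake_redis_call() -> str:
    # Stand-in for a call that would hold a Redis connection.
    await asyncio.sleep(0.01)
    return "ok"


async def demo(n_calls: int = 10, limit: int = 3) -> int:
    # Start n_calls operations at once and report the observed peak.
    ops = ThrottledOps(max_concurrent_ops=limit)
    await asyncio.gather(*(ops.run(fake_redis_call) for _ in range(n_calls)))
    return ops.peak
```

Running `asyncio.run(demo(10, 3))` starts ten calls at once, but the observed peak never exceeds the limit of three.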
- classmethod from_url(url, **kwargs)
Create a queue with a Redis URL and a name.
- Parameters:
url (str) –
kwargs (Any) –
- Return type:
QueueT
- async info(jobs=False, offset=0, limit=10)
Returns info on the queue.
- Parameters:
- Return type:
- async stats(ttl=60)
Returns and updates stats on the queue.
- Parameters:
ttl (int) – Time-to-live of stats saved in Redis
- Return type:
- async count(kind)
Gets the count for the kind provided by CountKind.
- Parameters:
kind (saq.types.CountKind) – the kind to get a count for
- Return type:
- async listen(job_keys, callback, timeout=10)
Listen to updates on jobs.
- Parameters:
job_keys (collections.abc.Iterable[str]) – sequence of job keys
callback (saq.types.ListenCallback) – callback function; if it returns a truthy value, listening stops
timeout (float | None) – if timeout is truthy, wait for timeout seconds
- Return type:
None
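The callback and timeout behaviour of listen can be sketched as a loop over incoming updates. This is an illustration only: an `asyncio.Queue` stands in for Redis pubsub, the two-argument callback shape `(job_key, status)` is an assumption, and `listen_sketch` and `demo` are hypothetical names.

```python
import asyncio


async def listen_sketch(updates: asyncio.Queue, callback, timeout=10):
    """Hypothetical stand-in for the listen loop: stop when callback is truthy."""

    async def _loop() -> None:
        while True:
            job_key, status = await updates.get()
            # A truthy return value from the callback ends the loop.
            if callback(job_key, status):
                break

    if timeout:
        # Truthy timeout: give up after `timeout` seconds.
        await asyncio.wait_for(_loop(), timeout)
    else:
        # Falsy timeout (assumed here to mean "no deadline").
        await _loop()


async def demo() -> list:
    updates: asyncio.Queue = asyncio.Queue()
    for status in ["queued", "active", "complete", "extra"]:
        updates.put_nowait(("job:1", status))

    seen = []

    def callback(job_key: str, status: str) -> bool:
        seen.append(status)
        return status == "complete"

    await listen_sketch(updates, callback, timeout=1)
    return seen
```

In this sketch the fourth update is never delivered, because the callback returns a truthy value on "complete".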
- async enqueue(job_or_func, **kwargs)
Enqueue a job by instance or string.
Example
    job = await queue.enqueue("add", a=1, b=2)
    print(job.id)
- Parameters:
job_or_func (str | saq.job.Job) – The job or function to enqueue. If a job instance is passed in, its properties are overridden.
kwargs (Any) – Kwargs can be arguments of the function or properties of the job.
- Returns:
None if the job has already been enqueued; otherwise the Job.
- Return type:
saq.job.Job | None
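The two points above, that kwargs may be either function arguments or job properties and that enqueueing an already-enqueued job returns None, can be sketched with an in-memory stand-in. Everything here is hypothetical: `MiniJob` and `MiniQueue` are not SAQ classes, the property names (`key`, `timeout`, `retries`) are picked for illustration, and the sketch is synchronous for brevity.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Optional


@dataclass
class MiniJob:
    """Hypothetical, trimmed-down job record (not saq.job.Job)."""

    function: str
    kwargs: dict = field(default_factory=dict)
    key: str = ""
    timeout: int = 10
    retries: int = 1


# Keyword names that configure the job rather than the function.
JOB_PROPERTIES = {f.name for f in fields(MiniJob)} - {"function", "kwargs"}


class MiniQueue:
    """In-memory stand-in that mimics the documented return contract."""

    def __init__(self) -> None:
        self._jobs: dict = {}

    def enqueue(self, func: str, **kwargs: Any) -> Optional[MiniJob]:
        # Names matching job properties configure the job; the rest
        # are passed through as arguments of the function.
        props = {k: v for k, v in kwargs.items() if k in JOB_PROPERTIES}
        func_kwargs = {k: v for k, v in kwargs.items() if k not in JOB_PROPERTIES}
        job = MiniJob(function=func, kwargs=func_kwargs, **props)
        job.key = job.key or f"{func}:{len(self._jobs)}"
        if job.key in self._jobs:
            return None  # a job with this key is already enqueued
        self._jobs[job.key] = job
        return job
```

With this sketch, `MiniQueue().enqueue("add", a=1, b=2, timeout=30)` stores `{"a": 1, "b": 2}` as function arguments and applies `timeout=30` to the job itself.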
- async apply(job_or_func, timeout=None, **kwargs)
Enqueue a job and wait for its result.
If the job is successful, this returns its result. If the job is unsuccessful, this raises a JobError.
Example
    try:
        assert await queue.apply("add", a=1, b=2) == 3
    except JobError:
        print("job failed")
- async map(job_or_func, iter_kwargs, timeout=None, return_exceptions=False, **kwargs)
Enqueue multiple jobs and collect all of their results.
Example
    try:
        assert await queue.map(
            "add",
            [
                {"a": 1, "b": 2},
                {"a": 3, "b": 4},
            ]
        ) == [3, 7]
    except JobError:
        print("any of the jobs failed")
- Parameters:
job_or_func (str | saq.job.Job) – Same as Queue.enqueue
iter_kwargs (collections.abc.Sequence[dict[str, Any]]) – Enqueue a job for each item in this sequence. Each item is the same as kwargs for Queue.enqueue.
timeout (float | None) – Total seconds to wait for all jobs to complete. If None (default) or 0, wait forever.
return_exceptions (bool) – If False (default), an exception is raised as soon as any job fails. Other jobs won’t be cancelled and will continue to run. If True, exceptions are treated the same as successful results and aggregated in the result list.
kwargs (Any) – Default kwargs for all jobs. These will be overridden by those in iter_kwargs.
- Return type:
list[Any]
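The parameter semantics above (per-item kwargs overriding the defaults, return_exceptions, and a timeout of None or 0 meaning no deadline) closely mirror `asyncio.gather`. The sketch below illustrates them without Redis. It is an assumption-laden stand-in, not SAQ's code: `map_sketch`, `run_job`, `JobFailed`, and `divide` are hypothetical, and jobs are plain function calls.

```python
import asyncio
from typing import Any


class JobFailed(Exception):
    """Hypothetical stand-in for saq.queue.JobError."""


async def run_job(func, **kwargs: Any) -> Any:
    # Stand-in for "enqueue and wait": call the function directly.
    try:
        return func(**kwargs)
    except Exception as exc:
        raise JobFailed(str(exc)) from exc


async def map_sketch(func, iter_kwargs, timeout=None, return_exceptions=False, **kwargs):
    # Per-item kwargs take precedence over the shared defaults.
    coros = [run_job(func, **{**kwargs, **item}) for item in iter_kwargs]
    gathered = asyncio.gather(*coros, return_exceptions=return_exceptions)
    # None or 0 means no deadline.
    return await asyncio.wait_for(gathered, timeout or None)


def divide(a: float, b: float = 2) -> float:
    return a / b
```

Calling `map_sketch(divide, [{"a": 4}, {"a": 9, "b": 3}], b=4)` uses the default `b=4` for the first item only; the second item's own `b=3` wins.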
- async batch()
Context manager to batch enqueue jobs.
This tracks all jobs enqueued within the context manager scope and ensures that all are aborted if any exception is raised.
Example
    async with queue.batch():
        await queue.enqueue("test")  # This will get cancelled
        raise asyncio.CancelledError
- Return type:
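The track-and-abort behaviour of batch can be sketched with `contextlib.asynccontextmanager`. This is a minimal illustration under stated assumptions, not the library's implementation: `BatchQueue` is a hypothetical in-memory class whose `enqueue` and `abort` only record names.

```python
import asyncio
from contextlib import asynccontextmanager


class BatchQueue:
    """Hypothetical in-memory queue that records aborts (not saq.queue.Queue)."""

    def __init__(self) -> None:
        self.aborted: list = []
        self._tracked = None  # list of job names while a batch is open

    async def enqueue(self, name: str) -> str:
        if self._tracked is not None:
            self._tracked.append(name)
        return name

    async def abort(self, name: str) -> None:
        self.aborted.append(name)

    @asynccontextmanager
    async def batch(self):
        self._tracked = []
        try:
            yield
        except BaseException:
            # CancelledError derives from BaseException, so catch that.
            for name in self._tracked:
                await self.abort(name)
            raise
        finally:
            self._tracked = None


async def demo() -> list:
    queue = BatchQueue()
    try:
        async with queue.batch():
            await queue.enqueue("test")
            raise asyncio.CancelledError
    except asyncio.CancelledError:
        pass
    return queue.aborted
```

Here `asyncio.run(demo())` returns `["test"]`: the job enqueued inside the block is aborted before the exception propagates.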
- class saq.queue.PubSubMultiplexer(pubsub, prefix)
Handle multiple in-process channels over a single Redis channel.
We use pubsub for realtime job updates. Each pubsub instance needs a connection, and that connection can’t be reused when it’s done (that’s an assumption, based on redis-py not releasing the connection and this comment: https://github.com/redis/go-redis/issues/785#issuecomment-394596158). So we use a single pubsub instance that listens to all job changes on the queue and handle message routing in-process.
- Parameters:
pubsub (redis.asyncio.client.PubSub) –
prefix (str) –
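The idea of routing many in-process channels over one upstream subscription can be sketched as a small fan-out table. This is a hypothetical illustration rather than the class's actual code: `MiniMultiplexer`, `subscribe`, and `dispatch` are invented names, and messages are fed in by hand instead of arriving from a Redis pubsub connection.

```python
import asyncio
from collections import defaultdict


class MiniMultiplexer:
    """Hypothetical router: one upstream feed, many in-process subscribers."""

    def __init__(self, prefix: str) -> None:
        self.prefix = prefix
        self._subscribers = defaultdict(list)  # channel -> list of asyncio.Queue

    def subscribe(self, channel: str) -> asyncio.Queue:
        # Each subscriber gets its own inbox; no extra connection is needed.
        inbox: asyncio.Queue = asyncio.Queue()
        self._subscribers[channel].append(inbox)
        return inbox

    def dispatch(self, channel: str, message: bytes) -> int:
        # Ignore traffic outside our prefix, then fan out to matching inboxes.
        if not channel.startswith(self.prefix):
            return 0
        inboxes = self._subscribers.get(channel, [])
        for inbox in inboxes:
            inbox.put_nowait(message)
        return len(inboxes)
```

A single upstream reader would call `dispatch` for every message it receives, so the number of Redis connections stays at one regardless of how many listeners exist in the process.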