Source code for cli2.queue
"""
asyncio.Queue subclass
While useful, you might want to consider the :py:mod:`cli2.tasks` module
instead.
"""
import asyncio
import os
from .log import log
[docs]
class Queue(asyncio.Queue):
"""
An async queue with worker pool for concurrent task processing.
Extends asyncio.Queue to manage a pool of worker tasks that process items
from the queue concurrently.
.. code-block:: python
# will run 2 at the time
queue = cli2.Queue(num_workers=2)
# call like asyncio.run
await queue.run(foo(), bar(), other())
.. py:attribute:: num_workers
Number of concurrent workers (default: cpucount * 2)
.. py:attribute:: results
List of results from completed tasks, order of results not garanteed
due to concurrency.
"""
def __init__(self, *args, num_workers=None, **kwargs):
"""Initialize the queue with worker pool.
:param num_workers: Number of concurrent workers
(default: cpu count * 2)
:paarm *args: Positional arguments for asyncio.Queue
:param **kwargs: Keyword arguments for asyncio.Queue
"""
self.num_workers = num_workers or os.cpu_count() * 2
self.results = []
super().__init__(*args, **kwargs)
[docs]
async def run(self, *tasks):
"""
Run tasks through the worker pool.
:param tasks: Coroutines
"""
self.results = []
for task in tasks:
await self.put(task)
workers = [
asyncio.create_task(self.worker())
for i in range(self.num_workers)
]
await self.join()
for worker in workers:
worker.cancel()
[docs]
async def worker(self):
"""Worker task that processes items from the queue.
Continuously gets tasks from the queue, executes them, and stores
results.
Handles exceptions by logging them.
"""
while True:
task = await self.get()
try:
result = await task
except: # noqa
log.exception()
else:
self.results.append(result)
finally:
self.task_done()