[docs]defasync_iter(obj):""" Check if an object is an async iterable. """returninspect.isasyncgen(obj)orhasattr(obj,'__aiter__')
[docs]asyncdefasync_resolve(result,output=False):""" Recursively resolve awaitables and async iterables. :param result: The awaitable or async iterable to resolve :param output: If True, print results as they are resolved. If False, collect results. :return: The resolved value(s). If output is True, returns None. If output is False, returns a list of resolved values from async iterables. """whileinspect.iscoroutine(result):result=awaitresultifasync_iter(result):results=[]asyncfor_inresult:ifoutput:if(notinspect.iscoroutine(_)andnotinspect.isasyncgen(_)):display.print(_)else:awaitasync_resolve(_,output=output)else:results.append(awaitasync_resolve(_))returnNoneifoutputelseresultsreturnresult
[docs]classQueue(asyncio.Queue):""" An async queue with worker pool for concurrent task processing. Extends asyncio.Queue to manage a pool of worker tasks that process items from the queue concurrently. .. code-block:: python # will run 2 at the time queue = cli2.Queue(num_workers=2) # call like asyncio.run await queue.run(foo(), bar(), other()) .. py:attribute:: num_workers Number of concurrent workers (default: 12) .. py:attribute:: results List of results from completed tasks, order of results not garanteed due to concurrency. """def__init__(self,*args,num_workers=12,**kwargs):"""Initialize the queue with worker pool. :param num_workers: Number of concurrent workers (default: 12) :paarm *args: Positional arguments for asyncio.Queue :param **kwargs: Keyword arguments for asyncio.Queue """self.num_workers=num_workersor12self.results=[]super().__init__(*args,**kwargs)
[docs]asyncdefrun(self,*tasks):""" Run tasks through the worker pool. :param tasks: Coroutines """self.results=[]fortaskintasks:awaitself.put(task)workers=[asyncio.create_task(self.worker())foriinrange(self.num_workers)]awaitself.join()forworkerinworkers:worker.cancel()
[docs]asyncdefworker(self):"""Worker task that processes items from the queue. Continuously gets tasks from the queue, executes them, and stores results. Handles exceptions by logging them. """whileTrue:task=awaitself.get()try:result=awaittaskexcept:# noqalog.exception()else:self.results.append(result)finally:self.task_done()