Source code for cli2.asyncio
import aiofiles
import inspect
from . import display
from .queue import Queue
[docs]
def async_iter(obj):
""" Check if an object is an async iterable. """
return inspect.isasyncgen(obj) or hasattr(obj, '__aiter__')
[docs]
async def async_resolve(result, output=False):
"""
Recursively resolve awaitables and async iterables.
:param result: The awaitable or async iterable to resolve
:param output: If True, print results as they are resolved. If False,
collect results.
:return: The resolved value(s). If output is True, returns None. If output
is False, returns a list of resolved values from async iterables.
"""
while inspect.iscoroutine(result):
result = await result
if async_iter(result):
results = []
async for _ in result:
if output:
if (
not inspect.iscoroutine(_)
and not inspect.isasyncgen(_)
):
display.print(_)
else:
await async_resolve(_, output=output)
else:
results.append(await async_resolve(_))
return None if output else results
return result
[docs]
async def files_read(paths, num_workers=None, mode='r', silent=False):
"""
Read a list of files asynchronously with anyio.
:param paths: File paths to read.
:param num_workers: Number of workers, cpucount*2 by default.
:return: Dict of path=content
"""
result = dict()
async def file_read(path):
try:
async with aiofiles.open(str(path), mode) as f:
result[path] = await f.read()
except: # noqa
if not silent:
raise
queue = Queue(num_workers=num_workers)
await queue.run(*[file_read(path) for path in paths])
return {key: result[key] for key in sorted(result)}