# Source code for cli2.proc
"""
Asyncio subprocess wrapper featuring:

- Capture + live logging of stdout/stderr
- ANSI escape code cleaning for captured output: print colored output for
  humans, have clean output in a variable for processing, log, cache... and
  sending to LLMs!
- Separate start/wait methods for process control

Example usage:

.. code-block:: python

    # pass shell command in a string for convenience
    proc = cli2.Proc('foo bar')

    # or as separate arguments, better when building commands
    proc = cli2.Proc('foo', 'bar')

    # wait in async loop
    await proc.wait()

.. note:: There are also start functions, sync and async, in case you want to
          start the proc and wait later.
"""
import asyncio
import os
import shlex
import re
from .log import log
# Matches ANSI escape sequences: ESC (0x1B) followed either by a single
# character in the ranges ``@``-``Z`` / ``\``-``_``, or by a ``[``-introduced
# sequence made of parameter characters (``0``-``?``), intermediate characters
# (space-``/``) and one final character (``@``-``~``).
# Proc uses it to strip such sequences from captured output.
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
# [docs]  (Sphinx viewcode link marker left over from HTML extraction)
class Proc:
    """
    Asynchronous subprocess manager with advanced IO handling.

    Both output pipes are drained concurrently while the process runs; every
    line is stored raw and, unless ``quiet`` is set, echoed with ``print``.

    .. py:attribute:: args

        Full command arguments list used to launch the process

    .. py:attribute:: env

        Dict of environment variables

    .. py:attribute:: cwd

        Working directory path to run the command in

    .. py:attribute:: rc

        Return Code: process exit code (available after process completes)

    .. py:attribute:: out

        Combined cleaned output with ANSI escape codes removed.

    .. py:attribute:: out_ansi

        Combined stdout/stderr output with ANSI codes preserved.

    .. py:attribute:: stdout

        Cleaned stdout output with ANSI escape codes removed.

    .. py:attribute:: stderr

        Cleaned stderr output with ANSI escape codes removed.

    .. py:attribute:: stdout_ansi

        Stdout output with ANSI escape codes preserved.

    .. py:attribute:: stderr_ansi

        Stderr output with ANSI escape codes preserved.
    """

    def __init__(self, cmd, *args, quiet=False, inherit=True, timeout=None,
                 cwd=None, **env):
        """
        :param cmd: Command string (will shlex split), a list/tuple of
                    arguments, or the initial argument when ``args`` are given
        :param args: Additional command arguments
        :param quiet: Suppress live output printing (default: False)
        :param inherit: Inherit parent environment variables (default: True)
        :param timeout: Maximum execution time in seconds (default: None)
        :param cwd: Working directory for the command (default: current
                    working directory at construction time)
        :param env: Additional environment variables to set
        """
        if args:
            self.args = [cmd] + list(args)
        elif isinstance(cmd, (list, tuple)):
            # Already split: take it verbatim instead of failing in shlex.
            self.args = list(cmd)
        else:
            self.args = shlex.split(cmd)

        self.cwd = cwd or os.getcwd()
        self.quiet = quiet

        # Explicit keyword variables always win over inherited ones.
        self.env = os.environ.copy() if inherit else {}
        self.env.update(env)

        # Raw captured bytes: stdout only, stderr only, and both interleaved
        # in arrival order.
        self.out_raw = bytearray()
        self.err_raw = bytearray()
        self.raw = bytearray()

        self.started = False
        self.waited = False
        self.timeout = timeout
        self.rc = None
        self.proc = None
        # Reader tasks, created by start().
        self.stdout_task = None
        self.stderr_task = None

    def clone(self):
        """
        Create a new unstarted Proc instance with identical configuration.

        Arguments, environment, working directory, ``quiet`` and ``timeout``
        are copied as-is; captured output and process state are not.

        :return: New Proc instance ready for execution
        """
        # inherit=False plus an explicit env copy keeps the clone's
        # environment identical to ours (no re-merge with os.environ), and
        # avoids splatting variable names that could collide with keyword
        # parameters such as ``cwd`` or ``timeout``.
        new = type(self)(
            list(self.args),
            quiet=self.quiet,
            inherit=False,
            timeout=self.timeout,
            cwd=self.cwd,
        )
        new.env = dict(self.env)
        return new

    @property
    def cmd(self):
        """
        Get/set the command as a shell-joinable string.

        :getter: Returns shell-escaped command string
        :setter: Parses and updates internal args list
        :type: str
        """
        # start() stringifies arguments, so do the same here: shlex.join
        # only accepts strings.
        return shlex.join(str(arg) for arg in self.args)

    @cmd.setter
    def cmd(self, value):
        self.args = shlex.split(value)

    async def start(self):
        """
        Launch the subprocess asynchronously.

        stdin, stdout and stderr are all connected to pipes; two background
        tasks start reading stdout and stderr immediately.

        :return: Self reference for method chaining
        :raises RuntimeError: If process is already started
        """
        if self.started:
            raise RuntimeError("Process already started")

        if not self.quiet:
            log.debug('cmd', cmd=self.cmd)

        self.proc = await asyncio.create_subprocess_exec(
            *[str(arg) for arg in self.args],
            cwd=str(self.cwd),
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env={str(k): str(v) for k, v in self.env.items()},
        )
        self.started = True

        self.stdout_task = asyncio.create_task(
            self._handle_output(self.proc.stdout, 1)
        )
        self.stderr_task = asyncio.create_task(
            self._handle_output(self.proc.stderr, 2)
        )
        return self

    async def wait(self):
        """
        Wait for process completion with timeout handling.

        Starts the process first if needed. Terminates process if timeout
        occurs. Gathers all output streams.

        :return: Self reference for method chaining
        """
        if not self.started:
            await self.start()

        try:
            if self.timeout:
                await asyncio.wait_for(self.proc.wait(), timeout=self.timeout)
            else:
                await self.proc.wait()
        except asyncio.TimeoutError:
            print(f"Process timed out after {self.timeout}s")
            try:
                self.proc.terminate()
            except ProcessLookupError:
                # The process exited between the timeout and terminate().
                pass
            await self.proc.wait()

        # Let both readers reach EOF so no trailing output is lost.
        await asyncio.gather(self.stdout_task, self.stderr_task)
        self.rc = self.proc.returncode
        self.waited = True
        return self

    async def _handle_output(self, stream, fd):
        """
        Internal method for stream handling.

        Reads ``stream`` line by line until EOF, appending the raw bytes to
        the per-stream and combined buffers, and echoing each line unless
        ``quiet`` is set.

        :param stream: Output stream to monitor
        :type stream: asyncio.StreamReader
        :param fd: Stream identifier (1=stdout, 2=stderr)
        :type fd: int
        """
        while True:
            line = await stream.readline()
            if not line:  # EOF
                break

            if fd == 1:  # stdout
                self.out_raw.extend(line)
            elif fd == 2:  # stderr
                self.err_raw.extend(line)
            self.raw.extend(line)

            if not self.quiet:
                # Decode only when echoing, and never let undecodable bytes
                # kill the reader: a dead reader stops draining the pipe.
                print(line.decode(errors='replace').rstrip())

    @property
    def stdout_ansi(self):
        """Captured stdout as text, ANSI escape codes preserved."""
        return self.out_raw.decode(errors='replace').rstrip()

    @property
    def stderr_ansi(self):
        """Captured stderr as text, ANSI escape codes preserved."""
        return self.err_raw.decode(errors='replace').rstrip()

    @property
    def out_ansi(self):
        """Captured stdout and stderr combined, ANSI codes preserved."""
        return self.raw.decode(errors='replace').rstrip()

    @property
    def stdout(self):
        """Captured stdout as text with ANSI escape codes removed."""
        return ansi_escape.sub('', self.stdout_ansi)

    @property
    def stderr(self):
        """Captured stderr as text with ANSI escape codes removed."""
        return ansi_escape.sub('', self.stderr_ansi)

    @property
    def out(self):
        """Captured stdout and stderr combined, ANSI codes removed."""
        return ansi_escape.sub('', self.out_ansi)