"""Asyncio subprocess wrapper featuring:- Capture + live logging of stdout/stderr- ANSI escape code cleaning for captured output: print colored output for humans, have clean output in a variable for processing, log, cache... and sending to LLMs!- Separate start/wait methods for process controlExample usage:.. code-block:: python # pass shell command in a string for convenience proc = cli2.Proc('foo bar') # or as list, better when building commands proc = cli2.Proc('foo', 'bar') # run in sync mode (ie. for jinja2) proc.wait() # OR run in async loop await proc.waita() # You can chain proc = cli2.Proc('hi').wait() proc = await cli2.Proc('hi').waita().. note:: There are also start functions, sync and async, in case you want to start the proc and wait later."""importasyncioimportosimportshleximportreimportsubprocessfrom.logimportlogansi_escape=re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
class Proc:
    """
    Asynchronous subprocess manager with advanced IO handling.

    .. py:attribute:: args

        Full command arguments list used to launch the process

    .. py:attribute:: rc

        Return Code: process exit code (available after process completes)

    .. py:attribute:: out

        Combined cleaned output with ANSI escape codes removed.

    .. py:attribute:: out_ansi

        Combined stdout/stderr output with ANSI codes preserved.

    .. py:attribute:: stdout

        Cleaned stdout output with ANSI escape codes removed.

    .. py:attribute:: stderr

        Cleaned stderr output with ANSI escape codes removed.

    .. py:attribute:: stdout_ansi

        Stdout output with ANSI escape codes preserved.

    .. py:attribute:: stderr_ansi

        Stderr output with ANSI escape codes preserved.

    .. py:attribute:: out_raw

        ``bytearray`` of the bytes captured from stdout.

    .. py:attribute:: err_raw

        ``bytearray`` of the bytes captured from stderr.

    .. py:attribute:: raw

        ``bytearray`` of the bytes captured from both streams.
    """

    def __init__(self, cmd, *args, quiet=False, inherit=True, timeout=None,
                 **env):
        """
        :param cmd: Command string (will shlex split) or initial argument
        :param args: Additional command arguments
        :param quiet: Suppress live output printing (default: False)
        :param inherit: Inherit parent environment variables (default: True)
        :param timeout: Maximum execution time in seconds (default: None)
        :param env: Additional environment variables to set
        :type env: Environment variables.
        """
        if args or not isinstance(cmd, str):
            # Explicit argument list (or a lone non-string argument such as
            # a Path): there is nothing to shlex-split.
            self.args = [cmd, *args]
        else:
            self.args = shlex.split(cmd)
        self.quiet = quiet
        self.env = os.environ.copy() if inherit else {}
        self.env.update(env)
        self.out_raw = bytearray()
        self.err_raw = bytearray()
        self.raw = bytearray()
        self.started = False
        self.waited = False
        self.timeout = timeout
        self.rc = None
        self.proc = None
        # Reader tasks, only created by starta()
        self.stdout_task = None
        self.stderr_task = None

    def clone(self):
        """
        Create a new unstarted Proc instance with identical configuration.

        ``args`` and ``env`` are copied onto the new instance instead of
        being passed back through the constructor: re-passing them would
        re-split a lone argument containing spaces, re-inherit the parent
        environment, and clash with the ``quiet``/``inherit``/``timeout``
        keywords whenever an environment variable carries one of those names.

        :return: New Proc instance ready for execution
        """
        new = type(self)(
            self.cmd,
            quiet=self.quiet,
            inherit=False,
            timeout=self.timeout,
        )
        new.args = list(self.args)
        new.env = dict(self.env)
        return new

    @property
    def cmd(self):
        """
        Get/set the command as a shell-joinable string.

        :getter: Returns shell-escaped command string
        :setter: Parses and updates internal args list
        :type: str
        """
        # str() so that non-string arguments (paths, numbers) can be quoted
        return shlex.join(str(arg) for arg in self.args)

    @cmd.setter
    def cmd(self, value):
        self.args = shlex.split(value)

    def _argv(self):
        """Return the argument list with every item coerced to ``str``."""
        return [str(arg) for arg in self.args]

    def _environ(self):
        """Return the environment with keys and values coerced to ``str``."""
        return {str(key): str(value) for key, value in self.env.items()}

    async def starta(self):
        """
        Launch the subprocess asynchronously.

        Also schedules the two tasks that capture stdout and stderr.

        :return: Self reference for method chaining
        :raises RuntimeError: If process is already started
        """
        if self.started:
            raise RuntimeError("Process already started")

        if not self.quiet:
            log.debug('cmd', cmd=self.cmd)

        self.proc = await asyncio.create_subprocess_exec(
            *self._argv(),
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=self._environ(),
        )
        self.started = True
        self.stdout_task = asyncio.create_task(
            self._handle_output(self.proc.stdout, 1)
        )
        self.stderr_task = asyncio.create_task(
            self._handle_output(self.proc.stderr, 2)
        )
        return self

    async def waita(self):
        """
        Wait for process completion with timeout handling.

        Closes the child's stdin so a child reading it sees EOF, waits for
        the process, and on timeout stops it (terminate, then kill after a
        1 second grace period, like :py:meth:`wait`). Finally gathers the
        output reader tasks. Calling it again once done is a no-op.

        :return: Self reference for method chaining
        """
        if self.waited:
            return self
        if not self.started:
            await self.starta()

        if self.proc.stdin is not None:
            # Nothing more will be written: without EOF a child that reads
            # stdin would block forever.
            self.proc.stdin.close()

        try:
            if self.timeout:
                await asyncio.wait_for(
                    self.proc.wait(),
                    timeout=self.timeout,
                )
            else:
                await self.proc.wait()
        except asyncio.TimeoutError:
            print(f"Process timed out after {self.timeout}s")
            await self._stop_async()

        await asyncio.gather(self.stdout_task, self.stderr_task)
        self.rc = self.proc.returncode
        self.waited = True
        return self

    async def _stop_async(self):
        """Terminate the async process, killing it after a 1s grace period."""
        try:
            self.proc.terminate()
        except ProcessLookupError:
            pass  # exited between the timeout and the signal
        try:
            await asyncio.wait_for(self.proc.wait(), timeout=1)
        except asyncio.TimeoutError:
            try:
                self.proc.kill()
            except ProcessLookupError:
                pass  # exited during the grace period
            await self.proc.wait()

    async def _handle_output(self, stream, fd):
        """
        Internal method for stream handling.

        Reads the stream line by line until EOF, appending the raw bytes to
        the per-stream buffer and to the combined buffer, and printing each
        line unless ``quiet`` is set.

        .. note:: ``readline()`` raises ``ValueError`` for a line longer than
                  the stream reader limit (64 KiB by default in asyncio).

        :param stream: Output stream to monitor
        :type stream: asyncio.StreamReader
        :param fd: Stream identifier (1=stdout, 2=stderr)
        :type fd: int
        """
        buffer = {1: self.out_raw, 2: self.err_raw}.get(fd)
        while True:
            line = await stream.readline()
            if not line:  # EOF
                break
            # Store the bytes before any decoding so that undecodable output
            # is still captured.
            if buffer is not None:
                buffer.extend(line)
            self.raw.extend(line)
            if not self.quiet:
                print(line.decode(errors='replace').rstrip())

    def start(self):
        """
        Start the subprocess synchronously without waiting for output.

        Pipes are opened in binary mode so that the captured buffers hold
        exactly the bytes the process wrote. Output is only collected by
        :py:meth:`wait`.

        :return: Self reference for method chaining
        :raises RuntimeError: If process is already started
        """
        if self.started:
            raise RuntimeError("Process already started")

        if not self.quiet:
            log.debug('cmd', cmd=self.cmd)

        self.proc = subprocess.Popen(
            self._argv(),
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=self._environ(),
        )
        self.started = True
        return self

    def wait(self):
        """
        Wait for process completion synchronously with timeout handling.

        Uses ``communicate()`` so the pipes are drained while waiting (a bare
        ``Popen.wait()`` deadlocks once the child fills a pipe buffer). On
        timeout the process is terminated, then killed after a 1 second grace
        period. Stdout is stored, and printed unless ``quiet``, before
        stderr, so the combined buffer is not interleaved in sync mode.
        Calling it again once done is a no-op.

        :return: Self reference for method chaining
        """
        if self.waited:
            return self
        if not self.started:
            self.start()

        try:
            # A falsy timeout (None or 0) means "wait forever"
            stdout, stderr = self.proc.communicate(
                timeout=self.timeout or None
            )
        except subprocess.TimeoutExpired:
            print(f"Process timed out after {self.timeout}s")
            self.proc.terminate()
            try:
                # Grace period for termination; retrying communicate() after
                # a timeout does not lose output
                stdout, stderr = self.proc.communicate(timeout=1)
            except subprocess.TimeoutExpired:
                self.proc.kill()
                stdout, stderr = self.proc.communicate()
        self.rc = self.proc.returncode

        for data, buffer in ((stdout, self.out_raw), (stderr, self.err_raw)):
            if not data:
                continue
            buffer.extend(data)
            self.raw.extend(data)
            if not self.quiet:
                print(data.decode(errors='replace').rstrip())

        self.waited = True
        return self