Source code for cli2.command

import asyncio
import inspect
import sys

from docstring_parser import parse

from . import display
from .argument import Argument
from .asyncio import async_resolve
from .colors import colors
from .entry_point import EntryPoint
from .overrides import Overrides


[docs] class Command(EntryPoint, dict): """Represents a command bound to a target callable.""" def __new__(cls, target, *args, **kwargs): overrides = getattr(target, 'cli2', {}) cls = overrides.get('cls', cls) return super().__new__(cls, *args, **kwargs) def __init__(self, target, name=None, color=None, doc=None, posix=False, help_hack=True, outfile=None, log=True, overrides=None): self.posix = posix self.parent = None self.help_hack = help_hack self._overrides = Overrides(overrides or dict()) self._overrides['_cli2']['factory'] = lambda: self self.target = target overrides = getattr(target, 'cli2', {}) for key, value in overrides.items(): setattr(self, key, value) if name: self.name = name elif 'name' not in overrides: self.name = getattr(target, '__name__', type(target).__name__) self.parsed = parse(inspect.getdoc(self.target)) if doc: self.doc = doc elif 'doc' not in overrides: if self.parsed.description: self.doc = self.parsed.description.strip() else: self.doc = '' if color: self.color = color elif 'color' not in overrides: self.color = 'orange' self.positions = dict() EntryPoint.__init__(self, outfile=outfile, log=log) self.args_set = False self.args_setting = False @property def overrides(self): return self._overrides @overrides.setter def overrides(self, value): self._overrides = Overrides(value) def get_overrides(self, name, target=None): target = target or self.target overrides = getattr(target, 'cli2_' + name, {}) for key, value in self.overrides[name].items(): overrides.setdefault(key, value) group = getattr(self, 'group', None) if group and name in group.overrides: for key, value in group.overrides[name].items(): overrides.setdefault(key, value) return overrides @property def target(self): target = self._target if not inspect.ismethod(target): return target if self.name == 'help': return target # let's allow overwriting a bound method's __self__ func_sig = inspect.signature(target.__func__) self_name = [*func_sig.parameters.keys()][0] if 'factory' in self.get_overrides(self_name, target): self.target = target = target.__func__ return target @target.setter def target(self, value): self._target = value @property def sig(self): return inspect.signature(self.target) def __getitem__(self, key): self._setargs() return super().__getitem__(key) def _setargs(self): if self.args_set or self.args_setting: return self.args_setting = True self.setargs() self.args_set = True
[docs] def setargs(self): """Reset arguments.""" for name, param in self.sig.parameters.items(): overrides = self.get_overrides(name) cls = overrides.get('cls', Argument) self[name] = cls(self, param) for key, value in overrides.items(): setattr(self[name], key, value)
@classmethod def cmd(cls, *args, **kwargs): def override(target): overrides = getattr(target, 'cli2', {}) overrides.update(kwargs) overrides['cls'] = cls target.cli2 = overrides if len(args) == 1 and not kwargs: # simple @YourCommand.cmd syntax target = args[0] override(target) return target elif not args: def wrap(cb): override(cb) return cb return wrap else: raise Exception('Only kwargs are supported by Group.cmd')
[docs] def help(self, error=None, short=False, missing=None): """Show help for a command.""" self._setargs() if short: if self.doc: return self.doc_short return '' if missing: error = ( f'missing {len(missing)} required argument' f'{"s" if len(missing) > 1 else ""}' f': {", ".join(missing)}' ) if error: self.print('RED', 'ERROR: ' + colors.reset + error, end='\n\n') self.print('ORANGE', 'SYNOPSYS') chain = [] current = self while current is not None: chain.insert(0, current.name) current = current.parent for arg in self.values(): if not arg.hide: chain.append(str(arg)) self.print(' '.join(map(str, chain)), end='\n\n') self.print('ORANGE', 'DESCRIPTION') self.print(self.doc) shown_posargs = False shown_kwargs = False for arg in self.values(): if arg.hide: continue self.print() varkw = arg.param.kind == arg.param.VAR_KEYWORD if not arg.iskw and not varkw and not shown_posargs: self.print('ORANGE', 'POSITIONAL ARGUMENTS') shown_posargs = True if (arg.iskw or varkw) and not shown_kwargs: self.print('ORANGE', 'NAMED ARGUMENTS') shown_kwargs = True arg.help()
[docs] def parse(self, *argv): """Parse arguments into BoundArguments.""" self._setargs() self.bound = self.sig.bind_partial() extra = [] for current in argv: taken = False for arg in self.values(): taken = arg.take(current) if taken: break if not taken: extra.append(current) if extra: return 'No parameters for these arguments: ' + ', '.join(extra) for name, arg in self.items(factories=None): if arg.factory: if self.async_function(arg.factory): arg.value = 'to_be_computed' else: arg.value = arg.factory_value() continue if not arg.default: continue if name in self.bound.arguments: continue arg.value = arg.default
[docs] def async_function(self, function): """ Return True if function is async """ return ( inspect.iscoroutinefunction(function) or inspect.isasyncgenfunction(function) )
[docs] def async_mode(self): """ Return True if any callable we'll deal with is async """ for arg in self.values(): if self.async_function(arg.factory): return True if ( self.async_function(self.target) or self.async_function(self.post_call) ): return True for name, arg in self.items(factories=True): if arg.factory and self.async_function(arg.factory): return True return False
[docs] def call(self, *args, **kwargs): """Execute command target with bound arguments.""" return self.target(*args, **kwargs)
def missing(self): return [ name for name, arg in self.items() if name not in self.bound.arguments and name not in self.bound.kwargs and arg.param.default == arg.param.empty and arg.param.kind in ( arg.param.POSITIONAL_ONLY, arg.param.POSITIONAL_OR_KEYWORD, ) ] def __call__(self, *argv): """Execute command with args from sysargs.""" self.exit_code = 0 if self.help_hack and '--help' in argv: self.exit_code = 1 return self.help() if self.async_mode(): return asyncio.run(self.async_call(*argv)) error = self.parse(*argv) if error: self.exit_code = 1 return self.help(error=error) missing = self.missing() if missing: self.exit_code = 1 return self.help(missing=missing) try: result = self.call(*self.bound.args, **self.bound.kwargs) if inspect.isgenerator(result): for _ in result: display.print(_) result = None except KeyboardInterrupt: print('exiting') sys.exit(1) finally: self.post_result = self.post_call() return result
[docs] async def async_call(self, *argv): """ Call with async stuff in single event loop """ error = self.parse(*argv) if error: self.exit_code = 1 return self.help(error=error) missing = self.missing() if missing: self.exit_code = 1 return self.help(missing=missing) factories = self.values(factories=True) if factories: results = await asyncio.gather(*[ async_resolve(arg.factory_value()) for arg in factories ]) for _, arg in enumerate(factories): arg.value = results[_] try: result = self.call(*self.bound.args, **self.bound.kwargs) result = await async_resolve(result, output=True) except KeyboardInterrupt: print('exiting') sys.exit(1) finally: self.post_result = await async_resolve(self.post_call()) return result
[docs] def ordered(self, factories=False): """ Order the parameters by priority. :param factories: Show only arguments with factory. """ return {key: self[key] for key in self.keys(factories=factories)}
[docs] def values(self, factories=False): """ Return ordered values. :param factories: Show only arguments with factory. """ return self.ordered(factories=factories).values()
[docs] def items(self, factories=False): """ Return ordered items. :param factories: Show only arguments with factory. """ return self.ordered(factories=factories).items()
[docs] def keys(self, factories=False): """ Return ordered keys. :param factories: Show only arguments with factory. """ self._setargs() order = ( inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.KEYWORD_ONLY, inspect.Parameter.VAR_KEYWORD, ) keys = [] for kind in order: for name, arg in super().items(): if factories is False and arg.factory: continue if factories is True and not arg.factory: continue if name in self.positions: continue if arg.param.kind == kind: keys.append(name) for key, position in self.positions.items(): if factories and not self[key].factory: continue keys.insert(position, key) return keys
def __iter__(self): return self.ordered().__iter__()
[docs] def arg( self, name, *, kind: str = None, position: int = None, doc=None, color=None, default=inspect.Parameter.empty, annotation=inspect.Parameter.empty, ): """ Inject new :py:class:`~cli2.argument.Argument` into this command. The new argument will appear in documentation, but won't be bound to the callable: it will only be avalaible in `self`. For example, you are deleting an "http_client" argument in :py:meth:`setargs()` so that it doesn't appear to the CLI user, to whom you want to expose a couple of arguments such as "base_url" and "ssl_verify" that you are adding programatically with this method, so that you can use `self['base_url'].value` and `self['ssl_verify'].value` in to generate a "http_client" argument in :py:meth:`call()`. The tutorial has a more comprehensive example in the "CLI only arguments" section. :param name: Name of the argument to add :param kind: Name of the inspect parameter kind :param position: Position of the argument in the CLI :param doc: Documentation for the argument :param color: Color of the argument :param default: Default value for the argument :param annotation: Type of argument """ self[name] = Argument( self, inspect.Parameter( name, kind=getattr( inspect.Parameter, kind or "POSITIONAL_OR_KEYWORD", ), default=default, annotation=annotation, ), doc=doc, color=color, ) if position is not None: self.positions[name] = position
[docs] def post_call(self): """ Implement your cleaner here """ pass