Source code for cli2.command

import asyncio
import inspect

from docstring_parser import parse

from .argument import Argument
from .colors import colors
from .entry_point import EntryPoint


[docs]class Command(EntryPoint, dict): """Represents a command bound to a target callable.""" def __new__(cls, target, *args, **kwargs): overrides = getattr(target, 'cli2', {}) cls = overrides.get('cls', cls) return super().__new__(cls, *args, **kwargs) def __init__(self, target, name=None, color=None, doc=None, posix=False, outfile=None, log=True): self.target = target self.posix = posix self.parent = None overrides = getattr(target, 'cli2', {}) for key, value in overrides.items(): setattr(self, key, value) if name: self.name = name elif 'name' not in overrides: self.name = getattr(target, '__name__', type(target).__name__) self.parsed = parse(inspect.getdoc(target)) if doc: self.doc = doc elif 'doc' not in overrides: self.doc = '' if self.parsed.short_description: self.doc += self.parsed.short_description.replace('\n', ' ') if self.parsed.long_description: if self.doc: self.doc += '\n' self.doc += self.parsed.long_description.replace('\n', ' ') if color: self.color = color elif 'color' not in overrides: self.color = 'orange' self.sig = inspect.signature(target) self.setargs() EntryPoint.__init__(self, outfile=outfile, log=log)
[docs] def setargs(self): """Reset arguments.""" for name, param in self.sig.parameters.items(): overrides = getattr(self.target, 'cli2_' + name, {}) cls = overrides.get('cls', Argument) self[name] = cls(self, param) for key, value in overrides.items(): setattr(self[name], key, value)
@classmethod def cmd(cls, *args, **kwargs): def override(target): overrides = getattr(target, 'cli2', {}) overrides.update(kwargs) overrides['cls'] = cls target.cli2 = overrides if len(args) == 1 and not kwargs: # simple @YourCommand.cmd syntax target = args[0] override(target) return target elif not args: def wrap(cb): override(cb) return cb return wrap else: raise Exception('Only kwargs are supported by Group.cmd')
[docs] def help(self, error=None, short=False): """Show help for a command.""" if short: if self.doc: return self.doc.replace('\n', ' ').split('.')[0] return '' if error: self.print('RED', 'ERROR: ' + colors.reset + error, end='\n\n') self.print('ORANGE', 'SYNOPSYS') chain = [] current = self while current is not None: chain.insert(0, current.name) current = current.parent for arg in self.values(): chain.append(str(arg)) self.print(' '.join(map(str, chain)), end='\n\n') self.print('ORANGE', 'DESCRIPTION') self.print(self.doc) shown_posargs = False shown_kwargs = False for arg in self.values(): self.print() if not arg.iskw and not shown_posargs: self.print('ORANGE', 'POSITIONAL ARGUMENTS') shown_posargs = True varkw = arg.param.kind == arg.param.VAR_KEYWORD if (arg.iskw or varkw) and not shown_kwargs: self.print('ORANGE', 'NAMED ARGUMENTS') shown_kwargs = True arg.help()
[docs] def parse(self, *argv): """Parse arguments into BoundArguments.""" self.setargs() self.bound = self.sig.bind_partial() extra = [] for current in argv: taken = False for arg in self.values(): taken = arg.take(current) if taken: break if not taken: extra.append(current) if extra: return 'No parameters for these arguments: ' + ', '.join(extra)
[docs] def call(self, *args, **kwargs): """Execute command target with bound arguments.""" return self.target(*args, **kwargs)
def argskwargs(self): for name, arg in self.items(): if not arg.default: continue if name in self.bound.arguments: continue arg.value = arg.default return self.bound.args, self.bound.kwargs def __call__(self, *argv): """Execute command with args from sysargs.""" self.exit_code = 0 error = self.parse(*argv) if error: return self.help(error=error) args, kwargs = self.argskwargs() try: result = self.call(*args, **kwargs) except TypeError as exc: self.exit_code = 1 if hasattr(self.target, '__name__'): rep = getattr(self.target, '__name__') elif hasattr(self.target, '__call__'): rep = '__call__' error = str(exc) function = error.split(' ')[0].split('.')[-1] if function.startswith(rep + '('): return self.help(error=error.replace(rep + '()', self.name)) raise if inspect.iscoroutine(result): try: result = asyncio.run(result) except KeyboardInterrupt: print('exiting') import sys sys.exit(1) return result