# Source code for cansible.action

"""
Base class for Ansible Actions.
"""

import asyncio
import chttpx
import cli2
import copy
import difflib
import os
import re
import traceback

from ansible.plugins.action import ActionBase
from ansible.plugins.action import display
from ansible.plugins.filter.core import to_nice_yaml

# colors:
# black
# bright gray
# blue
# white
# green
# cyan
# bright green
# red
# bright cyan
# purple
# bright red
# yellow
# bright purple
# dark gray
# magenta
# bright magenta
# normal

# 7-bit C1 ANSI sequences
# Compiled verbose pattern matching an ESC byte followed by either a single
# Fe byte or a complete CSI control sequence (parameter bytes, intermediate
# bytes, final byte).
# NOTE(review): not referenced anywhere in the visible part of this module;
# presumably used by importers to strip colors from output - confirm before
# removing.
ansi_escape = re.compile(r'''
    \x1B  # ESC
    (?:   # 7-bit C1 Fe (except CSI)
        [@-Z\\-_]
    |     # or [ for CSI, followed by a control sequence
        \[
        [0-?]*  # Parameter bytes
        [ -/]*  # Intermediate bytes
        [@-~]   # Final byte
    )
''', re.VERBOSE)


# Sentinel meaning "no value supplied": it is the default of the ``default``
# parameter of Option and the initial value of the before/after diff data
# set up in ActionBase.__init__.
UNSET_DEFAULT = '__UNSET__DEFAULT__'


class Option:
    """
    Ansible Option descriptor.

    Declared as a class attribute on an action; reading it resolves the value
    from, in order: the task argument, the cached fact value, the play fact,
    and finally the declared default.

    .. py:attribute:: arg

        Name of the task argument to get this option value from

    .. py:attribute:: fact

        Name of the fact, if any, to get a value for this option if no task
        arg is provided

    .. py:attribute:: default

        Default value, if any, in case neither of arg and fact were defined.

    .. py:attribute:: cast_in

        Optional callable applied to the resolved value. It runs at most once
        per descriptor: the ``casted_in`` flag lives on the descriptor itself,
        not on the action instance.

    .. py:attribute:: cast_out

        Optional callable stored for the owning action, which applies it to
        the fact value when exporting changed facts.
    """
    UNSET_DEFAULT = UNSET_DEFAULT

    def __init__(self, arg=None, fact=None, default=UNSET_DEFAULT,
                 cast_in=None, cast_out=None):
        self.arg, self.fact, self.default = arg, fact, default
        self.cast_in, self.cast_out = cast_in, cast_out
        # Flips to True after the first successful cast_in call
        self.casted_in = False

    @property
    def kwargs(self):
        """
        Describe this option as a dict: always ``default``, plus ``arg_name``
        and ``fact_name`` when those are set.
        """
        described = {'default': self.default}
        for label, name in (('arg_name', self.arg), ('fact_name', self.fact)):
            if name:
                described[label] = name
        return described

    def __set__(self, obj, value):
        """
        Store ``value`` as the current fact value on ``obj``.

        The first write also records the fact's initial value from
        ``obj.task_vars`` (or the unset sentinel) in ``obj.facts_initial``.
        """
        fact = self.fact
        if fact and fact not in obj.facts_initial:
            obj.facts_initial[fact] = obj.task_vars.get(
                fact,
                self.UNSET_DEFAULT,
            )
        obj.facts_values[fact] = value

    def _resolve(self, obj):
        """
        Return the raw value for ``obj`` following the priority chain, or
        raise :py:class:`AnsibleOptionError` when nothing provides one.
        """
        arg, fact = self.arg, self.fact

        # task arguments win over everything else
        if arg and arg in obj._task.args:
            return obj._task.args[arg]

        # then a fact value we are already holding
        if fact and fact in obj.facts_values:
            return obj.facts_values[fact]

        # then the fact as provided by the play
        if fact and fact in obj.task_vars:
            return obj.task_vars[fact]

        if self.default != self.UNSET_DEFAULT:
            # copy so that a mutable default is never shared between actions
            fallback = copy.deepcopy(self.default)
            if fact:
                obj.facts_initial[fact] = None
            return fallback

        raise AnsibleOptionError(self)

    def __get__(self, obj, objtype=None):
        """
        Resolve and return the option value for ``obj``.

        Accessed on the class (``obj is None``), the descriptor itself is
        returned.

        :raise AnsibleOptionError: When no arg, fact or default is available
        """
        if obj is None:
            return self

        value = self._resolve(obj)

        if self.cast_in and not self.casted_in:
            value = self.cast_in(value)
            self.casted_in = True

        fact = self.fact
        if fact:
            if fact not in obj.facts_initial:
                # remember what the value looked like before any change
                obj.facts_initial[fact] = copy.deepcopy(value)
            if fact not in obj.facts_values and cli2.mutable(value):
                # in-place changes to a mutable value never go through
                # __set__, so keep a reference to it right away
                obj.facts_values[fact] = value
        return value
class AnsibleError(Exception):
    """Base exception for errors raised by this module."""
class AnsibleOptionError(AnsibleError):
    """
    Raised when an option cannot be resolved from any source.

    .. py:attribute:: option

        The option descriptor that could not be resolved
    """

    def __init__(self, option):
        self.option = option
        super().__init__(option.kwargs)

    @property
    def message(self):
        """
        Human readable sentence naming the missing arg and/or fact.
        """
        sources = []
        if self.option.arg:
            sources.append(f'arg `{self.option.arg}`')
        if self.option.fact:
            sources.append(f'fact `{self.option.fact}`')

        words = ['Missing']
        if sources:
            words.append(' or '.join(sources))
        return ' '.join(words)
class ActionBase(ActionBase):
    """
    Base action class

    Subclasses implement :py:meth:`run_async` and, optionally,
    :py:meth:`client_factory`.

    .. py:attribute:: result

        Result dict that will be returned to Ansible

    .. py:attribute:: task_vars

        The task_vars that the module was called with

    .. py:attribute:: client

        The client object generated by :py:meth:`client_factory` if you
        implement it.

    .. py:attribute:: masked_keys

        Declare a list of keys to mask:

        .. code-block:: python

            class AnsibleModule(ansible.ActionBase):
                masked_keys = ['secret', 'password']

    .. py:attribute:: mask

        :py:class:`~cli2.mask.Mask` object
    """
    # Keys to mask, read from the ``mask_keys`` fact
    mask_keys = Option(fact='mask_keys', default=set())
    # Values to mask, read from the ``mask_values`` fact; handled as a set
    # internally and exported back as a list
    mask_values = Option(
        fact='mask_values',
        default=set(),
        cast_in=set,
        cast_out=list,
    )
    # Keys to mask, declared by subclasses
    masked_keys = []

    def run(self, tmp=None, task_vars=None):
        """
        Ansible entry point: run :py:meth:`run_wrapped_async` in an event
        loop and return :py:attr:`result`.
        """
        self.tmp = tmp
        self.task_vars = task_vars
        self.facts_values = dict()
        self.facts_initial = dict()
        self.result = super().run(tmp, task_vars)
        asyncio.run(self.run_wrapped_async())
        return self.result

    def mask_init(self):
        """
        Provision :py:attr:`mask` with keys and values to hide.

        Keys come from the ``mask_keys`` option and :py:attr:`masked_keys`;
        values come from the ``mask_values`` fact and from any fact or task
        argument named after a masked key.
        """
        # use ansible template value renderer
        self.mask.renderer = lambda value: self._templar.template(value)

        # `or ()` tolerates a mask_values fact explicitly set to None
        self.facts_initial['mask_values'] = {
            self._templar.template(value)
            for value in self.task_vars.get('mask_values') or ()
        }

        def _mask_key(key):
            self.mask.keys.add(key)
            # discover values from facts
            if value := self.task_vars.get(key, None):
                self.mask.values.add(value)
            # discover value from args
            if value := self._task.args.get(key, None):
                self.mask.values.add(value)

        for key in self.mask_keys:
            _mask_key(key)

        for key in self.masked_keys:
            _mask_key(key)

        for value in self.facts_initial['mask_values']:
            self.mask.values.add(value)

        # reference self.mask.values to our own mask_values: it is already a
        # set, so flag the option as casted to skip cast_in
        type(self).mask_values.casted_in = True
        self.facts_values['mask_values'] = self.mask.values

    async def run_wrapped_async(self):
        """
        Run :py:meth:`run_async` with client, mask and error handling.

        Any exception marks the result as failed and is stored in
        ``self.exc`` instead of propagating. Changed facts are always exported
        to ``result['ansible_facts']``, and the masked result and before/after
        diff are printed when applicable.
        """
        self.verbosity = self.task_vars.get('ansible_verbosity', 0)

        # map ansible verbosity to a log level unless the user chose one
        if 'LOG_LEVEL' not in os.environ and 'DEBUG' not in os.environ:
            if self.verbosity == 1:
                os.environ['LOG_LEVEL'] = 'INFO'
            elif self.verbosity >= 2:
                os.environ['LOG_LEVEL'] = 'DEBUG'
        cli2.configure()

        try:
            try:
                self.client = await self.client_factory()
            except NotImplementedError:
                self.client = None
                self.mask = cli2.Mask()
            else:
                self.mask = copy.deepcopy(self.client.mask)
            self.mask_init()
            await self.run_async()
        except Exception as exc:
            self.result['failed'] = True
            if self.verbosity:
                traceback.print_exc()

            if isinstance(exc, AnsibleError):
                # only some AnsibleError subclasses define `message`
                self.result['error'] = getattr(exc, 'message', str(exc))
            elif isinstance(exc, chttpx.ResponseError):
                self.result.update(dict(
                    method=exc.method,
                    url=exc.url,
                    status_code=exc.status_code,
                ))
                # the client is missing when the factory itself failed
                client = getattr(self, 'client', None)
                if client is not None:
                    key, value = client.response_log_data(exc.response)
                    if key:
                        self.result[f'response_{key}'] = value
                    key, value = client.request_log_data(exc.request)
                    if key:
                        self.result[f'request_{key}'] = value

            # for pytest to raise
            self.exc = exc
        finally:
            changed = dict()
            for key, value in self.facts_values.items():
                initial_value = self.facts_initial[key]
                if value and value != initial_value:
                    changed[key] = value

            if changed:
                if 'ansible_facts' not in self.result:
                    self.result['ansible_facts'] = dict()

                # collect the output casts declared by the class options
                cast_outs = dict()
                for name in dir(type(self)):
                    option = getattr(type(self), name)
                    if not isinstance(option, Option):
                        continue
                    if option.fact and option.cast_out:
                        cast_outs[option.fact] = option.cast_out

                for key in list(changed):
                    if key in cast_outs:
                        changed[key] = cast_outs[key](changed[key])
                self.result['ansible_facts'].update(changed)

            # the mask is missing when the client factory failed with
            # anything but NotImplementedError: don't hide that error
            # behind an AttributeError
            mask = getattr(self, 'mask', None)

            if mask and self.verbosity:
                # this task has a mask, so it's probably a no_log, we're
                # running verbose, print masked result
                self.print_yaml(self.result)

            if (
                mask is not None
                and self._before_data != UNSET_DEFAULT
                and self._after_data != UNSET_DEFAULT
            ):
                diff = difflib.unified_diff(
                    to_nice_yaml(
                        mask(self._before_data)
                    ).splitlines(),
                    to_nice_yaml(
                        mask(self._after_data)
                    ).splitlines(),
                    self._before_label,
                    self._after_label,
                )
                cli2.diff(diff)

    def print_yaml(self, data):
        """
        Render data as masked yaml.

        :param data: Data to mask, dump as YAML, highlight and print
        """
        data = self.mask(data)
        yaml = to_nice_yaml(data)
        rendered = cli2.yaml_highlight(yaml)
        # already masked above
        self.print(rendered, mask=False)

    def print(self, data, mask=True):
        """
        Print that masks by default.

        :param data: Data to display
        :param mask: Set to False to display the data as is
        """
        if mask:
            data = self.mask(data)
        display.display(data)

    async def run_async(self):
        """
        The method you are supposed to implement.

        It should:

        - provision the :py:attr:`result` dict
        - find task_vars in :py:attr:`task_vars`
        """

    async def client_factory(self):
        """
        Return a client instance.

        :raise NotImplementedError: By default
        """
        raise NotImplementedError()

    @classmethod
    async def run_test_async(cls, args=None, facts=None, client=None,
                             fail=False):
        """
        Test run the module in a mocked context.

        :param args: Dict of task arguments
        :param facts: Dict of play facts
        :param client: Client instance, overrides the factory
        :param fail: Allow this test to fail without exception
        :return: The action instance, after it ran
        """
        from unittest import mock
        from ansible.template import Templar
        from ansible.parsing.dataloader import DataLoader

        loader = DataLoader()
        obj = cls(
            task=mock.Mock(),
            connection=mock.Mock(),
            play_context=mock.Mock(),
            loader=loader,
            templar=Templar(loader, variables=facts),
            shared_loader_obj=mock.Mock(),
        )
        obj.facts_values = dict()
        obj.facts_initial = dict()
        obj.tmp = None
        obj.result = dict()
        obj._task.args = args or {}
        obj.task_vars = facts or {}
        obj.task_vars.setdefault('ansible_verbosity', 2)
        obj.exc = False

        if client:
            async def _factory():
                return client
            obj.client_factory = _factory

        old = obj.client_factory

        async def set_tries():
            # set the client handler's tries to 0 for the test run
            instance = await old()
            instance.handler.tries = 0
            return instance

        obj.client_factory = set_tries

        await obj.run_wrapped_async()

        if obj.exc and not fail:
            raise obj.exc

        if obj.result.get('failed', False) and not fail:
            raise Exception(
                f'Module failed, and fail is not True {obj.result}'
            )

        return obj

    def __init__(self, *args, **kwargs):
        """
        Initialize the action with no before/after diff data.
        """
        super().__init__(*args, **kwargs)
        self._before_data = UNSET_DEFAULT
        self._after_data = UNSET_DEFAULT

    def before_set(self, data, label='before'):
        """
        Set the "before" data we're going to display the diff for at the end.

        :param data: Dictionary of data
        :param label: Label to show in diff
        """
        self._before_data = data
        self._before_label = label

    def after_set(self, data, label='after'):
        """
        Set the "after" data we're going to display the diff for at the end.

        :param data: Dictionary of data
        :param label: Label to show in diff
        """
        self._after_data = data
        self._after_label = label

    def action_run(self, name, **kwargs):
        """
        Run another action plugin from within this one.

        A copy of the current task is given ``kwargs`` as arguments, then the
        action is loaded by name and run with a copy of
        :py:attr:`task_vars`.

        :param name: Name of the action to load, ie. ``ansible.builtin.shell``
        :param kwargs: Task arguments for that action
        :return: Whatever the action's ``run`` method returned
        """
        new_task = self._task.copy()
        new_task.args = kwargs
        shell_action = self._shared_loader_obj.action_loader.get(
            name,
            task=new_task,
            connection=self._connection,
            play_context=self._play_context,
            loader=self._loader,
            templar=self._templar,
            shared_loader_obj=self._shared_loader_obj,
        )
        return shell_action.run(task_vars=self.task_vars.copy())

    def subprocess_remote(self, cmd, callback=None, print=True, **kwargs):
        """
        Execute a shell command on the remote in a masked context

        :param cmd: Command to run
        :param kwargs: Other shell args, such as creates etc
        :param print: Whether to print stdout/stderr, defaults to True
        :param callback: Callback function called before output is printed and
                         returned, use that to collect secrets and provision
                         self.mask_values.
        :return: Result dict of the shell action, without ``invocation``
        """
        if self.verbosity:
            display.display(
                f'<{self.task_vars["inventory_hostname"]}> + '
                + self.mask(cmd),
                color='blue',
            )

        result = self.action_run(
            'ansible.builtin.shell',
            _raw_params=cmd,
            **kwargs,
        )

        if callback:
            callback(result)

        if self.verbosity and print:
            if 'stderr_lines' in result:
                self.print(result['stderr'])
            if 'stdout_lines' in result:
                self.print(result['stdout'])

        # the invocation holds the raw command: drop it, tolerating results
        # that do not carry one
        result.pop('invocation', None)
        return result