"""
Base class for Ansible Actions.
"""
import asyncio
import chttpx
import cli2
import copy
import difflib
import os
import re
import traceback
from ansible.plugins.action import ActionBase
from ansible.plugins.action import display
from ansible.plugins.filter.core import to_nice_yaml
# Color names that can be passed as display.display(..., color=...):
# black
# bright gray
# blue
# white
# green
# cyan
# bright green
# red
# bright cyan
# purple
# bright red
# yellow
# bright purple
# dark gray
# magenta
# bright magenta
# normal
# 7-bit C1 ANSI sequences
# Compiled pattern matching a single ANSI escape sequence: ESC followed either
# by one 7-bit C1 "Fe" byte, or by a CSI sequence made of parameter bytes,
# intermediate bytes and a final byte. re.VERBOSE allows the pattern below to
# carry its own inline comments.
ansi_escape = re.compile(r'''
\x1B # ESC
(?: # 7-bit C1 Fe (except CSI)
[@-Z\\-_]
| # or [ for CSI, followed by a control sequence
\[
[0-?]* # Parameter bytes
[ -/]* # Intermediate bytes
[@-~] # Final byte
)
''', re.VERBOSE)
# Sentinel meaning "no value was provided": it is the default of
# Option(default=...) and the initial state of the before/after diff data.
UNSET_DEFAULT = '__UNSET__DEFAULT__'
class Option:
    """
    Ansible Option descriptor.

    Reading the option on an action instance resolves a value from, in order
    of priority: the task argument, the fact value cached on the instance,
    the fact found in ``task_vars``, then the default.

    .. py:attribute:: arg

        Name of the task argument to get this option value from

    .. py:attribute:: fact

        Name of the fact, if any, to get a value for this option if no task arg
        is provided

    .. py:attribute:: default

        Default value, if any, in case neither of arg and fact were defined.

    .. py:attribute:: cast_in

        Optional callable applied to every value read from the task args, the
        ``task_vars`` or the default before it is returned.

    .. py:attribute:: cast_out

        Optional callable. This class only stores it; the action applies it
        to a changed fact value before exporting it in ``ansible_facts``.

    .. py:attribute:: casted_in

        Manual switch: when set to ``True``, :py:attr:`cast_in` is never
        applied for this option.
    """
    UNSET_DEFAULT = UNSET_DEFAULT

    def __init__(self, arg=None, fact=None, default=UNSET_DEFAULT,
                 cast_in=None, cast_out=None):
        self.arg = arg
        self.fact = fact
        self.default = default
        self.cast_in = cast_in
        self.cast_out = cast_out
        # Never flipped by the descriptor itself: a descriptor is shared by
        # every instance of the owning class, so it cannot remember per-value
        # state. It only serves as an explicit "do not cast" override.
        self.casted_in = False

    @property
    def kwargs(self):
        """
        Dict describing this option: ``default``, plus ``arg_name`` and
        ``fact_name`` when they are set.
        """
        kwargs = dict(default=self.default)
        if self.arg:
            kwargs['arg_name'] = self.arg
        if self.fact:
            kwargs['fact_name'] = self.fact
        return kwargs

    def __set__(self, obj, value):
        """
        Store ``value`` as the new fact value in ``obj.facts_values``.

        The first time the fact is touched, the value ``obj.task_vars`` holds
        for it (or :py:attr:`UNSET_DEFAULT`) is saved in
        ``obj.facts_initial``.
        """
        if self.fact and self.fact not in obj.facts_initial:
            obj.facts_initial[self.fact] = obj.task_vars.get(
                self.fact,
                self.UNSET_DEFAULT,
            )
        obj.facts_values[self.fact] = value

    def __get__(self, obj, objtype=None):
        """
        Resolve the option value for ``obj``.

        :return: The descriptor itself when accessed on the class, otherwise
                 the resolved value.
        :raise AnsibleOptionError: When no arg, no fact and no default
                                   provides a value.
        """
        if obj is None:
            return self

        # Values coming from obj.facts_values were either produced by a
        # previous call of this method (already cast) or assigned through
        # __set__, they must be returned as-is so that in-place mutations
        # keep targeting the very same object.
        cached = False

        if self.arg and self.arg in obj._task.args:
            # args have priority
            value = obj._task.args[self.arg]
        elif self.fact and self.fact in obj.facts_values:
            # if we're holding a cached fact then use that
            value = obj.facts_values[self.fact]
            cached = True
        elif self.fact and self.fact in obj.task_vars:
            # get the fact and cache it
            value = obj.task_vars[self.fact]
        elif self.default != self.UNSET_DEFAULT:
            # deepcopy so that a mutable default is never shared
            value = copy.deepcopy(self.default)
            if self.fact:
                obj.facts_initial[self.fact] = None
        else:
            raise AnsibleOptionError(self)

        # Cast every raw value: remembering "already cast" on the descriptor
        # would skip the cast for every later access and every other instance.
        if self.cast_in and not cached and not self.casted_in:
            value = self.cast_in(value)

        if self.fact:
            if self.fact not in obj.facts_initial:
                # save initial value
                obj.facts_initial[self.fact] = copy.deepcopy(value)

            if self.fact not in obj.facts_values and cli2.mutable(value):
                # that can't happen in __set__ for mutable values
                obj.facts_values[self.fact] = value

        return value
class AnsibleError(Exception):
    """Base class of the exceptions defined by this module."""
class AnsibleOptionError(AnsibleError):
    """
    Error carrying the :py:class:`Option` for which no value was found.

    .. py:attribute:: option

        The option that could not be resolved
    """

    def __init__(self, option):
        super().__init__(option.kwargs)
        self.option = option

    @property
    def message(self):
        """
        Human readable sentence naming the arg and/or fact that is missing.
        """
        option = self.option
        sources = []
        if option.arg:
            sources.append(f'arg `{option.arg}`')
        if option.fact:
            sources.append(f'fact `{option.fact}`')

        words = ['Missing']
        if sources:
            words.append(' or '.join(sources))
        return ' '.join(words)
class ActionBase(ActionBase):
    """
    Base action class

    .. py:attribute:: result

        Result dict that will be returned to Ansible

    .. py:attribute:: task_vars

        The task_vars that the module was called with

    .. py:attribute:: client

        The client object generated by :py:meth:`client_factory` if you
        implement it, ``None`` otherwise.

    .. py:attribute:: masked_keys

        Declare a list of keys to mask:

        .. code-block:: python

            class AnsibleModule(ansible.ActionBase):
                masked_keys = ['secret', 'password']

    .. py:attribute:: mask_keys

        Option backed by the ``mask_keys`` fact: more keys to mask.

    .. py:attribute:: mask_values

        Option backed by the ``mask_values`` fact: values to mask. After
        :py:meth:`mask_init` it is the very same object as ``mask.values``.

    .. py:attribute:: mask

        :py:class:`~cli2.mask.Mask` object
    """
    mask_keys = Option(fact='mask_keys', default=set())
    mask_values = Option(
        fact='mask_values',
        default=set(),
        cast_in=set,
        cast_out=list,
    )
    masked_keys = []

    def run(self, tmp=None, task_vars=None):
        """
        Ansible entry point.

        Reset the per-run state, let the parent class build the initial
        result, then execute :py:meth:`run_wrapped_async` in an event loop.

        :param tmp: Passed as-is to the parent ``run()``
        :param task_vars: Variables of the task, exposed as
                          :py:attr:`task_vars`
        :return: The :py:attr:`result` dict
        """
        self.tmp = tmp
        self.task_vars = task_vars
        self.facts_values = dict()
        self.facts_initial = dict()
        self.result = super().run(tmp, task_vars)
        asyncio.run(self.run_wrapped_async())
        return self.result

    def mask_init(self):
        """
        Provision :py:attr:`mask` with the keys and values to hide.

        Keys come from :py:attr:`mask_keys` and :py:attr:`masked_keys`; for
        each key, the value found in the task_vars and in the task args is
        masked too, as well as every templated item of the ``mask_values``
        task var.
        """
        # use ansible template value renderer
        self.mask.renderer = lambda value: self._templar.template(value)

        self.facts_initial['mask_values'] = {
            self._templar.template(value)
            for value in self.task_vars.get('mask_values', [])
        }

        def _mask_key(key):
            self.mask.keys.add(key)
            # discover values from facts
            if value := self.task_vars.get(key, None):
                self.mask.values.add(value)
            # discover value from args
            if value := self._task.args.get(key, None):
                self.mask.values.add(value)

        for key in self.mask_keys:
            _mask_key(key)

        for key in self.masked_keys:
            _mask_key(key)

        for value in self.facts_initial['mask_values']:
            self.mask.values.add(value)

        # reference self.mask.values to our own mask_values
        type(self).mask_values.casted_in = True
        self.facts_values['mask_values'] = self.mask.values

    async def run_wrapped_async(self):
        """
        Run :py:meth:`run_async` with client, mask, error and facts handling.

        - map the Ansible verbosity to ``LOG_LEVEL`` unless ``LOG_LEVEL`` or
          ``DEBUG`` is already in the environment
        - build :py:attr:`client` and :py:attr:`mask`
        - on exception: flag the result as failed, add error details to it
          and keep the exception in ``self.exc``
        - always: export the changed facts in ``result['ansible_facts']``,
          print the masked result when verbose, print the before/after diff
          when both were set
        """
        self.verbosity = self.task_vars.get('ansible_verbosity', 0)

        if 'LOG_LEVEL' not in os.environ and 'DEBUG' not in os.environ:
            if self.verbosity == 1:
                os.environ['LOG_LEVEL'] = 'INFO'
            elif self.verbosity >= 2:
                os.environ['LOG_LEVEL'] = 'DEBUG'
            cli2.configure()

        # Set defaults first: the except and finally blocks below rely on
        # both attributes, even when client_factory() itself blows up.
        self.client = None
        self.mask = cli2.Mask()

        try:
            try:
                self.client = await self.client_factory()
            except NotImplementedError:
                # no client for this action: keep the defaults
                pass
            else:
                self.mask = copy.deepcopy(self.client.mask)

            self.mask_init()
            await self.run_async()
        except Exception as exc:
            self.result['failed'] = True

            if self.verbosity:
                traceback.print_exc()

            if isinstance(exc, AnsibleError):
                # only some AnsibleError subclasses define ``message``
                self.result['error'] = getattr(exc, 'message', str(exc))
            elif isinstance(exc, chttpx.ResponseError):
                self.result.update(dict(
                    method=exc.method,
                    url=exc.url,
                    status_code=exc.status_code,
                ))
                if self.client is not None:
                    key, value = self.client.response_log_data(exc.response)
                    if key:
                        self.result[f'response_{key}'] = value
                    key, value = self.client.request_log_data(exc.request)
                    if key:
                        self.result[f'request_{key}'] = value

            # for pytest to raise
            self.exc = exc
        finally:
            changed = {
                key: value
                for key, value in self.facts_values.items()
                if value and value != self.facts_initial[key]
            }

            if changed:
                # collect the cast_out of every Option once, keyed by fact
                cast_outs = dict()
                for name in dir(type(self)):
                    option = getattr(type(self), name)
                    if isinstance(option, Option) and option.cast_out:
                        cast_outs[option.fact] = option.cast_out

                for key in changed:
                    if key in cast_outs:
                        changed[key] = cast_outs[key](changed[key])

                self.result.setdefault('ansible_facts', dict()).update(
                    changed
                )

            if self.mask and self.verbosity:
                # this task has a mask, so it's probably a no_log, we're
                # running verbose, print masked result
                self.print_yaml(self.result)

            if (
                self._before_data != UNSET_DEFAULT
                and self._after_data != UNSET_DEFAULT
            ):
                diff = difflib.unified_diff(
                    to_nice_yaml(
                        self.mask(self._before_data)
                    ).splitlines(),
                    to_nice_yaml(
                        self.mask(self._after_data)
                    ).splitlines(),
                    self._before_label,
                    self._after_label,
                )
                cli2.diff(diff)

    def print_yaml(self, data):
        """
        Render data as masked yaml.

        :param data: Data to mask, dump as YAML, highlight and print
        """
        data = self.mask(data)
        yaml = to_nice_yaml(data)
        rendered = cli2.yaml_highlight(yaml)
        # already masked above
        self.print(rendered, mask=False)

    def print(self, data, mask=True):
        """
        Print that masks by default.

        :param data: Data to display
        :param mask: Set to False to display data without masking it
        """
        if mask:
            data = self.mask(data)
        display.display(data)

    async def run_async(self):
        """
        The method you are supposed to implement.

        It should:

        - provision the :py:attr:`result` dict
        - find task_vars in :py:attr:`task_vars`
        """

    async def client_factory(self):
        """
        Return a client instance.

        :raise NotImplementedError: By default
        """
        raise NotImplementedError()

    @classmethod
    async def run_test_async(cls, args=None, facts=None, client=None,
                             fail=False):
        """
        Test run the module in a mocked context.

        :param args: Dict of task arguments
        :param facts: Dict of play facts
        :param client: Client instance, overrides the factory
        :param fail: Allow this test to fail without exception
        :return: The action object that was run
        :raise Exception: The exception caught during the run, or a generic
                          one if the result is failed, unless ``fail`` is set
        """
        from unittest import mock
        from ansible.template import Templar
        from ansible.parsing.dataloader import DataLoader

        loader = DataLoader()
        obj = cls(
            task=mock.Mock(),
            connection=mock.Mock(),
            play_context=mock.Mock(),
            loader=loader,
            templar=Templar(loader, variables=facts),
            shared_loader_obj=mock.Mock(),
        )
        obj.facts_values = dict()
        obj.facts_initial = dict()
        obj.tmp = None
        obj.result = dict()
        obj._task.args = args or {}
        obj.task_vars = facts or {}
        obj.task_vars.setdefault('ansible_verbosity', 2)
        obj.exc = False

        if client:
            async def _factory():
                return client
            obj.client_factory = _factory

        old = obj.client_factory

        async def set_tries():
            # whatever the factory is, set the client handler tries to 0
            client = await old()
            client.handler.tries = 0
            return client
        obj.client_factory = set_tries

        await obj.run_wrapped_async()

        if obj.exc and not fail:
            raise obj.exc

        if obj.result.get('failed', False) and not fail:
            raise Exception(
                f'Module failed, and fail is not True {obj.result}'
            )

        return obj

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # no diff is displayed until both before_set and after_set are called
        self._before_data = UNSET_DEFAULT
        self._after_data = UNSET_DEFAULT

    def before_set(self, data, label='before'):
        """
        Set the "before" data we're going to display the diff for at the end.

        :param data: Dictionary of data
        :param label: Label to show in diff
        """
        self._before_data = data
        self._before_label = label

    def after_set(self, data, label='after'):
        """
        Set the "after" data we're going to display the diff for at the end.

        :param data: Dictionary of data
        :param label: Label to show in diff
        """
        self._after_data = data
        self._after_label = label

    def action_run(self, name, **kwargs):
        """
        Run another action plugin from within this one.

        The action is loaded with a copy of the current task whose args are
        replaced by ``kwargs``, and run with a copy of :py:attr:`task_vars`.

        :param name: Name of the action to load
        :param kwargs: Arguments of the action
        :return: Whatever the ``run()`` method of the loaded action returns
        """
        new_task = self._task.copy()
        new_task.args = kwargs
        action = self._shared_loader_obj.action_loader.get(
            name,
            task=new_task,
            connection=self._connection,
            play_context=self._play_context,
            loader=self._loader,
            templar=self._templar,
            shared_loader_obj=self._shared_loader_obj,
        )
        return action.run(task_vars=self.task_vars.copy())

    def subprocess_remote(self, cmd, callback=None, print=True, **kwargs):
        """
        Execute a shell command on the remote in a masked context

        :param cmd: Command to run
        :param kwargs: Other shell args, such as creates etc
        :param print: Whether to print stdout/stderr, defaults to True
        :param callback: Callback function called before output is printed and
                         returned, use that to collect secrets and provision
                         self.mask_values.
        :return: Result of the shell action, without its ``invocation`` key
        """
        if self.verbosity:
            display.display(
                f'<{self.task_vars["inventory_hostname"]}> + '
                + self.mask(cmd),
                color='blue',
            )

        result = self.action_run(
            'ansible.builtin.shell',
            _raw_params=cmd,
            **kwargs,
        )

        if callback:
            callback(result)

        if self.verbosity and print:
            if 'stderr_lines' in result:
                self.print(result['stderr'])
            if 'stdout_lines' in result:
                self.print(result['stdout'])

        # the invocation holds the raw command: drop it, tolerate its absence
        result.pop('invocation', None)
        return result