"""
HTTP Client boilerplate code to conquer the world.
"""
import asyncio
import copy
import httpx
import inspect
import json
import math
import os
import ssl
import yaml
from datetime import datetime
from pathlib import Path
from urllib.parse import parse_qs
try:
import truststore
except ImportError:
truststore = None
from cli2 import display
from cli2.asyncio import async_resolve
from cli2.cli import Argument, Command, Group, cmd, hide
from cli2.colors import colors
from cli2.log import log
from cli2.mask import Mask
__all__ = [
'ClientError',
'ResponseError',
'TokenGetError',
'RefusedResponseError',
'RetriesExceededError',
'FieldError',
'FieldValueError',
'FieldExternalizeError',
'VirtualField',
'Client',
'ClientCommand',
'DateTimeField',
'Field',
'Handler',
'JSONStringField',
'Model',
'ModelCommand',
'Paginator',
'Related',
]
[docs]
class Paginator:
"""
Generic pagination class.
Should work with most paginations by default, if you're extending this then
override:
- :py:meth:`~Paginator.pagination_initialize`
- :py:meth:`~Paginator.pagination_parameters`
.. py:attribute:: total_pages
Total number of pages.
.. py:attribute:: total_items
Total number of items.
.. py:attribute:: per_page
Number of items per page
.. py:attribute:: url
The URL to query
.. py:attribute:: url
The URL to query
.. py:attribute:: params
Dictionnary of GET parameters
.. py:attribute:: model
:py:class:`Model` class or ``dict`` by default.
.. py:attribute:: callback
Async callback called for every item before filtering by expressions.
"""
def __init__(self, client, url, params=None, model=None, expressions=None,
callback=None):
"""
Initialize a paginator object with a client on a URL with parameters.
:param client: :py:class:`Client` object
:param url: URL to query
:param params: Dictionnary of GET parameters
:params model: Model class, can be a dict, or :py:class:`Model`
"""
self.client = client
self.url = url
self.params = params or {}
self.model = model or dict
self.page_start = 1
self.per_page = None
self.initialized = False
self.callback = callback
self.expressions = []
for expression in (expressions or []):
if not isinstance(expression, Expression):
expression = Filter(expression)
self.expressions.append(expression)
self._total_pages = None
self._total_items = None
self._reverse = False
[docs]
def reverse(self):
"""
Return a copy of this :py:class:`Paginator` object to iterate in
reverse order.
For this to work, :py:meth:`pagination_initialize` **must** set
:py:attr:`per_page` and either of :py:attr:`total_pages` or
:py:attr:`total_items`, which is up to you to implement.
"""
obj = copy.copy(self)
obj._reverse = True
return obj
[docs]
async def last_item(self):
"""
Return the last item of a paginated request.
"""
self.initialized or await self.initialize()
items = await self.page_items(self.total_pages)
return items[-1]
@property
def total_items(self):
return self._total_items
@total_items.setter
def total_items(self, value):
self._total_items = value
@property
def total_pages(self):
if self._total_pages:
return self._total_pages
if self.total_items and self.per_page:
self._total_pages = math.ceil(self.total_items / self.per_page)
return self._total_pages
@total_pages.setter
def total_pages(self, value):
self._total_pages = value
[docs]
async def call(self, callback):
"""
Call an async callback for each item
:param callback: Function to call for every item.
"""
async for item in self.__aiter__(callback=callback):
pass
[docs]
async def list(self):
""" Return casted list of items """
self.results = []
async for item in self:
self.results.append(item)
return self.results
[docs]
async def initialize(self, response=None):
"""
This method is called once when we get the first response.
:param response: First response object
"""
if not response:
response = await self.page_response(1)
data = response.json()
if isinstance(data, list):
# we won't figure max page
self.initialized = True
return
self.pagination_initialize(data)
if not self.per_page:
self.per_page = len(self.data_items(data))
self.initialized = True
[docs]
def response_items(self, response):
"""
Parse a response and return a list of model items.
:param response: Response to parse
"""
try:
data = response.json()
except json.JSONDecodeError:
return []
return self.data_items(data)
[docs]
def data_items(self, data):
"""
Given response data, return items.
:param data: Response JSON data
"""
items_list = []
if isinstance(data, list):
items_list = data
elif isinstance(data, dict):
for key, value in data.items():
if isinstance(value, list):
items_list = value
break
items = [self.model(item) for item in items_list]
if not self.per_page:
self.per_page = len(items_list)
return items
def python_filter(self):
filters = [
f
for f in self.expressions
if not f.parameterable
]
if filters:
return And(*filters)
[docs]
async def page_items(self, page_number):
"""
Return the items of a given page number.
:param page_number: Page number to get the items from
"""
try:
return self.response_items(await self.page_response(page_number))
except NotImplementedError:
# pagination_parameters not implemented, can't paginate
return []
[docs]
async def page_response(self, page_number):
"""
Return the response for a page.
:param page_number: Page number to get the items from
"""
params = self.params.copy()
if page_number > 1:
try:
self.pagination_parameters(params, page_number)
except NotImplementedError:
raise
for expression in self.expressions:
if expression.parameterable:
expression.params(params)
response = await self.client.get(self.url, params=params, quiet=True)
if not self.initialized:
await self.initialize(response)
return response
async def __aiter__(self, callback=None):
"""
Asynchronous iterator.
"""
callback = callback or self.callback
if self._reverse and not self.total_pages:
first_page_response = await self.page_response(1)
page = self.total_pages
if not self.total_pages:
raise Exception('Reverse pagination without total_pages')
else:
page = self.page_start
python_filter = self.python_filter()
async def yielder(items):
if callback:
await asyncio.gather(*[callback(item) for item in items])
for item in items:
if not python_filter or python_filter.matches(item):
yield item
while items := await self.page_items(page):
if not items:
continue
if self._reverse:
items = reversed(items)
async for item in yielder(items):
yield item
if self._reverse:
page -= 1
if not page:
break
if page == 1:
# use cached first page response
items = self.response_items(first_page_response)
async for item in yielder(reversed(items)):
yield item
break
else:
if page == self.total_pages:
break
page += 1
[docs]
async def first(self):
""" Return first item """
async for item in self:
return item
[docs]
class Field:
"""
Field descriptor for models.
The basic Field descriptor manages (get and set) data from within the
:py:attr:`Model.data` JSON.
Since sub-classes are going to convert data, we need to understand the
concepts of internal and external data:
- external: the Python value, this can be any kind of Python object
- internal: the Python representation of the JSON value, this can be any
Python type that will work given json.dumps
- internalizing: process of converting a Python value into a JSON one
- externalizing: process of converting a JSON value into something Python
.. py:attribute:: data_accessor
Describes how to access the Field's data inside the model's data dict.
If data_accessor is ``foo`` then this field will control the ``foo``
key in the model's data dict.
Use ``/`` to separate keys when nesting, if data_accessor is
``foo/bar`` then it will control the ``bar`` key of the ``foo`` dict in
the model's data dict.
.. py:attribute:: parameter
Name of the GET parameter on the model's :py:attr:`Model.url_list`, if
any. So that the filter will be converted to a GET parameter.
Otherwise, filtering will happen in Python.
.. py:attribute:: callback
Callback function to define a default value. Any field with a callback
will provision the :py:attr:`Model.data` dict automatically.
"""
def __init__(self, data_accessor=None, parameter=None, callback=None):
self.data_accessor = data_accessor
self.parameter = parameter
self.callback = callback
self.callback_dependencies = []
def __get__(self, obj, objtype=None):
"""
Get the value of a field for an object.
A simple process:
- Get the internal value from :py:meth:`internal_get`
- Pass it through the :py:meth:`externalize` method prior to returning
it.
"""
if obj is None:
return self
data = self.internal_get(obj)
return self.externalize(obj, data)
def __set__(self, obj, value):
"""
Set the value in the internal :py:attr:`Model.data` dict.
A two-step process:
- Use :py:meth:`internalize` to convert the Python external value into
a Python representation of the JSON value
- Use :py:meth:`internal_set` to actually set the internal
:py:attr:`Model.data`
"""
try:
old_value = getattr(obj, self.name)
if self.name not in obj.changed_fields and value != old_value:
obj.changed_fields[self.name] = old_value
except FieldExternalizeError:
obj.changed_fields[self.name] = None
value = self.internalize(obj, value)
self.internal_set(obj, value)
[docs]
def internal_get(self, obj):
"""
Get the "raw" value from the object, which is a Python representation
of the JSON internal value, using :py:attr:`data_accessor`.
"""
data = obj.data
if not isinstance(data, dict):
return self.__get__(data, type(data))
try:
data = self.get(obj.data)
except KeyError:
return None
return data
def get(self, data):
for key in self.data_accessor.split('/'):
data = data[key]
return data
def is_set(self, obj):
try:
self.get(obj._data)
except KeyError:
return False
else:
return True
[docs]
def internal_set(self, obj, value):
"""
Using :py:attr:`data_accessor`, set the value in :py:attr:`Model.data`
"""
parts = self.data_accessor.split('/')
data = obj.data
for number, key in enumerate(parts, start=1):
if number == len(parts):
data[key] = value
break
if key not in data:
data[key] = dict()
data = data[key]
[docs]
def internalize(self, obj, value):
"""
Transform external value into JSON internal value.
Any kind of processing from the Python value to the JSON value can be
done in this method.
"""
return value
[docs]
def externalize(self, obj, value):
"""
Transform internal JSON value to Python value, based on
:py:attr:`data_accessor`.
Any kind of processing from the JSON value to the Python value can be
done in this method.
"""
return value
[docs]
def mark_dirty(self, obj):
"""
Tell the model that the data must be cleaned.
"""
obj._dirty_fields.append(self)
[docs]
def clean(self, obj):
"""
Clean the data.
Called by the Model when requested a :py:attr:`data`, this method:
- Gets the externalized value from :py:meth:`__get__`
- Convert it into a JSON object with :py:meth:`internalize`
- Uses :py:meth:`internal_set` to update :py:attr:`Model.data`
"""
externalized = self.__get__(obj)
internalized = self.internalize(obj, externalized)
self.internal_set(obj, internalized)
def __eq__(self, value):
return Equal(self, value)
def __gt__(self, value):
return GreaterThan(self, value)
def __lt__(self, value):
return LesserThan(self, value)
def startswith(self, value):
return StartsWith(self, value)
def factory(self, *args):
def _(callback):
self.callback = callback
return callback
if isinstance(args[0], Field):
self.callback_dependencies = args
return _
else:
return _(args[0])
[docs]
class VirtualField(Field):
def __get__(self, obj, objtype=None):
"""
Get the value of a virtual field for an object.
"""
if obj is None:
return self
return obj._data_virtual.get(self.name)
def __set__(self, obj, value):
"""
Set a value that must not end up in :py:attr:`Model.data` dict.
"""
if self.name in obj._data_virtual:
old_value = self.__get__(obj)
if old_value != value:
obj.changed_fields[self.name] = old_value
obj._data_virtual[self.name] = value
def is_set(self, obj):
return self.name in obj._data_virtual
class MutableField(Field):
"""
Base class for mutable value fields like :py:class:`JSONStringField`
Basically, this field:
- caches the externalized value, so that you can mutate it
- marks the field as dirty so you get the internalized mutated value that
next time you get the :py:attr:`Model.data`
"""
def cache_set(self, obj, value):
"""
Cache a computed value for obj
:param obj: Model object
"""
obj._field_cache[self.name] = value
def cache_get(self, obj):
"""
Return cached value for obj
:param obj: Model object
"""
return obj._field_cache[self.name]
def __get__(self, obj, objtype=None):
"""
Return safely mutable value.
If the value is not found in cache, externalize the internal value and
cache it.
Always mark the field as dirty given the cached external data may
mutate.
"""
if not obj:
return super().__get__(obj, objtype)
try:
return self.cache_get(obj)
except KeyError:
externalized = self.externalize(obj, self.internal_get(obj))
self.cache_set(obj, externalized)
return externalized
finally:
if not obj._data_updating:
self.mark_dirty(obj)
def __set__(self, obj, value):
"""
Cache the value prior to setting it normally.
"""
self.cache_set(obj, value)
super().__set__(obj, value)
[docs]
class JSONStringField(MutableField):
"""
Yes, some proprietary APIs have JSON fields containing JSON strings.
This Field is the cure the world needed for that disease.
.. py:attribute:: options
Options dict for json.dumps, ie. ``options=dict(indent=4)``
"""
def __init__(self, *args, options=None, **kwargs):
super().__init__(*args, **kwargs)
self.options = options or dict()
[docs]
def internalize(self, obj, data):
return json.dumps(data, **self.options)
[docs]
def externalize(self, obj, value):
if value == '':
return value
return json.loads(value)
[docs]
class DateTimeField(Field):
"""
JSON has no datetime object, which means different APIs will serve us
different formats in string variables.
Heck, I'm pretty sure there are even some APIs which use different formats.
This is the cure the world needed against that disease.
.. py:attribute:: fmt
The datetime format for Python's strptime/strftime.
.. py:attribute:: fmts
A list of formats, in case you don't have one. This list will be
populated with :py:attr:`default_fmts` by default.
.. py:attribute:: default_fmts
A class property containing a list of formats we're going to try to
figure `fmt` and have this thing "work by default". Please contribute
to this list with different formats.
"""
iso_fmt = '%Y-%m-%dT%H:%M:%S.%f'
default_fmts = [
iso_fmt,
'%Y-%m-%dT%H:%M:%S',
]
def __init__(self, *args, fmt=None, fmts=None, **kwargs):
super().__init__(*args, **kwargs)
self.fmt = fmt
self.fmts = fmts
if not self.fmt and not self.fmts:
self.fmts = self.default_fmts
[docs]
def externalize(self, obj, value):
"""
Convert the internal string into an external datetime.
"""
if self.fmt:
return datetime.strptime(value, self.fmt)
# try a bunch of formats and hope for the best
for fmt in self.default_fmts:
try:
value = datetime.strptime(value, fmt)
except (ValueError, TypeError):
continue
else:
self.fmt = fmt
return value
raise FieldExternalizeError(
f'Could not figure how to parse {value}, use fmt option',
self, obj, value,
)
[docs]
def internalize(self, obj, value):
"""
Convert a datetime into an internal string.
"""
if isinstance(value, str):
return value
return value.strftime(self.fmt or self.iso_fmt)
[docs]
class ModelCommand(Command):
"""
Command class for Model class.
"""
def __init__(self, target, *args, **kwargs):
# unbound method by force
target = getattr(target, '__func__', target)
super().__init__(target, *args, **kwargs)
self.overrides['self']['factory'] = self.get_object
self.overrides['cls']['factory'] = self.get_model
self.client = None
[docs]
def setargs(self):
"""
ModelCommand setargs which calls setargs on the client class and
defines an id argument for object commands.
"""
super().setargs()
self.group.parent.client_class.setargs(self)
if 'self' in self:
self.arg('id', position=0, kind='POSITIONAL_ONLY', doc='ID')
[docs]
async def factories_resolve(self):
"""
Return a client object from it's factory, will all args resolved.
"""
# create a hidden Argument to use it's factory caller
argument = Argument(
self,
inspect.Parameter('_', kind=inspect.Parameter.POSITIONAL_ONLY),
factory=self.client_class.factory,
)
# this ensures the factory gets any kind of args
factory = argument.factory_value(self)
self.client = await async_resolve(factory)
await super().factories_resolve()
[docs]
async def get_model(self):
""" Return a client instance bound model """
return getattr(self.client, self.model.__name__)
async def get_object(self):
model = await self.get_model()
return await model.get(id=self['id'].value)
[docs]
async def post_call(self):
if self.client:
await self.client.post_call(self)
class ModelMetaclass(type):
def __new__(cls, name, bases, attributes):
if 'Paginator' in attributes:
attributes['paginator'] = attributes['Paginator']
cls = super().__new__(cls, name, bases, attributes)
client_class = getattr(cls, '_client_class', None)
cls.cmdclass = type(
'ModelCommand',
(cls.cmdclass,),
dict(
model=cls,
client_class=client_class,
),
)
client = getattr(cls, 'client', None)
if client:
if not cls.paginator:
cls.paginator = client.paginator
return cls
if client_class:
client_class.models.append(cls)
cls._fields = dict()
def process_cl(cl):
for key, obj in cl.__dict__.items():
if not isinstance(obj, Field):
continue
if not obj.data_accessor:
obj.data_accessor = key
cls._fields[key] = obj
obj.name = key
process_cl(cls)
def process_bases(cl):
for base in cl.__bases__:
process_cl(base)
process_bases(base)
process_bases(cls)
return cls
@property
def cli(cls):
if '_cli' not in cls.__dict__:
cli_kwargs = dict(
name=cls.__name__.lower(),
cmdclass=cls.cmdclass,
)
doc = inspect.getdoc(cls)
if doc != inspect.getdoc(Model):
cli_kwargs['doc'] = doc
cli_kwargs.update(cls.cli_kwargs)
cls._cli = Group(**cli_kwargs)
cls._cli.load(cls)
return cls._cli
[docs]
class Model(metaclass=ModelMetaclass):
"""
You should never call this class directly, instead, get it from the
:py:class:`Client` object after decorating your model classes with a
client as such:
.. py:attribute:: paginator
:py:class:`Paginator` class, you can leave it by default and just
implement :py:meth:`pagination_initialize` and
:py:meth:`pagination_parameters`.
.. py:attribute:: url_list
The URL to get the list of objects, you're supposed to configure it as
a model attribute in your model subclass.
This may be a format string using a client ``client`` variable.
.. py:attribute:: url_detail
The URL to get the details of an object, you're supposed to configure
it as a model attribute in your model subclass.
.. py:attribute:: id_field
Name of the field that should be used as resource identifier, `id` by
default.
.. py:attribute:: url
Object URL based on :py:attr:`url_detail` and :py:attr:`id_field`.
.. py:attribute:: cli_kwargs
Dict of kwargs to use to create the :py:class:`~cli2.cli.Group` for
this model.
.. py:attribute:: cmdclass
:py:class:`ModelCommand` subclass. You generally don't need to
define this, instead, you should do what you need in the
:py:meth:`Client.factory`, :py:meth:`Client.setargs` and
:py:meth:`Client.post_call` methods.
"""
paginator = None
cmdclass = ModelCommand
url_list = None
url_detail = '{self.url_list}/{self.id_value}'
id_field = 'id'
cli_kwargs = dict()
def __init__(self, data=None, **values):
"""
Instanciate a model.
:param data: JSON Data
"""
self._data = data or dict()
self._data_virtual = data or dict()
self._data_updating = False
self._dirty_fields = []
self._field_cache = dict()
self.changed_fields = dict()
for key, value in values.items():
setattr(self, key, value)
# actually reset that
self.changed_fields = dict()
@property
def data(self):
"""
Just ensure we update dirty data prior to returning the data dict.
"""
if not self._data_updating:
self._data_updating = True
if self._dirty_fields:
for field in self._dirty_fields:
field.clean(self)
self._dirty_fields = []
self._data_callbacks()
self._data_updating = False
return self._data
def _data_callbacks(self):
missing = []
done = []
for name, field in self._fields.items():
if not field.callback or field.is_set(self):
continue
ready = True
for dependency in field.callback_dependencies:
if not dependency.is_set(self):
missing.append(dependency)
ready = False
if ready:
setattr(self, name, field.callback(self))
done.append(field)
for _ in missing:
if _ in done:
self._data_callbacks()
@data.setter
def data(self, value):
self._data = value
@property
def data_masked(self):
return self.client.mask(self.data)
[docs]
@classmethod
@hide('expressions')
@cmd(color='green', condition=lambda cls: cls.url_list)
def find(cls, *expressions, **params):
"""
Find objects filtered by GET params
:param params: GET URL parameters
:param expressions: :py:class:`Expression` list
"""
return cls.paginate(cls.url_list, *expressions, **params)
[docs]
@classmethod
def paginate(cls, url, *expressions, **params):
"""
Return a :py:class:`Paginator` based on :py:attr:`url_list`
:param expressions: :py:class:`Expression` list
"""
return cls.paginator(cls.client, url, params, cls, expressions)
@property
def cli2_display(self):
return self.data
@property
def url(self):
if 'url_list' in self.url_detail and not self.url_list:
raise Exception(f'{type(self).__name__}.url_list not set')
return self.url_detail.format(self=self)
[docs]
@cmd(color='red', condition=lambda cls: cls.url_list)
async def delete(self):
"""
Delete model.
DELETE request on :py:attr:`url`
"""
return await self.client.delete(self.url)
[docs]
@classmethod
@cmd(condition=lambda cls: cls.url_list, doc="""
POST request to create.
Example:
create name=foo
""")
async def create(cls, **kwargs):
"""
Instanciate a model with kwargs and run :py:meth:`save`.
"""
obj = cls(**kwargs)
await obj.save()
return obj
[docs]
@classmethod
@cmd(color='green', condition=lambda cls: cls.url_list, doc="""
Get a model based on kwargs.
Example:
get id=3
""")
async def get(cls, **kwargs):
"""
Instanciate a model with kwargs and run :py:meth:`hydrate`.
"""
obj = cls(**kwargs)
await obj.hydrate()
return obj
[docs]
async def hydrate(self, data=None):
"""
Refresh data with GET requset on :py:attr:`url_detail`
:param data: Data dict, otherwise will get it
"""
if data is None:
response = await self.client.get(self.url)
data = response.json()
self.data.update(data)
self.changed_fields = dict()
[docs]
async def save(self):
"""
Call :py:meth:`update` if `self.id` otherwise :py:meth:`instanciate`.
Then updates :py:attr:`data` based on the response.json if possible.
You might want to override this.
"""
if self.id_value:
return await self.update()
else:
return await self.instanciate()
[docs]
async def instanciate(self):
"""
POST :py:attr:`data` to :py:attr:`url_list`, update data with response
json.
You might want to override this.
"""
if not self.url_list:
raise Exception(f'{type(self).__name__}.url_list not set')
response = await self.client.post(self.url_list, json=self.data)
try:
data = response.json()
except json.JSONDecodeError:
pass
else:
await self.hydrate(data)
return response
[docs]
async def update(self):
"""
POST :py:attr:`data` to :py:attr:`url_list`, update data with response
json.
You might want to override this.
"""
response = await self.client.post(self.url, json=self.data)
try:
data = response.json()
except json.JSONDecodeError:
pass
else:
await self.hydrate(data)
return response
@property
def id_value(self):
"""
Return value of the :py:attr:`id_field`.
"""
return getattr(self, self.id_field)
class ClientMetaclass(type):
cli_kwargs = dict()
def __new__(cls, name, bases, attributes):
if 'Paginator' in attributes:
attributes['paginator'] = attributes['Paginator']
cls = super().__new__(cls, name, bases, attributes)
cls.cmdclass = type(
'ClientCommand',
(cls.cmdclass,),
dict(client_class=cls),
)
# bind ourself as _client_class to any inherited model
cls.Model = type('Model', (Model,), dict(_client_class=cls))
cls.models = []
return cls
@property
def cli(cls):
if '_cli' not in cls.__dict__:
cli_kwargs = dict(
name=cls.__name__.lower().replace('client', '') or 'client',
overrides=dict(
cls=dict(factory=lambda: cls),
self=dict(factory=lambda: cls())
),
cmdclass=cls.cmdclass,
)
doc = inspect.getdoc(cls)
if doc != inspect.getdoc(Client):
cli_kwargs['doc'] = doc
cli_kwargs.update(cls.cli_kwargs)
cli = Group(**cli_kwargs)
cli.client_class = cls
cli.load(cls)
cls._cli = cli
for model in cls.models:
group = model.cli
if group.name in cls._cli:
continue
group.client_class = cls
if len(group) > 1:
cls._cli[model.__name__.lower()] = group
return cls._cli
[docs]
class Handler:
"""
.. py:attribute:: tries
Number of retries for an un-accepted request prior to failing.
Default: 30
.. py:attribute:: backoff
Will sleep ``number_of_tries * backoff`` prior to retrying.
Default: `.1`
.. py:attribute:: accepts
Accepted status codes, you should always set this to ensure responses
with an unexpected status either retry or raise.
Default: range(200, 299)
.. py:attribute:: refuses
List of refused status codes, responses returning those will not retry
at all and raise directly.
Default: [400, 404]
.. py:attribute:: retokens
Status codes which trigger a new call of :py:meth:`~Client.token_get`
prior to a retry. Only one retry is done then by this handler,
considering that authenticating twice in a row is useless: there's a
problem in your credentials instead.
Default: [401, 403, 407, 511]
"""
retokens_defaults = [401, 403, 407, 511]
accepts_defaults = range(200, 299)
refuses_defaults = [400, 404]
tries_default = 30
backoff_default = .1
def __init__(self, accepts=None, refuses=None, retokens=None, tries=None,
backoff=None):
if retokens is None:
self.retokens = copy.copy(self.retokens_defaults)
else:
self.retokens = retokens
if accepts is None:
self.accepts = copy.copy(self.accepts_defaults)
else:
self.accepts = accepts
if refuses is None:
self.refuses = copy.copy(self.refuses_defaults)
else:
self.refuses = refuses
self.tries = self.tries_default if tries is None else tries
self.backoff = self.backoff_default if backoff is None else backoff
async def __call__(self, client, response, tries, log):
seconds = tries * self.backoff
if isinstance(response, Exception):
if tries >= self.tries:
raise response
# httpx session is rendered unusable after a TransportError
if isinstance(response, httpx.TransportError):
kwargs = dict(error=repr(response))
try:
response.request
except (RuntimeError, AttributeError):
pass
else:
kwargs['method'] = response.request.method
kwargs['url'] = str(response.request.url)
log.warn('reconnect', **kwargs)
await asyncio.sleep(seconds)
await client.client_reset()
return
if self.accepts:
if response.status_code in self.accepts:
return response
elif response.status_code not in self.refuses:
return response
if response.status_code in self.refuses:
raise RefusedResponseError(client, response, tries)
if tries >= self.tries:
raise RetriesExceededError(client, response, tries)
if response.status_code in self.retokens:
if tries:
# our authentication is just not working, no need to retry
raise TokenGetError(client, response, tries)
log.warn('retoken')
await client.token_reset()
log.warn(
'retry',
status_code=response.status_code,
tries=tries,
sleep=seconds,
)
await asyncio.sleep(seconds)
[docs]
class ClientError(Exception):
pass
[docs]
class ResponseError(ClientError):
"""
Beautiful Response Error class.
.. py:attribute:: response
httpx Response object
.. py:attribute:: request
httpx Request object
.. py:attribute:: status_code
Response status code
.. py:attribute:: url
Request url
.. py:attribute:: method
Request method
"""
def __init__(self, client, response, tries, msg=None):
self.client = client
self.response = response
self.tries = tries
self.msg = msg or getattr(self, 'msg', '').format(self=self)
super().__init__(self.enhance(self.msg))
@property
def request(self):
return self.response.request
@property
def method(self):
return str(self.request.method)
@property
def url(self):
return str(self.request.url)
@property
def status_code(self):
return str(self.response.status_code)
[docs]
def enhance(self, msg):
"""
Enhance an httpx.HTTPStatusError
Adds beatiful request/response data to the exception.
:param exc: httpx.HTTPStatusError
"""
output = [msg]
key, value = self.client.request_log_data(self.response.request)
request_msg = ' '.join([
str(self.response.request.method),
str(self.response.request.url),
])
output.append(
f'{colors.reset}{colors.bold}{request_msg}{colors.reset}',
)
if value:
output.append(display.render(value))
key, value = self.client.response_log_data(self.response)
output.append(
''.join([
colors.bold,
f'HTTP {self.response.status_code}',
colors.reset,
])
)
if value:
output.append(display.render(value))
return '\n'.join(output)
[docs]
class TokenGetError(ResponseError):
msg = 'Authentication failed after {self.tries} tries'
[docs]
class RefusedResponseError(ResponseError):
msg = 'Response {self.response} refused'
[docs]
class RetriesExceededError(ResponseError):
msg = 'Unacceptable response {self.response} after {self.tries} tries'
[docs]
class FieldError(ClientError):
pass
[docs]
class FieldValueError(FieldError):
def __init__(self, msg, field, obj, value):
super().__init__(msg)
self.obj = obj
self.field = field
self.value = value
[docs]
class FieldExternalizeError(FieldValueError):
pass
[docs]
class ClientCommand(Command):
"""
Client CLI command
.. py:attribute:: client
The client object that was constructed from :py:meth:`Client.factory`
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.client = None
[docs]
def setargs(self):
"""
Set a `self` factory of :py:meth:`Client.factory` method, and call
:py:meth:`Client.setargs`.
"""
super().setargs()
if 'self' in self:
self['self'].factory = self.client_class.factory
self.client_class.setargs(self)
[docs]
async def factories_resolve(self):
""" Set :py:attr:`client` after resolving factories. """
await super().factories_resolve()
if 'self' in self:
self.client = self['self'].value
[docs]
async def post_call(self):
""" Call :py:meth:`Client.post_call`. """
if self.client:
await self.client.post_call(self)
[docs]
class Client(metaclass=ClientMetaclass):
"""
HTTPx Client Wrapper
.. py:attribute:: paginator
:py:class:`Paginator` class
.. py:attribute:: semaphore
Optionnal asyncio semaphore to throttle requests.
.. py:attribute:: handler
A callback that will take responses objects and decide wether or not to
retry the request, or raise an exception, or return the request.
Default is a :py:class:`Handler`
.. py:attribute:: mask_keys
Use this class attribute to declare keys to mask:
.. code-block:: python
class YourClient(cli2.Client):
mask_keys = ['password', 'secret']
.. py:attribute:: cli
Generated :py:class:`~cli2.cli.Group` for this client.
Uses :py:attr:`cli_kwargs` to pass kwargs to the generated group.
Note that this is a cached property.
.. py:attribute:: cli_kwargs
Dict of overrides for the generated :py:class:`~cli2.cli.Group`.
Example:
.. code-block:: python
class YourClient(cli2.Client):
cli_kwargs = dict(cmdclass=YourCommandClass)
.. py:attribute:: cmdclass
:py:class:`ClientCommand` class or subclass. You usually won't have to
define this, instead, you should do what you need in the
:py:meth:`factory`, :py:meth:`setargs` and :py:meth:`post_call`
methods.
.. py:attribute:: debug
Enforce full logging: quiet requests are logged, masking does not
apply. This is also enabled with environment variable ``DEBUG``.
.. py:attribute:: mask
:py:class:`~cli2.mask.Mask` object
.. py:attribute:: models
Declared models for this Client.
"""
paginator = Paginator
models = []
semaphore = None
debug = False
cmdclass = ClientCommand
mask_keys = None
def __init__(self, *args, handler=None, semaphore=None, mask=None,
debug=False, **kwargs):
"""
Instanciate a client with httpx.AsyncClient args and kwargs.
"""
self._client = None
self._client_args = args
self._client_kwargs = kwargs
self._client_attrs = None
self.handler = handler or Handler()
self.semaphore = semaphore if semaphore else self.semaphore
self.mask = mask or Mask()
if self.mask_keys:
for key in self.mask_keys:
self.mask.keys.add(key)
self.debug = debug or os.getenv('DEBUG', self.debug)
self.mask.debug = self.debug
if truststore:
self._client_kwargs.setdefault(
'verify',
truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT),
)
self.token_getting = False
self.token = None
for model in self.models:
model = type(model.__name__, (model,), dict(client=self))
if model.url_list:
model.url_list = model.url_list.format(client=self)
setattr(self, model.__name__, model)
[docs]
@classmethod
async def factory(cls):
"""
Override this method to customize your client construction.
You can add custom args, if you declare them in :py:meth:`setargs()`.
"""
return cls()
[docs]
@classmethod
def setargs(cls, cmd):
"""
Override this method to declare CLI args globally for this client.
:param cmd: :py:class:`ClientCommand` object
"""
[docs]
async def post_call(self, cmd):
"""
Override this method which will run after a CLI exits.
:param cmd: :py:class:`ClientCommand` object
"""
@property
def client(self):
"""
Return last client object used, unless it raised RemoteProtocolError.
"""
if not self._client:
self._client = self.client_factory()
return self._client
[docs]
def client_factory(self):
"""
Return a fresh httpx async client instance.
"""
client = httpx.AsyncClient(*self._client_args, **self._client_kwargs)
if self.token and not self.token_getting:
try:
self.client_token_apply(client)
except NotImplementedError:
pass
return client
@client.setter
def client(self, value):
self._client = value
@client.deleter
def client(self):
self._client = None
[docs]
async def send(self, request, handler, retries=True, semaphore=None,
log=None, quiet=None, auth=None, follow_redirects=None):
"""
Internal request method
"""
semaphore = semaphore or self.semaphore
tries = 0
async def _send():
return await self.client.send(
request,
auth=auth,
follow_redirects=follow_redirects,
)
async def _request():
if semaphore:
async with semaphore:
return await _send()
return await _send()
_log = log.bind(method=request.method, url=str(request.url))
if not quiet or self.debug:
# ensure we have content to log
await request.aread()
key, value = self.request_log_data(request, quiet)
kwargs = dict()
if value:
kwargs[key] = value
if os.getenv('HTTP_DEBUG'):
kwargs['content'] = request.content
kwargs['headers'] = request.headers
_log.debug('request', **kwargs)
while retries or tries > 1:
try:
response = await _request()
except Exception as exc:
await handler(self, exc, tries, log)
else:
kwargs = dict(status_code=response.status_code)
if not quiet or self.debug:
key, value = self.response_log_data(response)
if value:
kwargs[key] = value
_log.info('response', **kwargs)
if response := await handler(self, response, tries, log):
return response
tries += 1
return response
async def client_reset(self):
del self.client
async def token_reset(self):
self.token = None
[docs]
async def token_refresh(self):
""" Use :py:meth:`token_get()` to get a token """
self.token_getting = True
try:
self.token = await self.token_get()
except NotImplementedError:
self.token = True
else:
try:
self.client_token_apply(self.client)
except NotImplementedError:
pass
self.token_getting = False
[docs]
def client_token_apply(self, client):
"""
Actually provision self.client with self.token.
This is yours to implement, ie.:
.. code-block:: python
client.headers['X-API'] = f'Bearer {self.token}'
Do NOT use self.client in this function given it's called by the
factory itself.
:param client: The actual AsyncClient instance to provision.
"""
raise NotImplementedError()
[docs]
async def token_get(self):
"""
Authentication dance to get a token.
This method will automatically be called by :py:meth:`request` if it
finds out that :py:attr:`token` is None.
This is going to depend on the API you're going to consume, basically
it's very client-specific.
By default, this method does nothing. Implement it to your likings.
This method is supposed to return the token, but doesn't do anything
with it by itself.
You also need to implement the :py:meth:`client_token_apply` which is
in charge of updating the actual httpx client object with the said
token.
.. code-block::
async def token_get(self):
response = await self.post('/login', dict(...))
return response.json()['token']
def client_token_apply(self, client):
client.headers['X-ApiKey'] = self.token
"""
raise NotImplementedError()
[docs]
async def request(
# base arguments
self, method, url,
*,
# cli2 arguments
handler=None, quiet=False, accepts=None, refuses=None, tries=None,
backoff=None, retries=True, semaphore=None, mask=None,
# httpx arguments
content=None, data=None, files=None, json=None, params=None,
headers=None, cookies=None, auth=httpx.USE_CLIENT_DEFAULT,
follow_redirects=httpx.USE_CLIENT_DEFAULT,
timeout=httpx.USE_CLIENT_DEFAULT, extensions=None,
):
"""
Request method
If your client defines a token_get callable, then it will
automatically play it.
If your client defines an asyncio semaphore, it will respect it.
.. code-block:: python
client = Client()
await client.request(
'GET',
'/',
# extend the current handler with 10 tries with 200 accepted
# status code only
tries=10,
accepts=[200],
)
await client.request(
'GET',
'/',
# you can also pass your own handler callable
handler=Handler(tries=10, accepts=[200]),
# that could also have been a function
)
:param method: HTTP Method name, GET, POST, etc
:param url: URL to query
:param handler: If a callable, will be called, if a dict, will extend
the client's :py:attr:`handler`.
:param quiet: Wether to log or not, used by :py:class:`Paginator` to
not clutter logs with pagination. Meaning if you want to
debug pagination, you'll have to make it not quiet from
there.
If you really want to see all results, set
:py:attr:`debug` to True.
:param retries: Wether to retry or not in case handler dosen't accept
the response, set to False if you want only 1 try.
:param accepts: Override for :py:attr:`Handler.accepts`
:param refuses: Override for :py:attr:`Handler.refuses`
:param tries: Override for :py:attr:`Handler.tries`
:param backoff: Override for :py:attr:`Handler.backoff`
:param semaphore: Override for :py:attr:`Client.semaphore`
"""
if not self.token and not self.token_getting:
await self.token_refresh()
if not accepts and os.getenv('STRICT'):
raise Exception('Accepts not set')
if handler is None:
if accepts or refuses or tries or backoff:
# if any handler kwarg, clone our handler and override
handler = copy.deepcopy(self.handler)
if accepts is not None:
handler.accepts = accepts
if refuses is not None:
handler.refuses = refuses
if tries is not None:
handler.tries = tries
if backoff is not None:
handler.backoff = backoff
else:
handler = self.handler
request = self.client.build_request(
method=method,
url=url,
content=content,
data=data,
files=files,
json=json,
params=params,
headers=headers,
cookies=cookies,
timeout=timeout,
extensions=extensions,
)
response = await self.send(
request,
handler=handler,
retries=retries,
semaphore=semaphore,
log=log,
quiet=quiet,
auth=auth,
follow_redirects=follow_redirects,
)
return response
def response_log_data(self, response):
try:
data = response.json()
except: # noqa
if response.content:
return 'content', self.mask(response.content)
else:
if data:
return 'json', self.mask(data)
return None, None
def request_log_data(self, request, quiet=False):
content = request.content.decode()
if not content:
return None, None
try:
data = json.loads(content)
except: # noqa
pass
else:
return 'json', self.mask(data)
parsed = parse_qs(content)
if parsed:
data = {
key: value[0] if len(value) == 1 else value
for key, value in parsed.items()
}
return 'data', self.mask(data)
return 'content', self.mask(content)
return data
[docs]
@cmd
async def get(self, url, *args, **kwargs):
""" GET Request """
return await self.request('GET', url, *args, **kwargs)
[docs]
@cmd(name='request')
async def request_cmd(self, method, url, *args, **kwargs):
"""
Perform an HTTP Request.
This calls the underlying httpx.Client request command, so, you can use
kwargs such as content for raw body pass, data for form data, and json
for json. Values for these kwargs may be file paths.
Example:
request POST /objects json=my_data.yaml
:param method: HTTP verb, GET, POST, etc
:param url: URL relative to the client's base_url
:param args: Any args to pass to the request method
:param kwargs: Any kwargs that will be loaded as file
"""
for key, value in kwargs.items():
file = Path(value)
if not file.exists():
continue
with file.open('r') as fh:
kwargs[key] = yaml.safe_load(fh.read())
return await self.request(method, url, *args, **kwargs)
[docs]
async def patch(self, url, *args, **kwargs):
""" PATCH Request """
return await self.request('PATCH', url, *args, **kwargs)
[docs]
async def post(self, url, *args, **kwargs):
""" POST Request """
return await self.request('POST', url, *args, **kwargs)
[docs]
async def put(self, url, *args, **kwargs):
""" PUT Request """
return await self.request('PUT', url, *args, **kwargs)
[docs]
async def head(self, url, *args, **kwargs):
""" HEAD Request """
return await self.request('HEAD', url, *args, **kwargs)
[docs]
async def delete(self, url, *args, **kwargs):
""" DELETE Request """
return await self.request('DELETE', url, *args, **kwargs)
[docs]
def paginate(self, url, *expressions, params=None, model=None,
callback=None):
"""
Return a paginator to iterate over results
:param url: URL to paginate on
:param params: GET parameters
:param model: Model class to cast for items
"""
return self.paginator(self, url, params or {}, model or dict,
expressions, callback)
class Expression:
def __init__(self, field, value):
self.field = field
self.value = value
self.parameterable = bool(field.parameter)
def params(self, params):
raise NotImplementedError()
def __or__(self, other):
return Or(self, other)
def __and__(self, other):
return And(self, other)
def __str__(self):
return self.compile()
class Equal(Expression):
def params(self, params):
params[self.field.parameter] = self.value
def matches(self, item):
return self.field.__get__(item) == self.value
class Filter(Expression):
def __init__(self, function):
self.function = function
# This filter works with Python functions
self.parameterable = False
def matches(self, item):
return self.function(item)
class LesserThan(Expression):
def matches(self, item):
value = self.field.__get__(item)
if not value:
return False
return value < self.value
class GreaterThan(Expression):
def matches(self, item):
value = self.field.__get__(item)
if not value:
return False
return value > self.value
class StartsWith(Expression):
def matches(self, item):
value = self.field.__get__(item)
if not value:
return False
return str(value).startswith(self.value)
class Expressions(Expression):
def __init__(self, *expressions):
self.expressions = expressions
self.parameterable = all(exp.parameterable for exp in expressions)
class Or(Expressions):
def matches(self, value):
for exp in self.expressions:
if exp.matches(value):
return True
return False
class And(Expressions):
def matches(self, value):
for exp in self.expressions:
if not exp.matches(value):
return False
return True