"""
HTTP Client boilerplate code to conquer the world.
"""
import asyncio
import copy
import httpx
import inspect
import json
import math
import os
import ssl
import structlog
import yaml
from datetime import datetime
from pathlib import Path
from urllib.parse import parse_qs
try:
import truststore
except ImportError:
truststore = None
from . import display
from .asyncio import async_resolve
from .command import Command
from .colors import colors
from .group import Group
from .decorators import cmd, hide
class Paginator:
"""
Generic pagination class.
You don't have to override this class for basic paginator customization;
instead, you can implement pagination specifics in:
- the :py:class:`~Client` class with the
:py:meth:`~Client.pagination_initialize` and
:py:meth:`~Client.pagination_parameters` methods
- or, per model, in the :py:class:`Model` class with the
:py:meth:`~Model.pagination_initialize` and
:py:meth:`~Model.pagination_parameters` methods
Refer to :py:meth:`pagination_parameters` and
:py:meth:`pagination_initialize` for details.
.. py:attribute:: total_pages
Total number of pages.
.. py:attribute:: total_items
Total number of items.
.. py:attribute:: per_page
Number of items per page
.. py:attribute:: url
The URL to query
.. py:attribute:: params
Dictionary of GET parameters
.. py:attribute:: model
:py:class:`Model` class or ``dict`` by default.
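For example, a minimal sketch of a page-number based paginator, assuming a
hypothetical API that returns a ``total`` count in its payload and accepts
``page``/``per_page`` GET parameters:
.. code-block:: python

    class PageNumberPaginator(Paginator):
        def pagination_initialize(self, data):
            # read the total count from the first response payload
            self.total_items = data.get('total')

        def pagination_parameters(self, page_number):
            # GET parameters to request a given page
            return dict(page=page_number, per_page=self.per_page or 50)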
"""
def __init__(self, client, url, params=None, model=None, expressions=None):
"""
Initialize a paginator object with a client on a URL with parameters.
:param client: :py:class:`Client` object
:param url: URL to query
:param params: Dictionary of GET parameters
:param model: Model class, either ``dict`` or a :py:class:`Model` subclass
"""
self.client = client
self.url = url
self.params = params or {}
self.model = model or dict
self.page_start = 1
self.per_page = None
self.initialized = False
self.expressions = []
for expression in (expressions or []):
if not isinstance(expression, Expression):
expression = Filter(expression)
self.expressions.append(expression)
self._total_pages = None
self._total_items = None
self._reverse = False
def reverse(self):
"""
Return a copy of this :py:class:`Paginator` object to iterate in
reverse order.
For this to work, :py:meth:`pagination_initialize` **must** set
:py:attr:`per_page` and either of :py:attr:`total_pages` or
:py:attr:`total_items`, which is up to you to implement.
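Example, assuming pagination initialization does set those attributes
(``/objects`` is an illustrative endpoint):
.. code-block:: python

    # iterate from the last page back to the first
    async for item in client.paginate('/objects').reverse():
        print(item)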
"""
obj = copy.copy(self)
obj._reverse = True
return obj
@property
def total_items(self):
return self._total_items
@total_items.setter
def total_items(self, value):
self._total_items = value
@property
def total_pages(self):
if self._total_pages:
return self._total_pages
if self.total_items and self.per_page:
self._total_pages = math.ceil(self.total_items / self.per_page)
return self._total_pages
@total_pages.setter
def total_pages(self, value):
self._total_pages = value
async def list(self):
self.results = []
async for item in self:
self.results.append(item)
return self.results
async def initialize(self, response=None):
"""
This method is called once when we get the first response.
:param response: First response object
"""
if not response:
response = await self.page_response(1)
data = response.json()
if isinstance(data, list):
# bare list response: we can't figure out the max page
self.initialized = True
return
self.pagination_initialize(data)
self.initialized = True
def response_items(self, response):
"""
Parse a response and return a list of model items.
:param response: Response to parse
"""
try:
data = response.json()
except json.JSONDecodeError:
return []
items_list = []
if isinstance(data, list):
items_list = data
elif isinstance(data, dict):
for key, value in data.items():
if isinstance(value, list):
items_list = value
break
items = [self.model(item) for item in items_list]
if not self.per_page:
self.per_page = len(items_list)
return items
def python_filter(self):
filters = [
f
for f in self.expressions
if not f.parameterable
]
if filters:
return And(*filters)
async def page_items(self, page_number):
"""
Return the items of a given page number.
:param page_number: Page number to get the items from
"""
try:
return self.response_items(await self.page_response(page_number))
except NotImplementedError:
return []
async def page_response(self, page_number):
"""
Return the response for a page.
:param page_number: Page number to get the items from
"""
params = self.params.copy()
pagination_parameters = self.pagination_parameters(page_number)
if pagination_parameters:
params.update(pagination_parameters)
elif page_number != 1:
raise NotImplementedError(
'pagination_parameters returned None, cannot paginate',
)
for expression in self.expressions:
if expression.parameterable:
expression.params(params)
response = await self.client.get(self.url, params=params, quiet=True)
if not self.initialized:
await self.initialize(response)
return response
async def __aiter__(self, callback=None):
"""
Asynchronous iterator.
"""
if self._reverse and not self.total_pages:
first_page_response = await self.page_response(1)
page = self.total_pages
else:
page = self.page_start
python_filter = self.python_filter()
while items := await self.page_items(page):
if items == 'continue':
continue
if self._reverse:
items = reversed(items)
for item in items:
if not python_filter or python_filter.matches(item):
yield item
if self._reverse:
page -= 1
if not page:
break
if page == 1:
# use cached first page response
items = self.response_items(first_page_response)
for item in reversed(items):
if not python_filter or python_filter.matches(item):
yield item
break
else:
if page == self.total_pages:
break
page += 1
async def first(self):
""" Return first item """
async for item in self:
return item
class Field:
"""
Field descriptor for models.
The basic Field descriptor manages (get and set) data from within the
:py:attr:`Model.data` JSON.
Since sub-classes are going to convert data, we need to understand the
concepts of internal and external data:
- external: the Python value, this can be any kind of Python object
- internal: the Python representation of the JSON value, this can be any
Python type that ``json.dumps`` can serialize
- internalizing: process of converting a Python value into a JSON one
- externalizing: process of converting a JSON value into something Python
.. py:attribute:: data_accessor
Describes how to access the Field's data inside the model's data dict.
If data_accessor is ``foo`` then this field will control the ``foo``
key in the model's data dict.
Use ``/`` to separate keys when nesting, if data_accessor is
``foo/bar`` then it will control the ``bar`` key of the ``foo`` dict in
the model's data dict.
.. py:attribute:: parameter
Name of the GET parameter on the model's :py:attr:`Model.url_list`, if
any, so that filters on this field are converted into GET parameters.
Otherwise, filtering happens in Python.
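For example, a hypothetical model exposing a top-level and a nested key
(``MyClient`` being an illustrative :py:class:`Client` subclass):
.. code-block:: python

    class MyModel(MyClient.Model):
        # controls data['name'], filterable through the ?name= GET parameter
        name = Field(parameter='name')
        # controls data['metadata']['created'] through nested access
        created = Field(data_accessor='metadata/created')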
"""
def __init__(self, data_accessor=None, parameter=None):
self.data_accessor = data_accessor
self.parameter = parameter
def __get__(self, obj, objtype=None):
"""
Get the value of a field for an object.
A simple process:
- Get the internal value from :py:meth:`internal_get`
- Pass it through the :py:meth:`externalize` method prior to returning
it.
"""
if obj is None:
return self
data = self.internal_get(obj)
return self.externalize(obj, data)
def __set__(self, obj, value):
"""
Set the value in the internal :py:attr:`Model.data` dict.
A two-step process:
- Use :py:meth:`internalize` to convert the Python external value into
a Python representation of the JSON value
- Use :py:meth:`internal_set` to actually set the internal
:py:attr:`Model.data`
"""
value = self.internalize(obj, value)
self.internal_set(obj, value)
def internal_get(self, obj):
"""
Get the "raw" value from the object, which is a Python representation
of the JSON internal value, using :py:attr:`data_accessor`.
"""
data = obj.data
if not isinstance(data, dict):
return self.__get__(data, type(data))
for key in self.data_accessor.split('/'):
if not data:
return ''
try:
data = data[key]
except KeyError:
return
return data
def internal_set(self, obj, value):
"""
Using :py:attr:`data_accessor`, set the value in :py:attr:`Model.data`
"""
parts = self.data_accessor.split('/')
data = obj.data
for number, key in enumerate(parts, start=1):
if number == len(parts):
data[key] = value
break
if key not in data:
data[key] = dict()
data = data[key]
def internalize(self, obj, value):
"""
Transform external value into JSON internal value.
Any kind of processing from the Python value to the JSON value can be
done in this method.
"""
return value
def externalize(self, obj, value):
"""
Transform internal JSON value to Python value, based on
:py:attr:`data_accessor`.
Any kind of processing from the JSON value to the Python value can be
done in this method.
"""
return value
def mark_dirty(self, obj):
"""
Tell the model that the data must be cleaned.
"""
obj._dirty_fields.append(self)
def clean(self, obj):
"""
Clean the data.
Called by the Model when :py:attr:`data` is requested, this method:
- Gets the externalized value from :py:meth:`__get__`
- Converts it into a JSON value with :py:meth:`internalize`
- Uses :py:meth:`internal_set` to update :py:attr:`Model.data`
"""
externalized = self.__get__(obj)
internalized = self.internalize(obj, externalized)
self.internal_set(obj, internalized)
def __eq__(self, value):
return Equal(self, value)
def __gt__(self, value):
return GreaterThan(self, value)
def __lt__(self, value):
return LesserThan(self, value)
def startswith(self, value):
return StartsWith(self, value)
class MutableField(Field):
"""
Base class for mutable value fields like :py:class:`JSONStringField`
Basically, this field:
- caches the externalized value, so that you can mutate it
- marks the field as dirty so that you get the internalized mutated value
the next time you access :py:attr:`Model.data`
"""
def cache_set(self, obj, value):
"""
Cache a computed value for obj
:param obj: Model object
"""
obj._field_cache[self.name] = value
def cache_get(self, obj):
"""
Return cached value for obj
:param obj: Model object
"""
return obj._field_cache[self.name]
def __get__(self, obj, objtype=None):
"""
Return safely mutable value.
If the value is not found in cache, externalize the internal value and
cache it.
Always mark the field as dirty given the cached external data may
mutate.
"""
if not obj:
return super().__get__(obj, objtype)
try:
return self.cache_get(obj)
except KeyError:
externalized = self.externalize(obj, self.internal_get(obj))
self.cache_set(obj, externalized)
return externalized
finally:
if not obj._data_updating:
self.mark_dirty(obj)
def __set__(self, obj, value):
"""
Cache the value prior to setting it normally.
"""
self.cache_set(obj, value)
super().__set__(obj, value)
class JSONStringField(MutableField):
"""
Yes, some proprietary APIs have JSON fields containing JSON strings.
This Field is the cure the world needed for that disease.
.. py:attribute:: options
Options dict for ``json.dumps``, e.g. ``options=dict(indent=4)``
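For example, assuming a hypothetical ``config`` key holding a JSON-encoded
string:
.. code-block:: python

    class MyModel(MyClient.Model):
        # data['config'] is a JSON string, exposed and mutated as a dict
        config = JSONStringField(options=dict(indent=4))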
"""
def __init__(self, *args, options=None, **kwargs):
super().__init__(*args, **kwargs)
self.options = options or dict()
def internalize(self, obj, data):
return json.dumps(data, **self.options)
def externalize(self, obj, value):
return json.loads(value)
class DateTimeField(Field):
"""
JSON has no datetime object, which means different APIs will serve us
different formats in string variables.
Heck, I'm pretty sure there are even some APIs which use different formats.
This is the cure the world needed against that disease.
.. py:attribute:: fmt
The datetime format for Python's strptime/strftime.
.. py:attribute:: fmts
A list of formats, in case you don't have one. This list will be
populated with :py:attr:`default_fmts` by default.
.. py:attribute:: default_fmts
A class attribute containing a list of formats we're going to try in
order to figure out `fmt` and have this thing "work by default".
Please contribute different formats to this list.
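For example, with a hypothetical ``created_at`` key:
.. code-block:: python

    class MyModel(MyClient.Model):
        # parse/serialize data['created_at'] with an explicit format
        created = DateTimeField(
            data_accessor='created_at',
            fmt='%Y-%m-%dT%H:%M:%S',
        )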
"""
iso_fmt = '%Y-%m-%dT%H:%M:%S.%f'
default_fmts = [
iso_fmt,
'%Y-%m-%dT%H:%M:%S',
]
def __init__(self, *args, fmt=None, fmts=None, **kwargs):
super().__init__(*args, **kwargs)
self.fmt = fmt
self.fmts = fmts
if not self.fmt and not self.fmts:
self.fmts = self.default_fmts
def externalize(self, obj, value):
"""
Convert the internal string into an external datetime.
"""
if self.fmt:
return datetime.strptime(value, self.fmt)
# try a bunch of formats and hope for the best
for fmt in self.fmts:
try:
value = datetime.strptime(value, fmt)
except ValueError:
continue
else:
self.fmt = fmt
return value
raise FieldExternalizeError(
f'Could not figure how to parse {value}, use fmt option',
self, obj, value,
)
def internalize(self, obj, value):
"""
Convert a datetime into an internal string.
"""
if isinstance(value, str):
return value
return value.strftime(self.fmt or self.iso_fmt)
class ModelCommand(Command):
"""
Command class for Model class.
"""
def __init__(self, target, *args, **kwargs):
# unbound method by force
target = getattr(target, '__func__', target)
super().__init__(target, *args, **kwargs)
self.overrides['self']['factory'] = self.get_object
self.overrides['cls']['factory'] = self.get_model
def setargs(self):
super().setargs()
if 'self' in self:
self.arg('id', position=0, kind='POSITIONAL_ONLY', doc='ID')
async def get_client(self):
client = self.group.parent.overrides['self']['factory']()
return await async_resolve(client)
async def get_model(self):
return getattr(await self.get_client(), self.model.__name__)
async def get_object(self):
model = await self.get_model()
return await model.get(id=self['id'].value)
class ModelGroup(Group):
def __init__(self, cls):
super().__init__(
name=cls.__name__.lower(),
doc=inspect.getdoc(cls),
cmdclass=type(
'ModelCommand',
(cls.model_command,),
dict(model=cls),
)
)
url_list_methods = ['find', 'get', 'delete', 'create']
if cls.url_list:
for name in url_list_methods:
self.cmd(getattr(cls, name))
self.load_cls(cls, exclude=url_list_methods)
class ModelMetaclass(type):
def __new__(cls, name, bases, attributes):
cls = super().__new__(cls, name, bases, attributes)
client = getattr(cls, 'client', None)
if client:
return cls
client_class = getattr(cls, '_client_class', None)
if client_class:
client_class.models.append(cls)
client_cli = getattr(client_class, 'cli', None)
if client_cli:
cls.cli = client_cli[cls.__name__.lower()] = ModelGroup(cls)
cls._fields = dict()
def process_cl(cl):
for key, obj in cl.__dict__.items():
if not isinstance(obj, Field):
continue
if not obj.data_accessor:
obj.data_accessor = key
cls._fields[key] = obj
obj.name = key
process_cl(cls)
def process_bases(cl):
for base in cl.__bases__:
process_cl(base)
process_bases(base)
process_bases(cls)
return cls
class Model(metaclass=ModelMetaclass):
"""
You should never call this class directly; instead, get it from the
:py:class:`Client` object after binding your model classes to a
client, as in the sketch below.
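A minimal sketch, with illustrative names, assuming models are bound by
subclassing the client class's ``Model`` attribute:
.. code-block:: python

    class MyClient(Client):
        '''My API client'''

    class MyModel(MyClient.Model):
        url_list = '/my-models'
        name = Field(parameter='name')

    # inside an async function
    client = MyClient(base_url='https://api.example.com')
    obj = await client.MyModel.get(id=3)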
.. py:attribute:: paginator
:py:class:`Paginator` class, you can leave it by default and just
implement :py:meth:`pagination_initialize` and
:py:meth:`pagination_parameters`.
.. py:attribute:: url_list
The URL to get the list of objects, you're supposed to configure it as
a model attribute in your model subclass.
This may be a format string using a ``client`` variable.
.. py:attribute:: url_detail
The URL to get the details of an object, you're supposed to configure
it as a model attribute in your model subclass.
.. py:attribute:: id_field
Name of the field that should be used as resource identifier, `id` by
default.
.. py:attribute:: url
Object URL based on :py:attr:`url_detail` and :py:attr:`id_field`.
"""
paginator = Paginator
model_command = ModelCommand
model_group = ModelGroup
url_list = None
url_detail = '{self.url_list}/{self.id_value}'
id_field = 'id'
def __init__(self, data=None, **values):
"""
Instantiate a model.
:param data: JSON Data
"""
self._data = data or dict()
self._data_updating = False
self._dirty_fields = []
self._field_cache = dict()
for key, value in values.items():
setattr(self, key, value)
@property
def data(self):
"""
Just ensure we update dirty data prior to returning the data dict.
"""
if self._dirty_fields and not self._data_updating:
self._data_updating = True
for field in self._dirty_fields:
field.clean(self)
self._dirty_fields = []
self._data_updating = False
return self._data
@data.setter
def data(self, value):
self._data = value
@classmethod
@hide('expressions')
@cmd(color='green')
def find(cls, *expressions, **params):
"""
Find objects filtered by GET params
:param params: GET URL parameters
:param expressions: :py:class:`Expression` list
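Example, with hypothetical model and field names; parameterable
expressions become GET parameters, the others are filtered in Python:
.. code-block:: python

    async for obj in client.MyModel.find(MyModel.name == 'foo', state='open'):
        print(obj)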
"""
return cls.paginate(cls.url_list, *expressions, **params)
@classmethod
def paginate(cls, url, *expressions, **params):
"""
Return a :py:class:`Paginator` for the given URL.
:param url: URL to paginate on
:param expressions: :py:class:`Expression` list
"""
return cls.paginator(cls.client, url, params, cls, expressions)
@property
def cli2_display(self):
return self.data
@property
def url(self):
if 'url_list' in self.url_detail and not self.url_list:
raise Exception(f'{type(self).__name__}.url_list not set')
return self.url_detail.format(self=self)
@cmd(color='red')
async def delete(self):
"""
Delete model.
DELETE request on :py:attr:`url`
"""
return await self.client.delete(self.url)
@classmethod
@cmd(doc="""
POST request to create.
Example:
create name=foo
""")
async def create(cls, **kwargs):
"""
Instantiate a model with kwargs and run :py:meth:`save`.
"""
obj = cls(**kwargs)
await obj.save()
return obj
@classmethod
@cmd(color='green', doc="""
Get a model based on kwargs.
Example:
get id=3
""")
async def get(cls, **kwargs):
"""
Instantiate a model with kwargs and run :py:meth:`hydrate`.
"""
obj = cls(**kwargs)
await obj.hydrate()
return obj
async def hydrate(self):
"""
Refresh data with a GET request on :py:attr:`url_detail`.
"""
response = await self.client.get(self.url)
self.data.update(response.json())
async def save(self):
"""
Call :py:meth:`update` if :py:attr:`id_value` is set, otherwise :py:meth:`instanciate`.
Then update :py:attr:`data` from the response JSON if possible.
You might want to override this.
"""
if self.id_value:
response = await self.update()
else:
response = await self.instanciate()
try:
data = response.json()
except json.JSONDecodeError:
pass
else:
self.data = data
return response
async def instanciate(self):
"""
POST :py:attr:`data` to :py:attr:`url_list`, update data with response
json.
You might want to override this.
"""
if not self.url_list:
raise Exception(f'{type(self).__name__}.url_list not set')
return await self.client.post(self.url_list, json=self.data)
async def update(self):
"""
POST :py:attr:`data` to :py:attr:`url`, update data with the response
JSON.
You might want to override this.
"""
return await self.client.post(self.url, json=self.data)
@property
def id_value(self):
"""
Return value of the :py:attr:`id_field`.
"""
return getattr(self, self.id_field)
class ClientMetaclass(type):
def __new__(cls, name, bases, attributes):
cls = super().__new__(cls, name, bases, attributes)
# bind ourself as _client_class to any inherited model
cls.Model = type('Model', (Model,), dict(_client_class=cls))
cls.models = []
cli_kwargs = dict(
name=cls.__name__.lower().replace('client', '') or 'client',
doc=inspect.getdoc(cls),
overrides=dict(
cls=dict(factory=lambda: cls),
self=dict(factory=lambda: cls())
),
)
cli_kwargs.update(cls.__dict__.get('cli_kwargs', dict()))
cls.cli = Group(**cli_kwargs)
cls.cli.load_cls(cls)
return cls
class Handler:
"""
.. py:attribute:: tries
Number of retries for an unaccepted response prior to failing.
Default: 30
.. py:attribute:: backoff
Will sleep ``number_of_tries * backoff`` prior to retrying.
Default: `.1`
.. py:attribute:: accepts
Accepted status codes, you should always set this to ensure responses
with an unexpected status either retry or raise.
Default: range(200, 299)
.. py:attribute:: refuses
List of refused status codes, responses returning those will not retry
at all and raise directly.
Default: [400, 404]
.. py:attribute:: retokens
Status codes which trigger a new call of :py:meth:`~Client.token_get`
prior to a retry. Only one such retry is done by this handler,
considering that authenticating twice in a row is useless: if it still
fails, there's a problem with your credentials instead.
Default: [401, 403, 407, 511]
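Example, a client that retries up to 10 times, accepts only status 200,
and fails immediately on 404 or 409:
.. code-block:: python

    client = Client(
        handler=Handler(tries=10, accepts=[200], refuses=[404, 409]),
    )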
"""
retokens_defaults = [401, 403, 407, 511]
accepts_defaults = range(200, 299)
refuses_defaults = [400, 404]
tries_default = 30
backoff_default = .1
def __init__(self, accepts=None, refuses=None, retokens=None, tries=None,
backoff=None):
if retokens is None:
self.retokens = copy.copy(self.retokens_defaults)
else:
self.retokens = retokens
if accepts is None:
self.accepts = copy.copy(self.accepts_defaults)
else:
self.accepts = accepts
if refuses is None:
self.refuses = copy.copy(self.refuses_defaults)
else:
self.refuses = refuses
self.tries = self.tries_default if tries is None else tries
self.backoff = self.backoff_default if backoff is None else backoff
async def __call__(self, client, response, tries, mask, log):
if isinstance(response, Exception):
if tries >= self.tries:
raise response
# httpx session is rendered unusable after a TransportError
if isinstance(response, httpx.TransportError):
log.warn('reconnect', exception=str(response))
await client.client_reset()
return
if self.accepts:
if response.status_code in self.accepts:
return response
elif response.status_code not in self.refuses:
return response
if response.status_code in self.refuses:
raise RefusedResponseError(client, response, tries, mask)
if tries >= self.tries:
raise RetriesExceededError(client, response, tries, mask)
seconds = tries * self.backoff
kwargs = dict(
status_code=response.status_code,
tries=tries,
sleep=seconds,
)
key, value = client.response_log_data(response, mask)
if value:
kwargs[key] = value
if response.status_code in self.retokens:
if tries:
# our authentication is just not working, no need to retry
raise TokenGetError(client, response, tries, mask)
log.warn('retoken', **kwargs)
await client.token_reset()
log.info('retry', **kwargs)
await asyncio.sleep(seconds)
class ClientError(Exception):
pass
class ResponseError(ClientError):
def __init__(self, client, response, tries, mask, msg=None):
self.client = client
self.response = response
self.tries = tries
self.mask = mask
msg = msg or getattr(self, 'msg', '').format(self=self)
super().__init__(self.enhance(msg))
def enhance(self, msg):
"""
Enhance an error message.
Adds beautiful request/response data to the exception message.
:param msg: Base error message to enhance
"""
output = [msg]
key, value = self.client.request_log_data(
self.response.request,
self.mask,
)
request_msg = ' '.join([
str(self.response.request.method),
str(self.response.request.url),
])
output.append(
f'{colors.reset}{colors.bold}{request_msg}{colors.reset}',
)
if value:
output.append(display.render(value))
key, value = self.client.response_log_data(self.response, self.mask)
output.append(
''.join([
colors.bold,
f'HTTP {self.response.status_code}',
colors.reset,
])
)
if value:
output.append(display.render(value))
return '\n'.join(output)
class TokenGetError(ResponseError):
msg = 'Authentication failed after {self.tries} tries'
class RefusedResponseError(ResponseError):
msg = 'Response {self.response} refused'
class RetriesExceededError(ResponseError):
msg = 'Unacceptable response {self.response} after {self.tries} tries'
class FieldError(ClientError):
pass
class FieldValueError(FieldError):
def __init__(self, msg, field, obj, value):
super().__init__(msg)
self.obj = obj
self.field = field
self.value = value
class FieldExternalizeError(FieldValueError):
pass
class Client(metaclass=ClientMetaclass):
"""
HTTPx Client Wrapper
.. py:attribute:: paginator
:py:class:`Paginator` class, you can leave it by default and just
implement :py:meth:`pagination_initialize` and
:py:meth:`pagination_parameters`.
.. py:attribute:: semaphore
Optional asyncio semaphore to throttle requests.
.. py:attribute:: handler
A callback that takes response objects and decides whether to retry the
request, raise an exception, or return the response.
Default is a :py:class:`Handler`.
.. py:attribute:: mask
List of keys to mask in logging, e.g. ``['password', 'secret']``
.. py:attribute:: debug
Enforce full logging: quiet requests are logged, masking does not
apply. This is also enabled with environment variable ``DEBUG``.
.. py:attribute:: models
Declared models for this Client.
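For example, a hypothetical subclass with throttling, masking and token
authentication (endpoint and header names are illustrative):
.. code-block:: python

    class MyClient(Client):
        mask = ['password', 'token']
        semaphore = asyncio.Semaphore(10)

        async def token_get(self):
            response = await self.post('/login', json=dict(user='x', password='y'))
            return response.json()['token']

        def client_token_apply(self, client):
            client.headers['Authorization'] = f'Bearer {self.token}'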
"""
paginator = Paginator
models = []
semaphore = None
mask = []
debug = False
def __init__(self, *args, handler=None, semaphore=None, mask=None,
debug=False, **kwargs):
"""
Instantiate a client with httpx.AsyncClient args and kwargs.
"""
self._client = None
self._client_args = args
self._client_kwargs = kwargs
self._client_attrs = None
self.handler = handler or Handler()
self.semaphore = semaphore if semaphore else self.semaphore
self.mask = mask if mask else self.mask
self.debug = debug or os.getenv('DEBUG', self.debug)
if truststore:
self._client_kwargs.setdefault(
'verify',
truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT),
)
self.logger = structlog.get_logger('cli2')
self.token_getting = False
self.token = None
for model in self.models:
model = type(model.__name__, (model,), dict(client=self))
if model.url_list:
model.url_list = model.url_list.format(client=self)
setattr(self, model.__name__, model)
@property
def client(self):
"""
Return the last client object used, creating a fresh one if it was reset after a transport error.
"""
if not self._client:
self._client = self.client_factory()
return self._client
def client_factory(self):
"""
Return a fresh httpx async client instance.
"""
client = httpx.AsyncClient(*self._client_args, **self._client_kwargs)
if self.token and not self.token_getting:
try:
self.client_token_apply(client)
except NotImplementedError:
pass
return client
@client.setter
def client(self, value):
self._client = value
@client.deleter
def client(self):
self._client = None
async def send(self, request, handler, mask, retries=True, semaphore=None,
log=None, auth=None, follow_redirects=None):
"""
Internal request method
"""
semaphore = semaphore or self.semaphore
tries = 0
async def _send():
return await self.client.send(
request,
auth=auth,
follow_redirects=follow_redirects,
)
async def _request():
if semaphore:
async with semaphore:
return await _send()
return await _send()
# when retries is disabled, perform a single try only
while retries or tries < 1:
try:
response = await _request()
except Exception as exc:
await handler(self, exc, tries, mask, log)
else:
if response := await handler(self, response, tries, mask, log):
return response
tries += 1
return response
async def client_reset(self):
del self.client
async def token_reset(self):
self.token = None
async def token_refresh(self):
""" Use :py:meth:`token_get()` to get a token """
self.token_getting = True
try:
self.token = await self.token_get()
except NotImplementedError:
self.token = True
else:
try:
self.client_token_apply(self.client)
except NotImplementedError:
pass
self.token_getting = False
def client_token_apply(self, client):
"""
Actually provision self.client with self.token.
This is yours to implement, e.g.:
.. code-block:: python
client.headers['X-API'] = f'Bearer {self.token}'
Do NOT use self.client in this function given it's called by the
factory itself.
:param client: The actual AsyncClient instance to provision.
"""
raise NotImplementedError()
async def token_get(self):
"""
Authentication dance to get a token.
This method will automatically be called by :py:meth:`request` if it
finds out that :py:attr:`token` is None.
This is going to depend on the API you're going to consume, basically
it's very client-specific.
By default, this method is not implemented. Implement it to your liking.
This method is supposed to return the token, but doesn't do anything
with it by itself.
You also need to implement the :py:meth:`client_token_apply` which is
in charge of updating the actual httpx client object with the said
token.
.. code-block:: python
async def token_get(self):
response = await self.post('/login', dict(...))
return response.json()['token']
def client_token_apply(self, client):
client.headers['X-ApiKey'] = self.token
"""
raise NotImplementedError()
async def request(
# base arguments
self, method, url,
*,
# cli2 arguments
handler=None, quiet=False, accepts=None, refuses=None, tries=None,
backoff=None, retries=True, semaphore=None, mask=None,
# httpx arguments
content=None, data=None, files=None, json=None, params=None,
headers=None, cookies=None, auth=httpx.USE_CLIENT_DEFAULT,
follow_redirects=httpx.USE_CLIENT_DEFAULT,
timeout=httpx.USE_CLIENT_DEFAULT, extensions=None,
):
"""
Request method
If your client defines a :py:meth:`token_get` method, it will
automatically be called to authenticate.
If your client defines an asyncio semaphore, it will respect it.
.. code-block:: python
client = Client()
await client.request(
'GET',
'/',
# extend the current handler with 10 tries and only the 200
# status code accepted
tries=10,
accepts=[200],
)
await client.request(
'GET',
'/',
# you can also pass your own handler callable
handler=Handler(tries=10, accepts=[200]),
# that could also have been a function
)
:param method: HTTP Method name, GET, POST, etc
:param url: URL to query
:param handler: Handler callable to use instead of the client's
:py:attr:`handler`.
:param quiet: Whether to log or not, used by :py:class:`Paginator` to
avoid cluttering logs with pagination requests. Meaning if you
want to debug pagination, you'll have to make it not quiet
from there.
If you really want to see all requests, set
:py:attr:`debug` to True.
:param retries: Whether to retry or not in case the handler doesn't
accept the response, set to False if you want only 1 try.
:param accepts: Override for :py:attr:`Handler.accepts`
:param refuses: Override for :py:attr:`Handler.refuses`
:param tries: Override for :py:attr:`Handler.tries`
:param backoff: Override for :py:attr:`Handler.backoff`
:param semaphore: Override for :py:attr:`Client.semaphore`
:param mask: Override for :py:attr:`Client.mask`
"""
if not self.token and not self.token_getting:
await self.token_refresh()
if handler is None:
if accepts or refuses or tries or backoff:
# if any handler kwarg, clone our handler and override
handler = copy.deepcopy(self.handler)
if accepts is not None:
handler.accepts = accepts
if refuses is not None:
handler.refuses = refuses
if tries is not None:
handler.tries = tries
if backoff is not None:
handler.backoff = backoff
else:
handler = self.handler
request = self.client.build_request(
method=method,
url=url,
content=content,
data=data,
files=files,
json=json,
params=params,
headers=headers,
cookies=cookies,
timeout=timeout,
extensions=extensions,
)
log = self.logger.bind(method=method, url=str(request.url))
if not quiet or self.debug:
key, value = self.request_log_data(request, mask, quiet)
kwargs = dict()
if value:
kwargs[key] = value
log.debug('request', **kwargs)
response = await self.send(
request,
handler=handler,
retries=retries,
semaphore=semaphore,
mask=mask,
log=log,
auth=auth,
follow_redirects=follow_redirects,
)
_log = dict(status_code=response.status_code)
if not quiet or self.debug:
key, value = self.response_log_data(response, mask)
if value:
_log[key] = value
log.info('response', **_log)
return response
def response_log_data(self, response, mask):
try:
data = response.json()
except json.JSONDecodeError:
if response.content:
return 'content', self.mask_content(response.content, mask)
else:
if data:
return 'json', self.mask_data(data, mask)
return None, None
def request_log_data(self, request, mask, quiet=False):
content = request.content.decode()
if not content:
return None, None
try:
data = json.loads(content)
except json.JSONDecodeError:
pass
else:
return 'json', self.mask_data(data, mask)
parsed = parse_qs(content)
if parsed:
data = {
key: value[0] if len(value) == 1 else value
for key, value in parsed.items()
}
return 'data', self.mask_data(data, mask)
return 'content', self.mask_content(content, mask)
def mask_content(self, content, mask=None):
"""
Implement content masking for non-JSON content here.
"""
return content
def mask_data(self, data, mask=None):
"""
Apply mask for all :py:attr:`mask` in data.
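For example, with ``mask=['password']``, ``{'user': 'x', 'password': 'y'}``
becomes ``{'user': 'x', 'password': '***MASKED***'}``.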
"""
if self.debug:
return data
mask = mask if mask is not None else self.mask
if isinstance(data, list):
return [self.mask_data(item, mask) for item in data]
if isinstance(data, dict):
for key, value in data.items():
if key in mask:
data[key] = '***MASKED***'
if isinstance(value, dict):
data[key] = self.mask_data(value, mask)
return data
return data
@cmd
async def get(self, url, *args, **kwargs):
""" GET Request """
return await self.request('GET', url, *args, **kwargs)
@cmd(name='request')
async def request_cmd(self, method, url, *args, **kwargs):
"""
Perform an HTTP Request.
This calls the underlying httpx client request method, so you can use
kwargs such as ``content`` for a raw body, ``data`` for form data, and
``json`` for JSON. Values for these kwargs may be file paths, in which
case the file contents are loaded as YAML.
Example:
request POST /objects json=my_data.yaml
:param method: HTTP verb, GET, POST, etc
:param url: URL relative to the client's base_url
:param args: Any args to pass to the request method
:param kwargs: Any kwargs that will be loaded as file
"""
for key, value in kwargs.items():
file = Path(value)
if not file.exists():
continue
with file.open('r') as fh:
kwargs[key] = yaml.safe_load(fh.read())
return await self.request(method, url, *args, **kwargs)
async def post(self, url, *args, **kwargs):
""" POST Request """
return await self.request('POST', url, *args, **kwargs)
async def put(self, url, *args, **kwargs):
""" PUT Request """
return await self.request('PUT', url, *args, **kwargs)
async def head(self, url, *args, **kwargs):
""" HEAD Request """
return await self.request('HEAD', url, *args, **kwargs)
async def delete(self, url, *args, **kwargs):
""" DELETE Request """
return await self.request('DELETE', url, *args, **kwargs)
def paginate(self, url, params=None, model=None):
"""
Return a paginator to iterate over results
:param url: URL to paginate on
:param params: GET parameters
:param model: Model class to cast for items
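Example, with an illustrative ``/objects`` endpoint:
.. code-block:: python

    async for item in client.paginate('/objects', params=dict(state='open')):
        print(item)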
"""
return self.paginator(self, url, params or {}, model or dict)
class Expression:
def __init__(self, field, value):
self.field = field
self.value = value
self.parameterable = bool(field.parameter)
def params(self, params):
raise NotImplementedError()
def __or__(self, other):
return Or(self, other)
def __and__(self, other):
return And(self, other)
def __str__(self):
return self.compile()
class Equal(Expression):
def params(self, params):
params[self.field.parameter] = self.value
def matches(self, item):
return self.field.__get__(item) == self.value
class Filter(Expression):
def __init__(self, function):
self.function = function
# This filter works with Python functions
self.parameterable = False
def matches(self, item):
return self.function(item)
class LesserThan(Expression):
def matches(self, item):
value = self.field.__get__(item)
if not value:
return False
return value < self.value
class GreaterThan(Expression):
def matches(self, item):
value = self.field.__get__(item)
if not value:
return False
return value > self.value
class StartsWith(Expression):
def matches(self, item):
value = self.field.__get__(item)
if not value:
return False
return str(value).startswith(self.value)
class Expressions(Expression):
def __init__(self, *expressions):
self.expressions = expressions
self.parameterable = all(exp.parameterable for exp in expressions)
class Or(Expressions):
def matches(self, value):
for exp in self.expressions:
if exp.matches(value):
return True
return False
class And(Expressions):
def matches(self, value):
for exp in self.expressions:
if not exp.matches(value):
return False
return True