"""Request context."""
import json
import re
from collections import OrderedDict
from typing import Any, Optional, Tuple, Union
import inflection
from aiohttp import web
from multidict import MultiDict
from .common import Event, FilterRule, JSONAPI, logger, SortDirection
from .errors import HTTPBadRequest, HTTPNotFound
from .abc.schema import SchemaABC
from .typings import (
RequestFields, RequestFilters, RequestIncludes, RequestSorting
)
[docs]class JSONAPIContext:
"""JSON API request context."""
FILTER_KEY = re.compile(r"filter\[(?P<field>\w[-\w_]*)\]")
FILTER_VALUE = re.compile(r"(?P<name>[a-z]+):(?P<value>.*)")
FIELDS_RE = re.compile(r"fields\[(?P<name>\w[-\w_]*)\]")
inflect = inflection.underscore
def __init__(self, request: web.Request,
resource_type: str = None) -> None:
"""
Initialize request context.
:param request: Request instance
:param resource_type: Resource type for current request
"""
self.__request = request
self.__resource_type = resource_type
if self.__resource_type is None:
self.__resource_type = self.__request.match_info.get('type', None)
if (self.__resource_type is None or
self.__resource_type not in self.registry):
# If type is not found in URI, and type is not passed
# via decorator to custom handler, then raise HTTP 404
raise HTTPNotFound()
self.__pagination = None
self.__filters = self.parse_request_filters(request)
self.__fields = self.parse_request_fields(request)
self.__include = self.parse_request_includes(request)
self.__sorting = self.parse_request_sorting(request)
self.__event = None
if self.__request.method in Event.__members__:
self.__event = Event[self.__request.method]
schema_cls, controller_cls = self.registry.get(self.resource_type)
self.__controller = controller_cls(self)
self.__schema = schema_cls(self)
logger.debug('Request context info:\n'
'Filters: %s\n'
'Fields: %s\n'
'Includes: %s\n'
'Sorting: %s\n'
'Event: %s\n'
'Schema: %s\n'
'Controller: %s\n',
self.filters, self.fields, self.include, self.sorting,
self.event, schema_cls.__name__, controller_cls.__name__)
@property
def request(self):
return self.__request
@property
def app(self):
return self.__request.app
@property
def resource_type(self):
return self.__resource_type
@property
def registry(self):
return self.app[JSONAPI]['registry']
@property
def schema(self) -> Optional[SchemaABC]:
return self.__schema
@property
def controller(self):
return self.__controller
@property
def filters(self):
return self.__filters
@property
def fields(self):
return self.__fields
@property
def include(self):
return self.__include
@property
def sorting(self):
return self.__sorting
@property
def event(self):
return self.__event
@property
def pagination(self):
if self.__pagination is not None:
return self.__pagination
if self.schema is not None:
pagination_type = self.schema.opts.pagination
if pagination_type:
self.__pagination = pagination_type(self.__request)
return self.__pagination
return None
[docs] @classmethod
def convert_field_name(cls, field_name):
return cls.inflect(field_name) \
if cls.inflect is not None \
else field_name
[docs] @classmethod
def parse_request_filters(cls, request: web.Request) -> RequestFilters:
"""
Parse filters from request query string.
.. hint::
Please note, that the *filter* strategy is not defined by the
JSON API specification and depends on the implementation.
If you want to use another filter strategy,
feel free to **override** this method.
Returns a MultiDict with field names as keys and rules as values.
Rule value is JSON deserialized from query string.
Filters can be applied using the query string.
.. code-block:: python3
>>> from aiohttp_json_api.context import JSONAPIContext
>>> from aiohttp.test_utils import make_mocked_request
>>> request = make_mocked_request('GET', '/api/User/?filter[name]=endswith:"Simpson"')
>>> JSONAPIContext.parse_request_filters(request)
<MultiDict('name': FilterRule(name='endswith', value='Simpson'))>
>>> request = make_mocked_request('GET', '/api/User/?filter[name]=endswith:"Simpson"&filter[name]=in:["Some","Names"]')
>>> JSONAPIContext.parse_request_filters(request)
<MultiDict('name': FilterRule(name='endswith', value='Simpson'), 'name': FilterRule(name='in', value=['Some', 'Names']))>
>>> request = make_mocked_request('GET', '/api/User/?filter[name]=in:["Homer Simpson", "Darth Vader"]')
>>> JSONAPIContext.parse_request_filters(request)
<MultiDict('name': FilterRule(name='in', value=['Homer Simpson', 'Darth Vader']))>
>>> request = make_mocked_request('GET', '/api/User/?filter[some-field]=startswith:"lisa"&filter[another-field]=lt:20')
>>> JSONAPIContext.parse_request_filters(request)
<MultiDict('some_field': FilterRule(name='startswith', value='lisa'), 'another_field': FilterRule(name='lt', value=20))>
The general syntax is::
"?filter[field]=name:rule"
where *rule* is a JSON value.
:raises HTTPBadRequest:
If the rule of a filter is not a JSON object.
:raises HTTPBadRequest:
If a filter name contains invalid characters.
"""
filters = MultiDict() # type: MultiDict
for key, value in request.query.items():
key_match = re.fullmatch(cls.FILTER_KEY, key)
value_match = re.fullmatch(cls.FILTER_VALUE, value)
# If the key indicates a filter, but the value is not correct
# formatted.
if key_match and not value_match:
field = key_match.group('field')
raise HTTPBadRequest(
detail=f"The filter '{field}' is not correct applied.",
source_parameter=key
)
# The key indicates a filter and the filter name exists.
elif key_match and value_match:
field = key_match.group('field')
name = value_match.group('name')
value = value_match.group('value')
try:
value = json.loads(value)
except Exception as err:
logger.debug(str(err), exc_info=False)
raise HTTPBadRequest(
detail=f"The value '{value}' is not JSON serializable",
source_parameter=key
)
filters.add(cls.convert_field_name(field),
FilterRule(name=name, value=value))
return filters
[docs] @classmethod
def parse_request_fields(cls, request: web.Request) -> RequestFields:
"""
Parse sparse fields from request query string.
Used for determine fields, which should be included in the response
(sparse fieldset).
.. code-block:: python3
>>> from aiohttp_json_api.context import JSONAPIContext
>>> from aiohttp.test_utils import make_mocked_request
>>> request = make_mocked_request('GET', '/api/User?fields[User]=email,name&fields[Post]=comments')
>>> JSONAPIContext.parse_request_fields(request)
OrderedDict([('User', ('email', 'name')), ('Post', ('comments',))])
:seealso: http://jsonapi.org/format/#fetching-sparse-fieldsets
"""
fields = OrderedDict() # type: OrderedDict
for key, value in request.query.items():
match = re.fullmatch(cls.FIELDS_RE, key)
if match:
typename = match.group('name')
fields[typename] = tuple(
cls.convert_field_name(item.strip())
for item in value.split(',')
if item.strip()
)
return fields
[docs] @classmethod
def parse_request_includes(cls, request: web.Request) -> RequestIncludes:
"""
Parse compound documents parameters from request query string.
Returns the names of the relationships, which should be included into
the response.
.. code-block:: python3
>>> from aiohttp_json_api.context import JSONAPIContext
>>> from aiohttp.test_utils import make_mocked_request
>>> request = make_mocked_request('GET', '/api/Post?include=author,comments.author,some-field.nested')
>>> JSONAPIContext.parse_request_includes(request)
(('author',), ('comments', 'author'), ('some_field', 'nested'))
:seealso: http://jsonapi.org/format/#fetching-includes
"""
return tuple(
tuple(cls.convert_field_name(p) for p in path.split('.'))
for path in request.query.get('include', '').split(',') if path
)
[docs] @classmethod
def parse_request_sorting(cls, request: web.Request) -> RequestSorting:
"""
Parse sorting parameters of fields from request query string.
Returns a mapping with tuples as keys, and values with SortDirection,
describing how the output should be sorted.
.. code-block:: python3
>>> from aiohttp_json_api.context import JSONAPIContext
>>> from aiohttp.test_utils import make_mocked_request
>>> request = make_mocked_request('GET', '/api/Post?sort=name,-age,+comments.count')
>>> JSONAPIContext.parse_request_sorting(request)
OrderedDict([(('name',), <SortDirection.ASC: '+'>), (('age',), <SortDirection.DESC: '-'>), (('comments', 'count'), <SortDirection.ASC: '+'>)])
:seealso: http://jsonapi.org/format/#fetching-sorting
"""
sort = OrderedDict() # type: RequestSorting
if 'sort' not in request.query:
return sort
direction = SortDirection.ASC
for field in request.query.get('sort').split(','):
if field.startswith(('+', '-')):
direction = SortDirection(field[0])
field = field[1:]
field = tuple(cls.convert_field_name(e.strip())
for e in field.split('.'))
sort[field] = direction
return sort
[docs] def has_filter(self, field: str, name: str) -> bool:
"""
Check current context for existing filters of field.
Returns true, if the filter *name* has been applied at least once
on the *field*.
:arg str field:
Name of field
:arg str name:
Name of filter
"""
return (field, name) in self.filters
[docs] def get_filter(self, field: str, name: str, default: Any = None) -> Any:
"""
Get filter from request context by name and field.
If the filter *name* has been applied on the *field*, the
*filter* is returned and *default* otherwise.
:arg str field:
Name of field
:arg str name:
Name of filter
:arg Any default:
A fallback rule value for the filter.
"""
return self.filters.get((field, name), default)
[docs] def get_order(self, field: Union[str, Tuple[str, ...]],
default: SortDirection = SortDirection.ASC) -> SortDirection:
"""
Get sorting order of field from request context.
Checks if a sort criterion (``+`` or ``-``) for the *field* exists
and returns it.
:arg Union[str, Tuple[str, ...]] field:
:arg SortDirection default:
Returned, if no criterion is set by the request.
"""
field = tuple(field.split('.')) if isinstance(field, str) else field
return self.sorting.get(field, default)