Source code for aiohttp_json_api.helpers


import inspect
from collections import Iterable, Mapping
from typing import Optional, Tuple, List, Iterable as IterableType

from aiohttp import web
from mimeparse import parse_media_range, _filter_blank

from .abc.field import FieldABC
from .fields.decorators import Tag
from .typings import Callee, MimeTypeComponents, QFParsed
from .common import JSONAPI

[docs]def is_generator(obj): """Return True if ``obj`` is a generator.""" return inspect.isgeneratorfunction(obj) or inspect.isgenerator(obj)
[docs]def is_iterable_but_not_string(obj): """Return True if ``obj`` is an iterable object that isn't a string.""" return ( (isinstance(obj, Iterable) and not hasattr(obj, "strip")) or is_generator(obj) )
[docs]def is_indexable_but_not_string(obj): """Return True if ``obj`` is indexable but isn't a string.""" return not hasattr(obj, "strip") and hasattr(obj, "__getitem__")
[docs]def is_collection(obj, exclude=()): """Return True if ``obj`` is a collection type.""" return (not isinstance(obj, (Mapping,) + exclude) and is_iterable_but_not_string(obj))
[docs]def ensure_collection(value, exclude=()): """Ensure value is collection.""" return value if is_collection(value, exclude=exclude) else (value,)
[docs]def first(iterable, default=None, key=None): """ Return first element of *iterable*. Return first element of *iterable* that evaluates to ``True``, else return ``None`` or optional *default*. >>> first([0, False, None, [], (), 42]) 42 >>> first([0, False, None, [], ()]) is None True >>> first([0, False, None, [], ()], default='ohai') 'ohai' >>> import re >>> m = first(re.match(regex, 'abc') for regex in ['b.*', 'a(.*)']) >>> 'bc' The optional *key* argument specifies a one-argument predicate function like that used for *filter()*. The *key* argument, if supplied, should be in keyword form. For example, finding the first even number in an iterable: >>> first([1, 1, 3, 4, 5], key=lambda x: x % 2 == 0) 4 Contributed by Hynek Schlawack, author of `the original standalone module`_ .. _the original standalone module: """ return next(filter(key, iterable), default)
[docs]def make_sentinel(name='_MISSING', var_name=None): """ Create sentinel instance. Creates and returns a new **instance** of a new class, suitable for usage as a "sentinel", a kind of singleton often used to indicate a value is missing when ``None`` is a valid input. >>> make_sentinel(var_name='_MISSING') _MISSING The most common use cases here in project are as default values for optional function arguments, partly because of its less-confusing appearance in automatically generated documentation. Sentinels also function well as placeholders in queues and linked lists. .. note:: By design, additional calls to ``make_sentinel`` with the same values will not produce equivalent objects. >>> make_sentinel('TEST') == make_sentinel('TEST') False >>> type(make_sentinel('TEST')) == type(make_sentinel('TEST')) False :arg str name: Name of the Sentinel :arg str var_name: Set this name to the name of the variable in its respective module enable pickleability. """ class Sentinel(object): def __init__(self): = name self.var_name = var_name def __repr__(self): if self.var_name: return self.var_name return '%s(%r)' % (self.__class__.__name__, if var_name: def __reduce__(self): return self.var_name def __nonzero__(self): return False __bool__ = __nonzero__ return Sentinel()
[docs]def get_router_resource(app: web.Application, resource: str): """Return route of JSON API application for resource.""" return app.router[f"{app[JSONAPI]['routes_namespace']}.{resource}"]
[docs]def get_processors(obj, tag: Tag, field: FieldABC, default: Optional[Callee] = None): has_processors = getattr(obj, '_has_processors', False) if has_processors: processor_tag = tag, field.key processors = obj.__processors__.get(processor_tag) if processors: for processor_name in processors: processor = getattr(obj, processor_name) processor_kwargs = \ processor.__processing_kwargs__.get(processor_tag) yield processor, processor_kwargs return if not callable(default): return yield default, {}
[docs]def quality_and_fitness_parsed(mime_type: str, parsed_ranges: List[MimeTypeComponents] ) -> QFParsed: """Find the best match for a mime-type amongst parsed media-ranges. Find the best match for a given mime-type against a list of media_ranges that have already been parsed by parse_media_range(). Returns a tuple of the fitness value and the value of the 'q' quality parameter of the best match, or (-1, 0) if no match was found. Just as for quality_parsed(), 'parsed_ranges' must be a list of parsed media ranges. Cherry-picked from python-mimeparse and improved. """ best_fitness = -1 best_fit_q = 0 (target_type, target_subtype, target_params) = parse_media_range(mime_type) best_matched = None for (type, subtype, params) in parsed_ranges: # check if the type and the subtype match type_match = ( type in (target_type, '*') or target_type == '*' ) subtype_match = ( subtype in (target_subtype, '*') or target_subtype == '*' ) # if they do, assess the "fitness" of this mime_type if type_match and subtype_match: # 100 points if the type matches w/o a wildcard fitness = type == target_type and 100 or 0 # 10 points if the subtype matches w/o a wildcard fitness += subtype == target_subtype and 10 or 0 # 1 bonus point for each matching param besides "q" param_matches = sum([ 1 for (key, value) in target_params.items() if key != 'q' and key in params and value == params[key] ]) fitness += param_matches # finally, add the target's "q" param (between 0 and 1) fitness += float(target_params.get('q', 1)) if fitness > best_fitness: best_fitness = fitness best_fit_q = params['q'] best_matched = (type, subtype, params) return (float(best_fit_q), best_fitness), best_matched
[docs]def best_match(supported: IterableType[str], header: str) -> Tuple[str, Optional[MimeTypeComponents]]: """Return mime-type with the highest quality ('q') from list of candidates. Takes a list of supported mime-types and finds the best match for all the media-ranges listed in header. The value of header must be a string that conforms to the format of the HTTP Accept: header. The value of 'supported' is a list of mime-types. The list of supported mime-types should be sorted in order of increasing desirability, in case of a situation where there is a tie. Cherry-picked from python-mimeparse and improved. >>> best_match(['application/xbel+xml', 'text/xml'], 'text/*;q=0.5,*/*; q=0.1') ('text/xml', ('text', '*', {'q': '0.5'})) """ split_header = _filter_blank(header.split(',')) parsed_header = [parse_media_range(r) for r in split_header] weighted_matches = {} for i, mime_type in enumerate(supported): weight, match = quality_and_fitness_parsed(mime_type, parsed_header) weighted_matches[(weight, i)] = (mime_type, match) best = max(weighted_matches.keys()) return best[0][0] and weighted_matches[best] or ('', None)
[docs]def get_mime_type_params(mime_type: MimeTypeComponents): return {k: v for k, v in mime_type[2].items() if k != 'q'}
MISSING = make_sentinel()