Source code for dojson.overdo

# -*- coding: utf-8 -*-
#
# This file is part of DoJSON
# Copyright (C) 2015, 2016 CERN.
#
# DoJSON is free software; you can redistribute it and/or
# modify it under the terms of the Revised BSD License; see LICENSE
# file for more details.

"""Do JSON translation."""

import re

from pkg_resources import iter_entry_points

from ._compat import iteritems, zip_longest
from .errors import IgnoreKey, MissingRule
from .utils import GroupableOrderedDict

# ``MAXGROUPS`` is used below as the basis for the default number of named
# groups packed into one compiled pattern (``Index`` defaults its
# ``branch_size`` to ``MAXGROUPS - 1``). Fall back to 100 when the ``_sre``
# module cannot provide the constant.
try:
    from _sre import MAXGROUPS
except ImportError:
    MAXGROUPS = 100


class Index(object):
    """Index implementation based on the built-in Python SRE module.

    Rules are ``(regular expression, data)`` tuples. They are compiled in
    chunks of ``branch_size`` rules; each chunk becomes one alternation of
    named groups, so a key is resolved with one ``match`` call per chunk
    rather than one per rule.
    """

    def __init__(self, rules=None, flags=0, branch_size=MAXGROUPS - 1):
        """Initialize index structures.

        :param rules: list of tuples (regular expression, data)
        :param flags: additional flags passed to SRE parser; applied to
            every compiled chunk
        :param branch_size: number of groups in a branch; defaults to
            ``MAXGROUPS - 1`` and is capped at the number of rules
        """
        self._patterns = []
        self.flags = flags
        self.rules = rules or []
        self.branch_size = min(branch_size, len(self.rules))

        def make_pattern(rules, flags=0):
            """Compile rules into a single alternation of named groups."""
            # Group ``I<n>`` wraps the n-th rule of the chunk, so that
            # ``query`` can recover the rule position from ``lastgroup``.
            return re.compile('|'.join('(?P<I{name}>{regex})'.format(
                name=name, regex=regex
            ) for name, (regex, _) in enumerate(rules)), flags=flags)

        # Passing the same iterator ``branch_size`` times makes
        # ``zip_longest`` yield consecutive chunks of that many rules; the
        # last chunk is padded with ``None``, which is filtered out below.
        for rules in zip_longest(*[iter(self.rules)] * self.branch_size):
            self._patterns.append(make_pattern(
                [rule for rule in rules if rule is not None],
                # Previously omitted, which silently dropped user flags.
                flags=self.flags
            ))

    def query(self, key):
        """Return data matching the key.

        Chunks are tried in rule order and, inside a chunk, the first
        alternative that matches at the start of ``key`` wins.

        :param key: string to look up.
        :returns: the ``data`` element of the matching rule, or ``None``
            when no rule matches.
        """
        for section, pattern in enumerate(self._patterns):
            match = pattern.match(key)
            if match:
                # ``lastgroup`` is ``'I<n>'``; strip the prefix to get the
                # rule offset inside this chunk.
                offset = int(match.lastgroup[1:])
                return self.rules[section * self.branch_size + offset][1]
        return None
class Overdo(object):
    """Registry of translation rules and the engine that applies them.

    Rules are kept in ``self.rules`` as ``(source tag, (name, creator))``
    tuples. They are compiled into an ``Index`` lazily: ``self.index`` is
    ``None`` until ``build`` runs and is reset whenever a rule is added.
    """

    def __init__(self, bases=None, entry_point_group=None):
        """Create a translator, optionally seeded with rules from ``bases``.

        :param bases: iterable of objects exposing ``_collect_entry_points``
            and ``rules``; each has its entry points loaded and its rules
            copied into this instance, in the given order.
        :param entry_point_group: name of the entry point group to load
            before building the index, or ``None`` to load nothing.
        """
        self.rules = []
        for parent in bases or ():
            parent._collect_entry_points()
            self.rules += parent.rules
        self.entry_point_group = entry_point_group
        self.index = None

    def _collect_entry_points(self):
        """Load every entry point registered under ``entry_point_group``."""
        group = self.entry_point_group
        if group is None:
            return
        for entry_point in iter_entry_points(group=group, name=None):
            entry_point.load()

    def build(self):
        """Load entry points, then compile the current rules into an index."""
        self._collect_entry_points()
        self.index = Index(self.rules)

    def over(self, name, *source_tags):
        """Return a decorator registering a creator for ``source_tags``.

        :param name: output key under which the creator's result is stored.
        :param source_tags: one or more source tag expressions; a separate
            rule is added for each.
        :returns: decorator that records the rules, invalidates the index
            and hands back the creator unchanged.
        """
        def decorator(creator):
            # Any previously built index no longer reflects the rule list.
            self.index = None
            self.rules.extend(
                (tag, (name, creator)) for tag in source_tags
            )
            return creator

        return decorator

    def do(self, blob, ignore_missing=True, exception_handlers=None):
        """Translate blob values and instantiate new model instance.

        For every ``(key, value)`` item of ``blob`` the matching rule's
        creator is called as ``creator(output, key, value)``. Its result
        is stored under the rule name, or appended to the list already
        stored there when the creator has a truthy ``__extend__``
        attribute.

        Raises ``MissingRule`` when no rule matched and ``ignore_missing``
        is ``False``.

        :param blob: ``dict``-like object on which the matching rules are
            going to be applied.
        :param ignore_missing: Set to ``False`` if you prefer to raise
            an exception ``MissingRule`` for the first key that it is not
            matching any rule.
        :param exception_handlers: Mapping from exception class to a
            callable ``handler(exc, output, key, value)``, or to ``None``
            to swallow that exception. Handlers are looked up by the exact
            class of the raised exception; anything without an entry is
            re-raised.

        .. versionchanged:: 1.0.0

           ``ignore_missing`` allows to specify if the function should raise
           an exception.

        .. versionchanged:: 1.1.0

           ``exception_handlers`` allows to set custom handlers for
           non-standard MARC codes.
        """
        def clean_missing(exc, output, key, value):
            # Drop the unmatched key from the recorded order, if any.
            order = output.get('__order__')
            if order:
                order.remove(key)

        handlers = {IgnoreKey: None}
        if exception_handlers:
            handlers.update(exception_handlers)
        # A caller-supplied ``MissingRule`` handler takes precedence.
        if ignore_missing and MissingRule not in handlers:
            handlers[MissingRule] = clean_missing

        result = {}

        if self.index is None:
            self.build()

        if isinstance(blob, GroupableOrderedDict):
            pairs = blob.iteritems(repeated=True)
        else:
            pairs = iteritems(blob)

        for key, value in pairs:
            try:
                rule = self.index.query(key)
                if not rule:
                    raise MissingRule(key)

                name, creator = rule
                data = creator(result, key, value)
                if getattr(creator, '__extend__', False):
                    merged = result.get(name, [])
                    merged.extend(data)
                    result[name] = merged
                else:
                    result[name] = data
            except Exception as exc:
                exc_type = exc.__class__
                if exc_type not in handlers:
                    raise
                handler = handlers[exc_type]
                if handler is not None:
                    handler(exc, result, key, value)

        return result

    def missing(self, blob):
        """Return the keys of ``blob`` for which no rule matches."""
        if self.index is None:
            self.build()

        unmatched = []
        for key in blob.keys():
            if self.index.query(key) is None:
                unmatched.append(key)
        return unmatched