| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626 |
# -*- coding: utf-8 -*-
- """ This module contains the error-related constants and classes. """
- from __future__ import absolute_import
- from collections import defaultdict, namedtuple, MutableMapping
- from copy import copy, deepcopy
- from functools import wraps
- from pprint import pformat
- from cerberus.platform import PYTHON_VERSION
- from cerberus.utils import compare_paths_lt, quote_string
ErrorDefinition = namedtuple('ErrorDefinition', 'code, rule')
"""
Describes one kind of error: a *unique* integer ``code`` paired with the
name of the ``rule`` (a string, or :obj:`None`) that can cause it.

The definitions below carry no common prefix because they are meant to be
referenced through the module namespace, e.g. ``errors.CUSTOM``.
"""


# -- custom ----------------------------------------------------------------
CUSTOM = ErrorDefinition(0x00, None)

# -- existence -------------------------------------------------------------
# NOTE: the name is bound twice (see issues/141); after the second statement
# ``DOCUMENT_MISSING`` refers to the plain message string.
DOCUMENT_MISSING = ErrorDefinition(0x01, None)  # issues/141
DOCUMENT_MISSING = "document is missing"
REQUIRED_FIELD = ErrorDefinition(0x02, 'required')
UNKNOWN_FIELD = ErrorDefinition(0x03, None)
DEPENDENCIES_FIELD = ErrorDefinition(0x04, 'dependencies')
DEPENDENCIES_FIELD_VALUE = ErrorDefinition(0x05, 'dependencies')
EXCLUDES_FIELD = ErrorDefinition(0x06, 'excludes')

# -- shape -----------------------------------------------------------------
# NOTE: bound twice as well (issues/141); the final value is the message
# template string.
DOCUMENT_FORMAT = ErrorDefinition(0x21, None)  # issues/141
DOCUMENT_FORMAT = "'{0}' is not a document, must be a dict"
EMPTY_NOT_ALLOWED = ErrorDefinition(0x22, 'empty')
NOT_NULLABLE = ErrorDefinition(0x23, 'nullable')
BAD_TYPE = ErrorDefinition(0x24, 'type')
BAD_TYPE_FOR_SCHEMA = ErrorDefinition(0x25, 'schema')
ITEMS_LENGTH = ErrorDefinition(0x26, 'items')
MIN_LENGTH = ErrorDefinition(0x27, 'minlength')
MAX_LENGTH = ErrorDefinition(0x28, 'maxlength')

# -- color -----------------------------------------------------------------
REGEX_MISMATCH = ErrorDefinition(0x41, 'regex')
MIN_VALUE = ErrorDefinition(0x42, 'min')
MAX_VALUE = ErrorDefinition(0x43, 'max')
UNALLOWED_VALUE = ErrorDefinition(0x44, 'allowed')
UNALLOWED_VALUES = ErrorDefinition(0x45, 'allowed')
FORBIDDEN_VALUE = ErrorDefinition(0x46, 'forbidden')
FORBIDDEN_VALUES = ErrorDefinition(0x47, 'forbidden')

# -- other -----------------------------------------------------------------
NORMALIZATION = ErrorDefinition(0x60, None)
COERCION_FAILED = ErrorDefinition(0x61, 'coerce')
RENAMING_FAILED = ErrorDefinition(0x62, 'rename_handler')
READONLY_FIELD = ErrorDefinition(0x63, 'readonly')
SETTING_DEFAULT_FAILED = ErrorDefinition(0x64, 'default_setter')

# -- groups ----------------------------------------------------------------
# ``ERROR_GROUP.code`` and ``LOGICAL.code`` are also used as bit masks by
# the ``is_group_error`` / ``is_logic_error`` properties of ValidationError.
ERROR_GROUP = ErrorDefinition(0x80, None)
MAPPING_SCHEMA = ErrorDefinition(0x81, 'schema')
SEQUENCE_SCHEMA = ErrorDefinition(0x82, 'schema')
KEYSCHEMA = ErrorDefinition(0x83, 'keyschema')
VALUESCHEMA = ErrorDefinition(0x84, 'valueschema')
BAD_ITEMS = ErrorDefinition(0x8f, 'items')

LOGICAL = ErrorDefinition(0x90, None)
NONEOF = ErrorDefinition(0x91, 'noneof')
ONEOF = ErrorDefinition(0x92, 'oneof')
ANYOF = ErrorDefinition(0x93, 'anyof')
ALLOF = ErrorDefinition(0x94, 'allof')


# -- SchemaError messages --------------------------------------------------
SCHEMA_ERROR_DEFINITION_TYPE = (
    "schema definition for field '{0}' must be a dict"
)
SCHEMA_ERROR_MISSING = "validation schema missing"


# -- Error representations -------------------------------------------------
class ValidationError(object):
    """ A simple class to store and query basic error information. """

    def __init__(self, document_path, schema_path, code, rule, constraint,
                 value, info):
        self.document_path = document_path
        """ The path to the field within the document that caused the error.
            Type: :class:`tuple` """
        self.schema_path = schema_path
        """ The path to the rule within the schema that caused the error.
            Type: :class:`tuple` """
        self.code = code
        """ The error's identifier code. Type: :class:`int` """
        self.rule = rule
        """ The rule that failed. Type: `string` """
        self.constraint = constraint
        """ The constraint that failed. """
        self.value = value
        """ The value that failed. """
        self.info = info
        """ May hold additional information about the error.
            Type: :class:`tuple` """

    def __eq__(self, other):
        """ Compares by hash value only.

        Assumes the errors relate to the same document and schema. """
        return hash(self) == hash(other)

    def __hash__(self):
        """ Combines the hashes of both paths and the code.

        Expects that all other properties are transitively determined. """
        return hash(self.document_path) ^ hash(self.schema_path) \
            ^ hash(self.code)

    def __lt__(self, other):
        """ Orders by document path first; the schema path breaks ties. """
        if self.document_path != other.document_path:
            return compare_paths_lt(self.document_path, other.document_path)
        else:
            return compare_paths_lt(self.schema_path, other.schema_path)

    def __repr__(self):
        template = (
            "{class_name} @ {memptr} ( "
            "document_path={document_path},"
            "schema_path={schema_path},"
            "code={code},"
            "constraint={constraint},"
            "value={value},"
            "info={info} )"
        )
        return template.format(
            class_name=self.__class__.__name__, memptr=hex(id(self)),
            document_path=self.document_path,
            schema_path=self.schema_path,
            code=hex(self.code),
            constraint=quote_string(self.constraint),
            value=quote_string(self.value),
            info=self.info)

    @property
    def child_errors(self):
        """
        A list that contains the individual errors of a bulk validation error.
        It is read from ``info[0]``; :obj:`None` for non-group errors.
        """
        return self.info[0] if self.is_group_error else None

    @property
    def definitions_errors(self):
        """ Dictionary with errors of an *of-rule mapped to the index of the
            definition it occurred in. Returns :obj:`None` if not applicable.
            """
        if not self.is_logic_error:
            return None

        result = defaultdict(list)
        for error in self.child_errors:
            # The element of the child's schema path that directly follows
            # this error's own schema path is the definition's index.
            i = error.schema_path[len(self.schema_path)]
            result[i].append(error)
        return result

    @property
    def field(self):
        """ Field of the contextual mapping, possibly :obj:`None`. """
        if self.document_path:
            return self.document_path[-1]
        else:
            return None

    @property
    def is_group_error(self):
        """ ``True`` for errors of bulk validations. """
        return bool(self.code & ERROR_GROUP.code)

    @property
    def is_logic_error(self):
        """ ``True`` for validation errors against different schemas with
            *of-rules. """
        # Parenthesised for readability: the subtraction binds tighter than
        # ``&`` anyway, so the mask is the single bit that separates LOGICAL
        # from ERROR_GROUP.
        return bool(self.code & (LOGICAL.code - ERROR_GROUP.code))

    @property
    def is_normalization_error(self):
        """ ``True`` for normalization errors. """
        # NORMALIZATION.code consists of more than one bit, so a plain
        # truthiness test of ``code & mask`` would also match codes that
        # share only one of those bits (the "shape" and "color" ranges).
        # Require every bit of the mask to be set instead.
        mask = NORMALIZATION.code
        return (self.code & mask) == mask
class ErrorList(list):
    """ A :class:`list` of :class:`~cerberus.errors.ValidationError`
        instances whose ``in`` operator matches on error codes: it accepts an
        :class:`~cerberus.errors.ErrorDefinition` and reports whether any
        contained error carries the same ``code``. """

    def __contains__(self, error_definition):
        # ``error_definition.code`` is only read while iterating, so testing
        # against an empty list never touches the operand.
        return any(entry.code == error_definition.code for entry in self)
class ErrorTreeNode(MutableMapping):
    """ One node of an error tree.

    A node keeps the errors recorded for its own path in ``errors`` (an
    :class:`~cerberus.errors.ErrorList`) and its child nodes in the
    ``descendants`` dictionary, keyed by the next path element.
    Item access and ``in`` accept either an
    :class:`~cerberus.errors.ErrorDefinition` (looked up among the node's
    errors) or any other key (looked up among the descendants). """

    __slots__ = ('descendants', 'errors', 'parent_node', 'path', 'tree_root')

    def __init__(self, path, parent_node):
        self.parent_node = parent_node
        self.tree_root = self.parent_node.tree_root
        # This node sits one level below its parent, so its path is the
        # prefix of the given path that is one element longer than the
        # parent's depth.
        self.path = path[:self.parent_node.depth + 1]
        self.errors = ErrorList()
        self.descendants = {}

    def __add__(self, error):
        """ Adds ``error`` via :meth:`add` and returns the node itself. """
        self.add(error)
        return self

    def __contains__(self, item):
        if isinstance(item, ErrorDefinition):
            return item in self.errors
        return item in self.descendants

    def __delitem__(self, key):
        del self.descendants[key]

    def __iter__(self):
        """ Iterates over the node's own errors. """
        return iter(self.errors)

    def __getitem__(self, item):
        """ Returns the first error with a matching code for an error
            definition, the descendant node for any other key, or
            :obj:`None` when nothing matches. """
        if not isinstance(item, ErrorDefinition):
            return self.descendants.get(item)
        for candidate in self.errors:
            if item.code == candidate.code:
                return candidate
        return None

    def __len__(self):
        """ Number of errors stored directly on this node. """
        return len(self.errors)

    def __repr__(self):
        return self.__str__()

    def __setitem__(self, key, value):
        self.descendants[key] = value

    def __str__(self):
        return ','.join((str(self.errors), str(self.descendants)))

    @property
    def depth(self):
        """ Length of the node's path. """
        return len(self.path)

    @property
    def tree_type(self):
        """ The ``tree_type`` of the tree's root. """
        return self.tree_root.tree_type

    def add(self, error):
        """ Files ``error`` below this node, creating descendants on demand.

        :param error: :class:`~cerberus.errors.ValidationError`
        """
        error_path = self._path_of_(error)

        key = error_path[self.depth]
        if key not in self.descendants:
            self[key] = ErrorTreeNode(error_path, self)

        if len(error_path) != self.depth + 1:
            # The error belongs further down; hand it to the child node.
            self[key] += error
            return

        # The error belongs to the direct child: store it there, sorted.
        child_errors_list = self[key].errors
        child_errors_list.append(error)
        child_errors_list.sort()
        if error.is_group_error:
            # Each child of a group error is fed to the tree's root.
            for child_error in error.child_errors:
                self.tree_root += child_error

    def _path_of_(self, error):
        """ Returns the error's ``<tree_type>_path`` attribute. """
        return getattr(error, self.tree_type + '_path')
class ErrorTree(ErrorTreeNode):
    """ Base class for :class:`~cerberus.errors.DocumentErrorTree` and
        :class:`~cerberus.errors.SchemaErrorTree`. """

    def __init__(self, errors=()):
        """ Creates the root node and adds all given errors to it.

        :param errors: :term:`iterable` of
                       :class:`~cerberus.errors.ValidationError` instances.
                       The default is an immutable empty tuple rather than a
                       shared mutable list.
        """
        # The root has no parent, refers to itself as the tree's root and
        # has an empty path, hence a depth of zero.
        self.parent_node = None
        self.tree_root = self
        self.path = ()
        self.errors = ErrorList()
        self.descendants = {}
        for error in errors:
            self += error

    def add(self, error):
        """ Add an error to the tree.

        :param error: :class:`~cerberus.errors.ValidationError`
        """
        if not self._path_of_(error):
            # An error with an empty path is stored on the root itself.
            self.errors.append(error)
            self.errors.sort()
        else:
            super(ErrorTree, self).add(error)

    def fetch_errors_from(self, path):
        """ Returns all errors for a particular path.

        :param path: :class:`tuple` of :term:`hashable` s.
        :rtype: :class:`~cerberus.errors.ErrorList`
        """
        node = self.fetch_node_from(path)
        if node is not None:
            return node.errors
        else:
            return ErrorList()

    def fetch_node_from(self, path):
        """ Returns a node for a path.

        :param path: Tuple of :term:`hashable` s.
        :rtype: :class:`~cerberus.errors.ErrorTreeNode` or :obj:`None`
        """
        context = self
        for key in path:
            context = context[key]
            if context is None:
                # No node exists for this part of the path.
                break
        return context
class DocumentErrorTree(ErrorTree):
    """ A dict-like error tree whose nodes are addressed by the errors'
        ``document_path``, i.e. it follows the structure of a validated
        document. """

    # Selects which ``*_path`` attribute of an error positions it in the tree.
    tree_type = 'document'
class SchemaErrorTree(ErrorTree):
    """ A dict-like error tree whose nodes are addressed by the errors'
        ``schema_path``, i.e. it follows the structure of the used schema. """

    # Selects which ``*_path`` attribute of an error positions it in the tree.
    tree_type = 'schema'
class BaseErrorHandler(object):
    """ Common ancestor of all error handlers.

    Subclasses are identified as error-handlers with an instance-test. """

    def __init__(self, *args, **kwargs):
        """ Optional initialization hook; accepts and ignores any arguments.
            """

    def __call__(self, errors):
        """ Returns errors in a handler-specific format.

        Must be provided by subclasses.

        :param errors: An object containing the errors.
        :type errors: :term:`iterable` of
                      :class:`~cerberus.errors.ValidationError` instances or a
                      :class:`~cerberus.Validator` instance
        """
        raise NotImplementedError

    def __iter__(self):
        """ Iterates over the handled errors; must be provided by subclasses.
            """
        raise NotImplementedError

    def add(self, error):
        """ Add an error to the errors' container object of a handler.

        Must be provided by subclasses.

        :param error: The error to add.
        :type error: :class:`~cerberus.errors.ValidationError`
        """
        raise NotImplementedError

    def emit(self, error):
        """ Optional hook to emit an error in the handler's format, e.g. to
            a stream. Does nothing here.

        :param error: The error to emit.
        :type error: :class:`~cerberus.errors.ValidationError`
        """

    def end(self, validator):
        """ Gets called when a validation ends. Does nothing here.

        :param validator: The calling validator.
        :type validator: :class:`~cerberus.Validator` """

    def extend(self, errors):
        """ Adds all errors to the handler's container object by passing
            each one to :meth:`add`.

        :param errors: The errors to add.
        :type errors: :term:`iterable` of
                      :class:`~cerberus.errors.ValidationError` instances
        """
        for item in errors:
            self.add(item)

    def start(self, validator):
        """ Gets called when a validation starts. Does nothing here.

        :param validator: The calling validator.
        :type validator: :class:`~cerberus.Validator`
        """
class ToyErrorHandler(BaseErrorHandler):
    """ A handler that must never be asked to format errors: calling it
        raises :class:`RuntimeError`, and :meth:`clear` is a no-op. """

    def __call__(self, *args, **kwargs):
        raise RuntimeError('This is not supposed to happen.')

    def clear(self):
        """ Does nothing. """
def encode_unicode(f):
    """ Decorator for handler methods that receive an error.

    Cerberus error messages expect regular binary strings; on legacy Python
    a message built from unicode values can't be printed. When
    ``PYTHON_VERSION`` is below 3 the decorated method therefore receives a
    shallow copy of the error whose attributes have been UTF-8 encoded
    wherever they were unicode strings. Otherwise ``f`` is returned
    unchanged.
    """
    if not PYTHON_VERSION < 3:
        return f

    def _to_bytes(candidate):
        """ Encodes unicode strings as UTF-8, passes anything else through.
            """
        if isinstance(candidate, unicode):  # noqa: F821
            return candidate.encode('utf-8')
        return candidate

    @wraps(f)
    def wrapped(obj, error):
        # Work on a copy so the caller's error object keeps its values.
        error = copy(error)
        for attribute in ('document_path', 'schema_path', 'constraint',
                          'value', 'info'):
            setattr(error, attribute, _to_bytes(getattr(error, attribute)))
        return f(obj, error)

    return wrapped
class BasicErrorHandler(BaseErrorHandler):
    """ Models cerberus' legacy. Calling the handler returns a :class:`dict`
        that maps fields to lists of messages and nested sub-trees; passing
        the handler to :class:`str` yields a pretty-formatted representation
        of that tree.
    """

    # Message templates by error code. They are rendered with ``str.format``
    # using the error's ``info`` as positional arguments and ``constraint``,
    # ``field`` and ``value`` as keyword arguments.
    # NOTE(review): 0x85 has no matching ErrorDefinition in this module while
    # BAD_ITEMS (0x8f) has no message - confirm whether that is intended.
    messages = {
        0x00: "{0}",

        0x01: "document is missing",
        0x02: "required field",
        0x03: "unknown field",
        0x04: "field '{0}' is required",
        0x05: "depends on these values: {constraint}",
        0x06: "{0} must not be present with '{field}'",

        0x21: "'{0}' is not a document, must be a dict",
        0x22: "empty values not allowed",
        0x23: "null value not allowed",
        0x24: "must be of {constraint} type",
        0x25: "must be of dict type",
        0x26: "length of list should be {constraint}, it is {0}",
        0x27: "min length is {constraint}",
        0x28: "max length is {constraint}",

        0x41: "value does not match regex '{constraint}'",
        0x42: "min value is {constraint}",
        0x43: "max value is {constraint}",
        0x44: "unallowed value {value}",
        0x45: "unallowed values {0}",
        0x46: "unallowed value {value}",
        0x47: "unallowed values {0}",

        0x61: "field '{field}' cannot be coerced: {0}",
        0x62: "field '{field}' cannot be renamed: {0}",
        0x63: "field is read-only",
        0x64: "default value for '{field}' cannot be set: {0}",

        0x81: "mapping doesn't validate subschema: {0}",
        0x82: "one or more sequence-items don't validate: {0}",
        0x83: "one or more keys of a mapping don't validate: {0}",
        0x84: "one or more values in a mapping don't validate: {0}",
        0x85: "one or more sequence-items don't validate: {0}",

        0x91: "one or more definitions validate",
        0x92: "none or more than one rule validate",
        0x93: "no definitions validate",
        0x94: "one or more definitions don't validate",
    }

    def __init__(self, tree=None):
        """ :param tree: An existing tree to work on; a new empty
                         :class:`dict` is used when omitted. """
        if tree is None:
            tree = {}
        self.tree = tree

    def __call__(self, errors=None):
        """ Returns the pretty tree. When ``errors`` is given, the handler
            is cleared and refilled with them first. """
        if errors is not None:
            self.clear()
            self.extend(errors)
        return self.pretty_tree

    def __str__(self):
        return pformat(self.pretty_tree)

    @property
    def pretty_tree(self):
        """ A deep copy of the tree with empty trailing sub-trees removed.
            """
        cleaned = deepcopy(self.tree)
        for field in cleaned:
            self._purge_empty_dicts(cleaned[field])
        return cleaned

    @encode_unicode
    def add(self, error):
        """ Inserts an error - and, for group and logic errors, its
            children - into the tree. Plain errors whose code has no entry
            in :attr:`messages` are ignored.

        :param error: :class:`~cerberus.errors.ValidationError`
        """
        # Make sure the original error is not altered with
        # error paths specific to the handler.
        error = deepcopy(error)

        self._rewrite_error_path(error)

        # Logic errors must be tested first, ahead of the group branch.
        if error.is_logic_error:
            self._insert_logic_error(error)
        elif error.is_group_error:
            self._insert_group_error(error)
        elif error.code in self.messages:
            message = self._format_message(error.field, error)
            self._insert_error(error.document_path, message)

    def clear(self):
        """ Replaces the tree with a new empty dictionary. """
        self.tree = {}

    def start(self, validator):
        """ Clears the tree when a validation starts. """
        self.clear()

    def _format_message(self, field, error):
        """ Renders the message template registered for the error's code. """
        template = self.messages[error.code]
        return template.format(*error.info, constraint=error.constraint,
                               field=field, value=error.value)

    def _insert_error(self, path, node):
        """ Adds an error or sub-tree to :attr:tree.

        Each field maps to a list whose last element is the dictionary of
        its sub-tree and whose preceding elements are messages.

        :param path: Path to the error.
        :type path: Tuple of strings and integers.
        :param node: An error message or a sub-tree.
        :type node: String or dictionary.
        """
        field = path[0]
        if len(path) == 1:
            if field in self.tree:
                # Keep the sub-tree dictionary as the list's last element.
                entries = self.tree[field]
                trailing_subtree = entries.pop()
                entries.extend((node, trailing_subtree))
            else:
                self.tree[field] = [node, {}]
        elif len(path) > 1:
            if field not in self.tree:
                self.tree[field] = [{}]
            subtree = self.tree[field][-1]

            # Insert the remainder of the path through a temporary handler
            # of the same class, then merge its tree into the sub-tree.
            if subtree:
                nested = self.__class__(tree=copy(subtree))
            else:
                nested = self.__class__()
            nested._insert_error(path[1:], node)
            subtree.update(nested.tree)

    def _insert_group_error(self, error):
        """ Inserts every child of a group error, recursing into nested
            logic and group errors. The group error itself gets no message.
            """
        for child in error.child_errors:
            if child.is_logic_error:
                self._insert_logic_error(child)
            elif child.is_group_error:
                self._insert_group_error(child)
            else:
                message = self._format_message(child.field, child)
                self._insert_error(child.document_path, message)

    def _insert_logic_error(self, error):
        """ Inserts the message of a logic error followed by the errors of
            each of its definitions. """
        field = error.field
        self._insert_error(error.document_path,
                           self._format_message(field, error))

        for errors_of_definition in error.definitions_errors.values():
            for child in errors_of_definition:
                if child.is_logic_error:
                    self._insert_logic_error(child)
                elif child.is_group_error:
                    self._insert_group_error(child)
                else:
                    # Plain children are formatted with the field of the
                    # enclosing logic error, not with their own.
                    message = self._format_message(field, child)
                    self._insert_error(child.document_path, message)

    def _purge_empty_dicts(self, error_list):
        """ Drops the trailing sub-tree of ``error_list`` when it is empty,
            otherwise purges the lists contained in that sub-tree. """
        tail = error_list[-1]
        if tail:
            for key in tail:
                self._purge_empty_dicts(tail[key])
        else:
            error_list.pop()

    def _rewrite_error_path(self, error, offset=0):
        """
        Recursively rewrites the error path to correctly represent logic errors
        """
        if error.is_logic_error:
            self._rewrite_logic_error_path(error, offset)
        elif error.is_group_error:
            self._rewrite_group_error_path(error, offset)

    def _rewrite_group_error_path(self, error, offset=0):
        """ Re-roots the document paths of a group error's children at the
            group error's own path and continues with each child. """
        parent_path = error.document_path
        child_start = len(parent_path) - offset
        for child in error.child_errors:
            tail = child.document_path[child_start:]
            child.document_path = parent_path + tail
            self._rewrite_error_path(child, offset)

    def _rewrite_logic_error_path(self, error, offset=0):
        """ Inserts a ``'<rule> definition <index>'`` element into the
            document paths of a logic error's children and continues with
            each child, using an offset increased by one. """
        child_start = len(error.document_path) - offset

        for i, errors_of_definition in error.definitions_errors.items():
            if not errors_of_definition:
                continue

            nodename = '%s definition %s' % (error.rule, i)
            definition_path = error.document_path + (nodename,)

            for child in errors_of_definition:
                tail = child.document_path[child_start:]
                child.document_path = definition_path + tail

                self._rewrite_error_path(child, offset + 1)
class SchemaErrorHandler(BasicErrorHandler):
    """ A :class:`BasicErrorHandler` working on its own copy of the message
        table, in which code ``0x03`` reads "unknown rule". """

    # Copy first so the parent's table is left untouched.
    messages = dict(BasicErrorHandler.messages)
    messages[0x03] = "unknown rule"
|