| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407 |
- """
- Extensible validation for Python dictionaries.
- This module implements Cerberus Validator class
- :copyright: 2012-2016 by Nicola Iarocci.
- :license: ISC, see LICENSE for more details.
- Full documentation is available at http://python-cerberus.org
- """
- from __future__ import absolute_import
- from ast import literal_eval
- from collections import Hashable, Iterable, Mapping, Sequence
- from copy import copy
- from datetime import date, datetime
- import re
- from warnings import warn
- from cerberus import errors
- from cerberus.platform import _int_types, _str_type
- from cerberus.schema import (schema_registry, rules_set_registry,
- DefinitionSchema, SchemaError)
- from cerberus.utils import (drop_item_from_tuple, isclass,
- readonly_classproperty, TypeDefinition)
- toy_error_handler = errors.ToyErrorHandler()
- def dummy_for_rule_validation(rule_constraints):
- def dummy(self, constraint, field, value):
- raise RuntimeError('Dummy method called. Its purpose is to hold just'
- 'validation constraints for a rule in its '
- 'docstring.')
- f = dummy
- f.__doc__ = rule_constraints
- return f
- class DocumentError(Exception):
- """ Raised when the target document is missing or has the wrong format """
- pass
- class _SchemaRuleTypeError(Exception):
- """ Raised when a schema (list) validation encounters a mapping.
- Not supposed to be used outside this module. """
- pass
- class BareValidator(object):
- """ Validator class. Normalizes and/or validates any mapping against a
- validation-schema which is provided as an argument at class instantiation
- or upon calling the :meth:`~cerberus.Validator.validate`,
- :meth:`~cerberus.Validator.validated` or
- :meth:`~cerberus.Validator.normalized` method. An instance itself is
- callable and executes a validation.
- All instantiation parameters are optional.
- There are the introspective properties :attr:`types`, :attr:`validators`,
- :attr:`coercers`, :attr:`default_setters`, :attr:`rules`,
- :attr:`normalization_rules` and :attr:`validation_rules`.
- The attributes reflecting the available rules are assembled considering
- constraints that are defined in the docstrings of rules' methods and is
- effectively used as validation schema for :attr:`schema`.
- :param schema: See :attr:`~cerberus.Validator.schema`.
- Defaults to :obj:`None`.
- :type schema: any :term:`mapping`
- :param ignore_none_values: See :attr:`~cerberus.Validator.ignore_none_values`.
- Defaults to ``False``.
- :type ignore_none_values: :class:`bool`
- :param allow_unknown: See :attr:`~cerberus.Validator.allow_unknown`.
- Defaults to ``False``.
- :type allow_unknown: :class:`bool` or any :term:`mapping`
- :param purge_unknown: See :attr:`~cerberus.Validator.purge_unknown`.
- Defaults to to ``False``.
- :type purge_unknown: :class:`bool`
- :param error_handler: The error handler that formats the result of
- :attr:`~cerberus.Validator.errors`.
- When given as two-value tuple with an error-handler
- class and a dictionary, the latter is passed to the
- initialization of the error handler.
- Default: :class:`~cerberus.errors.BasicErrorHandler`.
- :type error_handler: class or instance based on
- :class:`~cerberus.errors.BaseErrorHandler` or
- :class:`tuple`
- """ # noqa: E501
- mandatory_validations = ('nullable',)
- """ Rules that are evaluated on any field, regardless whether defined in
- the schema or not.
- Type: :class:`tuple` """
- priority_validations = ('nullable', 'readonly', 'type', 'empty')
- """ Rules that will be processed in that order before any other.
- Type: :class:`tuple` """
- types_mapping = {
- 'binary':
- TypeDefinition('binary', (bytes, bytearray), ()),
- 'boolean':
- TypeDefinition('boolean', (bool,), ()),
- 'date':
- TypeDefinition('date', (date,), ()),
- 'datetime':
- TypeDefinition('datetime', (datetime,), ()),
- 'dict':
- TypeDefinition('dict', (Mapping,), ()),
- 'float':
- TypeDefinition('float', (float, _int_types), ()),
- 'integer':
- TypeDefinition('integer', (_int_types,), ()),
- 'list':
- TypeDefinition('list', (Sequence,), (_str_type,)),
- 'number':
- TypeDefinition('number', (_int_types, float), (bool,)),
- 'set':
- TypeDefinition('set', (set,), ()),
- 'string':
- TypeDefinition('string', (_str_type), ())
- }
- """ This mapping holds all available constraints for the type rule and
- their assigned :class:`~cerberus.TypeDefinition`. """
- _valid_schemas = set()
- """ A :class:`set` of hashes derived from validation schemas that are
- legit for a particular ``Validator`` class. """
- def __init__(self, *args, **kwargs):
- """ The arguments will be treated as with this signature:
- ext(self, schema=None, ignore_none_values=False,
- allow_unknown=False, purge_unknown=False,
- error_handler=errors.BasicErrorHandler)
- """
- self.document = None
- """ The document that is or was recently processed.
- Type: any :term:`mapping` """
- self._errors = errors.ErrorList()
- """ The list of errors that were encountered since the last document
- processing was invoked.
- Type: :class:`~cerberus.errors.ErrorList` """
- self.recent_error = None
- """ The last individual error that was submitted.
- Type: :class:`~cerberus.errors.ValidationError` """
- self.document_error_tree = errors.DocumentErrorTree()
- """ A tree representiation of encountered errors following the
- structure of the document.
- Type: :class:`~cerberus.errors.DocumentErrorTree` """
- self.schema_error_tree = errors.SchemaErrorTree()
- """ A tree representiation of encountered errors following the
- structure of the schema.
- Type: :class:`~cerberus.errors.SchemaErrorTree` """
- self.document_path = ()
- """ The path within the document to the current sub-document.
- Type: :class:`tuple` """
- self.schema_path = ()
- """ The path within the schema to the current sub-schema.
- Type: :class:`tuple` """
- self.update = False
- self.error_handler = self.__init_error_handler(kwargs)
- """ The error handler used to format :attr:`~cerberus.Validator.errors`
- and process submitted errors with
- :meth:`~cerberus.Validator._error`.
- Type: :class:`~cerberus.errors.BaseErrorHandler` """
- self.__store_config(args, kwargs)
- self.schema = kwargs.get('schema', None)
- self.allow_unknown = kwargs.get('allow_unknown', False)
- self._remaining_rules = []
- """ Keeps track of the rules that are next in line to be evaluated
- during the validation of a field.
- Type: :class:`list` """
- super(BareValidator, self).__init__()
- @staticmethod
- def __init_error_handler(kwargs):
- error_handler = kwargs.pop('error_handler', errors.BasicErrorHandler)
- if isinstance(error_handler, tuple):
- error_handler, eh_config = error_handler
- else:
- eh_config = {}
- if isclass(error_handler) and \
- issubclass(error_handler, errors.BaseErrorHandler):
- return error_handler(**eh_config)
- elif isinstance(error_handler, errors.BaseErrorHandler):
- return error_handler
- else:
- raise RuntimeError('Invalid error_handler.')
- def __store_config(self, args, kwargs):
- """ Assign args to kwargs and store configuration. """
- signature = ('schema', 'ignore_none_values', 'allow_unknown',
- 'purge_unknown')
- for i, p in enumerate(signature[:len(args)]):
- if p in kwargs:
- raise TypeError("ext got multiple values for argument "
- "'%s'" % p)
- else:
- kwargs[p] = args[i]
- self._config = kwargs
- """ This dictionary holds the configuration arguments that were used to
- initialize the :class:`Validator` instance except the
- ``error_handler``. """
- @classmethod
- def clear_caches(cls):
- """ Purge the cache of known valid schemas. """
- cls._valid_schemas.clear()
- def _error(self, *args):
- """ Creates and adds one or multiple errors.
- :param args: Accepts different argument's signatures.
- *1. Bulk addition of errors:*
- - :term:`iterable` of
- :class:`~cerberus.errors.ValidationError`-instances
- The errors will be added to
- :attr:`~cerberus.Validator._errors`.
- *2. Custom error:*
- - the invalid field's name
- - the error message
- A custom error containing the message will be created and
- added to :attr:`~cerberus.Validator._errors`.
- There will however be fewer information contained in the
- error (no reference to the violated rule and its
- constraint).
- *3. Defined error:*
- - the invalid field's name
- - the error-reference, see :mod:`cerberus.errors`
- - arbitrary, supplemental information about the error
- A :class:`~cerberus.errors.ValidationError` instance will
- be created and added to
- :attr:`~cerberus.Validator._errors`.
- """
- if len(args) == 1:
- self._errors.extend(args[0])
- self._errors.sort()
- for error in args[0]:
- self.document_error_tree += error
- self.schema_error_tree += error
- self.error_handler.emit(error)
- elif len(args) == 2 and isinstance(args[1], _str_type):
- self._error(args[0], errors.CUSTOM, args[1])
- elif len(args) >= 2:
- field = args[0]
- code = args[1].code
- rule = args[1].rule
- info = args[2:]
- document_path = self.document_path + (field, )
- schema_path = self.schema_path
- if code != errors.UNKNOWN_FIELD.code and rule is not None:
- schema_path += (field, rule)
- if not rule:
- constraint = None
- else:
- field_definitions = self._resolve_rules_set(self.schema[field])
- if rule == 'nullable':
- constraint = field_definitions.get(rule, False)
- else:
- constraint = field_definitions[rule]
- value = self.document.get(field)
- self.recent_error = errors.ValidationError(
- document_path, schema_path, code, rule, constraint, value, info
- )
- self._error([self.recent_error])
- def _get_child_validator(self, document_crumb=None, schema_crumb=None,
- **kwargs):
- """ Creates a new instance of Validator-(sub-)class. All initial
- parameters of the parent are passed to the initialization, unless
- a parameter is given as an explicit *keyword*-parameter.
- :param document_crumb: Extends the
- :attr:`~cerberus.Validator.document_path`
- of the child-validator.
- :type document_crumb: :class:`tuple` or :term:`hashable`
- :param schema_crumb: Extends the
- :attr:`~cerberus.Validator.schema_path`
- of the child-validator.
- :type schema_crumb: :class:`tuple` or hashable
- :param kwargs: Overriding keyword-arguments for initialization.
- :type kwargs: :class:`dict`
- :return: an instance of ``self.__class__``
- """
- child_config = self._config.copy()
- child_config.update(kwargs)
- if not self.is_child:
- child_config['is_child'] = True
- child_config['error_handler'] = toy_error_handler
- child_config['root_allow_unknown'] = self.allow_unknown
- child_config['root_document'] = self.document
- child_config['root_schema'] = self.schema
- child_validator = self.__class__(**child_config)
- if document_crumb is None:
- child_validator.document_path = self.document_path
- else:
- if not isinstance(document_crumb, tuple):
- document_crumb = (document_crumb, )
- child_validator.document_path = self.document_path + document_crumb
- if schema_crumb is None:
- child_validator.schema_path = self.schema_path
- else:
- if not isinstance(schema_crumb, tuple):
- schema_crumb = (schema_crumb, )
- child_validator.schema_path = self.schema_path + schema_crumb
- return child_validator
- def __get_rule_handler(self, domain, rule):
- methodname = '_{0}_{1}'.format(domain, rule.replace(' ', '_'))
- result = getattr(self, methodname, None)
- if result is None:
- raise RuntimeError("There's no handler for '{}' in the '{}' "
- "domain.".format(rule, domain))
- return result
- def _drop_nodes_from_errorpaths(self, _errors, dp_items, sp_items):
- """ Removes nodes by index from an errorpath, relatively to the
- basepaths of self.
- :param errors: A list of :class:`errors.ValidationError` instances.
- :param dp_items: A list of integers, pointing at the nodes to drop from
- the :attr:`document_path`.
- :param sp_items: Alike ``dp_items``, but for :attr:`schema_path`.
- """
- dp_basedepth = len(self.document_path)
- sp_basedepth = len(self.schema_path)
- for error in _errors:
- for i in sorted(dp_items, reverse=True):
- error.document_path = \
- drop_item_from_tuple(error.document_path, dp_basedepth + i)
- for i in sorted(sp_items, reverse=True):
- error.schema_path = \
- drop_item_from_tuple(error.schema_path, sp_basedepth + i)
- if error.child_errors:
- self._drop_nodes_from_errorpaths(error.child_errors,
- dp_items, sp_items)
- def _lookup_field(self, path):
- """ Searches for a field as defined by path. This method is used by the
- ``dependency`` evaluation logic.
- :param path: Path elements are separated by a ``.``. A leading ``^``
- indicates that the path relates to the document root,
- otherwise it relates to the currently evaluated document,
- which is possibly a subdocument.
- The sequence ``^^`` at the start will be interpreted as a
- literal ``^``.
- :type path: :class:`str`
- :returns: Either the found field name and its value or :obj:`None` for
- both.
- :rtype: A two-value :class:`tuple`.
- """
- if path.startswith('^'):
- path = path[1:]
- context = self.document if path.startswith('^') \
- else self.root_document
- else:
- context = self.document
- parts = path.split('.')
- for part in parts:
- if part not in context:
- return None, None
- context = context.get(part)
- return parts[-1], context
- def _resolve_rules_set(self, rules_set):
- if isinstance(rules_set, Mapping):
- return rules_set
- elif isinstance(rules_set, _str_type):
- return self.rules_set_registry.get(rules_set)
- return None
- def _resolve_schema(self, schema):
- if isinstance(schema, Mapping):
- return schema
- elif isinstance(schema, _str_type):
- return self.schema_registry.get(schema)
- return None
- # Properties
- @property
- def allow_unknown(self):
- """ If ``True`` unknown fields that are not defined in the schema will
- be ignored. If a mapping with a validation schema is given, any
- undefined field will be validated against its rules.
- Also see :ref:`allowing-the-unknown`.
- Type: :class:`bool` or any :term:`mapping` """
- return self._config.get('allow_unknown', False)
- @allow_unknown.setter
- def allow_unknown(self, value):
- if not (self.is_child or isinstance(value, (bool, DefinitionSchema))):
- DefinitionSchema(self, {'allow_unknown': value})
- self._config['allow_unknown'] = value
- @property
- def errors(self):
- """ The errors of the last processing formatted by the handler that is
- bound to :attr:`~cerberus.Validator.error_handler`. """
- return self.error_handler(self._errors)
- @property
- def ignore_none_values(self):
- """ Whether to not process :obj:`None`-values in a document or not.
- Type: :class:`bool` """
- return self._config.get('ignore_none_values', False)
- @ignore_none_values.setter
- def ignore_none_values(self, value):
- self._config['ignore_none_values'] = value
- @property
- def is_child(self):
- """ ``True`` for child-validators obtained with
- :meth:`~cerberus.Validator._get_child_validator`.
- Type: :class:`bool` """
- return self._config.get('is_child', False)
- @property
- def _is_normalized(self):
- """ ``True`` if the document is already normalized. """
- return self._config.get('_is_normalized', False)
- @_is_normalized.setter
- def _is_normalized(self, value):
- self._config['_is_normalized'] = value
- @property
- def purge_unknown(self):
- """ If ``True`` unknown fields will be deleted from the document
- unless a validation is called with disabled normalization.
- Also see :ref:`purging-unknown-fields`. Type: :class:`bool` """
- return self._config.get('purge_unknown', False)
- @purge_unknown.setter
- def purge_unknown(self, value):
- self._config['purge_unknown'] = value
- @property
- def root_allow_unknown(self):
- """ The :attr:`~cerberus.Validator.allow_unknown` attribute of the
- first level ancestor of a child validator. """
- return self._config.get('root_allow_unknown', self.allow_unknown)
- @property
- def root_document(self):
- """ The :attr:`~cerberus.Validator.document` attribute of the
- first level ancestor of a child validator. """
- return self._config.get('root_document', self.document)
- @property
- def rules_set_registry(self):
- """ The registry that holds referenced rules sets.
- Type: :class:`~cerberus.Registry` """
- return self._config.get('rules_set_registry', rules_set_registry)
- @rules_set_registry.setter
- def rules_set_registry(self, registry):
- self._config['rules_set_registry'] = registry
- @property
- def root_schema(self):
- """ The :attr:`~cerberus.Validator.schema` attribute of the
- first level ancestor of a child validator. """
- return self._config.get('root_schema', self.schema)
- @property
- def schema(self):
- """ The validation schema of a validator. When a schema is passed to
- a method, it replaces this attribute.
- Type: any :term:`mapping` or :obj:`None` """
- return self._schema
- @schema.setter
- def schema(self, schema):
- if schema is None:
- self._schema = None
- elif self.is_child or isinstance(schema, DefinitionSchema):
- self._schema = schema
- else:
- self._schema = DefinitionSchema(self, schema)
- @property
- def schema_registry(self):
- """ The registry that holds referenced schemas.
- Type: :class:`~cerberus.Registry` """
- return self._config.get('schema_registry', schema_registry)
- @schema_registry.setter
- def schema_registry(self, registry):
- self._config['schema_registry'] = registry
- # FIXME the returned method has the correct docstring, but doesn't appear
- # in the API docs
- @readonly_classproperty
- def types(cls):
- """ The constraints that can be used for the 'type' rule.
- Type: A tuple of strings. """
- redundant_types = \
- set(cls.types_mapping) & set(cls._types_from_methods)
- if redundant_types:
- warn("These types are defined both with a method and in the"
- "'types_mapping' property of this validator: %s"
- % redundant_types)
- return tuple(cls.types_mapping) + cls._types_from_methods
- # Document processing
- def __init_processing(self, document, schema=None):
- self._errors = errors.ErrorList()
- self.recent_error = None
- self.document_error_tree = errors.DocumentErrorTree()
- self.schema_error_tree = errors.SchemaErrorTree()
- self.document = copy(document)
- if not self.is_child:
- self._is_normalized = False
- if schema is not None:
- self.schema = DefinitionSchema(self, schema)
- elif self.schema is None:
- if isinstance(self.allow_unknown, Mapping):
- self._schema = {}
- else:
- raise SchemaError(errors.SCHEMA_ERROR_MISSING)
- if document is None:
- raise DocumentError(errors.DOCUMENT_MISSING)
- if not isinstance(document, Mapping):
- raise DocumentError(
- errors.DOCUMENT_FORMAT.format(document))
- self.error_handler.start(self)
- def _drop_remaining_rules(self, *rules):
- """ Drops rules from the queue of the rules that still need to be
- evaluated for the currently processed field.
- If no arguments are given, the whole queue is emptied.
- """
- if rules:
- for rule in rules:
- try:
- self._remaining_rules.remove(rule)
- except ValueError:
- pass
- else:
- self._remaining_rules = []
- # # Normalizing
- def normalized(self, document, schema=None, always_return_document=False):
- """ Returns the document normalized according to the specified rules
- of a schema.
- :param document: The document to normalize.
- :type document: any :term:`mapping`
- :param schema: The validation schema. Defaults to :obj:`None`. If not
- provided here, the schema must have been provided at
- class instantiation.
- :type schema: any :term:`mapping`
- :param always_return_document: Return the document, even if an error
- occurred. Defaults to: ``False``.
- :type always_return_document: :class:`bool`
- :return: A normalized copy of the provided mapping or :obj:`None` if an
- error occurred during normalization.
- """
- self.__init_processing(document, schema)
- self.__normalize_mapping(self.document, self.schema)
- self.error_handler.end(self)
- if self._errors and not always_return_document:
- return None
- else:
- return self.document
- def __normalize_mapping(self, mapping, schema):
- if isinstance(schema, _str_type):
- schema = self._resolve_schema(schema)
- schema = schema.copy()
- for field in schema:
- schema[field] = self._resolve_rules_set(schema[field])
- self.__normalize_rename_fields(mapping, schema)
- if self.purge_unknown and not self.allow_unknown:
- self._normalize_purge_unknown(mapping, schema)
- # Check `readonly` fields before applying default values because
- # a field's schema definition might contain both `readonly` and
- # `default`.
- self.__validate_readonly_fields(mapping, schema)
- self.__normalize_default_fields(mapping, schema)
- self._normalize_coerce(mapping, schema)
- self.__normalize_containers(mapping, schema)
- self._is_normalized = True
- return mapping
- def _normalize_coerce(self, mapping, schema):
- """ {'oneof': [
- {'type': 'callable'},
- {'type': 'list',
- 'schema': {'oneof': [{'type': 'callable'},
- {'type': 'string'}]}},
- {'type': 'string'}
- ]} """
- error = errors.COERCION_FAILED
- for field in mapping:
- if field in schema and 'coerce' in schema[field]:
- mapping[field] = self.__normalize_coerce(
- schema[field]['coerce'], field, mapping[field],
- schema[field].get('nullable', False), error)
- elif isinstance(self.allow_unknown, Mapping) and \
- 'coerce' in self.allow_unknown:
- mapping[field] = self.__normalize_coerce(
- self.allow_unknown['coerce'], field, mapping[field],
- self.allow_unknown.get('nullable', False), error)
- def __normalize_coerce(self, processor, field, value, nullable, error):
- if isinstance(processor, _str_type):
- processor = self.__get_rule_handler('normalize_coerce', processor)
- elif isinstance(processor, Iterable):
- result = value
- for p in processor:
- result = self.__normalize_coerce(p, field, result,
- nullable, error)
- if errors.COERCION_FAILED in \
- self.document_error_tree.fetch_errors_from(
- self.document_path + (field,)):
- break
- return result
- try:
- return processor(value)
- except Exception as e:
- if not nullable and e is not TypeError:
- self._error(field, error, str(e))
- return value
- def __normalize_containers(self, mapping, schema):
- for field in mapping:
- if field not in schema:
- continue
- # TODO: This check conflates validation and normalization
- if isinstance(mapping[field], Mapping):
- if 'keyschema' in schema[field]:
- self.__normalize_mapping_per_keyschema(
- field, mapping, schema[field]['keyschema'])
- if 'valueschema' in schema[field]:
- self.__normalize_mapping_per_valueschema(
- field, mapping, schema[field]['valueschema'])
- if set(schema[field]) & set(('allow_unknown', 'purge_unknown',
- 'schema')):
- try:
- self.__normalize_mapping_per_schema(
- field, mapping, schema)
- except _SchemaRuleTypeError:
- pass
- elif isinstance(mapping[field], _str_type):
- continue
- elif isinstance(mapping[field], Sequence) and \
- 'schema' in schema[field]:
- self.__normalize_sequence(field, mapping, schema)
- def __normalize_mapping_per_keyschema(self, field, mapping, property_rules):
- schema = dict(((k, property_rules) for k in mapping[field]))
- document = dict(((k, k) for k in mapping[field]))
- validator = self._get_child_validator(
- document_crumb=field, schema_crumb=(field, 'keyschema'),
- schema=schema)
- result = validator.normalized(document, always_return_document=True)
- if validator._errors:
- self._drop_nodes_from_errorpaths(validator._errors, [], [2, 4])
- self._error(validator._errors)
- for k in result:
- if k == result[k]:
- continue
- if result[k] in mapping[field]:
- warn("Normalizing keys of {path}: {key} already exists, "
- "its value is replaced."
- .format(path='.'.join(self.document_path + (field,)),
- key=k))
- mapping[field][result[k]] = mapping[field][k]
- else:
- mapping[field][result[k]] = mapping[field][k]
- del mapping[field][k]
- def __normalize_mapping_per_valueschema(self, field, mapping, value_rules):
- schema = dict(((k, value_rules) for k in mapping[field]))
- validator = self._get_child_validator(
- document_crumb=field, schema_crumb=(field, 'valueschema'),
- schema=schema)
- mapping[field] = validator.normalized(mapping[field],
- always_return_document=True)
- if validator._errors:
- self._drop_nodes_from_errorpaths(validator._errors, [], [2])
- self._error(validator._errors)
- def __normalize_mapping_per_schema(self, field, mapping, schema):
- validator = self._get_child_validator(
- document_crumb=field, schema_crumb=(field, 'schema'),
- schema=schema[field].get('schema', {}),
- allow_unknown=schema[field].get('allow_unknown', self.allow_unknown), # noqa: E501
- purge_unknown=schema[field].get('purge_unknown', self.purge_unknown)) # noqa: E501
- value_type = type(mapping[field])
- result_value = validator.normalized(mapping[field],
- always_return_document=True)
- mapping[field] = value_type(result_value)
- if validator._errors:
- self._error(validator._errors)
- def __normalize_sequence(self, field, mapping, schema):
- schema = dict(((k, schema[field]['schema'])
- for k in range(len(mapping[field]))))
- document = dict((k, v) for k, v in enumerate(mapping[field]))
- validator = self._get_child_validator(
- document_crumb=field, schema_crumb=(field, 'schema'),
- schema=schema)
- value_type = type(mapping[field])
- result = validator.normalized(document, always_return_document=True)
- mapping[field] = value_type(result.values())
- if validator._errors:
- self._drop_nodes_from_errorpaths(validator._errors, [], [2])
- self._error(validator._errors)
- @staticmethod
- def _normalize_purge_unknown(mapping, schema):
- """ {'type': 'boolean'} """
- for field in tuple(mapping):
- if field not in schema:
- del mapping[field]
- return mapping
- def __normalize_rename_fields(self, mapping, schema):
- for field in tuple(mapping):
- if field in schema:
- self._normalize_rename(mapping, schema, field)
- self._normalize_rename_handler(mapping, schema, field)
- elif isinstance(self.allow_unknown, Mapping) and \
- 'rename_handler' in self.allow_unknown:
- self._normalize_rename_handler(
- mapping, {field: self.allow_unknown}, field)
- return mapping
- def _normalize_rename(self, mapping, schema, field):
- """ {'type': 'hashable'} """
- if 'rename' in schema[field]:
- mapping[schema[field]['rename']] = mapping[field]
- del mapping[field]
- def _normalize_rename_handler(self, mapping, schema, field):
- """ {'oneof': [
- {'type': 'callable'},
- {'type': 'list',
- 'schema': {'oneof': [{'type': 'callable'},
- {'type': 'string'}]}},
- {'type': 'string'}
- ]} """
- if 'rename_handler' not in schema[field]:
- return
- new_name = self.__normalize_coerce(
- schema[field]['rename_handler'], field, field,
- False, errors.RENAMING_FAILED)
- if new_name != field:
- mapping[new_name] = mapping[field]
- del mapping[field]
- def __validate_readonly_fields(self, mapping, schema):
- for field in (x for x in schema if x in mapping and
- self._resolve_rules_set(schema[x]).get('readonly')):
- self._validate_readonly(schema[field]['readonly'], field,
- mapping[field])
- def __normalize_default_fields(self, mapping, schema):
- fields = [x for x in schema if x not in mapping or
- mapping[x] is None and not schema[x].get('nullable', False)]
- try:
- fields_with_default = [x for x in fields if 'default' in schema[x]]
- except TypeError:
- raise _SchemaRuleTypeError
- for field in fields_with_default:
- self._normalize_default(mapping, schema, field)
- known_fields_states = set()
- fields = [x for x in fields if 'default_setter' in schema[x]]
- while fields:
- field = fields.pop(0)
- try:
- self._normalize_default_setter(mapping, schema, field)
- except KeyError:
- fields.append(field)
- except Exception as e:
- self._error(field, errors.SETTING_DEFAULT_FAILED, str(e))
- fields_state = tuple(fields)
- if fields_state in known_fields_states:
- for field in fields:
- self._error(field, errors.SETTING_DEFAULT_FAILED,
- 'Circular dependencies of default setters.')
- break
- else:
- known_fields_states.add(fields_state)
- def _normalize_default(self, mapping, schema, field):
- """ {'nullable': True} """
- mapping[field] = schema[field]['default']
- def _normalize_default_setter(self, mapping, schema, field):
- """ {'oneof': [
- {'type': 'callable'},
- {'type': 'string'}
- ]} """
- if 'default_setter' in schema[field]:
- setter = schema[field]['default_setter']
- if isinstance(setter, _str_type):
- setter = self.__get_rule_handler('normalize_default_setter',
- setter)
- mapping[field] = setter(mapping)
- # # Validating
- def validate(self, document, schema=None, update=False, normalize=True):
- """ Normalizes and validates a mapping against a validation-schema of
- defined rules.
- :param document: The document to normalize.
- :type document: any :term:`mapping`
- :param schema: The validation schema. Defaults to :obj:`None`. If not
- provided here, the schema must have been provided at
- class instantiation.
- :type schema: any :term:`mapping`
- :param update: If ``True``, required fields won't be checked.
- :type update: :class:`bool`
- :param normalize: If ``True``, normalize the document before validation.
- :type normalize: :class:`bool`
- :return: ``True`` if validation succeeds, otherwise ``False``. Check
- the :func:`errors` property for a list of processing errors.
- :rtype: :class:`bool`
- """
- self.update = update
- self._unrequired_by_excludes = set()
- self.__init_processing(document, schema)
- if normalize:
- self.__normalize_mapping(self.document, self.schema)
- for field in self.document:
- if self.ignore_none_values and self.document[field] is None:
- continue
- definitions = self.schema.get(field)
- if definitions is not None:
- self.__validate_definitions(definitions, field)
- else:
- self.__validate_unknown_fields(field)
- if not self.update:
- self.__validate_required_fields(self.document)
- self.error_handler.end(self)
- return not bool(self._errors)
- __call__ = validate
- def validated(self, *args, **kwargs):
- """ Wrapper around :meth:`~cerberus.Validator.validate` that returns
- the normalized and validated document or :obj:`None` if validation
- failed. """
- always_return_document = kwargs.pop('always_return_document', False)
- self.validate(*args, **kwargs)
- if self._errors and not always_return_document:
- return None
- else:
- return self.document
- def __validate_unknown_fields(self, field):
- if self.allow_unknown:
- value = self.document[field]
- if isinstance(self.allow_unknown, (Mapping, _str_type)):
- # validate that unknown fields matches the schema
- # for unknown_fields
- schema_crumb = 'allow_unknown' if self.is_child \
- else '__allow_unknown__'
- validator = self._get_child_validator(
- schema_crumb=schema_crumb,
- schema={field: self.allow_unknown})
- if not validator({field: value}, normalize=False):
- self._error(validator._errors)
- else:
- self._error(field, errors.UNKNOWN_FIELD)
- def __validate_definitions(self, definitions, field):
- """ Validate a field's value against its defined rules. """
- def validate_rule(rule):
- validator = self.__get_rule_handler('validate', rule)
- return validator(definitions.get(rule, None), field, value)
- definitions = self._resolve_rules_set(definitions)
- value = self.document[field]
- rules_queue = [x for x in self.priority_validations
- if x in definitions or x in self.mandatory_validations]
- rules_queue.extend(x for x in self.mandatory_validations
- if x not in rules_queue)
- rules_queue.extend(x for x in definitions
- if x not in rules_queue and
- x not in self.normalization_rules and
- x not in ('allow_unknown', 'required'))
- self._remaining_rules = rules_queue
- while self._remaining_rules:
- rule = self._remaining_rules.pop(0)
- try:
- result = validate_rule(rule)
- # TODO remove on next breaking release
- if result:
- break
- except _SchemaRuleTypeError:
- break
- self._drop_remaining_rules()
- # Remember to keep the validation methods below this line
- # sorted alphabetically
- _validate_allow_unknown = dummy_for_rule_validation(
- """ {'oneof': [{'type': 'boolean'},
- {'type': ['dict', 'string'],
- 'validator': 'bulk_schema'}]} """)
- def _validate_allowed(self, allowed_values, field, value):
- """ {'type': 'list'} """
- if isinstance(value, Iterable) and not isinstance(value, _str_type):
- unallowed = set(value) - set(allowed_values)
- if unallowed:
- self._error(field, errors.UNALLOWED_VALUES, list(unallowed))
- else:
- if value not in allowed_values:
- self._error(field, errors.UNALLOWED_VALUE, value)
- def _validate_dependencies(self, dependencies, field, value):
- """ {'type': ('dict', 'hashable', 'list'),
- 'validator': 'dependencies'} """
- if isinstance(dependencies, _str_type):
- dependencies = (dependencies,)
- if isinstance(dependencies, Sequence):
- self.__validate_dependencies_sequence(dependencies, field)
- elif isinstance(dependencies, Mapping):
- self.__validate_dependencies_mapping(dependencies, field)
- if self.document_error_tree.fetch_node_from(
- self.schema_path + (field, 'dependencies')) is not None:
- return True
- def __validate_dependencies_mapping(self, dependencies, field):
- validated_dependencies_counter = 0
- error_info = {}
- for dependency_name, dependency_values in dependencies.items():
- if (not isinstance(dependency_values, Sequence) or
- isinstance(dependency_values, _str_type)):
- dependency_values = [dependency_values]
- wanted_field, wanted_field_value = \
- self._lookup_field(dependency_name)
- if wanted_field_value in dependency_values:
- validated_dependencies_counter += 1
- else:
- error_info.update({dependency_name: wanted_field_value})
- if validated_dependencies_counter != len(dependencies):
- self._error(field, errors.DEPENDENCIES_FIELD_VALUE, error_info)
- def __validate_dependencies_sequence(self, dependencies, field):
- for dependency in dependencies:
- if self._lookup_field(dependency)[0] is None:
- self._error(field, errors.DEPENDENCIES_FIELD, dependency)
- def _validate_empty(self, empty, field, value):
- """ {'type': 'boolean'} """
- if isinstance(value, Iterable) and len(value) == 0:
- self._drop_remaining_rules(
- 'allowed', 'forbidden', 'items', 'minlength', 'maxlength',
- 'regex', 'validator')
- if not empty:
- self._error(field, errors.EMPTY_NOT_ALLOWED)
- def _validate_excludes(self, excludes, field, value):
- """ {'type': ('hashable', 'list'),
- 'schema': {'type': 'hashable'}} """
- if isinstance(excludes, Hashable):
- excludes = [excludes]
- # Save required field to be checked latter
- if 'required' in self.schema[field] and self.schema[field]['required']:
- self._unrequired_by_excludes.add(field)
- for exclude in excludes:
- if (exclude in self.schema and
- 'required' in self.schema[exclude] and
- self.schema[exclude]['required']):
- self._unrequired_by_excludes.add(exclude)
- if [True for key in excludes if key in self.document]:
- # Wrap each field in `excludes` list between quotes
- exclusion_str = ', '.join("'{0}'"
- .format(word) for word in excludes)
- self._error(field, errors.EXCLUDES_FIELD, exclusion_str)
- def _validate_forbidden(self, forbidden_values, field, value):
- """ {'type': 'list'} """
- if isinstance(value, _str_type):
- if value in forbidden_values:
- self._error(field, errors.FORBIDDEN_VALUE, value)
- elif isinstance(value, Sequence):
- forbidden = set(value) & set(forbidden_values)
- if forbidden:
- self._error(field, errors.FORBIDDEN_VALUES, list(forbidden))
- elif isinstance(value, int):
- if value in forbidden_values:
- self._error(field, errors.FORBIDDEN_VALUE, value)
- def _validate_items(self, items, field, values):
- """ {'type': 'list', 'validator': 'items'} """
- if len(items) != len(values):
- self._error(field, errors.ITEMS_LENGTH, len(items), len(values))
- else:
- schema = dict((i, definition) for i, definition in enumerate(items)) # noqa: E501
- validator = self._get_child_validator(document_crumb=field,
- schema_crumb=(field, 'items'), # noqa: E501
- schema=schema)
- if not validator(dict((i, value) for i, value in enumerate(values)),
- update=self.update, normalize=False):
- self._error(field, errors.BAD_ITEMS, validator._errors)
- def __validate_logical(self, operator, definitions, field, value):
- """ Validates value against all definitions and logs errors according
- to the operator. """
- valid_counter = 0
- _errors = errors.ErrorList()
- for i, definition in enumerate(definitions):
- schema = {field: definition.copy()}
- for rule in ('allow_unknown', 'type'):
- if rule not in schema[field] and rule in self.schema[field]:
- schema[field][rule] = self.schema[field][rule]
- if 'allow_unknown' not in schema[field]:
- schema[field]['allow_unknown'] = self.allow_unknown
- validator = self._get_child_validator(
- schema_crumb=(field, operator, i),
- schema=schema, allow_unknown=True)
- if validator(self.document, update=self.update, normalize=False):
- valid_counter += 1
- else:
- self._drop_nodes_from_errorpaths(validator._errors, [], [3])
- _errors.extend(validator._errors)
- return valid_counter, _errors
- def _validate_anyof(self, definitions, field, value):
- """ {'type': 'list', 'logical': 'anyof'} """
- valids, _errors = \
- self.__validate_logical('anyof', definitions, field, value)
- if valids < 1:
- self._error(field, errors.ANYOF, _errors,
- valids, len(definitions))
- def _validate_allof(self, definitions, field, value):
- """ {'type': 'list', 'logical': 'allof'} """
- valids, _errors = \
- self.__validate_logical('allof', definitions, field, value)
- if valids < len(definitions):
- self._error(field, errors.ALLOF, _errors,
- valids, len(definitions))
- def _validate_noneof(self, definitions, field, value):
- """ {'type': 'list', 'logical': 'noneof'} """
- valids, _errors = \
- self.__validate_logical('noneof', definitions, field, value)
- if valids > 0:
- self._error(field, errors.NONEOF, _errors,
- valids, len(definitions))
- def _validate_oneof(self, definitions, field, value):
- """ {'type': 'list', 'logical': 'oneof'} """
- valids, _errors = \
- self.__validate_logical('oneof', definitions, field, value)
- if valids != 1:
- self._error(field, errors.ONEOF, _errors,
- valids, len(definitions))
- def _validate_max(self, max_value, field, value):
- """ {'nullable': False } """
- try:
- if value > max_value:
- self._error(field, errors.MAX_VALUE)
- except TypeError:
- pass
- def _validate_min(self, min_value, field, value):
- """ {'nullable': False } """
- try:
- if value < min_value:
- self._error(field, errors.MIN_VALUE)
- except TypeError:
- pass
- def _validate_maxlength(self, max_length, field, value):
- """ {'type': 'integer'} """
- if isinstance(value, Iterable) and len(value) > max_length:
- self._error(field, errors.MAX_LENGTH, len(value))
- def _validate_minlength(self, min_length, field, value):
- """ {'type': 'integer'} """
- if isinstance(value, Iterable) and len(value) < min_length:
- self._error(field, errors.MIN_LENGTH, len(value))
- def _validate_nullable(self, nullable, field, value):
- """ {'type': 'boolean'} """
- if value is None:
- if not nullable:
- self._error(field, errors.NOT_NULLABLE)
- self._drop_remaining_rules(
- 'empty', 'forbidden', 'items', 'keyschema', 'min', 'max',
- 'minlength', 'maxlength', 'regex', 'schema', 'type',
- 'valueschema')
- def _validate_keyschema(self, schema, field, value):
- """ {'type': ['dict', 'string'], 'validator': 'bulk_schema',
- 'forbidden': ['rename', 'rename_handler']} """
- if isinstance(value, Mapping):
- validator = self._get_child_validator(
- document_crumb=field,
- schema_crumb=(field, 'keyschema'),
- schema=dict(((k, schema) for k in value.keys())))
- if not validator(dict(((k, k) for k in value.keys())),
- normalize=False):
- self._drop_nodes_from_errorpaths(validator._errors,
- [], [2, 4])
- self._error(field, errors.KEYSCHEMA, validator._errors)
- def _validate_readonly(self, readonly, field, value):
- """ {'type': 'boolean'} """
- if readonly:
- if not self._is_normalized:
- self._error(field, errors.READONLY_FIELD)
- # If the document was normalized (and therefore already been
- # checked for readonly fields), we still have to return True
- # if an error was filed.
- has_error = errors.READONLY_FIELD in \
- self.document_error_tree.fetch_errors_from(
- self.document_path + (field,))
- if self._is_normalized and has_error:
- self._drop_remaining_rules()
- def _validate_regex(self, pattern, field, value):
- """ {'type': 'string'} """
- if not isinstance(value, _str_type):
- return
- if not pattern.endswith('$'):
- pattern += '$'
- re_obj = re.compile(pattern)
- if not re_obj.match(value):
- self._error(field, errors.REGEX_MISMATCH)
- _validate_required = dummy_for_rule_validation(""" {'type': 'boolean'} """)
- def __validate_required_fields(self, document):
- """ Validates that required fields are not missing.
- :param document: The document being validated.
- """
- try:
- required = set(field for field, definition in self.schema.items()
- if self._resolve_rules_set(definition).
- get('required') is True)
- except AttributeError:
- if self.is_child and self.schema_path[-1] == 'schema':
- raise _SchemaRuleTypeError
- else:
- raise
- required -= self._unrequired_by_excludes
- missing = required - set(field for field in document
- if document.get(field) is not None or
- not self.ignore_none_values)
- for field in missing:
- self._error(field, errors.REQUIRED_FIELD)
- # At least on field from self._unrequired_by_excludes should be
- # present in document
- if self._unrequired_by_excludes:
- fields = set(field for field in document
- if document.get(field) is not None)
- if self._unrequired_by_excludes.isdisjoint(fields):
- for field in self._unrequired_by_excludes - fields:
- self._error(field, errors.REQUIRED_FIELD)
- def _validate_schema(self, schema, field, value):
- """ {'type': ['dict', 'string'],
- 'anyof': [{'validator': 'schema'},
- {'validator': 'bulk_schema'}]} """
- if schema is None:
- return
- if isinstance(value, Sequence) and not isinstance(value, _str_type):
- self.__validate_schema_sequence(field, schema, value)
- elif isinstance(value, Mapping):
- self.__validate_schema_mapping(field, schema, value)
- def __validate_schema_mapping(self, field, schema, value):
- schema = self._resolve_schema(schema)
- allow_unknown = self.schema[field].get('allow_unknown',
- self.allow_unknown)
- validator = self._get_child_validator(document_crumb=field,
- schema_crumb=(field, 'schema'),
- schema=schema,
- allow_unknown=allow_unknown)
- try:
- if not validator(value, update=self.update, normalize=False):
- self._error(field, errors.MAPPING_SCHEMA, validator._errors)
- except _SchemaRuleTypeError:
- self._error(field, errors.BAD_TYPE_FOR_SCHEMA)
- raise
- def __validate_schema_sequence(self, field, schema, value):
- schema = dict(((i, schema) for i in range(len(value))))
- validator = self._get_child_validator(
- document_crumb=field, schema_crumb=(field, 'schema'),
- schema=schema, allow_unknown=self.allow_unknown)
- validator(dict(((i, v) for i, v in enumerate(value))),
- update=self.update, normalize=False)
- if validator._errors:
- self._drop_nodes_from_errorpaths(validator._errors, [], [2])
- self._error(field, errors.SEQUENCE_SCHEMA, validator._errors)
- def _validate_type(self, data_type, field, value):
- """ {'type': ['string', 'list'],
- 'validator': 'type'} """
- if not data_type:
- return
- types = (data_type,) if isinstance(data_type, _str_type) else data_type
- for _type in types:
- # TODO remove this block on next major release
- # this implementation still supports custom type validation methods
- type_definition = self.types_mapping.get(_type)
- if type_definition is not None:
- matched = isinstance(value, type_definition.included_types) \
- and not isinstance(value, type_definition.excluded_types)
- else:
- type_handler = self.__get_rule_handler('validate_type', _type)
- matched = type_handler(value)
- if matched:
- return
- # TODO uncomment this block on next major release
- # when _validate_type_* methods were deprecated:
- # type_definition = self.types_mapping[_type]
- # if isinstance(value, type_definition.included_types) \
- # and not isinstance(value, type_definition.excluded_types): # noqa 501
- # return
- self._error(field, errors.BAD_TYPE)
- self._drop_remaining_rules()
- def _validate_validator(self, validator, field, value):
- """ {'oneof': [
- {'type': 'callable'},
- {'type': 'list',
- 'schema': {'oneof': [{'type': 'callable'},
- {'type': 'string'}]}},
- {'type': 'string'}
- ]} """
- if isinstance(validator, _str_type):
- validator = self.__get_rule_handler('validator', validator)
- validator(field, value)
- elif isinstance(validator, Iterable):
- for v in validator:
- self._validate_validator(v, field, value)
- else:
- validator(field, value, self._error)
- def _validate_valueschema(self, schema, field, value):
- """ {'type': ['dict', 'string'], 'validator': 'bulk_schema',
- 'forbidden': ['rename', 'rename_handler']} """
- schema_crumb = (field, 'valueschema')
- if isinstance(value, Mapping):
- validator = self._get_child_validator(
- document_crumb=field, schema_crumb=schema_crumb,
- schema=dict((k, schema) for k in value))
- validator(value, update=self.update, normalize=False)
- if validator._errors:
- self._drop_nodes_from_errorpaths(validator._errors, [], [2])
- self._error(field, errors.VALUESCHEMA, validator._errors)
- RULE_SCHEMA_SEPARATOR = \
- "The rule's arguments are validated against this schema:"
- class InspectedValidator(type):
- """ Metaclass for all validators """
- def __new__(cls, *args):
- if '__doc__' not in args[2]:
- args[2].update({'__doc__': args[1][0].__doc__})
- return super(InspectedValidator, cls).__new__(cls, *args)
- def __init__(cls, *args):
- def attributes_with_prefix(prefix):
- return tuple(x.split('_', 2)[-1] for x in dir(cls)
- if x.startswith('_' + prefix))
- super(InspectedValidator, cls).__init__(*args)
- cls._types_from_methods, cls.validation_rules = (), {}
- for attribute in attributes_with_prefix('validate'):
- # TODO remove inspection of type test methods in next major release
- if attribute.startswith('type_'):
- cls._types_from_methods += (attribute[len('type_'):],)
- else:
- cls.validation_rules[attribute] = \
- cls.__get_rule_schema('_validate_' + attribute)
- # TODO remove on next major release
- if cls._types_from_methods:
- warn("Methods for type testing are deprecated, use TypeDefinition "
- "and the 'types_mapping'-property of a Validator-instance "
- "instead.", DeprecationWarning)
- cls.validators = tuple(x for x in attributes_with_prefix('validator'))
- x = cls.validation_rules['validator']['oneof']
- x[1]['schema']['oneof'][1]['allowed'] = x[2]['allowed'] = cls.validators
- for rule in (x for x in cls.mandatory_validations if x != 'nullable'):
- cls.validation_rules[rule]['required'] = True
- cls.coercers, cls.default_setters, cls.normalization_rules = (), (), {}
- for attribute in attributes_with_prefix('normalize'):
- if attribute.startswith('coerce_'):
- cls.coercers += (attribute[len('coerce_'):],)
- elif attribute.startswith('default_setter_'):
- cls.default_setters += (attribute[len('default_setter_'):],)
- else:
- cls.normalization_rules[attribute] = \
- cls.__get_rule_schema('_normalize_' + attribute)
- for rule in ('coerce', 'rename_handler'):
- x = cls.normalization_rules[rule]['oneof']
- x[1]['schema']['oneof'][1]['allowed'] = \
- x[2]['allowed'] = cls.coercers
- cls.normalization_rules['default_setter']['oneof'][1]['allowed'] = \
- cls.default_setters
- cls.rules = {}
- cls.rules.update(cls.validation_rules)
- cls.rules.update(cls.normalization_rules)
- def __get_rule_schema(cls, method_name):
- docstring = getattr(cls, method_name).__doc__
- if docstring is None:
- result = {}
- else:
- if RULE_SCHEMA_SEPARATOR in docstring:
- docstring = docstring.split(RULE_SCHEMA_SEPARATOR)[1]
- try:
- result = literal_eval(docstring.strip())
- except Exception:
- result = {}
- if not result:
- warn("No validation schema is defined for the arguments of rule "
- "'%s'" % method_name.split('_', 2)[-1])
- return result
- Validator = InspectedValidator('Validator', (BareValidator,), {})
|