"""Models, validation-error types and validators used to check a dataset.

@author: olivier.massot, sept. 2018
"""
import time
import zipfile

from path import Path, TempDir

from core import gis_
from core.cerberus_extend import (
    CerberusErrorHandler,
    _translate_messages,
    ExtendedValidator,
)
from schemas.common import SRID


class ValidatorInterruption(BaseException):
    """Raised by ``BaseValidator.checkpoint`` once a critical error was logged.

    It derives from ``BaseException``, so ``except Exception`` handlers do not
    catch it; ``BaseValidator._submit_folder`` catches it explicitly.
    """


class Checkpoint():
    """Named step of a validation run, with the outcome of that step."""

    def __init__(self, name, valid=True):
        self.name = name
        self.valid = valid


########### MODELS ################

class BaseModel():
    """Plain model whose instance attributes are taken from keyword arguments."""

    filename = ""
    pk = ""
    schema = {}

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)


class BaseGeoModel(gis_.Feature):
    """Geographic model built by copying the attributes of an existing feature."""

    filename = ""
    pk = ""
    geom_type = 0
    bounding_box = (0, 0, 1, 1)
    schema = {}

    def __init__(self, feature):
        # The feature's state is copied as-is; the parent initializer is not called.
        self.__dict__.update(feature.__dict__)


########### VALIDATION ERRORS ################

MINOR = 10
WARNING = 20
ERROR = 30
CRITICAL = 40

# Display label of each severity level.
VALIDATION_ERROR_LEVELS = {
    MINOR: "MINEURE",
    WARNING: "AVERTISSEMENT",
    ERROR: "ERREUR",
    CRITICAL: "CRITIQUE",
}


class BaseValidationError():
    """Base class of every error a validator can log.

    Class attributes ``order_``, ``name``, ``level`` and ``help`` describe the
    error category; ``build_report`` copies ``name``, ``help`` and ``order_``
    into the report, and ``critical_happened`` reads ``level``.
    """

    order_ = 0
    name = "Erreur"
    level = ERROR
    help = ""

    def __init__(self, message, filename="", field=""):
        self.message = message
        self.filename = filename
        self.field = field

    def __repr__(self):
        # Empty parts are dropped so the separator never appears twice in a row.
        parts = [self.name, self.filename, self.field, self.message]
        return " - ".join(filter(None, parts))


# Errors raised while loading the files

class InputError(BaseValidationError):
    order_ = 0
    level = CRITICAL
    name = "Erreur de chargement"


class MissingFile(InputError):
    order_ = 1
    name = "Fichier Manquant"


class UnreadableFile(InputError):
    order_ = 2
    name = "Fichier Illisible"


class WrongSrid(InputError):
    order_ = 3
    name = "Mauvais SRID"


### Errors in the structure of the data

class StructureError(BaseValidationError):
    order_ = 10
    name = "Erreur de structure"
    level = ERROR


class GeomTypeError(StructureError):
    order_ = 12
    name = "Type de géométrie invalide"
    level = CRITICAL


class BoundingBoxError(StructureError):
    order_ = 11
    name = "Coordonnées hors de la zone autorisée"


class InvalidGeometry(StructureError):
    order_ = 13
    name = "Géométrie invalide"


class DataError(StructureError):
    order_ = 14
    name = "Erreur de format"


# Errors in the content, business-rule errors

class TechnicalValidationError(BaseValidationError):
    order_ = 20
    level = ERROR
    name = "Erreur technique"


class UniqueError(TechnicalValidationError):
    order_ = 21
    name = "Doublons dans le champs"


class RelationError(TechnicalValidationError):
    order_ = 22
    level = CRITICAL
    name = "Un objet lié n'existe pas"


class DuplicatedGeom(TechnicalValidationError):
    order_ = 23
    name = "Doublon graphique"


class MissingItem(TechnicalValidationError):
    order_ = 24
    name = "Elément manquant"


class DimensionError(TechnicalValidationError):
    order_ = 25
    name = "Elément de dimension"


class PositionError(TechnicalValidationError):
    order_ = 26
    name = "Erreur de positionnement"


########### VALIDATION ################

class BaseValidator():
    """Run a three-step validation (load, structure, technical) and build a report.

    Subclasses provide ``schema_name`` and ``models`` and implement
    ``_load_files`` and ``_technical_validation``.
    """

    schema_name = ""
    models = {}
    dataset = {}

    def __init__(self):
        self.valid = True
        self.checkpoints = []
        self.errors = []
        self._current_checkpoint_valid = True
        self.dt = 0
        # Work on a per-instance copy: writing into the class-level dict would
        # share loaded data between every validator instance and between runs.
        self.dataset = dict(self.dataset)

    def checkpoint(self, title):
        """Record the outcome of the step that just ended.

        Marks the validator invalid as soon as any error has been logged.

        Raises:
            ValidatorInterruption: if a CRITICAL error has been logged so far.
        """
        self.checkpoints.append(Checkpoint(title, self._current_checkpoint_valid))
        self._current_checkpoint_valid = True
        if self.errors:
            self.valid = False
        if self.critical_happened():
            raise ValidatorInterruption()

    def critical_happened(self):
        """Return True if at least one logged error has the CRITICAL level."""
        return any(err.level == CRITICAL for err in self.errors)

    def log_error(self, validation_error):
        """Store an error and flag the current checkpoint as failed."""
        self._current_checkpoint_valid = False
        self.errors.append(validation_error)

    @classmethod
    def submit(cls, subject):
        """Validate a folder or a zip archive and return the report.

        An archive is extracted into a temporary directory first. When the
        archive contains a folder named after the archive itself, that folder
        is the one validated.

        Raises:
            FileNotFoundError: if ``subject`` is neither a file nor a directory.
        """
        subject = Path(subject)
        if subject.isfile():
            with TempDir() as dirname:
                # Context manager: the archive is closed even if extraction fails.
                with zipfile.ZipFile(subject, 'r') as zip_ref:
                    zip_ref.extractall(dirname)
                if Path(dirname / subject.stem).isdir():
                    # The archive wraps its files in a folder of the same name.
                    dirname /= subject.stem
                return cls._submit_folder(dirname)
        elif subject.isdir():
            return cls._submit_folder(subject)
        else:
            raise FileNotFoundError(f"Impossible de trouver le fichier ou répertoire: {subject}")

    @classmethod
    def _submit_folder(cls, folder):
        """Instantiate the validator, run it on ``folder`` and return its report."""
        validator = cls()
        t0 = time.time()
        try:
            validator.validate(folder)
        except ValidatorInterruption:
            # A critical error stopped the run; the report is still built
            # from what was collected up to that point.
            pass
        validator.dt = time.time() - t0
        return validator.build_report(validator.schema_name, folder.name)

    def validate(self, folder):
        """Run the three validation steps, recording a checkpoint after each."""
        # Load the data in memory
        self._load_files(folder)
        self.checkpoint("Chargement des données")

        # Check the structure of the data (fields, formats and types)
        self._structure_validation()
        self.checkpoint("Contrôle de la structure des données")

        # Technical validation
        self._technical_validation()
        self.checkpoint("Validation Métier")

    def _load_files(self, folder):
        """Load the data of the files and bind it to a model.

        Warning: no format or validity check is expected at this stage.
        """
        raise NotImplementedError()

    def _log_schema_errors(self, schema_validator, model, item):
        """Validate ``item`` against the schema and log a DataError per message."""
        schema_validator.validate(item.__dict__)
        for field, verrors in schema_validator.errors.items():
            for err in verrors:
                self.log_error(DataError(_translate_messages(err),
                                         filename=model.filename, field=field))

    def _structure_validation(self):
        """Check every loaded item against the schema of its model."""
        for model in self.models:
            v = ExtendedValidator(model.schema, purge_unknown=True,
                                  error_handler=CerberusErrorHandler, require_all=True)
            for item in self.dataset[model]:
                self._log_schema_errors(v, model, item)

    # NOTE(review): declared as a classmethod although `validate` calls it on
    # the instance; kept unchanged so existing overrides keep working.
    @classmethod
    def _technical_validation(cls):
        raise NotImplementedError()

    def build_report(self, schema, filename):
        """Return the validation report as a dict.

        Errors are grouped by error name; an identical
        (filename, field, message) entry is listed only once per group.
        """
        report = {
            "schema": schema,
            "filename": filename,
            "exec_time": "{:.3g} s.".format(self.dt),
            "checkpoints": [{"name": chk.name, "valid": chk.valid}
                            for chk in self.checkpoints],
            "errors": {},
        }
        for err in self.errors:
            if err.name not in report["errors"]:
                report["errors"][err.name] = {"help": err.help,
                                              "order_": err.order_,
                                              "list": []}
            err_report = {"filename": err.filename or "-",
                          "field": err.field or "-",
                          "message": err.message}
            entries = report["errors"][err.name]["list"]
            if err_report not in entries:
                entries.append(err_report)
        return report


class NetgeoValidator(BaseValidator):
    """Validator that loads its models from GIS files and checks their geometry."""

    def _load_files(self, folder):
        """Read the file of each model and wrap every feature in that model.

        Logs MissingFile, WrongSrid or UnreadableFile instead of raising.
        """
        for model in self.models:
            filename = model.filename
            path_ = Path(folder) / filename

            # Always start from an empty list so a missing file cannot leave
            # an absent (or stale) entry behind.
            self.dataset[model] = []

            if not path_.isfile():
                self.log_error(MissingFile("Fichier manquant: '{}'".format(filename)))
                continue

            try:
                ds = gis_.Datasource(path_)
                layer = ds.layer
                if layer.srid != SRID:
                    self.log_error(WrongSrid("Mauvaise projection: {} (attendu: {})".format(layer.srid, SRID)))
                for feature in layer:
                    self.dataset[model].append(model(feature))
            except IOError:
                self.log_error(UnreadableFile("Fichier illisible: {}".format(path_.name)))

    def _structure_validation(self):
        """Check geometry type, bounding box and schema of every loaded item."""
        for model in self.models:
            v = ExtendedValidator(model.schema, purge_unknown=True,
                                  error_handler=CerberusErrorHandler, require_all=True)
            xmin, ymin, xmax, ymax = model.bounding_box

            for item in self.dataset[model]:
                # geom type
                if item.geom_type != model.geom_type:
                    self.log_error(GeomTypeError("Type de géométrie invalide: {} (attendu: {})".format(item.geom_name, gis_.GEOM_NAMES[model.geom_type]),
                                                 filename=model.filename, field="geom"))

                # bounding box: both corners must lie inside the allowed extent
                x1, y1, x2, y2 = item.bounding_box
                if any(x < xmin or x > xmax for x in (x1, x2)) or \
                   any(y < ymin or y > ymax for y in (y1, y2)):
                    self.log_error(BoundingBoxError("Situé hors de l'emprise autorisée",
                                                    filename=model.filename, field="geom"))

                self._log_schema_errors(v, model, item)