'''
@author: olivier.massot, sept. 2018
'''
import time
import zipfile

from path import Path, TempDir

from core import gis
from core.cerberus_extend import CerberusErrorHandler, GeoValidator
from schemas.common import SRID


class BaseModel:
    """Base class for data models.

    Every keyword argument passed to the constructor becomes an instance
    attribute, without any filtering or validation.
    """
    filename = ""
    pk = ""
    schema = {}

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)


class BaseGeoModel(BaseModel):
    """Data model that also carries a geometry in its ``geom`` attribute."""

    def __init__(self, geom, **kwargs):
        super().__init__(**kwargs)
        self.geom = geom


class ValidatorInterruption(BaseException):
    """Raised by ``BaseValidator.checkpoint`` to stop a validation run.

    Derives from ``BaseException``, so ``except Exception`` handlers do not
    catch it.
    """


class Checkpoint:
    """Named validation step together with its outcome."""

    def __init__(self, name, valid=True):
        self.name = name
        self.valid = valid


# Severity levels, from least to most severe.
MINOR = 10
WARNING = 20
ERROR = 30
CRITICAL = 40

# Label associated with each severity level.
VALIDATION_ERROR_LEVELS = {
    MINOR: "MINEURE",
    WARNING: "AVERTISSEMENT",
    ERROR: "ERREUR",
    CRITICAL: "CRITIQUE",
}


class BaseValidationError:
    """Base class of the errors collected during a validation run.

    Not an exception: instances are stored with ``BaseValidator.log_error``
    and never raised in this module.
    """
    name = "Erreur"
    level = ERROR
    help = ""

    def __init__(self, message, filename="", field=""):
        self.message = message
        self.filename = filename
        self.field = field

    def __repr__(self):
        # Empty parts are dropped so the separator is never doubled.
        parts = [self.name, self.filename, self.field, self.message]
        return " - ".join(filter(None, parts))


# Errors raised while loading the files

class MissingFile(BaseValidationError):
    level = CRITICAL
    name = "Fichier Manquant"


class UnreadableFile(BaseValidationError):
    level = CRITICAL
    name = "Fichier Illisible"


class WrongSrid(BaseValidationError):
    level = CRITICAL
    name = "Mauvais SRID"


# Errors in the structure of the data

class DataError(BaseValidationError):
    name = "Erreur de format"
    level = ERROR


# Errors in the content, business-rule errors

class TechnicalValidationError(BaseValidationError):
    level = ERROR


class DuplicatedPk(TechnicalValidationError):
    name = "Doublons dans le champs"


class RelationError(TechnicalValidationError):
    name = "Un objet lié n'existe pas"


class BaseValidator:
    """Skeleton of a validator: load, check structure, check content, report.

    Subclasses provide ``schema_name`` and ``models`` and implement
    ``_load_files`` and ``_technical_validation``.
    """
    schema_name = ""
    models = {}
    # Class-level default only; every instance gets its own dict in __init__.
    dataset = {}

    def __init__(self):
        self.valid = True
        self.checkpoints = []
        self.errors = []
        self.dt = 0
        # Per-instance copy: _load_files writes into this dict, and writing
        # into the class-level one would leak records between validators.
        self.dataset = dict(self.dataset)

    def checkpoint(self, name):
        """Record a checkpoint and abort the run if any error was logged.

        :param name: label of the step that has just completed
        :raises ValidatorInterruption: when ``self.errors`` is not empty
        """
        valid = not self.errors
        self.checkpoints.append(Checkpoint(name, valid))
        if not valid:
            self.valid = False
            raise ValidatorInterruption()

    def log_error(self, validation_error):
        """Append a validation error to the list of collected errors."""
        self.errors.append(validation_error)

    @classmethod
    def submit(cls, subject):
        """Take a folder or an archive as input and check its content.

        :param subject: path of a directory, or of a zip archive which is
            extracted to a temporary directory before validation
        :return: the report built by ``build_report``
        :raises FileNotFoundError: if ``subject`` is neither a file nor a
            directory
        """
        subject = Path(subject)

        if subject.isfile():
            with TempDir() as dirname:
                # Context manager: the archive is closed even when the
                # extraction fails.
                with zipfile.ZipFile(subject, 'r') as zip_ref:
                    zip_ref.extractall(dirname)

                folder = Path(dirname)
                nested = folder / subject.stem
                if nested.isdir():
                    # The archive contains a folder (named after the
                    # archive) which itself contains the files.
                    folder = nested

                # Must run inside the 'with' block, before TempDir exits.
                return cls._submit_folder(folder)

        if subject.isdir():
            return cls._submit_folder(subject)

        raise FileNotFoundError(f"Impossible de trouver le fichier ou répertoire: {subject}")

    @classmethod
    def _submit_folder(cls, folder):
        """Validate ``folder`` with a fresh validator and return its report."""
        validator = cls()

        t0 = time.time()
        try:
            validator.validate(folder)
        except ValidatorInterruption:
            # Raised by a failed checkpoint: the errors logged so far are
            # still reported below.
            pass
        validator.dt = time.time() - t0

        return validator.build_report(validator.schema_name, folder.name)

    def validate(self, folder):
        """Run the three validation steps, with a checkpoint after each one.

        :raises ValidatorInterruption: from the first checkpoint reached
            while errors have been logged
        """
        # Load the data in memory
        self._load_files(folder)
        self.checkpoint("Chargement des données")

        # Check the structure of the data (fields, formats and types)
        self._structure_validation()
        self.checkpoint("Contrôle de la structure des données")

        # Technical validation
        self._technical_validation()
        self.checkpoint("Validation Métier")

    def _load_files(self, folder):
        """Load the data from the files and bind them to a model.

        Warning: no format or validity check at this level!
        """
        raise NotImplementedError()

    def _structure_validation(self):
        """Validate each loaded item against its model schema.

        Every error reported by the ``GeoValidator`` is logged as a
        ``DataError`` carrying the model filename and the faulty field.
        """
        for model in self.models:
            v = GeoValidator(model.schema, error_handler=CerberusErrorHandler)

            # .get(): a model with no loaded records is skipped instead of
            # raising KeyError.
            for item in self.dataset.get(model, []):
                v.validate(item.__dict__)

                for field, verrors in v.errors.items():
                    for err in verrors:
                        self.log_error(DataError(err, filename=model.filename, field=field))

    # NOTE(review): declared as a classmethod although validate() calls it on
    # an instance; kept as is for compatibility -- confirm what the
    # subclasses expect before changing it.
    @classmethod
    def _technical_validation(cls):
        """Content / business-rule validation, implemented by subclasses."""
        raise NotImplementedError()

    def build_report(self, schema, filename):
        """Build the report of the run as a dictionary.

        :param schema: value stored under the "schema" key
        :param filename: value stored under the "filename" key
        :return: dict with the keys "schema", "filename", "exec_time",
            "checkpoints" and "errors"; errors are grouped by error name
            and identical entries are listed only once
        """
        report = {
            "schema": schema,
            "filename": filename,
            "exec_time": "{} s.".format(self.dt),
            "checkpoints": [{"name": chk.name, "valid": chk.valid} for chk in self.checkpoints],
            "errors": {},
        }

        for err in self.errors:
            # The help text comes from the first error seen under this name.
            group = report["errors"].setdefault(err.name, {"help": err.help, "list": []})

            err_report = {"filename": err.filename or "-",
                          "field": err.field or "-",
                          "message": err.message}
            if err_report not in group["list"]:
                group["list"].append(err_report)

        return report


class NetgeoValidator(BaseValidator):
    """Validator whose data are read from one shapefile per model."""

    def _load_files(self, folder):
        """Read the shapefile of every model into ``self.dataset``.

        A missing file, an unreadable file or a wrong SRID is logged as an
        error and the loading goes on with the next model.
        """
        for model in self.models:
            filename = model.filename
            path_ = Path(folder) / filename

            if not path_.isfile():
                self.log_error(MissingFile("Fichier manquant: '{}'".format(filename)))
                continue

            self.dataset[model] = []
            try:
                with gis.ShapeFile(path_, srid=SRID) as sf:
                    fields = sf.fields()
                    for record in sf.records():
                        data = dict(zip(fields, record.record))
                        item = model(record.shape, **data)
                        self.dataset[model].append(item)
            except gis.ShapeError as e:
                self.log_error(UnreadableFile(str(e)))
            except gis.SridError as e:
                # 'as e' is required: the name bound by the previous except
                # clause does not exist in this one.
                self.log_error(WrongSrid(str(e)))