''' @author: olivier.massot, sept. 2018 ''' from qgis.core import QgsProject #@UnresolvedImport from core import constants from core.cerberus_extend import CerberusErrorHandler, \ _translate_messages, ExtendedValidator from core.validation_errors import CRITICAL, DataError, GeomTypeError, BoundingBoxError, \ MissingLayer, WrongSrid class ValidatorInterruption(BaseException): pass class Checkpoint(): def __init__(self, name, valid=True): self.name = name self.valid = valid ########### MODELES ################ class BaseModel(): filename = "" pk = "" schema = {} def __init__(self, **kwargs): self.__dict__.update(kwargs) class QgsModel(): layername = "" pk = "" geom_type = 0 bounding_box = (0,0,1,1) schema = {} def __init__(self, qgs_feature): self.__dict__.update(qgs_feature.__dict__) ########### VALIDATION ################ class BaseValidator(): schema_name = "" models = {} dataset = {} def __init__(self): self.valid = True self.checkpoints = [] self.errors = [] self._current_checkpoint_valid = True self.dt = 0 def checkpoint(self, title): self.checkpoints.append(Checkpoint(title, self._current_checkpoint_valid)) self._current_checkpoint_valid = True if self.errors: self.valid = False if self.critical_happened(): raise ValidatorInterruption() def critical_happened(self): return any([err.level == CRITICAL for err in self.errors]) def log_error(self, validation_error): self._current_checkpoint_valid = False self.errors.append(validation_error) @classmethod def submit(cls, report_name=""): validator = cls() try: validator.validate() except ValidatorInterruption: pass report = validator.build_report(validator.schema_name, report_name or "(nom inconnu)") return report def validate(self): # Chargement des données en mémoire self._load_layers() self.checkpoint("Chargement des données") # Controle la structure des données (champs, formats et types) self._structure_validation() self.checkpoint("Contrôle de la structure des données") # Validation technique try: self._technical_validation() self.checkpoint("Validation Métier") except ValidatorInterruption: raise except: self.checkpoint("Validation Métier [interrompu]") def _load_layers(self): for model in self.models: layername = model.layername layer = next((l for l in QgsProject.instance().mapLayers().values() if l.name().lower() == layername)) if not layer: self.log_error(MissingLayer("Couche manquante: '{}'".format(layername))) continue self.dataset[model] = [] if layer.crs().authid() != model.crs: self.log_error(WrongSrid("Mauvaise projection: {} (attendu: {})".format(layer.crs().authid(), model.crs))) for feature in layer.getFeatures(): item = model(feature) self.dataset[model].append(item) def _structure_validation(self): for model in self.models: v = ExtendedValidator(model.schema, purge_unknown=True, error_handler=CerberusErrorHandler, require_all=True) xmin, ymin, xmax, ymax = model.bounding_box for item in self.dataset[model]: # geom type if item.geom_type != model.geom_type: self.log_error(GeomTypeError("Type de géométrie invalide: {} (attendu: {})".format(item.geom_name, constants.GEOM_NAMES[model.geom_type]), filename=model.filename, field="geom")) # bounding box x1, y1, x2, y2 = item.bounding_box if any(x < xmin or x > xmax for x in (x1, x2)) or \ any(y < ymin or y > ymax for y in (y1, y2)): self.log_error(BoundingBoxError("Situé hors de l'emprise autorisée", filename=model.filename, field="geom")) v.validate(item.__dict__) for field, verrors in v.errors.items(): for err in verrors: self.log_error(DataError(_translate_messages(err), filename=model.filename, field=field)) @classmethod def _technical_validation(cls): raise NotImplementedError() def build_report(self, schema, filename): report = {} report["schema"] = schema report["filename"] = filename report["checkpoints"] = [{"name": chk.name, "valid": chk.valid} for chk in self.checkpoints] report["errors"] = {} for err in self.errors: if not err.name in report["errors"]: report["errors"][err.name] = {"help": err.help, "order_": err.order_, "list": []} err_report = {"filename": err.filename or "-", "field": err.field or "-", "message": err.message} if err_report not in report["errors"][err.name]["list"]: report["errors"][err.name]["list"].append(err_report) return report