validator.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. '''
  2. @author: olivier.massot, sept. 2018
  3. '''
  4. import logging
  5. from qgis.core import QgsProject
  6. from core.cerberus_ import CerberusErrorHandler, \
  7. _translate_messages, ExtendedValidator
  8. from core.model import QgsModel
  9. from core.validation_errors import CRITICAL, DataError, GeomTypeError, BoundingBoxError, \
  10. MissingLayer, WrongSrid, InvalidGeometry, TechnicalValidationError
  11. logger = logging.getLogger("mncheck")
  12. class ValidatorInterruption(BaseException):
  13. pass
  14. class Checkpoint():
  15. def __init__(self, name, valid=True):
  16. self.name = name
  17. self.valid = valid
  18. class BaseValidator():
  19. schema_name = ""
  20. models = {}
  21. dataset = {}
  22. def __init__(self):
  23. self.valid = True
  24. self.checkpoints = []
  25. self.errors = []
  26. self._current_checkpoint_valid = True
  27. self.dt = 0
  28. def checkpoint(self, title):
  29. logger.info(f"Checkpoint: {title}")
  30. self.checkpoints.append(Checkpoint(title, self._current_checkpoint_valid))
  31. self._current_checkpoint_valid = True
  32. if self.errors:
  33. self.valid = False
  34. if self.critical_happened():
  35. logger.info("Contrôle de données interrompu en raison d'une erreur critique dans les données")
  36. raise ValidatorInterruption()
  37. def critical_happened(self):
  38. return any([err.level == CRITICAL for err in self.errors])
  39. def log_error(self, validation_error):
  40. self._current_checkpoint_valid = False
  41. self.errors.append(validation_error)
  42. @classmethod
  43. def submit(cls, report_name=""):
  44. validator = cls()
  45. try:
  46. validator.validate()
  47. except ValidatorInterruption:
  48. pass
  49. report = validator.build_report(validator.schema_name, report_name or "(nom inconnu)")
  50. return report
  51. def validate(self):
  52. # Chargement des données en mémoire
  53. self._load_layers()
  54. self.checkpoint("Chargement des données")
  55. # Controle la structure des données (champs, formats et types)
  56. self._structure_validation()
  57. self.checkpoint("Contrôle de la structure des données")
  58. # Validation technique
  59. try:
  60. self._technical_validation()
  61. self.checkpoint("Validation Métier")
  62. except ValidatorInterruption:
  63. raise
  64. except:
  65. # logger.error("".join(traceback.format_exception(*sys.exc_info())))
  66. logger.exception("Une erreur s'est produite")
  67. self.log_error(TechnicalValidationError("Contrôle des données interrompu"))
  68. self.checkpoint("Validation Métier [interrompu]")
  69. def _load_layers(self):
  70. for model in self.models:
  71. layername = model.layername
  72. layer = next((l for l in QgsProject.instance().mapLayers().values() if l.name().lower() == layername))
  73. if not layer:
  74. self.log_error(MissingLayer("Couche manquante: '{}'".format(layername)))
  75. continue
  76. self.dataset[model] = []
  77. if layer.crs().authid() != model.crs:
  78. self.log_error(WrongSrid("Mauvaise projection: {} (attendu: {})".format(layer.crs().authid(), model.crs)))
  79. for feature in layer.getFeatures():
  80. item = model(feature)
  81. self.dataset[model].append(item)
  82. def _structure_validation(self):
  83. for model in self.models:
  84. v = ExtendedValidator(model.schema, purge_unknown=True, error_handler=CerberusErrorHandler, require_all=True)
  85. xmin, ymin, xmax, ymax = model.bounding_box
  86. for item in self.dataset[model]:
  87. # geom valid
  88. if not item.is_geometry_valid():
  89. self.log_error(InvalidGeometry("La géométrie de l'objet est invalide: {}".format(item), layername=model.layername, field="geom"))
  90. # geom type
  91. if item.get_geom_type() != model.geom_type:
  92. self.log_error(GeomTypeError("Type de géométrie invalide: {} (attendu: {})".format(item.geom_name, QgsModel.GEOM_NAMES[model.geom_type]), layername=model.layername, field="geom"))
  93. # bounding box
  94. x1, y1, x2, y2 = item.get_bounding_box()
  95. if any(x < xmin or x > xmax for x in (x1, x2)) or \
  96. any(y < ymin or y > ymax for y in (y1, y2)):
  97. self.log_error(BoundingBoxError("Situé hors de l'emprise autorisée", layername=model.layername, field="geom"))
  98. v.validate(item.__dict__)
  99. for field, verrors in v.errors.items():
  100. for err in verrors:
  101. self.log_error(DataError(_translate_messages(err), layername=model.layername, field=field))
  102. @classmethod
  103. def _technical_validation(cls):
  104. raise NotImplementedError()
  105. def build_report(self, schema, layername):
  106. report = {}
  107. report["schema"] = schema
  108. report["reportname"] = ""
  109. report["checkpoints"] = [{"name": chk.name, "valid": chk.valid} for chk in self.checkpoints]
  110. report["errors"] = {}
  111. for err in self.errors:
  112. if not err.name in report["errors"]:
  113. report["errors"][err.name] = {"help": err.help, "order_": err.order_, "level": err.level, "list": []}
  114. err_report = {"layername": err.layername or "-",
  115. "field": err.field or "-",
  116. "message": err.message}
  117. if err_report not in report["errors"][err.name]["list"]:
  118. report["errors"][err.name]["list"].append(err_report)
  119. return report