validator.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. '''
  2. @author: olivier.massot, sept. 2018
  3. '''
  4. from qgis.core import QgsProject #@UnresolvedImport
  5. from core import constants
  6. from core.cerberus_extend import CerberusErrorHandler, \
  7. _translate_messages, ExtendedValidator
  8. from core.validation_errors import CRITICAL, DataError, GeomTypeError, BoundingBoxError, \
  9. MissingLayer, WrongSrid
  10. class ValidatorInterruption(BaseException):
  11. pass
  12. class Checkpoint():
  13. def __init__(self, name, valid=True):
  14. self.name = name
  15. self.valid = valid
########### MODELS ################
  17. class BaseModel():
  18. filename = ""
  19. pk = ""
  20. schema = {}
  21. def __init__(self, **kwargs):
  22. self.__dict__.update(kwargs)
  23. class QgsModel():
  24. layername = ""
  25. pk = ""
  26. geom_type = 0
  27. bounding_box = (0,0,1,1)
  28. schema = {}
  29. def __init__(self, qgs_feature):
  30. self.__dict__.update(qgs_feature.__dict__)
########### VALIDATION ################
  32. class BaseValidator():
  33. schema_name = ""
  34. models = {}
  35. dataset = {}
  36. def __init__(self):
  37. self.valid = True
  38. self.checkpoints = []
  39. self.errors = []
  40. self._current_checkpoint_valid = True
  41. self.dt = 0
  42. def checkpoint(self, title):
  43. self.checkpoints.append(Checkpoint(title, self._current_checkpoint_valid))
  44. self._current_checkpoint_valid = True
  45. if self.errors:
  46. self.valid = False
  47. if self.critical_happened():
  48. raise ValidatorInterruption()
  49. def critical_happened(self):
  50. return any([err.level == CRITICAL for err in self.errors])
  51. def log_error(self, validation_error):
  52. self._current_checkpoint_valid = False
  53. self.errors.append(validation_error)
  54. @classmethod
  55. def submit(cls, report_name=""):
  56. validator = cls()
  57. try:
  58. validator.validate()
  59. except ValidatorInterruption:
  60. pass
  61. report = validator.build_report(validator.schema_name, report_name or "(nom inconnu)")
  62. return report
  63. def validate(self):
  64. # Chargement des données en mémoire
  65. self._load_layers()
  66. self.checkpoint("Chargement des données")
  67. # Controle la structure des données (champs, formats et types)
  68. self._structure_validation()
  69. self.checkpoint("Contrôle de la structure des données")
  70. # Validation technique
  71. try:
  72. self._technical_validation()
  73. self.checkpoint("Validation Métier")
  74. except ValidatorInterruption:
  75. raise
  76. except:
  77. self.checkpoint("Validation Métier [interrompu]")
  78. def _load_layers(self):
  79. for model in self.models:
  80. layername = model.layername
  81. layer = next((l for l in QgsProject.instance().mapLayers().values() if l.name().lower() == layername))
  82. if not layer:
  83. self.log_error(MissingLayer("Couche manquante: '{}'".format(layername)))
  84. continue
  85. self.dataset[model] = []
  86. if layer.crs().authid() != model.crs:
  87. self.log_error(WrongSrid("Mauvaise projection: {} (attendu: {})".format(layer.crs().authid(), model.crs)))
  88. for feature in layer.getFeatures():
  89. item = model(feature)
  90. self.dataset[model].append(item)
  91. def _structure_validation(self):
  92. for model in self.models:
  93. v = ExtendedValidator(model.schema, purge_unknown=True, error_handler=CerberusErrorHandler, require_all=True)
  94. xmin, ymin, xmax, ymax = model.bounding_box
  95. for item in self.dataset[model]:
  96. # geom type
  97. if item.geom_type != model.geom_type:
  98. self.log_error(GeomTypeError("Type de géométrie invalide: {} (attendu: {})".format(item.geom_name, constants.GEOM_NAMES[model.geom_type]), filename=model.filename, field="geom"))
  99. # bounding box
  100. x1, y1, x2, y2 = item.bounding_box
  101. if any(x < xmin or x > xmax for x in (x1, x2)) or \
  102. any(y < ymin or y > ymax for y in (y1, y2)):
  103. self.log_error(BoundingBoxError("Situé hors de l'emprise autorisée", filename=model.filename, field="geom"))
  104. v.validate(item.__dict__)
  105. for field, verrors in v.errors.items():
  106. for err in verrors:
  107. self.log_error(DataError(_translate_messages(err), filename=model.filename, field=field))
  108. @classmethod
  109. def _technical_validation(cls):
  110. raise NotImplementedError()
  111. def build_report(self, schema, filename):
  112. report = {}
  113. report["schema"] = schema
  114. report["filename"] = filename
  115. report["checkpoints"] = [{"name": chk.name, "valid": chk.valid} for chk in self.checkpoints]
  116. report["errors"] = {}
  117. for err in self.errors:
  118. if not err.name in report["errors"]:
  119. report["errors"][err.name] = {"help": err.help, "order_": err.order_, "list": []}
  120. err_report = {"filename": err.filename or "-",
  121. "field": err.field or "-",
  122. "message": err.message}
  123. if err_report not in report["errors"][err.name]["list"]:
  124. report["errors"][err.name]["list"].append(err_report)
  125. return report