mncheck.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. '''
  2. @author: olivier.massot, 2018
  3. '''
  4. import importlib
  5. import inspect
  6. import logging
  7. import pkgutil
  8. from qgis.core import QgsProject, QgsWkbTypes, QgsGeometry, QgsPoint #@UnresolvedImport
  9. from PyQt5.QtCore import QVariant, QDate
  10. import yaml
  11. from MnCheck.core.checking import BaseChecker
  12. from MnCheck.core.constants import USER_DATA
  13. from plugins import processing #@UnresolvedImport
  14. logger = logging.getLogger("mncheck")
  15. def list_schemas():
  16. import MnCheck.schemas
  17. return [name for _, name, ispkg in pkgutil.iter_modules(MnCheck.schemas.__path__) if not (ispkg or name[0] == '_')]
  18. def get_schema(schema_name):
  19. return importlib.import_module("MnCheck.schemas." + schema_name)
  20. def get_checkers(schema):
  21. return [cls for _, cls in inspect.getmembers(schema, predicate=inspect.isclass) \
  22. if issubclass(cls, BaseChecker) and not cls is BaseChecker]
  23. def get_user_data():
  24. try:
  25. with open(USER_DATA, 'r') as f:
  26. return yaml.load(f.read()) or {}
  27. except FileNotFoundError:
  28. return {}
  29. def dump_user_data(conf):
  30. with open(USER_DATA, 'w+') as f:
  31. return yaml.dump(conf, f)
  32. class QgsModel():
  33. GEOM_UNKNOWN = 0
  34. GEOM_POINT = 1
  35. GEOM_LINE = 2
  36. GEOM_POLYGON = 3
  37. GEOM_MULTIPOINT = 4
  38. GEOM_MULTILINE = 5
  39. GEOM_MULTIPOLYGON = 6
  40. layername = ""
  41. required = True
  42. geom_type = 0
  43. bounding_box = (0,0,1,1)
  44. schema = {}
  45. pk = ""
  46. def __init__(self, qgs_feature):
  47. self._feature = qgs_feature
  48. if self.pk:
  49. # On affecte une valeur par defaut à la clef primaire pour éviter des erreurs en cascade au chargement des données
  50. setattr(self, self.pk, None)
  51. for attr, value in self.attributes().items():
  52. if isinstance(value, QVariant):
  53. value = value.value() if not value.isNull() else ""
  54. if isinstance(value, QDate):
  55. value = value.toString("dd/MM/yyyy")
  56. setattr(self, attr, value)
  57. def __repr__(self):
  58. try:
  59. return "{} {}".format(self.__class__.__name__, getattr(self, self.pk))
  60. except AttributeError:
  61. return f"{self.__class__.__name__} (code manquant)"
  62. @property
  63. def feature(self):
  64. return self._feature
  65. def attributes(self):
  66. return dict(zip([f.name() for f in self._feature.fields()], self._feature.attributes()))
  67. @property
  68. def geom(self):
  69. if not self._feature:
  70. return None
  71. return self._feature.geometry()
  72. def is_geometry_valid(self):
  73. return self.geom and self.geom.isGeosValid()
  74. def get_geom_type(self):
  75. return QgsWkbTypes.singleType(self._feature.geometry().wkbType()) if self.geom else None
  76. @classmethod
  77. def get_geom_name(cls, wkb):
  78. try:
  79. return QgsWkbTypes.displayString(wkb)
  80. except (ValueError, TypeError):
  81. return "<unknown>"
  82. def get_bounding_box(self):
  83. bb = self.geom.boundingBox()
  84. return (bb.xMinimum(), bb.yMinimum(), bb.xMaximum(), bb.yMaximum())
  85. def get_points(self):
  86. if not self.geom or self.geom.isNull():
  87. return []
  88. multi_geom = QgsGeometry()
  89. temp_geom = []
  90. if self.geom.type() == 0: # it's a point
  91. if self.geom.isMultipart():
  92. temp_geom = self.geom.asMultiPoint()
  93. else:
  94. temp_geom.append(self.geom.asPoint())
  95. elif self.geom.type() == 1: # it's a line
  96. if self.geom.isMultipart():
  97. multi_geom = self.geom.asMultiPolyline() #multi_geog is a multiline
  98. for i in multi_geom: #i is a line
  99. temp_geom.extend( i )
  100. else:
  101. temp_geom = self.geom.asPolyline()
  102. elif self.geom.type() == 2: # it's a polygon
  103. if self.geom.isMultipart():
  104. multi_geom = self.geom.asMultiPolygon() #multi_geom is a multipolygon
  105. for i in multi_geom: #i is a polygon
  106. for j in i: #j is a line
  107. temp_geom.extend( j )
  108. else:
  109. multi_geom = self.geom.asPolygon() #multi_geom is a polygon
  110. for i in multi_geom: #i is a line
  111. temp_geom.extend( i )
  112. return [QgsPoint(p) for p in temp_geom]
  113. @classmethod
  114. def full_buffer(cls, distance):
  115. layer = next((l for l in QgsProject.instance().mapLayers().values() if l.name().lower() == cls.layername.lower()))
  116. result = processing.run("native:buffer", {'INPUT':layer.dataProvider().dataSourceUri(), #@UndefinedVariable
  117. 'DISTANCE':distance,
  118. 'SEGMENTS':5,
  119. 'END_CAP_STYLE':0,
  120. 'JOIN_STYLE':0,
  121. 'MITER_LIMIT':2,
  122. 'DISSOLVE':True,
  123. 'OUTPUT':'memory:mncheck_temp'})
  124. try:
  125. buffer = next((f for f in result['OUTPUT'].getFeatures()))
  126. except StopIteration:
  127. return QgsGeometry.fromPolygonXY([])
  128. if not buffer.geometry().isGeosValid():
  129. raise ValueError("Buffer: géométrie invalide")
  130. return buffer.geometry()
  131. def validate(schema_name):
  132. try:
  133. schema = get_schema(schema_name)
  134. except ModuleNotFoundError:
  135. logger.critical(f"Le schéma {schema_name} n'existe pas")
  136. return
  137. results = schema.checker.run()
  138. return results