| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- '''
- @author: olivier.massot, 2018
- '''
- import importlib
- import inspect
- import logging
- import pkgutil
- from qgis.core import QgsProject, QgsWkbTypes, QgsGeometry, QgsPoint #@UnresolvedImport
- from PyQt5.QtCore import QVariant, QDate
- import yaml
- from MnCheck.core.checking import BaseChecker
- from MnCheck.core.constants import USER_DATA
- from plugins import processing #@UnresolvedImport
- logger = logging.getLogger("mncheck")
- def list_schemas():
- import MnCheck.schemas
- return [name for _, name, ispkg in pkgutil.iter_modules(MnCheck.schemas.__path__) if not (ispkg or name[0] == '_')]
- def get_schema(schema_name):
- return importlib.import_module("MnCheck.schemas." + schema_name)
- def get_checkers(schema):
- return [cls for _, cls in inspect.getmembers(schema, predicate=inspect.isclass) \
- if issubclass(cls, BaseChecker) and not cls is BaseChecker]
- def get_user_data():
- try:
- with open(USER_DATA, 'r') as f:
- return yaml.load(f.read()) or {}
- except FileNotFoundError:
- return {}
- def dump_user_data(conf):
- with open(USER_DATA, 'w+') as f:
- return yaml.dump(conf, f)
- class QgsModel():
-
- GEOM_UNKNOWN = 0
- GEOM_POINT = 1
- GEOM_LINE = 2
- GEOM_POLYGON = 3
- GEOM_MULTIPOINT = 4
- GEOM_MULTILINE = 5
- GEOM_MULTIPOLYGON = 6
-
- layername = ""
- required = True
- geom_type = 0
- bounding_box = (0,0,1,1)
- schema = {}
- pk = ""
- def __init__(self, qgs_feature):
- self._feature = qgs_feature
-
- if self.pk:
- # On affecte une valeur par defaut à la clef primaire pour éviter des erreurs en cascade au chargement des données
- setattr(self, self.pk, None)
-
- for attr, value in self.attributes().items():
- if isinstance(value, QVariant):
- value = value.value() if not value.isNull() else ""
- if isinstance(value, QDate):
- value = value.toString("dd/MM/yyyy")
- setattr(self, attr, value)
-
- def __repr__(self):
- try:
- return "{} {}".format(self.__class__.__name__, getattr(self, self.pk))
- except AttributeError:
- return f"{self.__class__.__name__} (code manquant)"
-
- @property
- def feature(self):
- return self._feature
-
- def attributes(self):
- return dict(zip([f.name() for f in self._feature.fields()], self._feature.attributes()))
-
- @property
- def geom(self):
- if not self._feature:
- return None
- return self._feature.geometry()
-
- def is_geometry_valid(self):
- return self.geom and self.geom.isGeosValid()
-
- def get_geom_type(self):
- return QgsWkbTypes.singleType(self._feature.geometry().wkbType()) if self.geom else None
-
- @classmethod
- def get_geom_name(cls, wkb):
- try:
- return QgsWkbTypes.displayString(wkb)
- except (ValueError, TypeError):
- return "<unknown>"
-
- def get_bounding_box(self):
- bb = self.geom.boundingBox()
- return (bb.xMinimum(), bb.yMinimum(), bb.xMaximum(), bb.yMaximum())
-
- def get_points(self):
- if not self.geom or self.geom.isNull():
- return []
-
- multi_geom = QgsGeometry()
- temp_geom = []
-
- if self.geom.type() == 0: # it's a point
- if self.geom.isMultipart():
- temp_geom = self.geom.asMultiPoint()
- else:
- temp_geom.append(self.geom.asPoint())
- elif self.geom.type() == 1: # it's a line
- if self.geom.isMultipart():
- multi_geom = self.geom.asMultiPolyline() #multi_geog is a multiline
- for i in multi_geom: #i is a line
- temp_geom.extend( i )
- else:
- temp_geom = self.geom.asPolyline()
- elif self.geom.type() == 2: # it's a polygon
- if self.geom.isMultipart():
- multi_geom = self.geom.asMultiPolygon() #multi_geom is a multipolygon
- for i in multi_geom: #i is a polygon
- for j in i: #j is a line
- temp_geom.extend( j )
- else:
- multi_geom = self.geom.asPolygon() #multi_geom is a polygon
- for i in multi_geom: #i is a line
- temp_geom.extend( i )
-
- return [QgsPoint(p) for p in temp_geom]
- @classmethod
- def full_buffer(cls, distance):
- layer = next((l for l in QgsProject.instance().mapLayers().values() if l.name().lower() == cls.layername.lower()))
- result = processing.run("native:buffer", {'INPUT':layer.dataProvider().dataSourceUri(), #@UndefinedVariable
- 'DISTANCE':distance,
- 'SEGMENTS':5,
- 'END_CAP_STYLE':0,
- 'JOIN_STYLE':0,
- 'MITER_LIMIT':2,
- 'DISSOLVE':True,
- 'OUTPUT':'memory:mncheck_temp'})
- try:
- buffer = next((f for f in result['OUTPUT'].getFeatures()))
- except StopIteration:
- return QgsGeometry.fromPolygonXY([])
-
- if not buffer.geometry().isGeosValid():
- raise ValueError("Buffer: géométrie invalide")
- return buffer.geometry()
- def validate(schema_name):
- try:
- schema = get_schema(schema_name)
- except ModuleNotFoundError:
- logger.critical(f"Le schéma {schema_name} n'existe pas")
- return
-
- results = schema.checker.run()
-
- return results
|