''' @author: olivier.massot, 2018 ''' import importlib import inspect import logging import pkgutil from qgis.core import QgsProject, QgsWkbTypes, QgsGeometry, QgsPoint #@UnresolvedImport from PyQt5.QtCore import QVariant, QDate import yaml from MnCheck.core.checking import BaseChecker from MnCheck.core.constants import USER_DATA from plugins import processing #@UnresolvedImport logger = logging.getLogger("mncheck") def list_schemas(): import MnCheck.schemas return [name for _, name, ispkg in pkgutil.iter_modules(MnCheck.schemas.__path__) if not (ispkg or name[0] == '_')] def get_schema(schema_name): return importlib.import_module("MnCheck.schemas." + schema_name) def get_checkers(schema): return [cls for _, cls in inspect.getmembers(schema, predicate=inspect.isclass) \ if issubclass(cls, BaseChecker) and not cls is BaseChecker] def get_user_data(): try: with open(USER_DATA, 'r') as f: return yaml.load(f.read()) or {} except FileNotFoundError: return {} def dump_user_data(conf): with open(USER_DATA, 'w+') as f: return yaml.dump(conf, f) class QgsModel(): GEOM_UNKNOWN = 0 GEOM_POINT = 1 GEOM_LINE = 2 GEOM_POLYGON = 3 GEOM_MULTIPOINT = 4 GEOM_MULTILINE = 5 GEOM_MULTIPOLYGON = 6 layername = "" required = True geom_type = 0 bounding_box = (0,0,1,1) schema = {} pk = "" def __init__(self, qgs_feature): self._feature = qgs_feature if self.pk: # On affecte une valeur par defaut à la clef primaire pour éviter des erreurs en cascade au chargement des données setattr(self, self.pk, None) for attr, value in self.attributes().items(): if isinstance(value, QVariant): value = value.value() if not value.isNull() else "" if isinstance(value, QDate): value = value.toString("dd/MM/yyyy") setattr(self, attr, value) def __repr__(self): try: return "{} {}".format(self.__class__.__name__, getattr(self, self.pk)) except AttributeError: return f"{self.__class__.__name__} (code manquant)" @property def feature(self): return self._feature def attributes(self): return dict(zip([f.name() for f in self._feature.fields()], self._feature.attributes())) @property def geom(self): if not self._feature: return None return self._feature.geometry() def is_geometry_valid(self): return self.geom and self.geom.isGeosValid() def get_geom_type(self): return QgsWkbTypes.singleType(self._feature.geometry().wkbType()) if self.geom else None @classmethod def get_geom_name(cls, wkb): try: return QgsWkbTypes.displayString(wkb) except (ValueError, TypeError): return "" def get_bounding_box(self): bb = self.geom.boundingBox() return (bb.xMinimum(), bb.yMinimum(), bb.xMaximum(), bb.yMaximum()) def get_points(self): if not self.geom or self.geom.isNull(): return [] multi_geom = QgsGeometry() temp_geom = [] if self.geom.type() == 0: # it's a point if self.geom.isMultipart(): temp_geom = self.geom.asMultiPoint() else: temp_geom.append(self.geom.asPoint()) elif self.geom.type() == 1: # it's a line if self.geom.isMultipart(): multi_geom = self.geom.asMultiPolyline() #multi_geog is a multiline for i in multi_geom: #i is a line temp_geom.extend( i ) else: temp_geom = self.geom.asPolyline() elif self.geom.type() == 2: # it's a polygon if self.geom.isMultipart(): multi_geom = self.geom.asMultiPolygon() #multi_geom is a multipolygon for i in multi_geom: #i is a polygon for j in i: #j is a line temp_geom.extend( j ) else: multi_geom = self.geom.asPolygon() #multi_geom is a polygon for i in multi_geom: #i is a line temp_geom.extend( i ) return [QgsPoint(p) for p in temp_geom] @classmethod def full_buffer(cls, distance): layer = next((l for l in QgsProject.instance().mapLayers().values() if l.name().lower() == cls.layername.lower())) result = processing.run("native:buffer", {'INPUT':layer.dataProvider().dataSourceUri(), #@UndefinedVariable 'DISTANCE':distance, 'SEGMENTS':5, 'END_CAP_STYLE':0, 'JOIN_STYLE':0, 'MITER_LIMIT':2, 'DISSOLVE':True, 'OUTPUT':'memory:mncheck_temp'}) try: buffer = next((f for f in result['OUTPUT'].getFeatures())) except StopIteration: return QgsGeometry.fromPolygonXY([]) if not buffer.geometry().isGeosValid(): raise ValueError("Buffer: géométrie invalide") return buffer.geometry() def validate(schema_name): try: schema = get_schema(schema_name) except ModuleNotFoundError: logger.critical(f"Le schéma {schema_name} n'existe pas") return results = schema.checker.run() return results