"""Schema discovery, user-data persistence and QGIS feature wrapper.

@author: olivier.massot, 2018
"""
import importlib
import inspect
import logging
import pkgutil

from qgis.core import QgsProject, QgsWkbTypes, QgsGeometry, QgsPoint
from PyQt5.QtCore import QVariant, QDate, Qt
import yaml

from core.checking import BaseChecker
from core.constants import USER_DATA
from plugins import processing

logger = logging.getLogger("mncheck")


def list_schemas():
    """Return the names of the modules found in the ``schemas`` package.

    Sub-packages and modules whose name starts with an underscore are
    skipped.
    """
    # Function-scope import, kept from the original code
    # (presumably to avoid an import cycle -- TODO confirm).
    import schemas

    return [
        name
        for _, name, ispkg in pkgutil.iter_modules(schemas.__path__)
        if not (ispkg or name.startswith("_"))
    ]


def get_schema(schema_name):
    """Import and return the module ``schemas.<schema_name>``.

    Raises:
        ModuleNotFoundError: if the module cannot be imported.
    """
    return importlib.import_module("schemas." + schema_name)


def get_checkers(schema):
    """Return the strict subclasses of ``BaseChecker`` exposed by ``schema``.

    ``BaseChecker`` itself is excluded from the result.
    """
    return [
        cls
        for _, cls in inspect.getmembers(schema, predicate=inspect.isclass)
        if issubclass(cls, BaseChecker) and cls is not BaseChecker
    ]


def get_user_data():
    """Load the content of the ``USER_DATA`` YAML file.

    Returns:
        The parsed content, or an empty dict when the file does not exist
        or when its parsed content is falsy (e.g. an empty file).
    """
    # Calling yaml.load() without a Loader is deprecated since PyYAML 5.1
    # and raises TypeError from PyYAML 6. Pass explicitly the loader the
    # bare call used to resolve to: FullLoader on 5.1+, Loader before.
    # NOTE(review): if the user data only ever holds plain scalars, lists
    # and dicts, yaml.safe_load() would be the stricter choice.
    loader = getattr(yaml, "FullLoader", yaml.Loader)
    try:
        with open(USER_DATA, "r") as f:
            return yaml.load(f, Loader=loader) or {}
    except FileNotFoundError:
        return {}


def dump_user_data(conf):
    """Write ``conf`` as YAML to the ``USER_DATA`` file, replacing its content.

    Returns:
        Whatever ``yaml.dump`` returns when it is given a stream.
    """
    with open(USER_DATA, "w") as f:
        return yaml.dump(conf, f)


class QgsModel:
    """Wrap a QGIS feature and expose its fields as instance attributes.

    The class-level settings below are defaults; they are presumably
    overridden by subclasses describing concrete layers -- TODO confirm.
    """

    # Geometry type codes.
    GEOM_UNKNOWN = 0
    GEOM_POINT = 1
    GEOM_LINE = 2
    GEOM_POLYGON = 3
    GEOM_MULTIPOINT = 4
    GEOM_MULTILINE = 5
    GEOM_MULTIPOLYGON = 6

    # Name of the QGIS layer; full_buffer() matches it case-insensitively.
    layername = ""
    required = True
    # Defaults to 0, the value of GEOM_UNKNOWN.
    geom_type = 0
    # Presumably (xmin, ymin, xmax, ymax), the order returned by
    # get_bounding_box() -- TODO confirm.
    bounding_box = (0, 0, 1, 1)
    schema = {}
    # Name of the field used as primary key ("" when there is none).
    pk = ""

    def __init__(self, qgs_feature):
        """Copy every field of ``qgs_feature`` onto the instance.

        ``QVariant`` values are unwrapped (a null variant becomes ``""``)
        and ``QDate`` values are formatted as ``dd/MM/yyyy`` strings.
        """
        self._feature = qgs_feature

        if self.pk:
            # Give the primary key a default value to avoid cascading
            # errors while the data is being loaded.
            setattr(self, self.pk, None)

        for attr, value in self.attributes().items():
            if isinstance(value, QVariant):
                value = value.value() if not value.isNull() else ""
            if isinstance(value, QDate):
                value = value.toString("dd/MM/yyyy")
            setattr(self, attr, value)

    def __repr__(self):
        """Return ``"<ClassName> <pk value>"``, or a fallback without the key."""
        class_name = self.__class__.__name__
        try:
            # getattr raises AttributeError when the pk attribute is missing
            # (including when ``pk`` is the empty string).
            return f"{class_name} {getattr(self, self.pk)}"
        except AttributeError:
            return f"{class_name} (code manquant)"

    @property
    def feature(self):
        """The wrapped QGIS feature."""
        return self._feature

    def attributes(self):
        """Return a dict mapping each field name of the feature to its value."""
        field_names = [field.name() for field in self._feature.fields()]
        return dict(zip(field_names, self._feature.attributes()))

    @property
    def geom(self):
        """Geometry of the wrapped feature, or ``None`` if the feature is falsy."""
        if not self._feature:
            return None
        return self._feature.geometry()

    def is_geometry_valid(self):
        """Return True when the feature has a truthy, GEOS-valid geometry."""
        geom = self.geom
        # bool() so that callers always get a real boolean rather than the
        # falsy geometry object (or None) itself.
        return bool(geom and geom.isGeosValid())

    def get_geom_type(self):
        """Return the single-part WKB type of the geometry, or ``None``.

        ``None`` is returned when the geometry is falsy.
        """
        geom = self.geom
        if not geom:
            return None
        return QgsWkbTypes.singleType(geom.wkbType())

    @classmethod
    def get_geom_name(cls, wkb):
        """Return the display string of a WKB type, or ``""`` if it is rejected."""
        try:
            return QgsWkbTypes.displayString(wkb)
        except (ValueError, TypeError):
            return ""

    def get_bounding_box(self):
        """Return the geometry bounding box as ``(xmin, ymin, xmax, ymax)``.

        No check is made on the geometry: the call fails if ``geom`` is
        ``None``.
        """
        bb = self.geom.boundingBox()
        return (bb.xMinimum(), bb.yMinimum(), bb.xMaximum(), bb.yMaximum())

    def get_points(self):
        """Return every vertex of the geometry as a flat list of ``QgsPoint``.

        Multipart geometries and polygon rings are flattened. An empty list
        is returned when the geometry is falsy or null, or when its type is
        none of point (0), line (1) or polygon (2).
        """
        geom = self.geom
        if not geom or geom.isNull():
            return []

        geom_type = geom.type()
        vertices = []

        if geom_type == 0:  # point
            if geom.isMultipart():
                vertices = geom.asMultiPoint()
            else:
                vertices = [geom.asPoint()]

        elif geom_type == 1:  # line
            if geom.isMultipart():
                for line in geom.asMultiPolyline():
                    vertices.extend(line)
            else:
                vertices = geom.asPolyline()

        elif geom_type == 2:  # polygon
            if geom.isMultipart():
                for polygon in geom.asMultiPolygon():
                    for ring in polygon:
                        vertices.extend(ring)
            else:
                for ring in geom.asPolygon():
                    vertices.extend(ring)

        return [QgsPoint(p) for p in vertices]

    @classmethod
    def full_buffer(cls, distance):
        """Return the dissolved buffer geometry of the whole layer.

        The layer is looked up in the current project by ``cls.layername``
        (case-insensitive) and buffered with the ``native:buffer``
        processing algorithm.

        Returns:
            The geometry of the first output feature, or the geometry built
            by ``QgsGeometry.fromPolygonXY([])`` when the output is empty.

        Raises:
            StopIteration: if no layer of the project matches the name.
            ValueError: if the resulting geometry is not GEOS-valid.
        """
        wanted = cls.layername.lower()
        layer = next(
            lyr
            for lyr in QgsProject.instance().mapLayers().values()
            if lyr.name().lower() == wanted
        )

        result = processing.run(  # @UndefinedVariable
            "native:buffer",
            {
                'INPUT': layer.dataProvider().dataSourceUri(),
                'DISTANCE': distance,
                'SEGMENTS': 5,
                'END_CAP_STYLE': 0,
                'JOIN_STYLE': 0,
                'MITER_LIMIT': 2,
                'DISSOLVE': True,
                'OUTPUT': 'memory:mncheck_temp',
            },
        )

        # Only the first output feature is used.
        first = next((f for f in result['OUTPUT'].getFeatures()), None)
        if first is None:
            return QgsGeometry.fromPolygonXY([])

        if not first.geometry().isGeosValid():
            raise ValueError("Buffer: géométrie invalide")
        return first.geometry()


def validate(schema_name):
    """Run the checker of the schema called ``schema_name``.

    Returns:
        The value returned by ``schema.checker.run()``, or ``None`` (after
        logging a critical message) when the schema module is not found.
    """
    try:
        schema = get_schema(schema_name)
    except ModuleNotFoundError:
        # NOTE(review): this also catches a ModuleNotFoundError raised by an
        # import *inside* the schema module, which is then reported as a
        # missing schema -- confirm this is intended.
        logger.critical(f"Le schéma {schema_name} n'existe pas")
        return None

    return schema.checker.run()