''' Schéma de validation des données MN2 @author: olivier.massot, 2018 ''' import logging from qgis.core import QgsProject from core.cerberus_ import is_positive_int, is_positive_float, CerberusValidator, \ CerberusErrorHandler, _translate_messages from core.checking import BaseChecker from core.mncheck import QgsModel logger = logging.getLogger("mncheck") SCHEMA_NAME = "Schéma MN v3 APD" XMIN, XMAX, YMIN, YMAX = 1341999, 1429750, 8147750, 8294000 CRS = 'EPSG:3949' # Coordinate Reference System TOLERANCE = 0.5 STATUTS = ['ETUDE PRELIMINAIRE', 'ETUDE DE DIAGNOSTIC', 'AVANT-PROJET PROJET', 'PROJET', 'PASSATION DES MARCHES DE TRAVAUX', 'ETUDE D EXECUTION', 'TRAVAUX' , 'RECOLEMENT', 'MAINTIEN EN CONDITIONS OPERATIONNELLES'] ##### Modeles class SiteTelecom(QgsModel): layername = "SITE_TELECOM" geom_type = QgsModel.GEOM_POINT bounding_box = (XMIN,YMIN,XMAX,YMAX) pk = "ST_CODE" schema = {'ST_CODE': {'type': 'string', 'empty': False}, 'ST_NOM': {'type': 'string', 'empty': False}, 'ST_TYPFCT': {'type': 'string', 'empty': False, 'allowed': ['SITE HEBERGEMENT', 'NOEUD RACCORDEMENT OPTIQUE', 'SOUS-REPARTITEUR OPTIQUE', 'SOUS-REPARTITEUR OPTIQUE COLOCALISE', 'NOEUD RACCORDEMENT D ABONNES', 'NOEUD RACCORDEMENT D ABONNES - HAUT DEBIT', 'NOEUD RACCORDEMENT D ABONNES - MONTEE EN DEBIT']}, 'ST_STATUT': {'type': 'string', 'empty': False, 'allowed': [STATUTS]}, 'ST_NBPRISE': {'empty': False, 'validator': is_positive_int} } class SiteClient(QgsModel): layername = "SITE_CLIENT" geom_type = QgsModel.GEOM_POINT bounding_box = (XMIN,YMIN,XMAX,YMAX) schema = {'SC_TYPFON': {'type': 'string', 'empty': False, 'allowed': ['RESIDENTIEL', 'PROFESSIONNEL', 'OPERATEUR', 'TECHNIQUE']}, 'SC_STATUT': {'type': 'string', 'empty': False, 'allowed': [STATUTS]}, 'SC_NBPRISE': {'empty': False, 'validator': is_positive_int}, 'SC_NBPRHAB': {'empty': False, 'validator': is_positive_int}, 'SC_NBPRPRO': {'empty': False, 'validator': is_positive_int}, 'SC_ID_SITE': {'type': 'string', 'empty': False}, 'SC_NUM_RUE': {'type': 'string', 'empty': False}, 'SC_REPET': {'type': 'string', 'empty': False}, 'SC_DNUBAT': {'type': 'string', 'empty': False}, 'SC_DESC': {'type': 'string', 'empty': False}, 'SC_NOM_RUE': {'type': 'string', 'empty': False}, 'SC_NOM_COM': {'type': 'string', 'empty': False}, 'SC_ETAT': {'type': 'string', 'empty': False, 'allowed': ['VALIDE', 'OBSOLETE', 'NOUVEAU', 'FUTUR']}, 'SC_ATT_PTO': {'empty': False, 'validator': is_positive_float} } class Cable(QgsModel): layername = "CABLE" geom_type = QgsModel.GEOM_LINE bounding_box = (XMIN,YMIN,XMAX,YMAX) schema = {'CA_TYPFON': {'type': 'string', 'empty': False, 'allowed': ['COLLECTE TRANSPORT DISTRIBUTION', 'COLLECTE', 'COLLECTE TRANSPORT', 'COLLECTE DISTRIBUTION', 'TRANSPORT DISTRIBUTION', 'TRANSPORT', 'DISTRIBUTION', 'RACCORDEMENT FINAL', 'BOUCLE METROPOLITAINE', 'LONGUE DISTANCE (LONG HAUL)', 'NON COMMUNIQUE']}, 'CA_TYPSTR': {'type': 'string', 'empty': False, 'allowed': ['CONDUITE', 'AERIEN', 'COLONNE MONTANTE', 'IMMERGE', 'FACADE']}, 'CA_STATUT': {'type': 'string', 'empty': False, 'allowed': [STATUTS]}, 'CA_CAPFO': {'empty': False, 'allowed': [720,576,288,144,96,72,48,24,12]}, 'CA_MODULO': {'empty': False, 'validator': is_positive_int}, 'CA_SUPPORT': {'type': 'string', 'empty': False, 'allowed': ['0', '1']} } class Zapbo(QgsModel): layername = "ZAPBO" geom_type = QgsModel.GEOM_POLYGON bounding_box = (XMIN,YMIN,XMAX,YMAX) schema = {'ZP_NBPRISE': {'empty': False, 'validator': is_positive_int}, 'ZP_ISOLE': {'type': 'string', 'empty': False, 'allowed': ['0', '1']}, 'ZP_STATUT': {'type': 'string', 'empty': False, 'allowed': [STATUTS]} } class Zasro(QgsModel): layername = "ZASRO" geom_type = QgsModel.GEOM_POLYGON bounding_box = (XMIN,YMIN,XMAX,YMAX) schema = {'ZS_CODE': {'type': 'string', 'empty': False}, 'ZS_NBPRISE': {'empty': False, 'validator': is_positive_int} } class Adduction(QgsModel): layername = "ADDUCTION" geom_type = QgsModel.GEOM_LINE bounding_box = (XMIN,YMIN,XMAX,YMAX) schema = {'AD_ISOLE': {'type': 'string', 'empty': False, 'allowed': ['0', '1']}, 'AD_LONG': {'empty': False, 'validator': is_positive_float} } models = [SiteTelecom, SiteClient, Cable, Zapbo, Zasro, Adduction] ####### Validateur class Mn3ApdChecker(BaseChecker): def setUp(self): self.dataset = {} for model in models: model.layer = next((l for l in QgsProject.instance().mapLayers().values() \ if l.name().lower() == model.layername.lower()), None) self.dataset[model] = [model(f) for f in model.layer.getFeatures()] if model.layer else [] self.sites_telecom = self.dataset.get(SiteTelecom, []) self.sites_client = self.dataset.get(SiteClient, []) self.cables = self.dataset.get(Cable, []) self.zapbos = self.dataset.get(Zapbo, []) self.zasros = self.dataset.get(Zasro, []) self.adductions = self.dataset.get(Adduction, []) def test_load_layers(self): """ Chargement des données Contrôle la présence des couches attendues """ for model in models: if model.layer is None: self.log_error("Couche manquante", model=model) continue if model.pk: if not model.pk.lower() in [f.name().lower() for f in model.layer.fields()]: self.log_error(f"Clef primaire manquante ({model.pk})", model=model) continue def test_scr(self): """ Contrôle des projections Vérifie que les couches ont le bon sytème de projection """ for model in models: if model.layer and model.layer.crs().authid() != model.crs: self.log_error(f"Mauvaise projection (attendu: {model.crs})", model=model) def _validate_structure(self, model, items): v = CerberusValidator(model.schema, purge_unknown=True, error_handler=CerberusErrorHandler, require_all=True) for item in items: v.validate(item.__dict__) for field, verrors in v.errors.items(): for err in verrors: self.log_error(f"{field} : {_translate_messages(err)}", item=item) def test_structure_sites_telecom(self): """ Structure des données: SiteTelecom Contrôle les données attributaires de la couche SITE_TELECOM: présence, format, valeurs autorisées... """ self._validate_structure(SiteTelecom, self.sites_telecom) def test_structure_sites_client(self): """ Structure des données: SiteClient Contrôle les données attributaires de la couche SITE_CLIENT: présence, format, valeurs autorisées... """ self._validate_structure(SiteClient, self.sites_client) def test_structure_cables(self): """ Structure des données: Cables Contrôle les données attributaires de la couche CABLE: présence, format, valeurs autorisées... """ self._validate_structure(Cable, self.cables) def test_structure_zapbos(self): """ Structure des données: Zapbo Contrôle les données attributaires de la couche ZAPBO: présence, format, valeurs autorisées... """ self._validate_structure(Zapbo, self.zapbos) def test_structure_zasros(self): """ Structure des données: Zasro Contrôle les données attributaires de la couche ZASRO: présence, format, valeurs autorisées... """ self._validate_structure(Zasro, self.zasros) def test_structure_adductions(self): """ Structure des données: Adduction Contrôle les données attributaires de la couche ADDUCTION: présence, format, valeurs autorisées... """ self._validate_structure(Adduction, self.adductions) def test_geometry_validity(self): """ Contrôle de la validité des géométries """ for model in models: for item in self.dataset[model]: if not item.is_geometry_valid(): self.log_error("La géométrie de l'objet est invalide", item=item) def test_geometry_type(self): """ Contrôle des types de géométries """ for model in models: for item in self.dataset[model]: item_geom_type = item.get_geom_type() if item_geom_type != model.geom_type: expected, found = model.get_geom_name(model.geom_type), model.get_geom_name(item_geom_type) self.log_error(f"Type de géométrie invalide (attendu: {expected}, trouvé: {found})", item=item) def test_bounding_box(self): """ Contrôle des emprises Vérifie que les objets sont dans le périmètre attendu """ for model in models: xmin, ymin, xmax, ymax = model.bounding_box for item in self.dataset[model]: if not item.is_geometry_valid(): continue x1, y1, x2, y2 = item.get_bounding_box() if any(x < xmin or x > xmax for x in (x1, x2)) or \ any(y < ymin or y > ymax for y in (y1, y2)): self.log_error("Hors de l'emprise autorisée", item=item) def test_duplicates(self): """ Recherche de doublons Recherche d'éventuels doublons dans des champs qui supposent l'unicité """ # doublons dans ST_CODE tmp = [] for site_telecom in self.sites_telecom: if not site_telecom.ST_CODE: continue if not site_telecom.ST_CODE in tmp: tmp.append(site_telecom.ST_CODE) else: self.log_error("Doublons dans le champs ST_CODE", item=site_telecom) def test_graphic_duplicates(self): """ Recherche de doublons graphiques """ for i, site_telecom in enumerate(self.sites_telecom): for other in self.sites_telecom[i+1:]: if site_telecom.geom.equals(other.geom): self.log_error("Une entité graphique est dupliquée", item=site_telecom) continue for i, site_client in enumerate(self.sites_client): for other in self.sites_client[i+1:]: if site_client.geom.equals(other.geom): self.log_error("Une entité graphique est dupliquée", item=site_client) continue # for i, cable in enumerate(self.cables): # for other in self.cables[i+1:]: # if cable.geom.equals(other.geom): # self.log_error("Une entité graphique est dupliquée", item=cable) # continue for i, zapbo in enumerate(self.zapbos): for other in self.zapbos[i+1:]: if zapbo.geom.equals(other.geom): self.log_error("Une entité graphique est dupliquée", item=zapbo) continue for i, zasro in enumerate(self.zasros): for other in self.zasros[i+1:]: if zasro.geom.equals(other.geom): self.log_error("Une entité graphique est dupliquée", item=zasro) continue # for i, adduction in enumerate(self.adductions): # for other in self.adductions[i+1:]: # if adduction.geom.equals(other.geom): # self.log_error("Une entité graphique est dupliquée", item=adduction) # continue def test_dimension_zasro(self): """ Contrôle le dimensionnement des ZASRO """ for zasro in self.zasros: if zasro.ZS.NBPRISES > 850: self.log_error("Le nombre de prises est supérieur à 850", item=zasro) def test_affaiblissement(self): """ Contrôle l'affaiblissement """ for site_client in self.site_clients: if site_client.ZS.NBPRISES > 28: self.log_error("L'affaiblissement est supérieur à 28", item=site_client) def test_capacite_modulo(self): """ Contrôle l'affaiblissement """ for cable in self.cables: if cable.CA_CAPFO in [720,576,288,144,96,72] and cable.MODULO != "M12": self.log_error(f"Modulo invalide (capacite: {cable.CA_CAPFO}, modulo: {cable.MODULO})", item=cable) elif cable.CA_CAPFO in [72,48,24,12] and cable.MODULO != "M6": self.log_error(f"Modulo invalide (capacite: {cable.CA_CAPFO}, modulo: {cable.MODULO})", item=cable) def test_longueur_racco(self): """ Contrôle la longueur des raccos """ for adduction in self.adductions: if adduction.AD_ISOLE == "1" and adduction.AD_LONG <= 100: self.log_error("L'adduction ne devrait pas être consiudérée comme isolée (long.: {adduction.AD_LONG})", item=adduction) elif adduction.AD_ISOLE == "0" and adduction.AD_LONG > 100: self.log_error("L'adduction devrait être consiudérée comme isolée (long.: {adduction.AD_LONG})", item=adduction) checkers = [Mn3ApdChecker]