|
@@ -6,20 +6,17 @@
|
|
|
'''
|
|
'''
|
|
|
|
|
|
|
|
import logging
|
|
import logging
|
|
|
-from qgis.core import QgsProject, QgsGeometry, QgsWkbTypes
|
|
|
|
|
|
|
+
|
|
|
|
|
+from qgis.core import QgsProject, QgsGeometry
|
|
|
|
|
|
|
|
from core.cerberus_ import is_float, is_multi_int, is_int, \
|
|
from core.cerberus_ import is_float, is_multi_int, is_int, \
|
|
|
is_modern_french_date, CerberusValidator, CerberusErrorHandler, \
|
|
is_modern_french_date, CerberusValidator, CerberusErrorHandler, \
|
|
|
_translate_messages
|
|
_translate_messages
|
|
|
-from core.checker import BaseChecker
|
|
|
|
|
-from core.validation_errors import UniqueError, RelationError, \
|
|
|
|
|
- DuplicatedGeom, MissingItem, DimensionError, TechnicalValidationError
|
|
|
|
|
-from core.validator import QgsModel, BaseValidator
|
|
|
|
|
-
|
|
|
|
|
|
|
+from core.checking import BaseChecker
|
|
|
|
|
+from core.mncheck import QgsModel
|
|
|
|
|
|
|
|
logger = logging.getLogger("mncheck")
|
|
logger = logging.getLogger("mncheck")
|
|
|
|
|
|
|
|
-
|
|
|
|
|
SCHEMA_NAME = "Schéma MN v1 REC"
|
|
SCHEMA_NAME = "Schéma MN v1 REC"
|
|
|
|
|
|
|
|
XMIN, XMAX, YMIN, YMAX = 1341999, 1429750, 8147750, 8294000
|
|
XMIN, XMAX, YMIN, YMAX = 1341999, 1429750, 8147750, 8294000
|
|
@@ -54,7 +51,7 @@ class Artere(QgsModel):
|
|
|
'AR_PRO_MD': {'type': 'string', 'empty': False, 'default': 'MANCHE NUMERIQUE', 'allowed': ['MANCHE NUMERIQUE']},
|
|
'AR_PRO_MD': {'type': 'string', 'empty': False, 'default': 'MANCHE NUMERIQUE', 'allowed': ['MANCHE NUMERIQUE']},
|
|
|
'AR_COMMENT': {'type': 'string', 'maxlength': 300, 'empty': True},
|
|
'AR_COMMENT': {'type': 'string', 'maxlength': 300, 'empty': True},
|
|
|
'AR_STATUT': {'type': 'string', 'empty': False, 'allowed': ['APS', 'APD', 'EXE', 'REC']}}
|
|
'AR_STATUT': {'type': 'string', 'empty': False, 'allowed': ['APS', 'APD', 'EXE', 'REC']}}
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
def __repr__(self):
|
|
def __repr__(self):
|
|
|
return "Artere {}-{}".format(self.AR_NOEUD_A, self.AR_NOEUD_B)
|
|
return "Artere {}-{}".format(self.AR_NOEUD_A, self.AR_NOEUD_B)
|
|
|
|
|
|
|
@@ -63,6 +60,7 @@ class Cable(QgsModel):
|
|
|
geom_type = QgsModel.GEOM_LINE
|
|
geom_type = QgsModel.GEOM_LINE
|
|
|
crs = CRS
|
|
crs = CRS
|
|
|
bounding_box = (XMIN,YMIN,XMAX,YMAX)
|
|
bounding_box = (XMIN,YMIN,XMAX,YMAX)
|
|
|
|
|
+ pk = "CA_NUMERO"
|
|
|
schema = {'CA_NUMERO': {'type': 'string', 'maxlength': 17},
|
|
schema = {'CA_NUMERO': {'type': 'string', 'maxlength': 17},
|
|
|
'CA_TYPE': {'type': 'string', 'maxlength': 10, 'empty': False, 'allowed': ['AERIEN', 'IMMEUBLE', 'FACADE', 'MIXTE', 'SOUTERRAIN']},
|
|
'CA_TYPE': {'type': 'string', 'maxlength': 10, 'empty': False, 'allowed': ['AERIEN', 'IMMEUBLE', 'FACADE', 'MIXTE', 'SOUTERRAIN']},
|
|
|
'CA_ETAT': {'type': 'string', 'maxlength': 1, 'empty': False, 'allowed': ['0', '1', '2', '3', '4']},
|
|
'CA_ETAT': {'type': 'string', 'maxlength': 1, 'empty': False, 'allowed': ['0', '1', '2', '3', '4']},
|
|
@@ -89,6 +87,7 @@ class Equipement(QgsModel):
|
|
|
geom_type = QgsModel.GEOM_POINT
|
|
geom_type = QgsModel.GEOM_POINT
|
|
|
crs = CRS
|
|
crs = CRS
|
|
|
bounding_box = (XMIN,YMIN,XMAX,YMAX)
|
|
bounding_box = (XMIN,YMIN,XMAX,YMAX)
|
|
|
|
|
+ pk = "EQ_NOM"
|
|
|
schema = {'EQ_NOM': {'type': 'string', 'maxlength': 10, 'contains_any_of': ['PBO', 'BPE', 'BAI']},
|
|
schema = {'EQ_NOM': {'type': 'string', 'maxlength': 10, 'contains_any_of': ['PBO', 'BPE', 'BAI']},
|
|
|
'EQ_NOM_NOE': {'type': 'string', 'maxlength': 30},
|
|
'EQ_NOM_NOE': {'type': 'string', 'maxlength': 30},
|
|
|
'EQ_ETAT': {'type': 'string', 'maxlength': 1, 'empty': False, 'allowed': ['0', '1', '2', '3', '4']},
|
|
'EQ_ETAT': {'type': 'string', 'maxlength': 1, 'empty': False, 'allowed': ['0', '1', '2', '3', '4']},
|
|
@@ -111,6 +110,7 @@ class Noeud(QgsModel):
|
|
|
geom_type = QgsModel.GEOM_POINT
|
|
geom_type = QgsModel.GEOM_POINT
|
|
|
crs = CRS
|
|
crs = CRS
|
|
|
bounding_box = (XMIN,YMIN,XMAX,YMAX)
|
|
bounding_box = (XMIN,YMIN,XMAX,YMAX)
|
|
|
|
|
+ pk = "NO_NOM"
|
|
|
schema = {'NO_NOM': {'type': 'string', 'maxlength': 30},
|
|
schema = {'NO_NOM': {'type': 'string', 'maxlength': 30},
|
|
|
'NO_ID_INSE': {'type': 'string', 'empty': False, 'regex': r'50\d{3}'},
|
|
'NO_ID_INSE': {'type': 'string', 'empty': False, 'regex': r'50\d{3}'},
|
|
|
'NO_VOIE': {'type': 'string', 'maxlength': 100},
|
|
'NO_VOIE': {'type': 'string', 'maxlength': 100},
|
|
@@ -173,6 +173,7 @@ class Zapbo(QgsModel):
|
|
|
geom_type = QgsModel.GEOM_POLYGON
|
|
geom_type = QgsModel.GEOM_POLYGON
|
|
|
crs = CRS
|
|
crs = CRS
|
|
|
bounding_box = (XMIN,YMIN,XMAX,YMAX)
|
|
bounding_box = (XMIN,YMIN,XMAX,YMAX)
|
|
|
|
|
+ pk = "ID_ZAPBO"
|
|
|
schema = {'ID_ZAPBO': {'type': 'string', 'maxlength': 30, 'contains_any_of': ['PBO', 'BPE']},
|
|
schema = {'ID_ZAPBO': {'type': 'string', 'maxlength': 30, 'contains_any_of': ['PBO', 'BPE']},
|
|
|
'COMMENTAIR': {'type': 'string', 'maxlength': 254, 'empty': True},
|
|
'COMMENTAIR': {'type': 'string', 'maxlength': 254, 'empty': True},
|
|
|
'STATUT': {'type': 'string', 'empty': False, 'allowed': ['APS', 'APD', 'EXE', 'REC']}}
|
|
'STATUT': {'type': 'string', 'empty': False, 'allowed': ['APS', 'APD', 'EXE', 'REC']}}
|
|
@@ -193,45 +194,47 @@ class Mn1Checker(BaseChecker):
|
|
|
def test_load_layers(self):
|
|
def test_load_layers(self):
|
|
|
""" Chargement des données """
|
|
""" Chargement des données """
|
|
|
|
|
|
|
|
|
|
+ self.dataset = {}
|
|
|
|
|
+
|
|
|
for model in models:
|
|
for model in models:
|
|
|
layername = model.layername
|
|
layername = model.layername
|
|
|
|
|
|
|
|
layer = next((l for l in QgsProject.instance().mapLayers().values() if l.name().lower() == layername.lower()))
|
|
layer = next((l for l in QgsProject.instance().mapLayers().values() if l.name().lower() == layername.lower()))
|
|
|
if not layer:
|
|
if not layer:
|
|
|
- self.log_error("Couche manquante", layername=layername, critical=True)
|
|
|
|
|
|
|
+ self.log_critical("Couche manquante", model=model)
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
|
|
+ if model.pk:
|
|
|
|
|
+ if not model.pk.lower() in [f.name().lower() for f in layer.fields()]:
|
|
|
|
|
+ self.log_critical(f"Clef primaire manquante ({model.pk})", model=model)
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
model.layer = layer
|
|
model.layer = layer
|
|
|
|
|
|
|
|
- self.arteres = list(Artere.layer.getFeatures())
|
|
|
|
|
- self.cables = list(Cable.layer.getFeatures())
|
|
|
|
|
-# self.equipements = list(Equipement.layer.getFeatures())
|
|
|
|
|
-# self.noeuds = list(Noeud.layer.getFeatures())
|
|
|
|
|
- self.tranchees = list(Tranchee.layer.getFeatures())
|
|
|
|
|
-# self.zapbos = list(Zapbo.layer.getFeatures())
|
|
|
|
|
-
|
|
|
|
|
- self.dataset = {Artere: self.arteres,
|
|
|
|
|
- Cable: self.cables,
|
|
|
|
|
- Equipement: self.equipements,
|
|
|
|
|
- Noeud: self.noeuds,
|
|
|
|
|
- Tranchee: self.tranchees,
|
|
|
|
|
- Zapbo: self.zapbos}
|
|
|
|
|
-
|
|
|
|
|
|
|
+ self.dataset[model] = [model(f) for f in layer.getFeatures()]
|
|
|
|
|
+
|
|
|
|
|
+ self.arteres = self.dataset.get(Artere, [])
|
|
|
|
|
+ self.cables = self.dataset.get(Cable, [])
|
|
|
|
|
+ self.equipements = self.dataset.get(Equipement, [])
|
|
|
|
|
+ self.noeuds = self.dataset.get(Noeud, [])
|
|
|
|
|
+ self.tranchees = self.dataset.get(Tranchee, [])
|
|
|
|
|
+ self.zapbos = self.dataset.get(Zapbo, [])
|
|
|
|
|
+
|
|
|
def test_scr(self):
|
|
def test_scr(self):
|
|
|
""" Contrôle des projections """
|
|
""" Contrôle des projections """
|
|
|
for model in models:
|
|
for model in models:
|
|
|
if model.layer.crs().authid() != model.crs:
|
|
if model.layer.crs().authid() != model.crs:
|
|
|
- self.log_error(f"Mauvaise projection (attendu: {model.crs})", layername=model.layername)
|
|
|
|
|
|
|
+ self.log_error(f"Mauvaise projection (attendu: {model.crs})", model=model)
|
|
|
|
|
|
|
|
- def _validate_structure(self, model, features):
|
|
|
|
|
|
|
+ def _validate_structure(self, model, items):
|
|
|
v = CerberusValidator(model.schema, purge_unknown=True, error_handler=CerberusErrorHandler, require_all=True)
|
|
v = CerberusValidator(model.schema, purge_unknown=True, error_handler=CerberusErrorHandler, require_all=True)
|
|
|
- for feature in features:
|
|
|
|
|
- attributes = dict(zip([f.name() for f in feature.fields()], feature.attributes()))
|
|
|
|
|
- v.validate(attributes)
|
|
|
|
|
|
|
+
|
|
|
|
|
+ for item in items:
|
|
|
|
|
+ v.validate(item.attributes())
|
|
|
|
|
|
|
|
for field, verrors in v.errors.items():
|
|
for field, verrors in v.errors.items():
|
|
|
for err in verrors:
|
|
for err in verrors:
|
|
|
- self.log_error(_translate_messages(err), layername=model.layername, field=field)
|
|
|
|
|
|
|
+ self.log_error(f"{field} : {_translate_messages(err)}", item=item)
|
|
|
|
|
|
|
|
def test_structure_arteres(self):
|
|
def test_structure_arteres(self):
|
|
|
""" Structure des données: Artères """
|
|
""" Structure des données: Artères """
|
|
@@ -260,167 +263,181 @@ class Mn1Checker(BaseChecker):
|
|
|
def test_geometry_validity(self):
|
|
def test_geometry_validity(self):
|
|
|
""" Contrôle de la validité des géométries """
|
|
""" Contrôle de la validité des géométries """
|
|
|
for model in models:
|
|
for model in models:
|
|
|
- for f in self.dataset[model]:
|
|
|
|
|
- if not f.geometry().isGeosValid():
|
|
|
|
|
- self.log_error("La géométrie de l'objet est invalide", layername=model.layername, field="geom")
|
|
|
|
|
|
|
+ for item in self.dataset[model]:
|
|
|
|
|
+ if not item.is_geometry_valid():
|
|
|
|
|
+ self.log_error("La géométrie de l'objet est invalide", item=item)
|
|
|
|
|
|
|
|
def test_geometry_type(self):
|
|
def test_geometry_type(self):
|
|
|
|
|
+ """ Contrôle des types de géométries """
|
|
|
for model in models:
|
|
for model in models:
|
|
|
- for f in self.dataset[model]:
|
|
|
|
|
- if QgsWkbTypes.singleType(f.geometry().wkbType()) != model.geom_type:
|
|
|
|
|
- self.log_error("Type de géométrie invalide (attendu: {})".format(QgsModel.GEOM_NAMES[model.geom_type]), layername=model.layername, field="geom")
|
|
|
|
|
|
|
+ for item in self.dataset[model]:
|
|
|
|
|
+ geom_type = item.get_geom_type()
|
|
|
|
|
+ if geom_type != model.geom_type:
|
|
|
|
|
+ self.log_error(f"Type de géométrie invalide (attendu: {QgsModel.GEOM_NAMES[geom_type]})", item=item)
|
|
|
|
|
|
|
|
def test_bounding_box(self):
|
|
def test_bounding_box(self):
|
|
|
-
|
|
|
|
|
|
|
+ """ Contrôle des emprises """
|
|
|
|
|
+
|
|
|
for model in models:
|
|
for model in models:
|
|
|
xmin, ymin, xmax, ymax = model.bounding_box
|
|
xmin, ymin, xmax, ymax = model.bounding_box
|
|
|
|
|
|
|
|
- for f in self.dataset[model]:
|
|
|
|
|
|
|
+ for item in self.dataset[model]:
|
|
|
|
|
|
|
|
- bb = f.geometry().boundingBox()
|
|
|
|
|
- x1, y1, x2, y2 = (bb.xMinimum(), bb.yMinimum(), bb.xMaximum(), bb.yMaximum())
|
|
|
|
|
|
|
+ x1, y1, x2, y2 = item.get_bounding_box()
|
|
|
|
|
|
|
|
if any(x < xmin or x > xmax for x in (x1, x2)) or \
|
|
if any(x < xmin or x > xmax for x in (x1, x2)) or \
|
|
|
any(y < ymin or y > ymax for y in (y1, y2)):
|
|
any(y < ymin or y > ymax for y in (y1, y2)):
|
|
|
- self.log_error("Hors de l'emprise autorisée", layername=model.layername, field="geom")
|
|
|
|
|
-
|
|
|
|
|
|
|
+ self.log_error("Hors de l'emprise autorisée", item=item)
|
|
|
|
|
|
|
|
def test_duplicates(self):
|
|
def test_duplicates(self):
|
|
|
""" Recherche de doublons """
|
|
""" Recherche de doublons """
|
|
|
|
|
|
|
|
- # TODO: séparer le chargement des données et le contrôle des doublons
|
|
|
|
|
-
|
|
|
|
|
- self.noeuds = {}
|
|
|
|
|
- for noeud in self.dataset[Noeud]:
|
|
|
|
|
- if not noeud.NO_NOM in self.noeuds:
|
|
|
|
|
- self.noeuds[noeud.NO_NOM] = noeud
|
|
|
|
|
|
|
+ tmp = []
|
|
|
|
|
+ logger.info(self.noeuds)
|
|
|
|
|
+ for noeud in self.noeuds:
|
|
|
|
|
+ if not noeud.NO_NOM:
|
|
|
|
|
+ continue
|
|
|
|
|
+ if not noeud.NO_NOM in tmp:
|
|
|
|
|
+ tmp.append(noeud.NO_NOM)
|
|
|
else:
|
|
else:
|
|
|
- self.log_error("Doublons dans le champs: {}".format(noeud), layername=Noeud.layername, field="NO_NOM")
|
|
|
|
|
|
|
+ self.log_error("Doublons dans le champs NO_NOM", item=noeud)
|
|
|
|
|
|
|
|
- self.equipements = {}
|
|
|
|
|
- for equipement in self.dataset[Equipement]:
|
|
|
|
|
- if not equipement.EQ_NOM in self.equipements:
|
|
|
|
|
- self.equipements[equipement.EQ_NOM] = equipement
|
|
|
|
|
|
|
+ tmp = []
|
|
|
|
|
+ for equipement in self.equipements:
|
|
|
|
|
+ if not equipement.EQ_NOM:
|
|
|
|
|
+ continue
|
|
|
|
|
+ if not equipement.EQ_NOM in tmp:
|
|
|
|
|
+ tmp.append(equipement.EQ_NOM)
|
|
|
else:
|
|
else:
|
|
|
- self.log_error("Doublons dans le champs: {}".format(equipement), layername=Equipement.layername, field="EQ_NOM")
|
|
|
|
|
|
|
+ self.log_error("Doublons dans le champs EQ_NOM", item=equipement)
|
|
|
|
|
|
|
|
- self.zapbos = {}
|
|
|
|
|
- for zapbo in self.dataset[Zapbo]:
|
|
|
|
|
- if not zapbo.ID_ZAPBO in self.zapbos:
|
|
|
|
|
- self.zapbos[zapbo.ID_ZAPBO] = zapbo
|
|
|
|
|
|
|
+ tmp = []
|
|
|
|
|
+ for zapbo in self.zapbos:
|
|
|
|
|
+ if not zapbo.ID_ZAPBO:
|
|
|
|
|
+ continue
|
|
|
|
|
+ if not zapbo.ID_ZAPBO in tmp:
|
|
|
|
|
+ tmp.append(zapbo.ID_ZAPBO)
|
|
|
else:
|
|
else:
|
|
|
- self.log_error("Doublons dans le champs: {}".format(zapbo), layername=Zapbo.layername, field="ID_ZAPBO")
|
|
|
|
|
|
|
+ self.log_error("Doublons dans le champs ID_ZAPBO", item=zapbo)
|
|
|
|
|
|
|
|
|
|
|
|
|
- def test_constraints(self):
|
|
|
|
|
- """ Contrôle des contraintes relationnelles """
|
|
|
|
|
|
|
+ def test_constraints_arteres_noeuds(self):
|
|
|
|
|
+ """ Les noeuds attachés aux artères existent """
|
|
|
|
|
|
|
|
- # rattachement les noeuds aux artères
|
|
|
|
|
for artere in self.arteres:
|
|
for artere in self.arteres:
|
|
|
try:
|
|
try:
|
|
|
- artere.noeud_a = self.noeuds[artere.AR_NOEUD_A]
|
|
|
|
|
- except KeyError:
|
|
|
|
|
|
|
+ artere.noeud_a = next((n for n in self.noeuds if n.NO_NOM == artere.AR_NOEUD_A))
|
|
|
|
|
+ except StopIteration:
|
|
|
artere.noeud_a = None
|
|
artere.noeud_a = None
|
|
|
- self.log_error(RelationError("Le noeud '{}' n'existe pas".format(artere.AR_NOEUD_A), layername=Artere.layername, field="AR_NOEUD_A"))
|
|
|
|
|
|
|
+ self.log_error(f"Le noeud '{artere.AR_NOEUD_A}' n'existe pas", item=artere)
|
|
|
|
|
|
|
|
try:
|
|
try:
|
|
|
- artere.noeud_b = self.noeuds[artere.AR_NOEUD_B]
|
|
|
|
|
- except KeyError:
|
|
|
|
|
|
|
+ artere.noeud_b = next((n for n in self.noeuds if n.NO_NOM == artere.AR_NOEUD_B))
|
|
|
|
|
+ except StopIteration:
|
|
|
artere.noeud_b = None
|
|
artere.noeud_b = None
|
|
|
- self.log_error(RelationError("Le noeud '{}' n'existe pas".format(artere.AR_NOEUD_B), layername=Artere.layername, field="AR_NOEUD_A"))
|
|
|
|
|
|
|
+ self.log_error(f"Le noeud '{artere.AR_NOEUD_B}' n'existe pas", item=artere)
|
|
|
|
|
+
|
|
|
|
|
+ def test_constraints_cables_equipements(self):
|
|
|
|
|
+ """ Les équipements attachés aux cables existent """
|
|
|
|
|
|
|
|
- # rattachement des equipements aux cables
|
|
|
|
|
for cable in self.cables:
|
|
for cable in self.cables:
|
|
|
try:
|
|
try:
|
|
|
- cable.equipement_a = self.equipements[cable.CA_EQ_A]
|
|
|
|
|
- except KeyError:
|
|
|
|
|
|
|
+ cable.equipement_a = next((e for e in self.equipements if e.EQ_NOM == cable.CA_EQ_A))
|
|
|
|
|
+ except StopIteration:
|
|
|
cable.equipement_a = None
|
|
cable.equipement_a = None
|
|
|
- self.log_error(RelationError("L'équipement '{}' n'existe pas".format(cable.CA_EQ_A), layername=Cable.layername, field="CA_EQ_A"))
|
|
|
|
|
|
|
+ self.log_error(f"L'équipement '{cable.CA_EQ_A}' n'existe pas", item=cable)
|
|
|
|
|
|
|
|
try:
|
|
try:
|
|
|
- cable.equipement_b = self.equipements[cable.CA_EQ_B]
|
|
|
|
|
- except KeyError:
|
|
|
|
|
|
|
+ cable.equipement_b = next((e for e in self.equipements if e.EQ_NOM == cable.CA_EQ_B))
|
|
|
|
|
+ except StopIteration:
|
|
|
cable.equipement_b = None
|
|
cable.equipement_b = None
|
|
|
- self.log_error(RelationError("L'équipement '{}' n'existe pas".format(cable.CA_EQ_B), layername=Cable.layername, field="CA_EQ_B"))
|
|
|
|
|
-
|
|
|
|
|
- # rattachement des equipements aux noeuds
|
|
|
|
|
- for equipement in self.equipements.values():
|
|
|
|
|
- try:
|
|
|
|
|
- equipement.noeud = self.noeuds[equipement.EQ_NOM_NOE]
|
|
|
|
|
- except KeyError:
|
|
|
|
|
- equipement.noeud = None
|
|
|
|
|
- self.log_error(RelationError("Le noeud '{}' n'existe pas".format(equipement.EQ_NOM_NOE, equipement.EQ_NOM), layername=Equipement.layername, field="EQ_NOM_NOE"))
|
|
|
|
|
|
|
+ self.log_error(f"L'équipement '{cable.CA_EQ_B}' n'existe pas", item=cable)
|
|
|
|
|
|
|
|
- # verifie que tous les equipements sont l'equipement B d'au moins un cable
|
|
|
|
|
|
|
+ def test_constraints_cables_equipements_b(self):
|
|
|
|
|
+ """ Tous les équipements sont l'équipement B d'au moins un cable """
|
|
|
|
|
+
|
|
|
equipements_b = [cable.CA_EQ_B for cable in self.cables]
|
|
equipements_b = [cable.CA_EQ_B for cable in self.cables]
|
|
|
- for eq_id in self.equipements:
|
|
|
|
|
- if self.equipements[eq_id].EQ_TYPE == "BAI":
|
|
|
|
|
|
|
+ for equipement in self.equipements:
|
|
|
|
|
+ if equipement.EQ_TYPE == "BAI":
|
|
|
continue
|
|
continue
|
|
|
- if not eq_id in equipements_b:
|
|
|
|
|
- self.log_error(RelationError("L'equipement '{}' n'est l'équipement B d'aucun cable".format(eq_id), layername=Equipement.layername, field="EQ_NOM"))
|
|
|
|
|
|
|
+ if not equipement.EQ_NOM in equipements_b:
|
|
|
|
|
+ self.log_error(f"L'equipement '{equipement.EQ_NOM}' n'est l'équipement B d'aucun cable", item=equipement)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def test_constraints_equipements_noeuds(self):
|
|
|
|
|
+ """ Les noeuds attachés aux équipements existent """
|
|
|
|
|
+
|
|
|
|
|
+ for equipement in self.equipements:
|
|
|
|
|
+ try:
|
|
|
|
|
+ equipement.noeud = next((n for n in self.noeuds if n.NO_NOM == equipement.EQ_NOM_NOE))
|
|
|
|
|
+ except StopIteration:
|
|
|
|
|
+ equipement.noeud = None
|
|
|
|
|
+ self.log_error(f"Le noeud '{equipement.EQ_NOM_NOE}' n'existe pas", item=equipement)
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_graphic_duplicates(self):
|
|
def test_graphic_duplicates(self):
|
|
|
""" Recherche de doublons graphiques """
|
|
""" Recherche de doublons graphiques """
|
|
|
|
|
|
|
|
- # controle des doublons graphiques
|
|
|
|
|
for i, tranchee in enumerate(self.tranchees):
|
|
for i, tranchee in enumerate(self.tranchees):
|
|
|
for other in self.tranchees[i+1:]:
|
|
for other in self.tranchees[i+1:]:
|
|
|
if tranchee.geom == other.geom:
|
|
if tranchee.geom == other.geom:
|
|
|
- self.log_error(DuplicatedGeom("Une entité graphique est dupliquée".format(tranchee), layername=Tranchee.layername, field="geom"))
|
|
|
|
|
|
|
+ self.log_error("Une entité graphique est dupliquée", item=tranchee)
|
|
|
|
|
|
|
|
for i, artere in enumerate(self.arteres):
|
|
for i, artere in enumerate(self.arteres):
|
|
|
for other in self.arteres[i+1:]:
|
|
for other in self.arteres[i+1:]:
|
|
|
if artere.geom == other.geom:
|
|
if artere.geom == other.geom:
|
|
|
- self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(artere), layername=Artere.layername, field="geom"))
|
|
|
|
|
|
|
+ self.log_error("Une entité graphique est dupliquée", item=artere)
|
|
|
|
|
|
|
|
for i, cable in enumerate(self.cables):
|
|
for i, cable in enumerate(self.cables):
|
|
|
for other in self.cables[i+1:]:
|
|
for other in self.cables[i+1:]:
|
|
|
if cable.geom == other.geom and cable.CA_EQ_A == other.CA_EQ_A and cable.CA_EQ_B == other.CA_EQ_B:
|
|
if cable.geom == other.geom and cable.CA_EQ_A == other.CA_EQ_A and cable.CA_EQ_B == other.CA_EQ_B:
|
|
|
- self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(cable), layername=Cable.layername, field="geom"))
|
|
|
|
|
|
|
+ self.log_error("Une entité graphique est dupliquée", item=cable)
|
|
|
|
|
|
|
|
- ls_noeuds = list(self.noeuds.values())
|
|
|
|
|
- for i, noeud in enumerate(ls_noeuds):
|
|
|
|
|
- for other in ls_noeuds[i+1:]:
|
|
|
|
|
|
|
+ for i, noeud in enumerate(self.noeuds):
|
|
|
|
|
+ for other in self.noeuds[i+1:]:
|
|
|
if noeud.geom == other.geom:
|
|
if noeud.geom == other.geom:
|
|
|
- self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(noeud), layername=Noeud.layername, field="geom"))
|
|
|
|
|
- del ls_noeuds
|
|
|
|
|
|
|
+ self.log_error("Une entité graphique est dupliquée", item=noeud)
|
|
|
|
|
|
|
|
- ls_zapbos = list(self.zapbos.values())
|
|
|
|
|
- for i, zapbo in enumerate(ls_zapbos):
|
|
|
|
|
- for other in ls_zapbos[i+1:]:
|
|
|
|
|
|
|
+ for i, zapbo in enumerate(self.zapbos):
|
|
|
|
|
+ for other in self.zapbos[i+1:]:
|
|
|
if zapbo.geom == other.geom:
|
|
if zapbo.geom == other.geom:
|
|
|
- self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(zapbo), layername=Zapbo.layername, field="geom"))
|
|
|
|
|
- del ls_zapbos
|
|
|
|
|
|
|
+ self.log_error("Une entité graphique est dupliquée", item=zapbo)
|
|
|
|
|
|
|
|
def test_positions_noeuds(self):
|
|
def test_positions_noeuds(self):
|
|
|
""" Contrôle de la position des noeuds """
|
|
""" Contrôle de la position des noeuds """
|
|
|
|
|
|
|
|
- # Arteres: comparer la géométrie à celle des noeuds
|
|
|
|
|
for artere in self.arteres:
|
|
for artere in self.arteres:
|
|
|
if not artere.noeud_a or not artere.noeud_b:
|
|
if not artere.noeud_a or not artere.noeud_b:
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
- if not any(((artere.points[0].distanceSquared(artere.noeud_a.points[0]) <= TOLERANCE and \
|
|
|
|
|
- artere.points[-1].distanceSquared(artere.noeud_b.points[0]) <= TOLERANCE),
|
|
|
|
|
- (artere.points[0].distanceSquared(artere.noeud_b.points[0]) <= TOLERANCE and \
|
|
|
|
|
- artere.points[-1].distanceSquared(artere.noeud_a.points[0]) <= TOLERANCE))):
|
|
|
|
|
- self.log_error(MissingItem("Pas de noeud aux coordonnées attendues ('{}')".format(artere), layername=Artere.layername, field="geom"))
|
|
|
|
|
|
|
+ artere_points = artere.get_points()
|
|
|
|
|
+ noeud_a_point = artere.noeud_a.get_points()[0]
|
|
|
|
|
+ noeud_b_point = artere.noeud_b.get_points()[0]
|
|
|
|
|
+
|
|
|
|
|
+ if not any(((artere_points[0].distanceSquared(noeud_a_point) <= TOLERANCE and \
|
|
|
|
|
+ artere_points[-1].distanceSquared(noeud_b_point) <= TOLERANCE),
|
|
|
|
|
+ (artere_points[0].distanceSquared(noeud_b_point) <= TOLERANCE and \
|
|
|
|
|
+ artere_points[-1].distanceSquared(noeud_a_point) <= TOLERANCE))):
|
|
|
|
|
+ self.log_error("Pas de noeud aux coordonnées attendues", item=artere)
|
|
|
|
|
|
|
|
def test_positions_equipements(self):
|
|
def test_positions_equipements(self):
|
|
|
""" Contrôle de la position des équipements """
|
|
""" Contrôle de la position des équipements """
|
|
|
|
|
|
|
|
- # Cables: comparer la géométrie à celle des equipements (on utilise en fait la geom du noeud correspondant à l'équipement)
|
|
|
|
|
for cable in self.cables:
|
|
for cable in self.cables:
|
|
|
if not cable.equipement_a or not cable.equipement_b or not cable.is_geometry_valid() or not cable.equipement_a.noeud or not cable.equipement_b.noeud:
|
|
if not cable.equipement_a or not cable.equipement_b or not cable.is_geometry_valid() or not cable.equipement_a.noeud or not cable.equipement_b.noeud:
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
- if not any(((cable.points[0].distanceSquared(cable.equipement_a.noeud.points[0]) <= TOLERANCE and \
|
|
|
|
|
- cable.points[-1].distanceSquared(cable.equipement_b.noeud.points[0]) <= TOLERANCE),
|
|
|
|
|
- (cable.points[0].distanceSquared(cable.equipement_b.noeud.points[0]) <= TOLERANCE and \
|
|
|
|
|
- cable.points[-1].distanceSquared(cable.equipement_a.noeud.points[0]) <= TOLERANCE))):
|
|
|
|
|
|
|
+ #! attention: on utilise la géométrie du noeud associé à l'équipement, pas celle de l'équipement lui-même
|
|
|
|
|
+ cable_points = cable.get_points()
|
|
|
|
|
+ equip_a_point = cable.equipement_a.noeud.get_points()[0]
|
|
|
|
|
+ equip_b_point = cable.equipement_b.noeud.get_points()[0]
|
|
|
|
|
+
|
|
|
|
|
+ if not any(((cable_points[0].distanceSquared(equip_a_point) <= TOLERANCE and \
|
|
|
|
|
+ cable_points[-1].distanceSquared(equip_b_point) <= TOLERANCE),
|
|
|
|
|
+ (cable_points[0].distanceSquared(equip_b_point) <= TOLERANCE and \
|
|
|
|
|
+ cable_points[-1].distanceSquared(equip_a_point) <= TOLERANCE))):
|
|
|
|
|
|
|
|
- self.log_error(MissingItem("Pas d'equipement aux coordonnées attendues ('{}')".format(cable), layername=Cable.layername, field="geom"))
|
|
|
|
|
|
|
+ self.log_error("Pas d'equipement aux coordonnées attendues", item=cable)
|
|
|
|
|
|
|
|
def test_tranchee_artere(self):
|
|
def test_tranchee_artere(self):
|
|
|
""" Chaque tranchée a au moins une artère """
|
|
""" Chaque tranchée a au moins une artère """
|
|
@@ -429,10 +446,11 @@ class Mn1Checker(BaseChecker):
|
|
|
|
|
|
|
|
for tranchee in self.tranchees:
|
|
for tranchee in self.tranchees:
|
|
|
if not arteres_full_buffer.contains(tranchee.geom):
|
|
if not arteres_full_buffer.contains(tranchee.geom):
|
|
|
- self.log_error(MissingItem("Tranchée ou portion de tranchée sans artère ('{}')".format(tranchee), layername=Tranchee.layername, field="*"))
|
|
|
|
|
|
|
+ self.log_error("Tranchée ou portion de tranchée sans artère", item=tranchee)
|
|
|
|
|
|
|
|
def test_cable_artere(self):
|
|
def test_cable_artere(self):
|
|
|
""" Chaque cable a au moins une artère (sauf baguettes) """
|
|
""" Chaque cable a au moins une artère (sauf baguettes) """
|
|
|
|
|
+
|
|
|
arteres_full_buffer = QgsGeometry().unaryUnion([a.geom for a in self.arteres]).buffer(TOLERANCE, 100)
|
|
arteres_full_buffer = QgsGeometry().unaryUnion([a.geom for a in self.arteres]).buffer(TOLERANCE, 100)
|
|
|
|
|
|
|
|
# Verifie que chaque cable a au moins une artère (sauf si commentaire contient 'baguette')
|
|
# Verifie que chaque cable a au moins une artère (sauf si commentaire contient 'baguette')
|
|
@@ -440,38 +458,36 @@ class Mn1Checker(BaseChecker):
|
|
|
if "baguette" in cable.CA_COMMENT.lower() or not cable.is_geometry_valid():
|
|
if "baguette" in cable.CA_COMMENT.lower() or not cable.is_geometry_valid():
|
|
|
continue
|
|
continue
|
|
|
if not arteres_full_buffer.contains(cable.geom):
|
|
if not arteres_full_buffer.contains(cable.geom):
|
|
|
- self.log_error(MissingItem("Cable ou portion de cable sans artère ('{}')".format(cable), layername=Cable.layername, field="*"))
|
|
|
|
|
|
|
+ self.log_error("Cable ou portion de cable sans artère", item=cable)
|
|
|
|
|
|
|
|
def test_artere_cable(self):
|
|
def test_artere_cable(self):
|
|
|
""" Chaque artère a au moins une cable (sauf cas spécifiques) """
|
|
""" Chaque artère a au moins une cable (sauf cas spécifiques) """
|
|
|
|
|
|
|
|
# Verifie que chaque artère a au moins un cable (sauf si commentaire contient un de ces mots 'racco client adductio attente bus 'sans cable'')
|
|
# Verifie que chaque artère a au moins un cable (sauf si commentaire contient un de ces mots 'racco client adductio attente bus 'sans cable'')
|
|
|
-
|
|
|
|
|
cables_full_buffer = QgsGeometry().unaryUnion([c.geom for c in self.cables]).buffer(TOLERANCE, 100)
|
|
cables_full_buffer = QgsGeometry().unaryUnion([c.geom for c in self.cables]).buffer(TOLERANCE, 100)
|
|
|
|
|
|
|
|
for artere in self.arteres:
|
|
for artere in self.arteres:
|
|
|
if any(x in artere.AR_COMMENT.lower() for x in ['racco','client','adductio','attente','bus','sans cable']):
|
|
if any(x in artere.AR_COMMENT.lower() for x in ['racco','client','adductio','attente','bus','sans cable']):
|
|
|
continue
|
|
continue
|
|
|
if not cables_full_buffer.contains(artere.geom):
|
|
if not cables_full_buffer.contains(artere.geom):
|
|
|
- self.log_error(MissingItem("Artère sans cable ('{}')".format(artere), layername=Artere.layername, field="*"))
|
|
|
|
|
-
|
|
|
|
|
|
|
+ self.log_error("Artère ou portion d'artère sans cable", item=artere)
|
|
|
|
|
|
|
|
- def test_dimensions(self):
|
|
|
|
|
- """ Contrôle des dimensions logiques """
|
|
|
|
|
|
|
+ def test_dimensions_fourreaux(self):
|
|
|
|
|
+ """ Nombres de fourreaux cohérents """
|
|
|
|
|
|
|
|
for artere in self.arteres:
|
|
for artere in self.arteres:
|
|
|
try:
|
|
try:
|
|
|
if not int(artere.AR_FOU_DIS) <= int(artere.AR_NB_FOUR):
|
|
if not int(artere.AR_FOU_DIS) <= int(artere.AR_NB_FOUR):
|
|
|
- self.log_error(DimensionError("Le nombre de fourreaux disponibles doit être inférieur au nombre total ('{}')".format(artere), layername=Artere.layername, field="AR_FOU_DIS"))
|
|
|
|
|
|
|
+ self.log_error("Le nombre de fourreaux disponibles (AR_FOU_DIS) doit être inférieur au nombre total (AR_NB_FOUR)", item=artere)
|
|
|
except (TypeError, ValueError):
|
|
except (TypeError, ValueError):
|
|
|
pass
|
|
pass
|
|
|
|
|
|
|
|
for cable in self.cables:
|
|
for cable in self.cables:
|
|
|
try:
|
|
try:
|
|
|
if not int(cable.CA_NB_FO_U) <= int(cable.CA_NB_FO):
|
|
if not int(cable.CA_NB_FO_U) <= int(cable.CA_NB_FO):
|
|
|
- self.log_error(DimensionError("Le nombre de fourreaux utilisés doit être inférieur au nombre total ('{}')".format(cable), layername=Cable.layername, field="CA_NB_FO_U"))
|
|
|
|
|
|
|
+ self.log_error("Le nombre de fourreaux utilisés (CA_NB_FO_U) doit être inférieur au nombre total (CA_NB_FO)", item=cable)
|
|
|
if not int(cable.CA_NB_FO_D) <= int(cable.CA_NB_FO):
|
|
if not int(cable.CA_NB_FO_D) <= int(cable.CA_NB_FO):
|
|
|
- self.log_error(DimensionError("Le nombre de fourreaux disponibles doit être inférieur au nombre total ('{}')".format(cable), layername=Cable.layername, field="CA_NB_FO_D"))
|
|
|
|
|
|
|
+ self.log_error("Le nombre de fourreaux disponibles (CA_NB_FO_D) doit être inférieur au nombre total (CA_NB_FO)", item=cable)
|
|
|
except (TypeError, ValueError):
|
|
except (TypeError, ValueError):
|
|
|
pass
|
|
pass
|
|
|
|
|
|
|
@@ -479,48 +495,49 @@ class Mn1Checker(BaseChecker):
|
|
|
""" PBO / ZAPBO """
|
|
""" PBO / ZAPBO """
|
|
|
|
|
|
|
|
# Verifier que chaque equipement de type PBO est contenu dans une zapbo, et que le nom de la zapbo contient le nom de l'equipement
|
|
# Verifier que chaque equipement de type PBO est contenu dans une zapbo, et que le nom de la zapbo contient le nom de l'equipement
|
|
|
- for equipement in self.equipements.values():
|
|
|
|
|
|
|
+ for equipement in self.equipements:
|
|
|
if not equipement.EQ_TYPE == "PBO":
|
|
if not equipement.EQ_TYPE == "PBO":
|
|
|
continue
|
|
continue
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
#zapbos englobant l'equipement
|
|
#zapbos englobant l'equipement
|
|
|
candidates = []
|
|
candidates = []
|
|
|
- for zapbo in self.zapbos.values():
|
|
|
|
|
|
|
+ for zapbo in self.zapbos:
|
|
|
if zapbo.geom.contains(equipement.geom):
|
|
if zapbo.geom.contains(equipement.geom):
|
|
|
candidates.append(zapbo)
|
|
candidates.append(zapbo)
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# le pbo doit être contenu dans une zapbo
|
|
# le pbo doit être contenu dans une zapbo
|
|
|
if not candidates:
|
|
if not candidates:
|
|
|
- self.log_error(MissingItem("Le PBO n'est contenu dans aucune ZAPBO: {}".format(equipement), layername=Equipement.layername, field="geom"))
|
|
|
|
|
|
|
+ self.log_error("Le PBO n'est contenu dans aucune ZAPBO", item=equipement)
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
# On se base sur le nom pour trouver la zapbo correspondante
|
|
# On se base sur le nom pour trouver la zapbo correspondante
|
|
|
try:
|
|
try:
|
|
|
equipement.zapbo = next((z for z in candidates if equipement.EQ_NOM in z.ID_ZAPBO))
|
|
equipement.zapbo = next((z for z in candidates if equipement.EQ_NOM in z.ID_ZAPBO))
|
|
|
except StopIteration:
|
|
except StopIteration:
|
|
|
- self.log_error(MissingItem("Le nom du PBO ne coincide avec le nom d'aucune des ZAPBO qui le contient: {}".format(equipement), layername=Equipement.layername, field="EQ_NOM"))
|
|
|
|
|
|
|
+ self.log_error("Le nom du PBO ne coincide avec le nom d'aucune des ZAPBO qui le contient", item=equipement)
|
|
|
break
|
|
break
|
|
|
|
|
|
|
|
- # TODO: revoir et déplacer le calcul du nombre de prises
|
|
|
|
|
- if not hasattr(equipement.zapbo, "nb_prises") or equipement.zapbo.nb_prises is None:
|
|
|
|
|
- equipement.zapbo.nb_prises = 0
|
|
|
|
|
-
|
|
|
|
|
- def test_pbo_dimension(self):
|
|
|
|
|
|
|
+
|
|
|
|
|
+ # a venir
|
|
|
|
|
+ def __test_pbo_dimension(self):
|
|
|
""" Dimensionnement des PBO """
|
|
""" Dimensionnement des PBO """
|
|
|
- for equipement in self.equipements.values():
|
|
|
|
|
|
|
+ for equipement in self.equipements:
|
|
|
if not equipement.EQ_TYPE == "PBO":
|
|
if not equipement.EQ_TYPE == "PBO":
|
|
|
continue
|
|
continue
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
|
|
+ if not hasattr(equipement.zapbo, "nb_prises") or equipement.zapbo.nb_prises is None:
|
|
|
|
|
+ equipement.zapbo.nb_prises = 0
|
|
|
|
|
+
|
|
|
# Controle du dimensionnement des PBO
|
|
# Controle du dimensionnement des PBO
|
|
|
if equipement.EQ_TYPE_PH == 'PBO 6' and not equipement.zapbo.nb_prises < 6:
|
|
if equipement.EQ_TYPE_PH == 'PBO 6' and not equipement.zapbo.nb_prises < 6:
|
|
|
- self.log_error(DimensionError("Le PBO 6 contient plus de 5 prises: {}".format(equipement), layername=Equipement.layername, field="*"))
|
|
|
|
|
|
|
+ self.log_error("Le PBO 6 contient plus de 5 prises", item=equipement)
|
|
|
|
|
|
|
|
if equipement.EQ_TYPE_PH == 'PBO 12' and not equipement.zapbo.nb_prises >= 6 and equipement.zapbo.nb_prises <= 8:
|
|
if equipement.EQ_TYPE_PH == 'PBO 12' and not equipement.zapbo.nb_prises >= 6 and equipement.zapbo.nb_prises <= 8:
|
|
|
- self.log_error(DimensionError("Le PBO 12 contient mois de 6 prises ou plus de 8 prises: {}".format(equipement), layername=Equipement.layername, field="*"))
|
|
|
|
|
|
|
+ self.log_error("Le PBO 12 contient mois de 6 prises ou plus de 8 prises", item=equipement)
|
|
|
|
|
|
|
|
if equipement.zapbo.STATUT == "REC" and not equipement.EQ_STATUT == "REC":
|
|
if equipement.zapbo.STATUT == "REC" and not equipement.EQ_STATUT == "REC":
|
|
|
- self.log_error(TechnicalValidationError("Le statut du PBO n'est pas cohérent avec le statut de sa ZAPBO: {}".format(equipement), layername=Equipement.layername, field="*"))
|
|
|
|
|
|
|
+ self.log_error("Le statut du PBO n'est pas cohérent avec le statut de sa ZAPBO", item=equipement)
|
|
|
|
|
|
|
|
if equipement.EQ_STATUT == "REC" and not equipement.zapbo.STATUT == "REC" and not equipement.zapbo.ID_ZAPBO[:4].lower() == "att_":
|
|
if equipement.EQ_STATUT == "REC" and not equipement.zapbo.STATUT == "REC" and not equipement.zapbo.ID_ZAPBO[:4].lower() == "att_":
|
|
|
- self.log_error(TechnicalValidationError("Le statut du PBO n'est pas cohérent avec le statut de sa ZAPBO: {}".format(equipement), layername=Equipement.layername, field="*"))
|
|
|
|
|
|
|
+ self.log_error("Le statut du PBO n'est pas cohérent avec le statut de sa ZAPBO", item=equipement)
|
|
|
|
|
|