|
|
@@ -6,14 +6,13 @@
|
|
|
'''
|
|
|
|
|
|
import logging
|
|
|
-from qgis.core import QgsProject, QgsGeometry
|
|
|
+from qgis.core import QgsProject
|
|
|
|
|
|
from core.cerberus_ import is_float, is_multi_int, is_int, \
|
|
|
is_modern_french_date, CerberusValidator, CerberusErrorHandler, \
|
|
|
_translate_messages
|
|
|
from core.checking import BaseChecker
|
|
|
from core.mncheck import QgsModel
|
|
|
-from schemas import mn1_rec
|
|
|
|
|
|
|
|
|
logger = logging.getLogger("mncheck")
|
|
|
@@ -25,7 +24,7 @@ STATUTS = ['EN ETUDE', 'EN REALISATION', 'EN SERVICE', 'HORS SERVICE']
|
|
|
XMIN, XMAX, YMIN, YMAX = 1341999, 1429750, 8147750, 8294000
|
|
|
CRS = 'EPSG:3949' # Coordinate Reference System
|
|
|
|
|
|
-TOLERANCE = 1
|
|
|
+TOLERANCE = 1.0
|
|
|
|
|
|
##### Modeles
|
|
|
|
|
|
@@ -36,8 +35,8 @@ class Artere(QgsModel):
|
|
|
crs = CRS
|
|
|
bounding_box = (XMIN,YMIN,XMAX,YMAX)
|
|
|
pk = "AR_CODE"
|
|
|
- schema = {'AR_CODE': {'type': 'string', 'maxlength': 26},
|
|
|
- 'AR_NOM': {'type': 'string', 'maxlength': 26},
|
|
|
+ schema = {'AR_CODE': {'type': 'string', 'maxlength': 30},
|
|
|
+ 'AR_NOM': {'type': 'string', 'maxlength': 30},
|
|
|
'AR_ID_INSE': {'type': 'string', 'empty': False, 'regex': r'50\d{3}'},
|
|
|
'AR_LONG': {'empty': False, 'validator': is_float},
|
|
|
'AR_ETAT': {'type': 'string', 'empty': False, 'allowed': ['0', '1', '2', '3', '4']},
|
|
|
@@ -50,11 +49,11 @@ class Artere(QgsModel):
|
|
|
'AR_TYFO_AI': {'type': 'string', 'multiallowed': ['PVC', 'PEH', 'TUB', 'FAC', 'ENC', 'APP']},
|
|
|
'AR_DIAM_FO': {'type': 'string', 'multiallowed': ['10', '14', '18', '25', '28', '32', '40', '45', '60', '80', '150', 'NUL']},
|
|
|
'AR_PRO_FOU': {'type': 'string', 'multiallowed': ['MANCHE NUMERIQUE', 'COLLECTIVITE', 'ORANGE', 'PRIVE', 'ERDF', 'AUTRE (à préciser)']},
|
|
|
- 'AR_FABRICANT': {'type': 'string', 'empty': False, 'maxlength': 100},
|
|
|
- 'AR_REF_FABRICANT': {'type': 'string', 'empty': False, 'maxlength': 100},
|
|
|
+ 'AR_FAB': {'type': 'string', 'empty': False, 'maxlength': 100},
|
|
|
+ 'AR_REFFAB': {'type': 'string', 'empty': False, 'maxlength': 100},
|
|
|
'AR_COULEUR': {'type': 'string', 'empty': False, 'maxlength': 20},
|
|
|
- 'AR_AIGUILLEE': {'type': 'string', 'empty': False, 'maxlength': 3, 'allowed': ['OUI', 'NON']},
|
|
|
- 'AR_NB_CABLES': {'empty': False, 'validator': is_int},
|
|
|
+ 'AR_AIGUIL': {'type': 'string', 'empty': False, 'maxlength': 3, 'allowed': ['OUI', 'NON']},
|
|
|
+ 'AR_NBCABL': {'empty': False, 'validator': is_int},
|
|
|
'AR_PRO_CAB': {'type': 'string', 'empty': False, 'allowed': ['MANCHE NUMERIQUE']},
|
|
|
'AR_GEST_FO': {'type': 'string', 'multiallowed': ['MANCHE NUMERIQUE', 'MANCHE TELECOM', 'COLLECTIVITE', 'ORANGE', 'MANCHE FIBRE', 'PRIVE', 'ERDF', 'AUTRE (à préciser)', 'NUL']},
|
|
|
'AR_UTIL_FO': {'type': 'string', 'multiallowed': ['MANCHE NUMERIQUE', 'MANCHE TELECOM', 'COLLECTIVITE', 'ORANGE', 'MANCHE FIBRE', 'PRIVE', 'AUTRE (à préciser)', 'NUL']},
|
|
|
@@ -73,8 +72,8 @@ class Cable(QgsModel):
|
|
|
crs = CRS
|
|
|
bounding_box = (XMIN,YMIN,XMAX,YMAX)
|
|
|
pk = "CA_CODE"
|
|
|
- schema = {'CA_CODE': {'type': 'string', 'maxlength': 18},
|
|
|
- 'CA_NOM': {'type': 'string', 'maxlength': 18},
|
|
|
+ schema = {'CA_CODE': {'type': 'string', 'maxlength': 30},
|
|
|
+ 'CA_NOM': {'type': 'string', 'maxlength': 30},
|
|
|
'CA_NUMERO': {'type': 'string', 'maxlength': 17},
|
|
|
'CA_EMPRISE': {'type': 'string', 'maxlength': 10},
|
|
|
'CA_FAB': {'type': 'string', 'maxlength': 100},
|
|
|
@@ -89,7 +88,6 @@ class Cable(QgsModel):
|
|
|
'CA_DIAMETR': {'empty': False, 'validator': is_float},
|
|
|
'CA_COULEUR': {'type': 'string', 'maxlength': 20, 'empty': False, 'allowed': ['NOIR', 'BLEU', 'BLANC']},
|
|
|
'CA_TECHNOL': {'type': 'string', 'maxlength': 17, 'empty': False, 'allowed': ['G657A2_M6', 'G657A2_M12']},
|
|
|
- 'CA_NB_FO': {'validator': is_int},
|
|
|
'CA_NB_FO_U': {'empty': False, 'validator': is_int},
|
|
|
'CA_NB_FO_D': {'empty': False, 'validator': is_int},
|
|
|
'CA_PRO': {'type': 'string', 'maxlength': 20, 'empty': False, 'allowed': ['MANCHE NUMERIQUE']},
|
|
|
@@ -104,8 +102,8 @@ class Equipement(QgsModel):
|
|
|
crs = CRS
|
|
|
bounding_box = (XMIN,YMIN,XMAX,YMAX)
|
|
|
pk = "EQ_CODE"
|
|
|
- schema = {'EQ_CODE': {'type': 'string', 'maxlength': 18},
|
|
|
- 'EQ_NOM': {'type': 'string', 'maxlength': 10, 'contains_any_of': ['PBO', 'BPE', 'BAI']},
|
|
|
+ schema = {'EQ_CODE': {'type': 'string', 'maxlength': 30},
|
|
|
+ 'EQ_NOM': {'type': 'string', 'maxlength': 30, 'contains_any_of': ['PBO', 'BPE', 'BAI']},
|
|
|
'EQ_NOM_NOE': {'type': 'string', 'maxlength': 30},
|
|
|
'EQ_REF': {'type': 'string', 'maxlength': 100},
|
|
|
'EQ_EMPRISE': {'type': 'string', 'maxlength': 7},
|
|
|
@@ -135,7 +133,7 @@ class Noeud(QgsModel):
|
|
|
crs = CRS
|
|
|
bounding_box = (XMIN,YMIN,XMAX,YMAX)
|
|
|
pk = "NO_CODE"
|
|
|
- schema = {'NO_CODE': {'type': 'string', 'maxlength': 18},
|
|
|
+ schema = {'NO_CODE': {'type': 'string', 'maxlength': 30},
|
|
|
'NO_NOM': {'type': 'string', 'maxlength': 30},
|
|
|
'NO_ID_INSE': {'type': 'string', 'empty': False, 'regex': r'50\d{3}'},
|
|
|
'NO_VOIE': {'type': 'string', 'maxlength': 100},
|
|
|
@@ -170,8 +168,8 @@ class Tranchee(QgsModel):
|
|
|
crs = CRS
|
|
|
bounding_box = (XMIN,YMIN,XMAX,YMAX)
|
|
|
pk = "TR_CODE"
|
|
|
- schema = {'TR_CODE': {'type': 'string', 'maxlength': 23},
|
|
|
- 'TR_NOM': {'type': 'string', 'maxlength': 23},
|
|
|
+ schema = {'TR_CODE': {'type': 'string', 'maxlength': 30},
|
|
|
+ 'TR_NOM': {'type': 'string', 'maxlength': 30},
|
|
|
'TR_ID_INSE': {'type': 'string', 'empty': False, 'regex': r'50\d{3}'},
|
|
|
'TR_VOIE': {'type': 'string', 'maxlength': 200},
|
|
|
'TR_TYP_IMP': {'type': 'string', 'empty': False, 'allowed': ['ACCOTEMENT STABILISE', 'ACCOTEMENT NON STABILISE', 'CHAUSSEE LOURDE', 'CHAUSSEE LEGERE', 'FOSSE', 'TROTTOIR', 'ESPACE VERT', 'ENCORBELLEMENT']},
|
|
|
@@ -210,39 +208,40 @@ models = [Artere, Cable, Equipement, Noeud, Tranchee, Zapbo]
|
|
|
|
|
|
|
|
|
class Mn2Checker(BaseChecker):
|
|
|
-
|
|
|
+
|
|
|
def test_load_layers(self):
|
|
|
""" Chargement des données
|
|
|
Contrôle la présence des couches attendues
|
|
|
"""
|
|
|
-
|
|
|
+
|
|
|
self.dataset = {}
|
|
|
-
|
|
|
+
|
|
|
for model in models:
|
|
|
layername = model.layername
|
|
|
-
|
|
|
+
|
|
|
try:
|
|
|
layer = next((l for l in QgsProject.instance().mapLayers().values() if l.name().lower() == layername.lower()))
|
|
|
except StopIteration:
|
|
|
self.log_critical("Couche manquante", model=model)
|
|
|
continue
|
|
|
-
|
|
|
+
|
|
|
if model.pk:
|
|
|
if not model.pk.lower() in [f.name().lower() for f in layer.fields()]:
|
|
|
self.log_critical(f"Clef primaire manquante ({model.pk})", model=model)
|
|
|
continue
|
|
|
-
|
|
|
+
|
|
|
model.layer = layer
|
|
|
-
|
|
|
+
|
|
|
self.dataset[model] = [model(f) for f in layer.getFeatures()]
|
|
|
-
|
|
|
+
|
|
|
self.arteres = self.dataset.get(Artere, [])
|
|
|
self.cables = self.dataset.get(Cable, [])
|
|
|
self.equipements = self.dataset.get(Equipement, [])
|
|
|
self.noeuds = self.dataset.get(Noeud, [])
|
|
|
self.tranchees = self.dataset.get(Tranchee, [])
|
|
|
self.zapbos = self.dataset.get(Zapbo, [])
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
def test_scr(self):
|
|
|
""" Contrôle des projections
|
|
|
Vérifie que les couches ont le bon sytème de projection
|
|
|
@@ -250,59 +249,59 @@ class Mn2Checker(BaseChecker):
|
|
|
for model in models:
|
|
|
if model.layer.crs().authid() != model.crs:
|
|
|
self.log_error(f"Mauvaise projection (attendu: {model.crs})", model=model)
|
|
|
-
|
|
|
+
|
|
|
def _validate_structure(self, model, items):
|
|
|
v = CerberusValidator(model.schema, purge_unknown=True, error_handler=CerberusErrorHandler, require_all=True)
|
|
|
-
|
|
|
+
|
|
|
for item in items:
|
|
|
v.validate(item.__dict__)
|
|
|
-
|
|
|
+
|
|
|
for field, verrors in v.errors.items():
|
|
|
for err in verrors:
|
|
|
self.log_error(f"{field} : {_translate_messages(err)}", item=item)
|
|
|
-
|
|
|
+
|
|
|
def test_structure_arteres(self):
|
|
|
""" Structure des données: Artères
|
|
|
Contrôle les données attributaires de la couche ARTERE_GEO:
|
|
|
présence, format, valeurs autorisées...
|
|
|
"""
|
|
|
self._validate_structure(Artere, self.arteres)
|
|
|
-
|
|
|
+
|
|
|
def test_structure_cables(self):
|
|
|
""" Structure des données: Cables
|
|
|
Contrôle les données attributaires de la couche CABLE_GEO:
|
|
|
présence, format, valeurs autorisées...
|
|
|
"""
|
|
|
self._validate_structure(Cable, self.cables)
|
|
|
-
|
|
|
+
|
|
|
def test_structure_equipements(self):
|
|
|
""" Structure des données: Equipements
|
|
|
Contrôle les données attributaires de la couche EQUIPEMENT_GEO:
|
|
|
présence, format, valeurs autorisées...
|
|
|
"""
|
|
|
self._validate_structure(Equipement, self.equipements)
|
|
|
-
|
|
|
+
|
|
|
def test_structure_noeuds(self):
|
|
|
""" Structure des données: Noeuds
|
|
|
Contrôle les données attributaires de la couche NOEUD_GEO:
|
|
|
présence, format, valeurs autorisées...
|
|
|
"""
|
|
|
self._validate_structure(Noeud, self.noeuds)
|
|
|
-
|
|
|
+
|
|
|
def test_structure_tranchees(self):
|
|
|
""" Structure des données: Tranchées
|
|
|
Contrôle les données attributaires de la couche TRANCHEE_GEO:
|
|
|
présence, format, valeurs autorisées...
|
|
|
"""
|
|
|
self._validate_structure(Tranchee, self.tranchees)
|
|
|
-
|
|
|
+
|
|
|
def test_structure_zapbos(self):
|
|
|
""" Structure des données: Zapbos
|
|
|
Contrôle les données attributaires de la couche ZAPBO_GEO:
|
|
|
présence, format, valeurs autorisées...
|
|
|
"""
|
|
|
self._validate_structure(Zapbo, self.zapbos)
|
|
|
-
|
|
|
+
|
|
|
def test_geometry_validity(self):
|
|
|
""" Contrôle de la validité des géométries
|
|
|
"""
|
|
|
@@ -310,37 +309,39 @@ class Mn2Checker(BaseChecker):
|
|
|
for item in self.dataset[model]:
|
|
|
if not item.is_geometry_valid():
|
|
|
self.log_error("La géométrie de l'objet est invalide", item=item)
|
|
|
-
|
|
|
+
|
|
|
def test_geometry_type(self):
|
|
|
""" Contrôle des types de géométries
|
|
|
"""
|
|
|
for model in models:
|
|
|
for item in self.dataset[model]:
|
|
|
- geom_type = item.get_geom_type()
|
|
|
- if geom_type != model.geom_type:
|
|
|
- self.log_error(f"Type de géométrie invalide (attendu: {QgsModel.GEOM_NAMES[geom_type]})", item=item)
|
|
|
-
|
|
|
+ item_geom_type = item.get_geom_type()
|
|
|
+ if item_geom_type != model.geom_type:
|
|
|
+ self.log_error(f"Type de géométrie invalide (attendu: {QgsModel.GEOM_NAMES[model.geom_type]}, trouvé: {QgsModel.GEOM_NAMES[item_geom_type]})", item=item)
|
|
|
+
|
|
|
def test_bounding_box(self):
|
|
|
""" Contrôle des emprises
|
|
|
Vérifie que les objets sont dans le périmètre attendu
|
|
|
"""
|
|
|
-
|
|
|
+
|
|
|
for model in models:
|
|
|
xmin, ymin, xmax, ymax = model.bounding_box
|
|
|
-
|
|
|
+
|
|
|
for item in self.dataset[model]:
|
|
|
-
|
|
|
+ if not item.is_geometry_valid():
|
|
|
+ continue
|
|
|
+
|
|
|
x1, y1, x2, y2 = item.get_bounding_box()
|
|
|
-
|
|
|
+
|
|
|
if any(x < xmin or x > xmax for x in (x1, x2)) or \
|
|
|
any(y < ymin or y > ymax for y in (y1, y2)):
|
|
|
self.log_error("Hors de l'emprise autorisée", item=item)
|
|
|
-
|
|
|
+
|
|
|
def test_duplicates(self):
|
|
|
""" Recherche de doublons
|
|
|
Recherche d'éventuels doublons dans des champs qui supposent l'unicité
|
|
|
"""
|
|
|
-
|
|
|
+
|
|
|
tmp = []
|
|
|
for noeud in self.noeuds:
|
|
|
if not noeud.NO_NOM:
|
|
|
@@ -349,7 +350,7 @@ class Mn2Checker(BaseChecker):
|
|
|
tmp.append(noeud.NO_NOM)
|
|
|
else:
|
|
|
self.log_error("Doublons dans le champs NO_NOM", item=noeud)
|
|
|
-
|
|
|
+
|
|
|
tmp = []
|
|
|
for equipement in self.equipements:
|
|
|
if not equipement.EQ_NOM:
|
|
|
@@ -358,7 +359,7 @@ class Mn2Checker(BaseChecker):
|
|
|
tmp.append(equipement.EQ_NOM)
|
|
|
else:
|
|
|
self.log_error("Doublons dans le champs EQ_NOM", item=equipement)
|
|
|
-
|
|
|
+
|
|
|
tmp = []
|
|
|
for zapbo in self.zapbos:
|
|
|
if not zapbo.ID_ZAPBO:
|
|
|
@@ -367,249 +368,232 @@ class Mn2Checker(BaseChecker):
|
|
|
tmp.append(zapbo.ID_ZAPBO)
|
|
|
else:
|
|
|
self.log_error("Doublons dans le champs ID_ZAPBO", item=zapbo)
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
def test_constraints_arteres_noeuds(self):
|
|
|
""" Application des contraintes: Artères / Noeuds
|
|
|
Vérifie que les noeuds attachés aux artères existent
|
|
|
"""
|
|
|
-
|
|
|
+
|
|
|
for artere in self.arteres:
|
|
|
try:
|
|
|
- artere.noeud_a = next((n for n in self.noeuds if n.NO_NOM == artere.AR_NOEUD_A))
|
|
|
+ artere.noeud_a = next((n for n in self.noeuds if n.NO_CODE == artere.AR_NOEUD_A))
|
|
|
except StopIteration:
|
|
|
artere.noeud_a = None
|
|
|
self.log_error(f"Le noeud lié '{artere.AR_NOEUD_A}' n'existe pas", item=artere)
|
|
|
-
|
|
|
+
|
|
|
try:
|
|
|
- artere.noeud_b = next((n for n in self.noeuds if n.NO_NOM == artere.AR_NOEUD_B))
|
|
|
+ artere.noeud_b = next((n for n in self.noeuds if n.NO_CODE == artere.AR_NOEUD_B))
|
|
|
except StopIteration:
|
|
|
artere.noeud_b = None
|
|
|
self.log_error(f"Le noeud lié '{artere.AR_NOEUD_B}' n'existe pas", item=artere)
|
|
|
-
|
|
|
+
|
|
|
def test_constraints_cables_equipements(self):
|
|
|
""" Application des contraintes: Equipements / Cables
|
|
|
Vérifie que les équipements attachés aux cables existent """
|
|
|
-
|
|
|
+
|
|
|
for cable in self.cables:
|
|
|
try:
|
|
|
- cable.equipement_a = next((e for e in self.equipements if e.EQ_NOM == cable.CA_EQ_A))
|
|
|
+ cable.equipement_a = next((e for e in self.equipements if e.EQ_CODE == cable.CA_EQ_A))
|
|
|
except StopIteration:
|
|
|
cable.equipement_a = None
|
|
|
self.log_error(f"L'équipement lié '{cable.CA_EQ_A}' n'existe pas", item=cable)
|
|
|
-
|
|
|
+
|
|
|
try:
|
|
|
- cable.equipement_b = next((e for e in self.equipements if e.EQ_NOM == cable.CA_EQ_B))
|
|
|
+ cable.equipement_b = next((e for e in self.equipements if e.EQ_CODE == cable.CA_EQ_B))
|
|
|
except StopIteration:
|
|
|
cable.equipement_b = None
|
|
|
self.log_error(f"L'équipement lié '{cable.CA_EQ_B}' n'existe pas", item=cable)
|
|
|
-
|
|
|
+
|
|
|
def test_constraints_cables_equipements_b(self):
|
|
|
""" Application des contraintes: Equipements B
|
|
|
Vérifie que tous les équipements sont l'équipement B d'au moins un cable """
|
|
|
-
|
|
|
+
|
|
|
equipements_b = [cable.CA_EQ_B for cable in self.cables]
|
|
|
for equipement in self.equipements:
|
|
|
if equipement.EQ_TYPE == "BAI":
|
|
|
continue
|
|
|
if not equipement.EQ_NOM in equipements_b:
|
|
|
- self.log_error(f"L'equipement lié '{equipement.EQ_NOM}' n'est l'équipement B d'aucun cable", item=equipement)
|
|
|
-
|
|
|
-
|
|
|
+ self.log_error(f"L'equipement '{equipement.EQ_NOM}' n'est l'équipement B d'aucun cable et n'est pas un PM", item=equipement)
|
|
|
+
|
|
|
+
|
|
|
def test_constraints_equipements_noeuds(self):
|
|
|
""" Application des contraintes: Noeuds / Equipements
|
|
|
Vérifie que les noeuds attachés aux équipements existent
|
|
|
"""
|
|
|
for equipement in self.equipements:
|
|
|
try:
|
|
|
- equipement.noeud = next((n for n in self.noeuds if n.NO_NOM == equipement.EQ_NOM_NOE))
|
|
|
+ equipement.noeud = next((n for n in self.noeuds if n.NO_CODE == equipement.EQ_NOM_NOE))
|
|
|
except StopIteration:
|
|
|
equipement.noeud = None
|
|
|
self.log_error(f"Le noeud lié '{equipement.EQ_NOM_NOE}' n'existe pas", item=equipement)
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
def test_graphic_duplicates(self):
|
|
|
""" Recherche de doublons graphiques """
|
|
|
-
|
|
|
+
|
|
|
for i, tranchee in enumerate(self.tranchees):
|
|
|
for other in self.tranchees[i+1:]:
|
|
|
if tranchee.geom == other.geom:
|
|
|
self.log_error("Une entité graphique est dupliquée", item=tranchee)
|
|
|
-
|
|
|
+
|
|
|
for i, artere in enumerate(self.arteres):
|
|
|
for other in self.arteres[i+1:]:
|
|
|
if artere.geom == other.geom:
|
|
|
self.log_error("Une entité graphique est dupliquée", item=artere)
|
|
|
-
|
|
|
+
|
|
|
for i, cable in enumerate(self.cables):
|
|
|
for other in self.cables[i+1:]:
|
|
|
if cable.geom == other.geom and cable.CA_EQ_A == other.CA_EQ_A and cable.CA_EQ_B == other.CA_EQ_B:
|
|
|
self.log_error("Une entité graphique est dupliquée", item=cable)
|
|
|
-
|
|
|
+
|
|
|
for i, noeud in enumerate(self.noeuds):
|
|
|
for other in self.noeuds[i+1:]:
|
|
|
if noeud.geom == other.geom:
|
|
|
self.log_error("Une entité graphique est dupliquée", item=noeud)
|
|
|
-
|
|
|
+
|
|
|
for i, zapbo in enumerate(self.zapbos):
|
|
|
for other in self.zapbos[i+1:]:
|
|
|
if zapbo.geom == other.geom:
|
|
|
self.log_error("Une entité graphique est dupliquée", item=zapbo)
|
|
|
-
|
|
|
+
|
|
|
def test_positions_noeuds(self):
|
|
|
""" Topologie: Noeuds / Artères
|
|
|
Compare la géométrie des noeuds à celle des artères
|
|
|
"""
|
|
|
-
|
|
|
+
|
|
|
for artere in self.arteres:
|
|
|
if not artere.noeud_a or not artere.noeud_b:
|
|
|
continue
|
|
|
-
|
|
|
+
|
|
|
artere_points = artere.get_points()
|
|
|
noeud_a_point = artere.noeud_a.get_points()[0]
|
|
|
noeud_b_point = artere.noeud_b.get_points()[0]
|
|
|
-
|
|
|
- if not any(((artere_points[0].distanceSquared(noeud_a_point) <= TOLERANCE and \
|
|
|
- artere_points[-1].distanceSquared(noeud_b_point) <= TOLERANCE),
|
|
|
- (artere_points[0].distanceSquared(noeud_b_point) <= TOLERANCE and \
|
|
|
- artere_points[-1].distanceSquared(noeud_a_point) <= TOLERANCE))):
|
|
|
- self.log_error("Pas de noeud aux coordonnées attendues", item=artere)
|
|
|
-
|
|
|
+
|
|
|
+ if not (artere_points[0].distanceSquared(noeud_a_point) <= TOLERANCE and \
|
|
|
+ artere_points[-1].distanceSquared(noeud_b_point) <= TOLERANCE) \
|
|
|
+ and not (artere_points[-1].distanceSquared(noeud_a_point) <= TOLERANCE and \
|
|
|
+ artere_points[0].distanceSquared(noeud_b_point) <= TOLERANCE):
|
|
|
+ self.log_error(f"Pas de noeud aux coordonnées attendues (noeuds attendus: {artere.noeud_a.NO_CODE} ou {artere.noeud_b.NO_CODE})", item=artere)
|
|
|
+
|
|
|
def test_positions_equipements(self):
|
|
|
""" Topologie: Equipements / Cables
|
|
|
Compare la géométrie des équipements à celle des cables """
|
|
|
-
|
|
|
+
|
|
|
for cable in self.cables:
|
|
|
if not cable.equipement_a or not cable.equipement_b or not cable.is_geometry_valid() or not cable.equipement_a.noeud or not cable.equipement_b.noeud:
|
|
|
continue
|
|
|
-
|
|
|
+
|
|
|
#! attention: on utilise la géométrie du noeud associé à l'équipement, pas celle de l'équipement lui-même
|
|
|
cable_points = cable.get_points()
|
|
|
equip_a_point = cable.equipement_a.noeud.get_points()[0]
|
|
|
equip_b_point = cable.equipement_b.noeud.get_points()[0]
|
|
|
-
|
|
|
- if not any(((cable_points[0].distanceSquared(equip_a_point) <= TOLERANCE and \
|
|
|
- cable_points[-1].distanceSquared(equip_b_point) <= TOLERANCE),
|
|
|
- (cable_points[0].distanceSquared(equip_b_point) <= TOLERANCE and \
|
|
|
- cable_points[-1].distanceSquared(equip_a_point) <= TOLERANCE))):
|
|
|
-
|
|
|
- self.log_error("Pas d'équipement aux coordonnées attendues", item=cable)
|
|
|
-
|
|
|
+
|
|
|
+ if not (cable_points[0].distanceSquared(equip_a_point) <= TOLERANCE and \
|
|
|
+ cable_points[-1].distanceSquared(equip_b_point) <= TOLERANCE) \
|
|
|
+ and not (cable_points[-1].distanceSquared(equip_a_point) <= TOLERANCE and \
|
|
|
+ cable_points[0].distanceSquared(equip_b_point) <= TOLERANCE):
|
|
|
+
|
|
|
+ self.log_error(f"Pas d'équipement aux coordonnées attendues (equipements attendus: {cable.equipement_a.EQ_CODE} ou {cable.equipement_b.EQ_CODE})", item=cable)
|
|
|
+
|
|
|
def test_tranchee_artere(self):
|
|
|
""" Topologie: Tranchées / Artères
|
|
|
Compare la géométrie des tranchées à celle des artères """
|
|
|
-
|
|
|
+
|
|
|
arteres_full_buffer = Artere.full_buffer(TOLERANCE)
|
|
|
-
|
|
|
- if not arteres_full_buffer.isGeosValid():
|
|
|
- raise ValueError("Buffer: géométrie invalide")
|
|
|
-
|
|
|
+
|
|
|
for tranchee in self.tranchees:
|
|
|
if not arteres_full_buffer.contains(tranchee.geom):
|
|
|
self.log_error("Tranchée ou portion de tranchée sans artère", item=tranchee)
|
|
|
-
|
|
|
+
|
|
|
def test_cable_artere(self):
|
|
|
""" Topologie: Cables / Artères
|
|
|
Compare la géométrie des cables à celle des artères """
|
|
|
-
|
|
|
+
|
|
|
# Vérifie que chaque cable a au moins une artère (sauf si commentaire contient 'baguette')
|
|
|
arteres_full_buffer = Artere.full_buffer(TOLERANCE)
|
|
|
-
|
|
|
- if not arteres_full_buffer.isGeosValid():
|
|
|
- raise ValueError("Buffer: géométrie invalide")
|
|
|
-
|
|
|
+
|
|
|
for cable in self.cables:
|
|
|
if "baguette" in cable.CA_COMMENT.lower() or not cable.is_geometry_valid():
|
|
|
continue
|
|
|
if not arteres_full_buffer.contains(cable.geom):
|
|
|
self.log_error("Cable ou portion de cable sans artère", item=cable)
|
|
|
-
|
|
|
+
|
|
|
def test_artere_cable(self):
|
|
|
""" Topologie: Artères / Cables
|
|
|
Compare la géométrie des artères à celle des cables """
|
|
|
-
|
|
|
+
|
|
|
# Vérifie que chaque artère a au moins un cable (sauf si commentaire contient un de ces mots 'racco client adductio attente bus 'sans cable'')
|
|
|
cables_full_buffer = Cable.full_buffer(TOLERANCE)
|
|
|
-
|
|
|
- if not cables_full_buffer.isGeosValid():
|
|
|
- raise ValueError("Buffer: géométrie invalide")
|
|
|
-
|
|
|
+
|
|
|
for artere in self.arteres:
|
|
|
if any(x in artere.AR_COMMENT.lower() for x in ['racco','client','adductio','attente','bus','sans cable']):
|
|
|
continue
|
|
|
if not cables_full_buffer.contains(artere.geom):
|
|
|
self.log_error("Artère ou portion d'artère sans cable", item=artere)
|
|
|
-
|
|
|
+
|
|
|
def test_dimensions_fourreaux(self):
|
|
|
""" Dimensions logiques: fourreaux
|
|
|
Vérifie que les nombres de fourreaux renseignés sont cohérents """
|
|
|
-
|
|
|
+
|
|
|
for artere in self.arteres:
|
|
|
try:
|
|
|
if not int(artere.AR_FOU_DIS) <= int(artere.AR_NB_FOUR):
|
|
|
self.log_error("Le nombre de fourreaux disponibles (AR_FOU_DIS) doit être inférieur au nombre total (AR_NB_FOUR)", item=artere)
|
|
|
except (TypeError, ValueError):
|
|
|
pass
|
|
|
-
|
|
|
- for cable in self.cables:
|
|
|
- try:
|
|
|
- if not int(cable.CA_NB_FO_U) <= int(cable.CA_NB_FO):
|
|
|
- self.log_error("Le nombre de fourreaux utilisés (CA_NB_FO_U) doit être inférieur au nombre total (CA_NB_FO)", item=cable)
|
|
|
- if not int(cable.CA_NB_FO_D) <= int(cable.CA_NB_FO):
|
|
|
- self.log_error("Le nombre de fourreaux disponibles (CA_NB_FO_D) doit être inférieur au nombre total (CA_NB_FO)", item=cable)
|
|
|
- except (TypeError, ValueError):
|
|
|
- pass
|
|
|
-
|
|
|
+
|
|
|
def test_pbos(self):
|
|
|
""" Topologie: PBO / ZAPBO
|
|
|
Compare la géométrie et le nom des équipements de type PBO à celle des ZAPBO
|
|
|
"""
|
|
|
-
|
|
|
+
|
|
|
# Verifier que chaque equipement de type PBO est contenu dans une zapbo, et que le nom de la zapbo contient le nom de l'equipement
|
|
|
for equipement in self.equipements:
|
|
|
if not equipement.EQ_TYPE == "PBO":
|
|
|
continue
|
|
|
-
|
|
|
+
|
|
|
#zapbos englobant l'equipement
|
|
|
candidates = []
|
|
|
for zapbo in self.zapbos:
|
|
|
if zapbo.geom.contains(equipement.geom):
|
|
|
candidates.append(zapbo)
|
|
|
-
|
|
|
+
|
|
|
# le pbo doit être contenu dans une zapbo
|
|
|
if not candidates:
|
|
|
self.log_error("Le PBO n'est contenu dans aucune ZAPBO", item=equipement)
|
|
|
continue
|
|
|
-
|
|
|
+
|
|
|
# On se base sur le nom pour trouver la zapbo correspondante
|
|
|
try:
|
|
|
- equipement.zapbo = next((z for z in candidates if equipement.EQ_NOM in z.ID_ZAPBO))
|
|
|
+ equipement.zapbo = next((z for z in candidates if equipement.EQ_CODE in z.ID_ZAPBO))
|
|
|
except StopIteration:
|
|
|
self.log_error("Le nom du PBO ne coincide avec le nom d'aucune des ZAPBO qui le contiennent", item=equipement)
|
|
|
break
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
# a venir (webservice?)
|
|
|
def __test_pbo_dimension(self):
|
|
|
""" Dimensionnement des PBO """
|
|
|
for equipement in self.equipements:
|
|
|
if not equipement.EQ_TYPE == "PBO":
|
|
|
continue
|
|
|
-
|
|
|
+
|
|
|
if not hasattr(equipement.zapbo, "nb_prises") or equipement.zapbo.nb_prises is None:
|
|
|
equipement.zapbo.nb_prises = 0
|
|
|
-
|
|
|
+
|
|
|
# Controle du dimensionnement des PBO
|
|
|
if equipement.EQ_TYPE_PH == 'PBO 6' and not equipement.zapbo.nb_prises < 6:
|
|
|
self.log_error("Le PBO 6 contient plus de 5 prises", item=equipement)
|
|
|
-
|
|
|
+
|
|
|
if equipement.EQ_TYPE_PH == 'PBO 12' and not equipement.zapbo.nb_prises >= 6 and equipement.zapbo.nb_prises <= 8:
|
|
|
self.log_error("Le PBO 12 contient mois de 6 prises ou plus de 8 prises", item=equipement)
|
|
|
-
|
|
|
+
|
|
|
if equipement.zapbo.STATUT == "REC" and not equipement.EQ_STATUT == "REC":
|
|
|
self.log_error("Le statut du PBO n'est pas cohérent avec le statut de sa ZAPBO", item=equipement)
|
|
|
-
|
|
|
+
|
|
|
if equipement.EQ_STATUT == "REC" and not equipement.zapbo.STATUT == "REC" and not equipement.zapbo.ID_ZAPBO[:4].lower() == "att_":
|
|
|
self.log_error("Le statut du PBO n'est pas cohérent avec le statut de sa ZAPBO", item=equipement)
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
checkers = [Mn2Checker]
|