|
@@ -6,14 +6,17 @@
|
|
|
'''
|
|
'''
|
|
|
|
|
|
|
|
import logging
|
|
import logging
|
|
|
-from qgis.core import QgsGeometry
|
|
|
|
|
|
|
+from qgis.core import QgsProject, QgsGeometry, QgsWkbTypes
|
|
|
|
|
|
|
|
from core.cerberus_ import is_float, is_multi_int, is_int, \
|
|
from core.cerberus_ import is_float, is_multi_int, is_int, \
|
|
|
- is_modern_french_date
|
|
|
|
|
|
|
+ is_modern_french_date, CerberusValidator, CerberusErrorHandler, \
|
|
|
|
|
+ _translate_messages
|
|
|
|
|
+from core.checker import BaseChecker
|
|
|
from core.validation_errors import UniqueError, RelationError, \
|
|
from core.validation_errors import UniqueError, RelationError, \
|
|
|
DuplicatedGeom, MissingItem, DimensionError, TechnicalValidationError
|
|
DuplicatedGeom, MissingItem, DimensionError, TechnicalValidationError
|
|
|
from core.validator import QgsModel, BaseValidator
|
|
from core.validator import QgsModel, BaseValidator
|
|
|
|
|
|
|
|
|
|
+
|
|
|
logger = logging.getLogger("mncheck")
|
|
logger = logging.getLogger("mncheck")
|
|
|
|
|
|
|
|
|
|
|
|
@@ -177,119 +180,224 @@ class Zapbo(QgsModel):
|
|
|
def __repr__(self):
|
|
def __repr__(self):
|
|
|
return "Zapbo {}".format(self.ID_ZAPBO)
|
|
return "Zapbo {}".format(self.ID_ZAPBO)
|
|
|
|
|
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
|
|
+models = [Artere, Cable, Equipement, Noeud, Tranchee, Zapbo]
|
|
|
|
|
|
|
|
####### Validateur
|
|
####### Validateur
|
|
|
|
|
+
|
|
|
|
|
+class Mn1Checker(BaseChecker):
|
|
|
|
|
|
|
|
|
|
+ def __init__(self):
|
|
|
|
|
+ super().__init__()
|
|
|
|
|
|
|
|
-class Validator(BaseValidator):
|
|
|
|
|
- schema_name = "Mn1 REC"
|
|
|
|
|
- models = [Artere, Cable, Equipement, Noeud, Tranchee, Zapbo]
|
|
|
|
|
|
|
+ def test_load_layers(self):
|
|
|
|
|
+ """ Chargement des données """
|
|
|
|
|
+
|
|
|
|
|
+ for model in models:
|
|
|
|
|
+ layername = model.layername
|
|
|
|
|
+
|
|
|
|
|
+ layer = next((l for l in QgsProject.instance().mapLayers().values() if l.name().lower() == layername.lower()))
|
|
|
|
|
+ if not layer:
|
|
|
|
|
+ self.log_error("Couche manquante", layername=layername, critical=True)
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ model.layer = layer
|
|
|
|
|
+
|
|
|
|
|
+ self.arteres = list(Artere.layer.getFeatures())
|
|
|
|
|
+ self.cables = list(Cable.layer.getFeatures())
|
|
|
|
|
+# self.equipements = list(Equipement.layer.getFeatures())
|
|
|
|
|
+# self.noeuds = list(Noeud.layer.getFeatures())
|
|
|
|
|
+ self.tranchees = list(Tranchee.layer.getFeatures())
|
|
|
|
|
+# self.zapbos = list(Zapbo.layer.getFeatures())
|
|
|
|
|
+
|
|
|
|
|
+ self.dataset = {Artere: self.arteres,
|
|
|
|
|
+ Cable: self.cables,
|
|
|
|
|
+ Equipement: self.equipements,
|
|
|
|
|
+ Noeud: self.noeuds,
|
|
|
|
|
+ Tranchee: self.tranchees,
|
|
|
|
|
+ Zapbo: self.zapbos}
|
|
|
|
|
|
|
|
- def _technical_validation(self):
|
|
|
|
|
|
|
+ def test_scr(self):
|
|
|
|
|
+ """ Contrôle des projections """
|
|
|
|
|
+ for model in models:
|
|
|
|
|
+ if model.layer.crs().authid() != model.crs:
|
|
|
|
|
+ self.log_error(f"Mauvaise projection (attendu: {model.crs})", layername=model.layername)
|
|
|
|
|
+
|
|
|
|
|
+ def _validate_structure(self, model, features):
|
|
|
|
|
+ v = CerberusValidator(model.schema, purge_unknown=True, error_handler=CerberusErrorHandler, require_all=True)
|
|
|
|
|
+ for feature in features:
|
|
|
|
|
+ attributes = dict(zip([f.name() for f in feature.fields()], feature.attributes()))
|
|
|
|
|
+ v.validate(attributes)
|
|
|
|
|
+
|
|
|
|
|
+ for field, verrors in v.errors.items():
|
|
|
|
|
+ for err in verrors:
|
|
|
|
|
+ self.log_error(_translate_messages(err), layername=model.layername, field=field)
|
|
|
|
|
+
|
|
|
|
|
+ def test_structure_arteres(self):
|
|
|
|
|
+ """ Structure des données: Artères """
|
|
|
|
|
+ self._validate_structure(Artere, self.arteres)
|
|
|
|
|
+
|
|
|
|
|
+ def test_structure_cables(self):
|
|
|
|
|
+ """ Structure des données: Cables """
|
|
|
|
|
+ self._validate_structure(Cable, self.cables)
|
|
|
|
|
+
|
|
|
|
|
+ def test_structure_equipements(self):
|
|
|
|
|
+ """ Structure des données: Equipements """
|
|
|
|
|
+ self._validate_structure(Equipement, self.equipements)
|
|
|
|
|
+
|
|
|
|
|
+ def test_structure_noeuds(self):
|
|
|
|
|
+ """ Structure des données: Noeuds """
|
|
|
|
|
+ self._validate_structure(Noeud, self.noeuds)
|
|
|
|
|
+
|
|
|
|
|
+ def test_structure_tranchees(self):
|
|
|
|
|
+ """ Structure des données: Tranchées """
|
|
|
|
|
+ self._validate_structure(Tranchee, self.tranchees)
|
|
|
|
|
+
|
|
|
|
|
+ def test_structure_zapbos(self):
|
|
|
|
|
+ """ Structure des données: Zapbos """
|
|
|
|
|
+ self._validate_structure(Zapbo, self.zapbos)
|
|
|
|
|
+
|
|
|
|
|
+ def test_geometry_validity(self):
|
|
|
|
|
+ """ Contrôle de la validité des géométries """
|
|
|
|
|
+ for model in models:
|
|
|
|
|
+ for f in self.dataset[model]:
|
|
|
|
|
+ if not f.geometry().isGeosValid():
|
|
|
|
|
+ self.log_error("La géométrie de l'objet est invalide", layername=model.layername, field="geom")
|
|
|
|
|
+
|
|
|
|
|
+ def test_geometry_type(self):
|
|
|
|
|
+ for model in models:
|
|
|
|
|
+ for f in self.dataset[model]:
|
|
|
|
|
+ if QgsWkbTypes.singleType(f.geometry().wkbType()) != model.geom_type:
|
|
|
|
|
+ self.log_error("Type de géométrie invalide (attendu: {})".format(QgsModel.GEOM_NAMES[model.geom_type]), layername=model.layername, field="geom")
|
|
|
|
|
+
|
|
|
|
|
+ def test_bounding_box(self):
|
|
|
|
|
+
|
|
|
|
|
+ for model in models:
|
|
|
|
|
+ xmin, ymin, xmax, ymax = model.bounding_box
|
|
|
|
|
+
|
|
|
|
|
+ for f in self.dataset[model]:
|
|
|
|
|
+
|
|
|
|
|
+ bb = f.geometry().boundingBox()
|
|
|
|
|
+ x1, y1, x2, y2 = (bb.xMinimum(), bb.yMinimum(), bb.xMaximum(), bb.yMaximum())
|
|
|
|
|
+
|
|
|
|
|
+ if any(x < xmin or x > xmax for x in (x1, x2)) or \
|
|
|
|
|
+ any(y < ymin or y > ymax for y in (y1, y2)):
|
|
|
|
|
+ self.log_error("Hors de l'emprise autorisée", layername=model.layername, field="geom")
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def test_duplicates(self):
|
|
|
|
|
+ """ Recherche de doublons """
|
|
|
|
|
|
|
|
- # construction des index
|
|
|
|
|
- arteres = self.dataset[Artere]
|
|
|
|
|
- cables = self.dataset[Cable]
|
|
|
|
|
- tranchees = self.dataset[Tranchee]
|
|
|
|
|
|
|
+ # TODO: séparer le chargement des données et le contrôle des doublons
|
|
|
|
|
|
|
|
- noeuds = {}
|
|
|
|
|
|
|
+ self.noeuds = {}
|
|
|
for noeud in self.dataset[Noeud]:
|
|
for noeud in self.dataset[Noeud]:
|
|
|
- if not noeud.NO_NOM in noeuds:
|
|
|
|
|
- noeuds[noeud.NO_NOM] = noeud
|
|
|
|
|
|
|
+ if not noeud.NO_NOM in self.noeuds:
|
|
|
|
|
+ self.noeuds[noeud.NO_NOM] = noeud
|
|
|
else:
|
|
else:
|
|
|
- self.log_error(UniqueError("Doublons dans le champs: {}".format(noeud), layername=Noeud.layername, field="NO_NOM"))
|
|
|
|
|
|
|
+ self.log_error("Doublons dans le champs: {}".format(noeud), layername=Noeud.layername, field="NO_NOM")
|
|
|
|
|
|
|
|
- equipements = {}
|
|
|
|
|
|
|
+ self.equipements = {}
|
|
|
for equipement in self.dataset[Equipement]:
|
|
for equipement in self.dataset[Equipement]:
|
|
|
- if not equipement.EQ_NOM in equipements:
|
|
|
|
|
- equipements[equipement.EQ_NOM] = equipement
|
|
|
|
|
|
|
+ if not equipement.EQ_NOM in self.equipements:
|
|
|
|
|
+ self.equipements[equipement.EQ_NOM] = equipement
|
|
|
else:
|
|
else:
|
|
|
- self.log_error(UniqueError("Doublons dans le champs: {}".format(equipement), layername=Equipement.layername, field="EQ_NOM"))
|
|
|
|
|
|
|
+ self.log_error("Doublons dans le champs: {}".format(equipement), layername=Equipement.layername, field="EQ_NOM")
|
|
|
|
|
|
|
|
- zapbos = {}
|
|
|
|
|
|
|
+ self.zapbos = {}
|
|
|
for zapbo in self.dataset[Zapbo]:
|
|
for zapbo in self.dataset[Zapbo]:
|
|
|
- if not zapbo.ID_ZAPBO in zapbos:
|
|
|
|
|
- zapbos[zapbo.ID_ZAPBO] = zapbo
|
|
|
|
|
|
|
+ if not zapbo.ID_ZAPBO in self.zapbos:
|
|
|
|
|
+ self.zapbos[zapbo.ID_ZAPBO] = zapbo
|
|
|
else:
|
|
else:
|
|
|
- self.log_error(UniqueError("Doublons dans le champs: {}".format(zapbo), layername=Zapbo.layername, field="ID_ZAPBO"))
|
|
|
|
|
|
|
+ self.log_error("Doublons dans le champs: {}".format(zapbo), layername=Zapbo.layername, field="ID_ZAPBO")
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def test_constraints(self):
|
|
|
|
|
+ """ Contrôle des contraintes relationnelles """
|
|
|
|
|
|
|
|
# rattachement les noeuds aux artères
|
|
# rattachement les noeuds aux artères
|
|
|
- for artere in arteres:
|
|
|
|
|
|
|
+ for artere in self.arteres:
|
|
|
try:
|
|
try:
|
|
|
- artere.noeud_a = noeuds[artere.AR_NOEUD_A]
|
|
|
|
|
|
|
+ artere.noeud_a = self.noeuds[artere.AR_NOEUD_A]
|
|
|
except KeyError:
|
|
except KeyError:
|
|
|
artere.noeud_a = None
|
|
artere.noeud_a = None
|
|
|
self.log_error(RelationError("Le noeud '{}' n'existe pas".format(artere.AR_NOEUD_A), layername=Artere.layername, field="AR_NOEUD_A"))
|
|
self.log_error(RelationError("Le noeud '{}' n'existe pas".format(artere.AR_NOEUD_A), layername=Artere.layername, field="AR_NOEUD_A"))
|
|
|
|
|
|
|
|
try:
|
|
try:
|
|
|
- artere.noeud_b = noeuds[artere.AR_NOEUD_B]
|
|
|
|
|
|
|
+ artere.noeud_b = self.noeuds[artere.AR_NOEUD_B]
|
|
|
except KeyError:
|
|
except KeyError:
|
|
|
artere.noeud_b = None
|
|
artere.noeud_b = None
|
|
|
self.log_error(RelationError("Le noeud '{}' n'existe pas".format(artere.AR_NOEUD_B), layername=Artere.layername, field="AR_NOEUD_A"))
|
|
self.log_error(RelationError("Le noeud '{}' n'existe pas".format(artere.AR_NOEUD_B), layername=Artere.layername, field="AR_NOEUD_A"))
|
|
|
|
|
|
|
|
# rattachement des equipements aux cables
|
|
# rattachement des equipements aux cables
|
|
|
- for cable in cables:
|
|
|
|
|
|
|
+ for cable in self.cables:
|
|
|
try:
|
|
try:
|
|
|
- cable.equipement_a = equipements[cable.CA_EQ_A]
|
|
|
|
|
|
|
+ cable.equipement_a = self.equipements[cable.CA_EQ_A]
|
|
|
except KeyError:
|
|
except KeyError:
|
|
|
cable.equipement_a = None
|
|
cable.equipement_a = None
|
|
|
self.log_error(RelationError("L'équipement '{}' n'existe pas".format(cable.CA_EQ_A), layername=Cable.layername, field="CA_EQ_A"))
|
|
self.log_error(RelationError("L'équipement '{}' n'existe pas".format(cable.CA_EQ_A), layername=Cable.layername, field="CA_EQ_A"))
|
|
|
|
|
|
|
|
try:
|
|
try:
|
|
|
- cable.equipement_b = equipements[cable.CA_EQ_B]
|
|
|
|
|
|
|
+ cable.equipement_b = self.equipements[cable.CA_EQ_B]
|
|
|
except KeyError:
|
|
except KeyError:
|
|
|
cable.equipement_b = None
|
|
cable.equipement_b = None
|
|
|
self.log_error(RelationError("L'équipement '{}' n'existe pas".format(cable.CA_EQ_B), layername=Cable.layername, field="CA_EQ_B"))
|
|
self.log_error(RelationError("L'équipement '{}' n'existe pas".format(cable.CA_EQ_B), layername=Cable.layername, field="CA_EQ_B"))
|
|
|
|
|
|
|
|
# rattachement des equipements aux noeuds
|
|
# rattachement des equipements aux noeuds
|
|
|
- for equipement in equipements.values():
|
|
|
|
|
|
|
+ for equipement in self.equipements.values():
|
|
|
try:
|
|
try:
|
|
|
- equipement.noeud = noeuds[equipement.EQ_NOM_NOE]
|
|
|
|
|
|
|
+ equipement.noeud = self.noeuds[equipement.EQ_NOM_NOE]
|
|
|
except KeyError:
|
|
except KeyError:
|
|
|
equipement.noeud = None
|
|
equipement.noeud = None
|
|
|
self.log_error(RelationError("Le noeud '{}' n'existe pas".format(equipement.EQ_NOM_NOE, equipement.EQ_NOM), layername=Equipement.layername, field="EQ_NOM_NOE"))
|
|
self.log_error(RelationError("Le noeud '{}' n'existe pas".format(equipement.EQ_NOM_NOE, equipement.EQ_NOM), layername=Equipement.layername, field="EQ_NOM_NOE"))
|
|
|
|
|
|
|
|
# verifie que tous les equipements sont l'equipement B d'au moins un cable
|
|
# verifie que tous les equipements sont l'equipement B d'au moins un cable
|
|
|
- equipements_b = [cable.CA_EQ_B for cable in cables]
|
|
|
|
|
- for eq_id in equipements:
|
|
|
|
|
- if equipements[eq_id].EQ_TYPE == "BAI":
|
|
|
|
|
|
|
+ equipements_b = [cable.CA_EQ_B for cable in self.cables]
|
|
|
|
|
+ for eq_id in self.equipements:
|
|
|
|
|
+ if self.equipements[eq_id].EQ_TYPE == "BAI":
|
|
|
continue
|
|
continue
|
|
|
if not eq_id in equipements_b:
|
|
if not eq_id in equipements_b:
|
|
|
self.log_error(RelationError("L'equipement '{}' n'est l'équipement B d'aucun cable".format(eq_id), layername=Equipement.layername, field="EQ_NOM"))
|
|
self.log_error(RelationError("L'equipement '{}' n'est l'équipement B d'aucun cable".format(eq_id), layername=Equipement.layername, field="EQ_NOM"))
|
|
|
|
|
|
|
|
|
|
+
|
|
|
|
|
+ def test_graphic_duplicates(self):
|
|
|
|
|
+ """ Recherche de doublons graphiques """
|
|
|
|
|
+
|
|
|
# controle des doublons graphiques
|
|
# controle des doublons graphiques
|
|
|
- for i, tranchee in enumerate(tranchees):
|
|
|
|
|
- for other in tranchees[i+1:]:
|
|
|
|
|
|
|
+ for i, tranchee in enumerate(self.tranchees):
|
|
|
|
|
+ for other in self.tranchees[i+1:]:
|
|
|
if tranchee.geom == other.geom:
|
|
if tranchee.geom == other.geom:
|
|
|
self.log_error(DuplicatedGeom("Une entité graphique est dupliquée".format(tranchee), layername=Tranchee.layername, field="geom"))
|
|
self.log_error(DuplicatedGeom("Une entité graphique est dupliquée".format(tranchee), layername=Tranchee.layername, field="geom"))
|
|
|
|
|
|
|
|
- for i, artere in enumerate(arteres):
|
|
|
|
|
- for other in arteres[i+1:]:
|
|
|
|
|
|
|
+ for i, artere in enumerate(self.arteres):
|
|
|
|
|
+ for other in self.arteres[i+1:]:
|
|
|
if artere.geom == other.geom:
|
|
if artere.geom == other.geom:
|
|
|
self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(artere), layername=Artere.layername, field="geom"))
|
|
self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(artere), layername=Artere.layername, field="geom"))
|
|
|
|
|
|
|
|
- for i, cable in enumerate(cables):
|
|
|
|
|
- for other in cables[i+1:]:
|
|
|
|
|
|
|
+ for i, cable in enumerate(self.cables):
|
|
|
|
|
+ for other in self.cables[i+1:]:
|
|
|
if cable.geom == other.geom and cable.CA_EQ_A == other.CA_EQ_A and cable.CA_EQ_B == other.CA_EQ_B:
|
|
if cable.geom == other.geom and cable.CA_EQ_A == other.CA_EQ_A and cable.CA_EQ_B == other.CA_EQ_B:
|
|
|
self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(cable), layername=Cable.layername, field="geom"))
|
|
self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(cable), layername=Cable.layername, field="geom"))
|
|
|
|
|
|
|
|
- ls_noeuds = list(noeuds.values())
|
|
|
|
|
|
|
+ ls_noeuds = list(self.noeuds.values())
|
|
|
for i, noeud in enumerate(ls_noeuds):
|
|
for i, noeud in enumerate(ls_noeuds):
|
|
|
for other in ls_noeuds[i+1:]:
|
|
for other in ls_noeuds[i+1:]:
|
|
|
if noeud.geom == other.geom:
|
|
if noeud.geom == other.geom:
|
|
|
self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(noeud), layername=Noeud.layername, field="geom"))
|
|
self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(noeud), layername=Noeud.layername, field="geom"))
|
|
|
del ls_noeuds
|
|
del ls_noeuds
|
|
|
|
|
|
|
|
- ls_zapbos = list(zapbos.values())
|
|
|
|
|
|
|
+ ls_zapbos = list(self.zapbos.values())
|
|
|
for i, zapbo in enumerate(ls_zapbos):
|
|
for i, zapbo in enumerate(ls_zapbos):
|
|
|
for other in ls_zapbos[i+1:]:
|
|
for other in ls_zapbos[i+1:]:
|
|
|
if zapbo.geom == other.geom:
|
|
if zapbo.geom == other.geom:
|
|
|
self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(zapbo), layername=Zapbo.layername, field="geom"))
|
|
self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(zapbo), layername=Zapbo.layername, field="geom"))
|
|
|
del ls_zapbos
|
|
del ls_zapbos
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
|
|
+ def test_positions_noeuds(self):
|
|
|
|
|
+ """ Contrôle de la position des noeuds """
|
|
|
|
|
+
|
|
|
# Arteres: comparer la géométrie à celle des noeuds
|
|
# Arteres: comparer la géométrie à celle des noeuds
|
|
|
- for artere in arteres:
|
|
|
|
|
|
|
+ for artere in self.arteres:
|
|
|
if not artere.noeud_a or not artere.noeud_b:
|
|
if not artere.noeud_a or not artere.noeud_b:
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
@@ -299,9 +407,11 @@ class Validator(BaseValidator):
|
|
|
artere.points[-1].distanceSquared(artere.noeud_a.points[0]) <= TOLERANCE))):
|
|
artere.points[-1].distanceSquared(artere.noeud_a.points[0]) <= TOLERANCE))):
|
|
|
self.log_error(MissingItem("Pas de noeud aux coordonnées attendues ('{}')".format(artere), layername=Artere.layername, field="geom"))
|
|
self.log_error(MissingItem("Pas de noeud aux coordonnées attendues ('{}')".format(artere), layername=Artere.layername, field="geom"))
|
|
|
|
|
|
|
|
|
|
+ def test_positions_equipements(self):
|
|
|
|
|
+ """ Contrôle de la position des équipements """
|
|
|
|
|
|
|
|
# Cables: comparer la géométrie à celle des equipements (on utilise en fait la geom du noeud correspondant à l'équipement)
|
|
# Cables: comparer la géométrie à celle des equipements (on utilise en fait la geom du noeud correspondant à l'équipement)
|
|
|
- for cable in cables:
|
|
|
|
|
|
|
+ for cable in self.cables:
|
|
|
if not cable.equipement_a or not cable.equipement_b or not cable.is_geometry_valid() or not cable.equipement_a.noeud or not cable.equipement_b.noeud:
|
|
if not cable.equipement_a or not cable.equipement_b or not cable.is_geometry_valid() or not cable.equipement_a.noeud or not cable.equipement_b.noeud:
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
@@ -312,42 +422,51 @@ class Validator(BaseValidator):
|
|
|
|
|
|
|
|
self.log_error(MissingItem("Pas d'equipement aux coordonnées attendues ('{}')".format(cable), layername=Cable.layername, field="geom"))
|
|
self.log_error(MissingItem("Pas d'equipement aux coordonnées attendues ('{}')".format(cable), layername=Cable.layername, field="geom"))
|
|
|
|
|
|
|
|
-
|
|
|
|
|
- # Verifie que chaque tranchée a au moins une artère
|
|
|
|
|
- arteres_full_buffer = QgsGeometry().unaryUnion([a.geom for a in arteres]).buffer(TOLERANCE, 100)
|
|
|
|
|
|
|
+ def test_tranchee_artere(self):
|
|
|
|
|
+ """ Chaque tranchée a au moins une artère """
|
|
|
|
|
+
|
|
|
|
|
+ arteres_full_buffer = QgsGeometry().unaryUnion([a.geom for a in self.arteres]).buffer(TOLERANCE, 100)
|
|
|
|
|
|
|
|
- for tranchee in tranchees:
|
|
|
|
|
|
|
+ for tranchee in self.tranchees:
|
|
|
if not arteres_full_buffer.contains(tranchee.geom):
|
|
if not arteres_full_buffer.contains(tranchee.geom):
|
|
|
self.log_error(MissingItem("Tranchée ou portion de tranchée sans artère ('{}')".format(tranchee), layername=Tranchee.layername, field="*"))
|
|
self.log_error(MissingItem("Tranchée ou portion de tranchée sans artère ('{}')".format(tranchee), layername=Tranchee.layername, field="*"))
|
|
|
|
|
|
|
|
|
|
+ def test_cable_artere(self):
|
|
|
|
|
+ """ Chaque cable a au moins une artère (sauf baguettes) """
|
|
|
|
|
+ arteres_full_buffer = QgsGeometry().unaryUnion([a.geom for a in self.arteres]).buffer(TOLERANCE, 100)
|
|
|
|
|
+
|
|
|
# Verifie que chaque cable a au moins une artère (sauf si commentaire contient 'baguette')
|
|
# Verifie que chaque cable a au moins une artère (sauf si commentaire contient 'baguette')
|
|
|
- for cable in cables:
|
|
|
|
|
|
|
+ for cable in self.cables:
|
|
|
if "baguette" in cable.CA_COMMENT.lower() or not cable.is_geometry_valid():
|
|
if "baguette" in cable.CA_COMMENT.lower() or not cable.is_geometry_valid():
|
|
|
continue
|
|
continue
|
|
|
if not arteres_full_buffer.contains(cable.geom):
|
|
if not arteres_full_buffer.contains(cable.geom):
|
|
|
self.log_error(MissingItem("Cable ou portion de cable sans artère ('{}')".format(cable), layername=Cable.layername, field="*"))
|
|
self.log_error(MissingItem("Cable ou portion de cable sans artère ('{}')".format(cable), layername=Cable.layername, field="*"))
|
|
|
|
|
|
|
|
- del arteres_full_buffer
|
|
|
|
|
|
|
+ def test_artere_cable(self):
|
|
|
|
|
+ """ Chaque artère a au moins une cable (sauf cas spécifiques) """
|
|
|
|
|
|
|
|
# Verifie que chaque artère a au moins un cable (sauf si commentaire contient un de ces mots 'racco client adductio attente bus 'sans cable'')
|
|
# Verifie que chaque artère a au moins un cable (sauf si commentaire contient un de ces mots 'racco client adductio attente bus 'sans cable'')
|
|
|
|
|
|
|
|
- cables_full_buffer = QgsGeometry().unaryUnion([c.geom for c in cables]).buffer(TOLERANCE, 100)
|
|
|
|
|
|
|
+ cables_full_buffer = QgsGeometry().unaryUnion([c.geom for c in self.cables]).buffer(TOLERANCE, 100)
|
|
|
|
|
|
|
|
- for artere in arteres:
|
|
|
|
|
|
|
+ for artere in self.arteres:
|
|
|
if any(x in artere.AR_COMMENT.lower() for x in ['racco','client','adductio','attente','bus','sans cable']):
|
|
if any(x in artere.AR_COMMENT.lower() for x in ['racco','client','adductio','attente','bus','sans cable']):
|
|
|
continue
|
|
continue
|
|
|
if not cables_full_buffer.contains(artere.geom):
|
|
if not cables_full_buffer.contains(artere.geom):
|
|
|
self.log_error(MissingItem("Artère sans cable ('{}')".format(artere), layername=Artere.layername, field="*"))
|
|
self.log_error(MissingItem("Artère sans cable ('{}')".format(artere), layername=Artere.layername, field="*"))
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def test_dimensions(self):
|
|
|
|
|
+ """ Contrôle des dimensions logiques """
|
|
|
|
|
|
|
|
- # Contrôle des dimensions logiques
|
|
|
|
|
- for artere in arteres:
|
|
|
|
|
|
|
+ for artere in self.arteres:
|
|
|
try:
|
|
try:
|
|
|
if not int(artere.AR_FOU_DIS) <= int(artere.AR_NB_FOUR):
|
|
if not int(artere.AR_FOU_DIS) <= int(artere.AR_NB_FOUR):
|
|
|
self.log_error(DimensionError("Le nombre de fourreaux disponibles doit être inférieur au nombre total ('{}')".format(artere), layername=Artere.layername, field="AR_FOU_DIS"))
|
|
self.log_error(DimensionError("Le nombre de fourreaux disponibles doit être inférieur au nombre total ('{}')".format(artere), layername=Artere.layername, field="AR_FOU_DIS"))
|
|
|
except (TypeError, ValueError):
|
|
except (TypeError, ValueError):
|
|
|
pass
|
|
pass
|
|
|
|
|
|
|
|
- for cable in cables:
|
|
|
|
|
|
|
+ for cable in self.cables:
|
|
|
try:
|
|
try:
|
|
|
if not int(cable.CA_NB_FO_U) <= int(cable.CA_NB_FO):
|
|
if not int(cable.CA_NB_FO_U) <= int(cable.CA_NB_FO):
|
|
|
self.log_error(DimensionError("Le nombre de fourreaux utilisés doit être inférieur au nombre total ('{}')".format(cable), layername=Cable.layername, field="CA_NB_FO_U"))
|
|
self.log_error(DimensionError("Le nombre de fourreaux utilisés doit être inférieur au nombre total ('{}')".format(cable), layername=Cable.layername, field="CA_NB_FO_U"))
|
|
@@ -356,14 +475,17 @@ class Validator(BaseValidator):
|
|
|
except (TypeError, ValueError):
|
|
except (TypeError, ValueError):
|
|
|
pass
|
|
pass
|
|
|
|
|
|
|
|
|
|
+ def test_pbos(self):
|
|
|
|
|
+ """ PBO / ZAPBO """
|
|
|
|
|
+
|
|
|
# Verifier que chaque equipement de type PBO est contenu dans une zapbo, et que le nom de la zapbo contient le nom de l'equipement
|
|
# Verifier que chaque equipement de type PBO est contenu dans une zapbo, et que le nom de la zapbo contient le nom de l'equipement
|
|
|
- for equipement in equipements.values():
|
|
|
|
|
|
|
+ for equipement in self.equipements.values():
|
|
|
if not equipement.EQ_TYPE == "PBO":
|
|
if not equipement.EQ_TYPE == "PBO":
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
#zapbos englobant l'equipement
|
|
#zapbos englobant l'equipement
|
|
|
candidates = []
|
|
candidates = []
|
|
|
- for zapbo in zapbos.values():
|
|
|
|
|
|
|
+ for zapbo in self.zapbos.values():
|
|
|
if zapbo.geom.contains(equipement.geom):
|
|
if zapbo.geom.contains(equipement.geom):
|
|
|
candidates.append(zapbo)
|
|
candidates.append(zapbo)
|
|
|
|
|
|
|
@@ -378,10 +500,17 @@ class Validator(BaseValidator):
|
|
|
except StopIteration:
|
|
except StopIteration:
|
|
|
self.log_error(MissingItem("Le nom du PBO ne coincide avec le nom d'aucune des ZAPBO qui le contient: {}".format(equipement), layername=Equipement.layername, field="EQ_NOM"))
|
|
self.log_error(MissingItem("Le nom du PBO ne coincide avec le nom d'aucune des ZAPBO qui le contient: {}".format(equipement), layername=Equipement.layername, field="EQ_NOM"))
|
|
|
break
|
|
break
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
|
|
+ # TODO: revoir et déplacer le calcul du nombre de prises
|
|
|
if not hasattr(equipement.zapbo, "nb_prises") or equipement.zapbo.nb_prises is None:
|
|
if not hasattr(equipement.zapbo, "nb_prises") or equipement.zapbo.nb_prises is None:
|
|
|
equipement.zapbo.nb_prises = 0
|
|
equipement.zapbo.nb_prises = 0
|
|
|
|
|
|
|
|
|
|
+ def test_pbo_dimension(self):
|
|
|
|
|
+ """ Dimensionnement des PBO """
|
|
|
|
|
+ for equipement in self.equipements.values():
|
|
|
|
|
+ if not equipement.EQ_TYPE == "PBO":
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
# Controle du dimensionnement des PBO
|
|
# Controle du dimensionnement des PBO
|
|
|
if equipement.EQ_TYPE_PH == 'PBO 6' and not equipement.zapbo.nb_prises < 6:
|
|
if equipement.EQ_TYPE_PH == 'PBO 6' and not equipement.zapbo.nb_prises < 6:
|
|
|
self.log_error(DimensionError("Le PBO 6 contient plus de 5 prises: {}".format(equipement), layername=Equipement.layername, field="*"))
|
|
self.log_error(DimensionError("Le PBO 6 contient plus de 5 prises: {}".format(equipement), layername=Equipement.layername, field="*"))
|