|
|
@@ -313,4 +313,316 @@ class NetgeoValidator(BaseValidator):
|
|
|
for field, verrors in v.errors.items():
|
|
|
for err in verrors:
|
|
|
self.log_error(DataError(_translate_messages(err), filename=model.filename, field=field))
|
|
|
+
|
|
|
+ def _technical_validation(self):
|
|
|
+
|
|
|
+ # construction des index
|
|
|
+ arteres = self.dataset[Artere]
|
|
|
+ cables = self.dataset[Cable]
|
|
|
+ tranchees = self.dataset[Tranchee]
|
|
|
+
|
|
|
+ noeuds = {}
|
|
|
+ for noeud in self.dataset[Noeud]:
|
|
|
+ if not noeud.NO_NOM in noeuds:
|
|
|
+ noeuds[noeud.NO_NOM] = noeud
|
|
|
+ else:
|
|
|
+ self.log_error(UniqueError("Doublons dans le champs: {}".format(noeud), filename=Noeud.filename, field="NO_NOM"))
|
|
|
+
|
|
|
+ equipements = {}
|
|
|
+ for equipement in self.dataset[Equipement]:
|
|
|
+ if not equipement.EQ_NOM in equipements:
|
|
|
+ equipements[equipement.EQ_NOM] = equipement
|
|
|
+ else:
|
|
|
+ self.log_error(UniqueError("Doublons dans le champs: {}".format(equipement), filename=Equipement.filename, field="EQ_NOM"))
|
|
|
+
|
|
|
+ zapbos = {}
|
|
|
+ for zapbo in self.dataset[Zapbo]:
|
|
|
+ if not zapbo.ID_ZAPBO in zapbos:
|
|
|
+ zapbos[zapbo.ID_ZAPBO] = zapbo
|
|
|
+ else:
|
|
|
+ self.log_error(UniqueError("Doublons dans le champs: {}".format(zapbo), filename=Zapbo.filename, field="ID_ZAPBO"))
|
|
|
+
|
|
|
+ # contrôle de la validité des géométries
|
|
|
+ for artere in arteres:
|
|
|
+ if not artere.valid:
|
|
|
+ self.log_error(InvalidGeometry("Géométrie invalide: {}".format(artere), filename=Artere.filename, field="geom"))
|
|
|
+ for tranchee in tranchees:
|
|
|
+ if not tranchee.valid:
|
|
|
+ self.log_error(InvalidGeometry("Géométrie invalide: {}".format(tranchee), filename=Tranchee.filename, field="geom"))
|
|
|
+ for cable in cables:
|
|
|
+ if not "baguette" in cable.CA_COMMENT.lower() and not cable.valid:
|
|
|
+ self.log_error(InvalidGeometry("Géométrie invalide: {}".format(cable), filename=Cable.filename, field="geom"))
|
|
|
+ for noeud in noeuds.values():
|
|
|
+ if not noeud.valid:
|
|
|
+ self.log_error(InvalidGeometry("Géométrie invalide: {}".format(noeud), filename=Noeud.filename, field="geom"))
|
|
|
+ for equipement in equipements.values():
|
|
|
+ if not equipement.valid:
|
|
|
+ self.log_error(InvalidGeometry("Géométrie invalide: {}".format(equipement), filename=Equipement.filename, field="geom"))
|
|
|
+ for zapbo in zapbos.values():
|
|
|
+ if not zapbo.valid:
|
|
|
+ self.log_error(InvalidGeometry("Géométrie invalide: {}".format(zapbo), filename=Zapbo.filename, field="geom"))
|
|
|
+
|
|
|
+ # rattachement les noeuds aux artères
|
|
|
+ for artere in arteres:
|
|
|
+ try:
|
|
|
+ artere.noeud_a = noeuds[artere.AR_NOEUD_A]
|
|
|
+ except KeyError:
|
|
|
+ artere.noeud_a = None
|
|
|
+ self.log_error(RelationError("Le noeud '{}' n'existe pas".format(artere.AR_NOEUD_A), filename=Artere.filename, field="AR_NOEUD_A"))
|
|
|
+
|
|
|
+ try:
|
|
|
+ artere.noeud_b = noeuds[artere.AR_NOEUD_B]
|
|
|
+ except KeyError:
|
|
|
+ artere.noeud_b = None
|
|
|
+ self.log_error(RelationError("Le noeud '{}' n'existe pas".format(artere.AR_NOEUD_B), filename=Artere.filename, field="AR_NOEUD_A"))
|
|
|
+
|
|
|
+ # rattachement des equipements aux cables
|
|
|
+ for cable in cables:
|
|
|
+ try:
|
|
|
+ cable.equipement_a = equipements[cable.CA_EQ_A]
|
|
|
+ except KeyError:
|
|
|
+ cable.equipement_a = None
|
|
|
+ self.log_error(RelationError("L'équipement '{}' n'existe pas".format(cable.CA_EQ_A), filename=Cable.filename, field="CA_EQ_A"))
|
|
|
+
|
|
|
+ try:
|
|
|
+ cable.equipement_b = equipements[cable.CA_EQ_B]
|
|
|
+ except KeyError:
|
|
|
+ cable.equipement_b = None
|
|
|
+ self.log_error(RelationError("L'équipement '{}' n'existe pas".format(cable.CA_EQ_B), filename=Cable.filename, field="CA_EQ_B"))
|
|
|
+
|
|
|
+ # rattachement des equipements aux noeuds
|
|
|
+ for equipement in equipements.values():
|
|
|
+ try:
|
|
|
+ equipement.noeud = noeuds[equipement.EQ_NOM_NOE]
|
|
|
+ except KeyError:
|
|
|
+ equipement.noeud = None
|
|
|
+ self.log_error(RelationError("Le noeud '{}' n'existe pas".format(equipement.EQ_NOM_NOE, equipement.EQ_NOM), filename=Equipement.filename, field="EQ_NOM_NOE"))
|
|
|
+
|
|
|
+ # verifie que tous les equipements sont l'equipement B d'au moins un cable
|
|
|
+ equipements_b = [cable.CA_EQ_B for cable in cables]
|
|
|
+ for eq_id in equipements:
|
|
|
+ if equipements[eq_id].EQ_TYPE == "BAI":
|
|
|
+ continue
|
|
|
+ if not eq_id in equipements_b:
|
|
|
+ self.log_error(RelationError("L'equipement '{}' n'est l'équipement B d'aucun cable".format(eq_id), filename=Equipement.filename, field="EQ_NOM"))
|
|
|
+
|
|
|
+ # controle des doublons graphiques
|
|
|
+ for i, tranchee in enumerate(tranchees):
|
|
|
+ for other in tranchees[i+1:]:
|
|
|
+ if tranchee.geom == other.geom:
|
|
|
+ self.log_error(DuplicatedGeom("Une entité graphique est dupliquée".format(tranchee), filename=Tranchee.filename, field="geom"))
|
|
|
+
|
|
|
+ for i, artere in enumerate(arteres):
|
|
|
+ for other in arteres[i+1:]:
|
|
|
+ if artere.geom == other.geom:
|
|
|
+ self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(artere), filename=Artere.filename, field="geom"))
|
|
|
+
|
|
|
+ for i, cable in enumerate(cables):
|
|
|
+ for other in cables[i+1:]:
|
|
|
+ if cable.geom == other.geom and cable.CA_EQ_A == other.CA_EQ_A and cable.CA_EQ_B == other.CA_EQ_B:
|
|
|
+ self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(cable), filename=Cable.filename, field="geom"))
|
|
|
+
|
|
|
+ ls_noeuds = list(noeuds.values())
|
|
|
+ for i, noeud in enumerate(ls_noeuds):
|
|
|
+ for other in ls_noeuds[i+1:]:
|
|
|
+ if noeud.geom == other.geom:
|
|
|
+ self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(noeud), filename=Noeud.filename, field="geom"))
|
|
|
+ del ls_noeuds
|
|
|
+
|
|
|
+ ls_zapbos = list(zapbos.values())
|
|
|
+ for i, zapbo in enumerate(ls_zapbos):
|
|
|
+ for other in ls_zapbos[i+1:]:
|
|
|
+ if zapbo.geom == other.geom:
|
|
|
+ self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(zapbo), filename=Zapbo.filename, field="geom"))
|
|
|
+ del ls_zapbos
|
|
|
+
|
|
|
+ # Arteres: comparer la géométrie à celle des noeuds
|
|
|
+ for artere in arteres:
|
|
|
+ if not artere.noeud_a or not artere.noeud_b:
|
|
|
+ continue
|
|
|
+
|
|
|
+ buffer_a, buffer_b = artere.points[0].Buffer(TOLERANCE), artere.points[-1].Buffer(TOLERANCE)
|
|
|
+
|
|
|
+ if not (buffer_a.Contains(artere.noeud_a.points[0]) and buffer_b.Contains(artere.noeud_b.points[0])) \
|
|
|
+ and not (buffer_a.Contains(artere.noeud_b.points[0]) and buffer_b.Contains(artere.noeud_a.points[0])):
|
|
|
+
|
|
|
+ self.log_error(MissingItem("Pas de noeud aux coordonnées attendues ('{}')".format(artere), filename=Artere.filename, field="geom"))
|
|
|
+
|
|
|
+
|
|
|
+ # Cables: comparer la géométrie à celle des equipements (on utilise en fait la geom du noeud correspondant à l'équipement)
|
|
|
+ for cable in cables:
|
|
|
+ if not cable.equipement_a or not cable.equipement_b or not cable.valid or not cable.equipement_a.noeud or not cable.equipement_b.noeud:
|
|
|
+ continue
|
|
|
+
|
|
|
+ buffer_a, buffer_b = cable.points[0].Buffer(TOLERANCE), cable.points[-1].Buffer(TOLERANCE)
|
|
|
+
|
|
|
+ if not (buffer_a.Contains(cable.equipement_a.noeud.points[0]) and buffer_b.Contains(cable.equipement_b.noeud.points[0])) \
|
|
|
+ and not (buffer_a.Contains(cable.equipement_b.noeud.points[0]) and buffer_b.Contains(cable.equipement_a.noeud.points[0])):
|
|
|
+
|
|
|
+ self.log_error(MissingItem("Pas d'equipement aux coordonnées attendues ('{}')".format(cable), filename=Cable.filename, field="geom"))
|
|
|
+
|
|
|
+ del buffer_a, buffer_b
|
|
|
+
|
|
|
+ # Verifie que chaque tranchée a au moins une artère
|
|
|
+ arteres_emprise = Feature.buffered_union(arteres, TOLERANCE)
|
|
|
+
|
|
|
+ for tranchee in tranchees:
|
|
|
+ if not arteres_emprise.Contains(tranchee.geom):
|
|
|
+ self.log_error(MissingItem("Tranchée sans artère ('{}')".format(tranchee), filename=Tranchee.filename, field="-"))
|
|
|
+
|
|
|
+
|
|
|
+ # Verifie que chaque cable a au moins une artère (sauf si commentaire contient 'baguette')
|
|
|
+ for cable in cables:
|
|
|
+ if "baguette" in cable.CA_COMMENT.lower() or not cable.valid:
|
|
|
+ continue
|
|
|
+ if not arteres_emprise.Contains(cable.geom):
|
|
|
+ self.log_error(MissingItem("Cable sans artère ('{}')".format(cable), filename=Cable.filename, field="-"))
|
|
|
+
|
|
|
+ del arteres_emprise
|
|
|
+
|
|
|
+ # Verifie que chaque artère a au moins un cable (sauf si commentaire contient un de ces mots 'racco client adductio attente bus 'sans cable'')
|
|
|
+ cables_emprise = Feature.buffered_union(cables, TOLERANCE)
|
|
|
+
|
|
|
+ for artere in arteres:
|
|
|
+ if any(x in artere.AR_COMMENT.lower() for x in ['racco','client','adductio','attente','bus','sans cable']):
|
|
|
+ continue
|
|
|
+ if not cables_emprise.Contains(artere.geom):
|
|
|
+ self.log_error(MissingItem("Artère sans cable ('{}')".format(artere), filename=Artere.filename, field="-"))
|
|
|
+
|
|
|
+ del cables_emprise
|
|
|
+
|
|
|
+ # Contrôle des dimensions logiques
|
|
|
+ for artere in arteres:
|
|
|
+ try:
|
|
|
+ if not int(artere.AR_FOU_DIS) <= int(artere.AR_NB_FOUR):
|
|
|
+ self.log_error(DimensionError("Le nombre de fourreaux disponibles doit être inférieur au nombre total ('{}')".format(artere), filename=Artere.filename, field="AR_FOU_DIS"))
|
|
|
+ except (TypeError, ValueError):
|
|
|
+ pass
|
|
|
+
|
|
|
+ for cable in cables:
|
|
|
+ try:
|
|
|
+ if not int(cable.CA_NB_FO_U) <= int(cable.CA_NB_FO):
|
|
|
+ self.log_error(DimensionError("Le nombre de fourreaux utilisés doit être inférieur au nombre total ('{}')".format(cable), filename=Cable.filename, field="CA_NB_FO_U"))
|
|
|
+ if not int(cable.CA_NB_FO_D) <= int(cable.CA_NB_FO):
|
|
|
+ self.log_error(DimensionError("Le nombre de fourreaux disponibles doit être inférieur au nombre total ('{}')".format(cable), filename=Cable.filename, field="CA_NB_FO_D"))
|
|
|
+ except (TypeError, ValueError):
|
|
|
+ pass
|
|
|
+
|
|
|
+ ant_db = mn.ANTDb_0()
|
|
|
+ ant_db.execute("alter session set NLS_NUMERIC_CHARACTERS = '.,';") # definit le separateur decimal sur '.'
|
|
|
+
|
|
|
+ # Toutes les zapbo contiennent au moins une prise
|
|
|
+ for zapbo in zapbos.values():
|
|
|
+
|
|
|
+ if len(zapbo.points) >= 499:
|
|
|
+ # passe l'erreur provoquée par la limite au nombre d'arguments en SQL
|
|
|
+ zapbo.nb_prises = None
|
|
|
+ continue
|
|
|
+
|
|
|
+ sql = """Select SUM(NB_PRISE) AS NB_PRISES FROM SIG_ANT.FTTH_MN_PRISE_LOT z
|
|
|
+ WHERE SDO_INSIDE(z.GEOMETRY,
|
|
|
+ SDO_GEOMETRY(2003, 3949, SDO_POINT_TYPE(null,null,null), SDO_ELEM_INFO_ARRAY(1,1003,1), SDO_ORDINATE_ARRAY({}))
|
|
|
+ )='TRUE';""".format(", ".join(["{},{}".format(p.GetX(), p.GetY()) for p in zapbo.points]))
|
|
|
+
|
|
|
+ zapbo.nb_prises = int(ant_db.first(sql).NB_PRISES)
|
|
|
+ if not zapbo.nb_prises:
|
|
|
+ self.log_error(MissingItem("La Zapbo ne contient aucune prise: {}".format(zapbo), filename=Zapbo.filename, field="-"))
|
|
|
+
|
|
|
+ # Toutes les prises de la ou les ZAPM impactées sont dans une zapbo
|
|
|
+ zapms = {}
|
|
|
+ # > on déduit la liste des zapms à partir de la position des zapbos
|
|
|
+ for zapbo in zapbos.values():
|
|
|
+ centre = zapbo.geom.Centroid()
|
|
|
+ zapm = ant_db.first("""SELECT z.ID_ZAPM
|
|
|
+ FROM SIG_ANT.FTTH_MN_ZAPM z
|
|
|
+ WHERE sdo_contains(z.GEOMETRY,
|
|
|
+ SDO_GEOMETRY(2001, 3949, SDO_POINT_TYPE({}, {}, NULL), NULL, NULL)) = 'TRUE'
|
|
|
+ """.format(centre.GetX(), centre.GetY()))
|
|
|
+ try:
|
|
|
+ zapms[zapm.ID_ZAPM].append(zapbo)
|
|
|
+ except KeyError:
|
|
|
+ zapms[zapm.ID_ZAPM] = [zapbo]
|
|
|
+
|
|
|
+ for id_zapm in zapms:
|
|
|
+ zapm_couverture = Feature.union(zapms[id_zapm])
|
|
|
+ for prise in ant_db.read("""SELECT t.X AS x, t.Y AS y
|
|
|
+ FROM SIG_ANT.FTTH_MN_PRISE_LOT z,
|
|
|
+ TABLE(SDO_UTIL.GETVERTICES(z.GEOMETRY)) t
|
|
|
+ WHERE T_ETAT<>'OBSOLETE' AND ID_ZAPM_PARTIELLE='{}';""".format(id_zapm)):
|
|
|
+ point = ogr.Geometry(ogr.wkbPoint)
|
|
|
+ point.AddPoint(prise.x, prise.y)
|
|
|
+ if not zapm_couverture.Contains(point):
|
|
|
+ self.log_error(MissingItem("Certaines prises de la ZAPM ne sont pas comprises dans une ZAPBO: {}".format(id_zapm), filename=Zapbo.filename, field="-"))
|
|
|
+
|
|
|
+ # Verifier que chaque equipement de type PBO est contenu dans une zapbo, et que le nom de la zapbo contient le nom de l'equipement
|
|
|
+
|
|
|
+ for equipement in equipements.values():
|
|
|
+ if not equipement.EQ_TYPE == "PBO":
|
|
|
+ continue
|
|
|
+
|
|
|
+ #zapbos englobant l'equipement
|
|
|
+ candidates = []
|
|
|
+ for zapbo in zapbos.values():
|
|
|
+ if zapbo.geom.Contains(equipement.geom):
|
|
|
+ candidates.append(zapbo)
|
|
|
+
|
|
|
+ # le pbo doit être contenu dans une zapbo
|
|
|
+ if not candidates:
|
|
|
+ self.log_error(MissingItem("Le PBO n'est contenu dans aucune ZAPBO: {}".format(equipement), filename=Equipement.filename, field="geom"))
|
|
|
+ continue
|
|
|
+
|
|
|
+ # On se base sur le nom pour trouver la zapbo correspondante
|
|
|
+ try:
|
|
|
+ equipement.zapbo = next((z for z in candidates if equipement.EQ_NOM in z.ID_ZAPBO))
|
|
|
+ except StopIteration:
|
|
|
+ self.log_error(MissingItem("Le nom du PBO ne coincide avec le nom d'aucune des ZAPBO qui le contient: {}".format(equipement), filename=Equipement.filename, field="EQ_NOM"))
|
|
|
+ break
|
|
|
+
|
|
|
+ # Controle du dimensionnement des PBO
|
|
|
+ if equipement.zapbo.nb_prises is not None:
|
|
|
+ if equipement.EQ_TYPE_PH == 'PBO 6' and not equipement.zapbo.nb_prises < 6:
|
|
|
+ self.log_error(DimensionError("Le PBO 6 contient plus de 5 prises: {}".format(equipement), filename=Equipement.filename, field="-"))
|
|
|
+
|
|
|
+ if equipement.EQ_TYPE_PH == 'PBO 12' and not equipement.zapbo.nb_prises >= 6 and equipement.zapbo.nb_prises <= 8:
|
|
|
+ self.log_error(DimensionError("Le PBO 12 contient mois de 6 prises ou plus de 8 prises: {}".format(equipement), filename=Equipement.filename, field="-"))
|
|
|
+
|
|
|
+ if equipement.zapbo.STATUT == "REC" and not equipement.EQ_STATUT == "REC":
|
|
|
+ self.log_error(TechnicalValidationError("Le statut du PBO n'est pas cohérent avec le statut de sa ZAPBO: {}".format(equipement), filename=Equipement.filename, field="-"))
|
|
|
+
|
|
|
+ if equipement.EQ_STATUT == "REC" and not equipement.zapbo.STATUT == "REC" and not equipement.zapbo.ID_ZAPBO[:4].lower() == "att_":
|
|
|
+ self.log_error(TechnicalValidationError("Le statut du PBO n'est pas cohérent avec le statut de sa ZAPBO: {}".format(equipement), filename=Equipement.filename, field="-"))
|
|
|
+
|
|
|
+ # Contrôler dans la base si des éléments portant ces codes existent à des emplacements différents
|
|
|
+ for noeud in noeuds.values():
|
|
|
+ sql = """SELECT z.NO_NOM, SDO_GEOM.SDO_DISTANCE(z.GEOMETRY, SDO_GEOMETRY(2001, 3949, SDO_POINT_TYPE({}, {}, NULL), NULL, NULL),0.005) AS DIST
|
|
|
+ FROM SIG_ANT.FTTH_MN_GR_NOEUD_GEO z
|
|
|
+ WHERE z.NO_NOM='{}';""".format(noeud.geom.GetX(), noeud.geom.GetY(), noeud.NO_NOM)
|
|
|
+ existing = ant_db.first(sql)
|
|
|
+ if existing:
|
|
|
+ if existing.DIST > TOLERANCE and existing.DIST < 20:
|
|
|
+ self.log_error(PositionError("La position du noeud ne correspond pas à l'existant: {}".format(noeud), filename=Noeud.filename, field="geom"))
|
|
|
+ elif existing.DIST > 20:
|
|
|
+ self.log_error(DuplicatedGeom("Un noeud portant ce nom existe déjà ailleurs sur le territoire: {}".format(noeud), filename=Noeud.filename, field="NO_NOM"))
|
|
|
+
|
|
|
+ for zapbo in zapbos.values():
|
|
|
+ sql = """SELECT z.ID_ZAPBO, SDO_GEOM.SDO_DISTANCE(SDO_GEOM.SDO_CENTROID(z.GEOMETRY,0.005), SDO_GEOMETRY(2001, 3949, SDO_POINT_TYPE({}, {}, NULL), NULL, NULL),0.005) AS DIST
|
|
|
+ FROM SIG_ANT.FTTH_MN_GR_ZAPBO_GEO z
|
|
|
+ WHERE z.ID_ZAPBO='{}';""".format(zapbo.geom.Centroid().GetX(), zapbo.geom.Centroid().GetY(), zapbo.ID_ZAPBO)
|
|
|
+ existing = ant_db.first(sql)
|
|
|
+ if existing:
|
|
|
+ if existing.DIST > TOLERANCE and existing.DIST < 20:
|
|
|
+ self.log_error(PositionError("La position de la ZAPBO ne correspond pas à l'existant: {}".format(zapbo), filename=Zapbo.filename, field="geom"))
|
|
|
+ elif existing.DIST > 20:
|
|
|
+ self.log_error(DuplicatedGeom("Une ZAPBO portant ce nom existe déjà ailleurs sur le territoire: {}".format(zapbo), filename=Zapbo.filename, field="ID_ZAPBO"))
|
|
|
+
|
|
|
+ # Contrôle si un equipement portant ce nom existe, mais associé à un noeud différent
|
|
|
+ for equipement in equipements.values():
|
|
|
+ sql = """SELECT z.EQ_NOM, z.EQ_NOM_NOEUD
|
|
|
+ FROM SIG_ANT.FTTH_MN_GR_EQ_PASSIF z
|
|
|
+ WHERE z.EQ_NOM='{}';""".format(equipement.EQ_NOM)
|
|
|
+ existing = ant_db.first(sql)
|
|
|
+ if existing and existing.EQ_NOM_NOEUD != equipement.EQ_NOM_NOE:
|
|
|
+ self.log_error(DuplicatedGeom("Un équipement portant ce nom ({}) existe déjà et est associé à un noeud différent ({})".format(equipement.NO_NOM, existing.EQ_NOM_NOEUD), filename=Noeud.filename, field="geom"))
|
|
|
+
|
|
|
+
|
|
|
|