|
@@ -2,10 +2,13 @@
|
|
|
|
|
|
|
|
@author: olivier.massot, 2018
|
|
@author: olivier.massot, 2018
|
|
|
'''
|
|
'''
|
|
|
-from core import mn, gis
|
|
|
|
|
|
|
+from osgeo import ogr
|
|
|
|
|
+
|
|
|
|
|
+from core import mn, gis_
|
|
|
from core.gis_ import Feature
|
|
from core.gis_ import Feature
|
|
|
-from core.validation import NetgeoValidator, DuplicatedPk, RelationError, \
|
|
|
|
|
- DuplicatedGeom, MissingItem, DimensionError
|
|
|
|
|
|
|
+from core.validation import NetgeoValidator, RelationError, \
|
|
|
|
|
+ DuplicatedGeom, MissingItem, DimensionError, TechnicalValidationError, \
|
|
|
|
|
+ InvalidGeometry, UniqueError
|
|
|
from schemas.netgeo_1_12_doe.models import Artere, Cable, Equipement, Noeud, \
|
|
from schemas.netgeo_1_12_doe.models import Artere, Cable, Equipement, Noeud, \
|
|
|
Tranchee, Zapbo
|
|
Tranchee, Zapbo
|
|
|
|
|
|
|
@@ -26,41 +29,41 @@ class Netgeo112DoeValidator(NetgeoValidator):
|
|
|
if not noeud.NO_NOM in noeuds:
|
|
if not noeud.NO_NOM in noeuds:
|
|
|
noeuds[noeud.NO_NOM] = noeud
|
|
noeuds[noeud.NO_NOM] = noeud
|
|
|
else:
|
|
else:
|
|
|
- self.log_error(DuplicatedPk("Doublons dans le champs: {}".format(noeud), filename=Noeud.filename, field="NO_NOM"))
|
|
|
|
|
|
|
+ self.log_error(UniqueError("Doublons dans le champs: {}".format(noeud), filename=Noeud.filename, field="NO_NOM"))
|
|
|
|
|
|
|
|
equipements = {}
|
|
equipements = {}
|
|
|
for equipement in self.dataset[Equipement]:
|
|
for equipement in self.dataset[Equipement]:
|
|
|
if not equipement.EQ_NOM in equipements:
|
|
if not equipement.EQ_NOM in equipements:
|
|
|
equipements[equipement.EQ_NOM] = equipement
|
|
equipements[equipement.EQ_NOM] = equipement
|
|
|
else:
|
|
else:
|
|
|
- self.log_error(DuplicatedPk("Doublons dans le champs: {}".format(equipement), filename=Equipement.filename, field="EQ_NOM"))
|
|
|
|
|
|
|
+ self.log_error(UniqueError("Doublons dans le champs: {}".format(equipement), filename=Equipement.filename, field="EQ_NOM"))
|
|
|
|
|
|
|
|
zapbos = {}
|
|
zapbos = {}
|
|
|
for zapbo in self.dataset[Zapbo]:
|
|
for zapbo in self.dataset[Zapbo]:
|
|
|
if not zapbo.ID_ZAPBO in zapbos:
|
|
if not zapbo.ID_ZAPBO in zapbos:
|
|
|
zapbos[zapbo.ID_ZAPBO] = zapbo
|
|
zapbos[zapbo.ID_ZAPBO] = zapbo
|
|
|
else:
|
|
else:
|
|
|
- self.log_error(DuplicatedPk("Doublons dans le champs: {}".format(zapbo), filename=Zapbo.filename, field="ID_ZAPBO"))
|
|
|
|
|
|
|
+ self.log_error(UniqueError("Doublons dans le champs: {}".format(zapbo), filename=Zapbo.filename, field="ID_ZAPBO"))
|
|
|
|
|
|
|
|
# contrôle de la validité des géométries
|
|
# contrôle de la validité des géométries
|
|
|
for artere in arteres:
|
|
for artere in arteres:
|
|
|
if not artere.valid:
|
|
if not artere.valid:
|
|
|
- self.log_error(DuplicatedPk("Géométrie invalide: {}".format(artere), filename=Artere.filename, field="geom"))
|
|
|
|
|
|
|
+ self.log_error(InvalidGeometry("Géométrie invalide: {}".format(artere), filename=Artere.filename, field="geom"))
|
|
|
for tranchee in tranchees:
|
|
for tranchee in tranchees:
|
|
|
if not tranchee.valid:
|
|
if not tranchee.valid:
|
|
|
- self.log_error(DuplicatedPk("Géométrie invalide: {}".format(tranchee), filename=Tranchee.filename, field="geom"))
|
|
|
|
|
|
|
+ self.log_error(InvalidGeometry("Géométrie invalide: {}".format(tranchee), filename=Tranchee.filename, field="geom"))
|
|
|
for cable in cables:
|
|
for cable in cables:
|
|
|
- if not cable.valid:
|
|
|
|
|
- self.log_error(DuplicatedPk("Géométrie invalide: {}".format(cable), filename=Cable.filename, field="geom"))
|
|
|
|
|
|
|
+ if not "baguette" in cable.CA_COMMENT.lower() and not cable.valid:
|
|
|
|
|
+ self.log_error(InvalidGeometry("Géométrie invalide: {}".format(cable), filename=Cable.filename, field="geom"))
|
|
|
for noeud in noeuds.values():
|
|
for noeud in noeuds.values():
|
|
|
if not noeud.valid:
|
|
if not noeud.valid:
|
|
|
- self.log_error(DuplicatedPk("Géométrie invalide: {}".format(noeud), filename=Noeud.filename, field="geom"))
|
|
|
|
|
|
|
+ self.log_error(InvalidGeometry("Géométrie invalide: {}".format(noeud), filename=Noeud.filename, field="geom"))
|
|
|
for equipement in equipements.values():
|
|
for equipement in equipements.values():
|
|
|
if not equipement.valid:
|
|
if not equipement.valid:
|
|
|
- self.log_error(DuplicatedPk("Géométrie invalide: {}".format(equipement), filename=Equipement.filename, field="geom"))
|
|
|
|
|
- for zapbo in zapbos:
|
|
|
|
|
|
|
+ self.log_error(InvalidGeometry("Géométrie invalide: {}".format(equipement), filename=Equipement.filename, field="geom"))
|
|
|
|
|
+ for zapbo in zapbos.values():
|
|
|
if not zapbo.valid:
|
|
if not zapbo.valid:
|
|
|
- self.log_error(DuplicatedPk("Géométrie invalide: {}".format(zapbo), filename=Zapbo.filename, field="geom"))
|
|
|
|
|
|
|
+ self.log_error(InvalidGeometry("Géométrie invalide: {}".format(zapbo), filename=Zapbo.filename, field="geom"))
|
|
|
|
|
|
|
|
# rattachement les noeuds aux artères
|
|
# rattachement les noeuds aux artères
|
|
|
for artere in arteres:
|
|
for artere in arteres:
|
|
@@ -138,33 +141,32 @@ class Netgeo112DoeValidator(NetgeoValidator):
|
|
|
|
|
|
|
|
# Arteres: comparer la géométrie à celle des noeuds
|
|
# Arteres: comparer la géométrie à celle des noeuds
|
|
|
for artere in arteres:
|
|
for artere in arteres:
|
|
|
- point_a, point_b = artere.points[0], artere.points[-1]
|
|
|
|
|
-
|
|
|
|
|
- if not artere.noeud_a or artere.noeud_b:
|
|
|
|
|
|
|
+ if not artere.noeud_a or not artere.noeud_b:
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
- if point_a not in (artere.noeud_a.geom.points[0], artere.noeud_b.geom.points[0]):
|
|
|
|
|
- self.log_error(DuplicatedGeom("Pas de noeud aux coordonnées attendues ('{}')".format(artere), filename=Artere.filename, field="geom"))
|
|
|
|
|
|
|
+ buffer_a, buffer_b = artere.points[0].Buffer(1), artere.points[-1].Buffer(1)
|
|
|
|
|
+
|
|
|
|
|
+ if not (buffer_a.Contains(artere.noeud_a.points[0]) and buffer_b.Contains(artere.noeud_b.points[0])) \
|
|
|
|
|
+ and not (buffer_a.Contains(artere.noeud_b.points[0]) and buffer_b.Contains(artere.noeud_a.points[0])):
|
|
|
|
|
+
|
|
|
|
|
+ self.log_error(MissingItem("Pas de noeud aux coordonnées attendues ('{}')".format(artere), filename=Artere.filename, field="geom"))
|
|
|
|
|
|
|
|
- if point_b not in (artere.noeud_a.geom.points[0], artere.noeud_b.geom.points[0]):
|
|
|
|
|
- self.log_error(DuplicatedGeom("Pas de noeud aux coordonnées attendues ('{}')".format(artere), filename=Artere.filename, field="geom"))
|
|
|
|
|
|
|
|
|
|
# Cables: comparer la géométrie à celle des equipements (on utilise en fait la geom du noeud correspondant à l'équipement)
|
|
# Cables: comparer la géométrie à celle des equipements (on utilise en fait la geom du noeud correspondant à l'équipement)
|
|
|
for cable in cables:
|
|
for cable in cables:
|
|
|
- point_a, point_b = cable.points[0], cable.points[-1]
|
|
|
|
|
-
|
|
|
|
|
- if not cable.equipement_a or cable.equipement_b:
|
|
|
|
|
|
|
+ if not cable.equipement_a or not cable.equipement_b or not cable.valid or not cable.equipement_a.noeud or not cable.equipement_b.noeud:
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
- if point_a not in (cable.equipement_a.noeud.geom.points[0], cable.equipement_b.noeud.geom.points[0]):
|
|
|
|
|
- self.log_error(DuplicatedGeom("Pas d'equipement aux coordonnées attendues ('{}')".format(cable), filename=Cable.filename, field="geom"))
|
|
|
|
|
|
|
+ buffer_a, buffer_b = cable.points[0].Buffer(1), cable.points[-1].Buffer(1)
|
|
|
|
|
+
|
|
|
|
|
+ if not (buffer_a.Contains(cable.equipement_a.noeud.points[0]) and buffer_b.Contains(cable.equipement_b.noeud.points[0])) \
|
|
|
|
|
+ and not (buffer_a.Contains(cable.equipement_b.noeud.points[0]) and buffer_b.Contains(cable.equipement_a.noeud.points[0])):
|
|
|
|
|
+
|
|
|
|
|
+ self.log_error(MissingItem("Pas d'equipement aux coordonnées attendues ('{}')".format(cable), filename=Cable.filename, field="geom"))
|
|
|
|
|
|
|
|
- if point_b not in (cable.equipement_a.noeud.points[0], cable.equipement_b.noeud.geom.points[0]):
|
|
|
|
|
- self.log_error(DuplicatedGeom("Pas d'equipement aux coordonnées attendues ('{}')".format(cable), filename=Cable.filename, field="geom"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Verifie que chaque tranchée a au moins une artère
|
|
# Verifie que chaque tranchée a au moins une artère
|
|
|
- # TODO: Ajouter une tolérance de 50cm, et prendre en compte le fait que les cables et artères n'ont pas le même découpage
|
|
|
|
|
arteres_emprise = Feature.buffered_union(arteres, 0.5)
|
|
arteres_emprise = Feature.buffered_union(arteres, 0.5)
|
|
|
|
|
|
|
|
for tranchee in tranchees:
|
|
for tranchee in tranchees:
|
|
@@ -174,7 +176,7 @@ class Netgeo112DoeValidator(NetgeoValidator):
|
|
|
|
|
|
|
|
# Verifie que chaque cable a au moins une artère (sauf si commentaire contient 'baguette')
|
|
# Verifie que chaque cable a au moins une artère (sauf si commentaire contient 'baguette')
|
|
|
for cable in cables:
|
|
for cable in cables:
|
|
|
- if "baguette" in cable.CA_COMMENT.lower():
|
|
|
|
|
|
|
+ if "baguette" in cable.CA_COMMENT.lower() or not cable.valid:
|
|
|
continue
|
|
continue
|
|
|
if not arteres_emprise.Contains(cable.geom):
|
|
if not arteres_emprise.Contains(cable.geom):
|
|
|
self.log_error(MissingItem("Cable sans artère ('{}')".format(cable), filename=Cable.filename, field="-"))
|
|
self.log_error(MissingItem("Cable sans artère ('{}')".format(cable), filename=Cable.filename, field="-"))
|
|
@@ -209,24 +211,97 @@ class Netgeo112DoeValidator(NetgeoValidator):
|
|
|
except (TypeError, ValueError):
|
|
except (TypeError, ValueError):
|
|
|
pass
|
|
pass
|
|
|
|
|
|
|
|
- # Contrôler l'emprise des ZAPBO
|
|
|
|
|
|
|
+ ant_db = mn.ANTDb_0()
|
|
|
|
|
+ ant_db.execute("alter session set NLS_NUMERIC_CHARACTERS = '.,';") # definit le separateur decimal sur '.'
|
|
|
|
|
|
|
|
|
|
+ # Toutes les zapbo contiennent au moins une prise
|
|
|
|
|
+ for zapbo in zapbos.values():
|
|
|
|
|
+ sql = """Select SUM(NB_PRISE) AS NB_PRISES FROM SIG_ANT.FTTH_MN_PRISE_LOT z
|
|
|
|
|
+ WHERE SDO_INSIDE(z.GEOMETRY,
|
|
|
|
|
+ SDO_GEOMETRY(2003, 3949, SDO_POINT_TYPE(null,null,null), SDO_ELEM_INFO_ARRAY(1,1003,1), SDO_ORDINATE_ARRAY({}))
|
|
|
|
|
+ )='TRUE';""".format(", ".join(["{},{}".format(p.GetX(), p.GetY()) for p in zapbo.points]))
|
|
|
|
|
+ zapbo.nb_prises = int(ant_db.first(sql).NB_PRISES)
|
|
|
|
|
+ if not zapbo.nb_prises:
|
|
|
|
|
+ self.log_error(MissingItem("La Zapbo ne contient aucune prise: {}".format(zapbo), filename=Zapbo.filename, field="-"))
|
|
|
|
|
|
|
|
- # Verifier que chaque equipement de type PBO est contenu dans une zapbo, et que le nom de la zapbo contient le nom de l'equipement
|
|
|
|
|
|
|
|
|
|
|
|
+ # Toutes les prises de la ou les ZAPM impactées sont dans une zapbo
|
|
|
|
|
+ zapms = {}
|
|
|
|
|
+ # > on déduit la liste des zapms à partir de la position des zapbos
|
|
|
|
|
+ for zapbo in zapbos.values():
|
|
|
|
|
+ centre = zapbo.geom.Centroid()
|
|
|
|
|
+ zapm = ant_db.first("""SELECT z.ID_ZAPM
|
|
|
|
|
+ FROM SIG_ANT.FTTH_MN_ZAPM z
|
|
|
|
|
+ WHERE sdo_contains(z.GEOMETRY,
|
|
|
|
|
+ SDO_GEOMETRY(2001, 3949, SDO_POINT_TYPE({}, {}, NULL), NULL, NULL)) = 'TRUE'
|
|
|
|
|
+ """.format(centre.GetX(), centre.GetY()))
|
|
|
|
|
+ try:
|
|
|
|
|
+ zapms[zapm.ID_ZAPM].append(zapbo)
|
|
|
|
|
+ except KeyError:
|
|
|
|
|
+ zapms[zapm.ID_ZAPM] = [zapbo]
|
|
|
|
|
+
|
|
|
|
|
+ for id_zapm in zapms:
|
|
|
|
|
+ zapm_couverture = Feature.union(zapms[id_zapm])
|
|
|
|
|
+ for prise in ant_db.read("""SELECT t.X AS x, t.Y AS y
|
|
|
|
|
+ FROM SIG_ANT.FTTH_MN_PRISE_LOT z,
|
|
|
|
|
+ TABLE(SDO_UTIL.GETVERTICES(z.GEOMETRY)) t
|
|
|
|
|
+ WHERE T_ETAT<>'OBSOLETE' AND ID_ZAPM_PARTIELLE='{}';""".format(id_zapm)):
|
|
|
|
|
+ point = ogr.Geometry(ogr.wkbPoint)
|
|
|
|
|
+ point.AddPoint(prise.x, prise.y)
|
|
|
|
|
+ if not zapm_couverture.Contains(point):
|
|
|
|
|
+ self.log_error(MissingItem("Certaines prises de la ZAPM ne sont pas comprises dans une ZAPBO: {}".format(id_zapm), filename=Zapbo.filename, field="-"))
|
|
|
|
|
+
|
|
|
|
|
+ # Verifier que chaque equipement de type PBO est contenu dans une zapbo, et que le nom de la zapbo contient le nom de l'equipement
|
|
|
|
|
+
|
|
|
|
|
+ for equipement in equipements.values():
|
|
|
|
|
+ if not equipement.EQ_TYPE == "PBO":
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ #zapbos englobant l'equipement
|
|
|
|
|
+ candidates = []
|
|
|
|
|
+ for zapbo in zapbos.values():
|
|
|
|
|
+ if zapbo.geom.Contains(equipement.geom):
|
|
|
|
|
+ candidates.append(zapbo)
|
|
|
|
|
+
|
|
|
|
|
+ # le pbo doit être contenu dans une zapbo
|
|
|
|
|
+ if not candidates:
|
|
|
|
|
+ self.log_error(MissingItem("Le PBO n'est contenu dans aucune ZAPBO: {}".format(equipement), filename=Equipement.filename, field="geom"))
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # On se base sur le nom pour trouver la zapbo correspondante
|
|
|
|
|
+ try:
|
|
|
|
|
+ equipement.zapbo = next((z for z in candidates if equipement.EQ_NOM in z.ID_ZAPBO))
|
|
|
|
|
+ except StopIteration:
|
|
|
|
|
+ self.log_error(MissingItem("Le nom du PBO ne coincide avec le nom d'aucune des ZAPBO qui le contient: {}".format(equipement), filename=Equipement.filename, field="EQ_NOM"))
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ # Controle du dimensionnement des PBO
|
|
|
|
|
+ if equipement.EQ_TYPE_PH == 'PBO 6' and not equipement.zapbo.nb_prises < 6:
|
|
|
|
|
+ self.log_error(DimensionError("Le PBO 6 contient plus de 5 prises: {}".format(equipement), filename=Equipement.filename, field="-"))
|
|
|
|
|
+
|
|
|
|
|
+ if equipement.EQ_TYPE_PH == 'PBO 12' and not equipement.zapbo.nb_prises >= 6 and equipement.zapbo.nb_prises <= 8:
|
|
|
|
|
+ self.log_error(DimensionError("Le PBO 12 contient mois de 6 prises ou plus de 8 prises: {}".format(equipement), filename=Equipement.filename, field="-"))
|
|
|
|
|
+
|
|
|
|
|
+ if equipement.zapbo.STATUT == "REC" and not equipement.EQ_STATUT == "REC":
|
|
|
|
|
+ self.log_error(TechnicalValidationError("Le statut du PBO n'est pas cohérent avec le statut de sa ZAPBO: {}".format(equipement), filename=Equipement.filename, field="-"))
|
|
|
|
|
+
|
|
|
|
|
+ if equipement.EQ_STATUT == "REC" and not equipement.zapbo.STATUT == "REC" and not equipement.zapbo.ID_ZAPBO[:4].lower() == "att_":
|
|
|
|
|
+ self.log_error(TechnicalValidationError("Le statut du PBO n'est pas cohérent avec le statut de sa ZAPBO: {}".format(equipement), filename=Equipement.filename, field="-"))
|
|
|
|
|
|
|
|
- ### Verifs en base
|
|
|
|
|
-# ant_db = mn.ANTDb()
|
|
|
|
|
-
|
|
|
|
|
# Contrôler dans la base si des éléments portant ces codes existent à des emplacements différents
|
|
# Contrôler dans la base si des éléments portant ces codes existent à des emplacements différents
|
|
|
-
|
|
|
|
|
- # verifier que toutes les prises de la plaque sont inclues dans une zapbo
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
|
|
+ for noeud in noeuds.values():
|
|
|
|
|
+ sql = """Select z.ID_FTTH_MN_GR_NOEUD_GEO FROM SIG_ANT.FTTH_MN_GR_NOEUD_GEO z
|
|
|
|
|
+ WHERE z.NO_NOM='{}'
|
|
|
|
|
+ AND SDO_GEOM.SDO_DISTANCE(z.GEOMETRY, SDO_GEOMETRY(2001, 3949, SDO_POINT_TYPE({}, {}, NULL), NULL, NULL),0.005)>10;
|
|
|
|
|
+ """.format(noeud.NO_NOM, noeud.geom.GetX(), noeud.geom.GetY())
|
|
|
|
|
+ if ant_db.exists(sql):
|
|
|
|
|
+ self.log_error(DuplicatedGeom("Un noeud portant ce nom existe déjà ailleurs sur le territoire: {}".format(noeud), filename=Noeud.filename, field="NO_NOM"))
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
if __name__ == "__main__":
|
|
if __name__ == "__main__":
|
|
|
from core.constants import MAIN
|
|
from core.constants import MAIN
|
|
|
- subject = MAIN / "work" / "STURNO_228CP0_APD_180301_OK.zip"
|
|
|
|
|
|
|
+ subject = MAIN / "work" / "SOGETREL_026AP0_REC_181001_OK"
|
|
|
report = Netgeo112DoeValidator.submit(subject)
|
|
report = Netgeo112DoeValidator.submit(subject)
|
|
|
print(report)
|
|
print(report)
|
|
|
|
|
|