|
|
@@ -0,0 +1,630 @@
|
|
|
'''
    Validation of GIS datasets: loads a folder or zip archive, checks the
    data against Cerberus schemas and business rules, and builds an error
    report grouped by error type.

    @author: olivier.massot, sept. 2018
'''
|
|
|
+import time
|
|
|
+import zipfile
|
|
|
+
|
|
|
+from path import Path, TempDir
|
|
|
+
|
|
|
+from core import gis_
|
|
|
+from core.cerberus_extend import CerberusErrorHandler, \
|
|
|
+ _translate_messages, ExtendedValidator
|
|
|
+from schemas.common import SRID
|
|
|
+
|
|
|
+
|
|
|
class ValidatorInterruption(BaseException):
    """Raised to abort the validation run once a CRITICAL error occurred.

    NOTE(review): derives from BaseException rather than Exception,
    presumably so it escapes broad ``except Exception`` handlers — confirm.
    """
    pass
|
|
|
+
|
|
|
class Checkpoint():
    """A named milestone of a validation run and whether it passed."""

    def __init__(self, name, valid=True):
        # Public attributes: the report builder reads both directly.
        self.name, self.valid = name, valid
|
|
|
+
|
|
|
+
|
|
|
+########### MODELES ################
|
|
|
+
|
|
|
class BaseModel():
    """Base class for non-geographic data models.

    Subclasses override ``filename`` (source file of the records), ``pk``
    (primary-key field name) and ``schema`` (Cerberus validation schema).
    """
    filename = ""
    pk = ""
    schema = {}

    def __init__(self, **kwargs):
        # Every keyword argument becomes an instance attribute.
        for attr, value in kwargs.items():
            setattr(self, attr, value)
|
|
|
+
|
|
|
class BaseGeoModel(gis_.Feature):
    """Base class for geographic data models (one per layer/file).

    Subclasses override the class attributes; instances are built by
    adopting every instance attribute of an existing ``gis_.Feature``.
    """
    filename = ""
    pk = ""
    geom_type = 0
    bounding_box = (0, 0, 1, 1)
    schema = {}

    def __init__(self, feature):
        # Copy all instance attributes of the wrapped feature onto self.
        vars(self).update(vars(feature))
|
|
|
+
|
|
|
+
|
|
|
+########### ERREURS DE VALIDATION ################
|
|
|
+
|
|
|
# Severity levels of validation errors, mapped to the (French) labels
# displayed in reports.
VALIDATION_ERROR_LEVELS = {10: "MINEURE", 20: "AVERTISSEMENT", 30: "ERREUR", 40: "CRITIQUE"}
MINOR = 10
WARNING = 20
ERROR = 30
CRITICAL = 40  # a CRITICAL error interrupts the run (see BaseValidator.checkpoint)
|
|
|
+
|
|
|
class BaseValidationError():
    """Base class of every validation error logged during a run.

    Class attributes, overridden by subclasses:
        order_: sort key used to order error groups in the report
        name:   human-readable error-group title (French, shown in reports)
        level:  severity, one of MINOR/WARNING/ERROR/CRITICAL
        help:   optional help text attached to the error group
    """
    order_ = 0
    name = "Erreur"
    level = ERROR
    help = ""

    def __init__(self, message, filename="", field=""):
        self.message = message
        self.filename = filename
        self.field = field

    def __repr__(self):
        # "name - filename - field - message", skipping the empty parts.
        parts = (self.name, self.filename, self.field, self.message)
        return " - ".join(part for part in parts if part)
|
|
|
+
|
|
|
# Errors raised while loading the input files
class InputError(BaseValidationError):
    """Base of file-loading errors; CRITICAL, so loading failures abort the run."""
    order_ = 0
    level = CRITICAL
    name = "Erreur de chargement"

class MissingFile(InputError):
    """An expected file is absent from the submitted folder or archive."""
    order_ = 1
    name = "Fichier Manquant"

class UnreadableFile(InputError):
    """A file exists but could not be opened/read as a datasource."""
    order_ = 2
    name = "Fichier Illisible"

class WrongSrid(InputError):
    """The layer's spatial reference system does not match the expected SRID."""
    order_ = 3
    name = "Mauvais SRID"
|
|
|
+
|
|
|
### Errors in the data structure
class StructureError(BaseValidationError):
    """Base of structural errors (fields, formats, types, geometry shape)."""
    order_ = 10
    name = "Erreur de structure"
    level = ERROR

class GeomTypeError(StructureError):
    """The feature's geometry type differs from the model's expected type."""
    order_ = 12
    name = "Type de géométrie invalide"
    level = CRITICAL

class BoundingBoxError(StructureError):
    """The feature lies outside the model's allowed bounding box."""
    order_ = 11
    name = "Coordonnées hors de la zone autorisée"

class InvalidGeometry(StructureError):
    """The feature's geometry is not valid (self-intersection, etc.)."""
    order_ = 13
    name = "Géométrie invalide"

class DataError(StructureError):
    """A field value failed Cerberus schema validation."""
    order_ = 14
    name = "Erreur de format"
|
|
|
+
|
|
|
# Errors in the content — business-rule errors
class TechnicalValidationError(BaseValidationError):
    """Base of business-rule validation errors."""
    order_ = 20
    level = ERROR
    name = "Erreur technique"

class UniqueError(TechnicalValidationError):
    """A field expected to be unique contains duplicated values."""
    order_ = 21
    name = "Doublons dans le champs"

class RelationError(TechnicalValidationError):
    """A referenced (related) object does not exist; CRITICAL."""
    order_ = 22
    level = CRITICAL
    name = "Un objet lié n'existe pas"

class DuplicatedGeom(TechnicalValidationError):
    """Two features share the same geometry (graphic duplicate)."""
    order_ = 23
    name = "Doublon graphique"

class MissingItem(TechnicalValidationError):
    """An expected related element is missing at the expected location."""
    order_ = 24
    name = "Elément manquant"

class DimensionError(TechnicalValidationError):
    """A logical dimension constraint is violated (e.g. used > total)."""
    order_ = 25
    name = "Elément de dimension"

class PositionError(TechnicalValidationError):
    """A feature's position does not match the existing reference data."""
    order_ = 26
    name = "Erreur de positionnement"
|
|
|
+
|
|
|
+########### VALIDATION ################
|
|
|
+
|
|
|
class BaseValidator():
    """Validates a dataset (a folder, or a zip archive of files).

    Subclasses define ``schema_name`` (schema identifier used in the report),
    ``models`` (the model classes to load/check) and implement
    ``_load_files`` and ``_technical_validation``.

    Typical use: ``report = MyValidator.submit("/path/to/archive.zip")``.
    """
    schema_name = ""
    models = {}
    dataset = {}

    def __init__(self):
        self.valid = True
        self.checkpoints = []
        self.errors = []
        self._current_checkpoint_valid = True
        self.dt = 0  # execution time of the run, in seconds
        # Per-instance dataset: the class-level dict was shared between every
        # validator instance, so data from a previous run leaked into the next.
        self.dataset = {}

    def checkpoint(self, title):
        """Close the current validation stage under *title*.

        Records whether any error was logged since the previous checkpoint,
        and raises ValidatorInterruption if a CRITICAL error occurred.
        """
        self.checkpoints.append(Checkpoint(title, self._current_checkpoint_valid))
        self._current_checkpoint_valid = True
        if self.errors:
            self.valid = False
        if self.critical_happened():
            raise ValidatorInterruption()

    def critical_happened(self):
        """Return True if at least one logged error is CRITICAL."""
        return any(err.level == CRITICAL for err in self.errors)

    def log_error(self, validation_error):
        """Record a validation error and mark the current stage as failed."""
        self._current_checkpoint_valid = False
        self.errors.append(validation_error)

    @classmethod
    def submit(cls, subject):
        """Validate a folder or a zip archive and return the report dict.

        Raises FileNotFoundError if *subject* is neither a file nor a folder.
        """
        subject = Path(subject)

        if subject.isfile():
            with TempDir() as dirname:
                # `with` guarantees the archive handle is closed even when
                # extraction fails (the original leaked it on exception).
                with zipfile.ZipFile(subject, 'r') as zip_ref:
                    zip_ref.extractall(dirname)
                # Some archives wrap the files in a single top-level folder.
                if Path(dirname / subject.stem).isdir():
                    dirname /= subject.stem
                return cls._submit_folder(dirname)

        elif subject.isdir():
            return cls._submit_folder(subject)
        else:
            raise FileNotFoundError(f"Impossible de trouver le fichier ou répertoire: {subject}")

    @classmethod
    def _submit_folder(cls, folder):
        """Run a fresh validator on *folder*, time it, and build the report."""
        validator = cls()
        t0 = time.time()
        try:
            validator.validate(folder)
        except ValidatorInterruption:
            # A CRITICAL error aborts validation; the report is still built.
            pass
        validator.dt = time.time() - t0

        report = validator.build_report(validator.schema_name, folder.name)
        return report

    def validate(self, folder):
        """Run the three validation stages on *folder*."""
        # Load the data into memory
        self._load_files(folder)
        self.checkpoint("Chargement des données")

        # Check the data structure (fields, formats and types)
        self._structure_validation()
        self.checkpoint("Contrôle de la structure des données")

        # Business validation
        try:
            self._technical_validation()
            self.checkpoint("Validation Métier")
        except ValidatorInterruption:
            raise
        except Exception:
            # Narrowed from a bare `except:`, which would also have swallowed
            # SystemExit / KeyboardInterrupt.
            self.checkpoint("Validation Métier [interrompu]")

    def _load_files(self, folder):
        """Load the file data and bind it to a model.

        Warning: no format or validity check happens at this level!
        """
        raise NotImplementedError()

    def _structure_validation(self):
        """Validate every loaded item against its model's Cerberus schema."""
        for model in self.models:
            v = ExtendedValidator(model.schema, purge_unknown=True, error_handler=CerberusErrorHandler, require_all=True)

            for item in self.dataset[model]:
                v.validate(item.__dict__)

                for field, verrors in v.errors.items():
                    for err in verrors:
                        self.log_error(DataError(_translate_messages(err), filename=model.filename, field=field))

    def _technical_validation(self):
        """Business-rule validation; implemented by subclasses.

        Instance method (was a classmethod): it is always called through
        ``self`` and subclasses override it as an instance method.
        """
        raise NotImplementedError()

    def build_report(self, schema, filename):
        """Build the report dict: metadata, checkpoints and grouped errors."""
        report = {}
        report["schema"] = schema
        report["filename"] = filename
        report["exec_time"] = "{:.3g} s.".format(self.dt)
        report["checkpoints"] = [{"name": chk.name, "valid": chk.valid} for chk in self.checkpoints]

        report["errors"] = {}

        for err in self.errors:
            if err.name not in report["errors"]:
                report["errors"][err.name] = {"help": err.help, "order_": err.order_, "list": []}

            err_report = {"filename": err.filename or "-",
                          "field": err.field or "-",
                          "message": err.message}
            # Identical errors are reported only once.
            if err_report not in report["errors"][err.name]["list"]:
                report["errors"][err.name]["list"].append(err_report)

        return report
|
|
|
+
|
|
|
+
|
|
|
class NetgeoValidator(BaseValidator):
    """Validator for "Netgeo" GIS datasets (one file per geographic model)."""

    def _load_files(self, folder):
        """Load every model's layer from *folder* into ``self.dataset``.

        Logs MissingFile / WrongSrid / UnreadableFile errors. No format or
        validity check happens at this level.
        """
        for model in self.models:
            filename = model.filename
            path_ = Path(folder) / filename

            if not path_.isfile():
                # filename= ties the error to its file in the report (the
                # original omitted it, so the report column showed "-",
                # unlike the structure-validation errors below).
                self.log_error(MissingFile("Fichier manquant: '{}'".format(filename), filename=filename))
                continue

            self.dataset[model] = []
            try:
                ds = gis_.Datasource(path_)
                layer = ds.layer

                if layer.srid != SRID:
                    self.log_error(WrongSrid("Mauvaise projection: {} (attendu: {})".format(layer.srid, SRID), filename=filename))

                for feature in layer:
                    item = model(feature)
                    self.dataset[model].append(item)

            except IOError:
                self.log_error(UnreadableFile("Fichier illisible: {}".format(path_.name), filename=filename))

    def _structure_validation(self):
        """Cerberus schema validation plus geometry checks (type, bounding box)."""
        for model in self.models:
            v = ExtendedValidator(model.schema, purge_unknown=True, error_handler=CerberusErrorHandler, require_all=True)
            xmin, ymin, xmax, ymax = model.bounding_box

            for item in self.dataset[model]:

                # the geometry type must match the model's expected type
                if item.geom_type != model.geom_type:
                    self.log_error(GeomTypeError("Type de géométrie invalide: {} (attendu: {})".format(item.geom_name, gis_.GEOM_NAMES[model.geom_type]), filename=model.filename, field="geom"))

                # the feature's bounding box must lie inside the allowed one
                x1, y1, x2, y2 = item.bounding_box
                if any(x < xmin or x > xmax for x in (x1, x2)) or \
                        any(y < ymin or y > ymax for y in (y1, y2)):
                    self.log_error(BoundingBoxError("Situé hors de l'emprise autorisée", filename=model.filename, field="geom"))

                v.validate(item.__dict__)

                for field, verrors in v.errors.items():
                    for err in verrors:
                        self.log_error(DataError(_translate_messages(err), filename=model.filename, field=field))
|
|
|
+
|
|
|
+# def _technical_validation(self):
|
|
|
+#
|
|
|
+# # construction des index
|
|
|
+# arteres = self.dataset[Artere]
|
|
|
+# cables = self.dataset[Cable]
|
|
|
+# tranchees = self.dataset[Tranchee]
|
|
|
+#
|
|
|
+# noeuds = {}
|
|
|
+# for noeud in self.dataset[Noeud]:
|
|
|
+# if not noeud.NO_NOM in noeuds:
|
|
|
+# noeuds[noeud.NO_NOM] = noeud
|
|
|
+# else:
|
|
|
+# self.log_error(UniqueError("Doublons dans le champs: {}".format(noeud), filename=Noeud.filename, field="NO_NOM"))
|
|
|
+#
|
|
|
+# equipements = {}
|
|
|
+# for equipement in self.dataset[Equipement]:
|
|
|
+# if not equipement.EQ_NOM in equipements:
|
|
|
+# equipements[equipement.EQ_NOM] = equipement
|
|
|
+# else:
|
|
|
+# self.log_error(UniqueError("Doublons dans le champs: {}".format(equipement), filename=Equipement.filename, field="EQ_NOM"))
|
|
|
+#
|
|
|
+# zapbos = {}
|
|
|
+# for zapbo in self.dataset[Zapbo]:
|
|
|
+# if not zapbo.ID_ZAPBO in zapbos:
|
|
|
+# zapbos[zapbo.ID_ZAPBO] = zapbo
|
|
|
+# else:
|
|
|
+# self.log_error(UniqueError("Doublons dans le champs: {}".format(zapbo), filename=Zapbo.filename, field="ID_ZAPBO"))
|
|
|
+#
|
|
|
+# # contrôle de la validité des géométries
|
|
|
+# for artere in arteres:
|
|
|
+# if not artere.valid:
|
|
|
+# self.log_error(InvalidGeometry("Géométrie invalide: {}".format(artere), filename=Artere.filename, field="geom"))
|
|
|
+# for tranchee in tranchees:
|
|
|
+# if not tranchee.valid:
|
|
|
+# self.log_error(InvalidGeometry("Géométrie invalide: {}".format(tranchee), filename=Tranchee.filename, field="geom"))
|
|
|
+# for cable in cables:
|
|
|
+# if not "baguette" in cable.CA_COMMENT.lower() and not cable.valid:
|
|
|
+# self.log_error(InvalidGeometry("Géométrie invalide: {}".format(cable), filename=Cable.filename, field="geom"))
|
|
|
+# for noeud in noeuds.values():
|
|
|
+# if not noeud.valid:
|
|
|
+# self.log_error(InvalidGeometry("Géométrie invalide: {}".format(noeud), filename=Noeud.filename, field="geom"))
|
|
|
+# for equipement in equipements.values():
|
|
|
+# if not equipement.valid:
|
|
|
+# self.log_error(InvalidGeometry("Géométrie invalide: {}".format(equipement), filename=Equipement.filename, field="geom"))
|
|
|
+# for zapbo in zapbos.values():
|
|
|
+# if not zapbo.valid:
|
|
|
+# self.log_error(InvalidGeometry("Géométrie invalide: {}".format(zapbo), filename=Zapbo.filename, field="geom"))
|
|
|
+#
|
|
|
+# # rattachement les noeuds aux artères
|
|
|
+# for artere in arteres:
|
|
|
+# try:
|
|
|
+# artere.noeud_a = noeuds[artere.AR_NOEUD_A]
|
|
|
+# except KeyError:
|
|
|
+# artere.noeud_a = None
|
|
|
+# self.log_error(RelationError("Le noeud '{}' n'existe pas".format(artere.AR_NOEUD_A), filename=Artere.filename, field="AR_NOEUD_A"))
|
|
|
+#
|
|
|
+# try:
|
|
|
+# artere.noeud_b = noeuds[artere.AR_NOEUD_B]
|
|
|
+# except KeyError:
|
|
|
+# artere.noeud_b = None
|
|
|
+# self.log_error(RelationError("Le noeud '{}' n'existe pas".format(artere.AR_NOEUD_B), filename=Artere.filename, field="AR_NOEUD_A"))
|
|
|
+#
|
|
|
+# # rattachement des equipements aux cables
|
|
|
+# for cable in cables:
|
|
|
+# try:
|
|
|
+# cable.equipement_a = equipements[cable.CA_EQ_A]
|
|
|
+# except KeyError:
|
|
|
+# cable.equipement_a = None
|
|
|
+# self.log_error(RelationError("L'équipement '{}' n'existe pas".format(cable.CA_EQ_A), filename=Cable.filename, field="CA_EQ_A"))
|
|
|
+#
|
|
|
+# try:
|
|
|
+# cable.equipement_b = equipements[cable.CA_EQ_B]
|
|
|
+# except KeyError:
|
|
|
+# cable.equipement_b = None
|
|
|
+# self.log_error(RelationError("L'équipement '{}' n'existe pas".format(cable.CA_EQ_B), filename=Cable.filename, field="CA_EQ_B"))
|
|
|
+#
|
|
|
+# # rattachement des equipements aux noeuds
|
|
|
+# for equipement in equipements.values():
|
|
|
+# try:
|
|
|
+# equipement.noeud = noeuds[equipement.EQ_NOM_NOE]
|
|
|
+# except KeyError:
|
|
|
+# equipement.noeud = None
|
|
|
+# self.log_error(RelationError("Le noeud '{}' n'existe pas".format(equipement.EQ_NOM_NOE, equipement.EQ_NOM), filename=Equipement.filename, field="EQ_NOM_NOE"))
|
|
|
+#
|
|
|
+# # verifie que tous les equipements sont l'equipement B d'au moins un cable
|
|
|
+# equipements_b = [cable.CA_EQ_B for cable in cables]
|
|
|
+# for eq_id in equipements:
|
|
|
+# if equipements[eq_id].EQ_TYPE == "BAI":
|
|
|
+# continue
|
|
|
+# if not eq_id in equipements_b:
|
|
|
+# self.log_error(RelationError("L'equipement '{}' n'est l'équipement B d'aucun cable".format(eq_id), filename=Equipement.filename, field="EQ_NOM"))
|
|
|
+#
|
|
|
+# # controle des doublons graphiques
|
|
|
+# for i, tranchee in enumerate(tranchees):
|
|
|
+# for other in tranchees[i+1:]:
|
|
|
+# if tranchee.geom == other.geom:
|
|
|
+# self.log_error(DuplicatedGeom("Une entité graphique est dupliquée".format(tranchee), filename=Tranchee.filename, field="geom"))
|
|
|
+#
|
|
|
+# for i, artere in enumerate(arteres):
|
|
|
+# for other in arteres[i+1:]:
|
|
|
+# if artere.geom == other.geom:
|
|
|
+# self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(artere), filename=Artere.filename, field="geom"))
|
|
|
+#
|
|
|
+# for i, cable in enumerate(cables):
|
|
|
+# for other in cables[i+1:]:
|
|
|
+# if cable.geom == other.geom and cable.CA_EQ_A == other.CA_EQ_A and cable.CA_EQ_B == other.CA_EQ_B:
|
|
|
+# self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(cable), filename=Cable.filename, field="geom"))
|
|
|
+#
|
|
|
+# ls_noeuds = list(noeuds.values())
|
|
|
+# for i, noeud in enumerate(ls_noeuds):
|
|
|
+# for other in ls_noeuds[i+1:]:
|
|
|
+# if noeud.geom == other.geom:
|
|
|
+# self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(noeud), filename=Noeud.filename, field="geom"))
|
|
|
+# del ls_noeuds
|
|
|
+#
|
|
|
+# ls_zapbos = list(zapbos.values())
|
|
|
+# for i, zapbo in enumerate(ls_zapbos):
|
|
|
+# for other in ls_zapbos[i+1:]:
|
|
|
+# if zapbo.geom == other.geom:
|
|
|
+# self.log_error(DuplicatedGeom("Une entité graphique est dupliquée ('{}')".format(zapbo), filename=Zapbo.filename, field="geom"))
|
|
|
+# del ls_zapbos
|
|
|
+#
|
|
|
+# # Arteres: comparer la géométrie à celle des noeuds
|
|
|
+# for artere in arteres:
|
|
|
+# if not artere.noeud_a or not artere.noeud_b:
|
|
|
+# continue
|
|
|
+#
|
|
|
+# buffer_a, buffer_b = artere.points[0].Buffer(TOLERANCE), artere.points[-1].Buffer(TOLERANCE)
|
|
|
+#
|
|
|
+# if not (buffer_a.Contains(artere.noeud_a.points[0]) and buffer_b.Contains(artere.noeud_b.points[0])) \
|
|
|
+# and not (buffer_a.Contains(artere.noeud_b.points[0]) and buffer_b.Contains(artere.noeud_a.points[0])):
|
|
|
+#
|
|
|
+# self.log_error(MissingItem("Pas de noeud aux coordonnées attendues ('{}')".format(artere), filename=Artere.filename, field="geom"))
|
|
|
+#
|
|
|
+#
|
|
|
+# # Cables: comparer la géométrie à celle des equipements (on utilise en fait la geom du noeud correspondant à l'équipement)
|
|
|
+# for cable in cables:
|
|
|
+# if not cable.equipement_a or not cable.equipement_b or not cable.valid or not cable.equipement_a.noeud or not cable.equipement_b.noeud:
|
|
|
+# continue
|
|
|
+#
|
|
|
+# buffer_a, buffer_b = cable.points[0].Buffer(TOLERANCE), cable.points[-1].Buffer(TOLERANCE)
|
|
|
+#
|
|
|
+# if not (buffer_a.Contains(cable.equipement_a.noeud.points[0]) and buffer_b.Contains(cable.equipement_b.noeud.points[0])) \
|
|
|
+# and not (buffer_a.Contains(cable.equipement_b.noeud.points[0]) and buffer_b.Contains(cable.equipement_a.noeud.points[0])):
|
|
|
+#
|
|
|
+# self.log_error(MissingItem("Pas d'equipement aux coordonnées attendues ('{}')".format(cable), filename=Cable.filename, field="geom"))
|
|
|
+#
|
|
|
+# del buffer_a, buffer_b
|
|
|
+#
|
|
|
+# # Verifie que chaque tranchée a au moins une artère
|
|
|
+# arteres_emprise = Feature.buffered_union(arteres, TOLERANCE)
|
|
|
+#
|
|
|
+# for tranchee in tranchees:
|
|
|
+# if not arteres_emprise.Contains(tranchee.geom):
|
|
|
+# self.log_error(MissingItem("Tranchée sans artère ('{}')".format(tranchee), filename=Tranchee.filename, field="-"))
|
|
|
+#
|
|
|
+#
|
|
|
+# # Verifie que chaque cable a au moins une artère (sauf si commentaire contient 'baguette')
|
|
|
+# for cable in cables:
|
|
|
+# if "baguette" in cable.CA_COMMENT.lower() or not cable.valid:
|
|
|
+# continue
|
|
|
+# if not arteres_emprise.Contains(cable.geom):
|
|
|
+# self.log_error(MissingItem("Cable sans artère ('{}')".format(cable), filename=Cable.filename, field="-"))
|
|
|
+#
|
|
|
+# del arteres_emprise
|
|
|
+#
|
|
|
+# # Verifie que chaque artère a au moins un cable (sauf si commentaire contient un de ces mots 'racco client adductio attente bus 'sans cable'')
|
|
|
+# cables_emprise = Feature.buffered_union(cables, TOLERANCE)
|
|
|
+#
|
|
|
+# for artere in arteres:
|
|
|
+# if any(x in artere.AR_COMMENT.lower() for x in ['racco','client','adductio','attente','bus','sans cable']):
|
|
|
+# continue
|
|
|
+# if not cables_emprise.Contains(artere.geom):
|
|
|
+# self.log_error(MissingItem("Artère sans cable ('{}')".format(artere), filename=Artere.filename, field="-"))
|
|
|
+#
|
|
|
+# del cables_emprise
|
|
|
+#
|
|
|
+# # Contrôle des dimensions logiques
|
|
|
+# for artere in arteres:
|
|
|
+# try:
|
|
|
+# if not int(artere.AR_FOU_DIS) <= int(artere.AR_NB_FOUR):
|
|
|
+# self.log_error(DimensionError("Le nombre de fourreaux disponibles doit être inférieur au nombre total ('{}')".format(artere), filename=Artere.filename, field="AR_FOU_DIS"))
|
|
|
+# except (TypeError, ValueError):
|
|
|
+# pass
|
|
|
+#
|
|
|
+# for cable in cables:
|
|
|
+# try:
|
|
|
+# if not int(cable.CA_NB_FO_U) <= int(cable.CA_NB_FO):
|
|
|
+# self.log_error(DimensionError("Le nombre de fourreaux utilisés doit être inférieur au nombre total ('{}')".format(cable), filename=Cable.filename, field="CA_NB_FO_U"))
|
|
|
+# if not int(cable.CA_NB_FO_D) <= int(cable.CA_NB_FO):
|
|
|
+# self.log_error(DimensionError("Le nombre de fourreaux disponibles doit être inférieur au nombre total ('{}')".format(cable), filename=Cable.filename, field="CA_NB_FO_D"))
|
|
|
+# except (TypeError, ValueError):
|
|
|
+# pass
|
|
|
+#
|
|
|
+# ant_db = mn.ANTDb_0()
|
|
|
+# ant_db.execute("alter session set NLS_NUMERIC_CHARACTERS = '.,';") # definit le separateur decimal sur '.'
|
|
|
+#
|
|
|
+# # Toutes les zapbo contiennent au moins une prise
|
|
|
+# for zapbo in zapbos.values():
|
|
|
+#
|
|
|
+# if len(zapbo.points) >= 499:
|
|
|
+# # passe l'erreur provoquée par la limite au nombre d'arguments en SQL
|
|
|
+# zapbo.nb_prises = None
|
|
|
+# continue
|
|
|
+#
|
|
|
+# sql = """Select SUM(NB_PRISE) AS NB_PRISES FROM SIG_ANT.FTTH_MN_PRISE_LOT z
|
|
|
+# WHERE SDO_INSIDE(z.GEOMETRY,
|
|
|
+# SDO_GEOMETRY(2003, 3949, SDO_POINT_TYPE(null,null,null), SDO_ELEM_INFO_ARRAY(1,1003,1), SDO_ORDINATE_ARRAY({}))
|
|
|
+# )='TRUE';""".format(", ".join(["{},{}".format(p.GetX(), p.GetY()) for p in zapbo.points]))
|
|
|
+#
|
|
|
+# zapbo.nb_prises = int(ant_db.first(sql).NB_PRISES)
|
|
|
+# if not zapbo.nb_prises:
|
|
|
+# self.log_error(MissingItem("La Zapbo ne contient aucune prise: {}".format(zapbo), filename=Zapbo.filename, field="-"))
|
|
|
+#
|
|
|
+# # Toutes les prises de la ou les ZAPM impactées sont dans une zapbo
|
|
|
+# zapms = {}
|
|
|
+# # > on déduit la liste des zapms à partir de la position des zapbos
|
|
|
+# for zapbo in zapbos.values():
|
|
|
+# centre = zapbo.geom.Centroid()
|
|
|
+# zapm = ant_db.first("""SELECT z.ID_ZAPM
|
|
|
+# FROM SIG_ANT.FTTH_MN_ZAPM z
|
|
|
+# WHERE sdo_contains(z.GEOMETRY,
|
|
|
+# SDO_GEOMETRY(2001, 3949, SDO_POINT_TYPE({}, {}, NULL), NULL, NULL)) = 'TRUE'
|
|
|
+# """.format(centre.GetX(), centre.GetY()))
|
|
|
+# try:
|
|
|
+# zapms[zapm.ID_ZAPM].append(zapbo)
|
|
|
+# except KeyError:
|
|
|
+# zapms[zapm.ID_ZAPM] = [zapbo]
|
|
|
+#
|
|
|
+# for id_zapm in zapms:
|
|
|
+# zapm_couverture = Feature.union(zapms[id_zapm])
|
|
|
+# for prise in ant_db.read("""SELECT t.X AS x, t.Y AS y
|
|
|
+# FROM SIG_ANT.FTTH_MN_PRISE_LOT z,
|
|
|
+# TABLE(SDO_UTIL.GETVERTICES(z.GEOMETRY)) t
|
|
|
+# WHERE T_ETAT<>'OBSOLETE' AND ID_ZAPM_PARTIELLE='{}';""".format(id_zapm)):
|
|
|
+# point = ogr.Geometry(ogr.wkbPoint)
|
|
|
+# point.AddPoint(prise.x, prise.y)
|
|
|
+# if not zapm_couverture.Contains(point):
|
|
|
+# self.log_error(MissingItem("Certaines prises de la ZAPM ne sont pas comprises dans une ZAPBO: {}".format(id_zapm), filename=Zapbo.filename, field="-"))
|
|
|
+#
|
|
|
+# # Verifier que chaque equipement de type PBO est contenu dans une zapbo, et que le nom de la zapbo contient le nom de l'equipement
|
|
|
+#
|
|
|
+# for equipement in equipements.values():
|
|
|
+# if not equipement.EQ_TYPE == "PBO":
|
|
|
+# continue
|
|
|
+#
|
|
|
+# #zapbos englobant l'equipement
|
|
|
+# candidates = []
|
|
|
+# for zapbo in zapbos.values():
|
|
|
+# if zapbo.geom.Contains(equipement.geom):
|
|
|
+# candidates.append(zapbo)
|
|
|
+#
|
|
|
+# # le pbo doit être contenu dans une zapbo
|
|
|
+# if not candidates:
|
|
|
+# self.log_error(MissingItem("Le PBO n'est contenu dans aucune ZAPBO: {}".format(equipement), filename=Equipement.filename, field="geom"))
|
|
|
+# continue
|
|
|
+#
|
|
|
+# # On se base sur le nom pour trouver la zapbo correspondante
|
|
|
+# try:
|
|
|
+# equipement.zapbo = next((z for z in candidates if equipement.EQ_NOM in z.ID_ZAPBO))
|
|
|
+# except StopIteration:
|
|
|
+# self.log_error(MissingItem("Le nom du PBO ne coincide avec le nom d'aucune des ZAPBO qui le contient: {}".format(equipement), filename=Equipement.filename, field="EQ_NOM"))
|
|
|
+# break
|
|
|
+#
|
|
|
+# # Controle du dimensionnement des PBO
|
|
|
+# if equipement.zapbo.nb_prises is not None:
|
|
|
+# if equipement.EQ_TYPE_PH == 'PBO 6' and not equipement.zapbo.nb_prises < 6:
|
|
|
+# self.log_error(DimensionError("Le PBO 6 contient plus de 5 prises: {}".format(equipement), filename=Equipement.filename, field="-"))
|
|
|
+#
|
|
|
+# if equipement.EQ_TYPE_PH == 'PBO 12' and not equipement.zapbo.nb_prises >= 6 and equipement.zapbo.nb_prises <= 8:
|
|
|
+# self.log_error(DimensionError("Le PBO 12 contient mois de 6 prises ou plus de 8 prises: {}".format(equipement), filename=Equipement.filename, field="-"))
|
|
|
+#
|
|
|
+# if equipement.zapbo.STATUT == "REC" and not equipement.EQ_STATUT == "REC":
|
|
|
+# self.log_error(TechnicalValidationError("Le statut du PBO n'est pas cohérent avec le statut de sa ZAPBO: {}".format(equipement), filename=Equipement.filename, field="-"))
|
|
|
+#
|
|
|
+# if equipement.EQ_STATUT == "REC" and not equipement.zapbo.STATUT == "REC" and not equipement.zapbo.ID_ZAPBO[:4].lower() == "att_":
|
|
|
+# self.log_error(TechnicalValidationError("Le statut du PBO n'est pas cohérent avec le statut de sa ZAPBO: {}".format(equipement), filename=Equipement.filename, field="-"))
|
|
|
+#
|
|
|
+# # Contrôler dans la base si des éléments portant ces codes existent à des emplacements différents
|
|
|
+# for noeud in noeuds.values():
|
|
|
+# sql = """SELECT z.NO_NOM, SDO_GEOM.SDO_DISTANCE(z.GEOMETRY, SDO_GEOMETRY(2001, 3949, SDO_POINT_TYPE({}, {}, NULL), NULL, NULL),0.005) AS DIST
|
|
|
+# FROM SIG_ANT.FTTH_MN_GR_NOEUD_GEO z
|
|
|
+# WHERE z.NO_NOM='{}';""".format(noeud.geom.GetX(), noeud.geom.GetY(), noeud.NO_NOM)
|
|
|
+# existing = ant_db.first(sql)
|
|
|
+# if existing:
|
|
|
+# if existing.DIST > TOLERANCE and existing.DIST < 20:
|
|
|
+# self.log_error(PositionError("La position du noeud ne correspond pas à l'existant: {}".format(noeud), filename=Noeud.filename, field="geom"))
|
|
|
+# elif existing.DIST > 20:
|
|
|
+# self.log_error(DuplicatedGeom("Un noeud portant ce nom existe déjà ailleurs sur le territoire: {}".format(noeud), filename=Noeud.filename, field="NO_NOM"))
|
|
|
+#
|
|
|
+# for zapbo in zapbos.values():
|
|
|
+# sql = """SELECT z.ID_ZAPBO, SDO_GEOM.SDO_DISTANCE(SDO_GEOM.SDO_CENTROID(z.GEOMETRY,0.005), SDO_GEOMETRY(2001, 3949, SDO_POINT_TYPE({}, {}, NULL), NULL, NULL),0.005) AS DIST
|
|
|
+# FROM SIG_ANT.FTTH_MN_GR_ZAPBO_GEO z
|
|
|
+# WHERE z.ID_ZAPBO='{}';""".format(zapbo.geom.Centroid().GetX(), zapbo.geom.Centroid().GetY(), zapbo.ID_ZAPBO)
|
|
|
+# existing = ant_db.first(sql)
|
|
|
+# if existing:
|
|
|
+# if existing.DIST > TOLERANCE and existing.DIST < 20:
|
|
|
+# self.log_error(PositionError("La position de la ZAPBO ne correspond pas à l'existant: {}".format(zapbo), filename=Zapbo.filename, field="geom"))
|
|
|
+# elif existing.DIST > 20:
|
|
|
+# self.log_error(DuplicatedGeom("Une ZAPBO portant ce nom existe déjà ailleurs sur le territoire: {}".format(zapbo), filename=Zapbo.filename, field="ID_ZAPBO"))
|
|
|
+#
|
|
|
+# # Contrôle si un equipement portant ce nom existe, mais associé à un noeud différent
|
|
|
+# for equipement in equipements.values():
|
|
|
+# sql = """SELECT z.EQ_NOM, z.EQ_NOM_NOEUD
|
|
|
+# FROM SIG_ANT.FTTH_MN_GR_EQ_PASSIF z
|
|
|
+# WHERE z.EQ_NOM='{}';""".format(equipement.EQ_NOM)
|
|
|
+# existing = ant_db.first(sql)
|
|
|
+# if existing and existing.EQ_NOM_NOEUD != equipement.EQ_NOM_NOE:
|
|
|
+# self.log_error(DuplicatedGeom("Un équipement portant ce nom ({}) existe déjà et est associé à un noeud différent ({})".format(equipement.NO_NOM, existing.EQ_NOM_NOEUD), filename=Noeud.filename, field="geom"))
|
|
|
+#
|
|
|
+#
|
|
|
+
|