|
|
@@ -0,0 +1,438 @@
|
|
|
+'''
|
|
|
+
|
|
|
+@author: olivier.massot, janv. 2019
|
|
|
+'''
|
|
|
+from core import mncheck
|
|
|
+import logging
|
|
|
+from qgis.core import QgsCoordinateReferenceSystem, QgsGeometry, QgsPointXY
|
|
|
+import re
|
|
|
+
|
|
|
+from core.checking import SUCCESS, FAILURE
|
|
|
+from path import Path
|
|
|
+from test._base import SchemaTest
|
|
|
+
|
|
|
+
|
|
|
+logger = logging.getLogger("mncheck")
|
|
|
+
|
|
|
+class Test(SchemaTest):
|
|
|
+ SCHEMA_NAME = "mn2_rec"
|
|
|
+ PROJECT_FILE = Path(__file__).parent / 'projects' / 'mn2_rec' / '1_valid' / '1_valid.qgz'
|
|
|
+
|
|
|
+ def setUp(self):
|
|
|
+ SchemaTest.setUp(self)
|
|
|
+ self.schema = mncheck.get_schema(self.SCHEMA_NAME)
|
|
|
+ if not self.schema.checkers:
|
|
|
+ raise AttributeError("Aucun testeur trouvé dans le schéma")
|
|
|
+ self.checker = self.schema.checkers[0]()
|
|
|
+
|
|
|
+ def run_test(self, test_name, dry=False):
|
|
|
+ results = self.checker.run(test_name, dry)
|
|
|
+ if len(results) == 0:
|
|
|
+ raise Exception("No result for test")
|
|
|
+ elif len(results) > 1:
|
|
|
+ raise Exception("More than one result from test")
|
|
|
+ return results[0]
|
|
|
+
|
|
|
+ def assertSuccess(self, r):
|
|
|
+ if not r.status == SUCCESS:
|
|
|
+ raise AssertionError("Le test a échoué")
|
|
|
+
|
|
|
+ def assertFailure(self, r):
|
|
|
+ if not r.status == FAILURE:
|
|
|
+ raise AssertionError("Le test n'aurait pas dû réussir")
|
|
|
+
|
|
|
+ def assertErrorLogged(self, result, err_msg):
|
|
|
+ if not any((re.fullmatch(err_msg, err.message) for err in result.errors)):
|
|
|
+ print(result.errors)
|
|
|
+ raise AssertionError("Error was not logged: {}".format(err_msg))
|
|
|
+
|
|
|
+ def test_load_layers(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_load_layers")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Chargement des données')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # une couche manquante et une pk manquante
|
|
|
+ _pk_ini = self.schema.Cable.pk
|
|
|
+ try:
|
|
|
+ self.schema.Artere.layer = None
|
|
|
+ self.schema.Cable.pk = "not_a_qgis_field"
|
|
|
+
|
|
|
+ r = self.run_test("test_load_layers", True)
|
|
|
+
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, ".*Couche manquante.*")
|
|
|
+ self.assertErrorLogged(r, ".*Clef primaire manquante.*")
|
|
|
+ except:
|
|
|
+ raise
|
|
|
+ finally:
|
|
|
+ self.schema.Cable.pk = _pk_ini
|
|
|
+
|
|
|
+
|
|
|
+ def test_scr(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_scr")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Contrôle des projections')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # mauvaise projection
|
|
|
+ self.schema.Artere.layer.setCrs(QgsCoordinateReferenceSystem())
|
|
|
+ r = self.run_test("test_scr", True)
|
|
|
+
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, f"Mauvaise projection.*")
|
|
|
+
|
|
|
+ def test_structure_arteres(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_structure_arteres")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Structure des données: Artères')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # valeur non autorisée
|
|
|
+ self.checker.arteres[0].AR_ID_INSE = "invalid"
|
|
|
+ r = self.run_test("test_structure_arteres", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+
|
|
|
+ def test_structure_cables(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_structure_cables")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Structure des données: Cables')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # valeur non autorisée
|
|
|
+ self.checker.cables[0].CA_TYPE = "invalid"
|
|
|
+ r = self.run_test("test_structure_cables", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+
|
|
|
+ def test_structure_equipements(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_structure_equipements")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Structure des données: Equipements')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # valeur non autorisée
|
|
|
+ self.checker.equipements[0].EQ_TYPE = "invalid"
|
|
|
+ r = self.run_test("test_structure_equipements", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+
|
|
|
+ def test_structure_noeuds(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_structure_noeuds")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Structure des données: Noeuds')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # valeur non autorisée
|
|
|
+ self.checker.noeuds[0].NO_ID_INSE = "invalid"
|
|
|
+ r = self.run_test("test_structure_noeuds", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+
|
|
|
+ def test_structure_tranchees(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_structure_tranchees")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Structure des données: Tranchées')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # valeur non autorisée
|
|
|
+ self.checker.tranchees[0].TR_REVET = "invalid"
|
|
|
+ r = self.run_test("test_structure_tranchees", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+
|
|
|
+ def test_structure_zapbos(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_structure_zapbos")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Structure des données: Zapbos')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # valeur non autorisée
|
|
|
+ self.checker.zapbos[0].STATUT = "invalid"
|
|
|
+ r = self.run_test("test_structure_zapbos", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+
|
|
|
+ def test_geometry_validity(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_geometry_validity")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Contrôle de la validité des géométries')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # géométrie invalide
|
|
|
+ self.checker.arteres[0]._feature.setGeometry(QgsGeometry())
|
|
|
+ r = self.run_test("test_geometry_validity", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, "La géométrie de l'objet est invalide")
|
|
|
+
|
|
|
+ def test_geometry_type(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_geometry_type")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Contrôle des types de géométries')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # type de géométrie invalide
|
|
|
+ self.checker.arteres[0]._feature.setGeometry(QgsGeometry())
|
|
|
+ r = self.run_test("test_geometry_type", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, "Type de géométrie invalide.*")
|
|
|
+
|
|
|
+ def test_bounding_box(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_bounding_box")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Contrôle des emprises')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # hors de l'emprise
|
|
|
+ self.checker.noeuds[0]._feature.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(0,0)))
|
|
|
+ r = self.run_test("test_bounding_box", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, "Hors de l'emprise autorisée")
|
|
|
+
|
|
|
+ def test_duplicates(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_duplicates")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Recherche de doublons')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # cas de doublon
|
|
|
+ self.checker.noeuds[0].NO_NOM = self.checker.noeuds[1].NO_NOM
|
|
|
+ r = self.run_test("test_duplicates", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, "Doublons dans le champs NO_NOM")
|
|
|
+
|
|
|
+ def test_constraints_arteres_noeuds(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_constraints_arteres_noeuds")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Application des contraintes: Artères / Noeuds')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # contrainte invalide
|
|
|
+ self.checker.arteres[0].noeud_a = None
|
|
|
+ r = self.run_test("test_constraints_arteres_noeuds", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, "Le noeud lié '.+' n'existe pas")
|
|
|
+
|
|
|
+ def test_constraints_cables_equipements(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_constraints_cables_equipements")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Application des contraintes: Equipements / Cables')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # contrainte invalide
|
|
|
+ self.checker.cables[0].equipement_a = None
|
|
|
+ r = self.run_test("test_constraints_cables_equipements", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, "L'équipement lié '.+' n'existe pas*")
|
|
|
+
|
|
|
+ def test_constraints_cables_equipements_b(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_constraints_cables_equipements_b")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Application des contraintes: Equipements B')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # contrainte invalide
|
|
|
+ for cable in self.checker.cables:
|
|
|
+ cable.CA_EQ_B = ""
|
|
|
+ r = self.run_test("test_constraints_cables_equipements_b", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, "L'equipement '.+' n'est l'équipement B d'aucun cable.+")
|
|
|
+
|
|
|
+ def test_constraints_equipements_noeuds(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_constraints_equipements_noeuds")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Application des contraintes: Noeuds / Equipements')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # contrainte invalide
|
|
|
+ self.checker.equipements[0].noeud = None
|
|
|
+ r = self.run_test("test_constraints_equipements_noeuds", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, "Le noeud lié '.+' n'existe pas")
|
|
|
+
|
|
|
+ def test_graphic_duplicates(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_graphic_duplicates")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Recherche de doublons graphiques')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # doublon graphique
|
|
|
+ self.checker.noeuds[0]._feature.setGeometry(self.checker.noeuds[1]._feature.geometry())
|
|
|
+ r = self.run_test("test_graphic_duplicates", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, "Une entité graphique est dupliquée")
|
|
|
+
|
|
|
+ def test_positions_noeuds(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_positions_noeuds")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Topologie: Noeuds / Artères')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # erreur topo
|
|
|
+ self.checker.noeuds[0]._feature.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(0,0)))
|
|
|
+ r = self.run_test("test_positions_noeuds", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, "Pas de noeud aux coordonnées attendues.*")
|
|
|
+
|
|
|
+ def test_positions_equipements(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_positions_equipements")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Topologie: Equipements / Cables')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # erreur topo
|
|
|
+ self.checker.cables[0].equipement_b.noeud._feature.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(0,0)))
|
|
|
+ r = self.run_test("test_positions_equipements", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, "Pas d'équipement aux coordonnées attendues.*")
|
|
|
+
|
|
|
+ def test_tranchee_artere(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_tranchee_artere")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Topologie: Tranchées / Artères')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # erreur topo
|
|
|
+ new_geom = self.checker.tranchees[0]._feature.geometry()
|
|
|
+ new_geom.translate(10000, 10000)
|
|
|
+ self.checker.tranchees[0]._feature.setGeometry(new_geom)
|
|
|
+
|
|
|
+ r = self.run_test("test_tranchee_artere", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, "Tranchée ou portion de tranchée sans artère")
|
|
|
+
|
|
|
+ def test_cable_artere(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_cable_artere")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Topologie: Cables / Artères')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # erreur topo
|
|
|
+ self.checker.cables[0].CA_COMMENT = ""
|
|
|
+ new_geom = self.checker.cables[0]._feature.geometry()
|
|
|
+ new_geom.translate(10000, 10000)
|
|
|
+ self.checker.cables[0]._feature.setGeometry(new_geom)
|
|
|
+
|
|
|
+ r = self.run_test("test_cable_artere", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, "Cable ou portion de cable sans artère")
|
|
|
+
|
|
|
+ def test_artere_cable(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_artere_cable")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Topologie: Artères / Cables')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # erreur topo
|
|
|
+ self.checker.arteres[0].AR_COMMENT = ""
|
|
|
+ new_geom = self.checker.arteres[0]._feature.geometry()
|
|
|
+ new_geom.translate(10000, 10000)
|
|
|
+ self.checker.arteres[0]._feature.setGeometry(new_geom)
|
|
|
+
|
|
|
+ r = self.run_test("test_artere_cable", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, "Artère ou portion d'artère sans cable")
|
|
|
+
|
|
|
+ def test_dimensions_fourreaux(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_dimensions_fourreaux")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Dimensions logiques: fourreaux')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # nombre de fourreaux incohérent
|
|
|
+ self.checker.arteres[0].AR_FOU_DIS = 1000
|
|
|
+ r = self.run_test("test_dimensions_fourreaux", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, "Le nombre de fourreaux disponibles \(AR_FOU_DIS\) doit être inférieur au nombre total \(AR_NB_FOUR\)")
|
|
|
+
|
|
|
+ self.checker.arteres[0].AR_FOU_DIS = 0
|
|
|
+
|
|
|
+ def test_pbos(self):
|
|
|
+
|
|
|
+ # cas initial: valide
|
|
|
+ r = self.run_test("test_pbos")
|
|
|
+
|
|
|
+ self.assertEqual(r.title, 'Topologie: PBO / ZAPBO')
|
|
|
+ self.assertSuccess(r)
|
|
|
+ self.assertEqual(len(r.errors), 0)
|
|
|
+
|
|
|
+ # equipement hors zapbo
|
|
|
+ _ini_geom = self.checker.equipements[0]._feature.geometry()
|
|
|
+ self.checker.equipements[0]._feature.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(0,0)))
|
|
|
+
|
|
|
+ r = self.run_test("test_pbos", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, "Le PBO n'est contenu dans aucune ZAPBO")
|
|
|
+
|
|
|
+ self.checker.equipements[0]._feature.setGeometry(_ini_geom)
|
|
|
+
|
|
|
+ # noms equipement / zapbo incoherents
|
|
|
+ self.checker.equipements[0].EQ_CODE = "%__#123456789#__%"
|
|
|
+ r = self.run_test("test_pbos", True)
|
|
|
+ self.assertFailure(r)
|
|
|
+ self.assertErrorLogged(r, "Le nom du PBO ne coincide avec le nom d'aucune des ZAPBO qui le contiennent")
|
|
|
+
|