|
|
@@ -1,339 +0,0 @@
|
|
|
-'''
|
|
|
-
|
|
|
-@author: olivier.massot, janv. 2019
|
|
|
-'''
|
|
|
-from core import mncheck
|
|
|
-import logging
|
|
|
-from qgis.core import QgsCoordinateReferenceSystem, QgsGeometry, QgsPointXY
|
|
|
-import re
|
|
|
-
|
|
|
-from core.checking import SUCCESS, FAILURE, ERROR
|
|
|
-from path import Path
|
|
|
-from schemas.mn3_apd import SiteTelecom, Zapbo
|
|
|
-from test._base import SchemaTest
|
|
|
-
|
|
|
-
|
|
|
-logger = logging.getLogger("mncheck")
|
|
|
-
|
|
|
-class Test(SchemaTest):
|
|
|
- SCHEMA_NAME = "mn3_apd"
|
|
|
- PROJECT_FILE = Path(__file__).parent / 'projects' / 'mn3_apd' / 'mn3_apd_valid.qgz'
|
|
|
-
|
|
|
- def setUp(self):
|
|
|
- SchemaTest.setUp(self)
|
|
|
- self.schema = mncheck.get_schema(self.SCHEMA_NAME)
|
|
|
- if not self.schema.checkers:
|
|
|
- raise AttributeError("Aucun testeur trouvé dans le schéma")
|
|
|
- self.checker = self.schema.checkers[0]()
|
|
|
-
|
|
|
- def run_test(self, test_name, dry=False):
|
|
|
- results = self.checker.run(test_name, dry)
|
|
|
- if len(results) == 0:
|
|
|
- raise Exception("No result for test")
|
|
|
- elif len(results) > 1:
|
|
|
- raise Exception("More than one result from test")
|
|
|
- return results[0]
|
|
|
-
|
|
|
- def assertSuccess(self, r):
|
|
|
- if not r.status == SUCCESS:
|
|
|
- raise AssertionError("Le test a échoué")
|
|
|
-
|
|
|
- def assertFailure(self, r):
|
|
|
- if not r.status == FAILURE:
|
|
|
- raise AssertionError("Le test n'aurait pas dû réussir")
|
|
|
-
|
|
|
- def assertError(self, r):
|
|
|
- if not r.status == ERROR:
|
|
|
- raise AssertionError("Le test n'a pas levé d'erreur")
|
|
|
-
|
|
|
- def assertErrorLogged(self, result, err_msg):
|
|
|
- if not any((re.fullmatch(err_msg, err.message) for err in result.errors)):
|
|
|
- print(result.errors)
|
|
|
- raise AssertionError("Error was not logged: {}".format(err_msg))
|
|
|
-
|
|
|
- def test_load_layers(self):
|
|
|
-
|
|
|
- # cas initial: valide
|
|
|
- r = self.run_test("test_load_layers")
|
|
|
-
|
|
|
- self.assertEqual(r.title, 'Chargement des données')
|
|
|
- self.assertSuccess(r)
|
|
|
- self.assertEqual(len(r.errors), 0)
|
|
|
-
|
|
|
- # une couche manquante et une pk manquante
|
|
|
- _pk_ini = self.schema.SiteTelecom.pk
|
|
|
- try:
|
|
|
- self.schema.SiteClient.layer = None
|
|
|
- self.schema.SiteTelecom.pk = "not_a_qgis_field"
|
|
|
-
|
|
|
- r = self.run_test("test_load_layers", True)
|
|
|
-
|
|
|
- self.assertFailure(r)
|
|
|
- self.assertErrorLogged(r, ".*Couche manquante.*")
|
|
|
- self.assertErrorLogged(r, ".*Clef primaire manquante.*")
|
|
|
- finally:
|
|
|
- self.schema.SiteTelecom.pk = _pk_ini
|
|
|
-
|
|
|
-
|
|
|
- def test_scr(self):
|
|
|
-
|
|
|
- # cas initial: valide
|
|
|
- r = self.run_test("test_scr")
|
|
|
-
|
|
|
- self.assertEqual(r.title, 'Contrôle des projections')
|
|
|
- self.assertSuccess(r)
|
|
|
- self.assertEqual(len(r.errors), 0)
|
|
|
-
|
|
|
- # mauvaise projection
|
|
|
- crs = QgsCoordinateReferenceSystem()
|
|
|
- crs.createFromSrid(4473)
|
|
|
- self.schema.SiteTelecom.layer.setCrs(crs)
|
|
|
- r = self.run_test("test_scr", True)
|
|
|
-
|
|
|
- self.assertFailure(r)
|
|
|
- self.assertErrorLogged(r, f"Mauvaise projection.*")
|
|
|
-
|
|
|
- def test_structure_sites_telecom(self):
|
|
|
-
|
|
|
- # cas initial: valide
|
|
|
- r = self.run_test("test_structure_sites_telecom")
|
|
|
-
|
|
|
- self.assertEqual(r.title, 'Structure des données: SiteTelecom')
|
|
|
- self.assertSuccess(r)
|
|
|
- self.assertEqual(len(r.errors), 0)
|
|
|
-
|
|
|
- # valeur non autorisée
|
|
|
- self.checker.sites_telecom[0].ST_NBPRISE = "abcd"
|
|
|
- r = self.run_test("test_structure_sites_telecom", True)
|
|
|
- self.assertFailure(r)
|
|
|
-
|
|
|
- def test_structure_sites_client(self):
|
|
|
-
|
|
|
- # cas initial: valide
|
|
|
- r = self.run_test("test_structure_sites_client")
|
|
|
-
|
|
|
- self.assertEqual(r.title, 'Structure des données: SiteClient')
|
|
|
- self.assertSuccess(r)
|
|
|
- self.assertEqual(len(r.errors), 0)
|
|
|
-
|
|
|
- # valeur non autorisée
|
|
|
- self.checker.sites_client[0].SC_TYPFON = "invalid"
|
|
|
- r = self.run_test("test_structure_sites_client", True)
|
|
|
- self.assertFailure(r)
|
|
|
-
|
|
|
- def test_structure_cables(self):
|
|
|
-
|
|
|
- # cas initial: valide
|
|
|
- r = self.run_test("test_structure_cables")
|
|
|
-
|
|
|
- self.assertEqual(r.title, 'Structure des données: Cables')
|
|
|
- self.assertSuccess(r)
|
|
|
- self.assertEqual(len(r.errors), 0)
|
|
|
-
|
|
|
- # valeur non autorisée
|
|
|
- self.checker.cables[0].CA_TYPFON = "invalid"
|
|
|
- r = self.run_test("test_structure_cables", True)
|
|
|
- self.assertFailure(r)
|
|
|
-
|
|
|
- def test_structure_zapbos(self):
|
|
|
-
|
|
|
- # cas initial: valide
|
|
|
- r = self.run_test("test_structure_zapbos")
|
|
|
-
|
|
|
- self.assertEqual(r.title, 'Structure des données: Zapbo')
|
|
|
- self.assertSuccess(r)
|
|
|
- self.assertEqual(len(r.errors), 0)
|
|
|
-
|
|
|
- # valeur non autorisée
|
|
|
- self.checker.zapbos[0].ZP_ISOLE = "2"
|
|
|
- r = self.run_test("test_structure_zapbos", True)
|
|
|
- self.assertFailure(r)
|
|
|
-
|
|
|
- def test_structure_zasros(self):
|
|
|
-
|
|
|
- # cas initial: valide
|
|
|
- r = self.run_test("test_structure_zasros")
|
|
|
-
|
|
|
- self.assertEqual(r.title, 'Structure des données: Zasro')
|
|
|
- self.assertSuccess(r)
|
|
|
- self.assertEqual(len(r.errors), 0)
|
|
|
-
|
|
|
- # valeur non autorisée
|
|
|
- self.checker.zasros[0].ZS_NBPRISE = "invalid"
|
|
|
- r = self.run_test("test_structure_zasros", True)
|
|
|
- self.assertFailure(r)
|
|
|
-
|
|
|
- def test_structure_adductions(self):
|
|
|
-
|
|
|
- # cas initial: valide
|
|
|
- r = self.run_test("test_structure_adductions")
|
|
|
-
|
|
|
- self.assertEqual(r.title, 'Structure des données: Adduction')
|
|
|
- self.assertSuccess(r)
|
|
|
- self.assertEqual(len(r.errors), 0)
|
|
|
-
|
|
|
- # valeur non autorisée
|
|
|
- self.checker.adductions[0].AD_LONG = "invalid"
|
|
|
- r = self.run_test("test_structure_adductions", True)
|
|
|
- self.assertFailure(r)
|
|
|
-
|
|
|
- def test_geometry_validity(self):
|
|
|
-
|
|
|
- # cas initial: valide
|
|
|
- r = self.run_test("test_geometry_validity")
|
|
|
-
|
|
|
- self.assertEqual(r.title, 'Contrôle de la validité des géométries')
|
|
|
- self.assertSuccess(r)
|
|
|
- self.assertEqual(len(r.errors), 0)
|
|
|
-
|
|
|
- # géométrie invalide
|
|
|
- self.checker.cables[0]._feature.setGeometry(QgsGeometry())
|
|
|
- r = self.run_test("test_geometry_validity", True)
|
|
|
- self.assertFailure(r)
|
|
|
- self.assertErrorLogged(r, "La géométrie de l'objet est invalide")
|
|
|
-
|
|
|
- def test_geometry_type(self):
|
|
|
-
|
|
|
- # cas initial: valide
|
|
|
- r = self.run_test("test_geometry_type")
|
|
|
-
|
|
|
- self.assertEqual(r.title, 'Contrôle des types de géométries')
|
|
|
- self.assertSuccess(r)
|
|
|
- self.assertEqual(len(r.errors), 0)
|
|
|
-
|
|
|
- # type de géométrie invalide
|
|
|
- self.checker.cables[0]._feature.setGeometry(QgsGeometry())
|
|
|
- r = self.run_test("test_geometry_type", True)
|
|
|
- self.assertFailure(r)
|
|
|
- self.assertErrorLogged(r, "Type de géométrie invalide.*")
|
|
|
-
|
|
|
- def test_bounding_box(self):
|
|
|
-
|
|
|
- # cas initial: valide
|
|
|
- r = self.run_test("test_bounding_box")
|
|
|
-
|
|
|
- self.assertEqual(r.title, 'Contrôle des emprises')
|
|
|
- self.assertSuccess(r)
|
|
|
- self.assertEqual(len(r.errors), 0)
|
|
|
-
|
|
|
- # hors de l'emprise
|
|
|
- self.checker.sites_client[0]._feature.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(0,0)))
|
|
|
- r = self.run_test("test_bounding_box", True)
|
|
|
- self.assertFailure(r)
|
|
|
- self.assertErrorLogged(r, "Hors de l'emprise autorisée")
|
|
|
-
|
|
|
- def test_duplicates(self):
|
|
|
-
|
|
|
- # cas initial: valide
|
|
|
- r = self.run_test("test_duplicates")
|
|
|
-
|
|
|
- self.assertEqual(r.title, 'Recherche de doublons')
|
|
|
- self.assertSuccess(r)
|
|
|
- self.assertEqual(len(r.errors), 0)
|
|
|
-
|
|
|
-
|
|
|
- new_ = SiteTelecom(self.checker.sites_telecom[0]._feature)
|
|
|
- new_.__dict__.update(self.checker.sites_telecom[0].__dict__)
|
|
|
- self.checker.sites_telecom.append(new_)
|
|
|
-
|
|
|
- r = self.run_test("test_duplicates", True)
|
|
|
- self.assertFailure(r)
|
|
|
- self.assertErrorLogged(r, "Doublons dans le champs ST_CODE")
|
|
|
-
|
|
|
- def test_dimension_zasro(self):
|
|
|
-
|
|
|
- # cas initial: valide
|
|
|
- r = self.run_test("test_dimension_zasro")
|
|
|
-
|
|
|
- self.assertEqual(r.title, 'Contrôle le dimensionnement des ZASRO')
|
|
|
- self.assertSuccess(r)
|
|
|
- self.assertEqual(len(r.errors), 0)
|
|
|
-
|
|
|
- self.checker.zasros[0].ZS_NBPRISE = 1000
|
|
|
- r = self.run_test("test_dimension_zasro", True)
|
|
|
- self.assertFailure(r)
|
|
|
- self.assertErrorLogged(r, "Le nombre de prises est supérieur à 850")
|
|
|
-
|
|
|
- def test_affaiblissement(self):
|
|
|
-
|
|
|
- # cas initial: valide
|
|
|
- r = self.run_test("test_affaiblissement")
|
|
|
-
|
|
|
- self.assertEqual(r.title, "Contrôle l'affaiblissement")
|
|
|
- self.assertSuccess(r)
|
|
|
- self.assertEqual(len(r.errors), 0)
|
|
|
-
|
|
|
- self.checker.sites_client[0].SC_ATT_PTO = 100
|
|
|
- r = self.run_test("test_affaiblissement", True)
|
|
|
- self.assertFailure(r)
|
|
|
- self.assertErrorLogged(r, "L'affaiblissement est supérieur à 28")
|
|
|
-
|
|
|
- def test_capacite_modulo(self):
|
|
|
-
|
|
|
- # cas initial: valide
|
|
|
- r = self.run_test("test_capacite_modulo")
|
|
|
-
|
|
|
- self.assertEqual(r.title, 'Cohérence capacité du cable / modulo')
|
|
|
- self.assertSuccess(r)
|
|
|
- self.assertEqual(len(r.errors), 0)
|
|
|
-
|
|
|
- self.checker.cables[0].CA_CAPFO = 24
|
|
|
- self.checker.cables[0].CA_MODULO = 12
|
|
|
- r = self.run_test("test_capacite_modulo", True)
|
|
|
- self.assertFailure(r)
|
|
|
- self.assertErrorLogged(r, "Modulo invalide.+")
|
|
|
-
|
|
|
- self.checker.cables[0].CA_CAPFO = 144
|
|
|
- self.checker.cables[0].CA_MODULO = 6
|
|
|
- r = self.run_test("test_capacite_modulo", True)
|
|
|
- self.assertFailure(r)
|
|
|
- self.assertErrorLogged(r, "Modulo invalide.+")
|
|
|
-
|
|
|
- def test_longueur_racco(self):
|
|
|
-
|
|
|
- # cas initial: valide
|
|
|
- r = self.run_test("test_longueur_racco")
|
|
|
-
|
|
|
- self.assertEqual(r.title, 'Contrôle la longueur des raccordements')
|
|
|
- self.assertSuccess(r)
|
|
|
- self.assertEqual(len(r.errors), 0)
|
|
|
-
|
|
|
- self.checker.adductions[0].AD_LONG = 10
|
|
|
- self.checker.adductions[0].AD_ISOLE = "1"
|
|
|
- r = self.run_test("test_longueur_racco", True)
|
|
|
- self.assertFailure(r)
|
|
|
- self.assertErrorLogged(r, "L'adduction ne devrait pas être consiudérée comme isolée.+")
|
|
|
-
|
|
|
- self.checker.adductions[0].AD_LONG = 1000
|
|
|
- self.checker.adductions[0].AD_ISOLE = "0"
|
|
|
- r = self.run_test("test_longueur_racco", True)
|
|
|
- self.assertFailure(r)
|
|
|
- self.assertErrorLogged(r, "L'adduction devrait être considérée comme isolée.+")
|
|
|
-
|
|
|
- def test_zapbos_prises(self):
|
|
|
-
|
|
|
- # cas initial: valide
|
|
|
- r = self.run_test("test_zapbos_prises")
|
|
|
-
|
|
|
- self.assertEqual(r.title, 'Topologie: Zapbos / Prises')
|
|
|
- self.assertSuccess(r)
|
|
|
- self.assertEqual(len(r.errors), 0)
|
|
|
-
|
|
|
- new_ = Zapbo(self.checker.zapbos[0]._feature)
|
|
|
- new_.__dict__.update(self.checker.zapbos[0].__dict__)
|
|
|
- self.checker.zapbos.append(new_)
|
|
|
-
|
|
|
- r = self.run_test("test_zapbos_prises", True)
|
|
|
- self.assertFailure(r)
|
|
|
- self.assertErrorLogged(r, "La prise est contenue dans plus d'une ZAPBO")
|
|
|
-
|
|
|
- new_geom = self.checker.sites_client[0]._feature.geometry()
|
|
|
- new_geom.translate(1000,1000)
|
|
|
- self.checker.sites_client[0]._feature.setGeometry(new_geom)
|
|
|
-
|
|
|
- r = self.run_test("test_zapbos_prises", True)
|
|
|
- self.assertFailure(r)
|
|
|
- self.assertErrorLogged(r, "La prise n'est contenue dans aucune ZAPBO")
|
|
|
-
|
|
|
-
|
|
|
-
|