| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 |
- '''
- @author: olivier.massot, janv. 2019
- '''
- from core import mncheck
- import logging
- from qgis.core import QgsCoordinateReferenceSystem, QgsGeometry, QgsPointXY
- import re
- from core.checking import SUCCESS, FAILURE, ERROR
- from path import Path
- from schemas.mn3_apd import SiteTelecom, Zapbo
- from test._base import SchemaTest
- logger = logging.getLogger("mncheck")
class Test(SchemaTest):
    """Functional tests for the checks of the ``mn3_apd`` schema."""

    # Schema name handed to mncheck.get_schema() in setUp().
    SCHEMA_NAME = "mn3_apd"

    # QGIS project stored next to this module, under projects/mn3_apd/.
    # NOTE(review): presumably opened by SchemaTest -- confirm in test._base.
    PROJECT_FILE = Path(__file__).parent.joinpath('projects', 'mn3_apd', 'mn3_apd_valid.qgz')
def setUp(self):
    """Load the schema under test and instantiate its first checker.

    Raises:
        AttributeError: if the schema exposes no checker.
    """
    SchemaTest.setUp(self)
    self.schema = mncheck.get_schema(self.SCHEMA_NAME)

    available = self.schema.checkers
    if not available:
        raise AttributeError("Aucun testeur trouvé dans le schéma")

    # Only the first checker declared by the schema is exercised.
    checker_cls = available[0]
    self.checker = checker_cls()
def run_test(self, test_name, dry=False):
    """Run one named check and return its single result.

    Args:
        test_name: name of the check, forwarded to ``self.checker.run``.
        dry: forwarded unchanged as the second argument of
            ``self.checker.run``.

    Returns:
        The only element of the sequence returned by the checker.

    Raises:
        Exception: if the checker returned no result, or more than one.
    """
    outcomes = self.checker.run(test_name, dry)
    count = len(outcomes)

    if count == 0:
        raise Exception("No result for test")
    if count > 1:
        raise Exception("More than one result from test")

    return outcomes[0]
-
def assertSuccess(self, r):
    """Raise AssertionError unless the status of result ``r`` is SUCCESS."""
    if r.status == SUCCESS:
        return
    raise AssertionError("Le test a échoué")
-
def assertFailure(self, r):
    """Raise AssertionError unless the status of result ``r`` is FAILURE."""
    if r.status == FAILURE:
        return
    raise AssertionError("Le test n'aurait pas dû réussir")
-
def assertError(self, r):
    """Raise AssertionError unless the status of result ``r`` is ERROR."""
    if r.status == ERROR:
        return
    raise AssertionError("Le test n'a pas levé d'erreur")
-
def assertErrorLogged(self, result, err_msg):
    """Check that at least one error of ``result`` matches ``err_msg``.

    Args:
        result: object whose ``errors`` items expose a ``message`` string.
        err_msg: regular expression that must match a whole message
            (``re.fullmatch``).

    Raises:
        AssertionError: if no message matches; the recorded errors are
            printed first to help diagnosis.
    """
    for logged in result.errors:
        if re.fullmatch(err_msg, logged.message):
            return

    print(result.errors)
    raise AssertionError("Error was not logged: {}".format(err_msg))
-
def test_load_layers(self):
    """Check the data-loading test on valid data, then on a broken schema.

    The second part removes the SiteClient layer and gives SiteTelecom a
    primary key that is not a field, and expects both problems to be logged.
    """
    # Baseline: the untouched project loads without any error.
    r = self.run_test("test_load_layers")
    self.assertEqual(r.title, 'Chargement des données')
    self.assertSuccess(r)
    self.assertEqual(len(r.errors), 0)

    # One missing layer and one missing primary key.
    # Both values are overwritten on the schema's model classes, so both
    # are saved here and restored afterwards; otherwise the altered schema
    # would leak out of this test.
    _layer_ini = self.schema.SiteClient.layer
    _pk_ini = self.schema.SiteTelecom.pk
    try:
        self.schema.SiteClient.layer = None
        self.schema.SiteTelecom.pk = "not_a_qgis_field"

        r = self.run_test("test_load_layers", True)

        self.assertFailure(r)
        self.assertErrorLogged(r, ".*Couche manquante.*")
        self.assertErrorLogged(r, ".*Clef primaire manquante.*")
    finally:
        self.schema.SiteClient.layer = _layer_ini
        self.schema.SiteTelecom.pk = _pk_ini
-
-
def test_scr(self):
    """Check the projection test on valid data, then with a wrong CRS."""
    # Baseline: the untouched project passes without any error.
    r = self.run_test("test_scr")
    self.assertEqual(r.title, 'Contrôle des projections')
    self.assertSuccess(r)
    self.assertEqual(len(r.errors), 0)

    # Wrong projection: the SiteTelecom layer is switched to SRID 4473.
    crs = QgsCoordinateReferenceSystem()
    crs.createFromSrid(4473)
    self.schema.SiteTelecom.layer.setCrs(crs)
    r = self.run_test("test_scr", True)

    self.assertFailure(r)
    # Plain string: the pattern has no placeholder, so no f-string is needed.
    self.assertErrorLogged(r, "Mauvaise projection.*")
-
def test_structure_sites_telecom(self):
    """Check the SiteTelecom structure test, then corrupt one attribute."""
    # Baseline: the untouched project passes without any error.
    baseline = self.run_test("test_structure_sites_telecom")
    self.assertEqual(baseline.title, 'Structure des données: SiteTelecom')
    self.assertSuccess(baseline)
    self.assertEqual(len(baseline.errors), 0)

    # Unauthorised value on the first telecom site.
    first_site = self.checker.sites_telecom[0]
    first_site.ST_NBPRISE = "abcd"
    # NOTE(review): dry=True presumably keeps the tampered data -- confirm.
    broken = self.run_test("test_structure_sites_telecom", dry=True)
    self.assertFailure(broken)
-
def test_structure_sites_client(self):
    """Check the SiteClient structure test, then corrupt one attribute."""
    # Baseline: the untouched project passes without any error.
    baseline = self.run_test("test_structure_sites_client")
    self.assertEqual(baseline.title, 'Structure des données: SiteClient')
    self.assertSuccess(baseline)
    self.assertEqual(len(baseline.errors), 0)

    # Unauthorised value on the first client site.
    first_site = self.checker.sites_client[0]
    first_site.SC_TYPFON = "invalid"
    # NOTE(review): dry=True presumably keeps the tampered data -- confirm.
    broken = self.run_test("test_structure_sites_client", dry=True)
    self.assertFailure(broken)
-
def test_structure_cables(self):
    """Check the Cables structure test, then corrupt one attribute."""
    # Baseline: the untouched project passes without any error.
    baseline = self.run_test("test_structure_cables")
    self.assertEqual(baseline.title, 'Structure des données: Cables')
    self.assertSuccess(baseline)
    self.assertEqual(len(baseline.errors), 0)

    # Unauthorised value on the first cable.
    first_cable = self.checker.cables[0]
    first_cable.CA_TYPFON = "invalid"
    # NOTE(review): dry=True presumably keeps the tampered data -- confirm.
    broken = self.run_test("test_structure_cables", dry=True)
    self.assertFailure(broken)
-
def test_structure_zapbos(self):
    """Check the Zapbo structure test, then corrupt one attribute."""
    # Baseline: the untouched project passes without any error.
    baseline = self.run_test("test_structure_zapbos")
    self.assertEqual(baseline.title, 'Structure des données: Zapbo')
    self.assertSuccess(baseline)
    self.assertEqual(len(baseline.errors), 0)

    # Unauthorised value on the first ZAPBO.
    first_zapbo = self.checker.zapbos[0]
    first_zapbo.ZP_ISOLE = "2"
    # NOTE(review): dry=True presumably keeps the tampered data -- confirm.
    broken = self.run_test("test_structure_zapbos", dry=True)
    self.assertFailure(broken)
-
def test_structure_zasros(self):
    """Check the Zasro structure test, then corrupt one attribute."""
    # Baseline: the untouched project passes without any error.
    baseline = self.run_test("test_structure_zasros")
    self.assertEqual(baseline.title, 'Structure des données: Zasro')
    self.assertSuccess(baseline)
    self.assertEqual(len(baseline.errors), 0)

    # Unauthorised value on the first ZASRO.
    first_zasro = self.checker.zasros[0]
    first_zasro.ZS_NBPRISE = "invalid"
    # NOTE(review): dry=True presumably keeps the tampered data -- confirm.
    broken = self.run_test("test_structure_zasros", dry=True)
    self.assertFailure(broken)
-
def test_structure_adductions(self):
    """Check the Adduction structure test, then corrupt one attribute."""
    # Baseline: the untouched project passes without any error.
    baseline = self.run_test("test_structure_adductions")
    self.assertEqual(baseline.title, 'Structure des données: Adduction')
    self.assertSuccess(baseline)
    self.assertEqual(len(baseline.errors), 0)

    # Unauthorised value on the first adduction.
    first_adduction = self.checker.adductions[0]
    first_adduction.AD_LONG = "invalid"
    # NOTE(review): dry=True presumably keeps the tampered data -- confirm.
    broken = self.run_test("test_structure_adductions", dry=True)
    self.assertFailure(broken)
-
def test_geometry_validity(self):
    """Check the geometry-validity test, then give a cable an empty geometry."""
    # Baseline: the untouched project passes without any error.
    baseline = self.run_test("test_geometry_validity")
    self.assertEqual(baseline.title, 'Contrôle de la validité des géométries')
    self.assertSuccess(baseline)
    self.assertEqual(len(baseline.errors), 0)

    # Invalid geometry: the first cable gets a default-constructed geometry.
    cable_feature = self.checker.cables[0]._feature
    cable_feature.setGeometry(QgsGeometry())
    broken = self.run_test("test_geometry_validity", dry=True)
    self.assertFailure(broken)
    self.assertErrorLogged(broken, "La géométrie de l'objet est invalide")
-
def test_geometry_type(self):
    """Check the geometry-type test, then give a cable an empty geometry."""
    # Baseline: the untouched project passes without any error.
    baseline = self.run_test("test_geometry_type")
    self.assertEqual(baseline.title, 'Contrôle des types de géométries')
    self.assertSuccess(baseline)
    self.assertEqual(len(baseline.errors), 0)

    # Invalid geometry type: the first cable gets a default-constructed
    # geometry.
    cable_feature = self.checker.cables[0]._feature
    cable_feature.setGeometry(QgsGeometry())
    broken = self.run_test("test_geometry_type", dry=True)
    self.assertFailure(broken)
    self.assertErrorLogged(broken, "Type de géométrie invalide.*")
def test_bounding_box(self):
    """Check the extent test, then move a client site to point (0, 0)."""
    # Baseline: the untouched project passes without any error.
    baseline = self.run_test("test_bounding_box")
    self.assertEqual(baseline.title, 'Contrôle des emprises')
    self.assertSuccess(baseline)
    self.assertEqual(len(baseline.errors), 0)

    # Outside the allowed extent: the first client site is placed at (0, 0).
    site_feature = self.checker.sites_client[0]._feature
    origin = QgsPointXY(0, 0)
    site_feature.setGeometry(QgsGeometry.fromPointXY(origin))
    broken = self.run_test("test_bounding_box", dry=True)
    self.assertFailure(broken)
    self.assertErrorLogged(broken, "Hors de l'emprise autorisée")
-
def test_duplicates(self):
    """Check the duplicate search, then append a copy of a telecom site."""
    # Baseline: the untouched project passes without any error.
    baseline = self.run_test("test_duplicates")
    self.assertEqual(baseline.title, 'Recherche de doublons')
    self.assertSuccess(baseline)
    self.assertEqual(len(baseline.errors), 0)

    # Duplicate: a new SiteTelecom is built on the same feature as the
    # first site, takes over all its attributes, and is added to the list.
    source = self.checker.sites_telecom[0]
    clone = SiteTelecom(source._feature)
    vars(clone).update(vars(source))
    self.checker.sites_telecom.append(clone)

    broken = self.run_test("test_duplicates", dry=True)
    self.assertFailure(broken)
    self.assertErrorLogged(broken, "Doublons dans le champs ST_CODE")
-
def test_dimension_zasro(self):
    """Check the ZASRO sizing test, then set an oversized ZS_NBPRISE."""
    # Baseline: the untouched project passes without any error.
    baseline = self.run_test("test_dimension_zasro")
    self.assertEqual(baseline.title, 'Contrôle le dimensionnement des ZASRO')
    self.assertSuccess(baseline)
    self.assertEqual(len(baseline.errors), 0)

    # 1000 is above the 850 limit named in the expected error message.
    first_zasro = self.checker.zasros[0]
    first_zasro.ZS_NBPRISE = 1000
    broken = self.run_test("test_dimension_zasro", dry=True)
    self.assertFailure(broken)
    self.assertErrorLogged(broken, "Le nombre de prises est supérieur à 850")
-
def test_affaiblissement(self):
    """Check the attenuation test, then set an excessive SC_ATT_PTO."""
    # Baseline: the untouched project passes without any error.
    baseline = self.run_test("test_affaiblissement")
    self.assertEqual(baseline.title, "Contrôle l'affaiblissement")
    self.assertSuccess(baseline)
    self.assertEqual(len(baseline.errors), 0)

    # 100 is above the limit of 28 named in the expected error message.
    first_site = self.checker.sites_client[0]
    first_site.SC_ATT_PTO = 100
    broken = self.run_test("test_affaiblissement", dry=True)
    self.assertFailure(broken)
    self.assertErrorLogged(broken, "L'affaiblissement est supérieur à 28")
-
def test_capacite_modulo(self):
    """Check the capacity/modulo consistency test on valid and bad pairs."""
    # Baseline: the untouched project passes without any error.
    baseline = self.run_test("test_capacite_modulo")
    self.assertEqual(baseline.title, 'Cohérence capacité du cable / modulo')
    self.assertSuccess(baseline)
    self.assertEqual(len(baseline.errors), 0)

    # Each (capacity, modulo) pair is applied to the first cable in turn
    # and must be rejected.
    for capacity, modulo in ((24, 12), (144, 6)):
        cable = self.checker.cables[0]
        cable.CA_CAPFO = capacity
        cable.CA_MODULO = modulo
        broken = self.run_test("test_capacite_modulo", dry=True)
        self.assertFailure(broken)
        self.assertErrorLogged(broken, "Modulo invalide.+")
-
def test_longueur_racco(self):
    """Check the connection-length test on valid and inconsistent data."""
    # Baseline: the untouched project passes without any error.
    baseline = self.run_test("test_longueur_racco")
    self.assertEqual(baseline.title, 'Contrôle la longueur des raccordements')
    self.assertSuccess(baseline)
    self.assertEqual(len(baseline.errors), 0)

    # (AD_LONG, AD_ISOLE, expected message) applied to the first adduction:
    # a short one flagged as isolated, then a long one flagged as not isolated.
    # NOTE(review): the spelling "consiudérée" presumably mirrors the
    # checker's own message -- confirm before correcting it.
    scenarios = (
        (10, "1", "L'adduction ne devrait pas être consiudérée comme isolée.+"),
        (1000, "0", "L'adduction devrait être considérée comme isolée.+"),
    )
    for length, isolated_flag, expected in scenarios:
        adduction = self.checker.adductions[0]
        adduction.AD_LONG = length
        adduction.AD_ISOLE = isolated_flag
        broken = self.run_test("test_longueur_racco", dry=True)
        self.assertFailure(broken)
        self.assertErrorLogged(broken, expected)
-
def test_zapbos_prises(self):
    """Check the ZAPBO/outlet topology test on valid and broken data."""
    # Baseline: the untouched project passes without any error.
    baseline = self.run_test("test_zapbos_prises")
    self.assertEqual(baseline.title, 'Topologie: Zapbos / Prises')
    self.assertSuccess(baseline)
    self.assertEqual(len(baseline.errors), 0)

    # Overlap: a copy of the first ZAPBO (same feature, same attributes) is
    # appended to the list of ZAPBOs.
    first_zapbo = self.checker.zapbos[0]
    clone = Zapbo(first_zapbo._feature)
    vars(clone).update(vars(first_zapbo))
    self.checker.zapbos.append(clone)

    overlapping = self.run_test("test_zapbos_prises", dry=True)
    self.assertFailure(overlapping)
    self.assertErrorLogged(overlapping, "La prise est contenue dans plus d'une ZAPBO")

    # Orphan: the first client site is shifted by (1000, 1000).
    site_feature = self.checker.sites_client[0]._feature
    shifted = site_feature.geometry()
    shifted.translate(1000, 1000)
    site_feature.setGeometry(shifted)

    orphaned = self.run_test("test_zapbos_prises", dry=True)
    self.assertFailure(orphaned)
    self.assertErrorLogged(orphaned, "La prise n'est contenue dans aucune ZAPBO")
-
-
|