''' @author: olivier.massot, janv. 2019 ''' from core import mncheck import logging from qgis.core import QgsCoordinateReferenceSystem, QgsGeometry, QgsPointXY import re from core.checking import SUCCESS, FAILURE, ERROR from path import Path from schemas.mn3_apd import SiteTelecom, Zapbo from test._base import SchemaTest logger = logging.getLogger("mncheck") class Test(SchemaTest): SCHEMA_NAME = "mn3_apd" PROJECT_FILE = Path(__file__).parent / 'projects' / 'mn3_apd' / 'mn3_apd_valid.qgz' def setUp(self): SchemaTest.setUp(self) self.schema = mncheck.get_schema(self.SCHEMA_NAME) if not self.schema.checkers: raise AttributeError("Aucun testeur trouvé dans le schéma") self.checker = self.schema.checkers[0]() def run_test(self, test_name, dry=False): results = self.checker.run(test_name, dry) if len(results) == 0: raise Exception("No result for test") elif len(results) > 1: raise Exception("More than one result from test") return results[0] def assertSuccess(self, r): if not r.status == SUCCESS: raise AssertionError("Le test a échoué") def assertFailure(self, r): if not r.status == FAILURE: raise AssertionError("Le test n'aurait pas dû réussir") def assertError(self, r): if not r.status == ERROR: raise AssertionError("Le test n'a pas levé d'erreur") def assertErrorLogged(self, result, err_msg): if not any((re.fullmatch(err_msg, err.message) for err in result.errors)): print(result.errors) raise AssertionError("Error was not logged: {}".format(err_msg)) def test_load_layers(self): # cas initial: valide r = self.run_test("test_load_layers") self.assertEqual(r.title, 'Chargement des données') self.assertSuccess(r) self.assertEqual(len(r.errors), 0) # une couche manquante et une pk manquante _pk_ini = self.schema.SiteTelecom.pk try: self.schema.SiteClient.layer = None self.schema.SiteTelecom.pk = "not_a_qgis_field" r = self.run_test("test_load_layers", True) self.assertFailure(r) self.assertErrorLogged(r, ".*Couche manquante.*") self.assertErrorLogged(r, ".*Clef primaire manquante.*") finally: self.schema.SiteTelecom.pk = _pk_ini def test_scr(self): # cas initial: valide r = self.run_test("test_scr") self.assertEqual(r.title, 'Contrôle des projections') self.assertSuccess(r) self.assertEqual(len(r.errors), 0) # mauvaise projection crs = QgsCoordinateReferenceSystem() crs.createFromSrid(4473) self.schema.SiteTelecom.layer.setCrs(crs) r = self.run_test("test_scr", True) self.assertFailure(r) self.assertErrorLogged(r, f"Mauvaise projection.*") def test_structure_sites_telecom(self): # cas initial: valide r = self.run_test("test_structure_sites_telecom") self.assertEqual(r.title, 'Structure des données: SiteTelecom') self.assertSuccess(r) self.assertEqual(len(r.errors), 0) # valeur non autorisée self.checker.sites_telecom[0].ST_NBPRISE = "abcd" r = self.run_test("test_structure_sites_telecom", True) self.assertFailure(r) def test_structure_sites_client(self): # cas initial: valide r = self.run_test("test_structure_sites_client") self.assertEqual(r.title, 'Structure des données: SiteClient') self.assertSuccess(r) self.assertEqual(len(r.errors), 0) # valeur non autorisée self.checker.sites_client[0].SC_TYPFON = "invalid" r = self.run_test("test_structure_sites_client", True) self.assertFailure(r) def test_structure_cables(self): # cas initial: valide r = self.run_test("test_structure_cables") self.assertEqual(r.title, 'Structure des données: Cables') self.assertSuccess(r) self.assertEqual(len(r.errors), 0) # valeur non autorisée self.checker.cables[0].CA_TYPFON = "invalid" r = self.run_test("test_structure_cables", True) self.assertFailure(r) def test_structure_zapbos(self): # cas initial: valide r = self.run_test("test_structure_zapbos") self.assertEqual(r.title, 'Structure des données: Zapbo') self.assertSuccess(r) self.assertEqual(len(r.errors), 0) # valeur non autorisée self.checker.zapbos[0].ZP_ISOLE = "2" r = self.run_test("test_structure_zapbos", True) self.assertFailure(r) def test_structure_zasros(self): # cas initial: valide r = self.run_test("test_structure_zasros") self.assertEqual(r.title, 'Structure des données: Zasro') self.assertSuccess(r) self.assertEqual(len(r.errors), 0) # valeur non autorisée self.checker.zasros[0].ZS_NBPRISE = "invalid" r = self.run_test("test_structure_zasros", True) self.assertFailure(r) def test_structure_adductions(self): # cas initial: valide r = self.run_test("test_structure_adductions") self.assertEqual(r.title, 'Structure des données: Adduction') self.assertSuccess(r) self.assertEqual(len(r.errors), 0) # valeur non autorisée self.checker.adductions[0].AD_LONG = "invalid" r = self.run_test("test_structure_adductions", True) self.assertFailure(r) def test_geometry_validity(self): # cas initial: valide r = self.run_test("test_geometry_validity") self.assertEqual(r.title, 'Contrôle de la validité des géométries') self.assertSuccess(r) self.assertEqual(len(r.errors), 0) # géométrie invalide self.checker.cables[0]._feature.setGeometry(QgsGeometry()) r = self.run_test("test_geometry_validity", True) self.assertFailure(r) self.assertErrorLogged(r, "La géométrie de l'objet est invalide") def test_geometry_type(self): # cas initial: valide r = self.run_test("test_geometry_type") self.assertEqual(r.title, 'Contrôle des types de géométries') self.assertSuccess(r) self.assertEqual(len(r.errors), 0) # type de géométrie invalide self.checker.cables[0]._feature.setGeometry(QgsGeometry()) r = self.run_test("test_geometry_type", True) self.assertFailure(r) self.assertErrorLogged(r, "Type de géométrie invalide.*") def test_bounding_box(self): # cas initial: valide r = self.run_test("test_bounding_box") self.assertEqual(r.title, 'Contrôle des emprises') self.assertSuccess(r) self.assertEqual(len(r.errors), 0) # hors de l'emprise self.checker.sites_client[0]._feature.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(0,0))) r = self.run_test("test_bounding_box", True) self.assertFailure(r) self.assertErrorLogged(r, "Hors de l'emprise autorisée") def test_duplicates(self): # cas initial: valide r = self.run_test("test_duplicates") self.assertEqual(r.title, 'Recherche de doublons') self.assertSuccess(r) self.assertEqual(len(r.errors), 0) new_ = SiteTelecom(self.checker.sites_telecom[0]._feature) new_.__dict__.update(self.checker.sites_telecom[0].__dict__) self.checker.sites_telecom.append(new_) r = self.run_test("test_duplicates", True) self.assertFailure(r) self.assertErrorLogged(r, "Doublons dans le champs ST_CODE") def test_dimension_zasro(self): # cas initial: valide r = self.run_test("test_dimension_zasro") self.assertEqual(r.title, 'Contrôle le dimensionnement des ZASRO') self.assertSuccess(r) self.assertEqual(len(r.errors), 0) self.checker.zasros[0].ZS_NBPRISE = 1000 r = self.run_test("test_dimension_zasro", True) self.assertFailure(r) self.assertErrorLogged(r, "Le nombre de prises est supérieur à 850") def test_affaiblissement(self): # cas initial: valide r = self.run_test("test_affaiblissement") self.assertEqual(r.title, "Contrôle l'affaiblissement") self.assertSuccess(r) self.assertEqual(len(r.errors), 0) self.checker.sites_client[0].SC_ATT_PTO = 100 r = self.run_test("test_affaiblissement", True) self.assertFailure(r) self.assertErrorLogged(r, "L'affaiblissement est supérieur à 28") def test_capacite_modulo(self): # cas initial: valide r = self.run_test("test_capacite_modulo") self.assertEqual(r.title, 'Cohérence capacité du cable / modulo') self.assertSuccess(r) self.assertEqual(len(r.errors), 0) self.checker.cables[0].CA_CAPFO = 24 self.checker.cables[0].CA_MODULO = 12 r = self.run_test("test_capacite_modulo", True) self.assertFailure(r) self.assertErrorLogged(r, "Modulo invalide.+") self.checker.cables[0].CA_CAPFO = 144 self.checker.cables[0].CA_MODULO = 6 r = self.run_test("test_capacite_modulo", True) self.assertFailure(r) self.assertErrorLogged(r, "Modulo invalide.+") def test_longueur_racco(self): # cas initial: valide r = self.run_test("test_longueur_racco") self.assertEqual(r.title, 'Contrôle la longueur des raccordements') self.assertSuccess(r) self.assertEqual(len(r.errors), 0) self.checker.adductions[0].AD_LONG = 10 self.checker.adductions[0].AD_ISOLE = "1" r = self.run_test("test_longueur_racco", True) self.assertFailure(r) self.assertErrorLogged(r, "L'adduction ne devrait pas être consiudérée comme isolée.+") self.checker.adductions[0].AD_LONG = 1000 self.checker.adductions[0].AD_ISOLE = "0" r = self.run_test("test_longueur_racco", True) self.assertFailure(r) self.assertErrorLogged(r, "L'adduction devrait être considérée comme isolée.+") def test_zapbos_prises(self): # cas initial: valide r = self.run_test("test_zapbos_prises") self.assertEqual(r.title, 'Topologie: Zapbos / Prises') self.assertSuccess(r) self.assertEqual(len(r.errors), 0) new_ = Zapbo(self.checker.zapbos[0]._feature) new_.__dict__.update(self.checker.zapbos[0].__dict__) self.checker.zapbos.append(new_) r = self.run_test("test_zapbos_prises", True) self.assertFailure(r) self.assertErrorLogged(r, "La prise est contenue dans plus d'une ZAPBO") new_geom = self.checker.sites_client[0]._feature.geometry() new_geom.translate(1000,1000) self.checker.sites_client[0]._feature.setGeometry(new_geom) r = self.run_test("test_zapbos_prises", True) self.assertFailure(r) self.assertErrorLogged(r, "La prise n'est contenue dans aucune ZAPBO")