omassot 6 роки тому
батько
коміт
f96c18d65c
3 змінених файлів з 271 додано та 49 видалено
  1. 0 1
      schemas/mn2_rec.py
  2. 29 48
      schemas/mn3_apd.py
  3. 242 0
      test/test_mn3_apd.py

+ 0 - 1
schemas/mn2_rec.py

@@ -349,7 +349,6 @@ class Mn2Checker(BaseChecker):
         """ Contrôle des emprises 
         Vérifie que les objets sont dans le périmètre attendu
         """
-        
         for model in models:
             xmin, ymin, xmax, ymax = model.bounding_box
             

+ 29 - 48
schemas/mn3_apd.py

@@ -13,6 +13,11 @@ from core.cerberus_ import is_positive_int, is_positive_float, CerberusValidator
 from core.checking import BaseChecker
 from core.mncheck import QgsModel
 
+# TODO: verifier que les zapbos intersectent au moins une zasro et même nom que cette zasro
+# TODO: verifier que toute les prises comprises dans une zapbo soient contenues dans une seule et même zasro
+
+
+
 
 logger = logging.getLogger("mncheck")
 
@@ -50,7 +55,8 @@ class SiteClient(QgsModel):
     layername = "SITE_CLIENT"
     geom_type = QgsModel.GEOM_POINT
     bounding_box = (XMIN,YMIN,XMAX,YMAX)
-    schema = {'SC_TYPFON': {'type': 'string', 'empty': False, 'allowed': ['RESIDENTIEL', 'PROFESSIONNEL', 'OPERATEUR', 'TECHNIQUE']},
+    schema = {
+              'SC_TYPFON': {'type': 'string', 'empty': False, 'allowed': ['RESIDENTIEL', 'PROFESSIONNEL', 'OPERATEUR', 'TECHNIQUE']},
               'SC_STATUT': {'type': 'string', 'empty': False, 'allowed': [STATUTS]}, 
               'SC_NBPRISE': {'empty': False, 'validator': is_positive_int}, 
               'SC_NBPRHAB': {'empty': False, 'validator': is_positive_int}, 
@@ -74,7 +80,7 @@ class Cable(QgsModel):
               'CA_TYPSTR': {'type': 'string', 'empty': False, 'allowed': ['CONDUITE', 'AERIEN', 'COLONNE MONTANTE', 'IMMERGE', 'FACADE']},
               'CA_STATUT': {'type': 'string', 'empty': False, 'allowed': [STATUTS]},
               'CA_CAPFO': {'empty': False, 'allowed': [720,576,288,144,96,72,48,24,12]},
-              'CA_MODULO': {'empty': False, 'validator': is_positive_int},
+              'CA_MODULO': {'empty': False, 'allowed': [6, 12]},
               'CA_SUPPORT': {'type': 'string', 'empty': False, 'allowed': ['0', '1']}
               }
     
@@ -87,7 +93,6 @@ class Zapbo(QgsModel):
               'ZP_STATUT': {'type': 'string', 'empty': False, 'allowed': [STATUTS]}
               }
     
-    
 class Zasro(QgsModel):
     layername = "ZASRO"
     geom_type = QgsModel.GEOM_POLYGON
@@ -240,7 +245,6 @@ class Mn3ApdChecker(BaseChecker):
         """ Recherche de doublons 
         Recherche d'éventuels doublons dans des champs qui supposent l'unicité
         """
-        
         # doublons dans ST_CODE
         tmp = []
         for site_telecom in self.sites_telecom:
@@ -251,45 +255,6 @@ class Mn3ApdChecker(BaseChecker):
             else:
                 self.log_error("Doublons dans le champs ST_CODE", item=site_telecom)
         
-    def test_graphic_duplicates(self):
-        """ Recherche de doublons graphiques """
-            
-        for i, site_telecom in enumerate(self.sites_telecom):
-            for other in self.sites_telecom[i+1:]:
-                if site_telecom.geom.equals(other.geom):
-                    self.log_error("Une entité graphique est dupliquée", item=site_telecom)
-                    continue
-                    
-        for i, site_client in enumerate(self.sites_client):
-            for other in self.sites_client[i+1:]:
-                if site_client.geom.equals(other.geom):
-                    self.log_error("Une entité graphique est dupliquée", item=site_client)
-                    continue
-
-#         for i, cable in enumerate(self.cables):
-#             for other in self.cables[i+1:]:
-#                 if cable.geom.equals(other.geom):
-#                     self.log_error("Une entité graphique est dupliquée", item=cable)
-#                     continue
-        
-        for i, zapbo in enumerate(self.zapbos):
-            for other in self.zapbos[i+1:]:
-                if zapbo.geom.equals(other.geom):
-                    self.log_error("Une entité graphique est dupliquée", item=zapbo)
-                    continue
-        
-        for i, zasro in enumerate(self.zasros):
-            for other in self.zasros[i+1:]:
-                if zasro.geom.equals(other.geom):
-                    self.log_error("Une entité graphique est dupliquée", item=zasro)
-                    continue
-        
-#         for i, adduction in enumerate(self.adductions):
-#             for other in self.adductions[i+1:]:
-#                 if adduction.geom.equals(other.geom):
-#                     self.log_error("Une entité graphique est dupliquée", item=adduction)
-#                     continue
-        
     def test_dimension_zasro(self):
         """ Contrôle le dimensionnement des ZASRO """
         for zasro in self.zasros:
@@ -301,14 +266,13 @@ class Mn3ApdChecker(BaseChecker):
         for site_client in self.site_clients:
             if site_client.ZS.NBPRISES > 28:
                 self.log_error("L'affaiblissement est supérieur à 28", item=site_client)
-        
-        
+
     def test_capacite_modulo(self):
         """ Contrôle l'affaiblissement """
         for cable in self.cables:
-            if cable.CA_CAPFO in [720,576,288,144,96,72] and cable.MODULO != "M12":
+            if cable.CA_CAPFO in [720,576,288,144,96] and cable.MODULO != 12:
                 self.log_error(f"Modulo invalide (capacite: {cable.CA_CAPFO}, modulo: {cable.MODULO})", item=cable)
-            elif cable.CA_CAPFO in [72,48,24,12] and cable.MODULO != "M6":
+            elif cable.CA_CAPFO in [48,24,12] and cable.MODULO != 6:
                 self.log_error(f"Modulo invalide (capacite: {cable.CA_CAPFO}, modulo: {cable.MODULO})", item=cable)
         
     def test_longueur_racco(self):
@@ -318,6 +282,23 @@ class Mn3ApdChecker(BaseChecker):
                 self.log_error("L'adduction ne devrait pas être consiudérée comme isolée (long.: {adduction.AD_LONG})", item=adduction)
         
             elif adduction.AD_ISOLE == "0" and adduction.AD_LONG > 100:
-                self.log_error("L'adduction devrait être consiudérée comme isolée (long.: {adduction.AD_LONG})", item=adduction)
+                self.log_error("L'adduction devrait être considérée comme isolée (long.: {adduction.AD_LONG})", item=adduction)
+        
+    def test_zapbos_prises(self):
+        """ Topologie: Zapbos / Prises 
+        Toutes les zapbo contiennent au moins une prise"""
+        
+        for zapbo in self.zapbos:
+            nb_prises = sum([site_client.SC_NBPRISE for site_client in self.site_clients if zapbo.geom.contains(site_client.geom)])
+            if not nb_prises:
+                self.log_error("La Zapbo ne contient aucune prise", item=zapbo)
+        
+        for site_client in self.site_clients:
+            nb_zapbo = len([zapbo for zapbo in self.zapbos if zapbo.geom.contains(site_client.geom)])
+            if nb_zapbo == 0:
+                self.log_error("La prise n'est contenue dans aucune ZAPBO", item=site_client)
+            elif nb_zapbo > 1:
+                self.log_error("La prise est contenue dans plus d'une ZAPBO", item=site_client)
+                
         
 checkers = [Mn3ApdChecker]

+ 242 - 0
test/test_mn3_apd.py

@@ -0,0 +1,242 @@
+'''
+
+@author: olivier.massot, janv. 2019
+'''
+from core import mncheck
+import logging
+from qgis.core import QgsCoordinateReferenceSystem, QgsGeometry, QgsPointXY
+import re
+
+from core.checking import SUCCESS, FAILURE, ERROR
+from path import Path
+from test._base import SchemaTest
+
+
+logger = logging.getLogger("mncheck")
+
+class Test(SchemaTest):
+    SCHEMA_NAME = "mn3_apd"
+    PROJECT_FILE = Path(__file__).parent / 'projects' / 'mn3_apd' / '1_valid' / '1_valid.qgz'
+
+    def setUp(self):
+        SchemaTest.setUp(self)
+        self.schema = mncheck.get_schema(self.SCHEMA_NAME)
+        if not self.schema.checkers:
+            raise AttributeError("Aucun testeur trouvé dans le schéma")
+        self.checker = self.schema.checkers[0]()
+
+    def run_test(self, test_name, dry=False):
+        results = self.checker.run(test_name, dry)
+        if len(results) == 0:
+            raise Exception("No result for test")
+        elif len(results) > 1:
+            raise Exception("More than one result from test")
+        return results[0]
+    
+    def assertSuccess(self, r):
+        if not r.status == SUCCESS:
+            raise AssertionError("Le test a échoué")
+    
+    def assertFailure(self, r):
+        if not r.status == FAILURE:
+            raise AssertionError("Le test n'aurait pas dû réussir")
+    
+    def assertError(self, r):
+        if not r.status == ERROR:
+            raise AssertionError("Le test n'a pas levé d'erreur")
+        
+    def assertErrorLogged(self, result, err_msg):
+        if not any((re.fullmatch(err_msg, err.message) for err in result.errors)):
+            print(result.errors)
+            raise AssertionError("Error was not logged: {}".format(err_msg))
+    
+    def test_load_layers(self):
+        
+        # cas initial: valide
+        r = self.run_test("test_load_layers")
+
+        self.assertEqual(r.title, 'Chargement des données')
+        self.assertSuccess(r)
+        self.assertEqual(len(r.errors), 0)
+        
+        # une couche manquante et une pk manquante
+        _pk_ini = self.schema.SiteTelecom.pk
+        try:
+            self.schema.SiteClient.layer = None
+            self.schema.SiteTelecom.pk = "not_a_qgis_field"
+    
+            r = self.run_test("test_load_layers", True)
+            
+            self.assertFailure(r)
+            self.assertErrorLogged(r, ".*Couche manquante.*")
+            self.assertErrorLogged(r, ".*Clef primaire manquante.*")
+        finally:
+            self.schema.SiteTelecom.pk = _pk_ini
+            
+            
+    def test_scr(self):
+        
+        # cas initial: valide
+        r = self.run_test("test_scr")
+
+        self.assertEqual(r.title, 'Contrôle des projections')
+        self.assertSuccess(r)
+        self.assertEqual(len(r.errors), 0)
+    
+        # mauvaise projection
+        crs = QgsCoordinateReferenceSystem()
+        crs.createFromSrid(4473)
+        self.schema.SiteTelecom.layer.setCrs(crs)
+        r = self.run_test("test_scr", True)
+        
+        self.assertFailure(r)
+        self.assertErrorLogged(r, f"Mauvaise projection.*")
+    
+    def test_structure_sites_telecom(self):
+        
+        # cas initial: valide
+        r = self.run_test("test_structure_sites_telecom")
+
+        self.assertEqual(r.title, 'Structure des données: Sites Telecom')
+        self.assertSuccess(r)
+        self.assertEqual(len(r.errors), 0)
+        
+        # valeur non autorisée
+        self.checker.sites_telecom[0].ST_NBPRISE = "abcd"
+        r = self.run_test("test_structure_sites_telecom", True)
+        self.assertFailure(r)
+    
+    def test_structure_sites_client(self):
+        
+        # cas initial: valide
+        r = self.run_test("test_structure_sites_client")
+
+        self.assertEqual(r.title, 'Structure des données: Sites Clients')
+        self.assertSuccess(r)
+        self.assertEqual(len(r.errors), 0)
+        
+        # valeur non autorisée
+        self.checker.sites_client[0].SC_TYPFON = "invalid"
+        r = self.run_test("test_structure_sites_client", True)
+        self.assertFailure(r)
+        
+    def test_structure_cables(self):
+        
+        # cas initial: valide
+        r = self.run_test("test_structure_cables")
+
+        self.assertEqual(r.title, 'Structure des données: Cables')
+        self.assertSuccess(r)
+        self.assertEqual(len(r.errors), 0)
+        
+        # valeur non autorisée
+        self.checker.cables[0].CA_TYPFON = "invalid"
+        r = self.run_test("test_structure_cables", True)
+        self.assertFailure(r)
+            
+    def test_structure_zapbos(self):
+        
+        # cas initial: valide
+        r = self.run_test("test_structure_zapbos")
+
+        self.assertEqual(r.title, 'Structure des données: Zapbos')
+        self.assertSuccess(r)
+        self.assertEqual(len(r.errors), 0)
+        
+        # valeur non autorisée
+        self.checker.zapbos[0].ZP_ISOLE = "2"
+        r = self.run_test("test_structure_zapbos", True)
+        self.assertFailure(r)
+    
+    def test_structure_zasros(self):
+        
+        # cas initial: valide
+        r = self.run_test("test_structure_zasros")
+
+        self.assertEqual(r.title, 'Structure des données: Zasros')
+        self.assertSuccess(r)
+        self.assertEqual(len(r.errors), 0)
+        
+        # valeur non autorisée
+        self.checker.zasros[0].ZS_NBPRISE = "invalid"
+        r = self.run_test("test_structure_zasros", True)
+        self.assertFailure(r)
+    
+    def test_structure_adductions(self):
+        
+        # cas initial: valide
+        r = self.run_test("test_structure_adductions")
+
+        self.assertEqual(r.title, 'Structure des données: Adduction')
+        self.assertSuccess(r)
+        self.assertEqual(len(r.errors), 0)
+        
+        # valeur non autorisée
+        self.checker.adductions[0].AD_LONG = "invalid"
+        r = self.run_test("test_structure_adductions", True)
+        self.assertFailure(r)
+    
+    def test_geometry_validity(self):
+        
+        # cas initial: valide
+        r = self.run_test("test_geometry_validity")
+
+        self.assertEqual(r.title, 'Contrôle de la validité des géométries')
+        self.assertSuccess(r)
+        self.assertEqual(len(r.errors), 0)
+        
+        # géométrie invalide
+        self.checker.arteres[0]._feature.setGeometry(QgsGeometry())
+        r = self.run_test("test_geometry_validity", True)
+        self.assertFailure(r)
+        self.assertErrorLogged(r, "La géométrie de l'objet est invalide")
+        
+    def test_geometry_type(self):
+        
+        # cas initial: valide
+        r = self.run_test("test_geometry_type")
+
+        self.assertEqual(r.title, 'Contrôle des types de géométries')
+        self.assertSuccess(r)
+        self.assertEqual(len(r.errors), 0)
+
+        # type de géométrie invalide
+        self.checker.arteres[0]._feature.setGeometry(QgsGeometry())
+        r = self.run_test("test_geometry_type", True)
+        self.assertFailure(r)
+        self.assertErrorLogged(r, "Type de géométrie invalide.*")
+
+    def test_bounding_box(self):
+        
+        # cas initial: valide
+        r = self.run_test("test_bounding_box")
+
+        self.assertEqual(r.title, 'Contrôle des emprises')
+        self.assertSuccess(r)
+        self.assertEqual(len(r.errors), 0)
+    
+        # hors de l'emprise
+        self.checker.noeuds[0]._feature.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(0,0)))
+        r = self.run_test("test_bounding_box", True)
+        self.assertFailure(r)
+        self.assertErrorLogged(r, "Hors de l'emprise autorisée")
+            
+    def test_duplicates(self):
+        pass
+            
+    def test_dimension_zasro(self):
+        pass
+    
+    def test_affaiblissement(self):
+        pass
+    
+    def test_capacite_modulo(self):
+        pass
+        
+    def test_longueur_racco(self):
+        pass
+        
+    def test_zapbos_prises(self):
+        pass
+    
+