Jelajahi Sumber

add logic tests to mn3_apd

omassot 6 tahun lalu
induk
melakukan
4662b96f19
1 mengubah file dengan 205 tambahan dan 3 penghapusan
  1. 205 3
      schemas/mn3_apd.py

+ 205 - 3
schemas/mn3_apd.py

@@ -8,7 +8,8 @@
 import logging
 from qgis.core import QgsProject
 
-from core.cerberus_ import is_positive_int, is_positive_float
+from core.cerberus_ import is_positive_int, is_positive_float, CerberusValidator, \
+    CerberusErrorHandler, _translate_messages
 from core.checking import BaseChecker
 from core.mncheck import QgsModel
 
@@ -72,7 +73,7 @@ class Cable(QgsModel):
     schema = {'CA_TYPFON': {'type': 'string', 'empty': False, 'allowed': ['COLLECTE TRANSPORT DISTRIBUTION', 'COLLECTE', 'COLLECTE TRANSPORT', 'COLLECTE DISTRIBUTION', 'TRANSPORT DISTRIBUTION', 'TRANSPORT', 'DISTRIBUTION', 'RACCORDEMENT FINAL', 'BOUCLE METROPOLITAINE', 'LONGUE DISTANCE (LONG HAUL)', 'NON COMMUNIQUE']},
               'CA_TYPSTR': {'type': 'string', 'empty': False, 'allowed': ['CONDUITE', 'AERIEN', 'COLONNE MONTANTE', 'IMMERGE', 'FACADE']},
               'CA_STATUT': {'type': 'string', 'empty': False, 'allowed': [STATUTS]},
-              'CA_CAPFO': {'empty': False, 'validator': is_positive_int},
+              'CA_CAPFO': {'empty': False, 'allowed': [720,576,288,144,96,72,48,24,12]},
               'CA_MODULO': {'empty': False, 'validator': is_positive_int},
               'CA_SUPPORT': {'type': 'string', 'empty': False, 'allowed': ['0', '1']}
               }
@@ -102,7 +103,8 @@ class Adduction(QgsModel):
     schema = {'AD_ISOLE': {'type': 'string', 'empty': False, 'allowed': ['0', '1']},
               'AD_LONG': {'empty': False, 'validator': is_positive_float}
               }
-models = []
+    
+models = [SiteTelecom, SiteClient, Cable, Zapbo, Zasro, Adduction]
     
 ####### Validateur
 
@@ -117,5 +119,205 @@ class Mn3ApdChecker(BaseChecker):
                           if l.name().lower() == model.layername.lower()), None)
             self.dataset[model] = [model(f) for f in model.layer.getFeatures()] if model.layer else []
         
+        self.sites_telecom = self.dataset.get(SiteTelecom, [])
+        self.sites_client = self.dataset.get(SiteClient, [])
+        self.cables = self.dataset.get(Cable, [])
+        self.zapbos = self.dataset.get(Zapbo, [])
+        self.zasros = self.dataset.get(Zasro, [])
+        self.adductions = self.dataset.get(Adduction, [])
+        
+    def test_load_layers(self):
+        """ Chargement des données
+            Contrôle la présence des couches attendues
+        """
+        for model in models:
+            if model.layer is None:
+                self.log_error("Couche manquante", model=model)
+                continue
+            
+            if model.pk:
+                if not model.pk.lower() in [f.name().lower() for f in model.layer.fields()]:
+                    self.log_error(f"Clef primaire manquante ({model.pk})", model=model)
+                    continue
+        
+    def test_scr(self):
+        """ Contrôle des projections 
+        Vérifie que les couches ont le bon sytème de projection
+        """
+        for model in models:
+            if model.layer and model.layer.crs().authid() != model.crs:
+                self.log_error(f"Mauvaise projection (attendu: {model.crs})", model=model)
+
+    def _validate_structure(self, model, items):
+        v = CerberusValidator(model.schema, purge_unknown=True, error_handler=CerberusErrorHandler, require_all=True)
+        
+        for item in items:
+            v.validate(item.__dict__)
+            
+            for field, verrors in v.errors.items():
+                for err in verrors:
+                    self.log_error(f"{field} : {_translate_messages(err)}", item=item)
+
+    def test_structure_sites_telecom(self):
+        """ Structure des données: SiteTelecom 
+        Contrôle les données attributaires de la couche SITE_TELECOM: 
+        présence, format, valeurs autorisées...
+        """       
+        self._validate_structure(SiteTelecom, self.sites_telecom)
+
+    def test_structure_sites_client(self):
+        """ Structure des données: SiteClient 
+        Contrôle les données attributaires de la couche SITE_CLIENT: 
+        présence, format, valeurs autorisées...
+        """
+        self._validate_structure(SiteClient, self.sites_client)
+
+    def test_structure_cables(self):
+        """ Structure des données: Cables 
+        Contrôle les données attributaires de la couche CABLE: 
+        présence, format, valeurs autorisées...
+        """
+        self._validate_structure(Cable, self.cables)
+        
+    def test_structure_zapbos(self):
+        """ Structure des données: Zapbo 
+        Contrôle les données attributaires de la couche ZAPBO: 
+        présence, format, valeurs autorisées...
+        """
+        self._validate_structure(Zapbo, self.zapbos)
+
+    def test_structure_zasros(self):
+        """ Structure des données: Zasro 
+        Contrôle les données attributaires de la couche ZASRO: 
+        présence, format, valeurs autorisées...
+        """
+        self._validate_structure(Zasro, self.zasros)
+
+    def test_structure_adductions(self):
+        """ Structure des données: Adduction 
+        Contrôle les données attributaires de la couche ADDUCTION: 
+        présence, format, valeurs autorisées...
+        """
+        self._validate_structure(Adduction, self.adductions)
+
+    def test_geometry_validity(self):
+        """ Contrôle de la validité des géométries 
+        """
+        for model in models:
+            for item in self.dataset[model]:
+                if not item.is_geometry_valid():
+                    self.log_error("La géométrie de l'objet est invalide", item=item)
+
+    def test_geometry_type(self):
+        """ Contrôle des types de géométries 
+        """
+        for model in models:
+            for item in self.dataset[model]:
+                item_geom_type = item.get_geom_type()
+                if item_geom_type != model.geom_type:
+                    expected, found = model.get_geom_name(model.geom_type), model.get_geom_name(item_geom_type)
+                    self.log_error(f"Type de géométrie invalide (attendu: {expected}, trouvé: {found})", item=item)
+                    
+    def test_bounding_box(self):
+        """ Contrôle des emprises 
+        Vérifie que les objets sont dans le périmètre attendu
+        """
+        
+        for model in models:
+            xmin, ymin, xmax, ymax = model.bounding_box
+            
+            for item in self.dataset[model]:
+                if not item.is_geometry_valid():
+                    continue
+                
+                x1, y1, x2, y2 = item.get_bounding_box()
+                
+                if any(x < xmin or x > xmax for x in (x1, x2)) or \
+                   any(y < ymin or y > ymax for y in (y1, y2)):
+                    self.log_error("Hors de l'emprise autorisée", item=item)
+        
+    def test_duplicates(self):
+        """ Recherche de doublons 
+        Recherche d'éventuels doublons dans des champs qui supposent l'unicité
+        """
+        
+        # doublons dans ST_CODE
+        tmp = []
+        for site_telecom in self.sites_telecom:
+            if not site_telecom.ST_CODE:
+                continue
+            if not site_telecom.ST_CODE in tmp:
+                tmp.append(site_telecom.ST_CODE)
+            else:
+                self.log_error("Doublons dans le champs ST_CODE", item=site_telecom)
+        
+    def test_graphic_duplicates(self):
+        """ Recherche de doublons graphiques """
+            
+        for i, site_telecom in enumerate(self.sites_telecom):
+            for other in self.sites_telecom[i+1:]:
+                if site_telecom.geom.equals(other.geom):
+                    self.log_error("Une entité graphique est dupliquée", item=site_telecom)
+                    continue
+                    
+        for i, site_client in enumerate(self.sites_client):
+            for other in self.sites_client[i+1:]:
+                if site_client.geom.equals(other.geom):
+                    self.log_error("Une entité graphique est dupliquée", item=site_client)
+                    continue
+
+#         for i, cable in enumerate(self.cables):
+#             for other in self.cables[i+1:]:
+#                 if cable.geom.equals(other.geom):
+#                     self.log_error("Une entité graphique est dupliquée", item=cable)
+#                     continue
+        
+        for i, zapbo in enumerate(self.zapbos):
+            for other in self.zapbos[i+1:]:
+                if zapbo.geom.equals(other.geom):
+                    self.log_error("Une entité graphique est dupliquée", item=zapbo)
+                    continue
+        
+        for i, zasro in enumerate(self.zasros):
+            for other in self.zasros[i+1:]:
+                if zasro.geom.equals(other.geom):
+                    self.log_error("Une entité graphique est dupliquée", item=zasro)
+                    continue
+        
+#         for i, adduction in enumerate(self.adductions):
+#             for other in self.adductions[i+1:]:
+#                 if adduction.geom.equals(other.geom):
+#                     self.log_error("Une entité graphique est dupliquée", item=adduction)
+#                     continue
+        
+    def test_dimension_zasro(self):
+        """ Contrôle le dimensionnement des ZASRO """
+        for zasro in self.zasros:
+            if zasro.ZS.NBPRISES > 850:
+                self.log_error("Le nombre de prises est supérieur à 850", item=zasro)
+        
+    def test_affaiblissement(self):
+        """ Contrôle l'affaiblissement """
+        for site_client in self.site_clients:
+            if site_client.ZS.NBPRISES > 28:
+                self.log_error("L'affaiblissement est supérieur à 28", item=site_client)
+        
+        
+    def test_capacite_modulo(self):
+        """ Contrôle l'affaiblissement """
+        for cable in self.cables:
+            if cable.CA_CAPFO in [720,576,288,144,96,72] and cable.MODULO != "M12":
+                self.log_error(f"Modulo invalide (capacite: {cable.CA_CAPFO}, modulo: {cable.MODULO})", item=cable)
+            elif cable.CA_CAPFO in [72,48,24,12] and cable.MODULO != "M6":
+                self.log_error(f"Modulo invalide (capacite: {cable.CA_CAPFO}, modulo: {cable.MODULO})", item=cable)
+        
+    def test_longueur_racco(self):
+        """ Contrôle la longueur des raccos """
+        for adduction in self.adductions:
+            if adduction.AD_ISOLE == "1" and adduction.AD_LONG <= 100:
+                self.log_error("L'adduction ne devrait pas être consiudérée comme isolée (long.: {adduction.AD_LONG})", item=adduction)
+        
+            elif adduction.AD_ISOLE == "0" and adduction.AD_LONG > 100:
+                self.log_error("L'adduction devrait être consiudérée comme isolée (long.: {adduction.AD_LONG})", item=adduction)
         
 checkers = [Mn3ApdChecker]