Przeglądaj źródła

Ajoute deux tests utilisant la couche des prises

omassot 7 lat temu
rodzic
commit
bd72056db6
3 zmienionych plików z 122 dodań i 46 usunięć
  1. 12 3
      core/checking.py
  2. 55 21
      schemas/mn1_rec.py
  3. 55 22
      schemas/mn2_rec.py

+ 12 - 3
core/checking.py

@@ -26,6 +26,9 @@ def _linenumber(m):
     except AttributeError:
         return -1
 
+class CheckingException(Exception):
+    pass
+
 class TestError():
     def __init__(self, message, info = {}, critical=False):
         self.message = message
@@ -77,12 +80,15 @@ class TestResult():
         error = TestError(message, info, critical)
         self.errors.append(error)
 
-    def handle_exception(self, exc_info):
+    def log_exception(self, message, info={}):
         self._status = ERROR
+        error = TestError(message, info)
+        self.errors.append(error)
+        
+    def handle_exception(self, exc_info):
         typ, value, trace = exc_info
-        error = TestError("Une erreur inconnue s'est produite", 
+        self.log_exception("Une erreur inconnue s'est produite", 
                           {"exc_info": "{}\n{}\n{}".format(typ.__name__, value, ''.join(traceback.format_tb(trace)))})
-        self.errors.append(error)
 
 class Comlink():
     def _started_test(self, test):
@@ -136,6 +142,9 @@ class BaseChecker():
                 
                 if not dry:
                     self.tearDown()
+            
+            except CheckingException as e:
+                result.log_exception(str(e))
             except:
                 result.handle_exception(sys.exc_info())
             

+ 55 - 21
schemas/mn1_rec.py

@@ -12,7 +12,7 @@ from core.cerberus_ import is_float, is_multi_int, is_int, \
     is_modern_french_date, CerberusValidator, CerberusErrorHandler, \
     _translate_messages, is_positive_float, is_positive_int, \
     is_strictly_positive_float, is_strictly_positive_int
-from core.checking import BaseChecker
+from core.checking import BaseChecker, CheckingException
 from core.mncheck import QgsModel
 
 
@@ -25,6 +25,8 @@ CRS = 'EPSG:3949' # Coordinate Reference System
 
 TOLERANCE = 1.0
 
+COUCHE_PRISES = "FTTH_SITES_GEO_VALIDE"
+
 class Artere(QgsModel):
     layername = "artere_geo"
     geom_type = QgsModel.GEOM_LINE
@@ -214,7 +216,15 @@ class Mn1Checker(BaseChecker):
     
         for equipement in self.equipements:
             equipement.noeud = next((n for n in self.noeuds if n.NO_NOM == equipement.EQ_NOM_NOE), None)
-    
+        
+    def _load_prises(self):
+        """ on charge la couche des prises si elle existe """
+        self.prises = []
+        prises_layer = next((l for l in QgsProject.instance().mapLayers().values() \
+                          if l.name().lower() == COUCHE_PRISES.lower()), None)
+        if prises_layer:
+            self.prises = [QgsModel(f) for f in prises_layer.getFeatures()]
+        
     def test_load_layers(self):
         """ Chargement des données
             Contrôle la présence des couches attendues
@@ -518,23 +528,28 @@ class Mn1Checker(BaseChecker):
                     self.log_error("Le nombre de fourreaux disponibles (CA_NB_FO_D) doit être inférieur au nombre total (CA_NB_FO)", item=cable)
             except (TypeError, ValueError):
                 pass
-         
+    
+    def _za_for_pbo(self, pbo):
+        # retourne la ZAPBO correspondant à la PBO en parametre, None si aucune
+        if hasattr(pbo, 'zapbo') and pbo.zapbo:
+            return pbo.zapbo
+        for zapbo in self.zapbos:
+            if zapbo.geom.contains(pbo.geom) and pbo.EQ_NOM in zapbo.ID_ZAPBO:
+                return zapbo
+        return None
+    
     def test_pbos(self):
         """ Topologie: PBO / ZAPBO
         Compare la géométrie et le nom des équipements de type PBO à celle des ZAPBO
         """
-        
         # Verifier que chaque equipement de type PBO est contenu dans une zapbo, et que le nom de la zapbo contient le nom de l'equipement
         for equipement in self.equipements:
             if not equipement.EQ_TYPE == "PBO":
                 continue
             
             #zapbos englobant l'equipement
-            candidates = []
-            for zapbo in self.zapbos:
-                if zapbo.geom.contains(equipement.geom):
-                    candidates.append(zapbo)
-                    
+            candidates = [z for z in self.zapbos if z.geom.contains(equipement.geom)]
+
             # le pbo doit être contenu dans une zapbo
             if not candidates:
                 self.log_error("Le PBO n'est contenu dans aucune ZAPBO", item=equipement)
@@ -546,28 +561,47 @@ class Mn1Checker(BaseChecker):
             except StopIteration:
                 self.log_error("Le nom du PBO ne coincide avec le nom d'aucune des ZAPBO qui le contiennent", item=equipement)
                 break
-            
-    # a venir (webservice?)
-    def __test_pbo_dimension(self):
-        """ Dimensionnement des PBO """
+    
+    def test_zapbos_prises(self):
+        """ Topologie: Zapbos / Prises 
+        Toutes les zapbo contiennent au moins une prise"""
+        
+        self._load_prises()
+        
+        if not self.prises:
+            raise CheckingException("La couche des prises n'est pas chargée")
+        
+        for zapbo in self.zapbos:
+            if not next((prise for prise in self.prises if zapbo.geom.contains(prise.geom)), 0):
+                self.log_error("La Zapbo ne contient aucune prise", item=zapbo)
+        
+    def test_pbo_dimension(self):
+        """ Dimensionnement des PBO 
+        Compare la couche des prises (si chargée) au dimensionnement des PBO"""
+        
+        self._load_prises()
+        
+        if not self.prises:
+            raise CheckingException("La couche des prises n'est pas chargée")
+        
         for equipement in self.equipements:
             if not equipement.EQ_TYPE == "PBO":
                 continue
             
-            if not hasattr(equipement.zapbo, "nb_prises") or equipement.zapbo.nb_prises is None:
-                equipement.zapbo.nb_prises = 0
-                
+            zapbo = self._za_for_pbo(equipement)
+            zapbo.nb_prises = len([prise for prise in self.prises if zapbo.geom.contains(prise.geom)])
+            
             # Controle du dimensionnement des PBO
-            if equipement.EQ_TYPE_PH == 'PBO 6' and not equipement.zapbo.nb_prises < 6:
+            if equipement.EQ_TYPE_PH == 'PBO 6' and not zapbo.nb_prises < 6:
                 self.log_error("Le PBO 6 contient plus de 5 prises", item=equipement)
          
-            if equipement.EQ_TYPE_PH == 'PBO 12' and not equipement.zapbo.nb_prises >= 6 and equipement.zapbo.nb_prises <= 8:
-                self.log_error("Le PBO 12 contient mois de 6 prises ou plus de 8 prises", item=equipement)
+            if equipement.EQ_TYPE_PH == 'PBO 12' and not zapbo.nb_prises >= 6 and zapbo.nb_prises <= 8:
+                self.log_error("Le PBO 12 contient moins de 6 prises ou plus de 8 prises", item=equipement)
          
-            if equipement.zapbo.STATUT == "REC" and not equipement.EQ_STATUT == "REC":
+            if zapbo.STATUT == "REC" and not equipement.EQ_STATUT == "REC":
                 self.log_error("Le statut du PBO n'est pas cohérent avec le statut de sa ZAPBO", item=equipement)
          
-            if equipement.EQ_STATUT == "REC" and not equipement.zapbo.STATUT == "REC" and not equipement.zapbo.ID_ZAPBO[:4].lower() == "att_":
+            if equipement.EQ_STATUT == "REC" and not zapbo.STATUT == "REC" and not zapbo.ID_ZAPBO[:4].lower() == "att_":
                 self.log_error("Le statut du PBO n'est pas cohérent avec le statut de sa ZAPBO", item=equipement)
         
 

+ 55 - 22
schemas/mn2_rec.py

@@ -12,7 +12,7 @@ from core.cerberus_ import is_float, is_multi_int, is_int, \
     is_modern_french_date, CerberusValidator, CerberusErrorHandler, \
     _translate_messages, is_positive_int, is_strictly_positive_float, \
     is_positive_float, is_strictly_positive_int
-from core.checking import BaseChecker
+from core.checking import BaseChecker, CheckingException
 from core.mncheck import QgsModel
 
 
@@ -28,6 +28,8 @@ CRS = 'EPSG:3949' # Coordinate Reference System
 
 TOLERANCE = 1.0
 
+COUCHE_PRISES = "FTTH_SITES_GEO_VALIDE"
+
 ##### Modeles
 
 
@@ -237,6 +239,14 @@ class Mn2Checker(BaseChecker):
         for equipement in self.equipements:
             equipement.noeud = next((n for n in self.noeuds if n.NO_CODE == equipement.EQ_NOM_NOE), None)
         
+    def _load_prises(self):
+        """ on charge la couche des prises si elle existe """
+        self.prises = []
+        prises_layer = next((l for l in QgsProject.instance().mapLayers().values() \
+                          if l.name().lower() == COUCHE_PRISES.lower()), None)
+        if prises_layer:
+            self.prises = [QgsModel(f) for f in prises_layer.getFeatures()]
+        
     def test_load_layers(self):
         """ Chargement des données
             Contrôle la présence des couches attendues
@@ -538,23 +548,28 @@ class Mn2Checker(BaseChecker):
                     self.log_error("Le nombre de fourreaux disponibles (AR_FOU_DIS) doit être inférieur au nombre total (AR_NB_FOUR)", item=artere)
             except (TypeError, ValueError):
                 pass
-         
+
+    def _za_for_pbo(self, pbo):
+        # retourne la ZAPBO correspondant à la PBO en parametre, None si aucune
+        if hasattr(pbo, 'zapbo') and pbo.zapbo:
+            return pbo.zapbo
+        for zapbo in self.zapbos:
+            if zapbo.geom.contains(pbo.geom) and pbo.EQ_NOM in zapbo.ID_ZAPBO:
+                return zapbo
+        return None
+    
     def test_pbos(self):
         """ Topologie: PBO / ZAPBO
         Compare la géométrie et le nom des équipements de type PBO à celle des ZAPBO
         """
-        
         # Verifier que chaque equipement de type PBO est contenu dans une zapbo, et que le nom de la zapbo contient le nom de l'equipement
         for equipement in self.equipements:
             if not equipement.EQ_TYPE == "PBO":
                 continue
             
             #zapbos englobant l'equipement
-            candidates = []
-            for zapbo in self.zapbos:
-                if zapbo.geom.contains(equipement.geom):
-                    candidates.append(zapbo)
-                    
+            candidates = [z for z in self.zapbos if z.geom.contains(equipement.geom)]
+
             # le pbo doit être contenu dans une zapbo
             if not candidates:
                 self.log_error("Le PBO n'est contenu dans aucune ZAPBO", item=equipement)
@@ -562,33 +577,51 @@ class Mn2Checker(BaseChecker):
              
             # On se base sur le nom pour trouver la zapbo correspondante
             try:
-                equipement.zapbo = next((z for z in candidates if equipement.EQ_CODE in z.ID_ZAPBO))
+                equipement.zapbo = next((z for z in candidates if equipement.EQ_NOM in z.ID_ZAPBO))
             except StopIteration:
                 self.log_error("Le nom du PBO ne coincide avec le nom d'aucune des ZAPBO qui le contiennent", item=equipement)
                 break
-            
-
-    # a venir (webservice?)
-    def __test_pbo_dimension(self):
-        """ Dimensionnement des PBO """
+    
+    def test_zapbos_prises(self):
+        """ Topologie: Zapbos / Prises 
+        Toutes les zapbo contiennent au moins une prise"""
+        
+        self._load_prises()
+        
+        if not self.prises:
+            raise CheckingException("La couche des prises n'est pas chargée")
+        
+        for zapbo in self.zapbos:
+            if not next((prise for prise in self.prises if zapbo.geom.contains(prise.geom)), 0):
+                self.log_error("La Zapbo ne contient aucune prise", item=zapbo)
+        
+    def test_pbo_dimension(self):
+        """ Dimensionnement des PBO 
+        Compare la couche des prises (si chargée) au dimensionnement des PBO"""
+        
+        self._load_prises()
+        
+        if not self.prises:
+            raise CheckingException("La couche des prises n'est pas chargée")
+        
         for equipement in self.equipements:
             if not equipement.EQ_TYPE == "PBO":
                 continue
             
-            if not hasattr(equipement.zapbo, "nb_prises") or equipement.zapbo.nb_prises is None:
-                equipement.zapbo.nb_prises = 0
-                
+            zapbo = self._za_for_pbo(equipement)
+            zapbo.nb_prises = len([prise for prise in self.prises if zapbo.geom.contains(prise.geom)])
+            
             # Controle du dimensionnement des PBO
-            if equipement.EQ_TYPE_PH == 'PBO 6' and not equipement.zapbo.nb_prises < 6:
+            if equipement.EQ_TYPE_PH == 'PBO 6' and not zapbo.nb_prises < 6:
                 self.log_error("Le PBO 6 contient plus de 5 prises", item=equipement)
          
-            if equipement.EQ_TYPE_PH == 'PBO 12' and not equipement.zapbo.nb_prises >= 6 and equipement.zapbo.nb_prises <= 8:
-                self.log_error("Le PBO 12 contient mois de 6 prises ou plus de 8 prises", item=equipement)
+            if equipement.EQ_TYPE_PH == 'PBO 12' and not zapbo.nb_prises >= 6 and zapbo.nb_prises <= 8:
+                self.log_error("Le PBO 12 contient moins de 6 prises ou plus de 8 prises", item=equipement)
          
-            if equipement.zapbo.STATUT == "REC" and not equipement.EQ_STATUT == "REC":
+            if zapbo.STATUT == "REC" and not equipement.EQ_STATUT == "REC":
                 self.log_error("Le statut du PBO n'est pas cohérent avec le statut de sa ZAPBO", item=equipement)
          
-            if equipement.EQ_STATUT == "REC" and not equipement.zapbo.STATUT == "REC" and not equipement.zapbo.ID_ZAPBO[:4].lower() == "att_":
+            if equipement.EQ_STATUT == "REC" and not zapbo.STATUT == "REC" and not zapbo.ID_ZAPBO[:4].lower() == "att_":
                 self.log_error("Le statut du PBO n'est pas cohérent avec le statut de sa ZAPBO", item=equipement)