mn3_apd.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. '''
  2. Schéma de validation des données MN2
  3. @author: olivier.massot, 2018
  4. '''
  5. import logging
  6. from qgis.core import QgsProject
  7. from core.cerberus_ import is_positive_int, is_positive_float, CerberusValidator, \
  8. CerberusErrorHandler, _translate_messages
  9. from core.checking import BaseChecker
  10. from core.mncheck import QgsModel
  11. # TODO: verifier que les zapbos intersectent au moins une zasro et même nom que cette zasro
  12. # TODO: verifier que toute les prises comprises dans une zapbo soient contenues dans une seule et même zasro
  13. logger = logging.getLogger("mncheck")
  14. SCHEMA_NAME = "Schéma MN v3 APD"
  15. XMIN, XMAX, YMIN, YMAX = 1341999, 1429750, 8147750, 8294000
  16. CRS = 'EPSG:3949' # Coordinate Reference System
  17. TOLERANCE = 0.5
  18. STATUTS = ['ETUDE PRELIMINAIRE', 'ETUDE DE DIAGNOSTIC', 'AVANT-PROJET PROJET', 'PROJET', 'PASSATION DES MARCHES DE TRAVAUX',
  19. 'ETUDE D EXECUTION', 'TRAVAUX' , 'RECOLEMENT', 'MAINTIEN EN CONDITIONS OPERATIONNELLES']
  20. ##### Modeles
  21. class SiteTelecom(QgsModel):
  22. layername = "SITE_TELECOM"
  23. geom_type = QgsModel.GEOM_POINT
  24. crs = CRS
  25. bounding_box = (XMIN,YMIN,XMAX,YMAX)
  26. pk = "ST_CODE"
  27. schema = {'ST_CODE': {'type': 'string', 'empty': False},
  28. 'ST_NOM': {'type': 'string', 'empty': False},
  29. 'ST_TYPFCT': {'type': 'string', 'empty': False, 'allowed': ['SITE HEBERGEMENT',
  30. 'NOEUD RACCORDEMENT OPTIQUE',
  31. 'SOUS-REPARTITEUR OPTIQUE',
  32. 'SOUS-REPARTITEUR OPTIQUE COLOCALISE',
  33. 'NOEUD RACCORDEMENT D ABONNES',
  34. 'NOEUD RACCORDEMENT D ABONNES - HAUT DEBIT',
  35. 'NOEUD RACCORDEMENT D ABONNES - MONTEE EN DEBIT']},
  36. 'ST_STATUT': {'type': 'string', 'empty': False, 'allowed': STATUTS},
  37. 'ST_NBPRISE': {'empty': False, 'validator': is_positive_int}
  38. }
  39. class SiteClient(QgsModel):
  40. layername = "SITE_CLIENT"
  41. geom_type = QgsModel.GEOM_POINT
  42. crs = CRS
  43. bounding_box = (XMIN,YMIN,XMAX,YMAX)
  44. schema = {
  45. 'SC_TYPFON': {'type': 'string', 'empty': False, 'allowed': ['RESIDENTIEL', 'PROFESSIONNEL', 'OPERATEUR', 'TECHNIQUE']},
  46. 'SC_STATUT': {'type': 'string', 'empty': False, 'allowed': STATUTS},
  47. 'SC_NBPRISE': {'empty': False, 'validator': is_positive_int},
  48. 'SC_NBPRHAB': {'empty': False, 'validator': is_positive_int},
  49. 'SC_NBPRPRO': {'empty': False, 'validator': is_positive_int},
  50. 'SC_ID_SITE': {'type': 'string', 'empty': False},
  51. 'SC_NUM_RUE': {'type': 'string', 'empty': False},
  52. 'SC_REPET': {'type': 'string', 'empty': True},
  53. 'SC_DNUBAT': {'type': 'string', 'empty': False},
  54. 'SC_DESC': {'type': 'string', 'empty': False},
  55. 'SC_NOM_RUE': {'type': 'string', 'empty': False},
  56. 'SC_NOM_COM': {'type': 'string', 'empty': False},
  57. 'SC_ETAT': {'type': 'string', 'empty': False, 'allowed': ['VALIDE', 'OBSOLETE', 'NOUVEAU', 'FUTUR']},
  58. 'SC_ATT_PTO': {'empty': False, 'validator': is_positive_float}
  59. }
  60. class Cable(QgsModel):
  61. layername = "CABLE"
  62. geom_type = QgsModel.GEOM_LINE
  63. crs = CRS
  64. bounding_box = (XMIN,YMIN,XMAX,YMAX)
  65. schema = {'CA_TYPFON': {'type': 'string', 'empty': False, 'allowed': ['COLLECTE TRANSPORT DISTRIBUTION', 'COLLECTE', 'COLLECTE TRANSPORT', 'COLLECTE DISTRIBUTION', 'TRANSPORT DISTRIBUTION', 'TRANSPORT', 'DISTRIBUTION', 'RACCORDEMENT FINAL', 'BOUCLE METROPOLITAINE', 'LONGUE DISTANCE (LONG HAUL)', 'NON COMMUNIQUE']},
  66. 'CA_TYPSTR': {'type': 'string', 'empty': False, 'allowed': ['CONDUITE', 'AERIEN', 'COLONNE MONTANTE', 'IMMERGE', 'FACADE']},
  67. 'CA_STATUT': {'type': 'string', 'empty': False, 'allowed': STATUTS},
  68. 'CA_CAPFO': {'empty': False, 'allowed': [720,576,288,144,96,72,48,24,12]},
  69. 'CA_MODULO': {'empty': False, 'allowed': [6, 12]},
  70. 'CA_SUPPORT': {'type': 'string', 'empty': False, 'allowed': ['0', '1']}
  71. }
  72. class Zapbo(QgsModel):
  73. layername = "ZAPBO"
  74. geom_type = QgsModel.GEOM_POLYGON
  75. crs = CRS
  76. bounding_box = (XMIN,YMIN,XMAX,YMAX)
  77. schema = {'ZP_NBPRISE': {'empty': False, 'validator': is_positive_int},
  78. 'ZP_ISOLE': {'type': 'string', 'empty': False, 'allowed': ['0', '1']},
  79. 'ZP_STATUT': {'type': 'string', 'empty': False, 'allowed': STATUTS}
  80. }
  81. class Zasro(QgsModel):
  82. layername = "ZASRO"
  83. geom_type = QgsModel.GEOM_POLYGON
  84. crs = CRS
  85. bounding_box = (XMIN,YMIN,XMAX,YMAX)
  86. schema = {'ZS_CODE': {'type': 'string', 'empty': False},
  87. 'ZS_NBPRISE': {'empty': False, 'validator': is_positive_int}
  88. }
  89. class Adduction(QgsModel):
  90. layername = "ADDUCTION"
  91. geom_type = QgsModel.GEOM_LINE
  92. crs = CRS
  93. bounding_box = (XMIN,YMIN,XMAX,YMAX)
  94. schema = {'AD_ISOLE': {'type': 'string', 'empty': False, 'allowed': ['0', '1']},
  95. 'AD_LONG': {'empty': False, 'validator': is_positive_float}
  96. }
  97. models = [SiteTelecom, SiteClient, Cable, Zapbo, Zasro, Adduction]
  98. ####### Validateur
  99. class Mn3ApdChecker(BaseChecker):
  100. def setUp(self):
  101. self.dataset = {}
  102. for model in models:
  103. model.layer = next((l for l in QgsProject.instance().mapLayers().values() \
  104. if l.name().lower() == model.layername.lower()), None)
  105. self.dataset[model] = [model(f) for f in model.layer.getFeatures()] if model.layer else []
  106. self.sites_telecom = self.dataset.get(SiteTelecom, [])
  107. self.sites_client = self.dataset.get(SiteClient, [])
  108. self.cables = self.dataset.get(Cable, [])
  109. self.zapbos = self.dataset.get(Zapbo, [])
  110. self.zasros = self.dataset.get(Zasro, [])
  111. self.adductions = self.dataset.get(Adduction, [])
  112. def test_load_layers(self):
  113. """ Chargement des données
  114. Contrôle la présence des couches attendues
  115. """
  116. for model in models:
  117. if model.layer is None:
  118. self.log_error("Couche manquante", model=model)
  119. continue
  120. if model.pk:
  121. if not model.pk.lower() in [f.name().lower() for f in model.layer.fields()]:
  122. self.log_error(f"Clef primaire manquante ({model.pk})", model=model)
  123. continue
  124. def test_scr(self):
  125. """ Contrôle des projections
  126. Vérifie que les couches ont le bon sytème de projection
  127. """
  128. for model in models:
  129. if model.layer and model.layer.crs().authid() != model.crs:
  130. self.log_error(f"Mauvaise projection (attendu: {model.crs})", model=model)
  131. def _validate_structure(self, model, items):
  132. v = CerberusValidator(model.schema, purge_unknown=True, error_handler=CerberusErrorHandler, require_all=True)
  133. for item in items:
  134. v.validate(item.__dict__)
  135. for field, verrors in v.errors.items():
  136. for err in verrors:
  137. self.log_error(f"{field} : {_translate_messages(err)}", item=item)
  138. def test_structure_sites_telecom(self):
  139. """ Structure des données: SiteTelecom
  140. Contrôle les données attributaires de la couche SITE_TELECOM:
  141. présence, format, valeurs autorisées...
  142. """
  143. self._validate_structure(SiteTelecom, self.sites_telecom)
  144. def test_structure_sites_client(self):
  145. """ Structure des données: SiteClient
  146. Contrôle les données attributaires de la couche SITE_CLIENT:
  147. présence, format, valeurs autorisées...
  148. """
  149. self._validate_structure(SiteClient, self.sites_client)
  150. def test_structure_cables(self):
  151. """ Structure des données: Cables
  152. Contrôle les données attributaires de la couche CABLE:
  153. présence, format, valeurs autorisées...
  154. """
  155. self._validate_structure(Cable, self.cables)
  156. def test_structure_zapbos(self):
  157. """ Structure des données: Zapbo
  158. Contrôle les données attributaires de la couche ZAPBO:
  159. présence, format, valeurs autorisées...
  160. """
  161. self._validate_structure(Zapbo, self.zapbos)
  162. def test_structure_zasros(self):
  163. """ Structure des données: Zasro
  164. Contrôle les données attributaires de la couche ZASRO:
  165. présence, format, valeurs autorisées...
  166. """
  167. self._validate_structure(Zasro, self.zasros)
  168. def test_structure_adductions(self):
  169. """ Structure des données: Adduction
  170. Contrôle les données attributaires de la couche ADDUCTION:
  171. présence, format, valeurs autorisées...
  172. """
  173. self._validate_structure(Adduction, self.adductions)
  174. def test_geometry_validity(self):
  175. """ Contrôle de la validité des géométries
  176. """
  177. for model in models:
  178. for item in self.dataset[model]:
  179. if not item.is_geometry_valid():
  180. self.log_error("La géométrie de l'objet est invalide", item=item)
  181. def test_geometry_type(self):
  182. """ Contrôle des types de géométries
  183. """
  184. for model in models:
  185. for item in self.dataset[model]:
  186. item_geom_type = item.get_geom_type()
  187. if item_geom_type != model.geom_type:
  188. expected, found = model.get_geom_name(model.geom_type), model.get_geom_name(item_geom_type)
  189. self.log_error(f"Type de géométrie invalide (attendu: {expected}, trouvé: {found})", item=item)
  190. def test_bounding_box(self):
  191. """ Contrôle des emprises
  192. Vérifie que les objets sont dans le périmètre attendu
  193. """
  194. for model in models:
  195. xmin, ymin, xmax, ymax = model.bounding_box
  196. for item in self.dataset[model]:
  197. if not item.is_geometry_valid():
  198. continue
  199. x1, y1, x2, y2 = item.get_bounding_box()
  200. if any(x < xmin or x > xmax for x in (x1, x2)) or \
  201. any(y < ymin or y > ymax for y in (y1, y2)):
  202. self.log_error("Hors de l'emprise autorisée", item=item)
  203. def test_duplicates(self):
  204. """ Recherche de doublons
  205. Recherche d'éventuels doublons dans des champs qui supposent l'unicité
  206. """
  207. # doublons dans ST_CODE
  208. tmp = []
  209. for site_telecom in self.sites_telecom:
  210. if not site_telecom.ST_CODE:
  211. continue
  212. if not site_telecom.ST_CODE in tmp:
  213. tmp.append(site_telecom.ST_CODE)
  214. else:
  215. self.log_error("Doublons dans le champs ST_CODE", item=site_telecom)
  216. def test_dimension_zasro(self):
  217. """ Contrôle le dimensionnement des ZASRO """
  218. for zasro in self.zasros:
  219. if zasro.ZS_NBPRISE > 850:
  220. self.log_error("Le nombre de prises est supérieur à 850", item=zasro)
  221. def test_affaiblissement(self):
  222. """ Contrôle l'affaiblissement """
  223. for site_client in self.sites_client:
  224. if site_client.SC_ATT_PTO > 28:
  225. self.log_error("L'affaiblissement est supérieur à 28", item=site_client)
  226. def test_capacite_modulo(self):
  227. """ Cohérence capacité du cable / modulo """
  228. for cable in self.cables:
  229. if cable.CA_CAPFO in [720,576,288,144,96] and cable.CA_MODULO != 12:
  230. self.log_error(f"Modulo invalide (capacite: {cable.CA_CAPFO}, modulo: {cable.CA_MODULO})", item=cable)
  231. elif cable.CA_CAPFO in [48,24,12] and cable.CA_MODULO != 6:
  232. self.log_error(f"Modulo invalide (capacite: {cable.CA_CAPFO}, modulo: {cable.CA_MODULO})", item=cable)
  233. def test_longueur_racco(self):
  234. """ Contrôle la longueur des raccordements """
  235. for adduction in self.adductions:
  236. if adduction.AD_ISOLE == "1" and adduction.AD_LONG <= 100:
  237. self.log_error("L'adduction ne devrait pas être consiudérée comme isolée (long.: {adduction.AD_LONG})", item=adduction)
  238. elif adduction.AD_ISOLE == "0" and adduction.AD_LONG > 100:
  239. self.log_error("L'adduction devrait être considérée comme isolée (long.: {adduction.AD_LONG})", item=adduction)
  240. def test_zapbos_prises(self):
  241. """ Topologie: Zapbos / Prises
  242. Toutes les zapbo contiennent au moins une prise"""
  243. for zapbo in self.zapbos:
  244. nb_prises = sum([site_client.SC_NBPRISE for site_client in self.sites_client if zapbo.geom.contains(site_client.geom)])
  245. if not nb_prises:
  246. self.log_error("La Zapbo ne contient aucune prise", item=zapbo)
  247. for site_client in self.sites_client:
  248. nb_zapbo = len([zapbo for zapbo in self.zapbos if zapbo.geom.contains(site_client.geom)])
  249. if nb_zapbo == 0:
  250. self.log_error("La prise n'est contenue dans aucune ZAPBO", item=site_client)
  251. elif nb_zapbo > 1:
  252. self.log_error("La prise est contenue dans plus d'une ZAPBO", item=site_client)
  253. checkers = [Mn3ApdChecker]