| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- '''
- Python 3.7+
- @author: olivier.massot, sept 2018
- '''
- from datetime import datetime
- import logging
- import zipfile
- from path import Path, TempDir
- import shapefile
- import yaml
- from core import logconf
- from core.constants import MAIN
# Module-wide logger; every check result in this file is reported through it.
logger = logging.getLogger("datachecker")
# Configure that same logger (looked up by its name) at INFO level.
logconf.start(logger.name, logging.INFO)
- # TODO: Vérifier la projection (besoin de GDAL/OGR)
- # TODO: max length
- # TODO: fonctions de controle spéciales
- # TODO: Rendu HTML
def check(subject, checker):
    """Check a whole folder, or a whole archive, against a rule file.

    Args:
        subject: path of a directory, or of a zip archive that is extracted
            to a temporary directory before being checked.
        checker: path of the configuration file holding the rules; it is
            handed over unchanged to ``check_folder``.

    Raises:
        IOError: if ``subject`` is neither an existing file nor an existing
            directory.
    """
    subject, checker = Path(subject), Path(checker)

    if subject.isfile():
        with TempDir() as dirname:
            # The context manager closes the archive even when the
            # extraction fails half-way.
            with zipfile.ZipFile(subject, 'r') as zip_ref:
                zip_ref.extractall(dirname)

            # Case where the archive contains a single folder (named after
            # the archive) which itself contains the files.
            nested = Path(dirname / subject.stem)
            if nested.isdir():
                dirname = nested

            # Must run inside the TempDir block: the extracted files are
            # only guaranteed to exist until the block exits.
            check_folder(dirname, checker)

    elif subject.isdir():
        check_folder(subject, checker)

    else:
        raise IOError(f"Impossible de trouver le fichier ou répertoire: {subject}")
def check_folder(folder, checker):
    """Check the shapefiles of a folder against the rules of a YAML file.

    The rule file must hold a ``files`` mapping (file name -> model). The
    optional top-level keys ``xmin``, ``xmax``, ``ymin`` and ``ymax`` bound
    the area in which every shape has to lie.

    Args:
        folder: path object of the directory to check; it must support
            ``/`` and ``.name``, and the joined result ``.isfile()``.
        checker: path object of the YAML rule file (``.name`` is logged).

    Returns:
        None. Every finding is reported through the module ``logger``.
    """
    logger.info("***** Traitement de '%s' *****", folder.name)
    logger.info("> Controlleur: '%s'", checker.name)

    with open(checker, "r") as cf:
        # safe_load only builds plain data and never instantiates arbitrary
        # Python objects; a bare yaml.load() without Loader is also rejected
        # by recent PyYAML releases.
        config = yaml.safe_load(cf)

    # The bounds come from the global config, not from the per-file model,
    # so they are computed once for all files.
    bounds = _read_bounds(config)

    for filename, model in config["files"].items():
        path_ = folder / filename
        logger.info("* Traitement de %s", path_.name)

        if not path_.isfile():
            logger.error("Fichier introuvable")
            continue

        try:
            sf = shapefile.Reader(path_)
        except shapefile.ShapefileException:
            logger.error("Fichier SHAPE illisible")
            continue

        _check_shapefile(sf, model, bounds)
        # Drop the reader before moving on to the next file.
        del sf


def _read_bounds(config):
    """Return ``(xmin, xmax, ymin, ymax)`` read from the global config.

    Values present in the config are converted with ``int``. A missing lower
    bound defaults to 0 and a missing upper bound to infinity; the infinite
    default is kept as a float because ``int(float("inf"))`` raises
    ``OverflowError``.
    """
    def bound(key, default):
        return int(config[key]) if key in config else default

    infinity = float("inf")
    return (bound("xmin", 0), bound("xmax", infinity),
            bound("ymin", 0), bound("ymax", infinity))


def _shape_bbox(shape):
    """Return ``(x1, y1, x2, y2)`` for a shape, or None if it has no points.

    The ``bbox`` attribute is used when the shape has one; otherwise the box
    is derived from the points. NOTE(review): pyshp appears not to set
    ``bbox`` on point shapes - confirm against the installed version.
    """
    bbox = getattr(shape, "bbox", None)
    if bbox is not None:
        return bbox

    points = shape.points
    if not points:
        return None
    xs = [point[0] for point in points]
    ys = [point[1] for point in points]
    return min(xs), min(ys), max(xs), max(ys)


def _check_shapefile(sf, model, bounds):
    """Check one opened shapefile against its model and log the findings."""
    if "shape_type" in model and sf.shapeType != model["shape_type"]:
        shape_names = {1: "Point", 3: "Polyligne", 5: "Polygone"}
        expected = model["shape_type"]
        # Fall back on the raw code for shape types that have no label here.
        logger.error("Le fichier shapefile n'est pas de type %s",
                     shape_names.get(expected, expected))
        return

    records = sf.shapeRecords()
    if not records:
        # A model without "can_be_empty" is treated as "must not be empty".
        if not model.get("can_be_empty", False):
            logger.error("Le fichier shapefile ne contient aucune donnees")
            return
        logger.warning("Le fichier shapefile ne contient aucune donnees")

    if "fields" not in model:
        return

    fields = [f[0] for f in sf.fields if f[0] != 'DeletionFlag']

    # Report fields that the model does not know about.
    for f in fields:
        if f not in model["fields"]:
            logger.warning("Champs inconnu: %s", f)

    xmin, xmax, ymin, ymax = bounds

    # Walk through the records and check each of them.
    for i, record in enumerate(records):
        logger.info("\n> Enregistrement n°%s\n", i)
        record_data = dict(zip(fields, record.record))

        # The shape is taken from the record itself rather than re-reading
        # every shape of the file for each record.
        bbox = _shape_bbox(record.shape)
        if bbox is not None:
            x1, y1, x2, y2 = bbox
            if not (xmin <= x1 <= xmax and xmin <= x2 <= xmax
                    and ymin <= y1 <= ymax and ymin <= y2 <= ymax):
                logger.error("L'élément est situé hors de la zone autorisée")

        for fieldname, fieldmodel in model["fields"].items():
            _check_field(fieldname, fieldmodel, record_data)


def _check_field(fieldname, fieldmodel, record_data):
    """Check one field of one record against its model and log the findings."""
    from datetime import date

    try:
        val = record_data[fieldname]
    except KeyError:
        # A field is required unless the model says otherwise.
        if fieldmodel.get("required", True):
            logger.error("%s - Champs manquant", fieldname)
        return

    type_ = fieldmodel.get("type", "str")
    if type_ == "float":
        try:
            float(val)
        except (TypeError, ValueError):
            logger.error("%s - Valeur invalide, un flottant est attendu ('%s')", fieldname, val)
            return
    elif type_ == "datetime":
        # A value that is already a date object is accepted as is.
        # NOTE(review): pyshp appears to decode date-typed columns itself -
        # confirm. Anything else must parse with the configured format;
        # TypeError covers non-string values such as None.
        if not isinstance(val, date):
            try:
                datetime.strptime(val, fieldmodel.get("date_format", "%d/%m/%Y"))
            except (TypeError, ValueError):
                logger.error("%s - Valeur invalide, une date est attendu ('%s')", fieldname, val)
                return
    else:
        if not fieldmodel.get("allow_empty", False) and not val:
            logger.error("%s - Champs vide", fieldname)
            return

    if type_ == "str" and "max_len" in fieldmodel:
        if len(str(val)) > fieldmodel["max_len"]:
            logger.error("%s - Trop long, la longueur max. est de %s ('%s')",
                         fieldname, fieldmodel["max_len"], val)

    if "in_list" in fieldmodel and val not in fieldmodel["in_list"]:
        logger.error("%s - Valeur invalide, pas dans la liste ('%s')", fieldname, val)
if __name__ == "__main__":
    # Script entry point: checks one hard-coded sample archive against one
    # hard-coded rule file, both located under MAIN.
    work_dir = MAIN / "work"
    resources_dir = MAIN / "resources"

    subject = work_dir / "SCOPELEC_CAP_097AP0_REC_180829_OK.zip"
    checker = resources_dir / "netgeo_v2-2_doe.yaml"

    check(subject, checker)

    logger.info("-- Fin --")
|