| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- '''
- @author: olivier.massot, févr. 2018
- '''
- import csv
- from datetime import datetime
- import logging
- import dateutil.parser
- from core.sqlformatter import SqlFormatter
- logger = logging.getLogger("model")
- Sql = SqlFormatter()
- csv.register_dialect('tsv', delimiter='\t', quotechar='', quoting=csv.QUOTE_NONE)
- class Model():
- _mapping = {}
- def __setattr__(self, name, value):
- try:
- val, type_ = value
- if not type_ in (int, float, bool, str, datetime):
- raise TypeError("Type de donnée invalide pour une propriété de 'Model' ('{}')".format(type(val)))
- self.__class__._mapping[name] = type_
- except (TypeError, ValueError):
- val = value
- if val is None:
- super().__setattr__(name, val)
- return
- try:
- type_ = self.__class__._mapping[name]
- except KeyError:
- super().__setattr__(name, val)
- return
- val = Model._cast(val, type_)
- super().__setattr__(name, val)
- @staticmethod
- def _cast(value, type_):
- if value is None:
- return value
- if type_ == datetime:
- if type(value) is str:
- return dateutil.parser.parse(value)
- elif type(value) is datetime:
- return value
- else:
- raise ValueError("'{}' ne peut pas être converti en date".format(value))
- if type_ is bool and value in ("True", "False"):
- return (value == "True")
- else:
- return type_(value)
- @property
- def _fields(self):
- return list(self.__dict__.keys())
- @property
- def data(self):
- return self.__dict__
- def __repr__(self):
- return "<{} => {}>".format(self.__class__.__name__, ",".join(["{}={}".format(field, value) for field, value in self.__dict__.items()]))
- @classmethod
- def from_dict(cls, data):
- """ Retourne un objet à partir d'un dictionnaire de données """
- model = cls()
- for key, value in data.items():
- setattr(model, key, value)
- return model
- @classmethod
- def _parse(cls, field, value):
- if value == 'None':
- value = None
- try:
- return Model._cast(value, cls._mapping[field])
- except KeyError:
- return value
- # Fonctions CSV
- def dump_to_csv(self, path):
- """ Ajoute les données du modèle au format CSV dans le fichier spécifié en paramètre.
- Créé le fichier s'il n'existe pas, avec une ligne d'en-tête """
- if not path.exists():
- logger.debug("Génère le fichier %s", path)
- with open(path, 'w+', newline='') as f:
- writer = csv.writer(f, 'tsv')
- writer.writerow(self._fields)
- with open(path, "a", newline='') as f:
- writer = csv.writer(f, 'tsv')
- writer.writerow([str(getattr(self, field)).replace("\t", " ") for field in self._fields])
- @classmethod
- def load_csv(cls, path):
- """ parcourt les lignes du fichier csv et renvoie chaque ligne sous forme d'un objet Model
- ATTENTION: chaque propriété dont le type n'est pas précisé dans _mapping aura le type 'string'
- """
- with open(path) as f:
- reader = csv.reader(f, 'tsv')
- fields = next(reader)
- for row in csv.reader(f, 'tsv'):
- data = {key: cls._parse(key, value) for key, value in zip(fields, row)}
- yield(cls.from_dict(data))
|