| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363 |
- '''
- Script d'import des données de facturation depuis
- la base de données ASTRE-GF vers les tables de la base Analytique
- du Parc Départemental d'Erstein
- En cas d'erreur avec les données importées:
- 1. une tentative d'autocorrection est effectuée
- 2. Si les données sont toujours invalides, une ligne est ajoutée au fichier .\\work\\gf2analytique\\err.csv
- pour une correction manuelle.
- IMPORTANT: Si le fichier 'err.csv' contient des lignes, le script tentera d'importer
- ces lignes à la place de celles issues de Astre Gf.
- Pour forcer un imprt depuis AstreGf, supprimez le fichier 'err.csv'
- Info: Les données sont obtenues via le web service CG67.AstreGf
- '''
- import logging
- import re
- from path import Path # @UnusedImport
- from core import logconf
- from core.pde import AnalytiqueDb, mk_workdir
- from core.webservice import GfWebservice
- logger = logging.getLogger("gf2analytique")
- logconf.start("gf2analytique", logging.DEBUG)
- # # POUR TESTER, décommenter les lignes suivantes
- ##-----------------------------------------------
- logger.warning("<<<<<<<<<<<<<< Mode TEST >>>>>>>>>>>>>>>>>")
- GfWebservice._url = r"http://webservices-t.bas-rhin.fr/CG67.AstreGF.WebServices/public/WsPDE.asmx"
- AnalytiqueDb._path = Path(r"\\h2o\local\4-transversal\BDD\mdb_test\Db_analytique.mdb")
- ##-----------------------------------------------
- logger.info("Initialization")
- # Connect to factures.mdb
- analytique_db = AnalytiqueDb(autocommit=False)
- # Connect to the astre gf webservice
- ws = GfWebservice("GetPDEFactures")
- # Make the working directory
- workdir = mk_workdir("gf2analytique")
- errfile = workdir / "err.csv"
- class AlreadyImported(Exception):
- pass
- class NotImported(Exception):
- pass
- class InvalidData(Exception):
- pass
- class InvalidAxe(Exception):
- pass
- class Facture():
- WS_FIELDS = ["numExBudget", "codeColl", "codeBudg", "numEnv", "codeSection", "typeMvt", "numMandat", "numLiqMandat",
- "numLigneMandat", "codeAxe", "libAxe", "codeCout", "libCout", "dateMandat", "numBj", "numTiers",
- "libRai", "refIntMandat", "codePeriode", "dateDepDelai", "typeNomencMarche", "mntTtcMandat",
- "mntTvaMandat", "mntVent"]
- def __init__(self):
- self._factureId = None
- for fld in self.WS_FIELDS:
- setattr(self, fld, None)
- @property
- def factureId(self):
- if self._factureId is None:
- try:
- self._factureId = self._get_facture_id()
- except (KeyError, AttributeError, TypeError):
- raise NotImported()
- return self._factureId
- @classmethod
- def from_webservice(cls, wsdata):
- facture = cls()
- for key, value in wsdata.items():
- setattr(facture, key, value)
- facture.autocorrection()
- return facture
- @classmethod
- def from_errfile(cls, line):
- return cls.from_webservice(dict(zip(cls.WS_FIELDS, line.split("\t"))))
- def is_imported(self):
- try:
- return self.factureId > 0
- except NotImported:
- return False
- def _init_errfile(self):
- try:
- with open(errfile, 'r') as f:
- if f.read(100):
- # File already exists and is not empty
- return
- except FileNotFoundError:
- pass
- firstline = "\t".join(self.WS_FIELDS + ["\n"])
- with open(errfile, 'a') as f:
- f.write(firstline)
- def dump_to_err(self):
- self._init_errfile()
- line = "\t".join([str(getattr(self, field)).replace("\t", " ") for field in self.WS_FIELDS] + ["\n"])
- with open(errfile, 'a') as f:
- f.write(line)
- def autocorrection(self):
- # correction auto des codes chantiers
- if self.codeAxe == "AFFAI" and re.match(r"\d{2}5\d{3}", self.codeCout):
- self.codeCout += "/1"
- # echappe les apostrophes
- self.libRai = self.libRai.replace("'", "''")
- # renomme automatiquement les noms de materiels
- if self.codeAxe == "ENGIN":
- row = analytique_db.first("""SELECT txtMateriel FROM tbl_materiel
- WHERE txtMateriel='{codeCout}' or txtMateriel='ZZ {codeCout}'
- """.format(codeCout=self.codeCout))
- if row:
- self.codeCout = row.txtMateriel
- def is_valid(self):
- """ controle la validité des données d'une facture """
- if not int(self.numExBudget) > 2000:
- logger.warning("Exercice budgetaire invalide: %s", self.numExBudget)
- return False
- if self.codeColl != "CG67":
- logger.warning("Code collectivité invalide: %s", self.codeColl)
- return False
- if self.codeBudg != "02":
- logger.warning("Code budgetaire invalide: %s", self.codeBudg)
- return False
- if self.codeAxe == "ENGIN":
- # Controle l'existence du materiel
- if not analytique_db.first("SELECT intlMaterielID FROM tbl_materiel WHERE txtMateriel='{}'".format(self.codeCout)):
- logger.warning("Le materiel n'existe pas: %s", self.codeCout)
- return False
- elif self.codeAxe == "AFFAI":
- # Controle l'existence de l'affaire
- if not analytique_db.first("SELECT dblAffaireId FROM tbl_Affaires WHERE strLiaisonControle='{}'".format(self.codeCout)):
- logger.warning("L'affaire n'existe pas: %s", self.codeCout)
- return False
- else:
- # CodeAxe invalide
- logger.warning("Code axe inconnu: %s", self.codeAxe)
- return False
- return True
- def send_to_db(self):
- if self.is_imported():
- raise AlreadyImported()
- if not self.is_valid():
- raise InvalidData()
- self._insert_factures()
- if self.codeAxe == "ENGIN":
- self._insert_factures_engins()
- elif self.codeAxe == "AFFAI":
- self._insert_factures_affaires()
- self._insert_mandatement()
- analytique_db.commit()
- logger.info("* imported: %s", self.factureId)
- def _get_facture_id(self):
- sql = """SELECT dblFactureId FROM tbl_Factures
- WHERE intExercice = {}
- AND strLiquidation = '{}'
- AND strEngagement = '{}'
- AND strService='7710'
- """.format(self.numExBudget,
- self.numLiqMandat,
- self.numMandat)
- factureId = analytique_db.first(sql).dblFactureId
- return factureId
- def _insert_factures(self):
- sql = """INSERT INTO tbl_Factures ( intExercice,
- strLiquidation,
- intLiquidationLigne,
- strEngagement,
- strEnveloppe,
- strService,
- strTiers,
- strTiersLibelle,
- strMotsClefs,
- dtmDeb,
- intOperation,
- strNomenclature0,
- strAXE,
- strCentreCout,
- strObjet,
- dblMontantTotal,
- dblMontantTVA,
- strORIGINE_DONNEES
- )
- VALUES ({intExercice},
- '{strLiquidation}',
- {intLiquidationLigne},
- '{strEngagement}',
- '{strEnveloppe}',
- '{strService}',
- '{strTiers}',
- '{strTiersLibelle}',
- '{strMotsClefs}',
- #{dtmDeb}#,
- {intOperation},
- '{strNomenclature0}',
- '{strAxe}',
- '{strCentreCout}',
- '{strObjet}',
- {dblMontantTotal},
- {dblMontantTVA},
- '{strORIGINE_DONNEES}'
- )
- """.format(
- intExercice=self.numExBudget,
- strLiquidation=self.numLiqMandat,
- intLiquidationLigne=self.numLigneMandat,
- strEngagement=self.numMandat,
- strEnveloppe=self.numEnv,
- strService='7710',
- strTiers=self.numTiers,
- strTiersLibelle=self.libRai,
- strMotsClefs=AnalytiqueDb.nz(self.refIntMandat),
- dtmDeb=AnalytiqueDb.format_date(self.dateDepDelai),
- intOperation=AnalytiqueDb.nz(self.codePeriode, "Null"),
- strNomenclature0=self.typeNomencMarche,
- strAxe=self.codeAxe,
- strCentreCout=self.codeCout,
- strObjet=AnalytiqueDb.format_date(self.dateMandat, out_format="%d/%m/%Y"),
- dblMontantTVA=self.mntTvaMandat,
- dblMontantTotal=self.mntVent,
- strORIGINE_DONNEES='ASTRE'
- )
- logger.debug("> %s", sql)
- analytique_db.execute(sql)
- def _insert_factures_engins(self):
- if self.codeAxe != "ENGIN":
- raise InvalidAxe()
- materiel = analytique_db.first("SELECT intlMaterielID FROM tbl_Materiel WHERE [txtMateriel]='{}'".format(self.codeCout))
- materielId = materiel.intlMaterielID if materiel else '859'
- logger.debug("retrieve intlMaterielID: %s", materielId)
- sql = """INSERT INTO tbl_Facture_Engin ( intlMaterielID, txtMateriel, dblFactureId, strLibelle, dblMontant, strType )
- VALUES ({}, '{}', {}, '{}', {}, '{}')
- """.format(materielId,
- self.codeCout,
- self.factureId,
- AnalytiqueDb.nz(self.libCout),
- self.mntVent,
- self.libRai
- )
- logger.debug("> %s", sql)
- analytique_db.execute(sql)
- def _insert_factures_affaires(self):
- if self.codeAxe != "AFFAI":
- raise InvalidAxe()
- sql = """INSERT INTO tbl_Facture_Affaire ( strAffaireId, dblFactureId, strLibelle, dblMontant, strType )
- VALUES ('{}', {}, '{}', {}, '{}')
- """.format(self.codeCout,
- self.factureId,
- self.libRai ,
- self.mntVent,
- AnalytiqueDb.nz(self.libCout),
- )
- logger.debug("> %s", sql)
- analytique_db.execute(sql)
- def _insert_mandatement(self):
- sql = """INSERT INTO tbl_Mandatement ( dblFacture, strNumMandat, dtmMandat, strBordereau )
- VALUES ({}, '{}', #{}#, '{}')
- """.format(self.factureId,
- self.numMandat,
- AnalytiqueDb.format_date(self.dateMandat),
- self.numBj
- )
- logger.debug("> %s", sql)
- analytique_db.execute(sql)
- @staticmethod
- def load_errfile_data():
- factures = []
- try:
- firstline = True
- with open(errfile, "r") as f:
- for line in f:
- if firstline:
- firstline = False
- continue
- facture = Facture.from_errfile(line)
- factures.append(facture)
- except FileNotFoundError:
- pass
- return factures
- @staticmethod
- def process(factures):
- analysed, updated, errors = 0, 0, 0
- for facture in factures:
- analysed += 1
- try:
- facture.send_to_db()
- updated += 1
- except AlreadyImported:
- pass
- except InvalidData:
- facture.dump_to_err()
- errors += 1
- return analysed, updated, errors
- ########
- if __name__ == "__main__":
- to_retry = Facture.load_errfile_data()
- errfile.remove_p()
- if to_retry:
- logger.info("# Ré-import depuis le fichier d'erreurs")
- logger.info("{} lignes chargées depuis {}".format(len(to_retry), errfile))
- res = Facture.process(to_retry)
- logger.info("> {} lignes traitées / {} importées / {} erreurs".format(res[0], res[1], res[2]))
- else:
- logger.info("# Import depuis Astre-Gf")
- res = Facture.process([Facture.from_webservice(wsdata) for wsdata in ws])
- logger.info("> {} lignes traitées / {} importées / {} erreurs".format(res[0], res[1], res[2]))
- logging.shutdown()
|