'''
Import billing data from the ASTRE-GF database into the tables of the
Analytique database of the Parc Départemental d'Erstein.

When imported data is faulty:
1. an automatic correction is attempted;
2. if the data is still invalid, a line is appended to the file
   .\\work\\gf2analytique\\err.csv so that it can be corrected by hand.

IMPORTANT:
    If 'err.csv' contains lines, the script tries to import those lines
    instead of the ones coming from Astre GF.
    To force an import from Astre GF, delete the 'err.csv' file.

Info:
    The data is fetched through the CG67.AstreGf web service.
'''
import logging
import re

from path import Path  # @UnusedImport

from core import logconf
from core.pde import AnalytiqueDb, mk_workdir
from core.webservice import GfWebservice

logger = logging.getLogger("gf2analytique")
logconf.start("gf2analytique", logging.DEBUG)

# # TO TEST, uncomment the following lines
# NOTE(review): the overrides below are currently active, so the script runs
# against the test web service and the test database -- confirm before any
# production run.
##-----------------------------------------------
logger.warning("<<<<<<<<<<<<<< Mode TEST >>>>>>>>>>>>>>>>>")
GfWebservice._url = r"http://webservices-t.bas-rhin.fr/CG67.AstreGF.WebServices/public/WsPDE.asmx"
AnalytiqueDb._path = Path(r"\\h2o\local\4-transversal\BDD\mdb_test\Db_analytique.mdb")
##-----------------------------------------------

logger.info("Initialization")

# Connect to the Analytique database (commits are issued explicitly, see
# Facture.send_to_db)
analytique_db = AnalytiqueDb(autocommit=False)

# Connect to the astre gf webservice
ws = GfWebservice("GetPDEFactures")

# Make the working directory
workdir = mk_workdir("gf2analytique")

# File receiving the invoices that could not be imported
errfile = workdir / "err.csv"


class AlreadyImported(Exception):
    """Raised by Facture.send_to_db when the invoice is already in the database."""


class NotImported(Exception):
    """Raised by Facture.factureId when no matching row exists in tbl_Factures."""


class InvalidData(Exception):
    """Raised by Facture.send_to_db when Facture.is_valid() rejects the invoice."""


class InvalidAxe(Exception):
    """Raised when an axis-specific insert is called for the wrong codeAxe."""


def _sql_escape(value):
    """Return *value* as text with apostrophes doubled, for use in a quoted SQL literal.

    Values without apostrophes come out exactly as ``str(value)``, so the
    generated SQL is unchanged for them.
    """
    return str(value).replace("'", "''")


class Facture:
    """One invoice line received from the web service (or reloaded from err.csv).

    Each name listed in WS_FIELDS is exposed as an instance attribute.
    After autocorrection(), ``libRai`` is stored with its apostrophes already
    doubled; every other text field is escaped when the SQL is built.
    """

    # Field names, in the column order used by the error file.
    WS_FIELDS = ["numExBudget", "codeColl", "codeBudg", "numEnv", "codeSection", "typeMvt",
                 "numMandat", "numLiqMandat", "numLigneMandat", "codeAxe", "libAxe", "codeCout",
                 "libCout", "dateMandat", "numBj", "numTiers", "libRai", "refIntMandat",
                 "codePeriode", "dateDepDelai", "typeNomencMarche", "mntTtcMandat",
                 "mntTvaMandat", "mntVent"]

    def __init__(self):
        self._factureId = None  # cache for the factureId property
        for fld in self.WS_FIELDS:
            setattr(self, fld, None)

    @property
    def factureId(self):
        """dblFactureId of the matching tbl_Factures row (looked up once, then cached).

        Raises:
            NotImported: the lookup failed (typically because no row matches).
        """
        if self._factureId is None:
            try:
                self._factureId = self._get_facture_id()
            except (KeyError, AttributeError, TypeError):
                raise NotImported()
        return self._factureId

    @classmethod
    def from_webservice(cls, wsdata):
        """Build an invoice from a mapping of field name -> value, then autocorrect it."""
        facture = cls()
        for key, value in wsdata.items():
            setattr(facture, key, value)
        facture.autocorrection()
        return facture

    @classmethod
    def from_errfile(cls, line):
        """Build an invoice from one tab-separated line written by dump_to_err().

        The transformations applied by dump_to_err() are undone first so that
        the autocorrection run by from_webservice() is not applied twice:
        the text "None" becomes None again and the doubled apostrophes of
        ``libRai`` are collapsed.
        """
        # Drop the line terminator so it cannot end up inside the last field
        # when the trailing tab was removed during manual editing.
        values = line.rstrip("\r\n").split("\t")
        data = {}
        for key, value in zip(cls.WS_FIELDS, values):
            # dump_to_err() writes str(None) for missing values
            data[key] = None if value == "None" else value
        if data.get("libRai") is not None:
            # dump_to_err() wrote the already-escaped value; autocorrection()
            # is going to escape it again.
            data["libRai"] = data["libRai"].replace("''", "'")
        return cls.from_webservice(data)

    def is_imported(self):
        """Return True when a matching invoice already exists in the database."""
        try:
            return self.factureId > 0
        except NotImported:
            return False

    def _init_errfile(self):
        """Write the header line to the error file unless it already has content."""
        try:
            with open(errfile, 'r') as f:
                if f.read(100):
                    # File already exists and is not empty
                    return
        except FileNotFoundError:
            pass

        # The trailing "\n" item yields "<last field>\t\n"; this layout is
        # kept so that existing error files stay readable.
        firstline = "\t".join(self.WS_FIELDS + ["\n"])
        with open(errfile, 'a') as f:
            f.write(firstline)

    def dump_to_err(self):
        """Append this invoice to the error file for manual correction."""
        self._init_errfile()

        cells = []
        for field in self.WS_FIELDS:
            text = str(getattr(self, field))
            # A tab or a line break inside a value would corrupt the record.
            for sep in ("\t", "\r", "\n"):
                text = text.replace(sep, " ")
            cells.append(text)

        with open(errfile, 'a') as f:
            f.write("\t".join(cells + ["\n"]))

    def autocorrection(self):
        """Apply the automatic fixes to the freshly loaded fields."""
        # automatic fix of the worksite codes; skipped when the suffix is
        # already present (data reloaded from the error file)
        code = self.codeCout
        if (self.codeAxe == "AFFAI" and code is not None
                and re.match(r"\d{2}5\d{3}", code)
                and not code.endswith("/1")):
            self.codeCout += "/1"

        # escape the apostrophes
        if self.libRai is not None:
            self.libRai = self.libRai.replace("'", "''")

        # automatically rename the equipment names
        if self.codeAxe == "ENGIN":
            row = analytique_db.first("""SELECT txtMateriel
                                         FROM tbl_materiel
                                         WHERE txtMateriel='{codeCout}'
                                         or txtMateriel='ZZ {codeCout}'
                                         """.format(codeCout=_sql_escape(self.codeCout)))
            if row:
                self.codeCout = row.txtMateriel

    def is_valid(self):
        """Check the validity of the invoice data.

        Returns:
            True when every check passes; otherwise False, after logging a
            warning that names the first failing check.
        """
        try:
            exercice = int(self.numExBudget)
        except (TypeError, ValueError):
            # missing or non-numeric budget year: invalid data, not a crash
            exercice = None
        if exercice is None or not exercice > 2000:
            logger.warning("Exercice budgetaire invalide: %s", self.numExBudget)
            return False

        if self.codeColl != "CG67":
            logger.warning("Code collectivité invalide: %s", self.codeColl)
            return False

        if self.codeBudg != "02":
            logger.warning("Code budgetaire invalide: %s", self.codeBudg)
            return False

        if self.codeAxe == "ENGIN":
            # Check that the equipment exists
            if not analytique_db.first("SELECT intlMaterielID FROM tbl_materiel WHERE txtMateriel='{}'".format(_sql_escape(self.codeCout))):
                logger.warning("Le materiel n'existe pas: %s", self.codeCout)
                return False
        elif self.codeAxe == "AFFAI":
            # Check that the job ("affaire") exists
            if not analytique_db.first("SELECT dblAffaireId FROM tbl_Affaires WHERE strLiaisonControle='{}'".format(_sql_escape(self.codeCout))):
                logger.warning("L'affaire n'existe pas: %s", self.codeCout)
                return False
        else:
            # Invalid codeAxe
            logger.warning("Code axe inconnu: %s", self.codeAxe)
            return False

        return True

    def send_to_db(self):
        """Insert the invoice and its dependent rows, then commit.

        Raises:
            AlreadyImported: a matching invoice is already in the database.
            InvalidData: is_valid() rejected the invoice.
        """
        if self.is_imported():
            raise AlreadyImported()

        if not self.is_valid():
            raise InvalidData()

        self._insert_factures()

        if self.codeAxe == "ENGIN":
            self._insert_factures_engins()
        elif self.codeAxe == "AFFAI":
            self._insert_factures_affaires()

        self._insert_mandatement()

        analytique_db.commit()

        logger.info("* imported: %s", self.factureId)

    def _get_facture_id(self):
        """Return the dblFactureId of the tbl_Factures row matching this invoice.

        Fails with AttributeError when first() returns no row; the factureId
        property turns that into NotImported.
        """
        sql = """SELECT dblFactureId FROM tbl_Factures
                 WHERE intExercice = {}
                 AND strLiquidation = '{}'
                 AND strEngagement = '{}'
                 AND strService='7710'
              """.format(self.numExBudget,
                         _sql_escape(self.numLiqMandat),
                         _sql_escape(self.numMandat))
        factureId = analytique_db.first(sql).dblFactureId
        return factureId

    def _insert_factures(self):
        """Insert the main invoice row into tbl_Factures."""
        sql = """INSERT INTO tbl_Factures ( intExercice, strLiquidation, intLiquidationLigne, strEngagement,
                                           strEnveloppe, strService, strTiers, strTiersLibelle, strMotsClefs,
                                           dtmDeb, intOperation, strNomenclature0, strAXE, strCentreCout,
                                           strObjet, dblMontantTotal, dblMontantTVA, strORIGINE_DONNEES
                                        )
                 VALUES ({intExercice}, '{strLiquidation}', {intLiquidationLigne}, '{strEngagement}',
                         '{strEnveloppe}', '{strService}', '{strTiers}', '{strTiersLibelle}', '{strMotsClefs}',
                         #{dtmDeb}#, {intOperation}, '{strNomenclature0}', '{strAxe}', '{strCentreCout}',
                         '{strObjet}', {dblMontantTotal}, {dblMontantTVA}, '{strORIGINE_DONNEES}'
                        )
              """.format(
                  intExercice=self.numExBudget,
                  strLiquidation=_sql_escape(self.numLiqMandat),
                  intLiquidationLigne=self.numLigneMandat,
                  strEngagement=_sql_escape(self.numMandat),
                  strEnveloppe=_sql_escape(self.numEnv),
                  strService='7710',
                  strTiers=_sql_escape(self.numTiers),
                  # libRai was already escaped by autocorrection()
                  strTiersLibelle=self.libRai,
                  strMotsClefs=_sql_escape(AnalytiqueDb.nz(self.refIntMandat)),
                  dtmDeb=AnalytiqueDb.format_date(self.dateDepDelai),
                  intOperation=AnalytiqueDb.nz(self.codePeriode, "Null"),
                  strNomenclature0=_sql_escape(self.typeNomencMarche),
                  strAxe=_sql_escape(self.codeAxe),
                  strCentreCout=_sql_escape(self.codeCout),
                  strObjet=AnalytiqueDb.format_date(self.dateMandat, out_format="%d/%m/%Y"),
                  dblMontantTVA=self.mntTvaMandat,
                  dblMontantTotal=self.mntVent,
                  strORIGINE_DONNEES='ASTRE'
              )
        logger.debug("> %s", sql)
        analytique_db.execute(sql)

    def _insert_factures_engins(self):
        """Insert the equipment detail row into tbl_Facture_Engin.

        Raises:
            InvalidAxe: codeAxe is not "ENGIN".
        """
        if self.codeAxe != "ENGIN":
            raise InvalidAxe()

        materiel = analytique_db.first("SELECT intlMaterielID FROM tbl_Materiel WHERE [txtMateriel]='{}'".format(_sql_escape(self.codeCout)))
        # NOTE(review): '859' is used when the equipment is not found --
        # presumably a catch-all record; confirm against the database.
        materielId = materiel.intlMaterielID if materiel else '859'
        logger.debug("retrieve intlMaterielID: %s", materielId)

        # NOTE(review): strLibelle/strType receive (libCout, libRai) here but
        # (libRai, libCout) in _insert_factures_affaires -- confirm which
        # order is intended.
        sql = """INSERT INTO tbl_Facture_Engin ( intlMaterielID, txtMateriel, dblFactureId, strLibelle, dblMontant, strType )
                 VALUES ({}, '{}', {}, '{}', {}, '{}')
              """.format(materielId,
                         _sql_escape(self.codeCout),
                         self.factureId,
                         _sql_escape(AnalytiqueDb.nz(self.libCout)),
                         self.mntVent,
                         self.libRai
                         )
        logger.debug("> %s", sql)
        analytique_db.execute(sql)

    def _insert_factures_affaires(self):
        """Insert the job detail row into tbl_Facture_Affaire.

        Raises:
            InvalidAxe: codeAxe is not "AFFAI".
        """
        if self.codeAxe != "AFFAI":
            raise InvalidAxe()

        # NOTE(review): strLibelle/strType receive (libRai, libCout) here but
        # (libCout, libRai) in _insert_factures_engins -- confirm which order
        # is intended.
        sql = """INSERT INTO tbl_Facture_Affaire ( strAffaireId, dblFactureId, strLibelle, dblMontant, strType )
                 VALUES ('{}', {}, '{}', {}, '{}')
              """.format(_sql_escape(self.codeCout),
                         self.factureId,
                         self.libRai,
                         self.mntVent,
                         _sql_escape(AnalytiqueDb.nz(self.libCout)),
                         )
        logger.debug("> %s", sql)
        analytique_db.execute(sql)

    def _insert_mandatement(self):
        """Insert the payment-order row into tbl_Mandatement."""
        sql = """INSERT INTO tbl_Mandatement ( dblFacture, strNumMandat, dtmMandat, strBordereau )
                 VALUES ({}, '{}', #{}#, '{}')
              """.format(self.factureId,
                         _sql_escape(self.numMandat),
                         AnalytiqueDb.format_date(self.dateMandat),
                         _sql_escape(self.numBj)
                         )
        logger.debug("> %s", sql)
        analytique_db.execute(sql)

    @staticmethod
    def load_errfile_data():
        """Return the invoices stored in the error file ([] when the file is missing)."""
        factures = []
        try:
            with open(errfile, "r") as f:
                # the first line holds the column names
                next(f, None)
                for line in f:
                    if not line.strip():
                        # blank line, e.g. left behind by manual editing
                        continue
                    factures.append(Facture.from_errfile(line))
        except FileNotFoundError:
            pass
        return factures

    @staticmethod
    def process(factures):
        """Send each invoice to the database.

        Invoices already imported are skipped silently; invalid ones are
        written to the error file.

        Returns:
            tuple: (analysed, updated, errors) counters.
        """
        analysed, updated, errors = 0, 0, 0

        for facture in factures:
            analysed += 1
            try:
                facture.send_to_db()
                updated += 1
            except AlreadyImported:
                pass
            except InvalidData:
                facture.dump_to_err()
                errors += 1

        return analysed, updated, errors


########

if __name__ == "__main__":

    to_retry = Facture.load_errfile_data()
    # The error file is rebuilt from scratch by this run.
    errfile.remove_p()

    if to_retry:
        logger.info("# Ré-import depuis le fichier d'erreurs")
        logger.info("%s lignes chargées depuis %s", len(to_retry), errfile)
        res = Facture.process(to_retry)
    else:
        logger.info("# Import depuis Astre-Gf")
        res = Facture.process([Facture.from_webservice(wsdata) for wsdata in ws])

    logger.info("> %s lignes traitées / %s importées / %s erreurs", res[0], res[1], res[2])

    logging.shutdown()