|
@@ -0,0 +1,135 @@
|
|
|
|
|
+'''
|
|
|
|
|
+ Convenient access to various databases
|
|
|
|
|
+'''
|
|
|
|
|
+from collections import namedtuple
|
|
|
|
|
+from datetime import datetime
|
|
|
|
|
+import logging
|
|
|
|
|
+
|
|
|
|
|
+import pypyodbc
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+pypyodbc.lowercase = False
|
|
|
|
|
+
|
|
|
|
|
+logger = logging.getLogger("database")
|
|
|
|
|
+
|
|
|
|
|
+class CustomDb(pypyodbc.Connection):
|
|
|
|
|
+ """ Connexion to a database """
|
|
|
|
|
+ _cache = {}
|
|
|
|
|
+ default_name = ""
|
|
|
|
|
+ drivername = ""
|
|
|
|
|
+ dsn = ""
|
|
|
|
|
+ default_user = ""
|
|
|
|
|
+ default_pwd = ""
|
|
|
|
|
+
|
|
|
|
|
+ def __init__(self, **kwargs):
|
|
|
|
|
+ cls = self.__class__
|
|
|
|
|
+ if not "uid" in kwargs and cls.default_user:
|
|
|
|
|
+ kwargs["uid"] = cls.default_user
|
|
|
|
|
+ if not "pwd" in kwargs and cls.default_pwd:
|
|
|
|
|
+ kwargs["pwd"] = cls.default_pwd
|
|
|
|
|
+ super(CustomDb, self).__init__(cls.dsn, **kwargs)
|
|
|
|
|
+
|
|
|
|
|
+ def connect(self, *args, **kwargs):
|
|
|
|
|
+ """ Establish the connexion to the database"""
|
|
|
|
|
+ logger.info("Connection to %s: %s", self.__class__.__name__, self.connectString)
|
|
|
|
|
+ super(CustomDb, self).connect(*args, **kwargs)
|
|
|
|
|
+
|
|
|
|
|
+ def read(self, sql, *args):
|
|
|
|
|
+ """ yield rows as NamedTupleRow """
|
|
|
|
|
+ cursor = self.execute(sql)
|
|
|
|
|
+ row = cursor.fetchone()
|
|
|
|
|
+ fieldnames = [(column[0] if column[0].isidentifier() else "field_{}".format(i)) for i, column in enumerate(cursor.description)]
|
|
|
|
|
+
|
|
|
|
|
+ rowmodel = namedtuple("Row", fieldnames)
|
|
|
|
|
+ while row:
|
|
|
|
|
+ yield rowmodel(*row)
|
|
|
|
|
+ row = cursor.fetchone()
|
|
|
|
|
+ cursor.close()
|
|
|
|
|
+
|
|
|
|
|
+ def read_all(self, sql, *args):
|
|
|
|
|
+ """ return the selection as a list of dictionnaries """
|
|
|
|
|
+ cursor = self.execute(sql)
|
|
|
|
|
+ fieldnames = [(column[0] if column[0].isidentifier() else "field_{}".format(i)) for i, column in enumerate(cursor.description)]
|
|
|
|
|
+
|
|
|
|
|
+ rowmodel = namedtuple("Row", fieldnames)
|
|
|
|
|
+ data = [rowmodel(*row) for row in cursor.fetchall()]
|
|
|
|
|
+ cursor.close()
|
|
|
|
|
+ return data
|
|
|
|
|
+
|
|
|
|
|
+ def first(self, sql, *args):
|
|
|
|
|
+ try:
|
|
|
|
|
+ return next(self.read(sql, *args))
|
|
|
|
|
+ except StopIteration:
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ def exists(self, sql, *args):
|
|
|
|
|
+ """ return True if the sql command retrieves records """
|
|
|
|
|
+ return (self.first(sql, *args) is not None)
|
|
|
|
|
+
|
|
|
|
|
+ def execute(self, sql, *args):
|
|
|
|
|
+ cursor = self.cursor()
|
|
|
|
|
+ args = [sql, tuple(args)] if args else [sql]
|
|
|
|
|
+ try:
|
|
|
|
|
+ cursor.execute(*args)
|
|
|
|
|
+ except:
|
|
|
|
|
+ logger.debug(sql)
|
|
|
|
|
+ print(sql)
|
|
|
|
|
+ raise
|
|
|
|
|
+ return cursor
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class AccessDb(CustomDb):
|
|
|
|
|
+ dsn = "DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};FIL={MS Access};"
|
|
|
|
|
+ default_user = "admin"
|
|
|
|
|
+ default_pwd = ""
|
|
|
|
|
+ def __init__(self, dbpath, **kwargs):
|
|
|
|
|
+ super(AccessDb, self).__init__(dbq=dbpath, **kwargs)
|
|
|
|
|
+
|
|
|
|
|
+ def assert_connected(self):
|
|
|
|
|
+ for row in self.read("SELECT TOP 1 * FROM MSysObjects;"):
|
|
|
|
|
+ if not row:
|
|
|
|
|
+ raise AssertionError("Unable to connect to: {}".format(self.connectString))
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def format_date(dat, in_format="%Y-%m-%dT%H:%M:%S", out_format="%m/%d/%Y"):
|
|
|
|
|
+ return datetime.strptime(str(dat), in_format).strftime(out_format)
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def nz(val, default=""):
|
|
|
|
|
+ return val if val else default
|
|
|
|
|
+
|
|
|
|
|
+class AccessSDb(AccessDb):
|
|
|
|
|
+ dsn = "DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};FIL={MS Access};"
|
|
|
|
|
+ default_user = ""
|
|
|
|
|
+ default_pwd = ""
|
|
|
|
|
+ def __init__(self, dbpath, mdwpath, uid, pwd, **kwargs):
|
|
|
|
|
+ super(AccessSDb, self).__init__(dbpath, uid=uid, pwd=pwd, systemdb=mdwpath, **kwargs)
|
|
|
|
|
+
|
|
|
|
|
+class OracleDb(CustomDb):
|
|
|
|
|
+ dsn = "DRIVER={Oracle dans ORA102};"
|
|
|
|
|
+ def __init__(self, dbname, user, pwd, **kwargs):
|
|
|
|
|
+ super(OracleDb, self).__init__(dbq=dbname, uid=user, pwd=pwd, **kwargs)
|
|
|
|
|
+
|
|
|
|
|
+class SqlServerDb(CustomDb):
|
|
|
|
|
+ dsn = "DRIVER={SQL Server};"
|
|
|
|
|
+ def __init__(self, server, dbname, user, pwd, **kwargs):
|
|
|
|
|
+ super(SqlServerDb, self).__init__(server=server, database=dbname, uid=user, pwd=pwd, **kwargs)
|
|
|
|
|
+
|
|
|
|
|
+class PostgresDb(CustomDb):
|
|
|
|
|
+ dsn = "DRIVER={PostgreSQL Unicode};"
|
|
|
|
|
+ server = ""
|
|
|
|
|
+ db = ""
|
|
|
|
|
+ user = ""
|
|
|
|
|
+ pwd = ""
|
|
|
|
|
+ def __init__(self, server, dbname, user, pwd, **kwargs):
|
|
|
|
|
+ super(PostgresDb, self).__init__(server=server, database=dbname, uid=user, pwd=pwd, **kwargs)
|
|
|
|
|
+
|
|
|
|
|
+# class SqliteDb(CustomDb):
|
|
|
|
|
+# drivername = "QODBC"
|
|
|
|
|
+# dsn = "DRIVER={{Microsoft Access Driver (*.mdb, *.accdb)}};FIL={{MS Access}}"
|
|
|
|
|
+# default_user = "admin"
|
|
|
|
|
+# pwd = ""
|
|
|
|
|
+# def __init__(self, dbpath, **kwargs):
|
|
|
|
|
+# CustomDb.__init__(self, dbq=dbpath, **kwargs)
|
|
|
|
|
+#
|