''' Convenient access to various databases ''' from collections import namedtuple from datetime import datetime import logging import pypyodbc pypyodbc.lowercase = False logger = logging.getLogger("database") class CustomDb(pypyodbc.Connection): """ Connexion to a database """ _cache = {} default_name = "" drivername = "" dsn = "" default_user = "" default_pwd = "" def __init__(self, **kwargs): cls = self.__class__ if not "uid" in kwargs and cls.default_user: kwargs["uid"] = cls.default_user if not "pwd" in kwargs and cls.default_pwd: kwargs["pwd"] = cls.default_pwd super(CustomDb, self).__init__(cls.dsn, **kwargs) def connect(self, *args, **kwargs): """ Establish the connexion to the database""" logger.info("Connection to %s: %s", self.__class__.__name__, self.connectString) super(CustomDb, self).connect(*args, **kwargs) def read(self, sql, *args): """ yield rows as NamedTupleRow """ cursor = self.execute(sql) row = cursor.fetchone() fieldnames = [(column[0] if column[0].isidentifier() else "field_{}".format(i)) for i, column in enumerate(cursor.description)] rowmodel = namedtuple("Row", fieldnames) while row: yield rowmodel(*row) row = cursor.fetchone() cursor.close() def read_all(self, sql, *args): """ return the selection as a list of dictionnaries """ cursor = self.execute(sql) fieldnames = [(column[0] if column[0].isidentifier() else "field_{}".format(i)) for i, column in enumerate(cursor.description)] rowmodel = namedtuple("Row", fieldnames) data = [rowmodel(*row) for row in cursor.fetchall()] cursor.close() return data def first(self, sql, *args): try: return next(self.read(sql, *args)) except StopIteration: return None def exists(self, sql, *args): """ return True if the sql command retrieves records """ return (self.first(sql, *args) is not None) def execute(self, sql, *args): cursor = self.cursor() args = [sql, tuple(args)] if args else [sql] try: cursor.execute(*args) except: logger.debug(sql) print(sql) raise return cursor class AccessDb(CustomDb): dsn = "DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};FIL={MS Access};" default_user = "admin" default_pwd = "" def __init__(self, dbpath, **kwargs): super(AccessDb, self).__init__(dbq=dbpath, **kwargs) def assert_connected(self): for row in self.read("SELECT TOP 1 * FROM MSysObjects;"): if not row: raise AssertionError("Unable to connect to: {}".format(self.connectString)) return @staticmethod def format_date(dat, in_format="%Y-%m-%dT%H:%M:%S", out_format="%m/%d/%Y"): return datetime.strptime(str(dat), in_format).strftime(out_format) @staticmethod def nz(val, default=""): return val if val else default class AccessSDb(AccessDb): dsn = "DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};FIL={MS Access};" default_user = "" default_pwd = "" def __init__(self, dbpath, mdwpath, uid, pwd, **kwargs): super(AccessSDb, self).__init__(dbpath, uid=uid, pwd=pwd, systemdb=mdwpath, **kwargs) class OracleDb(CustomDb): dsn = "DRIVER={Microsoft ODBC For Oracle};" def __init__(self, dbq, uid, pwd, **kwargs): super(OracleDb, self).__init__(dbq=dbq, uid=uid, pwd=pwd, **kwargs) class SqlServerDb(CustomDb): dsn = "DRIVER={SQL Server};" def __init__(self, server, dbname, user, pwd, **kwargs): super(SqlServerDb, self).__init__(server=server, database=dbname, uid=user, pwd=pwd, **kwargs) class PostgresDb(CustomDb): dsn = "DRIVER={PostgreSQL Unicode};" server = "" db = "" user = "" pwd = "" port = "" def __init__(self, server, dbname, user, pwd, **kwargs): super(PostgresDb, self).__init__(server=server, database=dbname, uid=user, pwd=pwd, **kwargs) # class SqliteDb(CustomDb): # drivername = "QODBC" # dsn = "DRIVER={{Microsoft Access Driver (*.mdb, *.accdb)}};FIL={{MS Access}}" # default_user = "admin" # pwd = "" # def __init__(self, dbpath, **kwargs): # CustomDb.__init__(self, dbq=dbpath, **kwargs) #