| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- '''
- Convenient access to various databases
- '''
- from collections import namedtuple
- from datetime import datetime
- import logging
- import pypyodbc
- pypyodbc.lowercase = False
- logger = logging.getLogger("database")
class CustomDb(pypyodbc.Connection):
    """Connection to a database, built on ``pypyodbc.Connection``.

    Subclasses set ``dsn`` to the leading part of the connection string and
    may set ``default_user`` / ``default_pwd``; those defaults are used when
    the caller does not supply ``uid`` / ``pwd``.
    """

    _cache = {}
    default_name = ""
    drivername = ""
    # Connection string prefix handed to pypyodbc.Connection.
    dsn = ""
    # Fallback credentials, applied only when non-empty.
    default_user = ""
    default_pwd = ""

    def __init__(self, **kwargs):
        """Create the connection.

        ``kwargs`` are forwarded to ``pypyodbc.Connection`` after the class
        ``dsn``; ``uid`` / ``pwd`` fall back to the class defaults when they
        are absent and the default is non-empty.
        """
        cls = self.__class__
        if "uid" not in kwargs and cls.default_user:
            kwargs["uid"] = cls.default_user
        if "pwd" not in kwargs and cls.default_pwd:
            kwargs["pwd"] = cls.default_pwd
        super().__init__(cls.dsn, **kwargs)

    def connect(self, *args, **kwargs):
        """Log the connection target, then establish the connection."""
        # The connection string may embed credentials, so any pwd/password
        # value is masked before it reaches the log.
        logger.info(
            "Connection to %s: %s",
            self.__class__.__name__,
            self._mask_password(self.connectString),
        )
        super().connect(*args, **kwargs)

    @staticmethod
    def _mask_password(connect_string):
        """Return ``connect_string`` with pwd/password values replaced by ``***``."""
        import re

        # A value is either brace-quoted ({...}, may contain ';') or runs up
        # to the next ';'.
        return re.sub(
            r"(?i)\b(pwd|password)\s*=\s*(\{[^}]*\}|[^;]*)",
            r"\1=***",
            connect_string,
        )

    @staticmethod
    def _make_row_model(description):
        """Build the ``Row`` namedtuple class for a cursor description.

        Column names that are not identifiers become ``field_<index>``.
        ``rename=True`` additionally replaces names namedtuple would reject
        (duplicates, keywords, leading underscore) with ``_<index>`` instead
        of raising ``ValueError``.
        """
        fieldnames = [
            column[0] if column[0].isidentifier() else "field_{}".format(i)
            for i, column in enumerate(description)
        ]
        return namedtuple("Row", fieldnames, rename=True)

    def read(self, sql, *args):
        """Yield the rows selected by ``sql`` as ``Row`` namedtuples.

        ``args`` are passed to ``execute`` as query parameters. The cursor is
        closed when the generator is exhausted, closed, or fails.
        """
        cursor = self.execute(sql, *args)
        try:
            row = cursor.fetchone()
            rowmodel = self._make_row_model(cursor.description)
            while row:
                yield rowmodel(*row)
                row = cursor.fetchone()
        finally:
            cursor.close()

    def read_all(self, sql, *args):
        """Return the rows selected by ``sql`` as a list of ``Row`` namedtuples.

        ``args`` are passed to ``execute`` as query parameters.
        """
        cursor = self.execute(sql, *args)
        try:
            rowmodel = self._make_row_model(cursor.description)
            return [rowmodel(*row) for row in cursor.fetchall()]
        finally:
            cursor.close()

    def first(self, sql, *args):
        """Return the first row selected by ``sql``, or ``None`` if there is none."""
        rows = self.read(sql, *args)
        try:
            return next(rows, None)
        finally:
            # Close the generator explicitly so its cursor is released now
            # rather than whenever the generator is garbage-collected.
            rows.close()

    def exists(self, sql, *args):
        """Return True if the sql command retrieves at least one record."""
        return self.first(sql, *args) is not None

    def execute(self, sql, *args):
        """Run ``sql`` on a new cursor and return that cursor.

        Positional ``args``, when given, are passed to the cursor as one
        tuple of query parameters. On failure the statement is logged and
        printed, and the exception is re-raised. The caller owns the
        returned cursor and must close it.
        """
        cursor = self.cursor()
        try:
            if args:
                cursor.execute(sql, tuple(args))
            else:
                cursor.execute(sql)
        except Exception:
            logger.debug(sql)
            # NOTE(review): print kept so existing console output is
            # unchanged; consider relying on logging only.
            print(sql)
            raise
        return cursor
class AccessDb(CustomDb):
    """Connection to a Microsoft Access database file."""

    dsn = "DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};FIL={MS Access};"
    default_user = "admin"
    default_pwd = ""

    def __init__(self, dbpath, **kwargs):
        """Open the database at ``dbpath`` (passed on as ``dbq``).

        Remaining keyword arguments are forwarded to ``CustomDb``.
        """
        super().__init__(dbq=dbpath, **kwargs)

    def assert_connected(self):
        """Check the connection by reading one row from ``MSysObjects``.

        Raises:
            AssertionError: if the probe query returns no row.
            Any error raised by the query itself propagates unchanged.
        """
        # The previous loop-based check could never raise: yielded rows are
        # always truthy and an empty result skipped the loop body entirely.
        if self.first("SELECT TOP 1 * FROM MSysObjects;") is None:
            raise AssertionError("Unable to connect to: {}".format(self.connectString))

    @staticmethod
    def format_date(dat, in_format="%Y-%m-%dT%H:%M:%S", out_format="%m/%d/%Y"):
        """Reformat a date: parse ``str(dat)`` with ``in_format``, render with ``out_format``.

        Raises ``ValueError`` (from ``strptime``) when the text does not
        match ``in_format``.
        """
        return datetime.strptime(str(dat), in_format).strftime(out_format)

    @staticmethod
    def nz(val, default=""):
        """Return ``val`` if it is truthy, otherwise ``default``.

        Every falsy value (``None``, ``0``, ``""``, ...) is replaced, not
        only ``None``.
        """
        return val if val else default
class AccessSDb(AccessDb):
    """Access database opened with explicit credentials and a system database.

    ``mdwpath`` is handed to the driver as ``systemdb`` (presumably the
    workgroup ``.mdw`` file; confirm against the driver documentation).
    No default user or password applies to this class.
    """

    dsn = "DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};FIL={MS Access};"
    default_user = ""
    default_pwd = ""

    def __init__(self, dbpath, mdwpath, uid, pwd, **kwargs):
        """Open ``dbpath`` as ``uid`` / ``pwd`` using ``mdwpath`` as ``systemdb``."""
        super().__init__(
            dbpath,
            uid=uid,
            pwd=pwd,
            systemdb=mdwpath,
            **kwargs
        )
class OracleDb(CustomDb):
    """Connection using the "Microsoft ODBC For Oracle" driver."""

    dsn = "DRIVER={Microsoft ODBC For Oracle};"

    def __init__(self, dbq, uid, pwd, **kwargs):
        """Connect to ``dbq`` with ``uid`` / ``pwd``; extra keywords are forwarded."""
        super().__init__(
            dbq=dbq,
            uid=uid,
            pwd=pwd,
            **kwargs
        )
class SqlServerDb(CustomDb):
    """Connection using the "SQL Server" ODBC driver."""

    dsn = "DRIVER={SQL Server};"

    def __init__(self, server, dbname, user, pwd, **kwargs):
        """Connect to ``dbname`` on ``server`` as ``user``.

        ``dbname`` and ``user`` are passed on under the ``database`` and
        ``uid`` keywords; extra keywords are forwarded unchanged.
        """
        super().__init__(
            server=server,
            database=dbname,
            uid=user,
            pwd=pwd,
            **kwargs
        )
class PostgresDb(CustomDb):
    """Connection using the "PostgreSQL Unicode" ODBC driver."""

    dsn = "DRIVER={PostgreSQL Unicode};"
    # Class-level placeholders; nothing in this class reads them.
    server = ""
    db = ""
    user = ""
    pwd = ""
    port = ""

    def __init__(self, server, dbname, user, pwd, **kwargs):
        """Connect to ``dbname`` on ``server`` as ``user``.

        ``dbname`` and ``user`` are passed on under the ``database`` and
        ``uid`` keywords; extra keywords are forwarded unchanged.
        """
        super().__init__(
            server=server,
            database=dbname,
            uid=user,
            pwd=pwd,
            **kwargs
        )
- # class SqliteDb(CustomDb):
- # drivername = "QODBC"
- # dsn = "DRIVER={{Microsoft Access Driver (*.mdb, *.accdb)}};FIL={{MS Access}}"
- # default_user = "admin"
- # pwd = ""
- # def __init__(self, dbpath, **kwargs):
- # CustomDb.__init__(self, dbq=dbpath, **kwargs)
- #
|