db.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. '''
  2. Convenient access to various databases
  3. '''
  4. from collections import namedtuple
  5. from datetime import datetime
  6. import logging
  7. import pypyodbc
  8. pypyodbc.lowercase = False
  9. logger = logging.getLogger("database")
  10. class CustomDb(pypyodbc.Connection):
  11. """ Connexion to a database """
  12. _cache = {}
  13. default_name = ""
  14. drivername = ""
  15. dsn = ""
  16. default_user = ""
  17. default_pwd = ""
  18. def __init__(self, **kwargs):
  19. cls = self.__class__
  20. if not "uid" in kwargs and cls.default_user:
  21. kwargs["uid"] = cls.default_user
  22. if not "pwd" in kwargs and cls.default_pwd:
  23. kwargs["pwd"] = cls.default_pwd
  24. super(CustomDb, self).__init__(cls.dsn, **kwargs)
  25. def connect(self, *args, **kwargs):
  26. """ Establish the connexion to the database"""
  27. logger.info("Connection to %s: %s", self.__class__.__name__, self.connectString)
  28. super(CustomDb, self).connect(*args, **kwargs)
  29. def read(self, sql, *args):
  30. """ yield rows as NamedTupleRow """
  31. cursor = self.execute(sql)
  32. row = cursor.fetchone()
  33. fieldnames = [(column[0] if column[0].isidentifier() else "field_{}".format(i)) for i, column in enumerate(cursor.description)]
  34. rowmodel = namedtuple("Row", fieldnames)
  35. while row:
  36. yield rowmodel(*row)
  37. row = cursor.fetchone()
  38. cursor.close()
  39. def read_all(self, sql, *args):
  40. """ return the selection as a list of dictionnaries """
  41. cursor = self.execute(sql)
  42. fieldnames = [(column[0] if column[0].isidentifier() else "field_{}".format(i)) for i, column in enumerate(cursor.description)]
  43. rowmodel = namedtuple("Row", fieldnames)
  44. data = [rowmodel(*row) for row in cursor.fetchall()]
  45. cursor.close()
  46. return data
  47. def first(self, sql, *args):
  48. try:
  49. return next(self.read(sql, *args))
  50. except StopIteration:
  51. return None
  52. def exists(self, sql, *args):
  53. """ return True if the sql command retrieves records """
  54. return (self.first(sql, *args) is not None)
  55. def execute(self, sql, *args):
  56. cursor = self.cursor()
  57. args = [sql, tuple(args)] if args else [sql]
  58. try:
  59. cursor.execute(*args)
  60. except:
  61. logger.debug(sql)
  62. print(sql)
  63. raise
  64. return cursor
  65. class AccessDb(CustomDb):
  66. dsn = "DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};FIL={MS Access};"
  67. default_user = "admin"
  68. default_pwd = ""
  69. def __init__(self, dbpath, **kwargs):
  70. super(AccessDb, self).__init__(dbq=dbpath, **kwargs)
  71. def assert_connected(self):
  72. for row in self.read("SELECT TOP 1 * FROM MSysObjects;"):
  73. if not row:
  74. raise AssertionError("Unable to connect to: {}".format(self.connectString))
  75. return
  76. @staticmethod
  77. def format_date(dat, in_format="%Y-%m-%dT%H:%M:%S", out_format="%m/%d/%Y"):
  78. return datetime.strptime(str(dat), in_format).strftime(out_format)
  79. @staticmethod
  80. def nz(val, default=""):
  81. return val if val else default
  82. class AccessSDb(AccessDb):
  83. dsn = "DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};FIL={MS Access};"
  84. default_user = ""
  85. default_pwd = ""
  86. def __init__(self, dbpath, mdwpath, uid, pwd, **kwargs):
  87. super(AccessSDb, self).__init__(dbpath, uid=uid, pwd=pwd, systemdb=mdwpath, **kwargs)
  88. class OracleDb(CustomDb):
  89. dsn = "DRIVER={Oracle dans ORA102};"
  90. def __init__(self, dbname, user, pwd, **kwargs):
  91. super(OracleDb, self).__init__(dbq=dbname, uid=user, pwd=pwd, **kwargs)
  92. class SqlServerDb(CustomDb):
  93. dsn = "DRIVER={SQL Server};"
  94. def __init__(self, server, dbname, user, pwd, **kwargs):
  95. super(SqlServerDb, self).__init__(server=server, database=dbname, uid=user, pwd=pwd, **kwargs)
  96. class PostgresDb(CustomDb):
  97. dsn = "DRIVER={PostgreSQL Unicode};"
  98. server = ""
  99. db = ""
  100. user = ""
  101. pwd = ""
  102. def __init__(self, server, dbname, user, pwd, **kwargs):
  103. super(PostgresDb, self).__init__(server=server, database=dbname, uid=user, pwd=pwd, **kwargs)
  104. # class SqliteDb(CustomDb):
  105. # drivername = "QODBC"
  106. # dsn = "DRIVER={{Microsoft Access Driver (*.mdb, *.accdb)}};FIL={{MS Access}}"
  107. # default_user = "admin"
  108. # pwd = ""
  109. # def __init__(self, dbpath, **kwargs):
  110. # CustomDb.__init__(self, dbq=dbpath, **kwargs)
  111. #