db.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. '''
  2. Convenient access to various databases
  3. '''
  4. from collections import namedtuple
  5. from datetime import datetime
  6. import logging
  7. import pypyodbc
  8. pypyodbc.lowercase = False
  9. logger = logging.getLogger("database")
  10. class CustomDb(pypyodbc.Connection):
  11. """ Connexion to a database """
  12. _cache = {}
  13. default_name = ""
  14. drivername = ""
  15. dsn = ""
  16. default_user = ""
  17. default_pwd = ""
  18. def __init__(self, **kwargs):
  19. cls = self.__class__
  20. if not "uid" in kwargs and cls.default_user:
  21. kwargs["uid"] = cls.default_user
  22. if not "pwd" in kwargs and cls.default_pwd:
  23. kwargs["pwd"] = cls.default_pwd
  24. super(CustomDb, self).__init__(cls.dsn, **kwargs)
  25. def connect(self, *args, **kwargs):
  26. """ Establish the connexion to the database"""
  27. logger.info("Connection to %s: %s", self.__class__.__name__, self.connectString)
  28. super(CustomDb, self).connect(*args, **kwargs)
  29. def read(self, sql, *args):
  30. """ yield rows as NamedTupleRow """
  31. cursor = self.execute(sql)
  32. row = cursor.fetchone()
  33. fieldnames = [(column[0] if column[0].isidentifier() else "field_{}".format(i)) for i, column in enumerate(cursor.description)]
  34. rowmodel = namedtuple("Row", fieldnames)
  35. while row:
  36. yield rowmodel(*row)
  37. row = cursor.fetchone()
  38. cursor.close()
  39. def read_all(self, sql, *args):
  40. """ return the selection as a list of dictionnaries """
  41. cursor = self.execute(sql)
  42. fieldnames = [(column[0] if column[0].isidentifier() else "field_{}".format(i)) for i, column in enumerate(cursor.description)]
  43. rowmodel = namedtuple("Row", fieldnames)
  44. data = [rowmodel(*row) for row in cursor.fetchall()]
  45. cursor.close()
  46. return data
  47. def first(self, sql, *args):
  48. try:
  49. return next(self.read(sql, *args))
  50. except StopIteration:
  51. return None
  52. def exists(self, sql, *args):
  53. """ return True if the sql command retrieves records """
  54. return (self.first(sql, *args) is not None)
  55. def execute(self, sql, *args):
  56. cursor = self.cursor()
  57. args = [sql, tuple(args)] if args else [sql]
  58. cursor.execute(*args)
  59. return cursor
  60. class AccessDb(CustomDb):
  61. dsn = "DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};FIL={MS Access};"
  62. default_user = "admin"
  63. default_pwd = ""
  64. def __init__(self, dbpath, **kwargs):
  65. super(AccessDb, self).__init__(dbq=dbpath, **kwargs)
  66. def assert_connected(self):
  67. for row in self.read("SELECT TOP 1 * FROM MSysObjects;"):
  68. if not row:
  69. raise AssertionError("Unable to connect to: {}".format(self.connectString))
  70. return
  71. @staticmethod
  72. def format_date(dat, in_format="%Y-%m-%dT%H:%M:%S", out_format="%m/%d/%Y"):
  73. return datetime.strptime(str(dat), in_format).strftime(out_format)
  74. @staticmethod
  75. def nz(val, default=""):
  76. return val if val else default
  77. class AccessSDb(AccessDb):
  78. dsn = "DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};FIL={MS Access};"
  79. default_user = ""
  80. default_pwd = ""
  81. def __init__(self, dbpath, mdwpath, uid, pwd, **kwargs):
  82. super(AccessSDb, self).__init__(dbpath, uid=uid, pwd=pwd, systemdb=mdwpath, **kwargs)
  83. class OracleDb(CustomDb):
  84. dsn = "DRIVER={Oracle dans ORA102};"
  85. def __init__(self, dbname, user, pwd, **kwargs):
  86. super(OracleDb, self).__init__(dbq=dbname, uid=user, pwd=pwd, **kwargs)
  87. class SqlServerDb(CustomDb):
  88. dsn = "DRIVER={SQL Server};"
  89. def __init__(self, server, dbname, user, pwd, **kwargs):
  90. super(SqlServerDb, self).__init__(server=server, database=dbname, uid=user, pwd=pwd, **kwargs)
  91. # class SqliteDb(CustomDb):
  92. # drivername = "QODBC"
  93. # dsn = "DRIVER={{Microsoft Access Driver (*.mdb, *.accdb)}};FIL={{MS Access}}"
  94. # default_user = "admin"
  95. # pwd = ""
  96. # def __init__(self, dbpath, **kwargs):
  97. # CustomDb.__init__(self, dbq=dbpath, **kwargs)
  98. #
  99. # class PostgresDb(CustomDb):
  100. # drivername = "QODBC"
  101. # dsn = "DRIVER={{Microsoft Access Driver (*.mdb, *.accdb)}};FIL={{MS Access}}"
  102. # default_user = "admin"
  103. # pwd = ""
  104. # def __init__(self, dbpath, **kwargs):
  105. # CustomDb.__init__(self, dbq=dbpath, **kwargs)
  106. ### SQL Helpers ###
  107. class SQLHelper():
  108. """ Génère du code sql """
  109. @classmethod
  110. def _sql_format(cls, val):
  111. """ pre-formatte une variable pour injection sql dans une base MS Access """
  112. raise NotImplementedError()
  113. @classmethod
  114. def select(cls, where=""):
  115. raise NotImplementedError()
  116. @classmethod
  117. def update(cls, tblname, data, where):
  118. raise NotImplementedError()
  119. @classmethod
  120. def insert(cls, tblname, data):
  121. raise NotImplementedError()
  122. @classmethod
  123. def delete(cls, tblname, where):
  124. raise NotImplementedError()
  125. class AccessSqlHelper(SQLHelper):
  126. """ SQL Helper pour MS Access """
  127. @classmethod
  128. def _sql_format(cls, val):
  129. if val is None:
  130. return "Null"
  131. elif type(val) is int or type(val) is bool:
  132. return "{}".format(val)
  133. elif type(val) is str:
  134. return "\"{}\"".format(val)
  135. elif type(val) is datetime:
  136. return "#{:%Y-%m-%d %H:%M:%S}#".format(val)
  137. return "{}".format(val)
  138. @classmethod
  139. def select(cls, where=""):
  140. sql = "SELECT * FROM {}".format(cls._tblname)
  141. if where:
  142. sql = "{} WHERE {}".format(sql, where)
  143. return sql
  144. @classmethod
  145. def update(cls, tblname, data, where):
  146. sql = "UPDATE {} SET {} WHERE {}".format(tblname,
  147. ",".join(["{} = {}".format(key, cls._sql_format(data[key])) for key in data]),
  148. " AND ".join(["{} = {}".format(key, cls._sql_format(where[key])) for key in where]))
  149. return sql
  150. @classmethod
  151. def insert(cls, tblname, data):
  152. sql = "INSERT INTO {} ({}) VALUES ({})".format(tblname,
  153. ",".join(data.keys()),
  154. ",".join([cls._sql_format(data[key]) for key in data]))
  155. return sql
  156. @classmethod
  157. def delete(cls, tblname, where):
  158. sql = "DELETE * FROM {} WHERE {}".format(tblname,
  159. " AND ".join(["{} = {}".format(key, cls._sql_format(where[key])) for key in where]))
  160. return sql