db.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. '''
  2. Convenient access to various databases
  3. '''
  4. from collections import namedtuple
  5. from datetime import datetime
  6. import logging
  7. import pypyodbc
# Module-wide pypyodbc switch. NOTE(review): presumably stops pypyodbc from
# lower-casing the column names reported in cursor.description - confirm
# against the pypyodbc documentation.
pypyodbc.lowercase = False
# Logger shared by every class in this module, registered as "database".
logger = logging.getLogger("database")
  10. class CustomDb(pypyodbc.Connection):
  11. """ Connexion to a database """
  12. _cache = {}
  13. default_name = ""
  14. drivername = ""
  15. dsn = ""
  16. default_user = ""
  17. default_pwd = ""
  18. def __init__(self, **kwargs):
  19. cls = self.__class__
  20. if not "uid" in kwargs and cls.default_user:
  21. kwargs["uid"] = cls.default_user
  22. if not "pwd" in kwargs and cls.default_pwd:
  23. kwargs["pwd"] = cls.default_pwd
  24. super(CustomDb, self).__init__(cls.dsn, **kwargs)
  25. def connect(self, *args, **kwargs):
  26. """ Establish the connexion to the database"""
  27. logger.info("Connection to %s: %s", self.__class__.__name__, self.connectString)
  28. super(CustomDb, self).connect(*args, **kwargs)
  29. def read(self, sql, *args):
  30. """ yield rows as NamedTupleRow """
  31. cursor = self.execute(sql)
  32. row = cursor.fetchone()
  33. fieldnames = [(column[0] if column[0].isidentifier() else "field_{}".format(i)) for i, column in enumerate(cursor.description)]
  34. rowmodel = namedtuple("Row", fieldnames)
  35. while row:
  36. yield rowmodel(*row)
  37. row = cursor.fetchone()
  38. cursor.close()
  39. def read_all(self, sql, *args):
  40. """ return the selection as a list of dictionnaries """
  41. cursor = self.execute(sql)
  42. fieldnames = [(column[0] if column[0].isidentifier() else "field_{}".format(i)) for i, column in enumerate(cursor.description)]
  43. rowmodel = namedtuple("Row", fieldnames)
  44. data = [rowmodel(*row) for row in cursor.fetchall()]
  45. cursor.close()
  46. return data
  47. def first(self, sql, *args):
  48. try:
  49. return next(self.read(sql, *args))
  50. except StopIteration:
  51. return None
  52. def exists(self, sql, *args):
  53. """ return True if the sql command retrieves records """
  54. return (self.first(sql, *args) is not None)
  55. def execute(self, sql, *args):
  56. cursor = self.cursor()
  57. args = [sql, tuple(args)] if args else [sql]
  58. cursor.execute(*args)
  59. return cursor
  60. class AccessDb(CustomDb):
  61. dsn = "DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};FIL={MS Access};"
  62. default_user = "admin"
  63. default_pwd = ""
  64. def __init__(self, dbpath, **kwargs):
  65. super(AccessDb, self).__init__(dbq=dbpath, **kwargs)
  66. def assert_connected(self):
  67. for row in self.read("SELECT TOP 1 * FROM MSysObjects;"):
  68. if not row:
  69. raise AssertionError("Unable to connect to: {}".format(self.connectString))
  70. return
  71. @staticmethod
  72. def format_date(dat, in_format="%Y-%m-%dT%H:%M:%S", out_format="%m/%d/%Y"):
  73. return datetime.strptime(str(dat), in_format).strftime(out_format)
  74. @staticmethod
  75. def nz(val, default=""):
  76. return val if val else default
  77. class AccessSDb(AccessDb):
  78. dsn = "DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};FIL={MS Access};"
  79. default_user = ""
  80. default_pwd = ""
  81. def __init__(self, dbpath, mdwpath, uid, pwd, **kwargs):
  82. super(AccessSDb, self).__init__(dbpath, uid=uid, pwd=pwd, systemdb=mdwpath, **kwargs)
  83. class OracleDb(CustomDb):
  84. dsn = "DRIVER={Oracle dans ORA102};"
  85. def __init__(self, dbname, user, pwd, **kwargs):
  86. super(OracleDb, self).__init__(dbq=dbname, uid=user, pwd=pwd, **kwargs)
  87. class SqlServerDb(CustomDb):
  88. dsn = "DRIVER={SQL Server};"
  89. def __init__(self, server, dbname, user, pwd, **kwargs):
  90. super(SqlServerDb, self).__init__(server=server, database=dbname, uid=user, pwd=pwd, **kwargs)
  91. class PostgresDb(CustomDb):
  92. dsn = "DRIVER={PostgreSQL Unicode};"
  93. server = ""
  94. db = ""
  95. user = ""
  96. pwd = ""
  97. def __init__(self, server, dbname, user, pwd, **kwargs):
  98. super(PostgresDb, self).__init__(server=server, database=dbname, uid=user, pwd=pwd, **kwargs)
  99. # class SqliteDb(CustomDb):
  100. # drivername = "QODBC"
  101. # dsn = "DRIVER={{Microsoft Access Driver (*.mdb, *.accdb)}};FIL={{MS Access}}"
  102. # default_user = "admin"
  103. # pwd = ""
  104. # def __init__(self, dbpath, **kwargs):
  105. # CustomDb.__init__(self, dbq=dbpath, **kwargs)
  106. #
  107. ### SQL Helpers ###
  108. class SQLHelper():
  109. """ Génère du code sql """
  110. @classmethod
  111. def _sql_format(cls, val):
  112. """ pre-formatte une variable pour injection sql dans une base MS Access """
  113. raise NotImplementedError()
  114. @classmethod
  115. def select(cls, where=""):
  116. raise NotImplementedError()
  117. @classmethod
  118. def update(cls, tblname, data, where):
  119. raise NotImplementedError()
  120. @classmethod
  121. def insert(cls, tblname, data):
  122. raise NotImplementedError()
  123. @classmethod
  124. def delete(cls, tblname, where):
  125. raise NotImplementedError()
  126. class AccessSqlHelper(SQLHelper):
  127. """ SQL Helper pour MS Access """
  128. @classmethod
  129. def _sql_format(cls, val):
  130. if val is None:
  131. return "Null"
  132. elif type(val) is int or type(val) is bool:
  133. return "{}".format(val)
  134. elif type(val) is str:
  135. return "\"{}\"".format(val)
  136. elif type(val) is datetime:
  137. return "#{:%Y-%m-%d %H:%M:%S}#".format(val)
  138. return "{}".format(val)
  139. @classmethod
  140. def select(cls, where=""):
  141. sql = "SELECT * FROM {}".format(cls._tblname)
  142. if where:
  143. sql = "{} WHERE {}".format(sql, where)
  144. return sql
  145. @classmethod
  146. def update(cls, tblname, data, where):
  147. sql = "UPDATE {} SET {} WHERE {}".format(tblname,
  148. ",".join(["{} = {}".format(key, cls._sql_format(data[key])) for key in data]),
  149. " AND ".join(["{} = {}".format(key, cls._sql_format(where[key])) for key in where]))
  150. return sql
  151. @classmethod
  152. def insert(cls, tblname, data):
  153. sql = "INSERT INTO {} ({}) VALUES ({})".format(tblname,
  154. ",".join(data.keys()),
  155. ",".join([cls._sql_format(data[key]) for key in data]))
  156. return sql
  157. @classmethod
  158. def delete(cls, tblname, where):
  159. sql = "DELETE * FROM {} WHERE {}".format(tblname,
  160. " AND ".join(["{} = {}".format(key, cls._sql_format(where[key])) for key in where]))
  161. return sql