clonedb.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. """
  2. Script de clonage des bases de données MySql
  3. (requiert python 3.6+)
  4. > Configuration: settings.yml
  5. Usage:
  6. clonedb.py [-v] [-y] [<dbname>]
  7. clonedb.py (-h | --help)
  8. clonedb.py --version
  9. Options:
  10. -y, --yes Do not ask for confirmation
  11. -h --help Show this screen.
  12. --version Show version.
  13. @author: olivier.massot, 05-2020
  14. """
  15. import logging
  16. import re
  17. from subprocess import Popen, PIPE, CalledProcessError
  18. import sys
  19. import pymysql
  20. import yaml
  21. from docopt import docopt
  22. from path import Path
  23. from core import logging_
  24. from core.locker import Lockfile
  25. from core.pipe_handler import PipeHandler
  26. from core.ssh import SshTunnel
  27. from core.prompt import ask_confirmation
# Version string reported by the '--version' command line option
__VERSION__ = "0.2"

# Directory containing this script; the settings file and the lock file are looked up here
HERE = Path(__file__).parent

# Start logger
logger = logging.getLogger('clonedb')
logging_.start("clonedb", replace=True)

# FIX the default ascii encoding on some linux dockers...
# (stdout is re-opened as a line-buffered utf8 text stream)
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1)

# Options
# When True, mysqldump is run with '--verbose' and a dot is printed for each table being dumped
SHOW_PROGRESSION = True
# Value passed as 'max_allowed_packet' to both pymysql and the mysql client (1073741824 = 1024 ** 3)
MAX_ALLOWED_PACKET = 1073741824
# When True, the executed SQL queries are logged and the mysql client is run with '--verbose'
LOG_MYSQL_QUERIES = False
# When True, every line written by mysqldump is logged at the debug level
LOG_MYSQLDUMP_OUTPUT = False
  40. # Utilities
  41. def load_settings():
  42. """ Load the settings from the 'settings.yml' file
  43. """
  44. with open(HERE / 'settings.yml', 'r') as f:
  45. return yaml.load(f, Loader=yaml.FullLoader)
  46. class MysqldumpHandler(PipeHandler):
  47. """ Handle and process the stdout / stderr output from a mysqldump process
  48. """
  49. _rx_newtable = re.compile(r'Retrieving table structure for table (\w+)')
  50. def process(self, line):
  51. """ Process the last line that was read
  52. """
  53. line = line.strip('\n')
  54. if SHOW_PROGRESSION:
  55. match = self._rx_newtable.search(line)
  56. if match:
  57. if not LOG_MYSQLDUMP_OUTPUT:
  58. logger.debug('... %s', match.group(1))
  59. print('.', end="", flush=True)
  60. if LOG_MYSQLDUMP_OUTPUT:
  61. logger.debug(line)
  62. def close(self):
  63. """ Close the write end of the pipe.
  64. """
  65. print('', flush=True)
  66. super().close()
  67. class MySqlServer:
  68. """ A server hosting a Mysql instance
  69. """
  70. def __init__(self, host, port, username, password, description="", ssh_tunnel=None):
  71. self.host = host
  72. self.port = port
  73. self.username = username
  74. self.password = password
  75. self.description = description[:30]
  76. self.ssh_tunnel = ssh_tunnel
  77. self.cnn = None
  78. self.active_db = ""
  79. def __repr__(self):
  80. s = f"{self.host}:{self.port} as {self.username}"
  81. if self.description:
  82. s = f"{self.description} ({s})"
  83. return s
  84. def connect(self, autocommit=True):
  85. """ Establish the connection to the Mysql server
  86. @see https://pymysql.readthedocs.io/en/latest/modules/connections.html
  87. """
  88. if self.ssh_tunnel:
  89. self.ssh_tunnel.start()
  90. host, port = self.ssh_tunnel.LOCAL_ADRESS
  91. else:
  92. host, port = self.host, self.port
  93. self.cnn = pymysql.connect(host=host,
  94. port=port,
  95. user=self.username,
  96. password=self.password,
  97. autocommit=autocommit,
  98. max_allowed_packet=MAX_ALLOWED_PACKET,
  99. )
  100. if not self.cnn.open:
  101. raise RuntimeError(f'Unable to connect to {self}')
  102. return self.cnn
  103. def set_active_db(self, dbname):
  104. """ set the active database
  105. """
  106. self.cnn.select_db(dbname)
  107. self.active_db = dbname
  108. def close(self):
  109. """ Close the connection to the database
  110. and the ssh tunnel if one is opened
  111. """
  112. if self.cnn:
  113. self.cnn.close()
  114. if self.ssh_tunnel:
  115. self.ssh_tunnel.stop()
  116. logger.debug(f'{self} - connection closed')
  117. def exec_query(self, sql):
  118. """ Execute the sql code and return the resulting cursor
  119. @see https://pymysql.readthedocs.io/en/latest/modules/cursors.html
  120. """
  121. self.cnn.ping(reconnect=True)
  122. cursor = self.cnn.cursor()
  123. if LOG_MYSQL_QUERIES:
  124. logger.debug(sql)
  125. cursor.execute(sql)
  126. return cursor
  127. def db_exists(self, dbname):
  128. """ Return True if the database exists
  129. """
  130. cursor = self.exec_query(f"""SELECT SCHEMA_NAME
  131. FROM INFORMATION_SCHEMA.SCHEMATA
  132. WHERE SCHEMA_NAME = '{dbname}'""")
  133. row = cursor.fetchone()
  134. return row is not None
  135. def list_tables(self, dbname=""):
  136. """ Return a list of tables (but not views!)
  137. for either the currently selected database,
  138. or the one given as a parameter"""
  139. cursor = self.exec_query(
  140. "SHOW FULL TABLES{} WHERE Table_type='BASE TABLE';".format(f" FROM {dbname}" if dbname else ""))
  141. return (row[0] for row in cursor.fetchall())
  142. def list_views(self, dbname=""):
  143. """ Return a list of views
  144. for either the currently selected database,
  145. or the one given as a parameter"""
  146. cursor = self.exec_query(
  147. "SHOW FULL TABLES{} WHERE Table_type='VIEW';".format(f" FROM {dbname}" if dbname else ""))
  148. return (row[0] for row in cursor.fetchall())
  149. def get_view_definition(self, view_name, set_definer=""):
  150. """ Return the SQL create statement for the view
  151. If 'set_definer' is not empty, the username in the 'SET DEFINER' part
  152. of the create statement is replace by the one given
  153. """
  154. cursor = self.exec_query(f"show create view {view_name}")
  155. definition = cursor.fetchone()[1]
  156. if set_definer:
  157. # force a new definer
  158. definition = re.sub(r'DEFINER=`\w+`@`[\w\-.]+`',
  159. f"DEFINER=`{set_definer}`@`\1`",
  160. definition)
  161. return definition
# Behaviors for the tables cloning
IGNORE = 0  # the table is cloned neither for its structure nor for its data
STRUCTURE_ONLY = 1  # the table structure is cloned, but not its data
STRUCTURE_AND_DATA = 2  # -> default behavior
  166. class CloningOperation:
  167. """ A database cloning operation between two Mysql servers
  168. """
  169. def __init__(self, dbname, from_server, to_server, is_default=True, ignore_tables=None, structure_only=None,
  170. filter_tables=None, ignore_views=None, compress=True):
  171. self.dbname = dbname
  172. self.from_server = from_server
  173. self.to_server = to_server
  174. self.is_default = is_default
  175. self.compress = compress
  176. self.ignore_tables = [re.compile(r) for r in ignore_tables] if ignore_tables else []
  177. self.structure_only = [re.compile(r) for r in structure_only] if structure_only else []
  178. self.filter_tables = [re.compile(r) for r in filter_tables] if filter_tables else []
  179. self.ignore_views = [re.compile(r) for r in ignore_views] if ignore_views else []
  180. def __repr__(self):
  181. return f"Cloning {self.dbname} from {self.from_server} to {self.to_server}"
  182. def _build_dump_command(self, dump_options=None, tables=None):
  183. """ Build a mysqldump command line and return it as a
  184. ready-to-consume list for Popen
  185. @see https://dev.mysql.com/doc/refman/5.7/en/mysqldump.html#mysqldump-option-summary
  186. """
  187. tables = tables or []
  188. dump_options = dump_options or []
  189. base_cmd = ["mysqldump",
  190. "--single-transaction",
  191. "-u", self.from_server.username,
  192. f"--password={self.from_server.password}",
  193. "--skip-add-drop-table",
  194. "--skip-add-locks",
  195. "--skip-comments",
  196. ]
  197. if self.compress:
  198. base_cmd.append("--compress")
  199. if SHOW_PROGRESSION:
  200. base_cmd.append("--verbose")
  201. if self.from_server.ssh_tunnel:
  202. host, port = self.from_server.ssh_tunnel.LOCAL_ADRESS
  203. base_cmd += ["--host", host,
  204. "--port", str(port)]
  205. return base_cmd + dump_options + [self.dbname] + tables
  206. def _build_restore_command(self):
  207. """ Build a mysql command line and return it as a
  208. ready-to-consume list for Popen
  209. @see https://dev.mysql.com/doc/refman/8.0/en/mysql-command-options.html#option_mysql_quick
  210. """
  211. cmd = ["mysql",
  212. "-h", self.to_server.host,
  213. "-P", str(self.to_server.port),
  214. "-u", self.to_server.username,
  215. f"--password={self.to_server.password}",
  216. f"--max-allowed-packet={MAX_ALLOWED_PACKET}",
  217. "--reconnect",
  218. "--quick",
  219. "-D", self.dbname
  220. ]
  221. if LOG_MYSQL_QUERIES:
  222. cmd.append("--verbose")
  223. if self.compress:
  224. cmd.append("--compress")
  225. return cmd
  226. @staticmethod
  227. def _run_piped_processes(dump_cmd, restore_cmd):
  228. """ Run the dump and the restore commands by piping them
  229. The output of the mysqldump process is piped into the input of the mysql one
  230. """
  231. logger.debug(">>> Dump command: %s", " ".join(map(str, dump_cmd)))
  232. logger.debug(">>> Piped into: %s", " ".join(map(str, restore_cmd)))
  233. pipe_handler = MysqldumpHandler(logger.name)
  234. try:
  235. # noinspection PyTypeChecker
  236. with Popen(restore_cmd, stdin=PIPE, stdout=pipe_handler, stderr=pipe_handler) as mysql:
  237. # noinspection PyTypeChecker
  238. with Popen(dump_cmd, stdout=PIPE, stderr=pipe_handler) as mysqldump:
  239. mysql.stdin.write(mysqldump.stdout.read())
  240. if mysqldump.returncode or mysql.returncode:
  241. raise RuntimeError
  242. except (OSError, RuntimeError, CalledProcessError) as e:
  243. logger.error("Execution failed: %s", e)
  244. raise RuntimeError(f"An error happened at runtime: {e}")
  245. finally:
  246. pipe_handler.close()
  247. def run(self):
  248. """ Run the cloning op
  249. """
  250. logger.info(f"*** Cloning {self.dbname} ***")
  251. logger.info(f"> From {self.from_server}")
  252. logger.info(f"> To {self.to_server}")
  253. try:
  254. self.from_server.connect()
  255. self.from_server.set_active_db(self.dbname)
  256. logger.debug('Connected to %s', self.from_server)
  257. self.to_server.connect()
  258. logger.debug('Connected to %s', self.to_server)
  259. tables = {}
  260. for tname in self.from_server.list_tables():
  261. if any(rx.match(tname) for rx in self.ignore_tables):
  262. tables[tname] = IGNORE
  263. elif self.filter_tables and not any(rx.match(tname) for rx in self.filter_tables):
  264. tables[tname] = IGNORE
  265. elif any(rx.match(tname) for rx in self.structure_only):
  266. tables[tname] = STRUCTURE_ONLY
  267. else:
  268. tables[tname] = STRUCTURE_AND_DATA
  269. restore_cmd = self._build_restore_command()
  270. # Dump structure: --single-transaction --no-data --routines {dbname} tbname1 tname2 ...
  271. dump_structure_for = [t for t, s in tables.items() if s != IGNORE]
  272. dump_structure_cmd = self._build_dump_command(["--single-transaction", "--no-data", "--routines"],
  273. dump_structure_for)
  274. # Dump data: --no-create-info --skip-triggers {dbname} tbname1 tname2 ...
  275. dump_data_for = [t for t, s in tables.items() if s == STRUCTURE_AND_DATA]
  276. dump_data_cmd = self._build_dump_command(["--no-create-info", "--skip-triggers"],
  277. dump_data_for)
  278. if tables and not dump_structure_for and not dump_data_for:
  279. logging.warning('No table will be cloned')
  280. # Recreate the target DB
  281. logger.info("(Re)create the database")
  282. self.to_server.exec_query(f"DROP DATABASE IF EXISTS `{self.dbname}`;")
  283. self.to_server.exec_query(f"CREATE SCHEMA `{self.dbname}`;")
  284. self.to_server.set_active_db(self.dbname)
  285. # Run mysqldump
  286. try:
  287. if dump_structure_for:
  288. logger.info(f"Cloning structure for {len(dump_structure_for)} tables (on {len(tables)})...")
  289. self._run_piped_processes(dump_structure_cmd, restore_cmd)
  290. if dump_data_for:
  291. logger.info(f"Cloning data for {len(dump_data_for)} tables (on {len(tables)})...")
  292. self._run_piped_processes(dump_data_cmd, restore_cmd)
  293. logger.info(f"Cloning views...")
  294. for v in self.from_server.list_views(self.dbname):
  295. if any(rx.match(v) for rx in self.ignore_views):
  296. continue
  297. logger.debug('* cloning view %s', v)
  298. definition = self.from_server.get_view_definition(v, self.to_server.username)
  299. try:
  300. self.to_server.exec_query(definition)
  301. except (pymysql.err.ProgrammingError, pymysql.err.InternalError) as e:
  302. logger.error('Unable to create the internal view {p}: {e}')
  303. logger.info("> the database was successfully cloned")
  304. except RuntimeError:
  305. logger.error(f"<!> An error happened while cloning the '{self.dbname}' database")
  306. finally:
  307. self.from_server.close()
  308. self.to_server.close()
  309. def main(settings, arguments):
  310. prompt = not arguments["--yes"]
  311. logger.info("Start db cloning utility...")
  312. logger.debug(f"Settings: %s", str(settings).replace('\r', '').replace('\n', ''))
  313. logger.debug(f"Arguments: %s", str(arguments).replace('\r', '').replace('\n', ''))
  314. # Load the servers' configuration
  315. servers = {}
  316. for server_name, server_settings in settings['servers'].items():
  317. hostname = server_settings['host']
  318. if 'ssh' in server_settings:
  319. ssh_tunnel = SshTunnel(hostname, server_settings['mysql']['port'], **server_settings['ssh'])
  320. else:
  321. ssh_tunnel = None
  322. server = MySqlServer(hostname,
  323. **server_settings['mysql'],
  324. description=server_settings['description'],
  325. ssh_tunnel=ssh_tunnel)
  326. servers[server_name] = server
  327. # Load the cloning ops' configuration
  328. ops = {}
  329. for name, args in settings['operations'].items():
  330. dbname = args['dbname']
  331. from_server = servers[args['from_server']]
  332. to_server = servers[args['to_server']]
  333. kwargs = {k: v for k, v in args.items() if k not in ('dbname', 'from_server', 'to_server')}
  334. op = CloningOperation(dbname, from_server, to_server, **kwargs)
  335. ops[name] = op
  336. # Operations to launch
  337. if arguments.get('<dbname>', None):
  338. selected_ops = [ops[arguments['<dbname>']]]
  339. else:
  340. selected_ops = [op for op in ops.values() if op.is_default]
  341. if not selected_ops:
  342. raise RuntimeError('No operation to launch')
  343. # Ask for confirmation (except if '--yes' is in arguments)
  344. if prompt:
  345. logger.debug('Ask for confirmation...')
  346. msg = "The following operations will be launched:\n{}\n" \
  347. "WARNING: the existing local databases will be replaced" \
  348. "".format("\n".join(f"* {op}" for op in selected_ops))
  349. if not ask_confirmation(msg):
  350. logger.info("-- Operation cancelled by user --")
  351. return
  352. logger.debug('> User confirmed')
  353. # Run the cloning operations
  354. for op in selected_ops:
  355. op.run()
# Script entry point: load the settings, parse the command line,
# then run main() inside the '.clonedb.lock' Lockfile context
if __name__ == '__main__':
    # load settings from settings.yml file
    settings = load_settings()

    # parse CLI arguments (the module docstring is the docopt usage description)
    arguments = docopt(__doc__, help=__doc__, version=__VERSION__)

    # NOTE(review): Lockfile presumably prevents two cloning processes from running
    # at the same time, calling 'on_error' when the lock is already held - confirm in core.locker
    with Lockfile(path=HERE / '.clonedb.lock',
                  on_error=lambda: logger.critical("A cloning process is already running, please wait...")):
        main(settings, arguments)