clonedb.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527
  1. """
  2. Script de clonage des bases de données MySql
  3. (requiert python 3.6+)
  4. > Configuration: settings.yml
  5. Usage:
  6. clonedb.py [-v] [-y] [<opname>...]
  7. clonedb.py (-h | --help)
  8. clonedb.py --version
  9. Options:
  10. -y, --yes Do not ask for confirmation
  11. -h --help Show this screen.
  12. --version Show version.
  13. @author: olivier.massot, 05-2020
  14. """
  15. import logging
  16. import re
  17. from subprocess import Popen, PIPE, CalledProcessError
  18. import sys
  19. import pymysql
  20. import yaml
  21. from docopt import docopt
  22. from path import Path
  23. from core import logging_
  24. from core.docker import resolve_docker_ip
  25. from core.locker import Lockfile
  26. from core.pipe_handler import PipeHandler
  27. from core.ssh import SshTunnel
  28. from core.prompt import ask_confirmation
__VERSION__ = "0.2"

# Directory containing this script; the settings, log and lock files are resolved relative to it
HERE = Path(__file__).parent

# Start logger
LOG_DIR = HERE / 'log'
LOG_DIR.mkdir_p()
logger = logging.getLogger('clonedb')
logging_.start("clonedb", filename=LOG_DIR / 'clonedb.log', replace=True)

# FIX the default ascii encoding on some linux dockers...
# (stdout is re-opened as a line-buffered utf8 text stream)
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1)

# Options
# When True, mysqldump is run with --verbose and a dot is printed for each table reported by the pipes
SHOW_PROGRESSION = True
# When True, every line read from the mysqldump / mysql pipes is logged at debug level
LOG_PIPES_OUTPUT = False
# When True, each SQL statement run through MySqlServer.exec_query is logged at debug level
LOG_MYSQL_QUERIES = False
# NOTE(review): not referenced anywhere in the visible code -- confirm it is still needed
MY_CNF_FILE = Path(HERE / "my.cnf")
# Size in bytes (1 GiB) given to mysqldump's --max-allowed-packet and set globally by the restore command
MAX_ALLOWED_PACKET = 1073741824
  44. # Utilities
  45. def load_settings():
  46. """ Load the settings from the 'settings.yml' file
  47. If there is no such file, the base settings.yml file is created
  48. """
  49. settings_file = HERE / 'settings.yml'
  50. if not settings_file.exists():
  51. Path(HERE / 'settings.yml.dist').copy(HERE / 'settings.yml')
  52. with open(settings_file, 'r') as f:
  53. return yaml.load(f, Loader=yaml.FullLoader)
  54. class MysqldumpHandler(PipeHandler):
  55. """ Handle and process the stdout / stderr output from a mysqldump process
  56. """
  57. _rx_prog = re.compile(r'Retrieving table structure for table (\w+)')
  58. _log_all = LOG_PIPES_OUTPUT
  59. _action_name = "dumping"
  60. def process(self, line):
  61. """ Process the last line that was read
  62. """
  63. line = line.strip('\n')
  64. if SHOW_PROGRESSION:
  65. match = self._rx_prog.search(line)
  66. if match:
  67. logger.debug('... %s %s', self._action_name, match.group(1))
  68. print('.', end="", flush=True)
  69. if self._log_all:
  70. logger.debug(line)
  71. def close(self):
  72. """ Close the write end of the pipe.
  73. """
  74. print('', flush=True)
  75. super().close()
  76. class MysqlHandler(MysqldumpHandler):
  77. """ Handle and process the stdout / stderr output from a mysql process
  78. """
  79. _rx_prog = re.compile(r'^((?:CREATE TABLE )|(?:INSERT INTO ))`(\w+)`')
  80. _log_all = LOG_PIPES_OUTPUT
  81. _action_name = "restoring"
  82. _last_logged = ""
  83. def process(self, line):
  84. """ Process the last line that was read
  85. """
  86. line = line.strip('\n')
  87. if SHOW_PROGRESSION:
  88. match = self._rx_prog.search(line)
  89. if match:
  90. tname = match.group(2)
  91. if tname != self._last_logged:
  92. logger.debug('... %s %s %s', self._action_name,
  93. 'structure of' if 'CREATE' in match.group(1) else 'data of',
  94. tname)
  95. print('.', end="", flush=True)
  96. self._last_logged = tname
  97. if self._log_all:
  98. logger.debug(line)
  99. class MySqlServer:
  100. """ A server hosting a Mysql instance
  101. """
  102. def __init__(self, host, port, username, password, description="", ssh_tunnel=None):
  103. self.host = host
  104. self.port = port
  105. self.username = username
  106. self.password = password
  107. self.description = description[:30]
  108. self.ssh_tunnel = ssh_tunnel
  109. self.cnn = None
  110. self.active_db = ""
  111. def __repr__(self):
  112. s = f"{self.host}:{self.port} as {self.username}"
  113. if self.description:
  114. s = f"{self.description} ({s})"
  115. return s
  116. def connect(self, autocommit=True):
  117. """ Establish the connection to the Mysql server
  118. @see https://pymysql.readthedocs.io/en/latest/modules/connections.html
  119. """
  120. if self.ssh_tunnel:
  121. self.ssh_tunnel.start()
  122. host, port = self.ssh_tunnel.LOCAL_ADRESS
  123. else:
  124. host, port = self.host, self.port
  125. self.cnn = pymysql.connect(host=host,
  126. port=port,
  127. user=self.username,
  128. password=self.password,
  129. autocommit=autocommit,
  130. max_allowed_packet="32M",
  131. )
  132. if not self.cnn.open:
  133. raise RuntimeError(f'Unable to connect to {self}')
  134. return self.cnn
  135. def set_active_db(self, dbname):
  136. """ set the active database
  137. """
  138. self.cnn.select_db(dbname)
  139. self.active_db = dbname
  140. def close(self):
  141. """ Close the connection to the database
  142. and the ssh tunnel if one is opened
  143. """
  144. if self.cnn:
  145. self.cnn.close()
  146. if self.ssh_tunnel:
  147. self.ssh_tunnel.stop()
  148. logger.debug(f'{self} - connection closed')
  149. def exec_query(self, sql):
  150. """ Execute the sql code and return the resulting cursor
  151. @see https://pymysql.readthedocs.io/en/latest/modules/cursors.html
  152. """
  153. self.cnn.ping(reconnect=True)
  154. cursor = self.cnn.cursor()
  155. if LOG_MYSQL_QUERIES:
  156. logger.debug(sql)
  157. cursor.execute(sql)
  158. return cursor
  159. def db_exists(self, dbname):
  160. """ Return True if the database exists
  161. """
  162. cursor = self.exec_query(f"""SELECT SCHEMA_NAME
  163. FROM INFORMATION_SCHEMA.SCHEMATA
  164. WHERE SCHEMA_NAME = '{dbname}'""")
  165. row = cursor.fetchone()
  166. return row is not None
  167. def list_tables(self, dbname=""):
  168. """ Return a list of tables (but not views!)
  169. for either the currently selected database,
  170. or the one given as a parameter"""
  171. cursor = self.exec_query(
  172. "SHOW FULL TABLES{} WHERE Table_type='BASE TABLE';".format(f" FROM {dbname}" if dbname else ""))
  173. return (row[0] for row in cursor.fetchall())
  174. def list_views(self, dbname=""):
  175. """ Return a list of views
  176. for either the currently selected database,
  177. or the one given as a parameter"""
  178. cursor = self.exec_query(
  179. "SHOW FULL TABLES{} WHERE Table_type='VIEW';".format(f" FROM {dbname}" if dbname else ""))
  180. return (row[0] for row in cursor.fetchall())
  181. def get_view_definition(self, view_name, set_definer=""):
  182. """ Return the SQL create statement for the view
  183. If 'set_definer' is not empty, the username in the 'SET DEFINER' part
  184. of the create statement is replace by the one given
  185. """
  186. cursor = self.exec_query(f"show create view {view_name}")
  187. definition = cursor.fetchone()[1]
  188. if set_definer:
  189. # force a new definer
  190. definition = re.sub(r'DEFINER=`\w+`@`[\w\-.]+`',
  191. f"DEFINER=`{set_definer}`@`\1`",
  192. definition)
  193. return definition
# Operation status
UNKNOWN = 0  # initial value of CloningOperation.status
SUCCESS = 1  # set at the end of a cloning that went through
FAILURE = 2  # set when a RuntimeError happened while cloning

# Behaviors for the tables cloning
IGNORE = 0  # the table is neither created nor filled
STRUCTURE_ONLY = 1  # the table is created but its data is not copied
STRUCTURE_AND_DATA = 2  # -> default behavior
  202. class CloningOperation:
  203. """ A database cloning operation between two Mysql servers
  204. """
  205. def __init__(self, name, dbname, from_server, to_server, is_default=True, ignore_tables=None, structure_only=None,
  206. filter_tables=None, ignore_views=None, compress=True):
  207. self.name = name
  208. self.dbname = dbname
  209. self.from_server = from_server
  210. self.to_server = to_server
  211. self.is_default = is_default
  212. self.compress = compress
  213. self.ignore_tables = [re.compile(r) for r in ignore_tables] if ignore_tables else []
  214. self.structure_only = [re.compile(r) for r in structure_only] if structure_only else []
  215. self.filter_tables = [re.compile(r) for r in filter_tables] if filter_tables else []
  216. self.ignore_views = [re.compile(r) for r in ignore_views] if ignore_views else []
  217. self.status = UNKNOWN
  218. def __repr__(self):
  219. return f"Cloning {self.dbname} from {self.from_server} to {self.to_server}"
  220. def _build_dump_command(self, dump_options=None, tables=None):
  221. """ Build a mysqldump command line and return it as a
  222. ready-to-consume list for Popen
  223. @see https://dev.mysql.com/doc/refman/5.7/en/mysqldump.html#mysqldump-option-summary
  224. """
  225. tables = tables or []
  226. dump_options = dump_options or []
  227. base_cmd = ["mysqldump",
  228. "--single-transaction",
  229. "-u", self.from_server.username,
  230. f"--password={self.from_server.password}",
  231. f"--max-allowed-packet={MAX_ALLOWED_PACKET}",
  232. "--skip-add-drop-table",
  233. "--skip-add-locks",
  234. "--skip-comments",
  235. ]
  236. if self.compress:
  237. base_cmd.append("--compress")
  238. if SHOW_PROGRESSION:
  239. base_cmd.append("--verbose")
  240. if self.from_server.ssh_tunnel:
  241. host, port = self.from_server.ssh_tunnel.LOCAL_ADRESS
  242. base_cmd += ["--host", host,
  243. "--port", str(port)]
  244. return base_cmd + dump_options + [self.dbname] + tables
  245. def _build_restore_command(self):
  246. """ Build a mysql command line and return it as a
  247. ready-to-consume list for Popen
  248. @see https://dev.mysql.com/doc/refman/8.0/en/mysql-command-options.html#option_mysql_quick
  249. """
  250. init_command = f"set global max_allowed_packet={MAX_ALLOWED_PACKET};" \
  251. "set global wait_timeout=28800;" \
  252. "set global interactive_timeout=28800;"
  253. cmd = ["mysql",
  254. "-h", self.to_server.host,
  255. "-P", str(self.to_server.port),
  256. "-u", self.to_server.username,
  257. f"--password={self.to_server.password}",
  258. f"--init-command={init_command}",
  259. "--reconnect",
  260. "--quick",
  261. "--unbuffered",
  262. "--wait",
  263. "--verbose",
  264. "-D", self.dbname
  265. ]
  266. # if LOG_PIPES_OUTPUT:
  267. # cmd.append("--verbose")
  268. if self.compress:
  269. cmd.append("--compress")
  270. return cmd
  271. @staticmethod
  272. def _run_piped_processes(dump_cmd, restore_cmd):
  273. """ Run the dump and the restore commands by piping them
  274. The output of the mysqldump process is piped into the input of the mysql one
  275. """
  276. logger.debug(">>> Dump command: %s", " ".join(map(str, dump_cmd)))
  277. logger.debug(">>> Piped into: %s", " ".join(map(str, restore_cmd)))
  278. mysqldump_handler = MysqldumpHandler(logger.name)
  279. mysql_handler = MysqlHandler(logger.name)
  280. try:
  281. # noinspection PyTypeChecker
  282. with Popen(restore_cmd, stdin=PIPE, stdout=mysql_handler, stderr=mysql_handler) as mysql:
  283. # noinspection PyTypeChecker
  284. with Popen(dump_cmd, stdout=PIPE, stderr=mysqldump_handler) as mysqldump:
  285. mysql.stdin.write(mysqldump.stdout.read())
  286. if mysqldump.returncode:
  287. raise RuntimeError('mysqldump returned a non zero code')
  288. if mysql.returncode:
  289. raise RuntimeError('mysql returned a non zero code')
  290. except (OSError, RuntimeError, CalledProcessError) as e:
  291. logger.error("Execution failed: %s", e)
  292. raise RuntimeError(f"An error happened at runtime: {e}")
  293. finally:
  294. mysqldump_handler.close()
  295. mysql_handler.close()
  296. def run(self):
  297. """ Run the cloning op
  298. """
  299. logger.info(f"*** Cloning {self.dbname} ***")
  300. logger.info(f"> From {self.from_server}")
  301. logger.info(f"> To {self.to_server}")
  302. try:
  303. self.from_server.connect()
  304. self.from_server.set_active_db(self.dbname)
  305. logger.debug('Connected to %s', self.from_server)
  306. self.to_server.connect()
  307. logger.debug('Connected to %s', self.to_server)
  308. tables = {}
  309. for tname in self.from_server.list_tables():
  310. if any(rx.match(tname) for rx in self.ignore_tables):
  311. tables[tname] = IGNORE
  312. elif self.filter_tables and not any(rx.match(tname) for rx in self.filter_tables):
  313. tables[tname] = IGNORE
  314. elif any(rx.match(tname) for rx in self.structure_only):
  315. tables[tname] = STRUCTURE_ONLY
  316. else:
  317. tables[tname] = STRUCTURE_AND_DATA
  318. restore_cmd = self._build_restore_command()
  319. # Dump structure: --single-transaction --no-data --routines {dbname} tbname1 tname2 ...
  320. dump_structure_for = [t for t, s in tables.items() if s != IGNORE]
  321. dump_structure_cmd = self._build_dump_command(["--no-data", "--routines"],
  322. dump_structure_for)
  323. # Dump data: --no-create-info --skip-triggers {dbname} tbname1 tname2 ...
  324. dump_data_for = [t for t, s in tables.items() if s == STRUCTURE_AND_DATA]
  325. dump_data_cmd = self._build_dump_command(["--no-create-info", "--skip-triggers"],
  326. dump_data_for)
  327. if tables and not dump_structure_for and not dump_data_for:
  328. logging.warning('No table will be cloned')
  329. # Recreate the target DB
  330. logger.info("(Re)create the database")
  331. self.to_server.exec_query(f"DROP DATABASE IF EXISTS `{self.dbname}`;")
  332. self.to_server.exec_query(f"CREATE SCHEMA `{self.dbname}`;")
  333. self.to_server.set_active_db(self.dbname)
  334. # Run mysqldump
  335. try:
  336. if dump_structure_for:
  337. logger.info(f"Cloning structure for {len(dump_structure_for)} tables (on {len(tables)})...")
  338. self._run_piped_processes(dump_structure_cmd, restore_cmd)
  339. if dump_data_for:
  340. logger.info(f"Cloning data for {len(dump_data_for)} tables (on {len(tables)})...")
  341. self._run_piped_processes(dump_data_cmd, restore_cmd)
  342. logger.info(f"Cloning views...")
  343. for v in self.from_server.list_views(self.dbname):
  344. if any(rx.match(v) for rx in self.ignore_views):
  345. continue
  346. logger.debug('* cloning view %s', v)
  347. definition = self.from_server.get_view_definition(v, self.to_server.username)
  348. try:
  349. self.to_server.exec_query(definition)
  350. except (pymysql.err.ProgrammingError, pymysql.err.InternalError) as e:
  351. logger.error('Unable to create the internal view %s: %s', v, e)
  352. self.status = SUCCESS
  353. logger.info("> the database was successfully cloned")
  354. except RuntimeError:
  355. self.status = FAILURE
  356. logger.error("<!> An error happened while cloning the '%s' database", self.dbname)
  357. finally:
  358. self.from_server.close()
  359. self.to_server.close()
  360. def main(settings, arguments):
  361. prompt = not arguments["--yes"]
  362. logger.info("Start db cloning utility...")
  363. logger.debug(f"Settings: %s", str(settings).replace('\r', '').replace('\n', ''))
  364. logger.debug(f"Arguments: %s", str(arguments).replace('\r', '').replace('\n', ''))
  365. # Load the servers' configuration
  366. servers = {}
  367. for server_name, server_settings in settings['servers'].items():
  368. hostname = server_settings['host']
  369. match = re.search(r"^docker:(\w+)$", hostname)
  370. if match:
  371. logger.debug("resolve IP for docker %s", match.group(1))
  372. ip = resolve_docker_ip(match.group(1))
  373. logger.debug("substitute '%s' to '%s' as hostname", ip, hostname)
  374. hostname = ip
  375. if 'ssh' in server_settings:
  376. ssh_tunnel = SshTunnel(hostname, server_settings['mysql']['port'], **server_settings['ssh'])
  377. else:
  378. ssh_tunnel = None
  379. server = MySqlServer(hostname,
  380. **server_settings['mysql'],
  381. description=server_settings['description'],
  382. ssh_tunnel=ssh_tunnel)
  383. servers[server_name] = server
  384. # Load the cloning ops' configuration
  385. ops = {}
  386. for name, args in settings['operations'].items():
  387. dbname = args['dbname']
  388. from_server = servers[args['from_server']]
  389. to_server = servers[args['to_server']]
  390. kwargs = {k: v for k, v in args.items() if k not in ('dbname', 'from_server', 'to_server')}
  391. op = CloningOperation(name, dbname, from_server, to_server, **kwargs)
  392. ops[name] = op
  393. # Operations to launch
  394. if arguments.get('<opname>', None):
  395. selected_ops = []
  396. for opname in arguments['<opname>']:
  397. try:
  398. selected_ops.append(ops[opname])
  399. except KeyError:
  400. logger.error('No operation found with name %s', opname)
  401. else:
  402. selected_ops = [op for op in ops.values() if op.is_default]
  403. if not selected_ops:
  404. raise RuntimeError('No operation to launch')
  405. # Ask for confirmation (except if '--yes' is in arguments)
  406. if prompt:
  407. logger.debug('Ask for confirmation...')
  408. msg = "The following operations will be launched:\n{}\n" \
  409. "WARNING: the existing local databases will be replaced" \
  410. "".format("\n".join(f"* {op}" for op in selected_ops))
  411. if not ask_confirmation(msg):
  412. logger.info("-- Operation cancelled by user --")
  413. return
  414. logger.debug('> User confirmed')
  415. # Run the cloning operations
  416. for op in selected_ops:
  417. op.run()
  418. failures = [op.name for op in selected_ops if op.status == FAILURE]
  419. if failures:
  420. logger.error("WARNING! the following operations failed: %s", ', '.join(failures))
  421. if __name__ == '__main__':
  422. # load settings from settings.yml file
  423. settings = load_settings()
  424. # parse CLI arguments
  425. arguments = docopt(__doc__, help=__doc__, version=__VERSION__)
  426. with Lockfile(path=HERE / '.clonedb.lock',
  427. on_error=lambda: logger.critical("A cloning process is already running, please wait...")):
  428. main(settings, arguments)