clonedb.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640
  1. #!/usr/bin/python3
  2. """
  3. Script de clonage des bases de données MySql
  4. (requiert python 3.6+)
  5. > Configuration: settings.yml
  6. Usage:
  7. clonedb.py [-v] [-y] [<opname>...]
  8. clonedb.py (-h | --help)
  9. clonedb.py --version
  10. Options:
  11. -y, --yes Do not ask for confirmation
  12. -h --help Show this screen.
  13. --version Show version.
  14. @author: olivier.massot, 05-2020
  15. """
  16. import logging
  17. import re
  18. from subprocess import Popen, PIPE, CalledProcessError
  19. import sys
  20. import pymysql
  21. import yaml
  22. from docopt import docopt
  23. from path import Path
  24. from core import logging_
  25. from core.docker import resolve_docker_ip
  26. from core.locker import Lockfile
  27. from core.pipe_handler import PipeHandler
  28. from core.ssh import SshTunnel
  29. from core.prompt import ask_confirmation
# Version string reported by `--version` (handed to docopt in the __main__ block).
__VERSION__ = "0.2"

# Directory containing this script; settings, logs and the lock file live next to it.
HERE = Path(__file__).parent

# Start logger
# The log directory is created at import time, before the logger is started.
LOG_DIR = HERE / 'log'
LOG_DIR.mkdir_p()
logger = logging.getLogger('clonedb')
logging_.start("clonedb", filename=LOG_DIR / 'clonedb.log', replace=True)

# FIX the default ascii encoding on some linux dockers...
# stdout is re-opened on the same file descriptor as a line-buffered UTF-8 text stream.
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1)

# Options
# When true, the pipe handlers print a per-table progression and mysqldump runs with --verbose.
SHOW_PROGRESSION = True
# Default value of the handlers' `_log_all` flag: log every line read from the pipes at debug level.
LOG_PIPES_OUTPUT = True
# When true, MySqlServer.exec_query logs each SQL statement at debug level before executing it.
LOG_MYSQL_QUERIES = True
# 1073741824 bytes = 1 GiB; passed to pymysql.connect, to mysqldump and to the mysql init command.
MAX_ALLOWED_PACKET = 1073741824

# Maps MySQL character set names to Python codec names.
CHARSET_TO_ENCODING = {
    'utf8': 'utf-8',
    'utf8mb4': 'utf-8',
    'latin1': 'latin'
}

# Utilities
  50. def load_settings():
  51. """ Load the settings from the 'settings.yml' file
  52. If there is no such file, the base settings.yml file is created
  53. """
  54. settings_file = HERE / 'settings.yml'
  55. if not settings_file.exists():
  56. Path(HERE / 'settings.yml.dist').copy(HERE / 'settings.yml')
  57. with open(settings_file, 'r') as f:
  58. return yaml.load(f, Loader=yaml.FullLoader)
  59. def _print(msg, end=False):
  60. msg = msg.ljust(80)
  61. print(f'\r{msg}', end='' if not end else '\n', flush=True)
  62. class MysqldumpHandler(PipeHandler):
  63. """ Handle and process the stdout / stderr output from a mysqldump process
  64. """
  65. _rx_prog = re.compile(r'Retrieving table structure for table (\w+)')
  66. _log_all = LOG_PIPES_OUTPUT
  67. def __init__(self, logger_name, level, total_prog):
  68. super().__init__(logger_name, level)
  69. self.total_prog = total_prog
  70. self.prog = 0
  71. self._last_logged = ""
  72. def process(self, line):
  73. """ Process the last line that was read
  74. """
  75. line = line.strip('\n')
  76. if SHOW_PROGRESSION:
  77. match = self._rx_prog.search(line)
  78. if match:
  79. self.log_new_table(match.group(1), "dumping")
  80. if self._log_all:
  81. logger.debug(line)
  82. def log_new_table(self, tname, action_name=""):
  83. if tname == self._last_logged:
  84. return
  85. self.prog += 1
  86. logger.debug('... %s %s', action_name, tname)
  87. _print(f'{action_name} `{tname}` [{self.prog} / {self.total_prog}]')
  88. self._last_logged = tname
  89. def log_end(self):
  90. _print(f'\r-- done --', end=True)
  91. def close(self):
  92. """ Close the write end of the pipe.
  93. """
  94. super().close()
  95. class MysqlHandler(MysqldumpHandler):
  96. """ Handle and process the stdout / stderr output from a mysql process
  97. """
  98. _rx_prog = re.compile(r'^((?:CREATE TABLE )|(?:INSERT INTO ))`(\w+)`')
  99. _log_all = LOG_PIPES_OUTPUT
  100. _action_name = "restoring"
  101. def process(self, line):
  102. """ Process the last line that was read
  103. """
  104. line = line.strip('\n')
  105. if SHOW_PROGRESSION:
  106. match = self._rx_prog.search(line)
  107. if match:
  108. action_name = "restoring {}".format('structure of'
  109. if 'CREATE' in match.group(1)
  110. else 'data of')
  111. self.log_new_table(match.group(2), action_name)
  112. if self._log_all:
  113. logger.debug(line)
  114. class MySqlServer:
  115. """ A server hosting a Mysql instance
  116. """
  117. def __init__(self, host, port, username, password, description="", ssh_tunnel=None):
  118. self.host = host
  119. self.port = port
  120. self.username = username
  121. self.password = password
  122. self.description = description[:30]
  123. self.ssh_tunnel = ssh_tunnel
  124. self.cnn = None
  125. self.active_db = ""
  126. def __repr__(self):
  127. s = f"{self.host}:{self.port} as {self.username}"
  128. if self.description:
  129. s = f"{self.description} ({s})"
  130. return s
  131. def connect(self, autocommit=True):
  132. """ Establish the connection to the Mysql server
  133. @see https://pymysql.readthedocs.io/en/latest/modules/connections.html
  134. """
  135. if self.ssh_tunnel:
  136. self.ssh_tunnel.start()
  137. host, port = self.ssh_tunnel.LOCAL_ADRESS
  138. else:
  139. host, port = self.host, self.port
  140. self.cnn = pymysql.connect(host=host,
  141. port=port,
  142. user=self.username,
  143. password=self.password,
  144. autocommit=autocommit,
  145. max_allowed_packet=MAX_ALLOWED_PACKET,
  146. )
  147. if not self.cnn.open:
  148. raise RuntimeError(f'Unable to connect to {self}')
  149. return self.cnn
  150. def set_active_db(self, dbname):
  151. """ set the active database
  152. """
  153. self.cnn.select_db(dbname)
  154. self.active_db = dbname
  155. def close(self):
  156. """ Close the connection to the database
  157. and the ssh tunnel if one is opened
  158. """
  159. if self.cnn:
  160. self.cnn.close()
  161. if self.ssh_tunnel:
  162. self.ssh_tunnel.stop()
  163. logger.debug(f'{self} - connection closed')
  164. def exec_query(self, sql):
  165. """ Execute the sql code and return the resulting cursor
  166. @see https://pymysql.readthedocs.io/en/latest/modules/cursors.html
  167. """
  168. self.cnn.ping(reconnect=True)
  169. cursor = self.cnn.cursor()
  170. if LOG_MYSQL_QUERIES:
  171. logger.debug(sql)
  172. cursor.execute(sql)
  173. return cursor
  174. def db_exists(self, dbname):
  175. """ Return True if the database exists
  176. """
  177. cursor = self.exec_query(f"""SELECT SCHEMA_NAME
  178. FROM INFORMATION_SCHEMA.SCHEMATA
  179. WHERE SCHEMA_NAME = '{dbname}'""")
  180. row = cursor.fetchone()
  181. return row is not None
  182. def get_db_charset(self, dbname):
  183. """ return the charset (encoding) of the mysql database """
  184. cursor = self.exec_query(f"""SELECT default_character_set_name
  185. FROM information_schema.SCHEMATA S
  186. WHERE schema_name = '{dbname}';""")
  187. return cursor.fetchone()[0]
  188. def list_tables(self, dbname=""):
  189. """ Return a list of tables (but not views!)
  190. for either the currently selected database,
  191. or the one given as a parameter"""
  192. cursor = self.exec_query(
  193. "SHOW FULL TABLES{} WHERE Table_type='BASE TABLE';".format(f" FROM {dbname}" if dbname else ""))
  194. return (row[0] for row in cursor.fetchall())
  195. def list_views(self, dbname=""):
  196. """ Return a list of views
  197. for either the currently selected database,
  198. or the one given as a parameter"""
  199. cursor = self.exec_query(
  200. "SHOW FULL TABLES{} WHERE Table_type='VIEW';".format(f" FROM {dbname}" if dbname else ""))
  201. return (row[0] for row in cursor.fetchall())
  202. def get_view_definition(self, view_name, set_definer=""):
  203. """ Return the SQL create statement for the view
  204. If 'set_definer' is not empty, the username in the 'SET DEFINER' part
  205. of the create statement is replaced by the one given
  206. """
  207. cursor = self.exec_query(f"show create view {view_name}")
  208. definition = cursor.fetchone()[1]
  209. if set_definer:
  210. # force a new definer
  211. definition = re.sub(r'DEFINER=`\w+`@`[\w\-.]+`',
  212. f"DEFINER=`{set_definer}`@`\1`",
  213. definition)
  214. return definition
  215. class MysqlUser:
  216. def __init__(self, username, pwd, host='localhost'):
  217. self.username = username
  218. self.pwd = pwd
  219. self.host = host
# Operation status
# Values taken by CloningOperation.status: UNKNOWN until run() completes,
# then SUCCESS or FAILURE.
UNKNOWN = 0
SUCCESS = 1
FAILURE = 2

# Behaviors for the tables cloning
# Each table of the source database is assigned one of these by CloningOperation.run().
IGNORE = 0
STRUCTURE_ONLY = 1
STRUCTURE_AND_DATA = 2  # -> default behavior
  228. class CloningOperation:
  229. """ A database cloning operation between two Mysql servers
  230. """
  231. def __init__(self, name, dbname, from_server, to_server, grant=None,
  232. is_default=True, ignore_tables=None, structure_only=None,
  233. filter_tables=None, ignore_views=None, compress=True):
  234. self.name = name
  235. self.dbname = dbname
  236. self.from_server = from_server
  237. self.to_server = to_server
  238. self.grant = grant if grant is not None else []
  239. self.is_default = is_default
  240. self.compress = compress
  241. self.ignore_tables = [re.compile(f"^{r}$") for r in ignore_tables] if ignore_tables else []
  242. self.structure_only = [re.compile(f"^{r}$") for r in structure_only] if structure_only else []
  243. self.filter_tables = [re.compile(f"^{r}$") for r in filter_tables] if filter_tables else []
  244. self.ignore_views = [re.compile(f"^{r}$") for r in ignore_views] if ignore_views else []
  245. self.status = UNKNOWN
  246. def __repr__(self):
  247. return f"Cloning {self.dbname} from {self.from_server} to {self.to_server}"
  248. def _build_dump_command(self, dump_options=None, tables=None):
  249. """ Build a mysqldump command line and return it as a
  250. ready-to-consume list for Popen
  251. @see https://dev.mysql.com/doc/refman/5.7/en/mysqldump.html#mysqldump-option-summary
  252. """
  253. tables = tables or []
  254. dump_options = dump_options or []
  255. base_cmd = ["mysqldump",
  256. "--single-transaction",
  257. "-u", self.from_server.username,
  258. f"--password={self.from_server.password}",
  259. f"--max-allowed-packet={MAX_ALLOWED_PACKET}",
  260. "--skip-add-drop-table",
  261. "--skip-add-locks",
  262. "--skip-comments",
  263. "--column-statistics=0"
  264. ]
  265. if self.compress:
  266. base_cmd.append("--compress")
  267. if SHOW_PROGRESSION:
  268. base_cmd.append("--verbose")
  269. if self.from_server.ssh_tunnel:
  270. host, port = self.from_server.ssh_tunnel.LOCAL_ADRESS
  271. base_cmd += ["--host", host,
  272. "--port", str(port)]
  273. return base_cmd + dump_options + [self.dbname] + tables
  274. def _build_restore_command(self):
  275. """ Build a mysql command line and return it as a
  276. ready-to-consume list for Popen
  277. @see https://dev.mysql.com/doc/refman/8.0/en/mysql-command-options.html#option_mysql_quick
  278. """
  279. init_command = f"set global max_allowed_packet={MAX_ALLOWED_PACKET};" \
  280. "set global wait_timeout=28800;" \
  281. "set global interactive_timeout=28800;"
  282. cmd = ["mysql",
  283. "-h", self.to_server.host,
  284. "-P", str(self.to_server.port),
  285. "-u", self.to_server.username,
  286. f"--password={self.to_server.password}",
  287. f"--init-command={init_command}",
  288. "--reconnect",
  289. "--quick",
  290. "--unbuffered",
  291. "--wait",
  292. "--verbose",
  293. "-D", self.dbname
  294. ]
  295. # if LOG_PIPES_OUTPUT:
  296. # cmd.append("--verbose")
  297. if self.compress:
  298. cmd.append("--compress")
  299. return cmd
  300. @staticmethod
  301. def _clean_sql(bin_cmd, encoding):
  302. """ clean some old sql declaration from mysql 5 in order to preserve
  303. a compatibility between servers"""
  304. cmd = bin_cmd.decode('latin')
  305. # To ensure compatibility between mysql5 and 8+
  306. cmd = re.sub(",?NO_AUTO_CREATE_USER", "", cmd)
  307. return cmd.encode('latin')
  308. @staticmethod
  309. def _run_piped_processes(
  310. dump_cmd,
  311. restore_cmd,
  312. tbl_count,
  313. encoding):
  314. """ Run the dump and the restore commands by piping them
  315. The output of the mysqldump process is piped into the input of the mysql one
  316. """
  317. logger.debug(">>> Dump command: %s", " ".join(map(str, dump_cmd)))
  318. logger.debug(">>> Piped into: %s", " ".join(map(str, restore_cmd)))
  319. mysqldump_handler = MysqldumpHandler(logger.name, logging.INFO, tbl_count)
  320. mysql_handler = MysqlHandler(logger.name, logging.INFO, tbl_count)
  321. try:
  322. # noinspection PyTypeChecker
  323. with Popen(restore_cmd, stdin=PIPE, stdout=mysql_handler, stderr=mysql_handler) as mysql:
  324. # noinspection PyTypeChecker
  325. with Popen(dump_cmd, stdout=PIPE, stderr=mysqldump_handler) as mysqldump:
  326. cmd = mysqldump.stdout.read()
  327. cmd = CloningOperation._clean_sql(cmd, encoding)
  328. mysql.stdin.write(cmd)
  329. if mysqldump.returncode:
  330. raise RuntimeError('mysqldump returned a non zero code')
  331. if mysql.returncode:
  332. raise RuntimeError('mysql returned a non zero code')
  333. mysql_handler.log_end()
  334. except (OSError, RuntimeError, CalledProcessError) as e:
  335. logger.error("Execution failed: %s", e)
  336. raise RuntimeError(f"An error happened at runtime: {e}")
  337. finally:
  338. mysqldump_handler.close()
  339. mysql_handler.close()
  340. def run(self):
  341. """ Run the cloning op
  342. """
  343. logger.info(f"*** Cloning {self.dbname} ***")
  344. logger.info(f"> From {self.from_server}")
  345. logger.info(f"> To {self.to_server}")
  346. try:
  347. self.from_server.connect()
  348. self.from_server.set_active_db(self.dbname)
  349. logger.debug('Connected to %s', self.from_server)
  350. self.to_server.connect()
  351. logger.debug('Connected to %s', self.to_server)
  352. # Create admin users if not exist
  353. for user in self.grant:
  354. exists = self.to_server.exec_query(
  355. f"SELECT count(*) FROM mysql.user WHERE User = '{user.username}' and Host='{user.host}';"
  356. ).fetchone()[0] > 0
  357. if not exists:
  358. logger.info(f'Create user %s@%s on %s', user.username, user.host, self.to_server)
  359. self.to_server.exec_query(
  360. f"CREATE USER '{user.username}'@'{user.host}' IDENTIFIED BY '{user.pwd}';")
  361. # List tables
  362. tables = {}
  363. for tname in self.from_server.list_tables():
  364. if any(rx.match(tname) for rx in self.ignore_tables):
  365. tables[tname] = IGNORE
  366. elif self.filter_tables and not any(rx.match(tname) for rx in self.filter_tables):
  367. tables[tname] = IGNORE
  368. elif any(rx.match(tname) for rx in self.structure_only):
  369. tables[tname] = STRUCTURE_ONLY
  370. else:
  371. tables[tname] = STRUCTURE_AND_DATA
  372. restore_cmd = self._build_restore_command()
  373. # Dump structure: --single-transaction --no-data --routines {dbname} tbname1 tname2 ...
  374. dump_structure_for = [t for t, s in tables.items() if s != IGNORE]
  375. dump_structure_cmd = self._build_dump_command(["--no-data", "--routines"],
  376. dump_structure_for)
  377. # Dump data: --no-create-info --skip-triggers {dbname} tbname1 tname2 ...
  378. dump_data_for = [t for t, s in tables.items() if s == STRUCTURE_AND_DATA]
  379. dump_data_cmd = self._build_dump_command(["--no-create-info", "--skip-triggers"],
  380. dump_data_for)
  381. if tables and not dump_structure_for and not dump_data_for:
  382. logging.warning('No table will be cloned')
  383. # Recreate the target DB
  384. logger.info("(Re)create the database")
  385. self.to_server.exec_query(f"DROP DATABASE IF EXISTS `{self.dbname}`;")
  386. self.to_server.exec_query(f"CREATE SCHEMA `{self.dbname}`;")
  387. self.to_server.set_active_db(self.dbname)
  388. # Following is to avoid conflict between mysql 5 and mysql 8+
  389. # (@see https://stackoverflow.com/questions/50336378/variable-sql-mode-cant-be-set-to-the-value-of-no-auto-create-user)
  390. self.to_server.exec_query(f"SET GLOBAL log_bin_trust_function_creators = 1;")
  391. # Grant admin users if any
  392. for user in self.grant:
  393. self.to_server.exec_query(
  394. f"GRANT ALL ON {self.dbname}.* TO '{user.username}'@'{user.host}';"
  395. )
  396. # charsets
  397. charset = self.from_server.get_db_charset(self.dbname)
  398. encoding = CHARSET_TO_ENCODING[charset]
  399. # Run mysqldump
  400. try:
  401. if dump_structure_for:
  402. logger.info(f"Cloning structure for {len(dump_structure_for)} tables (on {len(tables)})...")
  403. self._run_piped_processes(
  404. dump_structure_cmd,
  405. restore_cmd,
  406. len(dump_structure_for),
  407. encoding
  408. )
  409. if dump_data_for:
  410. logger.info(f"Cloning data for {len(dump_data_for)} tables (on {len(tables)})...")
  411. self._run_piped_processes(
  412. dump_data_cmd,
  413. restore_cmd,
  414. len(dump_data_for),
  415. encoding
  416. )
  417. logger.info(f"Cloning views...")
  418. self.from_server.set_active_db(self.dbname)
  419. self.to_server.set_active_db(self.dbname)
  420. for v in self.from_server.list_views(self.dbname):
  421. if any(rx.match(v) for rx in self.ignore_views):
  422. continue
  423. logger.debug('* cloning view %s', v)
  424. definition = self.from_server.get_view_definition(v, self.to_server.username)
  425. try:
  426. self.to_server.exec_query(definition)
  427. except (pymysql.err.ProgrammingError, pymysql.err.InternalError) as e:
  428. logger.error('Unable to create the internal view %s: %s', v, e)
  429. self.status = SUCCESS
  430. logger.info("> the database was successfully cloned")
  431. except RuntimeError:
  432. self.status = FAILURE
  433. logger.error("<!> An error happened while cloning the '%s' database", self.dbname)
  434. finally:
  435. self.from_server.close()
  436. self.to_server.close()
  437. def main(settings, arguments):
  438. prompt = not arguments["--yes"]
  439. logger.info("Start db cloning utility...")
  440. logger.debug(f"Settings: %s", str(settings).replace('\r', '').replace('\n', ''))
  441. logger.debug(f"Arguments: %s", str(arguments).replace('\r', '').replace('\n', ''))
  442. # Load the servers' configuration
  443. servers = {}
  444. if 'servers' not in settings:
  445. raise RuntimeError(f'Missing section in settings.yml: {servers}')
  446. for server_name, server_settings in settings['servers'].items():
  447. hostname = server_settings['host']
  448. match = re.search(r"^docker:(\w+)$", hostname)
  449. if match:
  450. logger.debug("resolve IP for docker %s", match.group(1))
  451. ip = resolve_docker_ip(match.group(1))
  452. logger.debug("substitute '%s' to '%s' as hostname", ip, hostname)
  453. hostname = ip
  454. if 'ssh' in server_settings:
  455. ssh_tunnel = SshTunnel(hostname, server_settings['mysql']['port'], **server_settings['ssh'])
  456. else:
  457. ssh_tunnel = None
  458. server = MySqlServer(hostname,
  459. **server_settings['mysql'],
  460. description=server_settings['description'],
  461. ssh_tunnel=ssh_tunnel)
  462. servers[server_name] = server
  463. # Load the users' configuration
  464. users = {}
  465. for username, args in settings.get('users', {}).items():
  466. host = args.get('host', 'localhost')
  467. pwd = args.get('pwd', '')
  468. users[username] = MysqlUser(username, pwd, host)
  469. # Load the cloning ops' configuration
  470. ops = {}
  471. if 'operations' not in settings:
  472. raise RuntimeError(f'Missing section in settings.yml: {servers}')
  473. for name, args in settings['operations'].items():
  474. dbname = args['dbname']
  475. from_server = servers[args['from_server']]
  476. to_server = servers[args['to_server']]
  477. grant = args.get('grant', [])
  478. admins = [user for username, user in users.items() if username in grant]
  479. kwargs = {k: v for k, v in args.items() \
  480. if k not in ('dbname', 'from_server', 'to_server', 'grant')}
  481. op = CloningOperation(name, dbname, from_server, to_server, admins, **kwargs)
  482. ops[name] = op
  483. # Operations to launch
  484. if arguments.get('<opname>', None):
  485. selected_ops = []
  486. for opname in arguments['<opname>']:
  487. try:
  488. selected_ops.append(ops[opname])
  489. except KeyError:
  490. logger.error('No operation found with name %s', opname)
  491. else:
  492. selected_ops = [op for op in ops.values() if op.is_default]
  493. if not selected_ops:
  494. logger.error('No operations to launch')
  495. return
  496. # Ask for confirmation (except if '--yes' is in arguments)
  497. if prompt:
  498. logger.debug('Ask for confirmation...')
  499. msg = "The following operations will be launched:\n{}\n" \
  500. "WARNING: the existing local databases will be replaced" \
  501. "".format("\n".join(f"* {op}" for op in selected_ops))
  502. if not ask_confirmation(msg):
  503. logger.info("-- Operation cancelled by user --")
  504. return
  505. logger.debug('> User confirmed')
  506. # Create the user if they do not exist
  507. # CREATE USER IF NOT EXISTS 'user'@'localhost' IDENTIFIED BY 'password';
  508. # GRANT ALL ON opentalent TO 'opentalent'@'localhost';
  509. # Run the cloning operations
  510. for op in selected_ops:
  511. op.run()
  512. failures = [op.name for op in selected_ops if op.status == FAILURE]
  513. if failures:
  514. logger.error("WARNING! the following operations failed: %s", ', '.join(failures))
if __name__ == '__main__':
    # load settings from settings.yml file
    settings = load_settings()

    # parse CLI arguments
    # (the module docstring is both the usage specification and the value given as `help`)
    arguments = docopt(__doc__, help=__doc__, version=__VERSION__)

    # NOTE(review): presumably prevents two cloning processes from running at
    # the same time, `on_error` being called when the lock is already held;
    # confirm against core.locker.Lockfile
    with Lockfile(path=HERE / '.clonedb.lock',
                  on_error=lambda: logger.critical("A cloning process is already running, please wait...")):
        main(settings, arguments)