"""
Script de clonage des bases de données MySql
(requiert python 3.6+)

> Configuration: settings.yml

Usage:
  clonedb.py [-v] [-y] [<opname>...]
  clonedb.py (-h | --help)
  clonedb.py --version

Options:
  -y, --yes     Do not ask for confirmation
  -h --help     Show this screen.
  --version     Show version.

@author: olivier.massot, 05-2020
"""
import logging
import re
from subprocess import Popen, PIPE, CalledProcessError
import sys

import pymysql
import yaml
from docopt import docopt
from path import Path

from core import logging_
from core.docker import resolve_docker_ip
from core.locker import Lockfile
from core.pipe_handler import PipeHandler
from core.ssh import SshTunnel
from core.prompt import ask_confirmation

__VERSION__ = "0.2"

HERE = Path(__file__).parent

# Start logger
LOG_DIR = HERE / 'log'
LOG_DIR.mkdir_p()
logger = logging.getLogger('clonedb')
logging_.start("clonedb", filename=LOG_DIR / 'clonedb.log', replace=True)

# FIX the default ascii encoding on some linux dockers...
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1)

# Options
SHOW_PROGRESSION = True   # log each table being dumped / restored, and print progression dots
LOG_PIPES_OUTPUT = False  # log every line output by the mysqldump / mysql processes
LOG_MYSQL_QUERIES = False  # log every query run through MySqlServer.exec_query

MY_CNF_FILE = Path(HERE / "my.cnf")
MAX_ALLOWED_PACKET = 1073741824  # 1 GiB, passed to mysqldump and set on the target server


# Utilities

def load_settings():
    """ Load the settings from the 'settings.yml' file

    If there is no such file, it is first created as a copy of 'settings.yml.dist'.

    :return: the parsed content of 'settings.yml'
    """
    settings_file = HERE / 'settings.yml'
    if not settings_file.exists():
        Path(HERE / 'settings.yml.dist').copy(settings_file)

    # explicit encoding: the default one may be ascii on some linux dockers (see above)
    with open(settings_file, 'r', encoding='utf-8') as f:
        # the settings only need plain yaml types, no python object construction
        return yaml.safe_load(f)


def _quote_identifier(name):
    """ Return `name` quoted with backticks (inner backticks doubled), so that it
    can be used as a Mysql identifier (database, table or view name) """
    return "`{}`".format(str(name).replace("`", "``"))


class MysqldumpHandler(PipeHandler):
    """ Handle and process the stdout / stderr output from a mysqldump process

    When SHOW_PROGRESSION is set, each table announced in the output is logged,
    and a progression dot is printed on stdout.
    """
    _rx_prog = re.compile(r'Retrieving table structure for table (\w+)')
    _log_all = LOG_PIPES_OUTPUT
    _action_name = "dumping"

    def process(self, line):
        """ Process the last line that was read """
        line = line.strip('\n')
        if SHOW_PROGRESSION:
            match = self._rx_prog.search(line)
            if match:
                logger.debug('... %s %s', self._action_name, match.group(1))
                print('.', end="", flush=True)
        if self._log_all:
            logger.debug(line)

    def close(self):
        """ Close the write end of the pipe. """
        # terminate the line of progression dots
        print('', flush=True)
        super().close()


class MysqlHandler(MysqldumpHandler):
    """ Handle and process the stdout / stderr output from a mysql process

    The output is scanned for 'CREATE TABLE' / 'INSERT INTO' statements; a progression
    line is only logged when the table name changes, not once per INSERT.
    """
    _rx_prog = re.compile(r'^((?:CREATE TABLE )|(?:INSERT INTO ))`(\w+)`')
    _log_all = LOG_PIPES_OUTPUT
    _action_name = "restoring"
    _last_logged = ""

    def process(self, line):
        """ Process the last line that was read """
        line = line.strip('\n')
        if SHOW_PROGRESSION:
            match = self._rx_prog.search(line)
            if match:
                tname = match.group(2)
                if tname != self._last_logged:
                    logger.debug('... %s %s %s',
                                 self._action_name,
                                 'structure of' if 'CREATE' in match.group(1) else 'data of',
                                 tname)
                    print('.', end="", flush=True)
                    self._last_logged = tname
        if self._log_all:
            logger.debug(line)


class MySqlServer:
    """ A server hosting a Mysql instance, optionally reached through an ssh tunnel """

    # Matches the 'DEFINER=`user`@`host`' clause of a 'SHOW CREATE VIEW' statement,
    # capturing the host part (which may be '%', an IP, a hostname...)
    _rx_definer = re.compile(r'DEFINER=`[^`]*`@`([^`]*)`')

    def __init__(self, host, port, username, password, description="", ssh_tunnel=None):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.description = description[:30]
        self.ssh_tunnel = ssh_tunnel

        self.cnn = None
        self.active_db = ""

    def __repr__(self):
        s = f"{self.host}:{self.port} as {self.username}"
        if self.description:
            s = f"{self.description} ({s})"
        return s

    def connect(self, autocommit=True):
        """ Establish the connection to the Mysql server

        If an ssh tunnel is configured, it is started first and the connection
        goes through its local address.

        :return: the pymysql connection
        :raises RuntimeError: if the connection could not be opened
        @see https://pymysql.readthedocs.io/en/latest/modules/connections.html
        """
        if self.ssh_tunnel:
            self.ssh_tunnel.start()
            host, port = self.ssh_tunnel.LOCAL_ADRESS
        else:
            host, port = self.host, self.port

        self.cnn = pymysql.connect(host=host,
                                   port=port,
                                   user=self.username,
                                   password=self.password,
                                   autocommit=autocommit,
                                   max_allowed_packet=32 * 1024 * 1024,  # pymysql expects a size in bytes
                                   )

        if not self.cnn.open:
            raise RuntimeError(f'Unable to connect to {self}')

        return self.cnn

    def set_active_db(self, dbname):
        """ set the active database """
        self.cnn.select_db(dbname)
        self.active_db = dbname

    def close(self):
        """ Close the connection to the database and the ssh tunnel if one is opened

        Safe to call several times, or after a failed connection attempt.
        """
        if self.cnn is not None:
            # pymysql raises an error when closing an already closed connection
            if self.cnn.open:
                self.cnn.close()
            self.cnn = None
        if self.ssh_tunnel:
            self.ssh_tunnel.stop()
        logger.debug('%s - connection closed', self)

    def exec_query(self, sql, params=None):
        """ Execute the sql code and return the resulting cursor

        :param sql: the sql code
        :param params: optional query parameters, escaped by pymysql
                       (use '%s' placeholders in `sql`)
        :return: the pymysql cursor
        @see https://pymysql.readthedocs.io/en/latest/modules/cursors.html
        """
        # reconnect transparently if the connection was lost
        # NOTE(review): after such a reconnection, the database selected with
        #  set_active_db() is presumably not re-selected by pymysql - to confirm
        self.cnn.ping(reconnect=True)
        cursor = self.cnn.cursor()
        if LOG_MYSQL_QUERIES:
            logger.debug(sql)
        cursor.execute(sql, params)
        return cursor

    def db_exists(self, dbname):
        """ Return True if the database exists """
        cursor = self.exec_query(
            "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = %s",
            (dbname,))
        return cursor.fetchone() is not None

    def _list_by_type(self, table_type, dbname=""):
        """ Return an iterator over the names of the objects of the given Table_type
        ('BASE TABLE' or 'VIEW') in `dbname`, or in the active database if empty """
        source = f" FROM {_quote_identifier(dbname)}" if dbname else ""
        cursor = self.exec_query(f"SHOW FULL TABLES{source} WHERE Table_type='{table_type}';")
        return (row[0] for row in cursor.fetchall())

    def list_tables(self, dbname=""):
        """ Return an iterator over the tables (but not views!) of either the currently
        selected database, or the one given as a parameter"""
        return self._list_by_type('BASE TABLE', dbname)

    def list_views(self, dbname=""):
        """ Return an iterator over the views of either the currently selected
        database, or the one given as a parameter"""
        return self._list_by_type('VIEW', dbname)

    def get_view_definition(self, view_name, set_definer=""):
        """ Return the SQL create statement for the view

        If 'set_definer' is not empty, the username in the 'DEFINER' part of the
        create statement is replaced by the one given (the host part is kept)
        """
        cursor = self.exec_query(f"show create view {_quote_identifier(view_name)}")
        definition = cursor.fetchone()[1]

        if set_definer:
            # force a new definer; a callable replacement avoids any escape
            # interpretation of the username or host
            definition = self._rx_definer.sub(
                lambda m: f"DEFINER=`{set_definer}`@`{m.group(1)}`",
                definition
            )

        return definition


# Operation status
UNKNOWN = 0
SUCCESS = 1
FAILURE = 2

# Behaviors for the tables cloning
IGNORE = 0
STRUCTURE_ONLY = 1
STRUCTURE_AND_DATA = 2  # -> default behavior
class CloningOperation:
    """ A database cloning operation between two Mysql servers

    The structure (and routines) of the selected tables is piped from a mysqldump
    process into a mysql process, then their data the same way, and finally the
    views are recreated one by one through the pymysql connections.
    """

    def __init__(self, name, dbname, from_server, to_server, is_default=True, ignore_tables=None,
                 structure_only=None, filter_tables=None, ignore_views=None, compress=True):
        self.name = name
        self.dbname = dbname
        self.from_server = from_server
        self.to_server = to_server
        self.is_default = is_default
        self.compress = compress

        # regex patterns, matched against the table / view names
        self.ignore_tables = [re.compile(r) for r in ignore_tables] if ignore_tables else []
        self.structure_only = [re.compile(r) for r in structure_only] if structure_only else []
        self.filter_tables = [re.compile(r) for r in filter_tables] if filter_tables else []
        self.ignore_views = [re.compile(r) for r in ignore_views] if ignore_views else []

        self.status = UNKNOWN

    def __repr__(self):
        return f"Cloning {self.dbname} from {self.from_server} to {self.to_server}"

    @staticmethod
    def _format_command(cmd):
        """ Return the command as a single string, suitable for logging: the
        '--password=...' arguments are masked """
        return " ".join("--password=****" if str(arg).startswith("--password=") else str(arg)
                        for arg in cmd)

    def _build_dump_command(self, dump_options=None, tables=None):
        """ Build a mysqldump command line and return it as a ready-to-consume list for Popen

        @see https://dev.mysql.com/doc/refman/5.7/en/mysqldump.html#mysqldump-option-summary
        """
        tables = tables or []
        dump_options = dump_options or []

        base_cmd = ["mysqldump",
                    "--single-transaction",
                    "-u", self.from_server.username,
                    f"--password={self.from_server.password}",
                    f"--max-allowed-packet={MAX_ALLOWED_PACKET}",
                    "--skip-add-drop-table",
                    "--skip-add-locks",
                    "--skip-comments",
                    ]

        if self.compress:
            base_cmd.append("--compress")

        if SHOW_PROGRESSION:
            # the verbose output is parsed by MysqldumpHandler to follow the progression
            base_cmd.append("--verbose")

        if self.from_server.ssh_tunnel:
            # the tunnel is started by from_server.connect(): dump through its local end
            host, port = self.from_server.ssh_tunnel.LOCAL_ADRESS
        else:
            host, port = self.from_server.host, self.from_server.port
        base_cmd += ["--host", host, "--port", str(port)]

        return base_cmd + dump_options + [self.dbname] + tables

    def _build_restore_command(self):
        """ Build a mysql command line and return it as a ready-to-consume list for Popen

        @see https://dev.mysql.com/doc/refman/8.0/en/mysql-command-options.html#option_mysql_quick
        """
        init_command = f"set global max_allowed_packet={MAX_ALLOWED_PACKET};" \
                       "set global wait_timeout=28800;" \
                       "set global interactive_timeout=28800;"

        if self.to_server.ssh_tunnel:
            # the tunnel is started by to_server.connect(): restore through its local end
            host, port = self.to_server.ssh_tunnel.LOCAL_ADRESS
        else:
            host, port = self.to_server.host, self.to_server.port

        cmd = ["mysql",
               "-h", host,
               "-P", str(port),
               "-u", self.to_server.username,
               f"--password={self.to_server.password}",
               f"--init-command={init_command}",
               "--reconnect",
               "--quick",
               "--unbuffered",
               "--wait",
               "--verbose",  # the statements echoed on stdout are parsed by MysqlHandler
               "-D", self.dbname
               ]

        if self.compress:
            cmd.append("--compress")

        return cmd

    @staticmethod
    def _run_piped_processes(dump_cmd, restore_cmd):
        """ Run the dump and the restore commands by piping them

        The output of the mysqldump process is streamed into the input of the mysql one.

        :raises RuntimeError: if a process could not be started or returned a non zero code
        """
        # passwords are masked: they are part of the command lines
        logger.debug(">>> Dump command: %s", CloningOperation._format_command(dump_cmd))
        logger.debug(">>> Piped into: %s", CloningOperation._format_command(restore_cmd))

        mysqldump_handler = MysqldumpHandler(logger.name)
        mysql_handler = MysqlHandler(logger.name)

        try:
            # noinspection PyTypeChecker
            with Popen(dump_cmd, stdout=PIPE, stderr=mysqldump_handler) as mysqldump:
                # noinspection PyTypeChecker
                with Popen(restore_cmd, stdin=mysqldump.stdout, stdout=mysql_handler,
                           stderr=mysql_handler) as mysql:
                    # mysql now owns the read end of the pipe; closing our copy lets
                    # mysqldump receive a SIGPIPE if mysql exits early
                    mysqldump.stdout.close()

            # Popen's context manager waits for its process on exit: the return codes
            # are only available once both 'with' blocks are closed
            if mysqldump.returncode:
                raise RuntimeError(f'mysqldump returned a non zero code ({mysqldump.returncode})')
            if mysql.returncode:
                raise RuntimeError(f'mysql returned a non zero code ({mysql.returncode})')

        except (OSError, RuntimeError, CalledProcessError) as e:
            logger.error("Execution failed: %s", e)
            raise RuntimeError(f"An error happened at runtime: {e}") from e

        finally:
            mysqldump_handler.close()
            mysql_handler.close()

    def _classify_tables(self):
        """ Return a dict {table name: cloning behavior} for the tables of the active
        database of the source server, according to the operation's table patterns """
        tables = {}
        for tname in self.from_server.list_tables():
            if any(rx.match(tname) for rx in self.ignore_tables):
                tables[tname] = IGNORE
            elif self.filter_tables and not any(rx.match(tname) for rx in self.filter_tables):
                tables[tname] = IGNORE
            elif any(rx.match(tname) for rx in self.structure_only):
                tables[tname] = STRUCTURE_ONLY
            else:
                tables[tname] = STRUCTURE_AND_DATA
        return tables

    def _clone_views(self):
        """ Recreate the source database's views (except the ignored ones) on the
        target server; a view that cannot be created is logged and skipped """
        for v in self.from_server.list_views(self.dbname):
            if any(rx.match(v) for rx in self.ignore_views):
                continue
            logger.debug('* cloning view %s', v)
            definition = self.from_server.get_view_definition(v, self.to_server.username)
            try:
                self.to_server.exec_query(definition)
            except (pymysql.err.ProgrammingError, pymysql.err.InternalError) as e:
                logger.error('Unable to create the internal view %s: %s', v, e)

    def run(self):
        """ Run the cloning op

        The target database is dropped and recreated. `self.status` is set to SUCCESS
        or FAILURE; errors are logged, not raised, so that other operations may run.
        """
        logger.info(f"*** Cloning {self.dbname} ***")
        logger.info(f"> From {self.from_server}")
        logger.info(f"> To {self.to_server}")

        try:
            self.from_server.connect()
            self.from_server.set_active_db(self.dbname)
            logger.debug('Connected to %s', self.from_server)

            self.to_server.connect()
            logger.debug('Connected to %s', self.to_server)

            tables = self._classify_tables()

            # built after the connections: they may rely on the started ssh tunnels
            restore_cmd = self._build_restore_command()

            # Dump structure: --single-transaction --no-data --routines {dbname} tbname1 tname2 ...
            dump_structure_for = [t for t, s in tables.items() if s != IGNORE]
            dump_structure_cmd = self._build_dump_command(["--no-data", "--routines"], dump_structure_for)

            # Dump data: --no-create-info --skip-triggers {dbname} tbname1 tname2 ...
            dump_data_for = [t for t, s in tables.items() if s == STRUCTURE_AND_DATA]
            dump_data_cmd = self._build_dump_command(["--no-create-info", "--skip-triggers"], dump_data_for)

            if tables and not dump_structure_for and not dump_data_for:
                logger.warning('No table will be cloned')

            # Recreate the target DB
            logger.info("(Re)create the database")
            self.to_server.exec_query(f"DROP DATABASE IF EXISTS `{self.dbname}`;")
            self.to_server.exec_query(f"CREATE SCHEMA `{self.dbname}`;")
            self.to_server.set_active_db(self.dbname)

            # Run mysqldump
            if dump_structure_for:
                logger.info(f"Cloning structure for {len(dump_structure_for)} tables (on {len(tables)})...")
                self._run_piped_processes(dump_structure_cmd, restore_cmd)

            if dump_data_for:
                logger.info(f"Cloning data for {len(dump_data_for)} tables (on {len(tables)})...")
                self._run_piped_processes(dump_data_cmd, restore_cmd)

            logger.info("Cloning views...")
            self._clone_views()

            self.status = SUCCESS
            logger.info("> the database was successfully cloned")

        except (RuntimeError, pymysql.err.MySQLError) as e:
            # connection or query errors are handled like the dump / restore failures
            self.status = FAILURE
            logger.error(" An error happened while cloning the '%s' database: %s", self.dbname, e)

        finally:
            self.from_server.close()
            self.to_server.close()


def main(settings, arguments):
    """ Load the servers and the cloning operations from the settings, then run the
    operations named on the command line (or the default ones if none is named)

    :param settings: the parsed settings (see load_settings)
    :param arguments: the command line arguments, as parsed by docopt
    :raises RuntimeError: if there is no operation to launch
    """
    prompt = not arguments["--yes"]

    logger.info("Start db cloning utility...")
    # NOTE(review): the settings include the servers' passwords, which end up in this debug log
    logger.debug("Settings: %s", str(settings).replace('\r', '').replace('\n', ''))
    logger.debug("Arguments: %s", str(arguments).replace('\r', '').replace('\n', ''))

    # Load the servers' configuration
    servers = {}
    for server_name, server_settings in settings['servers'].items():
        hostname = server_settings['host']

        # a 'docker:<container>' host is substituted by the container's IP
        match = re.search(r"^docker:(\w+)$", hostname)
        if match:
            logger.debug("resolve IP for docker %s", match.group(1))
            ip = resolve_docker_ip(match.group(1))
            logger.debug("substitute '%s' to '%s' as hostname", ip, hostname)
            hostname = ip

        if 'ssh' in server_settings:
            ssh_tunnel = SshTunnel(hostname, server_settings['mysql']['port'], **server_settings['ssh'])
        else:
            ssh_tunnel = None

        server = MySqlServer(hostname,
                             **server_settings['mysql'],
                             description=server_settings.get('description') or "",
                             ssh_tunnel=ssh_tunnel)

        servers[server_name] = server

    # Load the cloning ops' configuration
    ops = {}
    for name, args in settings['operations'].items():
        dbname = args['dbname']
        from_server = servers[args['from_server']]
        to_server = servers[args['to_server']]
        kwargs = {k: v for k, v in args.items() if k not in ('dbname', 'from_server', 'to_server')}

        ops[name] = CloningOperation(name, dbname, from_server, to_server, **kwargs)

    # Operations to launch: the ones named on the command line, else the default ones
    op_names = arguments.get('<opname>')
    if op_names:
        selected_ops = []
        for opname in op_names:
            try:
                selected_ops.append(ops[opname])
            except KeyError:
                logger.error('No operation found with name %s', opname)
    else:
        selected_ops = [op for op in ops.values() if op.is_default]

    if not selected_ops:
        raise RuntimeError('No operation to launch')

    # Ask for confirmation (except if '--yes' is in arguments)
    if prompt:
        logger.debug('Ask for confirmation...')
        msg = "The following operations will be launched:\n{}\n" \
              "WARNING: the existing local databases will be replaced" \
              "".format("\n".join(f"* {op}" for op in selected_ops))

        if not ask_confirmation(msg):
            logger.info("-- Operation cancelled by user --")
            return
        logger.debug('> User confirmed')

    # Run the cloning operations
    for op in selected_ops:
        op.run()

    failures = [op.name for op in selected_ops if op.status == FAILURE]
    if failures:
        logger.error("WARNING! the following operations failed: %s", ', '.join(failures))


if __name__ == '__main__':
    # load settings from settings.yml file
    settings = load_settings()

    # parse CLI arguments (the usage is described in the module docstring)
    arguments = docopt(__doc__, help=True, version=__VERSION__)

    # prevent two cloning processes from running at the same time
    with Lockfile(path=HERE / '.clonedb.lock',
                  on_error=lambda: logger.critical("A cloning process is already running, please wait...")):
        main(settings, arguments)