#!/usr/bin/python3
"""
Script de clonage des bases de données MariaDb 10.* (requiert python 3.6+)

> Configuration: settings.yml

Usage:
  clonedb.py [-v] [-y] [<opname>...]
  clonedb.py (-h | --help)
  clonedb.py --version

Options:
  -y, --yes   Do not ask for confirmation
  -h --help   Show this screen.
  --version   Show version.

@author: olivier.massot, 05-2020
"""
import logging
import re
import shutil
import subprocess
from subprocess import Popen, PIPE, CalledProcessError
import sys

import pymysql
import yaml
from docopt import docopt
from path import Path

from core import logging_
from core.docker import resolve_docker_ip
from core.locker import Lockfile
from core.pipe_handler import PipeHandler
from core.ssh import SshTunnel
from core.prompt import ask_confirmation

__VERSION__ = "0.2"

HERE = Path(__file__).parent

# Start logger
LOG_DIR = HERE / 'log'
LOG_DIR.mkdir_p()
logger = logging.getLogger('clonedb')
logging_.start("clonedb", filename=LOG_DIR / 'clonedb.log', replace=True)

# FIX the default ascii encoding on some linux dockers...
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1)

# Options
SHOW_PROGRESSION = True     # print a one-line progression while dumping / restoring
LOG_PIPES_OUTPUT = True     # log every line read from the mysqldump / mysql pipes
LOG_MYSQL_QUERIES = True    # log every statement sent through MySqlServer.exec_query
MAX_ALLOWED_PACKET = 1073741824  # 1 GiB, given to pymysql, mysqldump and mysql


# Utilities
def load_settings():
    """ Load the settings from the 'settings.yml' file

    If there is no such file, it is first created as a copy of
    'settings.yml.dist'.

    :return: the parsed content of the file, or an empty dict if the file is empty
    """
    settings_file = HERE / 'settings.yml'
    if not settings_file.exists():
        Path(HERE / 'settings.yml.dist').copy(settings_file)

    # explicit encoding: the locale default may be ascii (see the stdout fix above)
    with open(settings_file, 'r', encoding='utf-8') as f:
        # an empty file is parsed as None: normalize it so that callers can
        # safely test for the presence of a section
        return yaml.load(f, Loader=yaml.FullLoader) or {}


def _print(msg, end=False):
    """ Print 'msg' over the current console line

    The message is padded to 80 characters so that it overwrites the previous one.

    :param msg: the text to print
    :param end: if True, terminate the line instead of staying on it
    """
    msg = msg.ljust(80)
    print(f'\r{msg}', end='' if not end else '\n', flush=True)


def fetch_mysqldump_version():
    """ Run 'mysqldump --version' and parse its output

    :return: a tuple (fork, version) where fork is "mariadb" or "mysql", and
             version is the dotted version string ("0" if it could not be parsed)
    :raise RuntimeError: if mysqldump can not be run, or writes to stderr
    """
    not_found_msg = 'mysqldump can not be found, is it installed on this machine?'
    try:
        # stdout/stderr=PIPE instead of capture_output: the latter requires python 3.7
        p = subprocess.run(['mysqldump', '--version'], stdout=PIPE, stderr=PIPE)
    except OSError as e:
        raise RuntimeError(not_found_msg) from e

    out, err = p.stdout.decode('utf-8'), p.stderr.decode('utf-8')
    if err:
        raise RuntimeError(not_found_msg)

    fork = "mariadb" if "MariaDB" in out else "mysql"
    match = re.search(r"Ver ((?:\d+\.?)+)", out)
    version = match.group(1) if match else "0"
    return fork, version


def _major_version(version):
    """ Return the leading integer of a dotted version string (0 if there is none) """
    match = re.match(r"\d+", version)
    return int(match.group()) if match else 0


class MysqldumpHandler(PipeHandler):
    """ Handle and process the stdout / stderr output from a mysqldump process """
    _rx_prog = re.compile(r'Retrieving table structure for table (\w+)')
    _log_all = LOG_PIPES_OUTPUT

    def __init__(self, logger_name, level, total_prog):
        """
        :param logger_name: forwarded to PipeHandler
        :param level: forwarded to PipeHandler
        :param total_prog: total number of tables, displayed in the progression
        """
        super().__init__(logger_name, level)
        self.total_prog = total_prog
        self.prog = 0
        self._last_logged = ""

    def process(self, line):
        """ Process the last line that was read: update the progression, then log the line """
        line = line.strip('\n')
        if SHOW_PROGRESSION:
            match = self._rx_prog.search(line)
            if match:
                self.log_new_table(match.group(1), "dumping")
        if self._log_all:
            logger.debug(line)

    def log_new_table(self, tname, action_name=""):
        """ Increment and display the progression when a new table is reached

        Consecutive calls for the same table are counted only once.
        """
        if tname == self._last_logged:
            return
        self.prog += 1
        logger.debug('... %s %s', action_name, tname)
        _print(f'{action_name} `{tname}` [{self.prog} / {self.total_prog}]')
        self._last_logged = tname

    def log_end(self):
        """ Replace the progression line by a final 'done' message """
        _print('\r-- done --', end=True)

    def close(self):
        """ Close the write end of the pipe. """
        super().close()


class MysqlHandler(MysqldumpHandler):
    """ Handle and process the stdout / stderr output from a mysql process """
    _rx_prog = re.compile(r'^((?:CREATE TABLE )|(?:INSERT INTO ))`(\w+)`')
    _log_all = LOG_PIPES_OUTPUT
    _action_name = "restoring"

    def process(self, line):
        """ Process the last line that was read: update the progression, then log the line """
        line = line.strip('\n')
        if SHOW_PROGRESSION:
            match = self._rx_prog.search(line)
            if match:
                # group(1) is the statement keyword, group(2) the table name
                action_name = "restoring {}".format('structure of' if 'CREATE' in match.group(1) else 'data of')
                self.log_new_table(match.group(2), action_name)
        if self._log_all:
            logger.debug(line)


class MySqlServer:
    """ A server hosting a Mysql instance """

    def __init__(self, host, port, username, password, description="", ssh_tunnel=None):
        """
        :param host: hostname or ip of the server
        :param port: port of the mysql instance
        :param username: mysql user
        :param password: password of the mysql user
        :param description: free text used in __repr__ (truncated to 30 characters)
        :param ssh_tunnel: optional tunnel object, started on connect() and stopped on close()
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.description = description[:30]
        self.ssh_tunnel = ssh_tunnel
        self.cnn = None
        self.active_db = ""

    def __repr__(self):
        s = f"{self.host}:{self.port} as {self.username}"
        if self.description:
            s = f"{self.description} ({s})"
        return s

    def address(self):
        """ Return the (host, port) to use to reach the server

        This is the local address of the ssh tunnel when there is one,
        the host and port of the server otherwise.
        """
        if self.ssh_tunnel:
            host, port = self.ssh_tunnel.LOCAL_ADRESS
            return host, port
        return self.host, self.port

    def connect(self, autocommit=True):
        """ Establish the connection to the Mysql server (start the ssh tunnel first, if any)

        @see https://pymysql.readthedocs.io/en/latest/modules/connections.html

        :return: the pymysql connection
        :raise RuntimeError: if the connection is not open after the call
        """
        if self.ssh_tunnel:
            self.ssh_tunnel.start()
        host, port = self.address()

        self.cnn = pymysql.connect(host=host,
                                   port=port,
                                   user=self.username,
                                   password=self.password,
                                   autocommit=autocommit,
                                   max_allowed_packet=MAX_ALLOWED_PACKET,
                                   )
        if not self.cnn.open:
            raise RuntimeError(f'Unable to connect to {self}')
        return self.cnn

    def set_active_db(self, dbname):
        """ set the active database """
        self.cnn.select_db(dbname)
        self.active_db = dbname

    def close(self):
        """ Close the connection to the database and the ssh tunnel if one is opened """
        try:
            # closing an already closed pymysql connection raises: check first
            if self.cnn and self.cnn.open:
                self.cnn.close()
        finally:
            # the tunnel has to be stopped even if closing the connection failed
            if self.ssh_tunnel:
                self.ssh_tunnel.stop()
        logger.debug(f'{self} - connection closed')

    def exec_query(self, sql, args=None):
        """ Execute the sql code and return the resulting cursor

        @see https://pymysql.readthedocs.io/en/latest/modules/cursors.html

        :param sql: the statement to run; it may contain '%s' placeholders if 'args' is given
        :param args: optional values for the placeholders, escaped by pymysql
        :return: the cursor on which the statement was executed
        """
        self.cnn.ping(reconnect=True)
        cursor = self.cnn.cursor()
        if LOG_MYSQL_QUERIES:
            logger.debug(sql)
        cursor.execute(sql, args)
        return cursor

    def db_exists(self, dbname):
        """ Return True if the database exists """
        cursor = self.exec_query("""SELECT SCHEMA_NAME
                                    FROM INFORMATION_SCHEMA.SCHEMATA
                                    WHERE SCHEMA_NAME = %s""", (dbname,))
        row = cursor.fetchone()
        return row is not None

    def get_db_charset(self, dbname):
        """ return the charset (encoding) of the mysql database """
        cursor = self.exec_query("""SELECT default_character_set_name
                                    FROM information_schema.SCHEMATA S
                                    WHERE schema_name = %s;""", (dbname,))
        return cursor.fetchone()[0]

    def _list_by_type(self, table_type, dbname=""):
        """ Return a generator over the names of the objects of the given
        type ('BASE TABLE' or 'VIEW') for either the currently selected
        database, or the one given as a parameter """
        cursor = self.exec_query(
            "SHOW FULL TABLES{} WHERE Table_type='{}';".format(f" FROM {dbname}" if dbname else "", table_type))
        return (row[0] for row in cursor.fetchall())

    def list_tables(self, dbname=""):
        """ Return a generator over the names of the tables (but not views!)
        for either the currently selected database, or the one given as a parameter"""
        return self._list_by_type('BASE TABLE', dbname)

    def list_views(self, dbname=""):
        """ Return a generator over the names of the views
        for either the currently selected database, or the one given as a parameter"""
        return self._list_by_type('VIEW', dbname)

    def get_view_definition(self, view_name, set_definer=""):
        """ Return the SQL create statement for the view

        If 'set_definer' is not empty, the username in the 'DEFINER' part
        of the create statement is replaced by the one given (the host part
        of the definer is kept).
        """
        cursor = self.exec_query(f"show create view {view_name}")
        definition = cursor.fetchone()[1]
        if set_definer:
            # force a new definer
            # The replacement is a function so that neither the new username nor
            # the captured host are interpreted as regex replacement templates.
            definition = re.sub(r'DEFINER=`[^`]+`@`([^`]+)`',
                                lambda m: f"DEFINER=`{set_definer}`@`{m.group(1)}`",
                                definition)
        return definition


class MysqlUser:
    """ A mysql user to create and to grant on the cloned databases """

    def __init__(self, username, pwd, host='localhost'):
        self.username = username
        self.pwd = pwd
        self.host = host


# Operation status
UNKNOWN = 0
SUCCESS = 1
FAILURE = 2

# Behaviors for the tables cloning
IGNORE = 0
STRUCTURE_ONLY = 1
STRUCTURE_AND_DATA = 2  # -> default behavior


class CloningOperation:
    """ A database cloning operation between two Mysql servers """

    def __init__(self, name, dbname, from_server, to_server, grant=None,
                 is_default=True, ignore_tables=None, structure_only=None,
                 filter_tables=None, ignore_views=None, compress=True):
        """
        :param name: name of the operation
        :param dbname: name of the database to clone (same name on both servers)
        :param from_server: the MySqlServer to clone from
        :param to_server: the MySqlServer to clone to
        :param grant: MysqlUser objects to create (if missing) and grant on the clone
        :param is_default: if True, the operation is run when none is explicitly selected
        :param ignore_tables: regexes of the tables to skip
        :param structure_only: regexes of the tables to clone without their data
        :param filter_tables: if given, only the tables matching one of these regexes are cloned
        :param ignore_views: regexes of the views to skip
        :param compress: if True, '--compress' is passed to mysqldump and mysql
        """
        self.name = name
        self.dbname = dbname
        self.from_server = from_server
        self.to_server = to_server
        self.grant = grant if grant is not None else []
        self.is_default = is_default
        self.compress = compress

        self.ignore_tables = self._compile_patterns(ignore_tables)
        self.structure_only = self._compile_patterns(structure_only)
        self.filter_tables = self._compile_patterns(filter_tables)
        self.ignore_views = self._compile_patterns(ignore_views)

        self.status = UNKNOWN

    @staticmethod
    def _compile_patterns(patterns):
        """ Compile each pattern so that it has to match a whole name

        The pattern is wrapped in a non-capturing group: without it, the
        anchors of a pattern like 'a|b' would apply to one alternative each.
        """
        return [re.compile(f"^(?:{r})$") for r in patterns] if patterns else []

    def __repr__(self):
        return f"Cloning {self.dbname} from {self.from_server} to {self.to_server}"

    def _build_dump_command(self, dump_options=None, tables=None):
        """ Build a mysqldump command line and return it as a
        ready-to-consume list for Popen

        @see https://dev.mysql.com/doc/refman/5.7/en/mysqldump.html#mysqldump-option-summary

        :param dump_options: extra mysqldump options
        :param tables: names of the tables to dump
        """
        tables = tables or []
        dump_options = dump_options or []

        fork, version = fetch_mysqldump_version()

        base_cmd = ["mysqldump",
                    "--single-transaction",
                    "-u", self.from_server.username,
                    f"--password={self.from_server.password}",
                    f"--max-allowed-packet={MAX_ALLOWED_PACKET}",
                    "--skip-add-drop-table",
                    "--skip-add-locks",
                    "--skip-comments"
                    ]

        # <-- fix the occasional 'Unknown table 'COLUMN_STATISTICS' in information_schema' bug
        # https://stackoverflow.com/questions/52423595/mysqldump-couldnt-execute-unknown-table-column-statistics-in-information-sc
        # (the major version is compared as an integer, not as a string)
        if fork == 'mysql' and _major_version(version) >= 8:
            try:
                self.from_server.exec_query("SELECT COLUMN_NAME FROM information_schema.COLUMN_STATISTICS;")
            except pymysql.err.OperationalError:
                base_cmd.append("--column-statistics=0")
            except pymysql.err.MySQLError:
                pass
        # -->

        if self.compress:
            base_cmd.append("--compress")

        if SHOW_PROGRESSION:
            base_cmd.append("--verbose")

        # Always target the server the pymysql connection is bound to
        # (through the ssh tunnel if there is one)
        host, port = self.from_server.address()
        base_cmd += ["--host", host, "--port", str(port)]

        return base_cmd + dump_options + [self.dbname] + tables

    def _build_restore_command(self):
        """ Build a mysql command line and return it as a
        ready-to-consume list for Popen

        @see https://dev.mysql.com/doc/refman/8.0/en/mysql-command-options.html#option_mysql_quick
        """
        init_command = f"set global max_allowed_packet={MAX_ALLOWED_PACKET};" \
                       "set global wait_timeout=28800;" \
                       "set global interactive_timeout=28800;"

        # same endpoint as the pymysql connection (the ssh tunnel, if any)
        host, port = self.to_server.address()

        cmd = ["mysql",
               "-h", host,
               "-P", str(port),
               "-u", self.to_server.username,
               f"--password={self.to_server.password}",
               f"--init-command={init_command}",
               "--reconnect",
               "--quick",
               "--unbuffered",
               "--wait",
               "--verbose",
               "-D", self.dbname
               ]

        if self.compress:
            cmd.append("--compress")

        return cmd

    @staticmethod
    def _run_piped_processes(dump_cmd, restore_cmd, tbl_count):
        """ Run the dump and the restore commands by piping them

        The output of the mysqldump process is piped into the input of the mysql one

        :param dump_cmd: the mysqldump command, as a list for Popen
        :param restore_cmd: the mysql command, as a list for Popen
        :param tbl_count: number of tables to process, used for the progression
        :raise RuntimeError: if a process can not be run or exits with a non zero code
        """
        logger.debug(">>> Dump command: %s", " ".join(map(str, dump_cmd)))
        logger.debug(">>> Piped into: %s", " ".join(map(str, restore_cmd)))

        mysqldump_handler = MysqldumpHandler(logger.name, logging.INFO, tbl_count)
        mysql_handler = MysqlHandler(logger.name, logging.INFO, tbl_count)
        try:
            # noinspection PyTypeChecker
            with Popen(restore_cmd, stdin=PIPE, stdout=mysql_handler, stderr=mysql_handler) as mysql:
                # noinspection PyTypeChecker
                with Popen(dump_cmd, stdout=PIPE, stderr=mysqldump_handler) as mysqldump:
                    # stream the dump by chunks rather than loading it whole in memory
                    shutil.copyfileobj(mysqldump.stdout, mysql.stdin)

            # The return codes are only set once the processes have been waited
            # for, which is done when leaving the 'with' blocks above.
            if mysqldump.returncode:
                raise RuntimeError('mysqldump returned a non zero code')
            if mysql.returncode:
                raise RuntimeError('mysql returned a non zero code')

            mysql_handler.log_end()
        except (OSError, RuntimeError, CalledProcessError) as e:
            logger.error("Execution failed: %s", e)
            raise RuntimeError(f"An error happened at runtime: {e}") from e
        finally:
            mysqldump_handler.close()
            mysql_handler.close()

    def _create_missing_users(self):
        """ Create on the target server the users to grant which do not exist yet """
        for user in self.grant:
            exists = self.to_server.exec_query(
                "SELECT count(*) FROM mysql.user WHERE User = %s and Host=%s;",
                (user.username, user.host)
            ).fetchone()[0] > 0
            if not exists:
                logger.info('Create user %s@%s on %s', user.username, user.host, self.to_server)
                # values are passed as arguments so that they get escaped
                # (and so that the password does not end up in the queries log)
                self.to_server.exec_query("CREATE USER %s@%s IDENTIFIED BY %s;",
                                          (user.username, user.host, user.pwd))

    def _table_behavior(self, tname):
        """ Return the cloning behavior (IGNORE, STRUCTURE_ONLY or
        STRUCTURE_AND_DATA) to apply to the given table """
        if any(rx.match(tname) for rx in self.ignore_tables):
            return IGNORE
        if self.filter_tables and not any(rx.match(tname) for rx in self.filter_tables):
            return IGNORE
        if any(rx.match(tname) for rx in self.structure_only):
            return STRUCTURE_ONLY
        return STRUCTURE_AND_DATA

    def _clone_views(self):
        """ Recreate on the target server the views of the source database

        A view which can not be created is logged as an error and skipped.
        """
        self.from_server.set_active_db(self.dbname)
        self.to_server.set_active_db(self.dbname)

        for v in self.from_server.list_views(self.dbname):
            if any(rx.match(v) for rx in self.ignore_views):
                continue

            logger.debug('* cloning view %s', v)
            definition = self.from_server.get_view_definition(v, self.to_server.username)
            try:
                self.to_server.exec_query(definition)
            except (pymysql.err.ProgrammingError, pymysql.err.InternalError) as e:
                logger.error('Unable to create the internal view %s: %s', v, e)

    def run(self):
        """ Run the cloning op

        The target database is dropped and recreated, then the structure, the
        data and the views are cloned. 'status' is set to SUCCESS or FAILURE
        depending on the outcome of the dump / restore processes; both
        connections are closed in any case.
        """
        logger.info(f"*** Cloning {self.dbname} ***")
        logger.info(f"> From {self.from_server}")
        logger.info(f"> To {self.to_server}")

        try:
            self.from_server.connect()
            self.from_server.set_active_db(self.dbname)
            logger.debug('Connected to %s', self.from_server)

            self.to_server.connect()
            logger.debug('Connected to %s', self.to_server)

            # Create admin users if not exist
            self._create_missing_users()

            # List tables
            tables = {tname: self._table_behavior(tname)
                      for tname in self.from_server.list_tables()}

            restore_cmd = self._build_restore_command()

            # Dump structure: --single-transaction --no-data --routines {dbname} tbname1 tname2 ...
            dump_structure_for = [t for t, s in tables.items() if s != IGNORE]
            dump_structure_cmd = self._build_dump_command(["--no-data", "--routines"], dump_structure_for)

            # Dump data: --no-create-info --skip-triggers {dbname} tbname1 tname2 ...
            dump_data_for = [t for t, s in tables.items() if s == STRUCTURE_AND_DATA]
            dump_data_cmd = self._build_dump_command(["--no-create-info", "--skip-triggers"], dump_data_for)

            if tables and not dump_structure_for and not dump_data_for:
                logger.warning('No table will be cloned')

            # Recreate the target DB
            logger.info("(Re)create the database")
            self.to_server.exec_query(f"DROP DATABASE IF EXISTS `{self.dbname}`;")
            self.to_server.exec_query(f"CREATE SCHEMA `{self.dbname}`;")
            self.to_server.set_active_db(self.dbname)

            # Grant admin users if any
            for user in self.grant:
                self.to_server.exec_query(
                    f"GRANT ALL ON `{self.dbname}`.* TO '{user.username}'@'{user.host}';"
                )

            # Run mysqldump
            try:
                if dump_structure_for:
                    logger.info(f"Cloning structure for {len(dump_structure_for)} tables (on {len(tables)})...")
                    self._run_piped_processes(
                        dump_structure_cmd,
                        restore_cmd,
                        len(dump_structure_for)
                    )

                if dump_data_for:
                    logger.info(f"Cloning data for {len(dump_data_for)} tables (on {len(tables)})...")
                    self._run_piped_processes(
                        dump_data_cmd,
                        restore_cmd,
                        len(dump_data_for)
                    )

                logger.info("Cloning views...")
                self._clone_views()

                self.status = SUCCESS
                logger.info("> the database was successfully cloned")
            except RuntimeError:
                self.status = FAILURE
                logger.error(" An error happened while cloning the '%s' database", self.dbname)

        finally:
            # close the second connection even if closing the first one fails
            try:
                self.from_server.close()
            finally:
                self.to_server.close()


def main(settings, arguments):
    """ Build the servers, users and operations described in the settings,
    then run the selected operations

    :param settings: content of the settings file, as returned by load_settings()
    :param arguments: command line arguments, as returned by docopt
    :raise RuntimeError: if the 'servers' or 'operations' section is missing
    """
    prompt = not arguments["--yes"]

    logger.info("Start db cloning utility...")
    logger.debug("Settings: %s", str(settings).replace('\r', '').replace('\n', ''))
    logger.debug("Arguments: %s", str(arguments).replace('\r', '').replace('\n', ''))

    # Load the servers' configuration
    servers = {}
    if 'servers' not in settings:
        raise RuntimeError('Missing section in settings.yml: servers')

    for server_name, server_settings in settings['servers'].items():
        hostname = server_settings['host']

        # a 'docker:<name>' host is replaced by the ip resolved for this name
        match = re.search(r"^docker:(\w+)$", hostname)
        if match:
            logger.debug("resolve IP for docker %s", match.group(1))
            ip = resolve_docker_ip(match.group(1))
            logger.debug("substitute '%s' to '%s' as hostname", ip, hostname)
            hostname = ip

        if 'ssh' in server_settings:
            ssh_tunnel = SshTunnel(hostname, server_settings['mysql']['port'], **server_settings['ssh'])
        else:
            ssh_tunnel = None

        server = MySqlServer(hostname,
                             **server_settings['mysql'],
                             description=server_settings.get('description', ''),
                             ssh_tunnel=ssh_tunnel)

        servers[server_name] = server

    # Load the users' configuration
    users = {}
    for username, args in settings.get('users', {}).items():
        host = args.get('host', 'localhost')
        pwd = args.get('pwd', '')
        users[username] = MysqlUser(username, pwd, host)

    # Load the cloning ops' configuration
    ops = {}
    if 'operations' not in settings:
        raise RuntimeError('Missing section in settings.yml: operations')

    for name, args in settings['operations'].items():
        dbname = args['dbname']
        from_server = servers[args['from_server']]
        to_server = servers[args['to_server']]
        grant = args.get('grant', [])
        admins = [user for username, user in users.items() if username in grant]

        # every other key is passed as is to the CloningOperation constructor
        kwargs = {k: v for k, v in args.items()
                  if k not in ('dbname', 'from_server', 'to_server', 'grant')}

        op = CloningOperation(name, dbname, from_server, to_server, admins, **kwargs)

        ops[name] = op

    # Operations to launch
    if arguments.get('<opname>', None):
        selected_ops = []
        for opname in arguments['<opname>']:
            try:
                selected_ops.append(ops[opname])
            except KeyError:
                logger.error('No operation found with name %s', opname)
    else:
        selected_ops = [op for op in ops.values() if op.is_default]

    if not selected_ops:
        logger.error('No operations to launch')
        return

    # Ask for confirmation (except if '--yes' is in arguments)
    if prompt:
        logger.debug('Ask for confirmation...')
        msg = "The following operations will be launched:\n{}\n" \
              "WARNING: the existing local databases will be replaced" \
              "".format("\n".join(f"* {op}" for op in selected_ops))

        if not ask_confirmation(msg):
            logger.info("-- Operation cancelled by user --")
            return
        logger.debug('> User confirmed')

    # Run the cloning operations
    for op in selected_ops:
        op.run()

    failures = [op.name for op in selected_ops if op.status == FAILURE]
    if failures:
        logger.error("WARNING! the following operations failed: %s", ', '.join(failures))


if __name__ == '__main__':
    # load settings from settings.yml file
    settings = load_settings()

    # parse CLI arguments
    arguments = docopt(__doc__, help=True, version=__VERSION__)

    with Lockfile(path=HERE / '.clonedb.lock',
                  on_error=lambda: logger.critical("A cloning process is already running, please wait...")):
        main(settings, arguments)