"""
Script de clonage des bases de données MySql
(requiert python 3.6+)

> Configuration: settings.yml

Usage:
  clonedb.py [-v] [-y] [<opname>]
  clonedb.py (-h | --help)
  clonedb.py --version

Options:
  -y, --yes     Do not ask for confirmation
  -h --help     Show this screen.
  --version     Show version.

@author: olivier.massot, 05-2020
"""
import logging
import os
import re
import sys
import threading
from subprocess import Popen, PIPE, DEVNULL, CalledProcessError

import pymysql
import yaml
from docopt import docopt
from path import Path
from sshtunnel import SSHTunnelForwarder

import logging_
from locker import Lockfile

__VERSION__ = "0.1"

HERE = Path(__file__).parent

# Start logger
logger = logging.getLogger('clonedb')
logging_.start("clonedb", replace=True)

# FIX the default ascii encoding on some linux dockers...
# closefd=False: fd 1 stays owned by the original stdout object, so it is not closed twice at exit
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1, closefd=False)

# Options
VERBOSE_DUMP = True


# Utilities

def load_settings():
    """Load and return the content of the ``settings.yml`` file located next to this script."""
    # Explicit encoding: the locale encoding may be ascii (see the stdout fix above)
    with open(HERE / 'settings.yml', 'r', encoding='utf8') as f:
        return yaml.load(f, Loader=yaml.FullLoader)


def _quote_identifier(name):
    """Return ``name`` as a backtick-quoted MySQL identifier (inner backticks are doubled)."""
    return "`{}`".format(str(name).replace("`", "``"))


def _format_command(cmd):
    """Return the argument list ``cmd`` as a printable string, with any password masked."""
    return " ".join("--password=***" if str(arg).startswith("--password=") else str(arg)
                    for arg in cmd)


class StderrPipeHandler(threading.Thread):
    """ Handle the stderr output from a Popen object

    An instance can be given as ``stderr=`` to ``Popen``: its ``fileno()`` is the write end
    of an OS pipe, whose read end is consumed line by line by this thread.
    """
    # Max number of seconds ``close()`` waits for the reader thread to drain the pipe
    JOIN_TIMEOUT = 10

    def __init__(self, logger_name, default_level=logging.INFO):
        """Setup the object with a logger and a loglevel and start the thread """
        super().__init__()
        self.logger = logging.getLogger(logger_name)
        self.daemon = False
        self.default_level = default_level
        self.fdRead, self.fdWrite = os.pipe()
        # Decode explicitly (the locale encoding may be ascii) and never let a bad byte kill the thread
        self.pipeReader = os.fdopen(self.fdRead, encoding='utf8', errors='replace')
        self.start()

    def fileno(self):
        """Return the write file descriptor of the pipe """
        return self.fdWrite

    def process(self, line):
        """Log one line read from the pipe at ``default_level``."""
        self.logger.log(self.default_level, line.strip('\n'))

    def run(self):
        """Run the thread, logging every line until the write end of the pipe is closed. """
        with self.pipeReader:
            for line in iter(self.pipeReader.readline, ''):
                self.process(line)

    def close(self):
        """Close the write end of the pipe, then wait for the reader thread to finish.

        Call it once the processes writing to the pipe have exited: the reader only
        receives EOF when every copy of the write end is closed.
        """
        os.close(self.fdWrite)
        self.join(self.JOIN_TIMEOUT)


class StderrMysqlDumpHandler(StderrPipeHandler):
    """ Handle and process the stderr output from a mysqldump process

    Prints a progress dot for each "Retrieving table structure for table X" line, and logs
    every line at DEBUG level, except lines starting with "ERROR" which are logged at ERROR level.
    """
    _rx_newtable = re.compile(r'Retrieving table structure for table (\w+)')

    def process(self, line):
        line = line.strip('\n')
        if self._rx_newtable.search(line):
            print('.', end="", flush=True)
        level = logging.ERROR if line.startswith('ERROR') else logging.DEBUG
        self.logger.log(level, line)

    def close(self):
        # Drain the pipe first, so that the newline is printed after every progress dot
        super().close()
        print('', flush=True)


class SshTunnel:
    """SSH tunnel forwarding a local address to a port of the remote host's 127.0.0.1."""
    LOCAL_ADRESS = ('127.0.0.1', 6000)

    def __init__(self, host, remote_port, port=22, user="root", key_file="~/.ssh/id_rsa", local_port=None):
        """
        :param host: the ssh host
        :param remote_port: the port to reach on the remote host's 127.0.0.1 interface
        :param port: the ssh port
        :param user: the ssh user
        :param key_file: the ssh private key file
        :param local_port: local port to bind (default: the port of LOCAL_ADRESS); two servers
            accessed through ssh at the same time need distinct local ports
        """
        self.host = host
        self.remote_port = remote_port
        self.port = int(port)
        self.user = user
        self.key_file = key_file
        if local_port is not None:
            # Instance-level override: callers read the address through ``ssh_tunnel.LOCAL_ADRESS``
            self.LOCAL_ADRESS = (self.LOCAL_ADRESS[0], int(local_port))

        self._tunnel = SSHTunnelForwarder(
            (self.host, self.port),
            ssh_username=self.user,
            ssh_pkey=self.key_file,
            local_bind_address=self.LOCAL_ADRESS,
            remote_bind_address=('127.0.0.1', self.remote_port)
        )

    def start(self):
        """Open the tunnel.

        :raises RuntimeError: if the tunnel is not reported as up after starting
        """
        self._tunnel.start()
        if not self._tunnel.tunnel_is_up.get(self.LOCAL_ADRESS):
            raise RuntimeError('Unable to open the SSH Tunnel')

    def stop(self):
        """Close the tunnel."""
        self._tunnel.stop()


class MySqlServer:
    """A MySQL server, optionally reached through an SshTunnel."""

    def __init__(self, host, port, username, password, description="", ssh_tunnel=None):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.description = description or "no description"
        self.ssh_tunnel = ssh_tunnel
        self.cnn = None
        self.active_db = ""

    def __repr__(self):
        return f"{self.host}:{self.port} as {self.username} ({self.description})"

    @property
    def address(self):
        """(host, port) to use to reach the server: the local end of the ssh tunnel if any."""
        if self.ssh_tunnel:
            return self.ssh_tunnel.LOCAL_ADRESS
        return self.host, self.port

    def connect(self, autocommit=True):
        """Open the ssh tunnel (if any) and the connection; return the pymysql connection.

        :raises RuntimeError: if the connection is not open after connecting
        """
        if self.ssh_tunnel:
            self.ssh_tunnel.start()
        host, port = self.address

        self.cnn = pymysql.connect(host=host,
                                   port=port,
                                   user=self.username,
                                   password=self.password,
                                   autocommit=autocommit)

        if not self.cnn.open:
            raise RuntimeError(f'Unable to connect to {self}')

        return self.cnn

    def set_constraints(self, active):
        """Enable or disable the foreign key checks for the current session."""
        self.exec_query(f"SET FOREIGN_KEY_CHECKS={int(bool(active))};")

    def warmup(self):
        """Enable foreign key checks, then raise the packet size and timeouts."""
        self.set_constraints(True)
        # pymysql only runs one statement per query (unless the connection enables
        # CLIENT.MULTI_STATEMENTS), so each SET is executed on its own.
        # NOTE(review): per the MySQL reference, the session value of max_allowed_packet is
        # read-only and connect_timeout is global-only, so the first two statements are likely
        # rejected unless rewritten with SET GLOBAL (needs privileges, affects every client).
        # This method is not called anywhere in this script.
        for statement in ("SET max_allowed_packet=1073741824;",
                          "SET connect_timeout=28800;",
                          "SET wait_timeout=28800;",
                          "SET interactive_timeout=28800;"):
            self.exec_query(statement)

    def set_active_db(self, dbname):
        """Select ``dbname`` as the current database of the connection."""
        self.cnn.select_db(dbname)
        self.active_db = dbname

    def close(self):
        """Close the connection (if open) and stop the ssh tunnel (if any)."""
        try:
            if self.cnn is not None and self.cnn.open:
                self.cnn.close()
        finally:
            # Forget the connection: closing an already closed pymysql connection raises
            self.cnn = None
            if self.ssh_tunnel:
                self.ssh_tunnel.stop()

    def exec_query(self, sql, args=None):
        """ Execute the sql code and return the cursor

        :param sql: the query; use %s placeholders when ``args`` is given
        :param args: optional query parameters, escaped by pymysql
        """
        cursor = self.cnn.cursor()
        logger.debug(sql)
        cursor.execute(sql, args)
        return cursor

    def db_exists(self, dbname):
        """Return True if a database named ``dbname`` exists on the server."""
        cursor = self.exec_query("SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = %s",
                                 (dbname,))
        return cursor.fetchone() is not None

    def _list_tables_of_type(self, table_type, dbname=""):
        """Return the names of the tables of the given Table_type, in the current or given database."""
        from_clause = f" FROM {_quote_identifier(dbname)}" if dbname else ""
        cursor = self.exec_query(f"SHOW FULL TABLES{from_clause} WHERE Table_type='{table_type}';")
        return (row[0] for row in cursor.fetchall())

    def list_tables(self, dbname=""):
        """ Return a list of tables (not views!) for either the current database or the given one"""
        return self._list_tables_of_type('BASE TABLE', dbname)

    def list_views(self, dbname=""):
        """ Return a list of views for either the current database or the given one"""
        return self._list_tables_of_type('VIEW', dbname)

    def get_view_definition(self, view_name):
        """Return the CREATE VIEW statement of ``view_name``."""
        cursor = self.exec_query(f"show create view {_quote_identifier(view_name)}")
        return cursor.fetchone()[1]


# Cloning strategies of a table
IGNORE = 0
STRUCTURE_ONLY = 1
STRUCTURE_AND_DATA = 2  # default


def _compile_patterns(patterns):
    """Compile a list of regex strings (None or empty gives an empty list)."""
    return [re.compile(p) for p in patterns] if patterns else []


def _matches_any(name, regexes):
    """Return True if one of the compiled ``regexes`` matches the beginning of ``name``."""
    return any(rx.match(name) for rx in regexes)


class CloningOperation:
    """Clone a database from one MySqlServer to another (mysqldump piped into mysql)."""

    # DEFINER clause of a view; user and host may contain any character but a backtick (e.g. `user`@`%`)
    _rx_definer = re.compile(r'DEFINER=`[^`]*`@`[^`]*`')

    def __init__(self, dbname, from_server, to_server, is_default=True, ignore_tables=None,
                 structure_only=None, filter_tables=None, ignore_views=None,
                 ignore_procedures=None, ignore_functions=None):
        """
        :param dbname: name of the database (the same on both servers)
        :param from_server: the source MySqlServer
        :param to_server: the target MySqlServer; its database is dropped and recreated
        :param is_default: run this operation when no operation name is given on the command line
        :param ignore_tables: regexes (re.match) of tables that are not cloned at all
        :param structure_only: regexes of tables whose structure only is cloned
        :param filter_tables: if given, only the tables matching one of these regexes are cloned
        :param ignore_views: regexes of views that are not cloned
        :param ignore_procedures: regexes of procedures (NOTE(review): not used by run() yet)
        :param ignore_functions: regexes of functions (NOTE(review): not used by run() yet)
        """
        self.dbname = dbname
        self.from_server = from_server
        self.to_server = to_server
        self.is_default = is_default
        self.ignore_tables = _compile_patterns(ignore_tables)
        self.structure_only = _compile_patterns(structure_only)
        self.filter_tables = _compile_patterns(filter_tables)
        self.ignore_views = _compile_patterns(ignore_views)
        self.ignore_procedures = _compile_patterns(ignore_procedures)
        self.ignore_functions = _compile_patterns(ignore_functions)
        self.table_count = None

    def __repr__(self):
        return f"Clone {self.dbname} from {self.from_server} to {self.to_server} [ignored tables: {len(self.ignore_tables)}]"

    def _build_dump_command(self, dump_options=None, tables=None):
        """Return the mysqldump command (list of args) reading ``dbname`` on the source server.

        :param dump_options: additional mysqldump options
        :param tables: names of the tables to dump
        """
        # > https://dev.mysql.com/doc/refman/5.7/en/mysqldump.html#mysqldump-option-summary
        host, port = self.from_server.address
        cmd = ["mysqldump",
               "--single-transaction",
               "--host", str(host),
               "--port", str(port),
               "-u", self.from_server.username,
               f"--password={self.from_server.password}",
               ]
        if VERBOSE_DUMP:
            cmd.append("--verbose")
        return cmd + list(dump_options or []) + [self.dbname] + list(tables or [])

    def _build_restore_command(self):
        """Return the mysql client command (list of args) writing into ``dbname`` on the target server."""
        host, port = self.to_server.address
        return ["mysql",
                "-h", str(host),
                "-P", str(port),
                "-u", self.to_server.username,
                f"--password={self.to_server.password}",
                "-D", self.dbname
                ]

    def _run_piped_processes(self, dump_cmd, restore_cmd):
        """Run ``dump_cmd | restore_cmd``, streaming the dump through an OS pipe.

        :raises RuntimeError: if a process can not be started or exits with a non-zero code
        """
        logger.debug("Run: %s | %s", _format_command(dump_cmd), _format_command(restore_cmd))

        stderr_handler = StderrMysqlDumpHandler(logger.name)
        try:
            # noinspection PyTypeChecker
            with Popen(dump_cmd, stdout=PIPE, stderr=stderr_handler) as mysqldump:
                # noinspection PyTypeChecker
                with Popen(restore_cmd, stdin=mysqldump.stdout, stdout=DEVNULL, stderr=stderr_handler) as mysql:
                    # Drop our copy of the read end, so that mysqldump gets SIGPIPE if mysql exits early
                    mysqldump.stdout.close()

            # Both processes have been waited for when leaving the ``with`` blocks
            if mysqldump.returncode or mysql.returncode:
                raise RuntimeError(f"mysqldump exited with code {mysqldump.returncode}, "
                                   f"mysql exited with code {mysql.returncode}")

        except (OSError, RuntimeError, CalledProcessError) as e:
            logger.error("Execution failed: %s", e)
            raise RuntimeError(f"An error happened at runtime: {e}") from e
        finally:
            stderr_handler.close()

    def _table_strategy(self, tname):
        """Return IGNORE, STRUCTURE_ONLY or STRUCTURE_AND_DATA for the table ``tname``."""
        if _matches_any(tname, self.ignore_tables):
            return IGNORE
        if self.filter_tables and not _matches_any(tname, self.filter_tables):
            return IGNORE
        if _matches_any(tname, self.structure_only):
            return STRUCTURE_ONLY
        return STRUCTURE_AND_DATA

    def _clone_views(self):
        """Recreate on the target the views of the source database, except the ignored ones.

        The DEFINER is replaced by the target user @ localhost; a view that can not be
        created is logged and skipped.
        """
        logger.info("Cloning views...")
        definer = f"DEFINER=`{self.to_server.username}`@`localhost`"
        for v in self.from_server.list_views(self.dbname):
            if _matches_any(v, self.ignore_views):
                continue
            logger.debug('* cloning view %s', v)
            definition = self.from_server.get_view_definition(v)
            # force the definer (a callable replacement: no backslash interpretation)
            definition = self._rx_definer.sub(lambda _match: definer, definition)
            try:
                self.to_server.exec_query(definition)
            except (pymysql.err.ProgrammingError, pymysql.err.InternalError) as e:
                logger.error('Unable to create the internal view %s: %s', v, e)

    def run(self):
        """Run the cloning operation.

        Errors of the mysqldump / mysql processes are logged and stop the cloning; connection
        and query errors propagate. Both servers are closed in any case.
        """
        logger.info(f"*** Cloning {self.dbname} ***")
        logger.info(f"> From {self.from_server}")
        logger.info(f"> To {self.to_server}")

        try:
            self.from_server.connect()
            self.from_server.set_active_db(self.dbname)
            logger.debug('Connected to %s', self.from_server)

            self.to_server.connect()
            logger.debug('Connected to %s', self.to_server)

            tables = {tname: self._table_strategy(tname) for tname in self.from_server.list_tables()}

            restore_cmd = self._build_restore_command()

            # Dump structure: --single-transaction --no-data --routines {dbname} tbname1 tname2 ...
            # (--single-transaction is already part of the base command)
            dump_structure_for = [t for t, s in tables.items() if s != IGNORE]
            dump_structure_cmd = self._build_dump_command(["--no-data", "--routines"], dump_structure_for)

            # Dump data: --no-create-info --skip-triggers {dbname} tbname1 tname2 ...
            dump_data_for = [t for t, s in tables.items() if s == STRUCTURE_AND_DATA]
            dump_data_cmd = self._build_dump_command(["--no-create-info", "--skip-triggers"], dump_data_for)

            if tables and not dump_structure_for and not dump_data_for:
                logger.warning('No table will be cloned')

            # Recreate the target DB
            logger.info("(Re)create the database")
            quoted_dbname = _quote_identifier(self.dbname)
            self.to_server.exec_query(f"DROP DATABASE IF EXISTS {quoted_dbname};")
            self.to_server.exec_query(f"CREATE SCHEMA {quoted_dbname};")
            self.to_server.set_active_db(self.dbname)

            # Run mysqldump (skipped when the table list is empty: mysqldump would dump every table)
            try:
                if dump_structure_for:
                    logger.info(f"Cloning structure for {len(dump_structure_for)} tables (on {len(tables)})...")
                    self._run_piped_processes(dump_structure_cmd, restore_cmd)

                if dump_data_for:
                    logger.info(f"Cloning data for {len(dump_data_for)} tables (on {len(tables)})...")
                    self._run_piped_processes(dump_data_cmd, restore_cmd)

                self._clone_views()

                logger.info("> the database was successfully cloned")
            except RuntimeError:
                logger.error(f" An error happened while cloning the '{self.dbname}' database")

        finally:
            # The target may never have been connected (or have lost its connection)
            if self.to_server.cnn is not None and self.to_server.cnn.open:
                self.to_server.set_constraints(True)
            try:
                self.from_server.close()
            finally:
                self.to_server.close()


def ask_confirmation(msg):
    """Ask the user on stdin until a yes/no answer (english or french) is given.

    :return: True if the user confirmed, False if they cancelled
    """
    logger.debug('Ask for confirmation...')
    msg += "\nWould you like to continue? (yes/no)"
    while True:
        answer = input(msg).strip().lower()
        if answer in ('oui', 'yes', 'y', 'o'):
            logger.debug(f"> user confirmed by answering '{answer}'")
            return True
        if answer in ('non', 'no', 'n'):
            logger.debug(f"> user cancelled by answering '{answer}'")
            return False
        msg = "The answer couldn't be understood. Continue? (yes/no)"


def main(settings, arguments):
    """Build the servers and operations described by ``settings`` and run the selected ones.

    :param settings: the content of settings.yml ('servers' and 'operations' mappings)
    :param arguments: the docopt arguments
    :raises RuntimeError: if the requested operation is unknown or no operation is selected
    """
    prompt = not arguments["--yes"]

    logger.info("Start db cloning utility...")
    # Only names are logged: the settings contain passwords
    logger.debug("Settings: servers %s, operations %s",
                 list(settings.get('servers', {})), list(settings.get('operations', {})))
    logger.debug(f"Arguments: {arguments}")

    # Load the servers' configuration
    servers = {}
    for server_name, server_settings in settings['servers'].items():
        hostname = server_settings['host']

        if 'ssh' in server_settings:
            ssh_tunnel = SshTunnel(hostname, server_settings['mysql']['port'], **server_settings['ssh'])
        else:
            ssh_tunnel = None

        servers[server_name] = MySqlServer(hostname,
                                           **server_settings['mysql'],
                                           description=server_settings.get('description', ""),
                                           ssh_tunnel=ssh_tunnel)

    # Load the cloning ops' configuration
    ops = {}
    for name, args in settings['operations'].items():
        kwargs = {k: v for k, v in args.items() if k not in ('dbname', 'from_server', 'to_server')}
        ops[name] = CloningOperation(args['dbname'],
                                     servers[args['from_server']],
                                     servers[args['to_server']],
                                     **kwargs)

    # Operations to launch
    opname = arguments.get('<opname>')
    if opname:
        if opname not in ops:
            raise RuntimeError(f"Unknown operation '{opname}' (available: {', '.join(ops)})")
        selected_ops = [ops[opname]]
    else:
        selected_ops = [op for op in ops.values() if op.is_default]

    if not selected_ops:
        raise RuntimeError('No operation to launch')

    # Ask for confirmation (except if '--yes' is in arguments)
    if prompt:
        msg = "The following operations will be launched:\n{}\n" \
              "WARNING: the existing local databases will be replaced" \
              "".format("\n".join(f"* {op}" for op in selected_ops))

        if not ask_confirmation(msg):
            logger.info("-- Operation cancelled by user --")
            return

    for op in selected_ops:
        op.run()


if __name__ == '__main__':
    # parse CLI arguments first, so that --help and --version work without a settings file
    arguments = docopt(__doc__, help=True, version=__VERSION__)

    # load settings from settings.yml file
    settings = load_settings()

    with Lockfile(path=HERE / '.clonedb.lock',
                  on_error=lambda: logger.critical("A cloning process is already running, please wait...")):
        main(settings, arguments)