||
- """
- Script de clonage des bases de données MySql
- (requiert python 3.6+)
- > Configuration: settings.yml
- Usage:
- clonedb.py [-v] [-y] [<dbname>]
- clonedb.py (-h | --help)
- clonedb.py --version
- Options:
- -y, --yes Do not ask for confirmation
- -h --help Show this screen.
- --version Show version.
- @author: olivier.massot, 05-2020
- """
- import logging
- import os
- import re
- import threading
- from subprocess import Popen, PIPE, CalledProcessError
- import sys
- import pymysql
- import yaml
- from docopt import docopt
- from path import Path
- from sshtunnel import SSHTunnelForwarder
- import logging_
- from locker import Lockfile
# Version string handed to docopt (`version=`) for the `--version` flag.
__VERSION__ = "0.1"

# Directory containing this script; settings.yml and the lock file are resolved against it.
HERE = Path(__file__).parent

# Start logger
# NOTE(review): `logging_` is a project-local module; presumably `start` installs the
# handlers of the "clonedb" logger -- confirm against that module.
logger = logging.getLogger('clonedb')
logging_.start("clonedb", replace=True)

# FIX the default ascii encoding on some linux dockers...
# stdout is re-opened on the same file descriptor as line-buffered UTF-8 text.
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1)

# Options
# When true, "--verbose" is appended to every mysqldump command line.
VERBOSE_DUMP = True
- # Utilities
def load_settings():
    """ Load the configuration from the `settings.yml` file located next to this script.

    :return: the object built by `yaml.load` from the file content
    :raises OSError: if the file cannot be opened
    """
    # The encoding is explicit because the locale's default is plain ascii on some
    # linux dockers, which would make any accented character in the file fail.
    # NOTE: FullLoader is acceptable for this trusted local file only; never feed it
    # untrusted input.
    with open(HERE / 'settings.yml', 'r', encoding='utf8') as f:
        return yaml.load(f, Loader=yaml.FullLoader)
class StderrPipeHandler(threading.Thread):
    """ Forward the stderr output of a Popen object to a logger, line by line.

    The instance owns an OS pipe: `fileno()` exposes the write end, so the object
    itself can be passed as the `stderr` argument of `Popen`, while a background
    thread reads the other end and logs each line.
    """

    def __init__(self, logger_name, default_level=logging.INFO):
        """ Setup the object with a logger and a loglevel, and start the thread

        :param logger_name: name of the logger receiving the lines
        :param default_level: logging level used for every line
        """
        super().__init__()
        self.logger = logging.getLogger(logger_name)
        self.daemon = False
        self.default_level = default_level
        self.fdRead, self.fdWrite = os.pipe()
        # Decode leniently: with a strict decoder (or an ascii locale) a single
        # unexpected byte would kill the reader thread, and the child process would
        # then block as soon as the pipe is full.
        self.pipeReader = os.fdopen(self.fdRead, encoding='utf8', errors='replace')
        self.start()

    def fileno(self):
        """ Return the write file descriptor of the pipe """
        return self.fdWrite

    def process(self, line):
        """ Log one line (trailing newline removed) at the default level """
        self.logger.log(self.default_level, line.strip('\n'))

    def run(self):
        """ Run the thread, logging everything until the write end is closed """
        # `with` guarantees the read end is released even if the loop is interrupted
        with self.pipeReader:
            for line in self.pipeReader:
                try:
                    self.process(line)
                except Exception:
                    # A failing line must not stop the draining of the pipe,
                    # otherwise the writing process could hang on a full pipe.
                    self.logger.exception("Unable to process the line %r", line)

    def close(self):
        """ Close the write end of the pipe.

        The reader thread ends once every process holding a copy of the write
        descriptor has exited.
        """
        os.close(self.fdWrite)
class StderrMysqlDumpHandler(StderrPipeHandler):
    """ Handle and process the stderr output from a mysqldump process.

    A dot is printed on stdout each time a line announces a new table, which
    gives a compact progress indicator; every line is also logged at debug level
    on the module logger.
    """
    _rx_newtable = re.compile(r'Retrieving table structure for table (\w+)')

    def process(self, line):
        """ Print a progress dot for 'new table' lines, then log the line """
        text = line.strip('\n')
        if self._rx_newtable.search(text) is not None:
            print('.', end="", flush=True)
        logger.debug(text)

    def close(self):
        """ End the line of progress dots, then close the write end of the pipe """
        print('', flush=True)
        super().close()
class SshTunnel:
    """ Thin wrapper around a `SSHTunnelForwarder`.

    The forwarder is configured to bind `LOCAL_ADRESS` locally and to target
    127.0.0.1:<remote_port> as seen from the ssh host.
    """
    # Local end of the tunnel (shared by every instance)
    LOCAL_ADRESS = ('127.0.0.1', 6000)

    def __init__(self, host, remote_port, port=22, user="root", key_file="~/.ssh/id_rsa"):
        """
        :param host: ssh host
        :param remote_port: port targeted on the remote side of the tunnel
        :param port: ssh port (converted to int)
        :param user: ssh username
        :param key_file: path of the private key given to the forwarder
        """
        self.host, self.remote_port = host, remote_port
        self.port = int(port)
        self.user, self.key_file = user, key_file

        ssh_endpoint = (self.host, self.port)
        remote_endpoint = ('127.0.0.1', self.remote_port)
        self._tunnel = SSHTunnelForwarder(ssh_endpoint,
                                          ssh_username=self.user,
                                          ssh_pkey=self.key_file,
                                          local_bind_address=self.LOCAL_ADRESS,
                                          remote_bind_address=remote_endpoint)

    def start(self):
        """ Start the forwarder.

        :raises RuntimeError: if the forwarder does not report the local address as up
        """
        self._tunnel.start()
        is_up = self._tunnel.tunnel_is_up[self.LOCAL_ADRESS]
        if is_up:
            return
        raise RuntimeError('Unable to open the SSH Tunnel')

    def stop(self):
        """ Stop the forwarder """
        self._tunnel.stop()
class MySqlServer:
    """ A MySql server and its (single) pymysql connection, optionally reached through an ssh tunnel """

    def __init__(self, host, port, username, password, description="", ssh_tunnel=None):
        """
        :param host: hostname of the server
        :param port: port of the server
        :param username: MySql user
        :param password: MySql password
        :param description: free text used by `__repr__` ("no description" if empty)
        :param ssh_tunnel: optional object exposing `start()`, `stop()` and `LOCAL_ADRESS`
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.description = description or "no description"
        self.ssh_tunnel = ssh_tunnel
        self.cnn = None
        self.active_db = ""

    def __repr__(self):
        return f"{self.host}:{self.port} as {self.username} ({self.description})"

    @staticmethod
    def _quote_identifier(name):
        """ Return the identifier between backticks (embedded backticks are doubled) """
        return "`{}`".format(str(name).replace("`", "``"))

    def connect(self, autocommit=True):
        """ Open the connection, through the ssh tunnel when there is one.

        :param autocommit: forwarded to `pymysql.connect`
        :return: the pymysql connection (also stored in `self.cnn`)
        :raises RuntimeError: if the connection is not reported as open
        """
        if self.ssh_tunnel:
            self.ssh_tunnel.start()
            host, port = self.ssh_tunnel.LOCAL_ADRESS
        else:
            host, port = self.host, self.port
        self.cnn = pymysql.connect(host=host,
                                   port=port,
                                   user=self.username,
                                   password=self.password,
                                   autocommit=autocommit)
        if not self.cnn.open:
            raise RuntimeError(f'Unable to connect to {self}')
        return self.cnn

    def set_constraints(self, active):
        """ Enable (truthy) or disable (falsy) the foreign key checks for the connection """
        self.exec_query(f"SET FOREIGN_KEY_CHECKS={int(bool(active))};")

    def warmup(self):
        """ Re-enable the constraints and raise the packet size and timeouts.

        NOTE(review): left unchanged because nothing in this file calls it, but it
        looks broken -- the MySql variable is presumably `max_allowed_packet`, and
        several statements are sent in a single `execute` call, which the driver
        may reject. Verify before using.
        """
        self.set_constraints(True)
        self.exec_query(""" SET MAX_ALLOWED_PACKETS=1073741824;
        SET CONNECT_TIMEOUT=28800;
        SET WAIT_TIMEOUT=28800;
        SET INTERACTIVE_TIMEOUT=28800;
        """)

    def set_active_db(self, dbname):
        """ Select the default database of the connection """
        self.cnn.select_db(dbname)
        self.active_db = dbname

    def close(self):
        """ Close the connection (if any), then stop the ssh tunnel (if any).

        The tunnel is stopped even if closing the connection raises; the error is
        then propagated. The connection is forgotten so that the method can safely
        be called again.
        """
        try:
            if self.cnn:
                self.cnn.close()
        finally:
            self.cnn = None
            self.active_db = ""
            if self.ssh_tunnel:
                self.ssh_tunnel.stop()

    def exec_query(self, sql, args=None):
        """ Execute the sql code and return the cursor

        :param sql: the statement, with `%s` placeholders if `args` is given
        :param args: optional parameters, escaped by the driver
        """
        cursor = self.cnn.cursor()
        logger.debug(sql)
        cursor.execute(sql, args)
        return cursor

    def db_exists(self, dbname):
        """ Return True if a schema with this exact name exists on the server """
        # parameterized: the name is a value here, never part of the sql text
        cursor = self.exec_query("""SELECT SCHEMA_NAME
                                    FROM INFORMATION_SCHEMA.SCHEMATA
                                    WHERE SCHEMA_NAME = %s""", (dbname,))
        return cursor.fetchone() is not None

    def _list_objects(self, table_type, dbname=""):
        """ Return the names listed by SHOW FULL TABLES for the given Table_type """
        source = f" FROM {self._quote_identifier(dbname)}" if dbname else ""
        cursor = self.exec_query(f"SHOW FULL TABLES{source} WHERE Table_type='{table_type}';")
        return [row[0] for row in cursor.fetchall()]

    def list_tables(self, dbname=""):
        """ Return a list of tables (not views!)
        for either the current database or the given one"""
        return self._list_objects('BASE TABLE', dbname)

    def list_views(self, dbname=""):
        """ Return a list of views
        for either the current database or the given one"""
        return self._list_objects('VIEW', dbname)

    def get_view_definition(self, view_name):
        """ Return the second column of `SHOW CREATE VIEW` for a view of the active database """
        cursor = self.exec_query(f"show create view {self._quote_identifier(view_name)}")
        return cursor.fetchone()[1]
# How a table is handled by a cloning operation
IGNORE = 0
STRUCTURE_ONLY = 1
STRUCTURE_AND_DATA = 2  # default


class CloningOperation:
    """ Clone one database from a server to another by piping mysqldump into mysql.

    Every `ignore_*`, `structure_only` and `filter_tables` argument is a list of
    regular expressions applied with `match` (anchored at the start of the name).
    `ignore_procedures` and `ignore_functions` are compiled and stored, but nothing
    in this class uses them.
    """
    # Matches the DEFINER clause of a `SHOW CREATE VIEW` statement, whatever the
    # user and host are (including the `%` wildcard host).
    _rx_definer = re.compile(r'DEFINER=`[^`]+`@`[^`]+`')

    def __init__(self, dbname, from_server, to_server, is_default=True, ignore_tables=None, structure_only=None,
                 filter_tables=None, ignore_views=None, ignore_procedures=None, ignore_functions=None):
        """
        :param dbname: name of the database, identical on both servers
        :param from_server: source server
        :param to_server: target server
        :param is_default: whether the operation runs when none is explicitly requested
        :param ignore_tables: tables to skip
        :param structure_only: tables cloned without their data
        :param filter_tables: if given, only the matching tables are cloned
        :param ignore_views: views to skip
        """
        self.dbname = dbname
        self.from_server = from_server
        self.to_server = to_server
        self.is_default = is_default
        self.ignore_tables = self._compile_all(ignore_tables)
        self.structure_only = self._compile_all(structure_only)
        self.filter_tables = self._compile_all(filter_tables)
        self.ignore_views = self._compile_all(ignore_views)
        self.ignore_procedures = self._compile_all(ignore_procedures)
        self.ignore_functions = self._compile_all(ignore_functions)
        self.table_count = None

    def __repr__(self):
        return f"Clone {self.dbname} from {self.from_server} to {self.to_server} [ignored tables: {len(self.ignore_tables)}]"

    @staticmethod
    def _compile_all(patterns):
        """ Compile a list of regular expressions (None or empty gives an empty list) """
        return [re.compile(p) for p in patterns] if patterns else []

    @staticmethod
    def _endpoint(server):
        """ Return the (host, port) the command line tools must use to reach the server.

        Same rule as the server's own connection: the local end of the ssh tunnel
        when there is one, the server's host and port otherwise.
        """
        if server.ssh_tunnel:
            return server.ssh_tunnel.LOCAL_ADRESS
        return server.host, server.port

    @staticmethod
    def _masked(cmd):
        """ Return the command as a single string, safe for the logs (password hidden) """
        return " ".join("--password=***" if str(arg).startswith("--password=") else str(arg)
                        for arg in cmd)

    def _table_mode(self, tname):
        """ Return IGNORE, STRUCTURE_ONLY or STRUCTURE_AND_DATA for the given table name """
        if any(rx.match(tname) for rx in self.ignore_tables):
            return IGNORE
        if self.filter_tables and not any(rx.match(tname) for rx in self.filter_tables):
            return IGNORE
        if any(rx.match(tname) for rx in self.structure_only):
            return STRUCTURE_ONLY
        return STRUCTURE_AND_DATA

    def _force_definer(self, definition):
        """ Replace the DEFINER of a view definition by the target user at localhost """
        definer = f"DEFINER=`{self.to_server.username}`@`localhost`"
        # a function is used as replacement so that a backslash in the username
        # is never interpreted as a regex escape
        return self._rx_definer.sub(lambda _match: definer, definition)

    def _build_dump_command(self, dump_options=None, tables=None):
        """ Build the mysqldump command line (as a list) for the source server

        :param dump_options: extra mysqldump options
        :param tables: names of the tables to dump
        """
        # > https://dev.mysql.com/doc/refman/5.7/en/mysqldump.html#mysqldump-option-summary
        host, port = self._endpoint(self.from_server)
        base_cmd = ["mysqldump",
                    "--single-transaction",
                    "-u", self.from_server.username,
                    f"--password={self.from_server.password}",
                    # "--compress"
                    ]
        if VERBOSE_DUMP:
            base_cmd.append("--verbose")
        # always explicit: without it mysqldump would silently dump the local server
        base_cmd += ["--host", str(host),
                     "--port", str(port)]
        return base_cmd + list(dump_options or []) + [self.dbname] + list(tables or [])

    def _build_restore_command(self):
        """ Build the mysql command line (as a list) restoring into the target server """
        host, port = self._endpoint(self.to_server)
        return ["mysql",
                "-h", str(host),
                "-P", str(port),
                "-u", self.to_server.username,
                f"--password={self.to_server.password}",
                "-D", self.dbname
                ]

    def _run_piped_processes(self, dump_cmd, restore_cmd):
        """ Run `dump_cmd | restore_cmd`, the stderr of both going to the logs.

        :raises RuntimeError: if a process can not be started or exits with a non-zero code
        """
        from subprocess import DEVNULL

        logger.debug("Dump command: %s", self._masked(dump_cmd))
        logger.debug("Piped into: %s", self._masked(restore_cmd))

        stderr_handler = StderrMysqlDumpHandler(logger.name)
        try:
            # The dump is streamed from one process to the other through an OS pipe
            # instead of being entirely loaded in memory.
            # noinspection PyTypeChecker
            with Popen(dump_cmd, stdout=PIPE, stderr=stderr_handler) as mysqldump:
                # noinspection PyTypeChecker
                with Popen(restore_cmd, stdin=mysqldump.stdout, stdout=DEVNULL, stderr=stderr_handler) as mysql:
                    # release our copy of the pipe, so that mysqldump is notified
                    # (broken pipe) if mysql exits before the end of the dump
                    mysqldump.stdout.close()
                    mysql.wait()
                mysqldump.wait()

            # both processes have exited here, so both return codes are reliable
            if mysqldump.returncode or mysql.returncode:
                raise RuntimeError(f"mysqldump exited with code {mysqldump.returncode}, "
                                   f"mysql exited with code {mysql.returncode}")

        except (OSError, RuntimeError, CalledProcessError) as e:
            logger.error("Execution failed: %s", e)
            raise RuntimeError(f"An error happened at runtime: {e}") from e

        finally:
            stderr_handler.close()

    def _clone_views(self):
        """ Recreate on the target each view of the source which is not ignored.

        A view that can not be created is logged and skipped.
        """
        for view in self.from_server.list_views(self.dbname):
            if any(rx.match(view) for rx in self.ignore_views):
                continue
            logger.debug('* cloning view %s', view)

            # force the definer
            definition = self._force_definer(self.from_server.get_view_definition(view))
            try:
                self.to_server.exec_query(definition)
            except (pymysql.err.ProgrammingError, pymysql.err.InternalError) as e:
                logger.error('Unable to create the internal view %s: %s', view, e)

    def _release_servers(self):
        """ Restore the constraints on the target, then close both servers.

        Each step is attempted even if the previous one failed, and the target is
        only queried if its connection was actually opened.
        """
        try:
            cnn = self.to_server.cnn
            if cnn and cnn.open:
                self.to_server.set_constraints(True)
        finally:
            try:
                self.from_server.close()
            finally:
                self.to_server.close()

    def run(self):
        """ Run the operation: drop and recreate the target database, then clone
        the structure, the data and the views.

        A failure of the piped processes is logged and does not propagate;
        the connections are closed in every case.
        """
        logger.info(f"*** Cloning {self.dbname} ***")
        logger.info(f"> From {self.from_server}")
        logger.info(f"> To {self.to_server}")

        try:
            self.from_server.connect()
            self.from_server.set_active_db(self.dbname)
            logger.debug('Connected to %s', self.from_server)

            self.to_server.connect()
            logger.debug('Connected to %s', self.to_server)

            tables = {tname: self._table_mode(tname) for tname in self.from_server.list_tables()}

            restore_cmd = self._build_restore_command()

            # Dump structure: --single-transaction --no-data --routines {dbname} tbname1 tname2 ...
            # (--single-transaction is already part of the base command)
            dump_structure_for = [t for t, s in tables.items() if s != IGNORE]
            dump_structure_cmd = self._build_dump_command(["--no-data", "--routines"],
                                                          dump_structure_for)

            # Dump data: --no-create-info --skip-triggers {dbname} tbname1 tname2 ...
            dump_data_for = [t for t, s in tables.items() if s == STRUCTURE_AND_DATA]
            dump_data_cmd = self._build_dump_command(["--no-create-info", "--skip-triggers"],
                                                     dump_data_for)

            if tables and not dump_structure_for and not dump_data_for:
                logger.warning('No table will be cloned')

            # Recreate the target DB
            logger.info("(Re)create the database")
            self.to_server.exec_query(f"DROP DATABASE IF EXISTS `{self.dbname}`;")
            self.to_server.exec_query(f"CREATE SCHEMA `{self.dbname}`;")
            self.to_server.set_active_db(self.dbname)

            # Run mysqldump
            try:
                # a dump is only run for a non-empty selection: without any table
                # name, mysqldump would dump the whole database
                if dump_structure_for:
                    logger.info(f"Cloning structure for {len(dump_structure_for)} tables (on {len(tables)})...")
                    self._run_piped_processes(dump_structure_cmd, restore_cmd)

                if dump_data_for:
                    logger.info(f"Cloning data for {len(dump_data_for)} tables (on {len(tables)})...")
                    self._run_piped_processes(dump_data_cmd, restore_cmd)

                logger.info("Cloning views...")
                self._clone_views()

                logger.info("> the database was successfully cloned")
            except RuntimeError:
                logger.error(f"<!> An error happened while cloning the '{self.dbname}' database")

        finally:
            self._release_servers()
def ask_confirmation(msg):
    """ Display the message and ask the user whether to continue.

    The answer is compared case-insensitively, surrounding blanks ignored; the
    question is repeated until the answer is understood.

    :param msg: text displayed before the question
    :return: True if the user confirmed, False if the user refused or if no
             answer can be read (closed stdin)
    """
    logger.debug('Ask for confirmation...')
    msg += "\nWould you like to continue? (yes/no) "
    while True:
        try:
            answer = input(msg).strip().lower()
        except EOFError:
            # non-interactive run without '--yes': nobody can confirm
            logger.debug("> no answer could be read, the operation is cancelled")
            return False

        if answer in ('oui', 'yes', 'y', 'o'):
            logger.debug(f"> user confirmed by answering '{answer}'")
            return True
        if answer in ('non', 'no', 'n'):
            logger.debug(f"> user cancelled by answering '{answer}'")
            return False
        msg = "The answer couldn't be understood. Continue? (yes/no) "
def main(settings, arguments):
    """ Build the servers and the operations from the settings, then run the selected operations.

    :param settings: mapping with a 'servers' and an 'operations' section.
        Each server has a 'host', a 'mysql' mapping (its 'port' plus the other
        keyword arguments of `MySqlServer`), and optionally an 'ssh' mapping
        (keyword arguments of `SshTunnel`) and a 'description'.
        Each operation has a 'dbname', a 'from_server' and a 'to_server' (server
        names); its other keys are passed to `CloningOperation`.
    :param arguments: parsed command line; '--yes' and '<dbname>' are read
    :raises RuntimeError: if the requested operation is unknown or if there is nothing to launch
    """
    prompt = not arguments["--yes"]

    logger.info("Start db cloning utility...")
    logger.debug(f"Settings: {settings}")
    logger.debug(f"Arguments: {arguments}")

    # Load the servers' configuration
    servers = {}
    for server_name, server_settings in settings['servers'].items():
        hostname = server_settings['host']

        if 'ssh' in server_settings:
            ssh_tunnel = SshTunnel(hostname, server_settings['mysql']['port'], **server_settings['ssh'])
        else:
            ssh_tunnel = None

        servers[server_name] = MySqlServer(hostname,
                                           **server_settings['mysql'],
                                           # the description is optional
                                           description=server_settings.get('description', ""),
                                           ssh_tunnel=ssh_tunnel)

    # Load the cloning ops' configuration
    ops = {}
    for name, args in settings['operations'].items():
        kwargs = {k: v for k, v in args.items() if k not in ('dbname', 'from_server', 'to_server')}
        ops[name] = CloningOperation(args['dbname'],
                                     servers[args['from_server']],
                                     servers[args['to_server']],
                                     **kwargs)

    # Operations to launch
    # (the '<dbname>' argument is matched against the names of the operations)
    requested = arguments.get('<dbname>', None)
    if requested:
        if requested not in ops:
            available = ", ".join(map(str, ops)) or "none"
            raise RuntimeError(f"Unknown operation '{requested}' (available: {available})")
        selected_ops = [ops[requested]]
    else:
        selected_ops = [op for op in ops.values() if op.is_default]

    if not selected_ops:
        raise RuntimeError('No operation to launch')

    # Ask for confirmation (except if '--yes' is in arguments)
    if prompt:
        msg = "The following operations will be launched:\n{}\n" \
              "WARNING: the existing local databases will be replaced" \
              "".format("\n".join(f"* {op}" for op in selected_ops))

        if not ask_confirmation(msg):
            logger.info("-- Operation cancelled by user --")
            return

    for op in selected_ops:
        op.run()
if __name__ == '__main__':
    # parse CLI arguments first, so that --help and --version work
    # even when settings.yml is missing or invalid
    arguments = docopt(__doc__, help=True, version=__VERSION__)

    # load settings from settings.yml file
    settings = load_settings()

    # NOTE(review): `Lockfile` is project-local; presumably it prevents two
    # concurrent runs and calls `on_error` when the lock is already held -- confirm.
    with Lockfile(path=HERE / '.clonedb.lock',
                  on_error=lambda: logger.critical("A cloning process is already running, please wait...")):
        main(settings, arguments)
|