| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460 |
- """
- Script de clonage des bases de données MySql
- (requiert python 3.6+)
- > Configuration: settings.yml
- Usage:
- clonedb.py [-v] [-y] [<dbname>]
- clonedb.py (-h | --help)
- clonedb.py --version
- Options:
- -y, --yes Do not ask for confirmation
- -h --help Show this screen.
- --version Show version.
- @author: olivier.massot, 05-2020
- """
- import logging
- import re
- from subprocess import Popen, PIPE, CalledProcessError
- import sys
- import pymysql
- import yaml
- from docopt import docopt
- from path import Path
- from core import logging_
- from core.docker import resolve_docker_ip
- from core.locker import Lockfile
- from core.pipe_handler import PipeHandler
- from core.ssh import SshTunnel
- from core.prompt import ask_confirmation
# Version string reported by the '--version' command line switch
__VERSION__ = "0.2"

# Directory containing this script; the settings, log and lock files are resolved relative to it
HERE = Path(__file__).parent

# Start logger
logger = logging.getLogger('clonedb')
logging_.start("clonedb", filename=HERE / 'log' / 'clonedb.log', replace=True)

# FIX the default ascii encoding on some linux dockers...
# (stdout is re-opened on the same file descriptor, as line-buffered utf8 text)
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1)

# Options
# When true, mysqldump is run with '--verbose' and a dot is printed for each table it starts to dump
SHOW_PROGRESSION = True
# Value given to the 'max_allowed_packet' option of both pymysql and the mysql client (1073741824 = 2**30)
MAX_ALLOWED_PACKET = 1073741824
DEBUG = False
# When true, the executed sql statements are logged and the mysql client is run with '--verbose'
LOG_MYSQL_QUERIES = DEBUG
- # Utilities
def load_settings():
    """ Load the settings from the 'settings.yml' file

    The file is looked up in the directory of this script and is decoded
    explicitly as utf8, so that non-ascii values do not depend on the
    default encoding of the platform.

    @return: the python object built by the YAML parser from the file content
    """
    # NOTE(review): FullLoader is kept from the original code; if the settings file could
    # ever come from an untrusted source, yaml.safe_load would be the safer choice.
    with open(HERE / 'settings.yml', 'r', encoding='utf8') as f:
        return yaml.load(f, Loader=yaml.FullLoader)
class MysqldumpHandler(PipeHandler):
    """ Consume the stdout / stderr lines emitted by a mysqldump process

    Every line is forwarded to the logger at debug level; when the progression
    display is enabled, a dot is printed each time a new table is announced.
    """
    _rx_newtable = re.compile(r'Retrieving table structure for table (\w+)')

    def process(self, line):
        """ Handle one line read from the pipe
        """
        text = line.strip('\n')
        # the regex is only evaluated when the progression display is enabled
        if SHOW_PROGRESSION and self._rx_newtable.search(text) is not None:
            print('.', end="", flush=True)
        logger.debug(text)

    def close(self):
        """ Terminate the line of progression dots, then close the pipe
        through the parent class.
        """
        print('', flush=True)
        super().close()
class MySqlServer:
    """ A server hosting a Mysql instance

    Wraps a pymysql connection, optionally reached through an ssh tunnel,
    and exposes a few helpers to inspect databases, tables and views.
    """
    # Matches the DEFINER=`user`@`host` clause of a 'create view' statement;
    # the host is captured so that it can be kept when the user is replaced.
    # NOTE(review): a host written as the `%` wildcard is not matched by this pattern,
    # such a definer is left untouched (same as before) - confirm this is intended.
    _rx_definer = re.compile(r'DEFINER=`\w+`@`([\w\-.]+)`')

    def __init__(self, host, port, username, password, description="", ssh_tunnel=None):
        """ Store the connection parameters; no connection is opened here.
        The description is truncated to its first 30 characters.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.description = description[:30]
        self.ssh_tunnel = ssh_tunnel
        self.cnn = None
        self.active_db = ""

    def __repr__(self):
        s = f"{self.host}:{self.port} as {self.username}"
        if self.description:
            s = f"{self.description} ({s})"
        return s

    def connect(self, autocommit=True):
        """ Establish the connection to the Mysql server
        If a ssh tunnel is configured, it is started first and the connection
        is made to the local address of the tunnel instead of host:port.
        Return the pymysql connection, raise a RuntimeError if it is not open.
        @see https://pymysql.readthedocs.io/en/latest/modules/connections.html
        """
        if self.ssh_tunnel:
            self.ssh_tunnel.start()
            host, port = self.ssh_tunnel.LOCAL_ADRESS
        else:
            host, port = self.host, self.port

        self.cnn = pymysql.connect(host=host,
                                   port=port,
                                   user=self.username,
                                   password=self.password,
                                   autocommit=autocommit,
                                   max_allowed_packet=MAX_ALLOWED_PACKET,
                                   )

        if not self.cnn.open:
            raise RuntimeError(f'Unable to connect to {self}')
        return self.cnn

    def set_active_db(self, dbname):
        """ set the active database
        """
        self.cnn.select_db(dbname)
        self.active_db = dbname

    def close(self):
        """ Close the connection to the database
        and the ssh tunnel if one is opened
        """
        try:
            # skip a connection that is already closed: closing it twice would raise
            if self.cnn and self.cnn.open:
                self.cnn.close()
        finally:
            # the tunnel has to be stopped even if closing the connection failed
            if self.ssh_tunnel:
                self.ssh_tunnel.stop()
        logger.debug('%s - connection closed', self)

    def exec_query(self, sql, args=None):
        """ Execute the sql code and return the resulting cursor
        'args' are the optional parameters bound to the %s placeholders of 'sql';
        when omitted, the statement is sent as it is.
        @see https://pymysql.readthedocs.io/en/latest/modules/cursors.html
        """
        # re-open the connection if it was lost in the meantime
        self.cnn.ping(reconnect=True)
        cursor = self.cnn.cursor()
        if LOG_MYSQL_QUERIES:
            logger.debug(sql)
        cursor.execute(sql, args)
        return cursor

    def db_exists(self, dbname):
        """ Return True if the database exists
        """
        # the name is bound as a parameter, so that quotes in it can not break the statement
        cursor = self.exec_query("""SELECT SCHEMA_NAME
                       FROM INFORMATION_SCHEMA.SCHEMATA
                       WHERE SCHEMA_NAME = %s""", (dbname,))
        return cursor.fetchone() is not None

    def _list_by_type(self, table_type, dbname=""):
        """ Return an iterator on the names listed by 'SHOW FULL TABLES'
        for the given Table_type ('BASE TABLE' or 'VIEW')
        """
        source = f" FROM {dbname}" if dbname else ""
        cursor = self.exec_query(f"SHOW FULL TABLES{source} WHERE Table_type='{table_type}';")
        return (row[0] for row in cursor.fetchall())

    def list_tables(self, dbname=""):
        """ Return an iterator on the names of the tables (but not views!)
        for either the currently selected database,
        or the one given as a parameter"""
        return self._list_by_type('BASE TABLE', dbname)

    def list_views(self, dbname=""):
        """ Return an iterator on the names of the views
        for either the currently selected database,
        or the one given as a parameter"""
        return self._list_by_type('VIEW', dbname)

    @staticmethod
    def _replace_definer(definition, username):
        """ Return 'definition' with the user of each DEFINER=`user`@`host` clause
        replaced by 'username'; the host is kept unchanged
        """
        # a function is used as replacement so that backslashes in the username
        # are never interpreted as regex group references
        return MySqlServer._rx_definer.sub(lambda m: f"DEFINER=`{username}`@`{m.group(1)}`",
                                           definition)

    def get_view_definition(self, view_name, set_definer=""):
        """ Return the SQL create statement for the view
        If 'set_definer' is not empty, the username in the 'DEFINER' part
        of the create statement is replaced by the one given
        """
        cursor = self.exec_query(f"show create view {view_name}")
        # the create statement is the second column of the returned row
        definition = cursor.fetchone()[1]
        if set_definer:
            # force a new definer
            definition = self._replace_definer(definition, set_definer)
        return definition
# Behaviors for the tables cloning
IGNORE = 0  # the table is left out of both the structure and the data dumps
STRUCTURE_ONLY = 1  # the table is part of the structure dump, but its data is not dumped
STRUCTURE_AND_DATA = 2  # -> default behavior
class CloningOperation:
    """ A database cloning operation between two Mysql servers

    The structure then the data of the selected tables are transferred by piping
    a mysqldump process into a mysql client; the views are recreated afterwards.
    'ignore_tables', 'structure_only', 'filter_tables' and 'ignore_views' are
    lists of regular expressions matched against the start of the names.
    """
    # Size (in bytes) of the chunks streamed from mysqldump to mysql
    _CHUNK_SIZE = 1024 * 1024

    def __init__(self, dbname, from_server, to_server, is_default=True, ignore_tables=None, structure_only=None,
                 filter_tables=None, ignore_views=None, compress=True):
        self.dbname = dbname
        self.from_server = from_server
        self.to_server = to_server

        self.is_default = is_default
        self.compress = compress

        self.ignore_tables = self._compile_all(ignore_tables)
        self.structure_only = self._compile_all(structure_only)
        self.filter_tables = self._compile_all(filter_tables)
        self.ignore_views = self._compile_all(ignore_views)

    def __repr__(self):
        return f"Cloning {self.dbname} from {self.from_server} to {self.to_server}"

    @staticmethod
    def _compile_all(patterns):
        """ Compile a list of regex patterns; None or an empty list give an empty list
        """
        return [re.compile(r) for r in patterns] if patterns else []

    @staticmethod
    def _endpoint(server):
        """ Return the (host, port) the command line tools have to connect to:
        the local address of the ssh tunnel if there is one, the server address otherwise
        (this is the same rule as the one applied by the server's own connection)
        """
        if server.ssh_tunnel:
            host, port = server.ssh_tunnel.LOCAL_ADRESS
        else:
            host, port = server.host, server.port
        return host, port

    @staticmethod
    def _printable_command(cmd):
        """ Return the command as a single string for logging, with the password masked
        """
        return " ".join("--password=***" if str(arg).startswith("--password=") else str(arg)
                        for arg in cmd)

    def _build_dump_command(self, dump_options=None, tables=None):
        """ Build a mysqldump command line and return it as a
        ready-to-consume list for Popen
        'dump_options' are extra mysqldump options, 'tables' the names of the tables to dump
        @see https://dev.mysql.com/doc/refman/5.7/en/mysqldump.html#mysqldump-option-summary
        """
        tables = tables or []
        dump_options = dump_options or []

        base_cmd = ["mysqldump",
                    "--single-transaction",
                    "-u", self.from_server.username,
                    f"--password={self.from_server.password}",
                    "--skip-add-drop-table",
                    "--skip-add-locks",
                    "--skip-comments",
                    ]

        if self.compress:
            base_cmd.append("--compress")

        if SHOW_PROGRESSION:
            base_cmd.append("--verbose")

        # always target the source server explicitly: without these options
        # mysqldump would silently fall back to its own default host
        host, port = self._endpoint(self.from_server)
        base_cmd += ["--host", host,
                     "--port", str(port)]

        return base_cmd + dump_options + [self.dbname] + tables

    def _build_restore_command(self):
        """ Build a mysql command line and return it as a
        ready-to-consume list for Popen
        @see https://dev.mysql.com/doc/refman/8.0/en/mysql-command-options.html#option_mysql_quick
        """
        # go through the ssh tunnel of the target server when it has one
        host, port = self._endpoint(self.to_server)

        cmd = ["mysql",
               "-h", host,
               "-P", str(port),
               "-u", self.to_server.username,
               f"--password={self.to_server.password}",
               f"--max-allowed-packet={MAX_ALLOWED_PACKET}",
               "--reconnect",
               "--quick",
               "-D", self.dbname
               ]

        if LOG_MYSQL_QUERIES:
            cmd.append("--verbose")

        if self.compress:
            cmd.append("--compress")

        return cmd

    @staticmethod
    def _run_piped_processes(dump_cmd, restore_cmd):
        """ Run the dump and the restore commands by piping them
        The output of the mysqldump process is piped into the input of the mysql one
        Raise a RuntimeError if a process can not be run or returns a non zero code
        """
        # the passwords are masked: the command lines end up in the log file
        logger.debug(">>> Dump command: %s", CloningOperation._printable_command(dump_cmd))
        logger.debug(">>> Piped into: %s", CloningOperation._printable_command(restore_cmd))

        pipe_handler = MysqldumpHandler(logger.name)
        try:
            # noinspection PyTypeChecker
            with Popen(restore_cmd, stdin=PIPE, stdout=pipe_handler, stderr=pipe_handler) as mysql:
                # noinspection PyTypeChecker
                with Popen(dump_cmd, stdout=PIPE, stderr=pipe_handler) as mysqldump:
                    # stream the dump by chunks instead of loading it all in memory
                    for chunk in iter(lambda: mysqldump.stdout.read(CloningOperation._CHUNK_SIZE), b''):
                        mysql.stdin.write(chunk)

                # the return code is only set once the 'with' block has waited for the process
                if mysqldump.returncode:
                    raise RuntimeError('mysqldump returned a non zero code')

            # leaving the 'with' block closed the stdin of mysql and waited for it
            if mysql.returncode:
                raise RuntimeError('mysql returned a non zero code')

        except (OSError, RuntimeError, CalledProcessError) as e:
            logger.error("Execution failed: %s", e)
            raise RuntimeError(f"An error happened at runtime: {e}") from e
        finally:
            pipe_handler.close()

    def _table_behavior(self, tname):
        """ Return the cloning behavior (IGNORE, STRUCTURE_ONLY or STRUCTURE_AND_DATA)
        which applies to the table, according to the configured patterns
        """
        if any(rx.match(tname) for rx in self.ignore_tables):
            return IGNORE
        # when a filter is set, the tables which do not match it are left out
        if self.filter_tables and not any(rx.match(tname) for rx in self.filter_tables):
            return IGNORE
        if any(rx.match(tname) for rx in self.structure_only):
            return STRUCTURE_ONLY
        return STRUCTURE_AND_DATA

    def _clone_views(self):
        """ Recreate on the target server the views of the source database,
        except the ones matching 'ignore_views'; a view which can not be
        created is logged as an error and skipped
        """
        for view in self.from_server.list_views(self.dbname):
            if any(rx.match(view) for rx in self.ignore_views):
                continue

            logger.debug('* cloning view %s', view)
            definition = self.from_server.get_view_definition(view, self.to_server.username)
            try:
                self.to_server.exec_query(definition)
            except (pymysql.err.ProgrammingError, pymysql.err.InternalError) as e:
                logger.error('Unable to create the internal view %s: %s', view, e)

    def run(self):
        """ Run the cloning op
        The target database is dropped and recreated, then the structure, the data
        and the views are cloned. A RuntimeError raised while cloning is logged and
        not propagated; any other exception is propagated. In every case, the
        connections to both servers are closed at the end.
        """
        logger.info("*** Cloning %s ***", self.dbname)
        logger.info("> From %s", self.from_server)
        logger.info("> To %s", self.to_server)

        try:
            self.from_server.connect()
            self.from_server.set_active_db(self.dbname)
            logger.debug('Connected to %s', self.from_server)

            self.to_server.connect()
            logger.debug('Connected to %s', self.to_server)

            tables = {tname: self._table_behavior(tname) for tname in self.from_server.list_tables()}

            restore_cmd = self._build_restore_command()

            # Dump structure: --single-transaction --no-data --routines {dbname} tbname1 tname2 ...
            # ('--single-transaction' is already part of the base command)
            dump_structure_for = [t for t, s in tables.items() if s != IGNORE]
            dump_structure_cmd = self._build_dump_command(["--no-data", "--routines"],
                                                          dump_structure_for)

            # Dump data: --no-create-info --skip-triggers {dbname} tbname1 tname2 ...
            dump_data_for = [t for t, s in tables.items() if s == STRUCTURE_AND_DATA]
            dump_data_cmd = self._build_dump_command(["--no-create-info", "--skip-triggers"],
                                                     dump_data_for)

            if tables and not dump_structure_for and not dump_data_for:
                logger.warning('No table will be cloned')

            # Recreate the target DB
            logger.info("(Re)create the database")
            self.to_server.exec_query(f"DROP DATABASE IF EXISTS `{self.dbname}`;")
            self.to_server.exec_query(f"CREATE SCHEMA `{self.dbname}`;")
            self.to_server.set_active_db(self.dbname)

            # Run mysqldump
            try:
                # a dump is only run for a non empty list of tables:
                # without table names, mysqldump would dump the whole database
                if dump_structure_for:
                    logger.info("Cloning structure for %s tables (on %s)...", len(dump_structure_for), len(tables))
                    self._run_piped_processes(dump_structure_cmd, restore_cmd)

                if dump_data_for:
                    logger.info("Cloning data for %s tables (on %s)...", len(dump_data_for), len(tables))
                    self._run_piped_processes(dump_data_cmd, restore_cmd)

                logger.info("Cloning views...")
                self._clone_views()

                logger.info("> the database was successfully cloned")
            except RuntimeError:
                logger.error("<!> An error happened while cloning the '%s' database", self.dbname)

        finally:
            # close the target server even if closing the source one fails
            try:
                self.from_server.close()
            finally:
                self.to_server.close()
def main(settings, arguments):
    """ Build the servers and the cloning operations from the settings,
    select the operations to launch, ask for a confirmation, then run them.

    'settings' is the content of the settings file: its 'servers' and 'operations'
    sections are read here. 'arguments' are the parsed command line arguments:
    '--yes' disables the confirmation and '<dbname>', if given, is the name of
    the only operation to run; otherwise every default operation is run.

    Raise a RuntimeError if the requested operation is unknown
    or if there is no operation to launch.
    """
    prompt = not arguments["--yes"]

    logger.info("Start db cloning utility...")
    # NOTE(review): the settings are logged as they are, including the passwords they contain
    logger.debug("Settings: %s", str(settings).replace('\r', '').replace('\n', ''))
    logger.debug("Arguments: %s", str(arguments).replace('\r', '').replace('\n', ''))

    # Load the servers' configuration
    servers = {}
    for server_name, server_settings in settings['servers'].items():
        hostname = server_settings['host']

        # a host written as 'docker:<name>' is replaced by the ip resolved for this docker
        match = re.search(r"^docker:(\w+)$", hostname)
        if match:
            logger.debug("resolve IP for docker %s", match.group(1))
            ip = resolve_docker_ip(match.group(1))
            logger.debug("substitute '%s' to '%s' as hostname", ip, hostname)
            hostname = ip

        if 'ssh' in server_settings:
            ssh_tunnel = SshTunnel(hostname, server_settings['mysql']['port'], **server_settings['ssh'])
        else:
            ssh_tunnel = None

        # the description is optional, as it is for MySqlServer itself
        server = MySqlServer(hostname,
                             **server_settings['mysql'],
                             description=server_settings.get('description', ""),
                             ssh_tunnel=ssh_tunnel)

        servers[server_name] = server

    # Load the cloning ops' configuration
    ops = {}
    for name, args in settings['operations'].items():
        dbname = args['dbname']
        from_server = servers[args['from_server']]
        to_server = servers[args['to_server']]
        # every other key of the operation is passed as it is to CloningOperation
        kwargs = {k: v for k, v in args.items() if k not in ('dbname', 'from_server', 'to_server')}

        op = CloningOperation(dbname, from_server, to_server, **kwargs)
        ops[name] = op

    # Operations to launch
    requested = arguments.get('<dbname>', None)
    if requested:
        if requested not in ops:
            raise RuntimeError("Unknown operation '{}' (available: {})".format(requested, ", ".join(ops)))
        selected_ops = [ops[requested]]
    else:
        selected_ops = [op for op in ops.values() if op.is_default]

    if not selected_ops:
        raise RuntimeError('No operation to launch')

    # Ask for confirmation (except if '--yes' is in arguments)
    if prompt:
        logger.debug('Ask for confirmation...')
        msg = "The following operations will be launched:\n{}\n" \
              "WARNING: the existing local databases will be replaced" \
              "".format("\n".join(f"* {op}" for op in selected_ops))

        if not ask_confirmation(msg):
            logger.info("-- Operation cancelled by user --")
            return
        logger.debug('> User confirmed')

    # Run the cloning operations
    for op in selected_ops:
        op.run()
if __name__ == '__main__':
    def _on_lock_error():
        """ Called by the lock file when it reports an error """
        logger.critical("A cloning process is already running, please wait...")

    # read the configuration first, then the command line
    settings = load_settings()
    arguments = docopt(__doc__, help=__doc__, version=__VERSION__)

    # the whole run happens under the lock file located next to this script
    lock = Lockfile(path=HERE / '.clonedb.lock', on_error=_on_lock_error)
    with lock:
        main(settings, arguments)
|