| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640 |
- #!/usr/bin/python3
- """
- Script de clonage des bases de données MySql
- (requiert python 3.6+)
- > Configuration: settings.yml
- Usage:
- clonedb.py [-v] [-y] [<opname>...]
- clonedb.py (-h | --help)
- clonedb.py --version
- Options:
- -y, --yes Do not ask for confirmation
- -h --help Show this screen.
- --version Show version.
- @author: olivier.massot, 05-2020
- """
- import logging
- import re
- from subprocess import Popen, PIPE, CalledProcessError
- import sys
- import pymysql
- import yaml
- from docopt import docopt
- from path import Path
- from core import logging_
- from core.docker import resolve_docker_ip
- from core.locker import Lockfile
- from core.pipe_handler import PipeHandler
- from core.ssh import SshTunnel
- from core.prompt import ask_confirmation
__VERSION__ = "0.2"

HERE = Path(__file__).parent

# Start logger
LOG_DIR = HERE / 'log'
LOG_DIR.mkdir_p()
logger = logging.getLogger('clonedb')
logging_.start("clonedb", filename=LOG_DIR / 'clonedb.log', replace=True)

# FIX the default ascii encoding on some linux dockers: re-open the stdout file
# descriptor as a line-buffered utf-8 text stream.
# closefd=False: the descriptor is shared with sys.__stdout__; this wrapper must
# not close the process's stdout when it is closed or garbage-collected.
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1, closefd=False)

# Options
SHOW_PROGRESSION = True  # display a per-table progress line while dumping / restoring
LOG_PIPES_OUTPUT = True  # log (debug) every line written by the mysqldump / mysql processes
LOG_MYSQL_QUERIES = True  # log (debug) every query sent through MySqlServer.exec_query
MAX_ALLOWED_PACKET = 1073741824  # 1 GiB, used by pymysql, mysqldump and mysql

# Maps a mysql character set name to the corresponding python codec name
CHARSET_TO_ENCODING = {
    'utf8': 'utf-8',
    'utf8mb4': 'utf-8',
    'latin1': 'latin'
}
- # Utilities
def load_settings():
    """ Load the settings from the 'settings.yml' file.

    If there is no such file yet, it is first created by copying the
    'settings.yml.dist' template located next to this script.

    :return: the parsed yaml content
    """
    settings_file = HERE / 'settings.yml'
    if not settings_file.exists():
        (HERE / 'settings.yml.dist').copy(settings_file)
    # Explicit utf-8: yaml files are utf-8, and the default locale encoding
    # is plain ascii on some linux dockers, which fails on non-ascii settings
    with open(settings_file, 'r', encoding='utf-8') as f:
        return yaml.load(f, Loader=yaml.FullLoader)
- def _print(msg, end=False):
- msg = msg.ljust(80)
- print(f'\r{msg}', end='' if not end else '\n', flush=True)
class MysqldumpHandler(PipeHandler):
    """ Consume the lines a mysqldump process writes to its stdout / stderr.

    Every line may be logged at debug level; when the progression display is
    enabled, the lines announcing a new table are turned into a one-line
    progress indicator on the terminal.
    """
    # message printed by mysqldump --verbose for each table
    _rx_prog = re.compile(r'Retrieving table structure for table (\w+)')
    # log every line read from the pipe
    _log_all = LOG_PIPES_OUTPUT

    def __init__(self, logger_name, level, total_prog):
        """
        :param logger_name: logger name, forwarded to PipeHandler
        :param level: logging level, forwarded to PipeHandler
        :param total_prog: expected number of tables, displayed as the progress total
        """
        super().__init__(logger_name, level)
        self.total_prog = total_prog
        self.prog = 0
        self._last_logged = ""

    def process(self, line):
        """ Handle one line read from the pipe
        """
        text = line.strip('\n')
        found = self._rx_prog.search(text) if SHOW_PROGRESSION else None
        if found:
            self.log_new_table(found.group(1), "dumping")
        if self._log_all:
            logger.debug(text)

    def log_new_table(self, tname, action_name=""):
        """ Report that `tname` is being processed, unless it is the table
        reported by the previous call (consecutive duplicates are skipped).
        """
        if tname != self._last_logged:
            self.prog += 1
            logger.debug('... %s %s', action_name, tname)
            _print(f'{action_name} `{tname}` [{self.prog} / {self.total_prog}]')
            self._last_logged = tname

    def log_end(self):
        """ Terminate the progress line """
        _print('\r-- done --', end=True)

    def close(self):
        """ Close the write end of the pipe.
        """
        super().close()
class MysqlHandler(MysqldumpHandler):
    """ Consume the lines a mysql client process writes to its stdout / stderr.

    The progression is derived from the CREATE TABLE / INSERT INTO statements
    found at the start of a line (the regex expects the statements to be echoed,
    the restore command being run with --verbose).
    """
    # group 1: kind of statement, group 2: table name
    _rx_prog = re.compile(r'^((?:CREATE TABLE )|(?:INSERT INTO ))`(\w+)`')
    _log_all = LOG_PIPES_OUTPUT
    _action_name = "restoring"

    def process(self, line):
        """ Handle one line read from the pipe
        """
        text = line.strip('\n')
        if SHOW_PROGRESSION:
            found = self._rx_prog.search(text)
            if found is not None:
                statement, table = found.groups()
                what = 'structure of' if 'CREATE' in statement else 'data of'
                self.log_new_table(table, f"restoring {what}")
        if self._log_all:
            logger.debug(text)
class MySqlServer:
    """ A server hosting a Mysql instance, optionally reached through an ssh tunnel
    """

    # `DEFINER=`user`@`host`` clause of a view definition;
    # group 1 captures the host, which is kept when the user is replaced
    _rx_definer = re.compile(r"DEFINER=`[^`]*`@`([^`]*)`")

    def __init__(self, host, port, username, password, description="", ssh_tunnel=None):
        """
        :param host: hostname or ip of the mysql server
        :param port: mysql port
        :param username: mysql user name
        :param password: mysql password
        :param description: label used in logs, truncated to 30 chars (None is accepted,
                            e.g. from an empty `description:` entry in the yaml settings)
        :param ssh_tunnel: optional SshTunnel; when given, connections go through its local end
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.description = (description or "")[:30]
        self.ssh_tunnel = ssh_tunnel
        self.cnn = None
        self.active_db = ""

    def __repr__(self):
        s = f"{self.host}:{self.port} as {self.username}"
        if self.description:
            s = f"{self.description} ({s})"
        return s

    @staticmethod
    def _quote_identifier(name):
        """ Quote a mysql identifier (database, table, view name) with backticks """
        return "`{}`".format(str(name).replace("`", "``"))

    @classmethod
    def _replace_definer(cls, definition, definer):
        """ Return `definition` with the user of its first DEFINER clause
        replaced by `definer`; the host part of the clause is preserved.
        The definition is returned unchanged if it has no DEFINER clause.
        """
        definer = definer.replace("`", "``")
        # a function replacement: no backslash / group-reference interpretation
        # of the user name, and the captured host is re-inserted verbatim
        return cls._rx_definer.sub(lambda m: f"DEFINER=`{definer}`@`{m.group(1)}`",
                                   definition,
                                   count=1)

    def connect(self, autocommit=True):
        """ Establish the connection to the Mysql server (starting the ssh tunnel first if any)
        @see https://pymysql.readthedocs.io/en/latest/modules/connections.html
        :return: the pymysql connection
        :raises RuntimeError: if the connection is not open after connecting
        """
        if self.ssh_tunnel:
            self.ssh_tunnel.start()
            host, port = self.ssh_tunnel.LOCAL_ADRESS
        else:
            host, port = self.host, self.port
        self.cnn = pymysql.connect(host=host,
                                   port=port,
                                   user=self.username,
                                   password=self.password,
                                   autocommit=autocommit,
                                   max_allowed_packet=MAX_ALLOWED_PACKET,
                                   )
        if not self.cnn.open:
            raise RuntimeError(f'Unable to connect to {self}')
        return self.cnn

    def set_active_db(self, dbname):
        """ set the active database
        """
        self.cnn.select_db(dbname)
        self.active_db = dbname

    def close(self):
        """ Close the connection to the database and the ssh tunnel if one is opened.

        Can be called several times: a connection that is already closed is not
        closed again (pymysql raises an error in that case).
        """
        if self.cnn is not None:
            if self.cnn.open:
                self.cnn.close()
            self.cnn = None
        if self.ssh_tunnel:
            self.ssh_tunnel.stop()
        logger.debug(f'{self} - connection closed')

    def exec_query(self, sql, args=None):
        """ Execute the sql code and return the resulting cursor
        @see https://pymysql.readthedocs.io/en/latest/modules/cursors.html

        :param sql: the sql code
        :param args: optional tuple of parameters, escaped by pymysql and substituted
                     to the `%s` placeholders of `sql`; when None, `sql` is sent as is
        """
        # reconnect if the connection was dropped (e.g. after a long dump)
        self.cnn.ping(reconnect=True)
        cursor = self.cnn.cursor()
        if LOG_MYSQL_QUERIES:
            logger.debug(sql)
        cursor.execute(sql, args)
        return cursor

    def db_exists(self, dbname):
        """ Return True if the database exists
        """
        cursor = self.exec_query("""SELECT SCHEMA_NAME
                                    FROM INFORMATION_SCHEMA.SCHEMATA
                                    WHERE SCHEMA_NAME = %s""", (dbname,))
        return cursor.fetchone() is not None

    def get_db_charset(self, dbname):
        """ Return the default charset of the mysql database (ex: 'utf8mb4')
        :raises RuntimeError: if the database does not exist
        """
        cursor = self.exec_query("""SELECT default_character_set_name
                                    FROM information_schema.SCHEMATA S
                                    WHERE schema_name = %s;""", (dbname,))
        row = cursor.fetchone()
        if row is None:
            raise RuntimeError(f"Database {dbname} not found on {self}")
        return row[0]

    def _list_by_type(self, table_type, dbname=""):
        """ Return the names of the objects of the given Table_type ('BASE TABLE' or 'VIEW')
        for either the currently selected database, or the one given as a parameter """
        from_clause = f" FROM {self._quote_identifier(dbname)}" if dbname else ""
        cursor = self.exec_query(f"SHOW FULL TABLES{from_clause} WHERE Table_type='{table_type}';")
        return (row[0] for row in cursor.fetchall())

    def list_tables(self, dbname=""):
        """ Return a list of tables (but not views!)
        for either the currently selected database,
        or the one given as a parameter"""
        return self._list_by_type('BASE TABLE', dbname)

    def list_views(self, dbname=""):
        """ Return a list of views
        for either the currently selected database,
        or the one given as a parameter"""
        return self._list_by_type('VIEW', dbname)

    def get_view_definition(self, view_name, set_definer=""):
        """ Return the SQL create statement for the view
        If 'set_definer' is not empty, the user name in the 'DEFINER' part
        of the create statement is replaced by the one given (the host is kept)
        """
        cursor = self.exec_query(f"SHOW CREATE VIEW {self._quote_identifier(view_name)}")
        definition = cursor.fetchone()[1]
        if set_definer:
            definition = self._replace_definer(definition, set_definer)
        return definition
class MysqlUser:
    """ A mysql account ('username'@'host') and its password, as used in
    CREATE USER / GRANT statements """

    def __init__(self, username, pwd, host='localhost'):
        self.username, self.pwd, self.host = username, pwd, host
- # Operation status
UNKNOWN, SUCCESS, FAILURE = range(3)

# Behaviors for the tables cloning
IGNORE, STRUCTURE_ONLY = 0, 1  # table not cloned / structure cloned without the rows
STRUCTURE_AND_DATA = 2  # -> default behavior: structure and rows are cloned
class CloningOperation:
    """ A database cloning operation between two Mysql servers

    The target database is dropped and recreated, then the tables' structure and
    data are piped from a `mysqldump` process (source server) into a `mysql`
    process (target server), and the views are finally recreated one by one.
    """

    # sql_mode flag that no longer exists in mysql 8, stripped from the dumps
    _rx_no_auto_create_user = re.compile(rb",?NO_AUTO_CREATE_USER")

    def __init__(self, name, dbname, from_server, to_server, grant=None,
                 is_default=True, ignore_tables=None, structure_only=None,
                 filter_tables=None, ignore_views=None, compress=True):
        """
        :param name: name of the operation
        :param dbname: name of the database to clone (same name on both servers)
        :param from_server: source MySqlServer
        :param to_server: target MySqlServer
        :param grant: list of MysqlUser created on the target server if missing,
                      and granted all privileges on the cloned database
        :param is_default: run the operation when no operation name is given
        :param ignore_tables: regexes (full match) of the tables not cloned at all
        :param structure_only: regexes (full match) of the tables whose rows are not cloned
        :param filter_tables: if given, regexes (full match) of the only tables to clone
        :param ignore_views: regexes (full match) of the views not cloned
        :param compress: pass --compress to the mysqldump and mysql commands
        """
        self.name = name
        self.dbname = dbname
        self.from_server = from_server
        self.to_server = to_server
        self.grant = grant if grant is not None else []
        self.is_default = is_default
        self.compress = compress
        self.ignore_tables = self._compile_patterns(ignore_tables)
        self.structure_only = self._compile_patterns(structure_only)
        self.filter_tables = self._compile_patterns(filter_tables)
        self.ignore_views = self._compile_patterns(ignore_views)
        self.status = UNKNOWN

    def __repr__(self):
        return f"Cloning {self.dbname} from {self.from_server} to {self.to_server}"

    @staticmethod
    def _compile_patterns(patterns):
        """ Compile each pattern into a regex that must match a whole name """
        return [re.compile(f"^{r}$") for r in patterns] if patterns else []

    @staticmethod
    def _matches(name, patterns):
        """ True if `name` matches any of the compiled `patterns` """
        return any(rx.match(name) for rx in patterns)

    @staticmethod
    def _address_of(server):
        """ Return the (host, port) a command line client must use to reach `server`:
        the local end of its ssh tunnel if it has one, else its own host and port """
        if server.ssh_tunnel:
            return server.ssh_tunnel.LOCAL_ADRESS
        return server.host, server.port

    @staticmethod
    def _redact_command(cmd):
        """ Return the command line as a string, with the password values masked """
        return " ".join("--password=***" if str(arg).startswith("--password=") else str(arg)
                        for arg in cmd)

    def _build_dump_command(self, dump_options=None, tables=None):
        """ Build a mysqldump command line and return it as a
        ready-to-consume list for Popen
        @see https://dev.mysql.com/doc/refman/5.7/en/mysqldump.html#mysqldump-option-summary
        """
        tables = tables or []
        dump_options = dump_options or []
        # host and port are always given: without them mysqldump would connect
        # to the local default server instead of the source one
        host, port = self._address_of(self.from_server)
        base_cmd = ["mysqldump",
                    "--single-transaction",
                    "--host", str(host),
                    "--port", str(port),
                    "-u", self.from_server.username,
                    f"--password={self.from_server.password}",
                    f"--max-allowed-packet={MAX_ALLOWED_PACKET}",
                    "--skip-add-drop-table",
                    "--skip-add-locks",
                    "--skip-comments",
                    "--column-statistics=0",
                    ]
        if self.compress:
            base_cmd.append("--compress")
        if SHOW_PROGRESSION:
            # the progression is parsed from the messages --verbose writes to stderr
            base_cmd.append("--verbose")
        return base_cmd + dump_options + [self.dbname] + tables

    def _build_restore_command(self):
        """ Build a mysql command line and return it as a
        ready-to-consume list for Popen
        @see https://dev.mysql.com/doc/refman/8.0/en/mysql-command-options.html#option_mysql_quick
        """
        # 28800 s = 8 hours
        init_command = (f"set global max_allowed_packet={MAX_ALLOWED_PACKET};"
                        "set global wait_timeout=28800;"
                        "set global interactive_timeout=28800;")
        host, port = self._address_of(self.to_server)
        cmd = ["mysql",
               "-h", str(host),
               "-P", str(port),
               "-u", self.to_server.username,
               f"--password={self.to_server.password}",
               f"--init-command={init_command}",
               "--reconnect",
               "--quick",
               "--unbuffered",
               "--wait",
               # the echoed statements are parsed by MysqlHandler to show the progression
               "--verbose",
               "-D", self.dbname
               ]
        if self.compress:
            cmd.append("--compress")
        return cmd

    @staticmethod
    def _clean_sql(bin_cmd, encoding):
        """ clean some old sql declaration from mysql 5 in order to preserve
        a compatibility between servers (mysql 5 and 8+).

        Works directly on the bytes: the pattern is pure ascii, so the result is
        identical to a latin-1 decode / substitute / encode round-trip.
        `encoding` is kept for backward compatibility but is not needed.
        """
        return CloningOperation._rx_no_auto_create_user.sub(b"", bin_cmd)

    @staticmethod
    def _run_piped_processes(
            dump_cmd,
            restore_cmd,
            tbl_count,
            encoding):
        """ Run the dump and the restore commands by piping them
        The output of the mysqldump process is streamed, line by line, into the input of the mysql one

        :raises RuntimeError: if a process cannot be run, the pipe breaks, or one of
                              the processes exits with a non-zero code
        """
        logger.debug(">>> Dump command: %s", CloningOperation._redact_command(dump_cmd))
        logger.debug(">>> Piped into: %s", CloningOperation._redact_command(restore_cmd))
        mysqldump_handler = MysqldumpHandler(logger.name, logging.INFO, tbl_count)
        mysql_handler = MysqlHandler(logger.name, logging.INFO, tbl_count)
        try:
            # noinspection PyTypeChecker
            with Popen(restore_cmd, stdin=PIPE, stdout=mysql_handler, stderr=mysql_handler) as mysql:
                # noinspection PyTypeChecker
                with Popen(dump_cmd, stdout=PIPE, stderr=mysqldump_handler) as mysqldump:
                    # stream instead of loading the whole dump in memory
                    for chunk in mysqldump.stdout:
                        mysql.stdin.write(CloningOperation._clean_sql(chunk, encoding))
            # Leaving a `with Popen(...)` block closes the pipes and waits for the
            # process: the return codes are only set from here on
            if mysqldump.returncode:
                raise RuntimeError(f'mysqldump returned a non zero code ({mysqldump.returncode})')
            if mysql.returncode:
                raise RuntimeError(f'mysql returned a non zero code ({mysql.returncode})')
            mysql_handler.log_end()
        except (OSError, RuntimeError, CalledProcessError) as e:
            logger.error("Execution failed: %s", e)
            raise RuntimeError(f"An error happened at runtime: {e}") from e
        finally:
            mysqldump_handler.close()
            mysql_handler.close()

    def _targets_source_server(self):
        """ True if the target server is obviously the source one: as the target
        database is dropped before the dump, the source data would be lost """
        if self.from_server is self.to_server:
            return True
        if self.from_server.ssh_tunnel or self.to_server.ssh_tunnel:
            return False
        return (self.from_server.host, self.from_server.port) == \
               (self.to_server.host, self.to_server.port)

    def _create_missing_users(self):
        """ Create on the target server the users of `self.grant` that do not exist yet """
        for user in self.grant:
            exists = self.to_server.exec_query(
                f"SELECT count(*) FROM mysql.user WHERE User = '{user.username}' and Host='{user.host}';"
            ).fetchone()[0] > 0
            if not exists:
                logger.info('Create user %s@%s on %s', user.username, user.host, self.to_server)
                # NOTE(review): when LOG_MYSQL_QUERIES is on, exec_query logs this
                # statement, password included
                self.to_server.exec_query(
                    f"CREATE USER '{user.username}'@'{user.host}' IDENTIFIED BY '{user.pwd}';")

    def _classify_tables(self):
        """ Map each base table of the source database to its cloning behavior
        (IGNORE, STRUCTURE_ONLY or STRUCTURE_AND_DATA) """
        tables = {}
        for tname in self.from_server.list_tables():
            if self._matches(tname, self.ignore_tables):
                tables[tname] = IGNORE
            elif self.filter_tables and not self._matches(tname, self.filter_tables):
                tables[tname] = IGNORE
            elif self._matches(tname, self.structure_only):
                tables[tname] = STRUCTURE_ONLY
            else:
                tables[tname] = STRUCTURE_AND_DATA
        return tables

    def _recreate_database(self):
        """ Drop and recreate the database on the target server, then grant the admin users """
        logger.info("(Re)create the database")
        self.to_server.exec_query(f"DROP DATABASE IF EXISTS `{self.dbname}`;")
        self.to_server.exec_query(f"CREATE SCHEMA `{self.dbname}`;")
        self.to_server.set_active_db(self.dbname)

        # Following is to avoid conflict between mysql 5 and mysql 8+
        # (@see https://stackoverflow.com/questions/50336378/variable-sql-mode-cant-be-set-to-the-value-of-no-auto-create-user)
        self.to_server.exec_query("SET GLOBAL log_bin_trust_function_creators = 1;")

        for user in self.grant:
            self.to_server.exec_query(
                f"GRANT ALL ON `{self.dbname}`.* TO '{user.username}'@'{user.host}';"
            )

    def _clone_views(self):
        """ Recreate the (non ignored) views of the source database on the target one.
        A view that cannot be created is logged and skipped. """
        logger.info("Cloning views...")
        self.from_server.set_active_db(self.dbname)
        self.to_server.set_active_db(self.dbname)
        for v in self.from_server.list_views(self.dbname):
            if self._matches(v, self.ignore_views):
                continue
            logger.debug('* cloning view %s', v)
            definition = self.from_server.get_view_definition(v, self.to_server.username)
            try:
                self.to_server.exec_query(definition)
            except (pymysql.err.ProgrammingError, pymysql.err.InternalError) as e:
                logger.error('Unable to create the internal view %s: %s', v, e)

    def run(self):
        """ Run the cloning op and set `self.status`.

        A RuntimeError while dumping / restoring sets the status to FAILURE.
        Other errors (connection or query errors before the dump starts) are
        propagated and leave the status to UNKNOWN. Both connections are closed
        in any case.
        """
        logger.info(f"*** Cloning {self.dbname} ***")
        logger.info(f"> From {self.from_server}")
        logger.info(f"> To {self.to_server}")

        if self._targets_source_server():
            self.status = FAILURE
            logger.error("<!> Refusing to clone '%s': the target server is the source one", self.dbname)
            return

        try:
            self.from_server.connect()
            self.from_server.set_active_db(self.dbname)
            logger.debug('Connected to %s', self.from_server)

            self.to_server.connect()
            logger.debug('Connected to %s', self.to_server)

            # Create admin users if not exist
            self._create_missing_users()

            tables = self._classify_tables()
            restore_cmd = self._build_restore_command()

            # Dump structure: --single-transaction --no-data --routines {dbname} tbname1 tname2 ...
            dump_structure_for = [t for t, s in tables.items() if s != IGNORE]
            dump_structure_cmd = self._build_dump_command(["--no-data", "--routines"],
                                                          dump_structure_for)

            # Dump data: --no-create-info --skip-triggers {dbname} tbname1 tname2 ...
            dump_data_for = [t for t, s in tables.items() if s == STRUCTURE_AND_DATA]
            dump_data_cmd = self._build_dump_command(["--no-create-info", "--skip-triggers"],
                                                     dump_data_for)

            if tables and not dump_structure_for and not dump_data_for:
                logger.warning('No table will be cloned')

            self._recreate_database()

            # An unknown charset must not abort the operation: the encoding is
            # only forwarded to _clean_sql, which works on raw bytes
            charset = self.from_server.get_db_charset(self.dbname)
            encoding = CHARSET_TO_ENCODING.get(charset, 'utf-8')

            try:
                if dump_structure_for:
                    logger.info(f"Cloning structure for {len(dump_structure_for)} tables (on {len(tables)})...")
                    self._run_piped_processes(
                        dump_structure_cmd,
                        restore_cmd,
                        len(dump_structure_for),
                        encoding
                    )

                if dump_data_for:
                    logger.info(f"Cloning data for {len(dump_data_for)} tables (on {len(tables)})...")
                    self._run_piped_processes(
                        dump_data_cmd,
                        restore_cmd,
                        len(dump_data_for),
                        encoding
                    )

                self._clone_views()

                self.status = SUCCESS
                logger.info("> the database was successfully cloned")
            except RuntimeError:
                self.status = FAILURE
                logger.error("<!> An error happened while cloning the '%s' database", self.dbname)
        finally:
            self.from_server.close()
            self.to_server.close()
- def main(settings, arguments):
- prompt = not arguments["--yes"]
- logger.info("Start db cloning utility...")
- logger.debug(f"Settings: %s", str(settings).replace('\r', '').replace('\n', ''))
- logger.debug(f"Arguments: %s", str(arguments).replace('\r', '').replace('\n', ''))
- # Load the servers' configuration
- servers = {}
- if 'servers' not in settings:
- raise RuntimeError(f'Missing section in settings.yml: {servers}')
- for server_name, server_settings in settings['servers'].items():
- hostname = server_settings['host']
- match = re.search(r"^docker:(\w+)$", hostname)
- if match:
- logger.debug("resolve IP for docker %s", match.group(1))
- ip = resolve_docker_ip(match.group(1))
- logger.debug("substitute '%s' to '%s' as hostname", ip, hostname)
- hostname = ip
- if 'ssh' in server_settings:
- ssh_tunnel = SshTunnel(hostname, server_settings['mysql']['port'], **server_settings['ssh'])
- else:
- ssh_tunnel = None
- server = MySqlServer(hostname,
- **server_settings['mysql'],
- description=server_settings['description'],
- ssh_tunnel=ssh_tunnel)
- servers[server_name] = server
- # Load the users' configuration
- users = {}
- for username, args in settings.get('users', {}).items():
- host = args.get('host', 'localhost')
- pwd = args.get('pwd', '')
- users[username] = MysqlUser(username, pwd, host)
- # Load the cloning ops' configuration
- ops = {}
- if 'operations' not in settings:
- raise RuntimeError(f'Missing section in settings.yml: {servers}')
- for name, args in settings['operations'].items():
- dbname = args['dbname']
- from_server = servers[args['from_server']]
- to_server = servers[args['to_server']]
- grant = args.get('grant', [])
- admins = [user for username, user in users.items() if username in grant]
- kwargs = {k: v for k, v in args.items() \
- if k not in ('dbname', 'from_server', 'to_server', 'grant')}
- op = CloningOperation(name, dbname, from_server, to_server, admins, **kwargs)
- ops[name] = op
- # Operations to launch
- if arguments.get('<opname>', None):
- selected_ops = []
- for opname in arguments['<opname>']:
- try:
- selected_ops.append(ops[opname])
- except KeyError:
- logger.error('No operation found with name %s', opname)
- else:
- selected_ops = [op for op in ops.values() if op.is_default]
- if not selected_ops:
- logger.error('No operations to launch')
- return
- # Ask for confirmation (except if '--yes' is in arguments)
- if prompt:
- logger.debug('Ask for confirmation...')
- msg = "The following operations will be launched:\n{}\n" \
- "WARNING: the existing local databases will be replaced" \
- "".format("\n".join(f"* {op}" for op in selected_ops))
- if not ask_confirmation(msg):
- logger.info("-- Operation cancelled by user --")
- return
- logger.debug('> User confirmed')
- # Create the user if they do not exist
- # CREATE USER IF NOT EXISTS 'user'@'localhost' IDENTIFIED BY 'password';
- # GRANT ALL ON opentalent TO 'opentalent'@'localhost';
- # Run the cloning operations
- for op in selected_ops:
- op.run()
- failures = [op.name for op in selected_ops if op.status == FAILURE]
- if failures:
- logger.error("WARNING! the following operations failed: %s", ', '.join(failures))
if __name__ == '__main__':
    def _on_lock_error():
        """ Called by the lock file when it cannot be acquired """
        logger.critical("A cloning process is already running, please wait...")

    # settings.yml content (the file is created from settings.yml.dist if missing)
    settings = load_settings()

    # command line arguments, parsed against the module docstring
    arguments = docopt(__doc__, help=__doc__, version=__VERSION__)

    # the lock file guards against a concurrent cloning run
    lock_path = HERE / '.clonedb.lock'
    with Lockfile(path=lock_path, on_error=_on_lock_error):
        main(settings, arguments)
|