""" Script de clonage des bases de données mariaDb depuis le serveur de production vers le serveur local (requiert python 3.6+) > Configuration: settings.yml Usage: clonedb.py [-v] [-y] [] clonedb.py (-h | --help) clonedb.py --version Options: -y, --yes Do not ask for confirmation -h --help Show this screen. --version Show version. @author: olivier.massot, 05-2020 """ import logging import subprocess import sys import yaml from docopt import docopt from path import Path import logging_ from locker import Lockfile, AlreadyRunning from mysql import MySqlServer __VERSION__ = "0.1" HERE = Path(__file__).parent # Start logger logger = logging.getLogger('clonedb') logging_.start("clonedb", replace=True) # Load settings with open(HERE / 'settings.yml', 'r') as f: SETTINGS = yaml.load(f, Loader=yaml.FullLoader) # FIX the default ascii encoding on some linux dockers... sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1) # Utilities def _format_cmd(cmd): if type(cmd) is list: return " ".join(cmd) return cmd def _run_process(cmds): logger.debug("Run: %s", " ".join(map(str, cmds))) logpipe = logging_.LogPipe('clonedb') try: # noinspection PyTypeChecker p = subprocess.run(cmds, shell=False, stdin=sys.stdin, stdout=logpipe, stderr=logpipe) if p.returncode != 0: logger.error("Process was terminated by signal %s", p.returncode) raise RuntimeError() except (OSError, RuntimeError) as e: logger.error("Execution failed: %s", e) raise RuntimeError(f"An error happened at runtime: {e}") finally: logpipe.close() class SshTunnel: def __init__(self, host, port=22, user="root", key_file="~/.ssh/id_rsa"): self.host = host self.port = port self.user = user self.key_file = key_file def call(self, cmd): ssh_call = ["ssh", "-i", self.key_file, "-p", str(self.port), "-o", "StrictHostKeyChecking=no", f"{self.user}@{self.host}", ] ssh_cmd = ssh_call + cmd _run_process(ssh_cmd) def clonedb(ssh_tunnel, remote_mysql, local_mysql, dbname, options): logger.info(f"*** Cloning {dbname} ***") logger.debug(f"From {remote_mysql}") logger.debug(f"To {local_mysql}") dump_cmd = ["mysqldump", "--single-transaction", "-u", remote_mysql.username, f"--password={remote_mysql.password}", "--add-drop-database", "--compress", dbname] restore_cmd = ["mysql", "-h", local_mysql.host, "-P", str(local_mysql.port), "-u", local_mysql.username, f"--password={local_mysql.password}", "-D", dbname] cloning_cmd = dump_cmd + ['|'] + restore_cmd try: ssh_tunnel.call(cloning_cmd) logger.info("> the database was successfully cloned") except RuntimeError: logger.error(f" An error happened while cloning the '{dbname}' database") def ask_confirmation(msg): logger.debug('Ask for confirmation...') msg += "\nWould you like to continue? (yes/no)" while 1: answer = input(msg) if answer in ('oui', 'yes', 'y', 'o'): logger.debug(f"> user confirmed by answering '{answer}'") return True elif answer in ('non', 'no', 'n'): logger.debug(f"> user cancelled by answering '{answer}'") return False else: msg = "The answer could'nt be understood. Continue? (yes/no)" def main(dbnames, prompt=True): ssh_tunnel = SshTunnel(**SETTINGS['ssh']) remote_mysql = MySqlServer(**SETTINGS['remote']) local_mysql = MySqlServer(**SETTINGS['local']) if prompt: # Ask for confirmation msg = f"""The followind databases will be cloned from '{remote_mysql}' to '{local_mysql}': > {', '.join(dbnames)} "WARNING: the existing local databases will be replaced""" if not ask_confirmation(msg): logger.info("-- Operation cancelled by user --") return # start to clone for dbname in dbnames: options = SETTINGS['databases'].get(dbname, None) or {} clonedb(ssh_tunnel, remote_mysql, local_mysql, dbname, options) if __name__ == '__main__': # parse CLI arguments arguments = docopt(__doc__, help=__doc__, version=__VERSION__) prompt = "--yes" in arguments dbname = arguments.get('dbname', None) print('\n') logger.info("Start db cloning utility...") logger.debug(f"Arguments: {arguments}") logger.debug(f"Settings: {SETTINGS}") # database to clone if arguments.get('', None): dbnames = [arguments['']] else: dbnames = list(SETTINGS['databases']) with Lockfile(path=HERE / '.clonedb.lock', on_error=lambda: logger.critical("A cloning process is already running, please wait...")): main(dbnames, prompt)