| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- """
- Script de clonage des bases de données mariaDb depuis le
- serveur de production vers le serveur local
- (requiert python 3.6+)
- > Configuration: settings.yml
- Usage:
- clonedb.py [-v] [-y] [<dbname>]
- clonedb.py (-h | --help)
- clonedb.py --version
- Options:
- -y, --yes Do not ask for confirmation
- -h --help Show this screen.
- --version Show version.
- @author: olivier.massot, 05-2020
- """
- import logging
- import subprocess
- import sys
- import yaml
- from docopt import docopt
- from path import Path
- import logging_
- from locker import Lockfile, AlreadyRunning
- from mysql import MySqlServer
__VERSION__ = "0.1"

# Directory containing this script
HERE = Path(__file__).parent

# Start logger
logger = logging.getLogger('clonedb')
logging_.start("clonedb", replace=True)

# Load settings.
# The file is read explicitly as UTF-8 so that it is decoded the same way on
# hosts whose default encoding is ascii (see the stdout fix below).
# NOTE(review): yaml.FullLoader is kept; settings.yml is assumed to be a
# trusted local file -- confirm before loading settings from anywhere else.
with open(HERE / 'settings.yml', 'r', encoding='utf8') as f:
    SETTINGS = yaml.load(f, Loader=yaml.FullLoader)

# FIX the default ascii encoding on some linux dockers...
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1)
- # Utilities
- def _format_cmd(cmd):
- if type(cmd) is list:
- return " ".join(cmd)
- return cmd
def _run_process(cmds):
    """Run an external command, with its stdout and stderr sent to a
    `logging_.LogPipe('clonedb')`.

    :param cmds: the command, as a list of arguments (run without a shell)
    :raises RuntimeError: if the command can not be started, or if it ends
                          with a non-zero return code
    """
    # Passwords are passed on the command line: keep them out of the log
    printable = " ".join(
        "--password=***" if str(arg).startswith("--password=") else str(arg)
        for arg in cmds
    )
    logger.debug("Run: %s", printable)

    logpipe = logging_.LogPipe('clonedb')
    try:
        # noinspection PyTypeChecker
        p = subprocess.run(cmds, shell=False, stdin=sys.stdin, stdout=logpipe, stderr=logpipe)
        # A negative return code -N means the child was killed by signal N,
        # a positive one is a regular (failed) exit status
        if p.returncode < 0:
            raise RuntimeError(f"process was terminated by signal {-p.returncode}")
        if p.returncode > 0:
            raise RuntimeError(f"process exited with status {p.returncode}")
    except (OSError, RuntimeError) as e:
        logger.error("Execution failed: %s", e)
        # Callers only catch RuntimeError; keep the original cause attached
        raise RuntimeError(f"An error happened at runtime: {e}") from e
    finally:
        logpipe.close()
class SshTunnel:
    """Parameters of an ssh connection, used to run commands on a remote host."""

    def __init__(self, host, port=22, user="root", key_file="~/.ssh/id_rsa"):
        """
        :param host: name or address of the remote host
        :param port: ssh port
        :param user: remote user name
        :param key_file: path of the identity file given to `ssh -i`
        """
        self.host, self.port = host, port
        self.user, self.key_file = user, key_file

    def call(self, cmd):
        """Run `cmd` through `ssh` on the remote host.

        :param cmd: list of arguments appended after the ssh destination
        """
        prefix = ["ssh", "-i", self.key_file, "-p", str(self.port)]
        prefix += ["-o", "StrictHostKeyChecking=no"]
        prefix.append(f"{self.user}@{self.host}")
        _run_process(prefix + cmd)
def clonedb(ssh_tunnel, remote_mysql, local_mysql, dbname, options):
    """Clone one database from the remote server to the local one.

    A `mysqldump ... | mysql ...` pipeline is handed to `ssh_tunnel.call`.
    A failure is logged and not propagated, so the caller can go on with the
    next database.

    :param ssh_tunnel: object whose `call(cmd)` method runs a list of arguments over ssh
    :param remote_mysql: server to dump (`username` and `password` are read)
    :param local_mysql: server to restore into (`host`, `port`, `username` and `password` are read)
    :param dbname: name of the database to clone
    :param options: per-database settings (not used at the moment)
    """
    import shlex

    logger.info(f"*** Cloning {dbname} ***")
    logger.debug(f"From {remote_mysql}")
    logger.debug(f"To {local_mysql}")

    dump_cmd = ["mysqldump",
                "--single-transaction",
                "-u", remote_mysql.username,
                f"--password={remote_mysql.password}",
                "--add-drop-database",
                "--compress",
                dbname]

    restore_cmd = ["mysql",
                   "-h", local_mysql.host,
                   "-P", str(local_mysql.port),
                   "-u", local_mysql.username,
                   f"--password={local_mysql.password}",
                   "-D", dbname]

    # ssh joins its arguments with spaces and gives the result to the remote
    # shell: every argument is quoted so that special characters in passwords
    # or names are taken literally. Only the pipe is left for the shell.
    cloning_cmd = ([shlex.quote(str(arg)) for arg in dump_cmd]
                   + ['|']
                   + [shlex.quote(str(arg)) for arg in restore_cmd])

    try:
        ssh_tunnel.call(cloning_cmd)
        logger.info("> the database was successfully cloned")
    except RuntimeError:
        logger.error(f"<!> An error happened while cloning the '{dbname}' database")
def ask_confirmation(msg):
    """Ask the user to confirm on the standard input.

    The question is repeated until an answer is understood. Answers are
    compared regardless of case and of surrounding whitespace.

    :param msg: text displayed before the question
    :return: True if the user confirmed; False if the user declined or if
             the input was closed before an answer could be read
    """
    logger.debug('Ask for confirmation...')
    msg += "\nWould you like to continue? (yes/no)"
    while True:
        try:
            answer = input(msg).strip().lower()
        except EOFError:
            # No interactive input (closed or redirected stdin): do not go on
            # with a destructive operation that nobody confirmed
            logger.debug("> no answer could be read (end of input), cancelling")
            return False

        if answer in ('oui', 'yes', 'y', 'o'):
            logger.debug(f"> user confirmed by answering '{answer}'")
            return True
        elif answer in ('non', 'no', 'n'):
            logger.debug(f"> user cancelled by answering '{answer}'")
            return False
        else:
            msg = "The answer couldn't be understood. Continue? (yes/no)"
def main(dbnames, prompt=True):
    """Clone the given databases, one after the other.

    The ssh and MySQL connections are built from the 'ssh', 'remote' and
    'local' sections of SETTINGS.

    :param dbnames: names of the databases to clone
    :param prompt: if True, ask for confirmation before cloning and do
                   nothing if the user declines
    """
    ssh_tunnel = SshTunnel(**SETTINGS['ssh'])
    remote_mysql = MySqlServer(**SETTINGS['remote'])
    local_mysql = MySqlServer(**SETTINGS['local'])

    if prompt:
        # Ask for confirmation
        msg = "\n".join([
            "The following databases will be cloned",
            f"from '{remote_mysql}' to '{local_mysql}':",
            f"> {', '.join(dbnames)}",
            "WARNING: the existing local databases will be replaced",
        ])

        if not ask_confirmation(msg):
            logger.info("-- Operation cancelled by user --")
            return

    # start to clone
    for dbname in dbnames:
        # A database may be listed without any option: fall back on an empty dict
        options = SETTINGS['databases'].get(dbname, None) or {}
        clonedb(ssh_tunnel, remote_mysql, local_mysql, dbname, options)
if __name__ == '__main__':
    # parse CLI arguments
    arguments = docopt(__doc__, help=__doc__, version=__VERSION__)

    # docopt returns an entry for every option, given or not: the value has
    # to be tested, not the presence of the key. '--yes' disables the prompt.
    prompt = not arguments.get('--yes', False)

    print('\n')
    logger.info("Start db cloning utility...")
    logger.debug(f"Arguments: {arguments}")
    logger.debug(f"Settings: {SETTINGS}")

    # database to clone: the one given on the command line, or else every
    # database listed in the settings
    if arguments.get('<dbname>', None):
        dbnames = [arguments['<dbname>']]
    else:
        dbnames = list(SETTINGS['databases'])

    # The lock file is there to prevent two cloning processes from running
    # at the same time
    with Lockfile(path=HERE / '.clonedb.lock',
                  on_error=lambda: logger.critical("A cloning process is already running, please wait...")):
        main(dbnames, prompt)
|