| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- """
- Script de clonage des bases de données mariaDb depuis le
- serveur de production vers le serveur local
- (requiert python 3.6+)
- > Configuration: settings.yml
- Usage:
- clonedb.py [-v] [-y] [dbname]
- clonedb.py (-h | --help)
- clonedb.py --version
- Options:
- -v, --verbose Displays more informations
- -y, --yes Do not ask for confirmation
- -h --help Show this screen.
- --version Show version.
- @author: olivier.massot, 05-2020
- """
import logging
import subprocess
import sys

import mysql.connector
import yaml
from docopt import docopt
from path import Path

import logging_
__VERSION__ = "0.1"

# Directory containing this script; the settings file and the lock file are
# looked up next to it.
HERE = Path(__file__).parent

# Marker file: the script refuses to start while it exists, creates it before
# cloning and removes it afterwards.
LOCKFILE = HERE / '.clonedb.lock'

# Read the settings once at import time. The encoding is given explicitly so
# that non-ASCII values decode the same way on every platform instead of
# depending on the locale's default encoding.
# NOTE(review): settings.yml is assumed to be a trusted local file. If it only
# holds plain mappings/lists/scalars, yaml.safe_load would be sufficient --
# confirm before switching loaders.
with open(HERE / 'settings.yml', 'r', encoding='utf-8') as f:
    SETTINGS = yaml.load(f, Loader=yaml.FullLoader)
class MySqlServer:
    """Connection parameters, and the live connection, for one MySQL/MariaDB server.

    The constructor only stores its arguments; no network access happens
    until :meth:`connect` is called.
    """

    def __init__(self, host, port, username, password, description=""):
        """Store the connection parameters.

        Args:
            host: server host name or address.
            port: server TCP port.
            username: account used to log in.
            password: password for that account.
            description: optional label shown by ``repr()``; an empty value
                is replaced by ``"no description"``.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.description = description or "no description"
        # Set by connect(); stays None until then.
        self.cnn = None

    def __repr__(self):
        # The password is deliberately left out: this string is written to logs.
        return f"{self.username}@{self.host}:{self.port} ({self.description})"

    def connect(self):
        """Open the connection and keep it in ``self.cnn``."""
        # Fetch the logger by name rather than relying on a module global that
        # is only bound when the file is executed as a script.
        logging.getLogger('clonedb').debug(f'Try to connect to {self}')
        self.cnn = mysql.connector.connect(
            host=self.host,
            port=self.port,
            user=self.username,
            passwd=self.password
        )

    def db_exists(self, dbname):
        """Return True if a schema named *dbname* exists on this server.

        :meth:`connect` must have been called first.
        """
        cursor = self.cnn.cursor()
        try:
            # The name is passed as a bound parameter, never interpolated into
            # the SQL text, so quotes in *dbname* cannot alter the query.
            cursor.execute(
                "SELECT SCHEMA_NAME "
                "FROM INFORMATION_SCHEMA.SCHEMATA "
                "WHERE SCHEMA_NAME = %s",
                (dbname,),
            )
            # Consume the whole result set so the cursor can be closed cleanly.
            return bool(cursor.fetchall())
        finally:
            cursor.close()
def clonedb(from_server, to_server, dbname):
    """Replace database *dbname* on ``to_server`` with a copy from ``from_server``.

    The target database is dropped if it exists and recreated empty, then the
    output of ``mysqldump --single-transaction`` run against the source is
    piped into the ``mysql`` client connected to the target.

    NOTE(review): the previous draft only assembled a shell template that ran
    mysqldump on the production host through ssh, using key/port/host
    placeholders that are not defined anywhere in this file, and never
    executed it. This version dumps over the MySQL host/port/credentials held
    by ``from_server`` instead -- confirm that account is allowed to dump,
    or reinstate the ssh hop once its settings exist.

    Args:
        from_server: source ``MySqlServer``.
        to_server: target ``MySqlServer``; must already be connected
            (``to_server.cnn`` is used to drop and recreate the database).
        dbname: name of the database to clone.

    Raises:
        RuntimeError: if ``mysql`` or ``mysqldump`` exits with a non-zero status.
    """
    log = logging.getLogger('clonedb')
    log.info(f"*** Cloning {dbname} ***")
    log.debug(f"From {from_server}")
    log.debug(f"To {to_server}")

    # Backtick-quote the identifier (doubling any embedded backtick) so names
    # containing dashes or other special characters are accepted.
    quoted = "`{}`".format(dbname.replace("`", "``"))
    # IF EXISTS: the database may not be present on the target yet.
    to_server.cnn.cmd_query(f"DROP DATABASE IF EXISTS {quoted};")
    to_server.cnn.cmd_query(f"CREATE DATABASE {quoted};")

    # Argument lists (no shell) so that no value is re-parsed by a shell.
    # NOTE(review): as in the original template, passwords are passed on the
    # command line and are visible in the process list while the tools run.
    dump_cmd = [
        "mysqldump", "--single-transaction",
        "-h", str(from_server.host),
        "-P", str(from_server.port),
        "-u", str(from_server.username),
        f"--password={from_server.password}",
        dbname,
    ]
    restore_cmd = [
        "mysql",
        "-h", str(to_server.host),
        "-P", str(to_server.port),
        "-u", str(to_server.username),
        f"--password={to_server.password}",
        "-D", dbname,
    ]

    # Equivalent of `mysqldump ... | mysql ...`. Leaving the `with` block
    # closes our end of the pipe and waits for mysqldump to terminate.
    with subprocess.Popen(dump_cmd, stdout=subprocess.PIPE) as dump:
        restore = subprocess.run(restore_cmd, stdin=dump.stdout)

    # Check the restore first: if it died early, mysqldump fails as a
    # consequence (broken pipe) and its status would hide the real cause.
    if restore.returncode != 0:
        raise RuntimeError(
            f"mysql exited with status {restore.returncode} while restoring '{dbname}'")
    if dump.returncode != 0:
        raise RuntimeError(
            f"mysqldump exited with status {dump.returncode} while dumping '{dbname}'")
if __name__ == '__main__':
    # Parse the command line against the usage text in the module docstring.
    arguments = docopt(__doc__, help=True, version=__VERSION__)

    # docopt returns every declared option as a key, given or not, so the
    # option's value must be read: a membership test is always true.
    verbose = bool(arguments['--verbose'])
    assume_yes = bool(arguments['--yes'])

    logger = logging.getLogger('clonedb')
    logging_.start("clonedb", logging.DEBUG if verbose else logging.INFO, replace=True)

    # Refuse to run while another cloning run holds the lock file.
    if LOCKFILE.exists():
        logger.critical("Une opération de clonage est déjà en cours. "
                        "veuillez patienter ou annuler le traitement existant")
        sys.exit(1)

    logger.debug(f"Arguments given: {arguments}")
    # NOTE(review): this writes the whole settings mapping, including the
    # server passwords it contains, to the debug log.
    logger.debug(f"Settings: {SETTINGS}")
    if verbose:
        logger.debug("Mode: Verbose")

    remote_server = MySqlServer(**SETTINGS['remote'])
    remote_server.connect()

    local_server = MySqlServer(**SETTINGS['local'])
    local_server.connect()

    # One database when named on the command line, otherwise every database
    # listed in the settings.
    # NOTE(review): docopt treats a bare lowercase word in the usage pattern
    # as a command (boolean value), not as a positional argument; confirm the
    # usage text declares it as <dbname> or DBNAME and adjust this key to match.
    if arguments['dbname']:
        dbnames = [arguments['dbname']]
    else:
        dbnames = list(SETTINGS['databases'])

    # Ask for confirmation unless --yes was given.
    if not assume_yes:
        logger.debug('Ask for confirmation...')
        msg = f"Les bases de données suivantes vont être clonées depuis " \
              f"'{remote_server}' vers '{local_server}':\n" \
              f"> {', '.join(dbnames)} \n" \
              "ATTENTION: Les bases existantes seront remplacées.\n" \
              "Voulez vous continuer? (oui/non)"
        while True:
            # Tolerate surrounding spaces and capitals in the answer.
            answer = input(msg).strip().lower()
            if answer in ('oui', 'yes', 'y', 'o'):
                logger.debug(f"> user confirmed by answering '{answer}'")
                break
            elif answer in ('non', 'no', 'n'):
                logger.info("-- Opération annulée par l'utilisateur --")
                sys.exit(1)
            else:
                msg = "La réponse n'a pas été comprise. Voulez vous continuer? (oui/non)"

    # Every requested database must exist on the source before anything is
    # dropped on the target.
    missing_db = [dbname for dbname in dbnames if not remote_server.db_exists(dbname)]
    if missing_db:
        for missing in missing_db:
            logger.critical(
                f"<!> Aucune base de donnée nommée '{missing}' trouvée sur {remote_server}")
        logger.critical("-- Opération annulée --")
        sys.exit(1)

    # Start cloning. The lock is taken before entering the try block so the
    # finally clause only ever removes a lock file this run created.
    LOCKFILE.touch()
    try:
        for dbname in dbnames:
            clonedb(remote_server, local_server, dbname)
        logger.info("-- Clonage des bases de données terminé --")
    finally:
        LOCKFILE.remove()
|