# clonedb.py


  1. """
  2. Script de clonage des bases de données MySql
  3. (requiert python 3.6+)
  4. > Configuration: settings.yml
  5. Usage:
  6. clonedb.py [-v] [-y] [<dbname>]
  7. clonedb.py (-h | --help)
  8. clonedb.py --version
  9. Options:
  10. -y, --yes Do not ask for confirmation
  11. -h --help Show this screen.
  12. --version Show version.
  13. @author: olivier.massot, 05-2020
  14. """
import logging
import os
import re
import sys
import threading
from subprocess import Popen, PIPE, DEVNULL, CalledProcessError

import pymysql
import yaml
from docopt import docopt
from path import Path
from sshtunnel import SSHTunnelForwarder

import logging_
from locker import Lockfile
  28. __VERSION__ = "0.1"
  29. HERE = Path(__file__).parent
  30. # Start logger
  31. logger = logging.getLogger('clonedb')
  32. logging_.start("clonedb", replace=True)
  33. # FIX the default ascii encoding on some linux dockers...
  34. sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1)
  35. # Options
  36. VERBOSE_DUMP = True
  37. # Utilities
  38. def load_settings():
  39. # Load settings
  40. with open(HERE / 'settings.yml', 'r') as f:
  41. settings = yaml.load(f, Loader=yaml.FullLoader)
  42. return settings
  43. class StderrPipeHandler(threading.Thread):
  44. """ Handle the stderr output from a Popen object """
  45. def __init__(self, logger_name, default_level=logging.INFO):
  46. """Setup the object with a logger and a loglevel
  47. and start the thread
  48. """
  49. threading.Thread.__init__(self)
  50. self.logger = logging.getLogger(logger_name)
  51. self.daemon = False
  52. self.default_level = default_level
  53. self.fdRead, self.fdWrite = os.pipe()
  54. self.pipeReader = os.fdopen(self.fdRead)
  55. self.start()
  56. def fileno(self):
  57. """Return the write file descriptor of the pipe
  58. """
  59. return self.fdWrite
  60. def process(self, line):
  61. self.logger.log(self.default_level, line.strip('\n'))
  62. def run(self):
  63. """Run the thread, logging everything.
  64. """
  65. for line in iter(self.pipeReader.readline, ''):
  66. self.process(line)
  67. self.pipeReader.close()
  68. def close(self):
  69. """Close the write end of the pipe.
  70. """
  71. os.close(self.fdWrite)
  72. class StderrMysqlDumpHandler(StderrPipeHandler):
  73. """ Handle and process the stderr output from a mysqldump process """
  74. _rx_newtable = re.compile(r'Retrieving table structure for table (\w+)')
  75. def process(self, line):
  76. line = line.strip('\n')
  77. match = self._rx_newtable.search(line)
  78. if match:
  79. print('.', end="", flush=True)
  80. # logger.info('** %s', match.group(1))
  81. logger.debug(line)
  82. def close(self):
  83. print('', flush=True)
  84. super().close()
  85. class SshTunnel:
  86. LOCAL_ADRESS = ('127.0.0.1', 6000)
  87. def __init__(self, host, remote_port, port=22, user="root", key_file="~/.ssh/id_rsa"):
  88. self.host = host
  89. self.remote_port = remote_port
  90. self.port = int(port)
  91. self.user = user
  92. self.key_file = key_file
  93. self._tunnel = SSHTunnelForwarder(
  94. (self.host, self.port),
  95. ssh_username=self.user,
  96. ssh_pkey=self.key_file,
  97. local_bind_address=self.LOCAL_ADRESS,
  98. remote_bind_address=('127.0.0.1', self.remote_port)
  99. )
  100. def start(self):
  101. self._tunnel.start()
  102. if not self._tunnel.tunnel_is_up[self.LOCAL_ADRESS]:
  103. raise RuntimeError('Unable to open the SSH Tunnel')
  104. def stop(self):
  105. self._tunnel.stop()
  106. class MySqlServer:
  107. def __init__(self, host, port, username, password, description="", ssh_tunnel=None):
  108. self.host = host
  109. self.port = port
  110. self.username = username
  111. self.password = password
  112. self.description = description or "no description"
  113. self.ssh_tunnel = ssh_tunnel
  114. self.cnn = None
  115. self.active_db = ""
  116. def __repr__(self):
  117. return f"{self.host}:{self.port} as {self.username} ({self.description})"
  118. def connect(self, autocommit=True):
  119. if self.ssh_tunnel:
  120. self.ssh_tunnel.start()
  121. host, port = self.ssh_tunnel.LOCAL_ADRESS
  122. else:
  123. host, port = self.host, self.port
  124. self.cnn = pymysql.connect(host=host,
  125. port=port,
  126. user=self.username,
  127. password=self.password,
  128. autocommit=autocommit)
  129. if not self.cnn.open:
  130. raise RuntimeError(f'Unable to connect to {self}')
  131. return self.cnn
  132. def set_constraints(self, active):
  133. self.exec_query(f"SET FOREIGN_KEY_CHECKS={int(bool(active))};")
  134. def warmup(self):
  135. self.set_constraints(True)
  136. self.exec_query(""" SET MAX_ALLOWED_PACKETS=1073741824;
  137. SET CONNECT_TIMEOUT=28800;
  138. SET WAIT_TIMEOUT=28800;
  139. SET INTERACTIVE_TIMEOUT=28800;
  140. """)
  141. def set_active_db(self, dbname):
  142. self.cnn.select_db(dbname)
  143. self.active_db = dbname
  144. def close(self):
  145. if self.cnn:
  146. self.cnn.close()
  147. if self.ssh_tunnel:
  148. self.ssh_tunnel.stop()
  149. def exec_query(self, sql):
  150. """ Execute the sql code and return the cursor """
  151. cursor = self.cnn.cursor()
  152. logger.debug(sql)
  153. cursor.execute(sql)
  154. return cursor
  155. def db_exists(self, dbname):
  156. cursor = self.exec_query(f"""SELECT SCHEMA_NAME
  157. FROM INFORMATION_SCHEMA.SCHEMATA
  158. WHERE SCHEMA_NAME = '{dbname}'""")
  159. row = cursor.fetchone()
  160. return row is not None
  161. def list_tables(self, dbname=""):
  162. """ Return a list of tables (not views!)
  163. for either the current database or the given one"""
  164. cursor = self.exec_query(
  165. "SHOW FULL TABLES{} WHERE Table_type='BASE TABLE';".format(f" FROM {dbname}" if dbname else ""))
  166. return (row[0] for row in cursor.fetchall())
  167. def list_views(self, dbname=""):
  168. """ Return a list of views
  169. for either the current database or the given one"""
  170. cursor = self.exec_query(
  171. "SHOW FULL TABLES{} WHERE Table_type='VIEW';".format(f" FROM {dbname}" if dbname else ""))
  172. return (row[0] for row in cursor.fetchall())
  173. def get_view_definition(self, view_name):
  174. cursor = self.exec_query(f"show create view {view_name}")
  175. return cursor.fetchone()[1]
  176. IGNORE = 0
  177. STRUCTURE_ONLY = 1
  178. STRUCTURE_AND_DATA = 2 # default
  179. class CloningOperation:
  180. def __init__(self, dbname, from_server, to_server, is_default=True, ignore_tables=None, structure_only=None,
  181. filter_tables=None, ignore_views=None, ignore_procedures=None, ignore_functions=None):
  182. self.dbname = dbname
  183. self.from_server = from_server
  184. self.to_server = to_server
  185. self.is_default = is_default
  186. self.ignore_tables = [re.compile(r) for r in ignore_tables] if ignore_tables else []
  187. self.structure_only = [re.compile(r) for r in structure_only] if structure_only else []
  188. self.filter_tables = [re.compile(r) for r in filter_tables] if filter_tables else []
  189. self.ignore_views = [re.compile(r) for r in ignore_views] if ignore_views else []
  190. self.ignore_procedures = [re.compile(r) for r in ignore_procedures] if ignore_procedures else []
  191. self.ignore_functions = [re.compile(r) for r in ignore_functions] if ignore_functions else []
  192. self.table_count = None
  193. def __repr__(self):
  194. return f"Clone {self.dbname} from {self.from_server} to {self.to_server} [ignored tables: {len(self.ignore_tables)}]"
  195. def _build_dump_command(self, dump_options=None, tables=None):
  196. # > https://dev.mysql.com/doc/refman/5.7/en/mysqldump.html#mysqldump-option-summary
  197. tables = tables or []
  198. dump_options = dump_options or []
  199. base_cmd = ["mysqldump",
  200. "--single-transaction",
  201. "-u", self.from_server.username,
  202. f"--password={self.from_server.password}",
  203. # "--compress"
  204. ]
  205. if VERBOSE_DUMP:
  206. base_cmd.append("--verbose")
  207. if self.from_server.ssh_tunnel:
  208. host, port = self.from_server.ssh_tunnel.LOCAL_ADRESS
  209. base_cmd += ["--host", host,
  210. "--port", str(port)]
  211. return base_cmd + dump_options + [self.dbname] + tables
  212. def _build_restore_command(self):
  213. return ["mysql",
  214. "-h", self.to_server.host,
  215. "-P", str(self.to_server.port),
  216. "-u", self.to_server.username,
  217. f"--password={self.to_server.password}",
  218. "-D", self.dbname
  219. ]
  220. def _run_piped_processes(self, dump_cmd, restore_cmd):
  221. logger.debug("Run: %s | %s", " ".join(map(str, dump_cmd)), " ".join(map(str, restore_cmd)))
  222. logger.debug("Dump command: %s", " ".join(map(str, dump_cmd)))
  223. logger.debug("Piped into: %s", " ".join(map(str, restore_cmd)))
  224. stderr_handler = StderrMysqlDumpHandler(logger.name)
  225. try:
  226. # noinspection PyTypeChecker
  227. with Popen(restore_cmd, stdin=PIPE, stdout=PIPE, stderr=stderr_handler) as mysql:
  228. # noinspection PyTypeChecker
  229. with Popen(dump_cmd, stdout=PIPE, stderr=stderr_handler) as mysqldump:
  230. mysql.stdin.write(mysqldump.stdout.read())
  231. if mysqldump.returncode or mysql.returncode:
  232. raise RuntimeError
  233. except (OSError, RuntimeError, CalledProcessError) as e:
  234. logger.error("Execution failed: %s", e)
  235. raise RuntimeError(f"An error happened at runtime: {e}")
  236. finally:
  237. stderr_handler.close()
  238. def run(self):
  239. logger.info(f"*** Cloning {self.dbname} ***")
  240. logger.info(f"> From {self.from_server}")
  241. logger.info(f"> To {self.to_server}")
  242. try:
  243. self.from_server.connect()
  244. self.from_server.set_active_db(self.dbname)
  245. logger.debug('Connected to %s', self.from_server)
  246. self.to_server.connect()
  247. logger.debug('Connected to %s', self.to_server)
  248. tables = {}
  249. for tname in self.from_server.list_tables():
  250. if any(rx.match(tname) for rx in self.ignore_tables):
  251. tables[tname] = IGNORE
  252. elif self.filter_tables and not any(rx.match(tname) for rx in self.filter_tables):
  253. tables[tname] = IGNORE
  254. elif any(rx.match(tname) for rx in self.structure_only):
  255. tables[tname] = STRUCTURE_ONLY
  256. else:
  257. tables[tname] = STRUCTURE_AND_DATA
  258. restore_cmd = self._build_restore_command()
  259. # Dump structure: --single-transaction --no-data --routines {dbname} tbname1 tname2 ...
  260. dump_structure_for = [t for t, s in tables.items() if s != IGNORE]
  261. dump_structure_cmd = self._build_dump_command(["--single-transaction", "--no-data", "--routines"],
  262. dump_structure_for)
  263. # Dump data: --no-create-info --skip-triggers {dbname} tbname1 tname2 ...
  264. dump_data_for = [t for t, s in tables.items() if s == STRUCTURE_AND_DATA]
  265. dump_data_cmd = self._build_dump_command(["--no-create-info", "--skip-triggers"],
  266. dump_data_for)
  267. if tables and not dump_structure_for and not dump_data_for:
  268. logging.warning('No table will be cloned')
  269. # Recreate the target DB
  270. logger.info("(Re)create the database")
  271. self.to_server.exec_query(f"DROP DATABASE IF EXISTS `{self.dbname}`;")
  272. self.to_server.exec_query(f"CREATE SCHEMA `{self.dbname}`;")
  273. self.to_server.set_active_db(self.dbname)
  274. # Run mysqldump
  275. try:
  276. if dump_structure_for:
  277. logger.info(f"Cloning structure for {len(dump_structure_for)} tables (on {len(tables)})...")
  278. self._run_piped_processes(dump_structure_cmd, restore_cmd)
  279. if dump_data_for:
  280. logger.info(f"Cloning data for {len(dump_data_for)} tables (on {len(tables)})...")
  281. self._run_piped_processes(dump_data_cmd, restore_cmd)
  282. logger.info(f"Cloning views...")
  283. for v in self.from_server.list_views(self.dbname):
  284. if any(rx.match(v) for rx in self.ignore_views):
  285. continue
  286. logger.debug('* cloning view %s', v)
  287. definition = self.from_server.get_view_definition(v)
  288. # force the definer
  289. definition = re.sub(r'DEFINER=`\w+`@`[\w\-.]+`',
  290. f"DEFINER=`{self.to_server.username}`@`localhost`",
  291. definition)
  292. try:
  293. self.to_server.exec_query(definition)
  294. except (pymysql.err.ProgrammingError, pymysql.err.InternalError) as e:
  295. logger.error('Unable to create the internal view {p}: {e}')
  296. logger.info("> the database was successfully cloned")
  297. except RuntimeError:
  298. logger.error(f"<!> An error happened while cloning the '{self.dbname}' database")
  299. finally:
  300. self.to_server.set_constraints(True)
  301. self.from_server.close()
  302. self.to_server.close()
  303. def ask_confirmation(msg):
  304. logger.debug('Ask for confirmation...')
  305. msg += "\nWould you like to continue? (yes/no)"
  306. while 1:
  307. answer = input(msg)
  308. if answer in ('oui', 'yes', 'y', 'o'):
  309. logger.debug(f"> user confirmed by answering '{answer}'")
  310. return True
  311. elif answer in ('non', 'no', 'n'):
  312. logger.debug(f"> user cancelled by answering '{answer}'")
  313. return False
  314. else:
  315. msg = "The answer could'nt be understood. Continue? (yes/no)"
  316. def main(settings, arguments):
  317. prompt = not arguments["--yes"]
  318. logger.info("Start db cloning utility...")
  319. logger.debug(f"Settings: {settings}")
  320. logger.debug(f"Arguments: {arguments}")
  321. # Load the servers' configuration
  322. servers = {}
  323. for server_name, server_settings in settings['servers'].items():
  324. hostname = server_settings['host']
  325. if 'ssh' in server_settings:
  326. ssh_tunnel = SshTunnel(hostname, server_settings['mysql']['port'], **server_settings['ssh'])
  327. else:
  328. ssh_tunnel = None
  329. server = MySqlServer(hostname,
  330. **server_settings['mysql'],
  331. description=server_settings['description'],
  332. ssh_tunnel=ssh_tunnel)
  333. servers[server_name] = server
  334. # Load the cloning ops' configuration
  335. ops = {}
  336. for name, args in settings['operations'].items():
  337. dbname = args['dbname']
  338. from_server = servers[args['from_server']]
  339. to_server = servers[args['to_server']]
  340. kwargs = {k: v for k, v in args.items() if k not in ('dbname', 'from_server', 'to_server')}
  341. op = CloningOperation(dbname, from_server, to_server, **kwargs)
  342. ops[name] = op
  343. # Operations to launch
  344. if arguments.get('<dbname>', None):
  345. selected_ops = [ops[arguments['<dbname>']]]
  346. else:
  347. selected_ops = [op for op in ops.values() if op.is_default]
  348. if not selected_ops:
  349. raise RuntimeError('No operation to launch')
  350. # Ask for confirmation (except if '--yes' is in arguments)
  351. if prompt:
  352. # Ask for confirmation
  353. msg = "The following operations will be launched:\n{}\n" \
  354. "WARNING: the existing local databases will be replaced" \
  355. "".format("\n".join(f"* {op}" for op in selected_ops))
  356. if not ask_confirmation(msg):
  357. logger.info("-- Operation cancelled by user --")
  358. return
  359. for op in selected_ops:
  360. op.run()
  361. if __name__ == '__main__':
  362. # load settings from settings.yml file
  363. settings = load_settings()
  364. # parse CLI arguments
  365. arguments = docopt(__doc__, help=__doc__, version=__VERSION__)
  366. with Lockfile(path=HERE / '.clonedb.lock',
  367. on_error=lambda: logger.critical("A cloning process is already running, please wait...")):
  368. main(settings, arguments)