From 4e1f11749167c7c79a3fc6a0e146487e7cc1022c Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Sun, 27 Nov 2016 23:59:25 +0100 Subject: Parallelise merge_backup. We're still failing the tests when the merge secondary goes away, sometimes. --- tools/merge_backup.py | 139 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 94 insertions(+), 45 deletions(-) (limited to 'tools/merge_backup.py') diff --git a/tools/merge_backup.py b/tools/merge_backup.py index f25b22a..c057b5a 100755 --- a/tools/merge_backup.py +++ b/tools/merge_backup.py @@ -15,12 +15,14 @@ import logging from time import sleep from base64 import b64encode, b64decode from os import stat +from multiprocessing import Process, Pipe, active_children from certtools import timing_point, build_merkle_tree, write_file, \ create_ssl_context from mergetools import chunks, backup_sendlog, get_logorder, \ get_verifiedsize, get_missingentriesforbackup, \ hexencode, setverifiedsize, sendentries_merge, verifyroot, \ - get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, Status + get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, \ + Status, loginit def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk): for trynumber in range(5, 0, -1): @@ -97,12 +99,63 @@ def check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size, timin timing_point(timing, "verifyroot") return root_hash -def merge_backup(args, config, localconfig, secondaries): +def do_it(backupargs): + secondary, localconfig, chainsdb, logorder, s, timing = backupargs maxwindow = localconfig.get("maxwindow", 1000) paths = localconfig["paths"] + nodename = secondary["name"] + nodeaddress = "https://%s/" % secondary["address"] own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], localconfig["nodename"])) + tree_size = len(logorder) + + logging.info("backing up to node %s", nodename) + verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths) + timing_point(timing, "get verified size") + logging.info("verified size %d", verifiedsize) + + if verifiedsize == tree_size: + root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size, timing) + else: + while verifiedsize < tree_size: + uptopos = min(verifiedsize + maxwindow, tree_size) + + entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]] + sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths, s) + timing_point(timing, "sendlog") + + fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing, s) + + root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, uptopos, timing) + + verifiedsize = uptopos + setverifiedsize(nodename, nodeaddress, own_key, paths, verifiedsize) + return root_hash + +def worker(pipe, backupargs): + root_hash = do_it(backupargs) + pipe.send(root_hash) + return 0 + +def start_worker(backupargs): + _, _, _, nodename, _, _, _, _, _, _, _ = backupargs + parent_conn, child_conn = Pipe() + p = Process(target=worker, + args=(child_conn, backupargs), + name='backup_%s' % nodename) + p.start() + return p, parent_conn + +def update_backupfile(mergedb, nodename, tree_size, root_hash): + backuppath = mergedb + "/verified." + nodename + backupdata = {"tree_size": tree_size, + "sha256_root_hash": hexencode(root_hash)} + logging.debug("writing to %s: %s", backuppath, backupdata) + write_file(backuppath, backupdata) + +def merge_backup(args, config, localconfig, secondaries): + paths = localconfig["paths"] mergedb = paths["mergedb"] chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains") logorderfile = mergedb + "/logorder" @@ -117,43 +170,43 @@ def merge_backup(args, config, localconfig, secondaries): tree_size = len(logorder) timing_point(timing, "get logorder") + procs = {} for secondary in secondaries: if secondary["name"] == config["primarymergenode"]: continue - nodeaddress = "https://%s/" % secondary["address"] nodename = secondary["name"] timing = timing_point() - logging.info("backing up to node %s", nodename) - verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths) - timing_point(timing, "get verified size") - logging.info("verified size %d", verifiedsize) - if verifiedsize == tree_size: - root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size, timing) + backupargs = (secondary, localconfig, chainsdb, logorder, s, timing) + if args.mergeinterval: + pipe_mine, pipe_theirs = Pipe() + p = Process(target=worker, + args=(pipe_theirs, backupargs), + name='backup_%s' % nodename) + p.start() + procs[p] = (nodename, pipe_mine) else: - while verifiedsize < tree_size: - uptopos = min(verifiedsize + maxwindow, tree_size) - - entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]] - sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths, s) - timing_point(timing, "sendlog") - - fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing, s) - - root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, uptopos, timing) - - verifiedsize = uptopos - setverifiedsize(nodename, nodeaddress, own_key, paths, verifiedsize) - - backuppath = mergedb + "/verified." + nodename - backupdata = {"tree_size": tree_size, - "sha256_root_hash": hexencode(root_hash)} - logging.debug("writing to %s: %s", backuppath, backupdata) - write_file(backuppath, backupdata) - - if args.timing: - logging.debug("timing: merge_backup: %s", timing["deltatimes"]) - + root_hash = do_it(backupargs) + update_backupfile(mergedb, nodename, tree_size, root_hash) + + if args.mergeinterval: + while True: + for p in list(procs): + if not p.is_alive(): + p.join() + nodename, pipe = procs[p] + if p.exitcode != 0: + logging.warning("%s failure: %d", nodename, p.exitcode) + continue + root_hash = pipe.recv() + update_backupfile(mergedb, nodename, tree_size, root_hash) + del procs[p] + if not procs: + break + sleep(1) + + if args.timing: + logging.debug("timing: merge_backup: %s", timing["deltatimes"]) return 0 def main(): @@ -163,9 +216,9 @@ def main(): Read 'logorder' up until what's indicated by 'fetched' and build the tree. - Distribute entries to all secondaries, write tree size and tree head - to 'backup.' files as each secondary is verified to have - the entries. + Distribute entries to all secondaries, in parallel if `--mergeinterval'. + Write tree size and tree head to 'backup.' files as each + secondary is verified to have the entries. If `--mergeinterval', wait until 'fetched' is updated and read it and start over from the point where 'logorder' is read. @@ -190,28 +243,24 @@ def main(): all_secondaries = \ [n for n in config.get('mergenodes', []) if \ n['name'] != config['primarymergenode']] - create_ssl_context(cafile=paths["https_cacertfile"]) - if len(args.node) == 0: nodes = all_secondaries else: nodes = [n for n in all_secondaries if n["name"] in args.node] - if args.mergeinterval is None: - return merge_backup(args, config, localconfig, nodes) - + create_ssl_context(cafile=paths["https_cacertfile"]) fetched_statinfo = waitforfile(fetched_path) - + retval = 0 while True: - err = merge_backup(args, config, localconfig, nodes) - if err: - return err + retval = merge_backup(args, config, localconfig, nodes) + if retval or not args.mergeinterval: + break fetched_statinfo_old = fetched_statinfo while fetched_statinfo == fetched_statinfo_old: sleep(max(3, args.mergeinterval / 10)) fetched_statinfo = stat(fetched_path) - return 0 + return retval if __name__ == '__main__': sys.exit(main()) -- cgit v1.1