diff options
author | Linus Nordberg <linus@nordu.net> | 2016-11-30 16:47:23 +0100 |
---|---|---|
committer | Linus Nordberg <linus@nordu.net> | 2016-11-30 16:47:23 +0100 |
commit | bff5d58fcce0534cf4774df386ff448261b28c20 (patch) | |
tree | 076adf9062bb24b210f8712b4aeb303b3023643d /tools/merge_dist.py | |
parent | 720473257b4b7ab9916826ae87e617d1df138260 (diff) |
Parallelise merge_dist.
Also deduplicate some code.
Diffstat (limited to 'tools/merge_dist.py')
-rwxr-xr-x | tools/merge_dist.py | 131 |
1 files changed, 81 insertions, 50 deletions
diff --git a/tools/merge_dist.py b/tools/merge_dist.py index d612600..bc9c676 100755 --- a/tools/merge_dist.py +++ b/tools/merge_dist.py @@ -14,11 +14,12 @@ import logging from time import sleep from base64 import b64encode, b64decode from os import stat +from multiprocessing import Process, Pipe from certtools import timing_point, create_ssl_context from mergetools import get_curpos, get_logorder, chunks, get_missingentries, \ publish_sth, sendlog, sendentries, parse_args, perm, \ get_frontend_verifiedsize, frontend_verify_entries, \ - waitforfile, flock_ex_or_fail, Status, loginit + waitforfile, flock_ex_or_fail, Status, loginit, start_worker def sendlog_helper(entries, curpos, nodename, nodeaddress, own_key, paths, statusupdates): @@ -70,12 +71,51 @@ def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, own_key, paths) timing_point(timing, "get missing") -def merge_dist(args, localconfig, frontendnodes, timestamp): - maxwindow = localconfig.get("maxwindow", 1000) +def do_send(args, localconfig, frontendnode, logorder, sth, chainsdb, s): + timing = timing_point() paths = localconfig["paths"] own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], localconfig["nodename"])) + maxwindow = localconfig.get("maxwindow", 1000) + nodename = frontendnode["name"] + nodeaddress = "https://%s/" % frontendnode["address"] + + logging.info("distributing for node %s", nodename) + curpos = get_curpos(nodename, nodeaddress, own_key, paths) + timing_point(timing, "get curpos") + logging.info("current position %d", curpos) + + verifiedsize = \ + get_frontend_verifiedsize(nodename, nodeaddress, own_key, paths) + timing_point(timing, "get verified size") + logging.info("verified size %d", verifiedsize) + + assert verifiedsize >= curpos + + while verifiedsize < len(logorder): + uptopos = min(verifiedsize + maxwindow, len(logorder)) + + entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]] + sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths, s) + timing_point(timing, "sendlog") + + fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing, s) + + verifiedsize = frontend_verify_entries(nodename, nodeaddress, own_key, paths, uptopos) + + logging.info("sending sth to node %s", nodename) + publishsthresult = publish_sth(nodename, nodeaddress, own_key, paths, sth) + if publishsthresult["result"] != "ok": + logging.info("publishsth: %s", publishsthresult) + sys.exit(1) + timing_point(timing, "send sth") + + if args.timing: + logging.debug("timing: merge_dist: %s", timing["deltatimes"]) + +def merge_dist(args, localconfig, frontendnodes, timestamp): + paths = localconfig["paths"] mergedb = paths["mergedb"] chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains") logorderfile = mergedb + "/logorder" @@ -89,56 +129,49 @@ def merge_dist(args, localconfig, frontendnodes, timestamp): sth = json.loads(open(sthfile, 'r').read()) except (IOError, ValueError): logging.warning("No valid STH file found in %s", sthfile) - return timestamp + return timestamp, 0 if sth['timestamp'] < timestamp: logging.warning("New STH file older than the previous one: %d < %d", - sth['timestamp'], timestamp) - return timestamp + sth['timestamp'], timestamp) + return timestamp, 0 if sth['timestamp'] == timestamp: - return timestamp + return timestamp, 0 timestamp = sth['timestamp'] logorder = get_logorder(logorderfile, sth['tree_size']) timing_point(timing, "get logorder") + procs = {} for frontendnode in frontendnodes: - nodeaddress = "https://%s/" % frontendnode["address"] nodename = frontendnode["name"] - timing = timing_point() - - logging.info("distributing for node %s", nodename) - curpos = get_curpos(nodename, nodeaddress, own_key, paths) - timing_point(timing, "get curpos") - logging.info("current position %d", curpos) - - verifiedsize = get_frontend_verifiedsize(nodename, nodeaddress, own_key, paths) - timing_point(timing, "get verified size") - logging.info("verified size %d", verifiedsize) - - assert verifiedsize >= curpos - while verifiedsize < len(logorder): - uptopos = min(verifiedsize + maxwindow, len(logorder)) - - entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]] - sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths, s) - timing_point(timing, "sendlog") + if args.mergeinterval: + name = 'dist_%s' % nodename + p, pipe = start_worker(name, + lambda _, argv: do_send(*(argv)), + (args, localconfig, frontendnode, logorder, sth, chainsdb, s)) + procs[p] = (nodename, pipe) + else: + do_send(args, localconfig, frontendnode, logorder, sth, chainsdb, s) - fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing, s) + if not args.mergeinterval: + return timestamp, 0 - verifiedsize = frontend_verify_entries(nodename, nodeaddress, own_key, paths, uptopos) + failures = 0 + while True: + for p in list(procs): + if not p.is_alive(): + p.join() + nodename, _ = procs[p] + if p.exitcode != 0: + logging.warning("%s failure: %d", nodename, p.exitcode) + failures += 1 + del procs[p] + if not procs: + break + sleep(1) - logging.info("sending sth to node %s", nodename) - publishsthresult = publish_sth(nodename, nodeaddress, own_key, paths, sth) - if publishsthresult["result"] != "ok": - logging.info("publishsth: %s", publishsthresult) - sys.exit(1) - timing_point(timing, "send sth") - - if args.timing: - logging.debug("timing: merge_dist: %s", timing["deltatimes"]) - - return timestamp + return timestamp, failures def main(): """ @@ -146,12 +179,12 @@ def main(): Distribute missing entries and the STH to all frontend nodes. - If `--mergeinterval', wait until 'sth' is updated and read it and - start distributing again. + If `--mergeinterval', start over again. """ args, config, localconfig = parse_args() paths = localconfig["paths"] mergedb = paths["mergedb"] + sth_path = localconfig["paths"]["mergedb"] + "/sth" lockfile = mergedb + "/.merge_dist.lock" timestamp = 0 @@ -166,20 +199,18 @@ def main(): else: nodes = [n for n in config["frontendnodes"] if n["name"] in args.node] - if args.mergeinterval is None: - if merge_dist(args, localconfig, nodes, timestamp) < 0: - return 1 - return 0 - - sth_path = localconfig["paths"]["mergedb"] + "/sth" sth_statinfo = waitforfile(sth_path) while True: - if merge_dist(args, localconfig, nodes, timestamp) < 0: - return 1 + timestamp, failures = merge_dist(args, localconfig, nodes, timestamp) + if not args.mergeinterval: + break sth_statinfo_old = sth_statinfo while sth_statinfo == sth_statinfo_old: - sleep(args.mergeinterval / 30) + sleep(max(3, args.mergeinterval / 10)) + if failures > 0: + break sth_statinfo = stat(sth_path) + return 0 if __name__ == '__main__': |