From 8057d647ad76ccd2fef8f45be6c652a81e946d98 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Thu, 11 Jun 2015 17:45:14 +0200 Subject: Improve merge progress reporting --- tools/merge.py | 123 ++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 79 insertions(+), 44 deletions(-) (limited to 'tools/merge.py') diff --git a/tools/merge.py b/tools/merge.py index 8766491..9904b84 100755 --- a/tools/merge.py +++ b/tools/merge.py @@ -84,10 +84,10 @@ def get_new_entries(node, baseurl): parsed_result = json.loads(result) if parsed_result.get(u"result") == u"ok": return [base64.b64decode(entry) for entry in parsed_result[u"entries"]] - print "ERROR: fetchnewentries", parsed_result + print >>sys.stderr, "ERROR: fetchnewentries", parsed_result sys.exit(1) except urllib2.HTTPError, e: - print "ERROR: fetchnewentries", e.read() + print >>sys.stderr, "ERROR: fetchnewentries", e.read() sys.exit(1) def get_entries(node, baseurl, hashes): @@ -100,10 +100,10 @@ def get_entries(node, baseurl, hashes): assert len(entries) == len(hashes) assert set(entries.keys()) == set(hashes) return entries - print "ERROR: getentry", parsed_result + print >>sys.stderr, "ERROR: getentry", parsed_result sys.exit(1) except urllib2.HTTPError, e: - print "ERROR: getentry", e.read() + print >>sys.stderr, "ERROR: getentry", e.read() sys.exit(1) def get_curpos(node, baseurl): @@ -112,10 +112,10 @@ def get_curpos(node, baseurl): parsed_result = json.loads(result) if parsed_result.get(u"result") == u"ok": return parsed_result[u"position"] - print "ERROR: currentposition", parsed_result + print >>sys.stderr, "ERROR: currentposition", parsed_result sys.exit(1) except urllib2.HTTPError, e: - print "ERROR: currentposition", e.read() + print >>sys.stderr, "ERROR: currentposition", e.read() sys.exit(1) def sendlog(node, baseurl, submission): @@ -124,14 +124,16 @@ def sendlog(node, baseurl, submission): json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) return json.loads(result) except urllib2.HTTPError, e: - print "ERROR: sendlog", e.read() + print >>sys.stderr, "ERROR: sendlog", e.read() + sys.stderr.flush() return None except ValueError, e: - print "==== FAILED REQUEST ====" - print submission - print "======= RESPONSE =======" - print result - print "========================" + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, submission + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, result + print >>sys.stderr, "========================" + sys.stderr.flush() raise e def sendentry(node, baseurl, entry, hash): @@ -141,14 +143,15 @@ def sendentry(node, baseurl, entry, hash): verifynode=node, publickeydir=paths["publickeys"]) return json.loads(result) except urllib2.HTTPError, e: - print "ERROR: sendentry", e.read() + print >>sys.stderr, "ERROR: sendentry", e.read() sys.exit(1) except ValueError, e: - print "==== FAILED REQUEST ====" - print hash - print "======= RESPONSE =======" - print result - print "========================" + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, hash + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, result + print >>sys.stderr, "========================" + sys.stderr.flush() raise e def sendsth(node, baseurl, submission): @@ -157,14 +160,15 @@ def sendsth(node, baseurl, submission): json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) return json.loads(result) except urllib2.HTTPError, e: - print "ERROR: sendsth", e.read() + print >>sys.stderr, "ERROR: sendsth", e.read() sys.exit(1) except ValueError, e: - print "==== FAILED REQUEST ====" - print submission - print "======= RESPONSE =======" - print result - print "========================" + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, submission + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, result + print >>sys.stderr, "========================" + sys.stderr.flush() raise e def get_missingentries(node, baseurl): @@ -173,10 +177,10 @@ def get_missingentries(node, baseurl): parsed_result = json.loads(result) if parsed_result.get(u"result") == u"ok": return parsed_result[u"entries"] - print "ERROR: missingentries", parsed_result + print >>sys.stderr, "ERROR: missingentries", parsed_result sys.exit(1) except urllib2.HTTPError, e: - print "ERROR: missingentries", e.read() + print >>sys.stderr, "ERROR: missingentries", e.read() sys.exit(1) def chunks(l, n): @@ -195,7 +199,8 @@ new_entries = set() entries_to_fetch = {} for storagenode in storagenodes: - print "getting new entries from", storagenode["name"] + print >>sys.stderr, "getting new entries from", storagenode["name"] + sys.stderr.flush() new_entries_per_node[storagenode["name"]] = set(get_new_entries(storagenode["name"], "https://%s/" % storagenode["address"])) new_entries.update(new_entries_per_node[storagenode["name"]]) entries_to_fetch[storagenode["name"]] = [] @@ -206,7 +211,8 @@ timing_point(timing, "get new entries") new_entries -= certsinlog -print "adding", len(new_entries), "entries" +print >>sys.stderr, "adding", len(new_entries), "entries" +sys.stderr.flush() if args.nomerge: sys.exit(0) @@ -222,7 +228,8 @@ verifycert = subprocess.Popen([paths["verifycert_bin"], paths["known_roots"]], added_entries = 0 for storagenode in storagenodes: - print "getting", len(entries_to_fetch[storagenode["name"]]), "entries from", storagenode["name"] + print >>sys.stderr, "getting %d entries from %s:" % (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]), + sys.stderr.flush() for chunk in chunks(entries_to_fetch[storagenode["name"]], 100): entries = get_entries(storagenode["name"], "https://%s/" % storagenode["address"], chunk) for hash in chunk: @@ -233,9 +240,14 @@ for storagenode in storagenodes: logorder.append(hash) certsinlog.add(hash) added_entries += 1 + print >>sys.stderr, added_entries, + sys.stderr.flush() + print >>sys.stderr + sys.stderr.flush() fsync_logorder() timing_point(timing, "add entries") -print "added", added_entries, "entries" +print >>sys.stderr, "added", added_entries, "entries" +sys.stderr.flush() verifycert.communicate(struct.pack("I", 0)) @@ -252,12 +264,14 @@ for secondary in secondaries: localdir = localdir + "/" print >>sys.stderr, "copying database to secondary:", remotehost + sys.stderr.flush() rsyncstatus = subprocess.call(["rsync", "-r", "--append", "--rsh=ssh", localdir, remotedir]) if rsyncstatus: print >>sys.stderr, "rsync failed:", rsyncstatus sys.exit(1) print >>sys.stderr, "verifying database at secondary:", remotehost + sys.stderr.flush() verifysecondary = subprocess.Popen(["ssh", remotehost, secondary["verifycommand"], secondary["mergedir"]], stdout=subprocess.PIPE) @@ -275,7 +289,8 @@ for signingnode in signingnodes: root_hash, "https://%s/" % signingnode["address"], key=own_key) break except urllib2.URLError, e: - print e + print >>sys.stderr, e + sys.stderr.flush() if tree_head_signature == None: print >>sys.stderr, "Could not contact any signing nodes" sys.exit(1) @@ -289,19 +304,25 @@ check_sth_signature(ctbaseurl, sth, publickey=logpublickey) timing_point(timing, "build sth") if args.timing: - print timing["deltatimes"] + print >>sys.stderr, timing["deltatimes"] + sys.stderr.flush() -print "root hash", base64.b16encode(root_hash) +print base64.b16encode(root_hash) +sys.stdout.flush() for frontendnode in frontendnodes: nodeaddress = "https://%s/" % frontendnode["address"] nodename = frontendnode["name"] timing = timing_point() - print "distributing for node", nodename + print >>sys.stderr, "distributing for node", nodename + sys.stderr.flush() curpos = get_curpos(nodename, nodeaddress) timing_point(timing, "get curpos") - print "current position", curpos + print >>sys.stderr, "current position", curpos + sys.stderr.flush() entries = [base64.b64encode(entry) for entry in logorder[curpos:]] + print >>sys.stderr, "sending log:", + sys.stderr.flush() for chunk in chunks(entries, 1000): for trynumber in range(5, 0, -1): sendlogresult = sendlog(nodename, nodeaddress, {"start": curpos, "hashes": chunk}) @@ -309,31 +330,45 @@ for frontendnode in frontendnodes: if trynumber == 1: sys.exit(1) select.select([], [], [], 10.0) - print "tries left:", trynumber + print >>sys.stderr, "tries left:", trynumber + sys.stderr.flush() continue break if sendlogresult["result"] != "ok": - print "sendlog:", sendlogresult + print >>sys.stderr, "sendlog:", sendlogresult sys.exit(1) curpos += len(chunk) - print curpos, - sys.stdout.flush() + print >>sys.stderr, curpos, + sys.stderr.flush() + print >>sys.stderr timing_point(timing, "sendlog") - print "log sent" + print >>sys.stderr, "log sent" + sys.stderr.flush() missingentries = get_missingentries(nodename, nodeaddress) timing_point(timing, "get missing") - print "missing entries:", len(missingentries) + print >>sys.stderr, "missing entries:", len(missingentries) + sys.stderr.flush() + fetched_entries = 0 + print >>sys.stderr, "fetching missing entries", + sys.stderr.flush() for missingentry in missingentries: hash = base64.b64decode(missingentry) sendentryresult = sendentry(nodename, nodeaddress, read_chain(chainsdir, hash), hash) if sendentryresult["result"] != "ok": - print "send sth:", sendentryresult + print >>sys.stderr, "send sth:", sendentryresult sys.exit(1) + fetched_entries += 1 + if added_entries % 1000 == 0: + print >>sys.stderr, fetched_entries, + sys.stderr.flush() + print >>sys.stderr + sys.stderr.flush() timing_point(timing, "send missing") sendsthresult = sendsth(nodename, nodeaddress, sth) if sendsthresult["result"] != "ok": - print "send sth:", sendsthresult + print >>sys.stderr, "send sth:", sendsthresult sys.exit(1) timing_point(timing, "send sth") if args.timing: - print timing["deltatimes"] + print >>sys.stderr, timing["deltatimes"] + sys.stderr.flush() -- cgit v1.1