From 8057d647ad76ccd2fef8f45be6c652a81e946d98 Mon Sep 17 00:00:00 2001
From: Magnus Ahltorp <map@kth.se>
Date: Thu, 11 Jun 2015 17:45:14 +0200
Subject: Improve merge progress reporting

---
 tools/merge.py | 123 ++++++++++++++++++++++++++++++++++++---------------------
 1 file changed, 79 insertions(+), 44 deletions(-)

(limited to 'tools')

diff --git a/tools/merge.py b/tools/merge.py
index 8766491..9904b84 100755
--- a/tools/merge.py
+++ b/tools/merge.py
@@ -84,10 +84,10 @@ def get_new_entries(node, baseurl):
         parsed_result = json.loads(result)
         if parsed_result.get(u"result") == u"ok":
             return [base64.b64decode(entry) for entry in parsed_result[u"entries"]]
-        print "ERROR: fetchnewentries", parsed_result
+        print >>sys.stderr, "ERROR: fetchnewentries", parsed_result
         sys.exit(1)
     except urllib2.HTTPError, e:
-        print "ERROR: fetchnewentries", e.read()
+        print >>sys.stderr, "ERROR: fetchnewentries", e.read()
         sys.exit(1)
 
 def get_entries(node, baseurl, hashes):
@@ -100,10 +100,10 @@ def get_entries(node, baseurl, hashes):
             assert len(entries) == len(hashes)
             assert set(entries.keys()) == set(hashes)
             return entries
-        print "ERROR: getentry", parsed_result
+        print >>sys.stderr, "ERROR: getentry", parsed_result
         sys.exit(1)
     except urllib2.HTTPError, e:
-        print "ERROR: getentry", e.read()
+        print >>sys.stderr, "ERROR: getentry", e.read()
         sys.exit(1)
 
 def get_curpos(node, baseurl):
@@ -112,10 +112,10 @@ def get_curpos(node, baseurl):
         parsed_result = json.loads(result)
         if parsed_result.get(u"result") == u"ok":
             return parsed_result[u"position"]
-        print "ERROR: currentposition", parsed_result
+        print >>sys.stderr, "ERROR: currentposition", parsed_result
         sys.exit(1)
     except urllib2.HTTPError, e:
-        print "ERROR: currentposition", e.read()
+        print >>sys.stderr, "ERROR: currentposition", e.read()
         sys.exit(1)
 
 def sendlog(node, baseurl, submission):
@@ -124,14 +124,16 @@ def sendlog(node, baseurl, submission):
             json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
         return json.loads(result)
     except urllib2.HTTPError, e:
-        print "ERROR: sendlog", e.read()
+        print >>sys.stderr, "ERROR: sendlog", e.read()
+        sys.stderr.flush()
         return None
     except ValueError, e:
-        print "==== FAILED REQUEST ===="
-        print submission
-        print "======= RESPONSE ======="
-        print result
-        print "========================"
+        print >>sys.stderr, "==== FAILED REQUEST ===="
+        print >>sys.stderr, submission
+        print >>sys.stderr, "======= RESPONSE ======="
+        print >>sys.stderr, result
+        print >>sys.stderr, "========================"
+        sys.stderr.flush()
         raise e
 
 def sendentry(node, baseurl, entry, hash):
@@ -141,14 +143,15 @@ def sendentry(node, baseurl, entry, hash):
             verifynode=node, publickeydir=paths["publickeys"])
         return json.loads(result)
     except urllib2.HTTPError, e:
-        print "ERROR: sendentry", e.read()
+        print >>sys.stderr, "ERROR: sendentry", e.read()
         sys.exit(1)
     except ValueError, e:
-        print "==== FAILED REQUEST ===="
-        print hash
-        print "======= RESPONSE ======="
-        print result
-        print "========================"
+        print >>sys.stderr, "==== FAILED REQUEST ===="
+        print >>sys.stderr, hash
+        print >>sys.stderr, "======= RESPONSE ======="
+        print >>sys.stderr, result
+        print >>sys.stderr, "========================"
+        sys.stderr.flush()
         raise e
 
 def sendsth(node, baseurl, submission):
@@ -157,14 +160,15 @@ def sendsth(node, baseurl, submission):
             json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
         return json.loads(result)
     except urllib2.HTTPError, e:
-        print "ERROR: sendsth", e.read()
+        print >>sys.stderr, "ERROR: sendsth", e.read()
         sys.exit(1)
     except ValueError, e:
-        print "==== FAILED REQUEST ===="
-        print submission
-        print "======= RESPONSE ======="
-        print result
-        print "========================"
+        print >>sys.stderr, "==== FAILED REQUEST ===="
+        print >>sys.stderr, submission
+        print >>sys.stderr, "======= RESPONSE ======="
+        print >>sys.stderr, result
+        print >>sys.stderr, "========================"
+        sys.stderr.flush()
         raise e
 
 def get_missingentries(node, baseurl):
@@ -173,10 +177,10 @@ def get_missingentries(node, baseurl):
         parsed_result = json.loads(result)
         if parsed_result.get(u"result") == u"ok":
             return parsed_result[u"entries"]
-        print "ERROR: missingentries", parsed_result
+        print >>sys.stderr, "ERROR: missingentries", parsed_result
         sys.exit(1)
     except urllib2.HTTPError, e:
-        print "ERROR: missingentries", e.read()
+        print >>sys.stderr, "ERROR: missingentries", e.read()
         sys.exit(1)
 
 def chunks(l, n):
@@ -195,7 +199,8 @@ new_entries = set()
 entries_to_fetch = {}
 
 for storagenode in storagenodes:
-    print "getting new entries from", storagenode["name"]
+    print >>sys.stderr, "getting new entries from", storagenode["name"]
+    sys.stderr.flush()
     new_entries_per_node[storagenode["name"]] = set(get_new_entries(storagenode["name"], "https://%s/" % storagenode["address"]))
     new_entries.update(new_entries_per_node[storagenode["name"]])
     entries_to_fetch[storagenode["name"]] = []
@@ -206,7 +211,8 @@ timing_point(timing, "get new entries")
 
 new_entries -= certsinlog
 
-print "adding", len(new_entries), "entries"
+print >>sys.stderr, "adding", len(new_entries), "entries"
+sys.stderr.flush()
 
 if args.nomerge:
     sys.exit(0)
@@ -222,7 +228,8 @@ verifycert = subprocess.Popen([paths["verifycert_bin"], paths["known_roots"]],
 
 added_entries = 0
 for storagenode in storagenodes:
-    print "getting", len(entries_to_fetch[storagenode["name"]]), "entries from", storagenode["name"]
+    print >>sys.stderr, "getting %d entries from %s:" % (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]),
+    sys.stderr.flush()
     for chunk in chunks(entries_to_fetch[storagenode["name"]], 100):
         entries = get_entries(storagenode["name"], "https://%s/" % storagenode["address"], chunk)
         for hash in chunk:
@@ -233,9 +240,14 @@ for storagenode in storagenodes:
             logorder.append(hash)
             certsinlog.add(hash)
             added_entries += 1
+        print >>sys.stderr, added_entries,
+        sys.stderr.flush()
+    print >>sys.stderr
+    sys.stderr.flush()
 fsync_logorder()
 timing_point(timing, "add entries")
-print "added", added_entries, "entries"
+print >>sys.stderr, "added", added_entries, "entries"
+sys.stderr.flush()
 
 verifycert.communicate(struct.pack("I", 0))
 
@@ -252,12 +264,14 @@ for secondary in secondaries:
         localdir = localdir + "/"
 
     print >>sys.stderr, "copying database to secondary:", remotehost
+    sys.stderr.flush()
     rsyncstatus = subprocess.call(["rsync", "-r", "--append", "--rsh=ssh", localdir, remotedir])
     if rsyncstatus:
         print >>sys.stderr, "rsync failed:", rsyncstatus
         sys.exit(1)
 
     print >>sys.stderr, "verifying database at secondary:", remotehost
+    sys.stderr.flush()
     verifysecondary = subprocess.Popen(["ssh", remotehost, secondary["verifycommand"], secondary["mergedir"]],
                                     stdout=subprocess.PIPE)
 
@@ -275,7 +289,8 @@ for signingnode in signingnodes:
                                                    root_hash, "https://%s/" % signingnode["address"], key=own_key)
         break
     except urllib2.URLError, e:
-        print e
+        print >>sys.stderr, e
+        sys.stderr.flush()
 if tree_head_signature == None:
     print >>sys.stderr, "Could not contact any signing nodes"
     sys.exit(1)
@@ -289,19 +304,25 @@ check_sth_signature(ctbaseurl, sth, publickey=logpublickey)
 timing_point(timing, "build sth")
 
 if args.timing:
-    print timing["deltatimes"]
+    print >>sys.stderr, timing["deltatimes"]
+    sys.stderr.flush()
 
-print "root hash", base64.b16encode(root_hash)
+print base64.b16encode(root_hash)
+sys.stdout.flush()
 
 for frontendnode in frontendnodes:
     nodeaddress = "https://%s/" % frontendnode["address"]
     nodename = frontendnode["name"]
     timing = timing_point()
-    print "distributing for node", nodename
+    print >>sys.stderr, "distributing for node", nodename
+    sys.stderr.flush()
     curpos = get_curpos(nodename, nodeaddress)
     timing_point(timing, "get curpos")
-    print "current position", curpos
+    print >>sys.stderr, "current position", curpos
+    sys.stderr.flush()
     entries = [base64.b64encode(entry) for entry in logorder[curpos:]]
+    print >>sys.stderr, "sending log:",
+    sys.stderr.flush()
     for chunk in chunks(entries, 1000):
         for trynumber in range(5, 0, -1):
             sendlogresult = sendlog(nodename, nodeaddress, {"start": curpos, "hashes": chunk})
@@ -309,31 +330,45 @@ for frontendnode in frontendnodes:
                 if trynumber == 1:
                     sys.exit(1)
                 select.select([], [], [], 10.0)
-                print "tries left:", trynumber
+                print >>sys.stderr, "tries left:", trynumber
+                sys.stderr.flush()
                 continue
             break
         if sendlogresult["result"] != "ok":
-            print "sendlog:", sendlogresult
+            print >>sys.stderr, "sendlog:", sendlogresult
             sys.exit(1)
         curpos += len(chunk)
-        print curpos,
-        sys.stdout.flush()
+        print >>sys.stderr, curpos,
+        sys.stderr.flush()
+    print >>sys.stderr
     timing_point(timing, "sendlog")
-    print "log sent"
+    print >>sys.stderr, "log sent"
+    sys.stderr.flush()
     missingentries = get_missingentries(nodename, nodeaddress)
     timing_point(timing, "get missing")
-    print "missing entries:", len(missingentries)
+    print >>sys.stderr, "missing entries:", len(missingentries)
+    sys.stderr.flush()
+    fetched_entries = 0
+    print >>sys.stderr, "fetching missing entries",
+    sys.stderr.flush()
     for missingentry in missingentries:
         hash = base64.b64decode(missingentry)
         sendentryresult = sendentry(nodename, nodeaddress, read_chain(chainsdir, hash), hash)
         if sendentryresult["result"] != "ok":
-            print "send sth:", sendentryresult
+            print >>sys.stderr, "send sth:", sendentryresult
             sys.exit(1)
+        fetched_entries += 1
+        if fetched_entries % 1000 == 0:
+            print >>sys.stderr, fetched_entries,
+            sys.stderr.flush()
+    print >>sys.stderr
+    sys.stderr.flush()
     timing_point(timing, "send missing")
     sendsthresult = sendsth(nodename, nodeaddress, sth)
     if sendsthresult["result"] != "ok":
-        print "send sth:", sendsthresult
+        print >>sys.stderr, "send sth:", sendsthresult
         sys.exit(1)
     timing_point(timing, "send sth")
     if args.timing:
-        print timing["deltatimes"]
+        print >>sys.stderr, timing["deltatimes"]
+        sys.stderr.flush()
-- 
cgit v1.1