summaryrefslogtreecommitdiff
path: root/tools/merge_dist.py
diff options
context:
space:
mode:
authorMagnus Ahltorp <map@kth.se>2016-11-02 13:10:37 +0100
committerMagnus Ahltorp <map@kth.se>2016-11-02 13:10:37 +0100
commitb48c1689e36bdcc65a34b4ab12763478b072a716 (patch)
treef812f21492567b97aecba2c80dc0ce97a8e6eb6a /tools/merge_dist.py
parent6659a1c08dda0d6ec20f945e135b23b544db55a4 (diff)
Change algorithm for merge backup and merge dist
Diffstat (limited to 'tools/merge_dist.py')
-rwxr-xr-xtools/merge_dist.py140
1 files changed, 79 insertions, 61 deletions
diff --git a/tools/merge_dist.py b/tools/merge_dist.py
index 9d66cfd..ded25a1 100755
--- a/tools/merge_dist.py
+++ b/tools/merge_dist.py
@@ -13,9 +13,71 @@ from base64 import b64encode, b64decode
from certtools import timing_point, \
create_ssl_context
from mergetools import get_curpos, get_logorder, chunks, get_missingentries, \
- sendsth, sendlog, sendentries, parse_args, perm
+ publish_sth, sendlog, sendentries, parse_args, perm, get_frontend_verifiedsize, \
+ frontend_verify_entries
+
+def sendlog_helper(entries, curpos, nodename, nodeaddress, own_key, paths):
+ print >>sys.stderr, "sending log:",
+ sys.stderr.flush()
+ for chunk in chunks(entries, 1000):
+ for trynumber in range(5, 0, -1):
+ sendlogresult = sendlog(nodename, nodeaddress,
+ own_key, paths,
+ {"start": curpos, "hashes": chunk})
+ if sendlogresult == None:
+ if trynumber == 1:
+ sys.exit(1)
+ sleep(10)
+ print >>sys.stderr, "tries left:", trynumber
+ sys.stderr.flush()
+ continue
+ break
+ if sendlogresult["result"] != "ok":
+ print >>sys.stderr, "sendlog:", sendlogresult
+ sys.exit(1)
+ curpos += len(chunk)
+ print >>sys.stderr, curpos,
+ sys.stderr.flush()
+ print >>sys.stderr
+ print >>sys.stderr, "log sent"
+ sys.stderr.flush()
+
+def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing):
+ missingentries = get_missingentries(nodename, nodeaddress, own_key,
+ paths)
+ timing_point(timing, "get missing")
+
+ while missingentries:
+ print >>sys.stderr, "missing entries:", len(missingentries)
+ sys.stderr.flush()
+
+ sent_entries = 0
+ print >>sys.stderr, "sending missing entries",
+ sys.stderr.flush()
+ with requests.sessions.Session() as session:
+ for missingentry_chunk in chunks(missingentries, 100):
+ missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
+ hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes]
+ sendentryresult = sendentries(nodename, nodeaddress,
+ own_key, paths,
+ hashes_and_entries, session)
+ if sendentryresult["result"] != "ok":
+ print >>sys.stderr, "sendentries:", sendentryresult
+ sys.exit(1)
+ sent_entries += len(missingentry_hashes)
+ print >>sys.stderr, sent_entries,
+ sys.stderr.flush()
+ print >>sys.stderr
+ sys.stderr.flush()
+ timing_point(timing, "send missing")
+
+ missingentries = get_missingentries(nodename, nodeaddress,
+ own_key, paths)
+ timing_point(timing, "get missing")
+
def merge_dist(args, localconfig, frontendnodes, timestamp):
+ maxwindow = localconfig.get("maxwindow", 1000)
paths = localconfig["paths"]
own_key = (localconfig["nodename"],
"%s/%s-private.pem" % (paths["privatekeys"],
@@ -55,72 +117,28 @@ def merge_dist(args, localconfig, frontendnodes, timestamp):
print >>sys.stderr, "current position", curpos
sys.stderr.flush()
- entries = [b64encode(entry) for entry in logorder[curpos:]]
- print >>sys.stderr, "sending log:",
- sys.stderr.flush()
- for chunk in chunks(entries, 1000):
- for trynumber in range(5, 0, -1):
- sendlogresult = sendlog(nodename, nodeaddress,
- own_key, paths,
- {"start": curpos, "hashes": chunk})
- if sendlogresult == None:
- if trynumber == 1:
- sys.exit(1)
- sleep(10)
- print >>sys.stderr, "tries left:", trynumber
- sys.stderr.flush()
- continue
- break
- if sendlogresult["result"] != "ok":
- print >>sys.stderr, "sendlog:", sendlogresult
- sys.exit(1)
- curpos += len(chunk)
- print >>sys.stderr, curpos,
- sys.stderr.flush()
- print >>sys.stderr
- timing_point(timing, "sendlog")
- print >>sys.stderr, "log sent"
- sys.stderr.flush()
-
- missingentries = get_missingentries(nodename, nodeaddress, own_key,
- paths)
- timing_point(timing, "get missing")
+ verifiedsize = get_frontend_verifiedsize(nodename, nodeaddress, own_key, paths)
+ timing_point(timing, "get verified size")
+ print >>sys.stderr, "verified size", verifiedsize
+ assert verifiedsize >= curpos
+ while verifiedsize < len(logorder):
+ uptopos = min(verifiedsize + maxwindow, len(logorder))
+
+ entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]]
+ sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths)
+ timing_point(timing, "sendlog")
- while missingentries:
- print >>sys.stderr, "missing entries:", len(missingentries)
- sys.stderr.flush()
-
- sent_entries = 0
- print >>sys.stderr, "sending missing entries",
- sys.stderr.flush()
- with requests.sessions.Session() as session:
- for missingentry_chunk in chunks(missingentries, 100):
- missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
- hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes]
- sendentryresult = sendentries(nodename, nodeaddress,
- own_key, paths,
- hashes_and_entries, session)
- if sendentryresult["result"] != "ok":
- print >>sys.stderr, "sendentries:", sendentryresult
- sys.exit(1)
- sent_entries += len(missingentry_hashes)
- print >>sys.stderr, sent_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
- timing_point(timing, "send missing")
-
- missingentries = get_missingentries(nodename, nodeaddress,
- own_key, paths)
- timing_point(timing, "get missing")
+ fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing)
+ verifiedsize = frontend_verify_entries(nodename, nodeaddress, own_key, paths, uptopos)
+
print >>sys.stderr, "sending sth to node", nodename
sys.stderr.flush()
- sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth)
- if sendsthresult["result"] != "ok":
- print >>sys.stderr, "sendsth:", sendsthresult
+ publishsthresult = publish_sth(nodename, nodeaddress, own_key, paths, sth)
+ if publishsthresult["result"] != "ok":
+ print >>sys.stderr, "publishsth:", publishsthresult
sys.exit(1)
timing_point(timing, "send sth")