summaryrefslogtreecommitdiff
path: root/tools/merge_dist.py
diff options
context:
space:
mode:
authorLinus Nordberg <linus@nordu.net>2015-09-24 16:47:32 +0200
committerLinus Nordberg <linus@nordu.net>2015-09-24 16:47:32 +0200
commit37cab78a89a4e6f7ec29c1cde1e68dfc5c46bd9d (patch)
tree6ed8ad47810ed8b6aa9aef0d3fe1a8ea459fb0f8 /tools/merge_dist.py
parent98c4ab09dd0f04bd3aa5567fcfb05cbb6a3a75f1 (diff)
Merge is now run by shell script tools/merge.
tools/merge run merge_fetch.py, merge_backup.py, merge_sth.py and merge_dist.py sequentially. TODO: test backupquorum != 0
Diffstat (limited to 'tools/merge_dist.py')
-rwxr-xr-x[-rw-r--r--]tools/merge_dist.py102
1 files changed, 54 insertions, 48 deletions
diff --git a/tools/merge_dist.py b/tools/merge_dist.py
index bfc0e61..0e85984 100644..100755
--- a/tools/merge_dist.py
+++ b/tools/merge_dist.py
@@ -5,74 +5,56 @@
# See LICENSE for licensing information.
import sys
-import urllib2
-import base64
-import select
-from certtools import timing_point, check_sth_signature, \
- create_sth_signature, get_public_key_from_file
+import json
+from time import sleep
+from base64 import b64encode, b64decode
+from certtools import timing_point, \
+ create_ssl_context
from mergetools import get_curpos, get_logorder, chunks, get_missingentries, \
- sendsth, hexencode, sendlog, sendentry, read_chain
+ sendsth, sendlog, sendentry, read_chain, parse_args
-def merge_dist(args, config, localconfig, sth_in):
+def merge_dist(args, config, localconfig, timestamp):
paths = localconfig["paths"]
own_key = (localconfig["nodename"],
"%s/%s-private.pem" % (paths["privatekeys"],
localconfig["nodename"]))
frontendnodes = config["frontendnodes"]
- signingnodes = config["signingnodes"]
- ctbaseurl = config["baseurl"]
- logpublickey = get_public_key_from_file(paths["logpublickey"])
mergedb = paths["mergedb"]
chainsdir = mergedb + "/chains"
logorderfile = mergedb + "/logorder"
+ sthfile = mergedb + "/sth"
+ create_ssl_context(cafile=paths["https_cacertfile"])
timing = timing_point()
- logorder = get_logorder(logorderfile)
- timing_point(timing, "get logorder")
-
- (tree_size, root_hash, timestamp) = sth_in
- tree_head_signature = None
- for signingnode in signingnodes:
- try:
- tree_head_signature = \
- create_sth_signature(tree_size, timestamp,
- root_hash,
- "https://%s/" % signingnode["address"],
- key=own_key)
- break
- except urllib2.URLError, err:
- print >>sys.stderr, err
- sys.stderr.flush()
- if tree_head_signature == None:
- print >>sys.stderr, "Could not contact any signing nodes"
- sys.exit(1)
-
- sth = {"tree_size": tree_size, "timestamp": timestamp,
- "sha256_root_hash": base64.b64encode(root_hash),
- "tree_head_signature": base64.b64encode(tree_head_signature)}
-
- check_sth_signature(ctbaseurl, sth, publickey=logpublickey)
+ try:
+ sth = json.loads(open(sthfile, 'r').read())
+ except (IOError, ValueError):
+ print >>sys.stderr, "No valid STH file found in", sthfile
+ return timestamp
+ if sth['timestamp'] < timestamp:
+ print >>sys.stderr, "New STH file older than the previous one:", \
+ sth['timestamp'], "<", timestamp
+ return timestamp
+ if sth['timestamp'] == timestamp:
+ return timestamp
+ timestamp = sth['timestamp']
- timing_point(timing, "build sth")
-
- if args.timing:
- print >>sys.stderr, timing["deltatimes"]
- sys.stderr.flush()
-
- print hexencode(root_hash)
- sys.stdout.flush()
+ logorder = get_logorder(logorderfile, sth['tree_size'])
+ timing_point(timing, "get logorder")
for frontendnode in frontendnodes:
nodeaddress = "https://%s/" % frontendnode["address"]
nodename = frontendnode["name"]
timing = timing_point()
+
print >>sys.stderr, "distributing for node", nodename
sys.stderr.flush()
curpos = get_curpos(nodename, nodeaddress, own_key, paths)
timing_point(timing, "get curpos")
print >>sys.stderr, "current position", curpos
sys.stderr.flush()
- entries = [base64.b64encode(entry) for entry in logorder[curpos:]]
+
+ entries = [b64encode(entry) for entry in logorder[curpos:]]
print >>sys.stderr, "sending log:",
sys.stderr.flush()
for chunk in chunks(entries, 1000):
@@ -83,7 +65,7 @@ def merge_dist(args, config, localconfig, sth_in):
if sendlogresult == None:
if trynumber == 1:
sys.exit(1)
- select.select([], [], [], 10.0)
+ sleep(10)
print >>sys.stderr, "tries left:", trynumber
sys.stderr.flush()
continue
@@ -98,20 +80,22 @@ def merge_dist(args, config, localconfig, sth_in):
timing_point(timing, "sendlog")
print >>sys.stderr, "log sent"
sys.stderr.flush()
+
missingentries = get_missingentries(nodename, nodeaddress, own_key,
paths)
timing_point(timing, "get missing")
+
print >>sys.stderr, "missing entries:", len(missingentries)
sys.stderr.flush()
fetched_entries = 0
print >>sys.stderr, "fetching missing entries",
sys.stderr.flush()
for missingentry in missingentries:
- ehash = base64.b64decode(missingentry)
+ ehash = b64decode(missingentry)
sendentryresult = sendentry(nodename, nodeaddress, own_key, paths,
read_chain(chainsdir, ehash), ehash)
if sendentryresult["result"] != "ok":
- print >>sys.stderr, "send sth:", sendentryresult
+ print >>sys.stderr, "sendentry:", sendentryresult
sys.exit(1)
fetched_entries += 1
if fetched_entries % 1000 == 0:
@@ -120,11 +104,33 @@ def merge_dist(args, config, localconfig, sth_in):
print >>sys.stderr
sys.stderr.flush()
timing_point(timing, "send missing")
+
+ print >>sys.stderr, "sending sth to node", nodename
+ sys.stderr.flush()
sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth)
if sendsthresult["result"] != "ok":
- print >>sys.stderr, "send sth:", sendsthresult
+ print >>sys.stderr, "sendsth:", sendsthresult
sys.exit(1)
timing_point(timing, "send sth")
+
if args.timing:
print >>sys.stderr, timing["deltatimes"]
sys.stderr.flush()
+
+ return timestamp
+
+def main():
+ """
+ Distribute missing entries and the STH to all frontend nodes.
+ """
+ args, config, localconfig = parse_args()
+ timestamp = 0
+ while True:
+ timestamp = merge_dist(args, config, localconfig, timestamp)
+ if args.interval is None:
+ break
+ print >>sys.stderr, "sleeping", args.interval, "seconds"
+ sleep(args.interval)
+
+if __name__ == '__main__':
+ sys.exit(main())