author    Linus Nordberg <linus@nordu.net>  2015-09-22 21:59:48 +0200
committer Linus Nordberg <linus@nordu.net>  2015-11-10 12:48:46 +0100
commit    2ab3e4fcf5118b4606cbb98ae5fea8bd64183dec (patch)
tree      5d8a61fb94ee5c4e73e01610aa386da5bd083fe2 /tools/merge.py
parent    a11ef0bb9db8425c4a7e1c879b4a0116d168a1fb (diff)
Split merge.py into three pieces.
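
merge.py becomes a thin driver: merge_fetch() fetches new entries from the
storage nodes and produces the STH, merge_backup() replicates the new tree
to the secondary merge nodes, and merge_dist() distributes the log and STH
to the frontend nodes. The phase implementations move to the new modules
merge_fetch.py, merge_backup.py and merge_dist.py.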
Diffstat (limited to 'tools/merge.py')
-rwxr-xr-x  tools/merge.py  542
1 file changed, 31 insertions, 511 deletions
diff --git a/tools/merge.py b/tools/merge.py
index 2065a2d..212c171 100755
--- a/tools/merge.py
+++ b/tools/merge.py
@@ -1,518 +1,38 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
-# Copyright (c) 2014, NORDUnet A/S.
+# Copyright (c) 2014-2015, NORDUnet A/S.
# See LICENSE for licensing information.
import argparse
-import json
-import base64
-import urllib
-import urllib2
-import sys
-import time
-import ecdsa
-import hashlib
-import urlparse
-import os
import yaml
-import select
-import struct
-from certtools import build_merkle_tree, create_sth_signature, \
- check_sth_signature, get_eckey_from_file, timing_point, http_request, \
- get_public_key_from_file, get_leaf_hash, decode_certificate_chain, \
- create_ssl_context
-from mergetools import parselogrow, get_logorder, read_chain, \
- verify_entry
-
-parser = argparse.ArgumentParser(description="")
-parser.add_argument('--config', help="System configuration", required=True)
-parser.add_argument('--localconfig', help="Local configuration", required=True)
-parser.add_argument("--nomerge", action='store_true', help="Don't actually do merge")
-parser.add_argument("--timing", action='store_true', help="Print timing information")
-args = parser.parse_args()
-
-config = yaml.load(open(args.config))
-localconfig = yaml.load(open(args.localconfig))
-
-ctbaseurl = config["baseurl"]
-frontendnodes = config["frontendnodes"]
-storagenodes = config["storagenodes"]
-secondaries = config.get("mergenodes", [])
-paths = localconfig["paths"]
-mergedb = paths["mergedb"]
-
-signingnodes = config["signingnodes"]
-create_ssl_context(cafile=paths["https_cacertfile"])
-
-chainsdir = mergedb + "/chains"
-logorderfile = mergedb + "/logorder"
-
-own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], localconfig["nodename"]))
-
-logpublickey = get_public_key_from_file(paths["logpublickey"])
-
-hashed_dir = True
-
-def hexencode(key):
- return base64.b16encode(key).lower()
-
-def write_chain(key, value):
- filename = hexencode(key)
- if hashed_dir:
- path = chainsdir + "/" + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6]
- try:
- os.makedirs(path)
- except Exception, e:
- pass
- else:
- path = chainsdir
- f = open(path + "/" + filename, "w")
- f.write(value)
- f.close()
-
-def add_to_logorder(key):
- f = open(logorderfile, "a")
- f.write(hexencode(key) + "\n")
- f.close()
-
-def fsync_logorder():
- f = open(logorderfile, "a")
- os.fsync(f.fileno())
- f.close()
-
-def get_new_entries(node, baseurl):
- try:
- result = http_request(baseurl + "plop/v1/storage/fetchnewentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- parsed_result = json.loads(result)
- if parsed_result.get(u"result") == u"ok":
- return [base64.b64decode(entry) for entry in parsed_result[u"entries"]]
- print >>sys.stderr, "ERROR: fetchnewentries", parsed_result
- sys.exit(1)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: fetchnewentries", e.read()
- sys.exit(1)
-
-def get_entries(node, baseurl, hashes):
- try:
- params = urllib.urlencode({"hash":[base64.b64encode(hash) for hash in hashes]}, doseq=True)
- result = http_request(baseurl + "plop/v1/storage/getentry?" + params, key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- parsed_result = json.loads(result)
- if parsed_result.get(u"result") == u"ok":
- entries = dict([(base64.b64decode(entry["hash"]), base64.b64decode(entry["entry"])) for entry in parsed_result[u"entries"]])
- assert len(entries) == len(hashes)
- assert set(entries.keys()) == set(hashes)
- return entries
- print >>sys.stderr, "ERROR: getentry", parsed_result
- sys.exit(1)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: getentry", e.read()
- sys.exit(1)
-
-def get_curpos(node, baseurl):
- try:
- result = http_request(baseurl + "plop/v1/frontend/currentposition", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- parsed_result = json.loads(result)
- if parsed_result.get(u"result") == u"ok":
- return parsed_result[u"position"]
- print >>sys.stderr, "ERROR: currentposition", parsed_result
- sys.exit(1)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: currentposition", e.read()
- sys.exit(1)
-
-def get_verifiedsize(node, baseurl):
- try:
- result = http_request(baseurl + "plop/v1/merge/verifiedsize", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- parsed_result = json.loads(result)
- if parsed_result.get(u"result") == u"ok":
- return parsed_result[u"size"]
- print >>sys.stderr, "ERROR: verifiedsize", parsed_result
- sys.exit(1)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: verifiedsize", e.read()
- sys.exit(1)
-
-
-
-def sendlog(node, baseurl, submission):
- try:
- result = http_request(baseurl + "plop/v1/frontend/sendlog",
- json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: sendlog", e.read()
- sys.stderr.flush()
- return None
- except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, submission
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
-
-def backup_sendlog(node, baseurl, submission):
- try:
- result = http_request(baseurl + "plop/v1/merge/sendlog",
- json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: sendlog", e.read()
- sys.stderr.flush()
- return None
- except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, submission
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
-
-def sendentry(node, baseurl, entry, hash):
- try:
- result = http_request(baseurl + "plop/v1/frontend/sendentry",
- json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key,
- verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: sendentry", e.read()
- sys.exit(1)
- except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, hash
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
-
-def sendentry_merge(node, baseurl, entry, hash):
- try:
- result = http_request(baseurl + "plop/v1/merge/sendentry",
- json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key,
- verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: sendentry", e.read()
- sys.exit(1)
- except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, hash
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
-
-def sendsth(node, baseurl, submission):
- try:
- result = http_request(baseurl + "plop/v1/frontend/sendsth",
- json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: sendsth", e.read()
- sys.exit(1)
- except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, submission
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
-
-def verifyroot(node, baseurl, treesize):
- try:
- result = http_request(baseurl + "plop/v1/merge/verifyroot",
- json.dumps({"tree_size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: verifyroot", e.read()
- sys.exit(1)
- except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, submission
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
-
-def setverifiedsize(node, baseurl, treesize):
- try:
- result = http_request(baseurl + "plop/v1/merge/setverifiedsize",
- json.dumps({"size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: setverifiedsize", e.read()
- sys.exit(1)
- except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, submission
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
-
-def get_missingentries(node, baseurl):
- try:
- result = http_request(baseurl + "plop/v1/frontend/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- parsed_result = json.loads(result)
- if parsed_result.get(u"result") == u"ok":
- return parsed_result[u"entries"]
- print >>sys.stderr, "ERROR: missingentries", parsed_result
- sys.exit(1)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: missingentries", e.read()
- sys.exit(1)
-
-def get_missingentriesforbackup(node, baseurl):
- try:
- result = http_request(baseurl + "plop/v1/merge/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- parsed_result = json.loads(result)
- if parsed_result.get(u"result") == u"ok":
- return parsed_result[u"entries"]
- print >>sys.stderr, "ERROR: missingentriesforbackup", parsed_result
- sys.exit(1)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: missingentriesforbackup", e.read()
- sys.exit(1)
-
-def chunks(l, n):
- return [l[i:i+n] for i in range(0, len(l), n)]
-
-timing = timing_point()
-
-logorder = get_logorder(logorderfile)
-
-timing_point(timing, "get logorder")
-
-certsinlog = set(logorder)
-
-new_entries_per_node = {}
-new_entries = set()
-entries_to_fetch = {}
-
-for storagenode in storagenodes:
- print >>sys.stderr, "getting new entries from", storagenode["name"]
- sys.stderr.flush()
- new_entries_per_node[storagenode["name"]] = set(get_new_entries(storagenode["name"], "https://%s/" % storagenode["address"]))
- new_entries.update(new_entries_per_node[storagenode["name"]])
- entries_to_fetch[storagenode["name"]] = []
-
-import subprocess
-
-timing_point(timing, "get new entries")
-
-new_entries -= certsinlog
-
-print >>sys.stderr, "adding", len(new_entries), "entries"
-sys.stderr.flush()
-
-if args.nomerge:
- sys.exit(0)
-
-for hash in new_entries:
- for storagenode in storagenodes:
- if hash in new_entries_per_node[storagenode["name"]]:
- entries_to_fetch[storagenode["name"]].append(hash)
- break
-
-verifycert = subprocess.Popen([paths["verifycert_bin"], paths["known_roots"]],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE)
-
-added_entries = 0
-for storagenode in storagenodes:
- print >>sys.stderr, "getting %d entries from %s:" % (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]),
- sys.stderr.flush()
- for chunk in chunks(entries_to_fetch[storagenode["name"]], 100):
- entries = get_entries(storagenode["name"], "https://%s/" % storagenode["address"], chunk)
- for hash in chunk:
- entry = entries[hash]
- verify_entry(verifycert, entry, hash)
- write_chain(hash, entry)
- add_to_logorder(hash)
- logorder.append(hash)
- certsinlog.add(hash)
- added_entries += 1
- print >>sys.stderr, added_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
-fsync_logorder()
-timing_point(timing, "add entries")
-print >>sys.stderr, "added", added_entries, "entries"
-sys.stderr.flush()
-
-verifycert.communicate(struct.pack("I", 0))
-
-tree = build_merkle_tree(logorder)
-tree_size = len(logorder)
-root_hash = tree[-1][0]
-timestamp = int(time.time() * 1000)
-
-for secondary in secondaries:
- if secondary["name"] == config["primarymergenode"]:
- continue
- nodeaddress = "https://%s/" % secondary["address"]
- nodename = secondary["name"]
- timing = timing_point()
- print >>sys.stderr, "backing up to node", nodename
- sys.stderr.flush()
- verifiedsize = get_verifiedsize(nodename, nodeaddress)
- timing_point(timing, "get verified size")
- print >>sys.stderr, "verified size", verifiedsize
- sys.stderr.flush()
- entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
- print >>sys.stderr, "sending log:",
- sys.stderr.flush()
- for chunk in chunks(entries, 1000):
- for trynumber in range(5, 0, -1):
- sendlogresult = backup_sendlog(nodename, nodeaddress, {"start": verifiedsize, "hashes": chunk})
- if sendlogresult == None:
- if trynumber == 1:
- sys.exit(1)
- select.select([], [], [], 10.0)
- print >>sys.stderr, "tries left:", trynumber
- sys.stderr.flush()
- continue
- break
- if sendlogresult["result"] != "ok":
- print >>sys.stderr, "sendlog:", sendlogresult
- sys.exit(1)
- verifiedsize += len(chunk)
- print >>sys.stderr, verifiedsize,
- sys.stderr.flush()
- print >>sys.stderr
- timing_point(timing, "sendlog")
- print >>sys.stderr, "log sent"
- sys.stderr.flush()
- missingentries = get_missingentriesforbackup(nodename, nodeaddress)
- timing_point(timing, "get missing")
- print >>sys.stderr, "missing entries:", len(missingentries)
- sys.stderr.flush()
- fetched_entries = 0
- print >>sys.stderr, "fetching missing entries",
- sys.stderr.flush()
- for missingentry in missingentries:
- hash = base64.b64decode(missingentry)
- sendentryresult = sendentry_merge(nodename, nodeaddress, read_chain(chainsdir, hash), hash)
- if sendentryresult["result"] != "ok":
- print >>sys.stderr, "send sth:", sendentryresult
- sys.exit(1)
- fetched_entries += 1
- if added_entries % 1000 == 0:
- print >>sys.stderr, fetched_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
- timing_point(timing, "send missing")
- verifyrootresult = verifyroot(nodename, nodeaddress, tree_size)
- if verifyrootresult["result"] != "ok":
- print >>sys.stderr, "verifyroot:", verifyrootresult
- sys.exit(1)
- secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
- if root_hash != secondary_root_hash:
- print >>sys.stderr, "secondary root hash was", hexencode(secondary_root_hash)
- print >>sys.stderr, " expected", hexencode(root_hash)
- sys.exit(1)
- timing_point(timing, "verifyroot")
- setverifiedsize(nodename, nodeaddress, tree_size)
- if args.timing:
- print >>sys.stderr, timing["deltatimes"]
- sys.stderr.flush()
-
-tree_head_signature = None
-for signingnode in signingnodes:
- try:
- tree_head_signature = create_sth_signature(tree_size, timestamp,
- root_hash, "https://%s/" % signingnode["address"], key=own_key)
- break
- except urllib2.URLError, e:
- print >>sys.stderr, e
- sys.stderr.flush()
-if tree_head_signature == None:
- print >>sys.stderr, "Could not contact any signing nodes"
- sys.exit(1)
-
-sth = {"tree_size": tree_size, "timestamp": timestamp,
- "sha256_root_hash": base64.b64encode(root_hash),
- "tree_head_signature": base64.b64encode(tree_head_signature)}
-
-check_sth_signature(ctbaseurl, sth, publickey=logpublickey)
-
-timing_point(timing, "build sth")
-
-if args.timing:
- print >>sys.stderr, timing["deltatimes"]
- sys.stderr.flush()
-
-print hexencode(root_hash)
-sys.stdout.flush()
-
-for frontendnode in frontendnodes:
- nodeaddress = "https://%s/" % frontendnode["address"]
- nodename = frontendnode["name"]
- timing = timing_point()
- print >>sys.stderr, "distributing for node", nodename
- sys.stderr.flush()
- curpos = get_curpos(nodename, nodeaddress)
- timing_point(timing, "get curpos")
- print >>sys.stderr, "current position", curpos
- sys.stderr.flush()
- entries = [base64.b64encode(entry) for entry in logorder[curpos:]]
- print >>sys.stderr, "sending log:",
- sys.stderr.flush()
- for chunk in chunks(entries, 1000):
- for trynumber in range(5, 0, -1):
- sendlogresult = sendlog(nodename, nodeaddress, {"start": curpos, "hashes": chunk})
- if sendlogresult == None:
- if trynumber == 1:
- sys.exit(1)
- select.select([], [], [], 10.0)
- print >>sys.stderr, "tries left:", trynumber
- sys.stderr.flush()
- continue
- break
- if sendlogresult["result"] != "ok":
- print >>sys.stderr, "sendlog:", sendlogresult
- sys.exit(1)
- curpos += len(chunk)
- print >>sys.stderr, curpos,
- sys.stderr.flush()
- print >>sys.stderr
- timing_point(timing, "sendlog")
- print >>sys.stderr, "log sent"
- sys.stderr.flush()
- missingentries = get_missingentries(nodename, nodeaddress)
- timing_point(timing, "get missing")
- print >>sys.stderr, "missing entries:", len(missingentries)
- sys.stderr.flush()
- fetched_entries = 0
- print >>sys.stderr, "fetching missing entries",
- sys.stderr.flush()
- for missingentry in missingentries:
- hash = base64.b64decode(missingentry)
- sendentryresult = sendentry(nodename, nodeaddress, read_chain(chainsdir, hash), hash)
- if sendentryresult["result"] != "ok":
- print >>sys.stderr, "send sth:", sendentryresult
- sys.exit(1)
- fetched_entries += 1
- if added_entries % 1000 == 0:
- print >>sys.stderr, fetched_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
- timing_point(timing, "send missing")
- sendsthresult = sendsth(nodename, nodeaddress, sth)
- if sendsthresult["result"] != "ok":
- print >>sys.stderr, "send sth:", sendsthresult
- sys.exit(1)
- timing_point(timing, "send sth")
- if args.timing:
- print >>sys.stderr, timing["deltatimes"]
- sys.stderr.flush()
+import sys
+from certtools import create_ssl_context
+from merge_fetch import merge_fetch
+from merge_backup import merge_backup
+from merge_dist import merge_dist
+
+def main():
+ parser = argparse.ArgumentParser(description="")
+ parser.add_argument('--config', help="System configuration",
+ required=True)
+ parser.add_argument('--localconfig', help="Local configuration",
+ required=True)
+ parser.add_argument("--nomerge", action='store_true',
+ help="Don't actually do merge")
+ parser.add_argument("--timing", action='store_true',
+ help="Print timing information")
+ args = parser.parse_args()
+
+ config = yaml.load(open(args.config))
+ localconfig = yaml.load(open(args.localconfig))
+ paths = localconfig["paths"]
+
+ create_ssl_context(cafile=paths["https_cacertfile"])
+
+ sth = merge_fetch(args, config, localconfig)
+ merge_backup(args, config, localconfig, sth)
+ merge_dist(args, config, localconfig, sth)
+
+if __name__ == '__main__':
+ sys.exit(main())
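
Usage is unchanged by the split. A minimal invocation sketch, with
hypothetical configuration file names:

    merge.py --config ctlog-system.cfg --localconfig merge-local.cfg --timing

As before, --nomerge skips the actual merge work and --timing prints
timing information for each phase.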