summaryrefslogtreecommitdiff
path: root/tools/merge_fetch.py
diff options
context:
space:
mode:
authorLinus Nordberg <linus@nordu.net>2015-09-22 21:59:48 +0200
committerLinus Nordberg <linus@nordu.net>2015-09-27 13:38:29 +0200
commit5f80e1a1b67221ad81a3c717de569647bf121967 (patch)
tree5d8a61fb94ee5c4e73e01610aa386da5bd083fe2 /tools/merge_fetch.py
parenta11ef0bb9db8425c4a7e1c879b4a0116d168a1fb (diff)
Split merge.py into three pieces.
Diffstat (limited to 'tools/merge_fetch.py')
-rw-r--r--tools/merge_fetch.py97
1 files changed, 97 insertions, 0 deletions
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py
new file mode 100644
index 0000000..a0a0396
--- /dev/null
+++ b/tools/merge_fetch.py
@@ -0,0 +1,97 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2014-2015, NORDUnet A/S.
+# See LICENSE for licensing information.
+
+import sys
+import struct
+import time
+import subprocess
+from mergetools import get_logorder, verify_entry, get_new_entries, \
+ chunks, fsync_logorder, get_entries, write_chain, add_to_logorder
+from certtools import timing_point, build_merkle_tree
+
+def merge_fetch(args, config, localconfig):
+ paths = localconfig["paths"]
+ storagenodes = config["storagenodes"]
+ mergedb = paths["mergedb"]
+ logorderfile = mergedb + "/logorder"
+ chainsdir = mergedb + "/chains"
+ own_key = (localconfig["nodename"],
+ "%s/%s-private.pem" % (paths["privatekeys"],
+ localconfig["nodename"]))
+
+ timing = timing_point()
+
+ logorder = get_logorder(logorderfile)
+ timing_point(timing, "get logorder")
+
+ certsinlog = set(logorder)
+
+ new_entries_per_node = {}
+ new_entries = set()
+ entries_to_fetch = {}
+
+ for storagenode in storagenodes:
+ print >>sys.stderr, "getting new entries from", storagenode["name"]
+ sys.stderr.flush()
+ new_entries_per_node[storagenode["name"]] = \
+ set(get_new_entries(storagenode["name"],
+ "https://%s/" % storagenode["address"],
+ own_key, paths))
+ new_entries.update(new_entries_per_node[storagenode["name"]])
+ entries_to_fetch[storagenode["name"]] = []
+ timing_point(timing, "get new entries")
+
+ new_entries -= certsinlog
+ print >>sys.stderr, "adding", len(new_entries), "entries"
+ sys.stderr.flush()
+
+ if args.nomerge:
+ sys.exit(0)
+
+ for ehash in new_entries:
+ for storagenode in storagenodes:
+ if ehash in new_entries_per_node[storagenode["name"]]:
+ entries_to_fetch[storagenode["name"]].append(ehash)
+ break
+
+ verifycert = subprocess.Popen(
+ [paths["verifycert_bin"], paths["known_roots"]],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+
+ added_entries = 0
+ for storagenode in storagenodes:
+ print >>sys.stderr, "getting %d entries from %s:" % \
+ (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]),
+ sys.stderr.flush()
+ for chunk in chunks(entries_to_fetch[storagenode["name"]], 100):
+ entries = get_entries(storagenode["name"],
+ "https://%s/" % storagenode["address"],
+ own_key, paths, chunk)
+ for ehash in chunk:
+ entry = entries[ehash]
+ verify_entry(verifycert, entry, ehash)
+ write_chain(ehash, entry, chainsdir)
+ add_to_logorder(logorderfile, ehash)
+ logorder.append(ehash)
+ certsinlog.add(ehash)
+ added_entries += 1
+ print >>sys.stderr, added_entries,
+ sys.stderr.flush()
+ print >>sys.stderr
+ sys.stderr.flush()
+ fsync_logorder(logorderfile)
+ timing_point(timing, "add entries")
+ print >>sys.stderr, "added", added_entries, "entries"
+ sys.stderr.flush()
+
+ verifycert.communicate(struct.pack("I", 0))
+
+ tree = build_merkle_tree(logorder)
+ tree_size = len(logorder)
+ root_hash = tree[-1][0]
+ timestamp = int(time.time() * 1000)
+
+ return (tree_size, root_hash, timestamp)