From b7b8903b6a7c3342933b9984afa72fb6527b5f72 Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Tue, 24 Jan 2017 09:16:28 +0100 Subject: Parallelised merge, distribution phase. --- tools/compileconfig.py | 75 ++++++++++++++++++++++++++++++++++---------------- tools/initlog.py | 71 +++++++++++++++++++++++++++++++++++++++++++++++ tools/merge | 2 +- tools/merge_fetch.py | 2 +- tools/testcase1.py | 5 +++- 5 files changed, 128 insertions(+), 27 deletions(-) create mode 100755 tools/initlog.py (limited to 'tools') diff --git a/tools/compileconfig.py b/tools/compileconfig.py index b5e5053..aea7ace 100755 --- a/tools/compileconfig.py +++ b/tools/compileconfig.py @@ -8,6 +8,7 @@ import sys import readconfig import re import base64 +from datetime import datetime class Symbol(str): pass @@ -131,24 +132,24 @@ def gen_http_servers(nodetype, nodeconfig, bind_addresses, bind_publicaddress, b return (http_servers, https_servers) -def allowed_clients_frontend(mergenodenames, primarymergenode): +def allowed_clients_frontend(mergenodenames, primarymergenodename): return [ ("/plop/v1/frontend/sendentry", mergenodenames), ("/plop/v1/frontend/sendlog", mergenodenames), - ("/plop/v1/frontend/publish-sth", [primarymergenode]), - ("/plop/v1/frontend/verify-entries", [primarymergenode]), + ("/plop/v1/frontend/publish-sth", [primarymergenodename]), + ("/plop/v1/frontend/verify-entries", [primarymergenodename]), ("/plop/v1/frontend/currentposition", mergenodenames), ("/plop/v1/frontend/missingentries", mergenodenames), ] -def allowed_clients_mergesecondary(primarymergenode): +def allowed_clients_mergesecondary(primarymergenodename): return [ - ("/plop/v1/merge/sendentry", [primarymergenode]), - ("/plop/v1/merge/sendlog", [primarymergenode]), - ("/plop/v1/merge/verifyroot", [primarymergenode]), - ("/plop/v1/merge/verifiedsize", [primarymergenode]), - ("/plop/v1/merge/setverifiedsize", [primarymergenode]), - ("/plop/v1/merge/missingentries", [primarymergenode]), + ("/plop/v1/merge/sendentry", [primarymergenodename]), + ("/plop/v1/merge/sendlog", [primarymergenodename]), + ("/plop/v1/merge/verifyroot", [primarymergenodename]), + ("/plop/v1/merge/verifiedsize", [primarymergenodename]), + ("/plop/v1/merge/setverifiedsize", [primarymergenodename]), + ("/plop/v1/merge/missingentries", [primarymergenodename]), ] def allowed_clients_public(): @@ -164,10 +165,10 @@ def allowed_clients_public(): ("/ct/v1/get-roots", noauth), ] -def allowed_clients_signing(frontendnodenames, primarymergenode): +def allowed_clients_signing(frontendnodenames, primarymergenodename): return [ ("/plop/v1/signing/sct", frontendnodenames), - ("/plop/v1/signing/sth", [primarymergenode]), + ("/plop/v1/signing/sth", [primarymergenodename]), ] def allowed_clients_storage(frontendnodenames, mergenodenames): @@ -185,6 +186,14 @@ def allowed_servers_frontend(signingnodenames, storagenodenames): ("/plop/v1/signing/sct", signingnodenames), ] +def allowed_servers_primarymerge(frontendnodenames): + return [ + ("/plop/v1/frontend/verify-entries", frontendnodenames), + ("/plop/v1/frontend/sendlog", frontendnodenames), + ("/plop/v1/frontend/sendentry", frontendnodenames), + ("/plop/v1/frontend/publish-sth", frontendnodenames), + ] + def parse_ratelimit_expression(expression): if expression == "none": return Symbol("none") @@ -228,7 +237,10 @@ def gen_config(nodename, config, localconfig): print >>plopconfigfile, "%% plop configuration file (-*- erlang -*-)" (nodetype, nodeconfig) = get_node_config(nodename, config) - (http_servers, https_servers) = gen_http_servers(nodetype, nodeconfig, bind_addresses, bind_publicaddress, bind_publichttpaddress=bind_publichttpaddress) + if nodename == config["primarymergenode"]: + (http_servers, https_servers) = [], [] + else: + (http_servers, https_servers) = gen_http_servers(nodetype, nodeconfig, bind_addresses, bind_publicaddress, bind_publichttpaddress=bind_publichttpaddress) catlfishconfig = [] plopconfig = [] @@ -248,8 +260,6 @@ def gen_config(nodename, config, localconfig): (Symbol("http_servers"), http_servers), (Symbol("https_certfile"), paths["https_certfile"]), (Symbol("https_keyfile"), paths["https_keyfile"]), - (Symbol("https_cacertfile"), paths["https_cacertfile"]), - (Symbol("https_cacert_fingerprint"), Binary(base64.b16decode(config["cafingerprint"]))), ] catlfishconfig.append((Symbol("mmd"), config["mmd"])) @@ -263,6 +273,11 @@ def gen_config(nodename, config, localconfig): ]) ] + plopconfig += [ + (Symbol("https_cacertfile"), paths["https_cacertfile"]), + (Symbol("https_cacert_fingerprint"), Binary(base64.b16decode(config["cafingerprint"]))), + ] + if "dbbackend" in localconfig: dbbackend = localconfig["dbbackend"] if dbbackend not in ("fsdb", "permdb"): @@ -275,7 +290,7 @@ def gen_config(nodename, config, localconfig): print >>sys.stderr, "When using permdb, all services have to be in the same node" sys.exit(1) - print "nodetype", ", ".join(nodetype) + #print "nodetype", ", ".join(nodetype) if nodetype & set(["frontendnodes", "storagenodes"]): plopconfig += [ (Symbol("entry_root_path"), paths["db"] + "certentries"), @@ -310,9 +325,10 @@ def gen_config(nodename, config, localconfig): signingnodes = config["signingnodes"] signingnodeaddresses = ["https://%s/plop/v1/signing/" % node["address"] for node in config["signingnodes"]] mergenodenames = [node["name"] for node in config["mergenodes"]] - primarymergenode = config["primarymergenode"] + primarymergenodename = config["primarymergenode"] storagenodeaddresses = ["https://%s/plop/v1/storage/" % node["address"] for node in config["storagenodes"]] frontendnodenames = [node["name"] for node in config["frontendnodes"]] + frontendnodeaddresses = ["https://%s/plop/v1/frontend/" % node["address"] for node in config["frontendnodes"]] allowed_clients = [] allowed_servers = [] @@ -323,20 +339,30 @@ def gen_config(nodename, config, localconfig): reloadableplopconfig.append((Symbol("storage_nodes"), storagenodeaddresses)) reloadableplopconfig.append((Symbol("storage_nodes_quorum"), config["storage-quorum-size"])) services.add(Symbol("ht")) - allowed_clients += allowed_clients_frontend(mergenodenames, primarymergenode) + allowed_clients += allowed_clients_frontend(mergenodenames, primarymergenodename) allowed_clients += allowed_clients_public() allowed_servers += allowed_servers_frontend([node["name"] for node in signingnodes], storagenodenames) if "storagenodes" in nodetype: allowed_clients += allowed_clients_storage(frontendnodenames, mergenodenames) if "signingnodes" in nodetype: - allowed_clients += allowed_clients_signing(frontendnodenames, primarymergenode) + allowed_clients += allowed_clients_signing(frontendnodenames, primarymergenodename) services = [Symbol("sign")] if "mergenodes" in nodetype: - storagenodenames = [node["name"] for node in config["storagenodes"]] reloadableplopconfig.append((Symbol("storage_nodes"), storagenodeaddresses)) reloadableplopconfig.append((Symbol("storage_nodes_quorum"), config["storage-quorum-size"])) services.add(Symbol("ht")) - allowed_clients += allowed_clients_mergesecondary(primarymergenode) + if nodename == primarymergenodename: + merge = localconfig["merge"] + plopconfig.append((Symbol("db_backend_opt"), [(Symbol("write_flag"), Symbol("read"))])) + plopconfig.append((Symbol("merge_delay"), merge["min-delay"])) + plopconfig.append((Symbol("merge_dist_winsize"), merge["dist-window-size"])) + plopconfig.append((Symbol("merge_dist_sendlog_chunksize"), merge["dist-sendlog-chunksize"])) + plopconfig.append((Symbol("merge_dist_sendentries_chunksize"), merge["dist-sendentries-chunksize"])) + plopconfig.append((Symbol("frontend_nodes"), frontendnodeaddresses)) + plopconfig.append((Symbol("sth_path"), paths["mergedb"] + "/sth")) + allowed_servers += allowed_servers_primarymerge(frontendnodenames) + else: + allowed_clients += allowed_clients_mergesecondary(primarymergenodename) plopconfig += [ (Symbol("services"), list(services)), @@ -383,12 +409,13 @@ def gen_config(nodename, config, localconfig): def gen_testmakefile(config, testmakefile, machines, shellvars=False): configfile = open(testmakefile, "w") + print >>configfile, "#", testmakefile, "generated by", sys.argv[0], datetime.now() + frontendnodenames = set([node["name"] for node in config["frontendnodes"]]) storagenodenames = set([node["name"] for node in config["storagenodes"]]) signingnodenames = set([node["name"] for node in config["signingnodes"]]) mergenodenames = set([node["name"] for node in config["mergenodes"]]) - erlangnodenames = frontendnodenames | storagenodenames | signingnodenames | \ - set(filter(lambda name: name != config["primarymergenode"], mergenodenames)) + erlangnodenames_and_apps = ['%s:%s' % (nn, 'catlfish' if nn != config["primarymergenode"] else "merge") for nn in frontendnodenames | storagenodenames | signingnodenames | mergenodenames] frontendnodeaddresses = [node["publicaddress"] for node in config["frontendnodes"]] storagenodeaddresses = [node["address"] for node in config["storagenodes"]] @@ -398,7 +425,7 @@ def gen_testmakefile(config, testmakefile, machines, shellvars=False): delimiter = '"' if shellvars else '' print >>configfile, "NODES=" + delimiter + " ".join(frontendnodenames|storagenodenames|signingnodenames|mergenodenames) + delimiter - print >>configfile, "ERLANGNODES=" + delimiter + " ".join(erlangnodenames) + delimiter + print >>configfile, "ERLANGNODES=" + delimiter + " ".join(erlangnodenames_and_apps) + delimiter print >>configfile, "MACHINES=" + delimiter + " ".join([str(e) for e in range(1, machines+1)]) + delimiter print >>configfile, "TESTURLS=" + delimiter + " ".join(frontendnodeaddresses+storagenodeaddresses+signingnodeaddresses+mergenodeaddresses) + delimiter print >>configfile, "BASEURL=" + delimiter + config["baseurl"] + delimiter diff --git a/tools/initlog.py b/tools/initlog.py new file mode 100755 index 0000000..11ebc2e --- /dev/null +++ b/tools/initlog.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017, NORDUnet A/S. +# See LICENSE for licensing information. +# +# Initialise a new CT log. +# + +import sys +import os +import argparse +import yaml +import errno +from time import time +from base64 import b64encode +from certtools import build_merkle_tree, write_file +from mergetools import get_sth, perm, get_logorder + +def parse_args(): + parser = argparse.ArgumentParser(description="") + parser.add_argument('--config', help="System configuration", + required=True) + parser.add_argument('--localconfig', help="Local configuration", + required=True) + + args = parser.parse_args() + config = yaml.load(open(args.config)) + localconfig = yaml.load(open(args.localconfig)) + + return (args, config, localconfig) + +def main(): + """ + Initialise a log by creating + - perm database if it doesn't exist + """ + args, config, localconfig = parse_args() + signingnodes = config["signingnodes"] + paths = localconfig["paths"] + own_key = (localconfig["nodename"], + "%s/%s-private.pem" % (paths["privatekeys"], + localconfig["nodename"])) + mergedb = paths["mergedb"] + logorderfile = mergedb + "/logorder" + sthfile = mergedb + "/sth" + + # Don't do anything if there's already an sth file. + sth = get_sth(sthfile) + if sth['tree_size'] >= 0: + print >>sys.stderr, \ + "This log has an STH file with tree size %s." % sth['tree_size'] + print >>sys.stderr, "I refuse to destroy this log." + return 1 + + # Ensure that we can find our keyfile. + try: + os.stat(own_key[1]) + except OSError, e: + if e.errno == errno.ENOENT: + print >>sys.stderr, "Unable to open keyfile: %s" % own_key[1] + return 1 + raise + + # Create a chains database. + chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains") + + return 0 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/tools/merge b/tools/merge index b5a50d5..0d3f36c 100755 --- a/tools/merge +++ b/tools/merge @@ -7,4 +7,4 @@ BINDIR=$(dirname $0) $BINDIR/merge_fetch.py "$@" $BINDIR/merge_backup.py "$@" $BINDIR/merge_sth.py "$@" -$BINDIR/merge_dist.py "$@" +#$BINDIR/merge_dist.py "$@" diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py index 8f94aed..42a3089 100755 --- a/tools/merge_fetch.py +++ b/tools/merge_fetch.py @@ -59,7 +59,7 @@ def merge_fetch(args, config, localconfig): break verifycert = subprocess.Popen( - [paths["verifycert_bin"], paths["known_roots"]], + [paths["verifycert_bin"], paths["knownroots"]], stdin=subprocess.PIPE, stdout=subprocess.PIPE) added_entries = 0 diff --git a/tools/testcase1.py b/tools/testcase1.py index dbafe7a..5192074 100755 --- a/tools/testcase1.py +++ b/tools/testcase1.py @@ -13,6 +13,7 @@ import struct import hashlib import itertools import os.path +from time import sleep from certtools import * baseurls = [sys.argv[1]] @@ -148,8 +149,10 @@ def get_and_check_entry(timestamp, chain, leaf_index, baseurl): len(submittedcertchain)) def merge(): - return subprocess.call([toolsdir + "/merge", "--config", testdir + "/catlfish-test.cfg", + rv = subprocess.call([toolsdir + "/merge", "--config", testdir + "/catlfish-test.cfg", "--localconfig", testdir + "/catlfish-test-local-merge.cfg"]) + sleep(5) # FIXME: Just wait for dist instead. + return rv mergeresult = merge() assert_equal(mergeresult, 0, "merge", quiet=True, fatal=True) -- cgit v1.1