summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorLinus Nordberg <linus@nordu.net>2016-11-23 17:09:48 +0100
committerLinus Nordberg <linus@nordu.net>2016-11-23 17:09:48 +0100
commit19a2a611a839c0318f58347e2d93943c8e2401a5 (patch)
tree18cd302161a88d4546b39792a4bff6b1ade95c77 /tools
parent27e368196ce65e109c027987c706a697356f7bc5 (diff)
WIP
Merge can run as four separate processes, plus a fifth controlling proces 'merge'. Tests are limited to testcase1.py and they're failing because of the test with the dead merge secondary. Tests are also time consuming because they're waiting for 60s each time a merge needs to be verified. This could be improved by peeking at the control files, for example.
Diffstat (limited to 'tools')
-rwxr-xr-xtools/merge62
-rwxr-xr-xtools/merge_backup.py141
-rwxr-xr-xtools/merge_dist.py141
-rwxr-xr-xtools/merge_fetch.py59
-rwxr-xr-xtools/merge_sth.py81
-rw-r--r--tools/mergetools.py110
-rwxr-xr-xtools/testcase1.py16
7 files changed, 396 insertions, 214 deletions
diff --git a/tools/merge b/tools/merge
index b5a50d5..0d1bf0e 100755
--- a/tools/merge
+++ b/tools/merge
@@ -1,10 +1,58 @@
-#! /bin/sh
+#! /usr/bin/env python
+"""merge"""
-set -o errexit
+import os
+import sys
+import signal
+from time import sleep
+from mergetools import parse_args
+from multiprocessing import Process
+import merge_fetch, merge_backup, merge_sth, merge_dist
-BINDIR=$(dirname $0)
+def run_once():
+ """Merge once."""
+ ret = merge_fetch.main()
+ if ret == 0:
+ ret = merge_backup.main()
+ if ret == 0:
+ ret = merge_sth.main()
+ if ret == 0:
+ ret = merge_dist.main()
+ return ret
-$BINDIR/merge_fetch.py "$@"
-$BINDIR/merge_backup.py "$@"
-$BINDIR/merge_sth.py "$@"
-$BINDIR/merge_dist.py "$@"
+def term(signal, arg):
+ sys.exit(1)
+
+def run_continuously(pidfilepath):
+ """Run continuously."""
+ parts = ('fetch', merge_fetch), ('backup', merge_backup), ('sth', merge_sth), ('dist', merge_dist)
+ procs = {}
+ for part, mod in parts:
+ procs[part] = Process(target=mod.main, name='merge_%s' % part)
+ procs[part].daemon = True
+ procs[part].start()
+
+ if pidfilepath:
+ open(pidfilepath, 'w').write(str(os.getpid()) + '\n')
+
+ signal.signal(signal.SIGTERM, term)
+ while True:
+ for name, p in procs.items():
+ if not p.is_alive():
+ print >>sys.stderr, "\nERROR:", name, "process is gone; exiting"
+ return 1
+ sleep(1)
+
+def main():
+ """Main"""
+ args, _, _ = parse_args()
+
+ if args.mergeinterval is None:
+ ret = run_once()
+ else:
+ ret = run_continuously(args.pidfile)
+
+ return ret
+
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/tools/merge_backup.py b/tools/merge_backup.py
index 4f688c3..723fc7a 100755
--- a/tools/merge_backup.py
+++ b/tools/merge_backup.py
@@ -11,13 +11,16 @@ import sys
import base64
import select
import requests
+import errno
+import logging
from time import sleep
+from os import stat
from certtools import timing_point, build_merkle_tree, write_file, \
create_ssl_context
from mergetools import chunks, backup_sendlog, get_logorder, \
get_verifiedsize, get_missingentriesforbackup, \
hexencode, setverifiedsize, sendentries_merge, verifyroot, \
- get_nfetched, parse_args, perm
+ get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, Status
def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
for trynumber in range(5, 0, -1):
@@ -28,12 +31,10 @@ def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
if trynumber == 1:
return None
select.select([], [], [], 10.0)
- print >>sys.stderr, "tries left:", trynumber
- sys.stderr.flush()
+ logging.info("tries left: %d", trynumber)
continue
return sendlogresult
-
def merge_backup(args, config, localconfig, secondaries):
paths = localconfig["paths"]
own_key = (localconfig["nodename"],
@@ -43,6 +44,8 @@ def merge_backup(args, config, localconfig, secondaries):
chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
logorderfile = mergedb + "/logorder"
currentsizefile = mergedb + "/fetched"
+ statusfile = mergedb + "/merge_backup.status"
+ s = Status(statusfile)
timing = timing_point()
nfetched = get_nfetched(currentsizefile, logorderfile)
@@ -61,64 +64,60 @@ def merge_backup(args, config, localconfig, secondaries):
nodeaddress = "https://%s/" % secondary["address"]
nodename = secondary["name"]
timing = timing_point()
- print >>sys.stderr, "backing up to node", nodename
- sys.stderr.flush()
- verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
+ logging.info("backing up to node %s", nodename)
+ try:
+ verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
+ except requests.exceptions.ConnectionError, e:
+ logging.error("connection error when getting verified size from %s", nodename)
+ return 1
timing_point(timing, "get verified size")
- print >>sys.stderr, "verified size", verifiedsize
- sys.stderr.flush()
+ logging.info("verified size %d", verifiedsize)
entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
- print >>sys.stderr, "determining end of log:",
+ logging.info("determining end of log")
for chunk in chunks(entries, 100000):
sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk[:10])
if sendlogresult == None:
- print >>sys.stderr, "sendlog result was None"
- sys.exit(1)
+ logging.error("sendlog result was None")
+ return 1
if sendlogresult["result"] != "ok":
- print >>sys.stderr, "backup_sendlog:", sendlogresult
- sys.exit(1)
+ logging.error("backup_sendlog: %s", sendlogresult)
+ return 1
verifiedsize += len(chunk)
- print >>sys.stderr, verifiedsize,
- sys.stderr.flush()
+ s.status("INFO: determining end of log: %d" % verifiedsize)
if verifiedsize > 100000:
verifiedsize -= 100000
else:
verifiedsize = 0
+ logging.info("end of log determined")
timing_point(timing, "checklog")
entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
- print >>sys.stderr, "sending log:",
- sys.stderr.flush()
+ logging.info("sending log")
for chunk in chunks(entries, 1000):
sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk)
if sendlogresult == None:
- sys.exit(1)
+ return 1
if sendlogresult["result"] != "ok":
- print >>sys.stderr, "backup_sendlog:", sendlogresult
- sys.exit(1)
+ logging.error("backup_sendlog: %s", sendlogresult)
+ return 1
verifiedsize += len(chunk)
- print >>sys.stderr, verifiedsize,
- sys.stderr.flush()
- print >>sys.stderr
+ s.status("INFO: sending log: %d" % verifiedsize)
timing_point(timing, "sendlog")
- print >>sys.stderr, "log sent"
- sys.stderr.flush()
+ logging.info("log sent")
missingentries = get_missingentriesforbackup(nodename, nodeaddress,
own_key, paths)
timing_point(timing, "get missing")
while missingentries:
- print >>sys.stderr, "missing entries:", len(missingentries)
- sys.stderr.flush()
+ logging.info("missing entries: %d", len(missingentries))
fetched_entries = 0
- print >>sys.stderr, "fetching missing entries",
- sys.stderr.flush()
+ logging.info("fetching missing entries")
with requests.sessions.Session() as session:
for missingentry_chunk in chunks(missingentries, 100):
missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
@@ -127,59 +126,79 @@ def merge_backup(args, config, localconfig, secondaries):
own_key, paths,
hashes_and_entries, session)
if sendentryresult["result"] != "ok":
- print >>sys.stderr, "sendentry_merge:", sendentryresult
- sys.exit(1)
+ logging.error("sendentry_merge: %s", sendentryresult)
+ return 1
fetched_entries += len(missingentry_hashes)
- print >>sys.stderr, fetched_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
+ s.status("INFO: fetching missing entries: %d" % fetched_entries)
timing_point(timing, "send missing")
missingentries = get_missingentriesforbackup(nodename, nodeaddress,
own_key, paths)
timing_point(timing, "get missing")
- verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
- tree_size)
+ try:
+ verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
+ tree_size)
+ except requests.exceptions.ConnectionError, e:
+ logging.error("connection error when verifying root at %s", nodename)
+ return 1
if verifyrootresult["result"] != "ok":
- print >>sys.stderr, "verifyroot:", verifyrootresult
- sys.exit(1)
+ logging.error("verifyroot: %s", verifyrootresult)
+ return 1
secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
if root_hash != secondary_root_hash:
- print >>sys.stderr, "secondary root hash was", \
- hexencode(secondary_root_hash)
- print >>sys.stderr, " expected", hexencode(root_hash)
- sys.exit(1)
+ logging.error("secondary root hash was %s, expected %s",
+ hexencode(secondary_root_hash),
+ hexencode(root_hash))
+ return 1
timing_point(timing, "verifyroot")
setverifiedsize(nodename, nodeaddress, own_key, paths, tree_size)
backuppath = mergedb + "/verified." + nodename
backupdata = {"tree_size": tree_size,
"sha256_root_hash": hexencode(root_hash)}
- #print >>sys.stderr, "DEBUG: writing to", backuppath, ":", backupdata
+ logging.debug("writing to %s: %s", backuppath, backupdata)
write_file(backuppath, backupdata)
if args.timing:
- print >>sys.stderr, "timing: merge_backup:", timing["deltatimes"]
- sys.stderr.flush()
+ logging.debug("timing: merge_backup: %s", timing["deltatimes"])
+
+ return 0
def main():
"""
- Read logorder file up until what's indicated by fetched file and
- build the tree.
+ Wait until 'fetched' exists and read it.
+
+ Read 'logorder' up until what's indicated by 'fetched' and build the
+ tree.
Distribute entries to all secondaries, write tree size and tree head
- to backup.<secondary> files as each secondary is verified to have
+ to 'backup.<secondary>' files as each secondary is verified to have
the entries.
- Sleep some and start over.
+ If `--mergeinterval', wait until 'fetched' is updated and read it
+ and start over from the point where 'logorder' is read.
"""
args, config, localconfig = parse_args()
+ paths = localconfig["paths"]
+ mergedb = paths["mergedb"]
+ lockfile = mergedb + "/.merge_backup.lock"
+ fetched_path = mergedb + "/fetched"
+
+ loglevel = getattr(logging, args.loglevel.upper())
+ if args.mergeinterval is None:
+ logging.basicConfig(level=loglevel)
+ else:
+ logging.basicConfig(filename=args.logdir + "/merge_backup.log",
+ level=loglevel)
+
+ if not flock_ex_or_fail(lockfile):
+ logging.critical("unable to take lock %s", lockfile)
+ return 1
+
all_secondaries = \
[n for n in config.get('mergenodes', []) if \
n['name'] != config['primarymergenode']]
- paths = localconfig["paths"]
create_ssl_context(cafile=paths["https_cacertfile"])
if len(args.node) == 0:
@@ -187,12 +206,20 @@ def main():
else:
nodes = [n for n in all_secondaries if n["name"] in args.node]
+ if args.mergeinterval is None:
+ return merge_backup(args, config, localconfig, nodes)
+
+ fetched_statinfo = waitforfile(fetched_path)
+
while True:
- merge_backup(args, config, localconfig, nodes)
- if args.interval is None:
- break
- print >>sys.stderr, "sleeping", args.interval, "seconds"
- sleep(args.interval)
+ err = merge_backup(args, config, localconfig, nodes)
+ if err != 0:
+ return err
+ fetched_statinfo_old = fetched_statinfo
+ while fetched_statinfo == fetched_statinfo_old:
+ sleep(args.mergeinterval / 30)
+ fetched_statinfo = stat(fetched_path)
+ return 0
if __name__ == '__main__':
sys.exit(main())
diff --git a/tools/merge_dist.py b/tools/merge_dist.py
index a9b5c60..7a13bfa 100755
--- a/tools/merge_dist.py
+++ b/tools/merge_dist.py
@@ -9,12 +9,15 @@
#
import sys
import json
+import logging
from time import sleep
from base64 import b64encode, b64decode
+from os import stat
from certtools import timing_point, \
create_ssl_context
from mergetools import get_curpos, get_logorder, chunks, get_missingentries, \
- sendsth, sendlog, sendentry, parse_args, perm
+ sendsth, sendlog, sendentry, parse_args, perm, waitforfile, \
+ flock_ex_or_fail, Status
def merge_dist(args, localconfig, frontendnodes, timestamp):
paths = localconfig["paths"]
@@ -25,17 +28,19 @@ def merge_dist(args, localconfig, frontendnodes, timestamp):
chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
logorderfile = mergedb + "/logorder"
sthfile = mergedb + "/sth"
+ statusfile = mergedb + "/merge_dist.status"
+ s = Status(statusfile)
create_ssl_context(cafile=paths["https_cacertfile"])
timing = timing_point()
try:
sth = json.loads(open(sthfile, 'r').read())
except (IOError, ValueError):
- print >>sys.stderr, "No valid STH file found in", sthfile
+ logging.warning("No valid STH file found in %s", sthfile)
return timestamp
if sth['timestamp'] < timestamp:
- print >>sys.stderr, "New STH file older than the previous one:", \
- sth['timestamp'], "<", timestamp
+ logging.warning("New STH file older than the previous one: %d < %d",
+ sth['timestamp'], timestamp)
return timestamp
if sth['timestamp'] == timestamp:
return timestamp
@@ -49,96 +54,122 @@ def merge_dist(args, localconfig, frontendnodes, timestamp):
nodename = frontendnode["name"]
timing = timing_point()
- print >>sys.stderr, "distributing for node", nodename
- sys.stderr.flush()
- curpos = get_curpos(nodename, nodeaddress, own_key, paths)
+ logging.info("distributing for node %s", nodename)
+ ok, curpos = get_curpos(nodename, nodeaddress, own_key, paths)
+ if not ok:
+ logging.error("get_curpos: %s", curpos)
+ continue
timing_point(timing, "get curpos")
- print >>sys.stderr, "current position", curpos
- sys.stderr.flush()
+ logging.info("current position %d", curpos)
entries = [b64encode(entry) for entry in logorder[curpos:]]
- print >>sys.stderr, "sending log:",
- sys.stderr.flush()
+ logging.info("sending log: %d", len(entries))
+ sendlog_fail = False
for chunk in chunks(entries, 1000):
for trynumber in range(5, 0, -1):
- sendlogresult = sendlog(nodename, nodeaddress,
- own_key, paths,
- {"start": curpos, "hashes": chunk})
- if sendlogresult == None:
- if trynumber == 1:
- sys.exit(1)
- sleep(10)
- print >>sys.stderr, "tries left:", trynumber
- sys.stderr.flush()
- continue
+ ok, sendlogresult = sendlog(nodename, nodeaddress, own_key, paths,
+ {"start": curpos, "hashes": chunk})
+ if ok:
+ break
+ sleep(10)
+ logging.warning("tries left: %d", trynumber)
+ if not ok or sendlogresult.get("result") != "ok":
+ logging.error("sendlog: %s", sendlogresult)
+ sendlog_fail = True
break
- if sendlogresult["result"] != "ok":
- print >>sys.stderr, "sendlog:", sendlogresult
- sys.exit(1)
curpos += len(chunk)
- print >>sys.stderr, curpos,
- sys.stderr.flush()
- print >>sys.stderr
+ s.status("INFO: sendlog %d" % curpos)
timing_point(timing, "sendlog")
- print >>sys.stderr, "log sent"
- sys.stderr.flush()
+ if sendlog_fail:
+ logging.error("sendlog failed for %s", nodename)
+ continue
missingentries = get_missingentries(nodename, nodeaddress, own_key,
paths)
timing_point(timing, "get missing")
- print >>sys.stderr, "missing entries:", len(missingentries)
- sys.stderr.flush()
+ logging.info("sending missing entries: %d", len(missingentries))
sent_entries = 0
- print >>sys.stderr, "send missing entries",
- sys.stderr.flush()
+ sendentry_fail = False
for missingentry in missingentries:
ehash = b64decode(missingentry)
- sendentryresult = sendentry(nodename, nodeaddress, own_key, paths,
- chainsdb.get(ehash), ehash)
- if sendentryresult["result"] != "ok":
- print >>sys.stderr, "sendentry:", sendentryresult
- sys.exit(1)
+ ok, sendentryresult = sendentry(nodename, nodeaddress, own_key, paths,
+ chainsdb.get(ehash), ehash)
+ if not ok or sendentryresult.get("result") != "ok":
+ logging.error("sendentry: %s", sendentryresult)
+ sendentry_fail = True
+ break
sent_entries += 1
if sent_entries % 1000 == 0:
- print >>sys.stderr, sent_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
+ s.status("INFO: sendentry %d" % sent_entries)
timing_point(timing, "send missing")
+ if sendentry_fail:
+ logging.error("sendentry failed for %s", nodename)
+ continue
- print >>sys.stderr, "sending sth to node", nodename
- sys.stderr.flush()
- sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth)
- if sendsthresult["result"] != "ok":
- print >>sys.stderr, "sendsth:", sendsthresult
- sys.exit(1)
+ logging.info("sending sth to node %s", nodename)
+ sendsth_fail = False
+ ok, sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth)
+ if not ok or sendsthresult.get("result") != "ok":
+ logging.error("sendsth: %s", sendsthresult)
+ sendsth_fail = True
timing_point(timing, "send sth")
if args.timing:
- print >>sys.stderr, "timing: merge_dist:", timing["deltatimes"]
- sys.stderr.flush()
+ logging.debug("timing: merge_dist: %s", timing["deltatimes"])
+
+ if sendsth_fail:
+ logging.error("sendsth failed for %s", nodename)
+ continue
return timestamp
def main():
"""
+ Wait until 'sth' exists and read it.
+
Distribute missing entries and the STH to all frontend nodes.
+
+ If `--mergeinterval', wait until 'sth' is updated and read it and
+ start distributing again.
"""
args, config, localconfig = parse_args()
+ paths = localconfig["paths"]
+ mergedb = paths["mergedb"]
+ lockfile = mergedb + "/.merge_dist.lock"
timestamp = 0
+ loglevel = getattr(logging, args.loglevel.upper())
+ if args.mergeinterval is None:
+ logging.basicConfig(level=loglevel)
+ else:
+ logging.basicConfig(filename=args.logdir + "/merge_dist.log",
+ level=loglevel)
+
+ if not flock_ex_or_fail(lockfile):
+ logging.critical("unable to take lock %s", lockfile)
+ return 1
+
if len(args.node) == 0:
nodes = config["frontendnodes"]
else:
nodes = [n for n in config["frontendnodes"] if n["name"] in args.node]
+ if args.mergeinterval is None:
+ if merge_dist(args, localconfig, nodes, timestamp) < 0:
+ return 1
+ return 0
+
+ sth_path = localconfig["paths"]["mergedb"] + "/sth"
+ sth_statinfo = waitforfile(sth_path)
while True:
- timestamp = merge_dist(args, localconfig, nodes, timestamp)
- if args.interval is None:
- break
- print >>sys.stderr, "sleeping", args.interval, "seconds"
- sleep(args.interval)
+ if merge_dist(args, localconfig, nodes, timestamp) < 0:
+ return 1
+ sth_statinfo_old = sth_statinfo
+ while sth_statinfo == sth_statinfo_old:
+ sleep(args.mergeinterval / 30)
+ sth_statinfo = stat(sth_path)
+ return 0
if __name__ == '__main__':
sys.exit(main())
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py
index db274a3..7973fae 100755
--- a/tools/merge_fetch.py
+++ b/tools/merge_fetch.py
@@ -10,10 +10,11 @@
import sys
import struct
import subprocess
+import logging
from time import sleep
from mergetools import get_logorder, verify_entry, get_new_entries, \
chunks, fsync_logorder, get_entries, add_to_logorder, \
- hexencode, parse_args, perm
+ hexencode, parse_args, perm, flock_ex_or_fail, Status
from certtools import timing_point, write_file, create_ssl_context
def merge_fetch(args, config, localconfig):
@@ -21,6 +22,8 @@ def merge_fetch(args, config, localconfig):
storagenodes = config["storagenodes"]
mergedb = paths["mergedb"]
logorderfile = mergedb + "/logorder"
+ statusfile = mergedb + "/merge_fetch.status"
+ s = Status(statusfile)
chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
own_key = (localconfig["nodename"],
"%s/%s-private.pem" % (paths["privatekeys"],
@@ -37,8 +40,7 @@ def merge_fetch(args, config, localconfig):
entries_to_fetch = {}
for storagenode in storagenodes:
- print >>sys.stderr, "getting new entries from", storagenode["name"]
- sys.stderr.flush()
+ logging.info("getting new entries from %s", storagenode["name"])
new_entries_per_node[storagenode["name"]] = \
set(get_new_entries(storagenode["name"],
"https://%s/" % storagenode["address"],
@@ -48,8 +50,7 @@ def merge_fetch(args, config, localconfig):
timing_point(timing, "get new entries")
new_entries -= certsinlog
- print >>sys.stderr, "adding", len(new_entries), "entries"
- sys.stderr.flush()
+ logging.info("adding %d entries", len(new_entries))
for ehash in new_entries:
for storagenode in storagenodes:
@@ -63,9 +64,8 @@ def merge_fetch(args, config, localconfig):
added_entries = 0
for storagenode in storagenodes:
- print >>sys.stderr, "getting %d entries from %s:" % \
- (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]),
- sys.stderr.flush()
+ nentries = len(entries_to_fetch[storagenode["name"]])
+ logging.info("getting %d entries from %s", nentries, storagenode["name"])
for chunk in chunks(entries_to_fetch[storagenode["name"]], 100):
entries = get_entries(storagenode["name"],
"https://%s/" % storagenode["address"],
@@ -78,21 +78,17 @@ def merge_fetch(args, config, localconfig):
logorder.append(ehash)
certsinlog.add(ehash)
added_entries += 1
- print >>sys.stderr, added_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
+ s.status("INFO: getting %d entries from %s: %d" %
+ (nentries, storagenode["name"], added_entries))
chainsdb.commit()
fsync_logorder(logorderfile)
timing_point(timing, "add entries")
- print >>sys.stderr, "added", added_entries, "entries"
- sys.stderr.flush()
+ logging.info("added %d entries", added_entries)
verifycert.communicate(struct.pack("I", 0))
if args.timing:
- print >>sys.stderr, "timing: merge_fetch:", timing["deltatimes"]
- sys.stderr.flush()
+ logging.debug("timing: merge_fetch: %s", timing["deltatimes"])
tree_size = len(logorder)
if tree_size == 0:
@@ -102,28 +98,43 @@ def merge_fetch(args, config, localconfig):
def main():
"""
- Fetch new entries from all storage nodes.
+ Fetch new entries from all storage nodes, in sequence.
- Indicate current position by writing the index in the logorder file
- (0-based) to the 'fetched' file.
+ Indicate the current position by writing the hash and its 'logorder'
+ index, 0-based, to 'fetched'.
- Sleep some and start over.
+ Sleep some and start over, or exit if there's no `--mergeinterval'.
"""
args, config, localconfig = parse_args()
paths = localconfig["paths"]
mergedb = paths["mergedb"]
currentsizefile = mergedb + "/fetched"
+ lockfile = mergedb + "/.merge_fetch.lock"
+
+ loglevel = getattr(logging, args.loglevel.upper())
+ if args.mergeinterval is None:
+ logging.basicConfig(level=loglevel)
+ else:
+ logging.basicConfig(filename=args.logdir + "/merge_fetch.log",
+ level=loglevel)
+
+ if not flock_ex_or_fail(lockfile):
+ logging.critical("unable to take lock %s", lockfile)
+ return 1
+
create_ssl_context(cafile=paths["https_cacertfile"])
while True:
logsize, last_hash = merge_fetch(args, config, localconfig)
currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)}
- #print >>sys.stderr, "DEBUG: writing to", currentsizefile, ":", currentsize
+ logging.debug("writing to %s: %s", currentsizefile, currentsize)
write_file(currentsizefile, currentsize)
- if args.interval is None:
+ if args.mergeinterval is None:
break
- print >>sys.stderr, "sleeping", args.interval, "seconds"
- sleep(args.interval)
+ logging.debug("sleeping %d seconds", args.mergeinterval / 10)
+ sleep(args.mergeinterval / 10)
+
+ return 0
if __name__ == '__main__':
sys.exit(main())
diff --git a/tools/merge_sth.py b/tools/merge_sth.py
index f4aec53..97f6e24 100755
--- a/tools/merge_sth.py
+++ b/tools/merge_sth.py
@@ -9,12 +9,13 @@
#
import sys
import json
-import urllib2
import time
import requests
+import logging
from base64 import b64encode
+from datetime import datetime, timedelta
from mergetools import parse_args, get_nfetched, hexencode, hexdecode, \
- get_logorder, get_sth
+ get_logorder, get_sth, flock_ex_or_fail
from certtools import create_ssl_context, get_public_key_from_file, \
timing_point, create_sth_signature, write_file, check_sth_signature, \
build_merkle_tree
@@ -39,6 +40,7 @@ def merge_sth(args, config, localconfig):
trees = [{'tree_size': get_nfetched(currentsizefile, logorderfile),
'sha256_root_hash': ''}]
+ logging.debug("starting point, trees: %s", trees)
for mergenode in mergenodes:
if mergenode["name"] == config["primarymergenode"]:
continue
@@ -49,28 +51,29 @@ def merge_sth(args, config, localconfig):
tree = {'tree_size': 0, "sha256_root_hash": ''}
trees.append(tree)
trees.sort(key=lambda e: e['tree_size'], reverse=True)
- #print >>sys.stderr, "DEBUG: trees:", trees
+ logging.debug("trees: %s", trees)
if backupquorum > len(trees) - 1:
- print >>sys.stderr, "backup quorum > number of secondaries:", \
- backupquorum, ">", len(trees) - 1
- return
+ logging.error("backup quorum > number of secondaries: %d > %d",
+ backupquorum, len(trees) - 1)
+ return -1
tree_size = trees[backupquorum]['tree_size']
root_hash = hexdecode(trees[backupquorum]['sha256_root_hash'])
- #print >>sys.stderr, "DEBUG: tree size candidate at backupquorum", backupquorum, ":", tree_size
+ logging.debug("tree size candidate at backupquorum %d: %d", backupquorum,
+ tree_size)
cur_sth = get_sth(sthfile)
if tree_size < cur_sth['tree_size']:
- print >>sys.stderr, "candidate tree < current tree:", \
- tree_size, "<", cur_sth['tree_size']
- return
+ logging.info("candidate tree < current tree: %d < %d",
+ tree_size, cur_sth['tree_size'])
+ return 0
assert tree_size >= 0 # Don't read logorder without limit.
logorder = get_logorder(logorderfile, tree_size)
timing_point(timing, "get logorder")
if tree_size == -1:
tree_size = len(logorder)
- print >>sys.stderr, "new tree size will be", tree_size
+ logging.info("new tree size will be %d", tree_size)
root_hash_calc = build_merkle_tree(logorder)[-1][0]
assert root_hash == '' or root_hash == root_hash_calc
@@ -87,11 +90,10 @@ def merge_sth(args, config, localconfig):
key=own_key)
break
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, e.response
- sys.stderr.flush()
+ logging.warning("create_sth_signature error: %s", e.response)
if tree_head_signature == None:
- print >>sys.stderr, "Could not contact any signing nodes"
- sys.exit(1)
+ logging.error("Could not contact any signing nodes")
+ return 0
sth = {"tree_size": tree_size, "timestamp": timestamp,
"sha256_root_hash": b64encode(root_hash),
@@ -100,34 +102,59 @@ def merge_sth(args, config, localconfig):
check_sth_signature(ctbaseurl, sth, publickey=logpublickey)
timing_point(timing, "build sth")
- print hexencode(root_hash), timestamp, tree_size
- sys.stdout.flush()
+ logging.info("new root: %s %d %d", hexencode(root_hash), timestamp, tree_size)
write_file(sthfile, sth)
if args.timing:
- print >>sys.stderr, "timing: merge_sth:", timing["deltatimes"]
- sys.stderr.flush()
+ logging.debug("timing: merge_sth: %s", timing["deltatimes"])
+
+ return 0
def main():
"""
- Read file 'sth' to get current tree size, assuming zero if file not
+ Read 'sth' to get the current tree size, assuming zero if file not
found.
Read tree sizes from the backup.<secondary> files, put them in a
- list and sort it. Let new tree size equal list[backup-quorum]. Barf
- on a new tree size smaller than the currently published tree size.
+ list and sort the list. Let new tree size be list[backup-quorum]. If
+ the new tree size is smaller than the currently published tree size,
+ stop here.
+
+ Decide on a timestamp, build an STH and write it to 'sth'.
- Decide on a timestamp, build an STH and write it to file 'sth'.
+ Sleep some and start over, or exit if there's no `--mergeinterval'.
"""
args, config, localconfig = parse_args()
+ paths = localconfig["paths"]
+ mergedb = paths["mergedb"]
+ lockfile = mergedb + "/.merge_sth.lock"
+
+ loglevel = getattr(logging, args.loglevel.upper())
+ if args.mergeinterval is None:
+ logging.basicConfig(level=loglevel)
+ else:
+ logging.basicConfig(filename=args.logdir + "/merge_sth.log",
+ level=loglevel)
+
+ if not flock_ex_or_fail(lockfile):
+ logging.critical("unable to take lock %s", lockfile)
+ return 1
while True:
- merge_sth(args, config, localconfig)
- if args.interval is None:
+ merge_start_time = datetime.now()
+ ret = merge_sth(args, config, localconfig)
+ if ret < 0:
+ return 1
+ if args.mergeinterval is None:
break
- print >>sys.stderr, "sleeping", args.interval, "seconds"
- time.sleep(args.interval)
+ sleep = (merge_start_time + timedelta(seconds=args.mergeinterval) -
+ datetime.now()).seconds
+ if sleep > 0:
+ logging.debug("sleeping %d seconds", sleep)
+ time.sleep(sleep)
+
+ return 0
if __name__ == '__main__':
sys.exit(main())
diff --git a/tools/mergetools.py b/tools/mergetools.py
index 80fbf0b..5471fe4 100644
--- a/tools/mergetools.py
+++ b/tools/mergetools.py
@@ -10,11 +10,15 @@ import json
import yaml
import argparse
import requests
+import time
+import fcntl
+import errno
+import logging
try:
import permdb
except ImportError:
pass
-from certtools import get_leaf_hash, http_request, get_leaf_hash
+from certtools import get_leaf_hash, http_request
def parselogrow(row):
return base64.b16decode(row, casefold=True)
@@ -33,7 +37,7 @@ def get_nfetched(currentsizefile, logorderfile):
try:
limit = json.loads(open(currentsizefile).read())
except (IOError, ValueError):
- return -1
+ return 0
if limit['index'] >= 0:
with open(logorderfile, 'r') as f:
f.seek(limit['index']*65)
@@ -207,12 +211,10 @@ def get_curpos(node, baseurl, own_key, paths):
publickeydir=paths["publickeys"])
parsed_result = json.loads(result)
if parsed_result.get(u"result") == u"ok":
- return parsed_result[u"position"]
- print >>sys.stderr, "ERROR: currentposition", parsed_result
- sys.exit(1)
+ return True, parsed_result[u"position"]
+ return False, parsed_result
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, "ERROR: currentposition", e.response
- sys.exit(1)
+ return False, e.response
def get_verifiedsize(node, baseurl, own_key, paths):
try:
@@ -234,19 +236,16 @@ def sendlog(node, baseurl, own_key, paths, submission):
result = http_request(baseurl + "plop/v1/frontend/sendlog",
json.dumps(submission), key=own_key,
verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
+ return True, json.loads(result)
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, "ERROR: sendlog", e.response
- sys.stderr.flush()
- return None
+ return False, e.response
except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, submission
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
+ logging.error("==== FAILED REQUEST ====")
+ logging.error(submission)
+ logging.error("======= RESPONSE =======")
+ logging.error(result)
+ logging.error("========================")
+ return False, e
def backup_sendlog(node, baseurl, own_key, paths, submission):
try:
@@ -274,18 +273,16 @@ def sendentry(node, baseurl, own_key, paths, entry, ehash):
json.dumps({"entry":base64.b64encode(entry),
"treeleafhash":base64.b64encode(ehash)}),
key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
+ return True, json.loads(result)
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, "ERROR: sendentry", e.reponse
- sys.exit(1)
+ return False, e.response
except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, ehash
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
+ logging.error("==== FAILED REQUEST ====")
+ logging.error(ehash)
+ logging.error("======= RESPONSE =======")
+ logging.error(result)
+ logging.error("========================")
+ return False, e
def sendentry_merge(node, baseurl, own_key, paths, entry, ehash):
return sendentries_merge(node, baseurl, own_key, paths, [(ehash, entry)])
@@ -304,7 +301,7 @@ def sendentries_merge(node, baseurl, own_key, paths, entries, session=None):
sys.exit(1)
except ValueError, e:
print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, ehash
+ print >>sys.stderr, hash
print >>sys.stderr, "======= RESPONSE ======="
print >>sys.stderr, result
print >>sys.stderr, "========================"
@@ -316,18 +313,16 @@ def sendsth(node, baseurl, own_key, paths, submission):
result = http_request(baseurl + "plop/v1/frontend/sendsth",
json.dumps(submission), key=own_key,
verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
+ return True, json.loads(result)
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, "ERROR: sendsth", e.response
- sys.exit(1)
+ return False, e.response
except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, submission
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
+ logging.error("==== FAILED REQUEST ====")
+ logging.error(submission)
+ logging.error("======= RESPONSE =======")
+ logging.error(result)
+ logging.error("========================")
+ return False, e
def verifyroot(node, baseurl, own_key, paths, treesize):
try:
@@ -403,10 +398,17 @@ def parse_args():
required=True)
parser.add_argument('--localconfig', help="Local configuration",
required=True)
- parser.add_argument('--interval', type=int, metavar="n",
- help="Repeate every N seconds")
+ # FIXME: verify that 0 < N < 1d
+ parser.add_argument('--mergeinterval', type=int, metavar="n",
+ help="Merge every N seconds")
parser.add_argument("--timing", action='store_true',
help="Print timing information")
+ parser.add_argument("--pidfile", type=str, metavar="file",
+ help="Store PID in FILE")
+ parser.add_argument("--logdir", type=str, default=".", metavar="dir",
+ help="Write logfiles in DIR [default: .]")
+ parser.add_argument("--loglevel", type=str, default="DEBUG", metavar="level",
+ help="Log level, one of DEBUG, INFO, WARNING, ERROR, CRITICAL [default: DEBUG]")
args = parser.parse_args()
config = yaml.load(open(args.config))
@@ -421,6 +423,32 @@ def perm(dbtype, path):
return PermDB(path)
assert False
+def waitforfile(path):
+ statinfo = None
+ while statinfo is None:
+ try:
+ statinfo = os.stat(path)
+ except OSError, e:
+ if e.errno != errno.ENOENT:
+ raise
+ time.sleep(1)
+ return statinfo
+
+def flock_ex_or_fail(path):
+ try:
+ fcntl.flock(os.open(path, os.O_CREAT), fcntl.LOCK_EX + fcntl.LOCK_NB)
+ except IOError, e:
+ if e.errno != errno.EWOULDBLOCK:
+ raise
+ return False
+ return True
+
+class Status:
+ def __init__(self, path):
+ self.path = path
+ def status(self, s):
+ open(self.path, 'w').write(s)
+
class FileDB:
def __init__(self, path):
self.path = path
diff --git a/tools/testcase1.py b/tools/testcase1.py
index 81d589a..98b9179 100755
--- a/tools/testcase1.py
+++ b/tools/testcase1.py
@@ -13,6 +13,7 @@ import struct
import hashlib
import itertools
import os.path
+from time import sleep
from certtools import *
baseurls = [sys.argv[1]]
@@ -20,6 +21,9 @@ logpublickeyfile = sys.argv[2]
cacertfile = sys.argv[3]
toolsdir = os.path.dirname(sys.argv[0])
testdir = sys.argv[4]
+do_merge = True
+if len(sys.argv) > 5 and sys.argv[5] == '--nomerge':
+ do_merge = False
certfiles = [toolsdir + ("/testcerts/cert%d.txt" % e) for e in range(1, 6)]
@@ -121,7 +125,7 @@ def get_and_check_entry(timestamp, chain, leaf_index, baseurl):
assert_equal(len(entries), 1, "get_entries", quiet=True)
fetched_entry = entries["entries"][0]
merkle_tree_leaf = pack_mtl(timestamp, chain[0])
- leaf_input = base64.decodestring(fetched_entry["leaf_input"])
+ leaf_input = base64.decodestring(fetched_entry["leaf_input"])
assert_equal(leaf_input, merkle_tree_leaf, "entry", nodata=True, quiet=True)
extra_data = base64.decodestring(fetched_entry["extra_data"])
certchain = decode_certificate_chain(extra_data)
@@ -148,8 +152,14 @@ def get_and_check_entry(timestamp, chain, leaf_index, baseurl):
len(submittedcertchain))
def merge():
- return subprocess.call([toolsdir + "/merge", "--config", testdir + "/catlfish-test.cfg",
- "--localconfig", testdir + "/catlfish-test-local-merge.cfg"])
+ if do_merge:
+ return subprocess.call([toolsdir + "/merge", "--config", testdir + "/catlfish-test.cfg",
+ "--localconfig", testdir + "/catlfish-test-local-merge.cfg"])
+ else:
+ n = 60
+ print "testcase1.py: sleeping", n, "seconds waiting for merge"
+ sleep(n)
+ return 0
mergeresult = merge()
assert_equal(mergeresult, 0, "merge", quiet=True, fatal=True)