Diffstat (limited to 'tools/merge_backup.py')
-rwxr-xr-x  tools/merge_backup.py  121
1 file changed, 71 insertions(+), 50 deletions(-)
diff --git a/tools/merge_backup.py b/tools/merge_backup.py
index e7cce26..f25b22a 100755
--- a/tools/merge_backup.py
+++ b/tools/merge_backup.py
@@ -8,17 +8,19 @@
 # See catlfish/doc/merge.txt for more about the merge process.
 #
 import sys
-import base64
 import select
 import requests
+import errno
+import logging
 from time import sleep
 from base64 import b64encode, b64decode
+from os import stat
 from certtools import timing_point, build_merkle_tree, write_file, \
     create_ssl_context
 from mergetools import chunks, backup_sendlog, get_logorder, \
     get_verifiedsize, get_missingentriesforbackup, \
     hexencode, setverifiedsize, sendentries_merge, verifyroot, \
-    get_nfetched, parse_args, perm
+    get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, Status
 
 def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
     for trynumber in range(5, 0, -1):
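
The core of this patch is mechanical: every Python 2 style "print >>sys.stderr" plus explicit flush is replaced by a call into the stdlib logging module. A minimal illustration of the before/after pattern, not part of the patch itself:

    import logging
    import sys

    # Before: needed an explicit flush to show up promptly.
    #   print >>sys.stderr, "tries left:", trynumber
    #   sys.stderr.flush()

    # After: the handler flushes per record, and the %d argument is
    # only interpolated when INFO is actually enabled.
    logging.basicConfig(stream=sys.stderr, level=logging.INFO)
    logging.info("tries left: %d", 5)
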
@@ -29,57 +31,49 @@ def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
         if trynumber == 1:
             return None
         select.select([], [], [], 10.0)
-        print >>sys.stderr, "tries left:", trynumber
-        sys.stderr.flush()
+        logging.info("tries left: %d", trynumber)
         continue
     return sendlogresult
 
 sendlog_discover_chunksize = 100000
 
-def sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths):
-    print >>sys.stderr, "sending log:",
-    sys.stderr.flush()
+def sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths,
+                   statusupdates):
+    logging.info("sending log")
     for chunk in chunks(entries, 1000):
         sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk)
         if sendlogresult == None:
             sys.exit(1)
         if sendlogresult["result"] != "ok":
-            print >>sys.stderr, "backup_sendlog:", sendlogresult
+            logging.error("backup_sendlog: %s", sendlogresult)
             sys.exit(1)
         verifiedsize += len(chunk)
-        print >>sys.stderr, verifiedsize,
-        sys.stderr.flush()
-    print >>sys.stderr
-    print >>sys.stderr, "log sent"
-    sys.stderr.flush()
+        statusupdates.status("PROG sending log: %d" % verifiedsize)
+    logging.info("log sent")
 
-def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing):
+def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb,
+                            timing, statusupdates):
     missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                  own_key, paths)
     timing_point(timing, "get missing")
 
     while missingentries:
-        print >>sys.stderr, "missing entries:", len(missingentries)
-        sys.stderr.flush()
+        logging.info("about to send %d missing entries", len(missingentries))
 
         fetched_entries = 0
-        print >>sys.stderr, "sending missing entries",
-        sys.stderr.flush()
         with requests.sessions.Session() as session:
             for missingentry_chunk in chunks(missingentries, 100):
-                missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
-                hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes]
+                missingentry_hashes = [b64decode(missingentry) for missingentry in missingentry_chunk]
+                hashes_and_entries = [(ehash, chainsdb.get(ehash)) for ehash in missingentry_hashes]
                 sendentryresult = sendentries_merge(nodename, nodeaddress,
                                                     own_key, paths,
                                                     hashes_and_entries, session)
                 if sendentryresult["result"] != "ok":
-                    print >>sys.stderr, "sendentries_merge:", sendentryresult
+                    logging.error("sendentries_merge: %s", sendentryresult)
                     sys.exit(1)
                 fetched_entries += len(missingentry_hashes)
-                #print >>sys.stderr, fetched_entries,
-                #sys.stderr.flush()
-        print >>sys.stderr
-        sys.stderr.flush()
+                statusupdates.status("PROG sending missing entries: %d" %
+                                     fetched_entries)
         timing_point(timing, "send missing")
 
         missingentries = get_missingentriesforbackup(nodename, nodeaddress,
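
The batching above relies on chunks() from mergetools, which this diff imports but never shows. Assuming it is the usual fixed-size slicing helper, a sketch consistent with its use here (at most 1000 log entries per backup_sendlog call, 100 hashes per sendentries_merge call, one shared HTTP session):

    def chunks(seq, size):
        # Yield consecutive slices of at most `size` items from `seq`.
        for offset in range(0, len(seq), size):
            yield seq[offset:offset + size]

    # 250 items -> batches of 100, 100 and 50.
    assert [len(c) for c in chunks(range(250), 100)] == [100, 100, 50]
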
@@ -93,17 +87,16 @@ def check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size, timin
     verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
                                   tree_size)
     if verifyrootresult["result"] != "ok":
-        print >>sys.stderr, "verifyroot:", verifyrootresult
+        logging.error("verifyroot: %s", verifyrootresult)
         sys.exit(1)
-    secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
+    secondary_root_hash = b64decode(verifyrootresult["root_hash"])
     if root_hash != secondary_root_hash:
-        print >>sys.stderr, "secondary root hash was", \
-            hexencode(secondary_root_hash)
-        print >>sys.stderr, " expected", hexencode(root_hash)
+        logging.error("secondary root hash was %s while expected was %s",
+                      hexencode(secondary_root_hash), hexencode(root_hash))
         sys.exit(1)
     timing_point(timing, "verifyroot")
     return root_hash
-
+
 def merge_backup(args, config, localconfig, secondaries):
     maxwindow = localconfig.get("maxwindow", 1000)
     paths = localconfig["paths"]
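
check_root compares the locally computed Merkle root against the secondary's as raw bytes, hex-encoding only for the error message. hexencode is another mergetools import not shown here; presumably a thin wrapper along these lines (an assumption about the real helper):

    from binascii import hexlify

    def hexencode(data):
        # Render raw hash bytes as lowercase hex for display.
        return hexlify(data)
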
@@ -114,6 +107,8 @@ def merge_backup(args, config, localconfig, secondaries):
     chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
     logorderfile = mergedb + "/logorder"
     currentsizefile = mergedb + "/fetched"
+    statusfile = mergedb + "/merge_backup.status"
+    s = Status(statusfile)
     timing = timing_point()
 
     nfetched = get_nfetched(currentsizefile, logorderfile)
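
Status is a new mergetools helper that the diff uses but does not define. From its two call sites (constructed with a file path, then fed "PROG ..." lines) a plausible reading is a tiny class that rewrites a one-line state file for operators to inspect; the sketch below is an assumption about that implementation, not the actual mergetools code:

    class Status(object):
        # Hypothetical sketch: persist the most recent status line.
        def __init__(self, path):
            self.path = path

        def status(self, text):
            # Rewrite the file each time so readers see a single
            # fresh line, e.g. "PROG sending log: 1000".
            with open(self.path, "w") as f:
                f.write(text + "\n")
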
@@ -128,12 +123,10 @@ def merge_backup(args, config, localconfig, secondaries):
         nodeaddress = "https://%s/" % secondary["address"]
         nodename = secondary["name"]
         timing = timing_point()
-        print >>sys.stderr, "backing up to node", nodename
-        sys.stderr.flush()
+        logging.info("backing up to node %s", nodename)
         verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
         timing_point(timing, "get verified size")
-        print >>sys.stderr, "verified size", verifiedsize
-        sys.stderr.flush()
+        logging.info("verified size %d", verifiedsize)
 
         if verifiedsize == tree_size:
             root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size, timing)
@@ -142,10 +135,10 @@ def merge_backup(args, config, localconfig, secondaries):
                 uptopos = min(verifiedsize + maxwindow, tree_size)
 
                 entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]]
-                sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths)
+                sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths, s)
                 timing_point(timing, "sendlog")
 
-                fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing)
+                fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing, s)
 
                 root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, uptopos, timing)
 
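
The hunk above sits inside the catch-up loop: uptopos never runs more than maxwindow entries past verifiedsize, so the secondary's tree head is re-verified after every window rather than only once after all outstanding entries. A standalone illustration of that arithmetic (window_bounds is a hypothetical name, not in the patch):

    def window_bounds(verifiedsize, tree_size, maxwindow=1000):
        # Mirrors uptopos = min(verifiedsize + maxwindow, tree_size),
        # advancing one window per verified round.
        while verifiedsize < tree_size:
            uptopos = min(verifiedsize + maxwindow, tree_size)
            yield verifiedsize, uptopos
            verifiedsize = uptopos

    assert (list(window_bounds(0, 2500)) ==
            [(0, 1000), (1000, 2000), (2000, 2500)])
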
@@ -155,29 +148,48 @@ def merge_backup(args, config, localconfig, secondaries):
         backuppath = mergedb + "/verified." + nodename
         backupdata = {"tree_size": tree_size,
                       "sha256_root_hash": hexencode(root_hash)}
-        #print >>sys.stderr, "DEBUG: writing to", backuppath, ":", backupdata
+        logging.debug("writing to %s: %s", backuppath, backupdata)
         write_file(backuppath, backupdata)
 
     if args.timing:
-        print >>sys.stderr, "timing: merge_backup:", timing["deltatimes"]
-        sys.stderr.flush()
+        logging.debug("timing: merge_backup: %s", timing["deltatimes"])
+
+    return 0
 
 def main():
     """
-    Read logorder file up until what's indicated by fetched file and
-    build the tree.
+    Wait until 'fetched' exists and read it.
+
+    Read 'logorder' up until what's indicated by 'fetched' and build the
+    tree.
 
     Distribute entries to all secondaries, write tree size and tree head
-    to backup.<secondary> files as each secondary is verified to have
+    to 'backup.<secondary>' files as each secondary is verified to have
     the entries.
 
-    Sleep some and start over.
+    If `--mergeinterval', wait until 'fetched' is updated and read it
+    and start over from the point where 'logorder' is read.
     """
     args, config, localconfig = parse_args()
+    paths = localconfig["paths"]
+    mergedb = paths["mergedb"]
+    lockfile = mergedb + "/.merge_backup.lock"
+    fetched_path = mergedb + "/fetched"
+
+    loglevel = getattr(logging, args.loglevel.upper())
+    if args.mergeinterval is None:
+        logging.basicConfig(level=loglevel)
+    else:
+        logging.basicConfig(filename=args.logdir + "/merge_backup.log",
+                            level=loglevel)
+
+    if not flock_ex_or_fail(lockfile):
+        logging.critical("unable to take lock %s", lockfile)
+        return 1
+
     all_secondaries = \
       [n for n in config.get('mergenodes', []) if \
       n['name'] != config['primarymergenode']]
-    paths = localconfig["paths"]
     create_ssl_context(cafile=paths["https_cacertfile"])
 
     if len(args.node) == 0:
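
flock_ex_or_fail is likewise imported from mergetools without being shown. The name and the early "return 1" path suggest a non-blocking exclusive flock(2), ensuring a single merge_backup instance per mergedb. A sketch under that assumption, using the stdlib fcntl module:

    import fcntl

    _lock_handles = []  # keep files open so the locks stay held

    def flock_ex_or_fail(path):
        # Take an exclusive, non-blocking lock on `path`; return
        # False if another process already holds it.
        try:
            lockfile = open(path, "w")
            fcntl.flock(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            return False
        _lock_handles.append(lockfile)
        return True
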
@@ -185,12 +197,21 @@ def main():
     else:
         nodes = [n for n in all_secondaries if n["name"] in args.node]
 
+    if args.mergeinterval is None:
+        return merge_backup(args, config, localconfig, nodes)
+
+    fetched_statinfo = waitforfile(fetched_path)
+
     while True:
-        merge_backup(args, config, localconfig, nodes)
-        if args.interval is None:
-            break
-        print >>sys.stderr, "sleeping", args.interval, "seconds"
-        sleep(args.interval)
+        err = merge_backup(args, config, localconfig, nodes)
+        if err:
+            return err
+        fetched_statinfo_old = fetched_statinfo
+        while fetched_statinfo == fetched_statinfo_old:
+            sleep(max(3, args.mergeinterval / 10))
+            fetched_statinfo = stat(fetched_path)
+
+    return 0
 
 if __name__ == '__main__':
     sys.exit(main())
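
The new main loop detects updates to 'fetched' by comparing os.stat() records: after each backup round it polls every max(3, mergeinterval / 10) seconds until the file's stat record changes (new mtime, size, or inode), then runs merge_backup again. waitforfile itself stays in mergetools; given the newly imported errno and os.stat, a plausible sketch of that helper (an assumption, not the actual code) is:

    import errno
    from os import stat
    from time import sleep

    def waitforfile(path, delay=1):
        # Poll until `path` exists, then return its stat record so
        # the caller can detect later modifications by comparison.
        while True:
            try:
                return stat(path)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    raise
            sleep(delay)
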