summaryrefslogtreecommitdiff
path: root/tools/merge_fetch.py
diff options
context:
space:
mode:
authorLinus Nordberg <linus@nordu.net>2016-11-27 15:48:56 +0100
committerLinus Nordberg <linus@nordu.net>2016-11-27 15:48:56 +0100
commitec61afe24f5d1189654b7b751dad47ab640f21dc (patch)
treed0e7f166fca6a9cd854aea97669eb422ca390ed3 /tools/merge_fetch.py
parent96f10ffff8e2820c9523f8af71ac56996b6e31a4 (diff)
Fix a crash bug; improve IPC; use cycle().
Workers: - Consume all messages each round, just as control process does. Control process: - Use itertools.cycle for the circular list. - Fix bug where an entry could be fetched multiple times, crashing when updating fetch_dict.
Diffstat (limited to 'tools/merge_fetch.py')
-rwxr-xr-xtools/merge_fetch.py80
1 files changed, 40 insertions, 40 deletions
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py
index a31143e..d5a514b 100755
--- a/tools/merge_fetch.py
+++ b/tools/merge_fetch.py
@@ -16,6 +16,7 @@ import logging
from time import sleep
from multiprocessing import Process, Pipe
from random import Random
+from itertools import cycle
from mergetools import get_logorder, verify_entry, get_new_entries, \
chunks, fsync_logorder, get_entries, add_to_logorder, \
hexencode, hexdecode, parse_args, perm, flock_ex_or_fail, Status, \
@@ -107,15 +108,18 @@ def merge_fetch_worker(args, localconfig, storagenode, pipe):
own_key = (localconfig["nodename"],
"%s/%s-private.pem" % (paths["privatekeys"],
localconfig["nodename"]))
+ name = storagenode["name"]
+ address = storagenode["address"]
+ url = "https://%s/" % address
# NOTE: We should probably verifycert.communicate(struct.pack("I",0))
# to ask the verifycert process to quit nicely.
verifycert = subprocess.Popen([paths["verifycert_bin"], paths["known_roots"]],
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
to_fetch = set()
- timeout = max(3, args.mergeinterval / 10)
while True:
- if pipe.poll(timeout):
+ ## Read all messages from parent.
+ while pipe.poll(0):
msg = pipe.recv()
if len(msg) < 2:
continue
@@ -124,26 +128,26 @@ def merge_fetch_worker(args, localconfig, storagenode, pipe):
if cmd == 'FETCH':
to_fetch.add(ehash)
+ ## Fetch entries from node.
if to_fetch:
- logging.info("%s: fetching %d entries", storagenode["name"],
- len(to_fetch))
+ logging.info("%s: fetching %d entries", name, len(to_fetch))
with requests.sessions.Session() as session:
for chunk in chunks(list(to_fetch), 100):
- entries = get_entries(storagenode["name"],
- "https://%s/" % storagenode["address"],
- own_key, paths, chunk, session=session)
+ entries = get_entries(name, url, own_key, paths, chunk,
+ session=session)
for ehash in chunk:
entry = entries[ehash]
verify_entry(verifycert, entry, ehash)
pipe.send(('FETCHED', ehash, entry))
to_fetch.remove(ehash)
- new_entries = get_new_entries(storagenode["name"],
- "https://%s/" % storagenode["address"],
- own_key, paths)
- for ehash in new_entries:
+ ## Ask node for more entries.
+ for ehash in get_new_entries(name, url, own_key, paths):
pipe.send(('NEWENTRY', ehash))
+ ## Wait some.
+ sleep(max(3, args.mergeinterval / 10))
+
def term(signal, arg):
terminate_child_procs()
sys.exit(1)
@@ -180,12 +184,15 @@ def merge_fetch_parallel(args, config, localconfig):
entries_in_log = set(logorder) # Hashes are binary.
# Entries to fetch, kept in both a set and a dict. The dict is
# keyed on hashes (binary) and contains randomised lists of nodes
- # to fetch from.
+ # to fetch from. Note that the dict keeps entries until they're
+ # successfully fetched while the set is temporary within one
+ # iteration of the loop.
fetch_set = set()
fetch_dict = {}
+
while procs:
- # Poll worker processes and handle messages.
- assert(not fetch_set)
+ ## Poll worker processes and handle messages.
+ assert not fetch_set
newentry = []
for name, pipe, p in procs.values():
if not p.is_alive():
@@ -202,50 +209,43 @@ def merge_fetch_parallel(args, config, localconfig):
ehash = msg[1]
if cmd == 'NEWENTRY':
logging.info("NEWENTRY at %s: %s", name, hexencode(ehash))
- fetch_set.add(ehash)
+ if not ehash in fetch_dict: # Don't fetch twice.
+ fetch_set.add(ehash)
elif cmd == 'FETCHED':
if len(msg) != 3:
continue
entry = msg[2]
logging.info("FETCHED from %s: %s", name, hexencode(ehash))
- chainsdb.add(ehash, entry) # Commit later.
- newentry.append(ehash) # Writing to logorderfile later.
+ chainsdb.add(ehash, entry)
+ newentry.append(ehash) # Writing to logorderfile after loop.
logorder.append(hexencode(ehash))
entries_in_log.add(ehash)
- if ehash in fetch_set:
- fetch_set.remove(ehash)
del fetch_dict[ehash]
chainsdb.commit()
for ehash in newentry:
add_to_logorder(logorderfile, ehash)
fsync_logorder(logorderfile)
- # Ask workers to fetch entries.
+ ## Ask workers to fetch new entries.
logging.debug("nof entries to fetch including entries in log: %d",
len(fetch_set))
fetch_set -= entries_in_log
logging.info("entries to fetch: %d", len(fetch_set))
- # Add each entry in fetch_set to fetch_dict, key bing the hash
- # and value being a list of storage nodes, in randomised
- # order.
- for e in fetch_set:
- if not e in fetch_dict:
- l = procs.values()
- rand.shuffle(l)
- fetch_dict[e] = l
- # For each entry to fetch, treat its list of nodes as a
- # circular list and ask the one in the front to fetch the
+ # Add entries to be fetched to fetch_dict, with the hash as
+ # key and value being a cyclic iterator of list of storage
+ # nodes, in randomised order. Ask next node to fetch the
# entry.
while fetch_set:
- ehash = fetch_set.pop()
- nodes = fetch_dict[ehash]
- node = nodes.pop(0)
- fetch_dict[ehash] = nodes + [node]
- name, pipe, p = node
- logging.info("asking %s to fetch %s", name, hexencode(ehash))
- pipe.send(('FETCH', ehash))
-
- # Update the 'fetched' file.
+ e = fetch_set.pop()
+ if not e in fetch_dict:
+ l = list(procs.values())
+ rand.shuffle(l)
+ fetch_dict[e] = cycle(l)
+ name, pipe, _ = fetch_dict[e].next()
+ logging.info("asking %s to FETCH %s", name, hexencode(e))
+ pipe.send(('FETCH', e))
+
+ ## Update the 'fetched' file.
logsize = len(logorder)
if logsize == 0:
last_hash = ''
@@ -257,7 +257,7 @@ def merge_fetch_parallel(args, config, localconfig):
currentsizefilecontent = newcontent
write_file(currentsizefile, currentsizefilecontent)
- # Wait some.
+ ## Wait some.
sleep(1)
return 0