diff options
Diffstat (limited to 'tools/merge_fetch.py')
-rwxr-xr-x | tools/merge_fetch.py | 87 |
1 files changed, 55 insertions, 32 deletions
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py index 5fdb7d0..c66dc03 100755 --- a/tools/merge_fetch.py +++ b/tools/merge_fetch.py @@ -103,6 +103,12 @@ def merge_fetch_sequenced(args, config, localconfig): else: return (tree_size, logorder[tree_size-1]) +def read_parent_messages(pipe, to_fetch): + while pipe.poll(): + cmd, ehash = pipe.recv() + if cmd == 'FETCH': + to_fetch.add(ehash) + def merge_fetch_worker(args, localconfig, storagenode, pipe): paths = localconfig["paths"] own_key = (localconfig["nodename"], @@ -119,10 +125,7 @@ def merge_fetch_worker(args, localconfig, storagenode, pipe): to_fetch = set() while True: ## Read all messages from parent. - while pipe.poll(): - cmd, ehash = pipe.recv() - if cmd == 'FETCH': - to_fetch.add(ehash) + read_parent_messages(pipe, to_fetch) ## Fetch entries from node. if to_fetch: @@ -136,13 +139,16 @@ def merge_fetch_worker(args, localconfig, storagenode, pipe): verify_entry(verifycert, entry, ehash) pipe.send(('FETCHED', ehash, entry)) to_fetch.remove(ehash) + read_parent_messages(pipe, to_fetch) # Drain pipe. ## Ask node for more entries. for ehash in get_new_entries(name, url, own_key, paths): pipe.send(('NEWENTRY', ehash)) + read_parent_messages(pipe, to_fetch) # Drain pipe. - ## Wait some. - sleep(max(3, args.mergeinterval / 10)) + ## Wait some if nothing to do. + if not to_fetch: + sleep(max(3, args.mergeinterval / 10)) def term(signal, arg): terminate_child_procs() @@ -158,6 +164,34 @@ def newworker(name, args): logging.debug("%s started, pid %d", name, p.pid) return (name, my_conn, p) +def read_worker_messages(procs, messages, args, localconfig): + for name, pipe, p, storagenode in procs.values(): + if not p.is_alive(): + logging.warning("%s is gone, restarting", name) + procs[name] = \ + newworker(name, [args, localconfig, storagenode]) + (storagenode,) + continue + while pipe.poll(): + messages.append((name, pipe.recv())) + +def process_worker_message(name, msg, fetch_dict, fetch_set, chainsdb, newentry, + logorder, entries_in_log): + cmd = msg[0] + ehash = msg[1] + if cmd == 'NEWENTRY': + logging.info("NEWENTRY at %s: %s", name, hexencode(ehash)) + if not ehash in fetch_dict: # Don't fetch twice. + fetch_set.add(ehash) + elif cmd == 'FETCHED': + entry = msg[2] + logging.info("FETCHED from %s: %s", name, hexencode(ehash)) + chainsdb.add(ehash, entry) + newentry.append(ehash) # Writing to logorderfile after loop. + logorder.append(hexencode(ehash)) + entries_in_log.add(ehash) + if ehash in fetch_dict: + del fetch_dict[ehash] + def merge_fetch_parallel(args, config, localconfig): paths = localconfig["paths"] storagenodes = config["storagenodes"] @@ -187,34 +221,21 @@ def merge_fetch_parallel(args, config, localconfig): fetch_set = set() fetch_dict = {} + messages = [] + while procs: ## Poll worker processes and handle messages. assert not fetch_set newentry = [] - for name, pipe, p, storagenode in procs.values(): - if not p.is_alive(): - logging.warning("%s is gone, restarting", name) - procs[name] = \ - newworker(name, - [args, localconfig, storagenode]) + (storagenode,) - continue - logging.info("polling %s", name) - while pipe.poll(): - msg = pipe.recv() - cmd = msg[0] - ehash = msg[1] - if cmd == 'NEWENTRY': - logging.info("NEWENTRY at %s: %s", name, hexencode(ehash)) - if not ehash in fetch_dict: # Don't fetch twice. - fetch_set.add(ehash) - elif cmd == 'FETCHED': - entry = msg[2] - logging.info("FETCHED from %s: %s", name, hexencode(ehash)) - chainsdb.add(ehash, entry) - newentry.append(ehash) # Writing to logorderfile after loop. - logorder.append(hexencode(ehash)) - entries_in_log.add(ehash) - del fetch_dict[ehash] + + # Drain pipe, then process messages. + read_worker_messages(procs, messages, args, localconfig) + for name, msg in messages: + process_worker_message(name, msg, fetch_dict, fetch_set, chainsdb, + newentry, logorder, entries_in_log) + messages = [] + + # Commit to chains database and update 'logorder' file. chainsdb.commit() for ehash in newentry: add_to_logorder(logorderfile, ehash) @@ -238,6 +259,7 @@ def merge_fetch_parallel(args, config, localconfig): name, pipe, _, _ = fetch_dict[e].next() logging.info("asking %s to FETCH %s", name, hexencode(e)) pipe.send(('FETCH', e)) + read_worker_messages(procs, messages, args, localconfig) # Drain pipe. ## Update the 'fetched' file. logsize = len(logorder) @@ -251,8 +273,9 @@ def merge_fetch_parallel(args, config, localconfig): currentsizefilecontent = newcontent write_file(currentsizefile, currentsizefilecontent) - ## Wait some. - sleep(1) + ## Wait some if nothing to do. + if not messages: + sleep(1) return 0 |