diff options
Diffstat (limited to 'merge/src/merge_fetch_fetch.erl')
-rw-r--r-- | merge/src/merge_fetch_fetch.erl | 72 |
1 files changed, 67 insertions, 5 deletions
diff --git a/merge/src/merge_fetch_fetch.erl b/merge/src/merge_fetch_fetch.erl index b2fadd9..2ba7e9a 100644 --- a/merge/src/merge_fetch_fetch.erl +++ b/merge/src/merge_fetch_fetch.erl @@ -7,19 +7,81 @@ -export([start_link/1]). -export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, code_change/3]). +-export([loop/2]). start_link(Args) -> gen_server:start_link(?MODULE, Args, []). -init({Name, _Address}) -> - lager:info("~p:~p: starting", [?MODULE, Name]), - {ok, []}. +init({Name, Address}) -> + lager:info("~p:~p starting", [?MODULE, Name]), + ChildPid = spawn_link(?MODULE, loop, [Name, Address]), + {ok, ChildPid}. + +decode_entry({struct, EncodedEntry}) -> + Hash = base64:decode(proplists:get_value(<<"hash">>, EncodedEntry)), + Entry = base64:decode(proplists:get_value(<<"entry">>, EncodedEntry)), + {Hash, Entry}. + +get_entries(_, _, []) -> + {ok, []}; +get_entries(NodeName, NodeAddress, Hashes) -> + DebugTag = "getentry", + URL = NodeAddress ++ "getentry", + EncodedHashes = lists:map(fun (H) -> {"hash", base64:encode(H)} end, Hashes), + Params = hackney_url:qs(EncodedHashes), + URLWithParams = [URL, "?", Params], + case merge_util:request(DebugTag, binary_to_list(iolist_to_binary(URLWithParams)), NodeName) of + {<<"ok">>, PropList} -> + Entries = lists:map(fun (S) -> decode_entry(S) end, proplists:get_value(<<"entries">>, PropList)), + {ok, Entries}; + Err -> + throw({request_error, result, DebugTag, Err}) + end. + +loop(Name, Address) -> + receive after 1000 -> ok end, + lager:info("~p:~p: asking for entries to get from ~p", + [?MODULE, Name, Address]), + Hashes = merge_fetch_ctrl:entriestofetch(Name, 10), + {ok, Entries} = get_entries(Name, Address, Hashes), + lists:foreach(fun ({Hash, Entry}) -> + + try + case plop:verify_entry(Entry) of + {ok, Hash} -> + ok; + {ok, DifferentLeafHash} -> + lager:error("leaf hash not correct: requested hash is ~p " ++ + "and contents are ~p", + [hex:bin_to_hexstr(Hash), + hex:bin_to_hexstr(DifferentLeafHash)]), + throw({request_error, result, "", differentleafhash}); + {error, Reason} -> + lager:error("verification failed: ~p", [Reason]), + throw({request_error, result, "", verificationfailed}) + end + catch + Type:What -> + [CrashFunction | Stack] = erlang:get_stacktrace(), + lager:error("Crash: ~p ~p~n~p~n~p~n", + [Type, What, CrashFunction, Stack]), + throw(What) + end, + + db:add_entry_sync(Hash, Entry), + + merge_fetch_ctrl:fetchstatus(Hash, success), + end, Entries), + loop(Name, Address). + %% TODO: if we crash here, we restart all of fetch -- spawn child proc %% for the actual fetching? -handle_call(stop, _From, State) -> - {stop, normal, stopped, State}. +handle_call(stop, _From, ChildPid) -> + lager:info("~p: stopping child process ~p", [?MODULE, ChildPid]), + exit(ChildPid, stop), + {stop, normal, stopped, nil}. handle_cast(_Request, State) -> {noreply, State}. handle_info(_Info, State) -> |