%%% Copyright (c) 2017, NORDUnet A/S. %%% See LICENSE for licensing information. -module(merge_fetch_fetch). -behaviour(gen_server). -export([start_link/1]). -export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, code_change/3]). -export([loop/2]). start_link(Args) -> gen_server:start_link(?MODULE, Args, []). init({Name, Address}) -> lager:info("~p:~p starting", [?MODULE, Name]), ChildPid = spawn_link(?MODULE, loop, [Name, Address]), {ok, ChildPid}. decode_entry({struct, EncodedEntry}) -> Hash = base64:decode(proplists:get_value(<<"hash">>, EncodedEntry)), Entry = base64:decode(proplists:get_value(<<"entry">>, EncodedEntry)), {Hash, Entry}. get_entries(_, _, []) -> {ok, []}; get_entries(NodeName, NodeAddress, Hashes) -> DebugTag = "getentry", URL = NodeAddress ++ "getentry", EncodedHashes = lists:map(fun (H) -> {"hash", base64:encode(H)} end, Hashes), Params = hackney_url:qs(EncodedHashes), URLWithParams = [URL, "?", Params], case merge_util:request(DebugTag, binary_to_list(iolist_to_binary(URLWithParams)), NodeName) of {<<"ok">>, PropList} -> Entries = lists:map(fun (S) -> decode_entry(S) end, proplists:get_value(<<"entries">>, PropList)), {ok, Entries}; Err -> throw({request_error, result, DebugTag, Err}) end. loop(Name, Address) -> receive after 1000 -> ok end, lager:info("~p:~p: asking for entries to get from ~p", [?MODULE, Name, Address]), Hashes = merge_fetch_ctrl:entriestofetch(Name, 10), {ok, Entries} = get_entries(Name, Address, Hashes), lists:foreach(fun ({Hash, Entry}) -> try case plop:verify_entry(Entry) of {ok, Hash} -> ok; {ok, DifferentLeafHash} -> lager:error("leaf hash not correct: requested hash is ~p " ++ "and contents are ~p", [hex:bin_to_hexstr(Hash), hex:bin_to_hexstr(DifferentLeafHash)]), throw({request_error, result, "", differentleafhash}); {error, Reason} -> lager:error("verification failed: ~p", [Reason]), throw({request_error, result, "", verificationfailed}) end catch Type:What -> [CrashFunction | Stack] = erlang:get_stacktrace(), lager:error("Crash: ~p ~p~n~p~n~p~n", [Type, What, CrashFunction, Stack]), throw(What) end, db:add_entry_sync(Hash, Entry), merge_fetch_ctrl:fetchstatus(Hash, success), end, Entries), loop(Name, Address). %% TODO: if we crash here, we restart all of fetch -- spawn child proc %% for the actual fetching? handle_call(stop, _From, ChildPid) -> lager:info("~p: stopping child process ~p", [?MODULE, ChildPid]), exit(ChildPid, stop), {stop, normal, stopped, nil}. handle_cast(_Request, State) -> {noreply, State}. handle_info(_Info, State) -> {noreply, State}. code_change(_OldVsn, State, _Extra) -> {ok, State}. terminate(_Reason, _State) -> lager:info("~p terminating", [?MODULE]), ok.