Diffstat (limited to 'merge/src/merge_fetch_fetch.erl')
-rw-r--r--  merge/src/merge_fetch_fetch.erl  72
1 file changed, 67 insertions(+), 5 deletions(-)
diff --git a/merge/src/merge_fetch_fetch.erl b/merge/src/merge_fetch_fetch.erl
index b2fadd9..2ba7e9a 100644
--- a/merge/src/merge_fetch_fetch.erl
+++ b/merge/src/merge_fetch_fetch.erl
@@ -7,19 +7,81 @@
-export([start_link/1]).
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
         code_change/3]).
+-export([loop/2]).
start_link(Args) ->
    gen_server:start_link(?MODULE, Args, []).
-init({Name, _Address}) ->
-    lager:info("~p:~p: starting", [?MODULE, Name]),
-    {ok, []}.
+init({Name, Address}) ->
+    lager:info("~p:~p starting", [?MODULE, Name]),
+    ChildPid = spawn_link(?MODULE, loop, [Name, Address]),
+    {ok, ChildPid}.
+
+decode_entry({struct, EncodedEntry}) ->
+    Hash = base64:decode(proplists:get_value(<<"hash">>, EncodedEntry)),
+    Entry = base64:decode(proplists:get_value(<<"entry">>, EncodedEntry)),
+    {Hash, Entry}.
+
+get_entries(_, _, []) ->
+    {ok, []};
+get_entries(NodeName, NodeAddress, Hashes) ->
+    DebugTag = "getentry",
+    URL = NodeAddress ++ "getentry",
+    EncodedHashes = lists:map(fun (H) -> {"hash", base64:encode(H)} end, Hashes),
+    Params = hackney_url:qs(EncodedHashes),
+    URLWithParams = [URL, "?", Params],
+    case merge_util:request(DebugTag, binary_to_list(iolist_to_binary(URLWithParams)), NodeName) of
+        {<<"ok">>, PropList} ->
+            Entries = lists:map(fun (S) -> decode_entry(S) end, proplists:get_value(<<"entries">>, PropList)),
+            {ok, Entries};
+        Err ->
+            throw({request_error, result, DebugTag, Err})
+    end.
+
+loop(Name, Address) ->
+    receive after 1000 -> ok end,
+    lager:info("~p:~p: asking for entries to get from ~p",
+               [?MODULE, Name, Address]),
+    Hashes = merge_fetch_ctrl:entriestofetch(Name, 10),
+    {ok, Entries} = get_entries(Name, Address, Hashes),
+    lists:foreach(fun ({Hash, Entry}) ->
+
+                          try
+                              case plop:verify_entry(Entry) of
+                                  {ok, Hash} ->
+                                      ok;
+                                  {ok, DifferentLeafHash} ->
+                                      lager:error("leaf hash not correct: requested hash is ~p " ++
+                                                      "and contents are ~p",
+                                                  [hex:bin_to_hexstr(Hash),
+                                                   hex:bin_to_hexstr(DifferentLeafHash)]),
+                                      throw({request_error, result, "", differentleafhash});
+                                  {error, Reason} ->
+                                      lager:error("verification failed: ~p", [Reason]),
+                                      throw({request_error, result, "", verificationfailed})
+                              end
+                          catch
+                              Type:What ->
+                                  [CrashFunction | Stack] = erlang:get_stacktrace(),
+                                  lager:error("Crash: ~p ~p~n~p~n~p~n",
+                                              [Type, What, CrashFunction, Stack]),
+                                  throw(What)
+                          end,
+
+                          db:add_entry_sync(Hash, Entry),
+
+                          merge_fetch_ctrl:fetchstatus(Hash, success)
+                  end, Entries),
+    loop(Name, Address).
+
%% TODO: if we crash here, we restart all of fetch -- spawn child proc
%% for the actual fetching?
-handle_call(stop, _From, State) ->
-    {stop, normal, stopped, State}.
+handle_call(stop, _From, ChildPid) ->
+    lager:info("~p: stopping child process ~p", [?MODULE, ChildPid]),
+    exit(ChildPid, stop),
+    {stop, normal, stopped, nil}.
handle_cast(_Request, State) ->
    {noreply, State}.
handle_info(_Info, State) ->