Diffstat (limited to 'merge/src')
-rw-r--r--  merge/src/merge_fetch_ctrl.erl        | 24
-rw-r--r--  merge/src/merge_fetch_fetch.erl       | 72
-rw-r--r--  merge/src/merge_fetch_fetch_sup.erl   | 30
-rw-r--r--  merge/src/merge_fetch_newentries.erl  | 26
-rw-r--r--  merge/src/merge_sup.erl               |  5
5 files changed, 144 insertions(+), 13 deletions(-)
diff --git a/merge/src/merge_fetch_ctrl.erl b/merge/src/merge_fetch_ctrl.erl
index 6c055df..49fe043 100644
--- a/merge/src/merge_fetch_ctrl.erl
+++ b/merge/src/merge_fetch_ctrl.erl
@@ -69,7 +69,7 @@ handle_call({newentries, List, Node}, _From, #state{tofetch = ToFetch} = State)
{true, {Hash, undefined}}
end
end, List),
- {noreply, State#state{tofetch = [ToFetch | NewEntries]}};
+ {reply, ok, State#state{tofetch = ToFetch ++ NewEntries}};
handle_call({entriestofetch, Node, NMax}, _From,
#state{tofetch = ToFetch} = State) ->
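
This hunk fixes two bugs at once: a handle_call/3 clause must return {reply, ...} for the blocked gen_server:call/2 caller to get an answer, and cons-ing one list onto another nests it instead of concatenating. A shell transcript illustrating the list half of the fix:

    1> [[a, b] | [c, d]].
    [[a,b],c,d]         % old code: the first list becomes a single nested head
    2> [a, b] ++ [c, d].
    [a,b,c,d]           % new code: proper concatenation
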
@@ -79,12 +79,12 @@ handle_call({entriestofetch, Node, NMax}, _From,
handle_call({fetchstatus, Entry, success}, _From,
#state{tofetch = ToFetch} = State) ->
NewToFetch = fetch_success(ToFetch, Entry),
- {noreply, State#state{tofetch = NewToFetch}};
+ {reply, ok, State#state{tofetch = NewToFetch}};
handle_call({fetchstatus, Entry, failure}, _From,
#state{tofetch = ToFetch} = State) ->
NewToFetch = fetch_failure(ToFetch, Entry),
- {noreply, State#state{tofetch = NewToFetch}}.
+ {reply, ok, State#state{tofetch = NewToFetch}}.
handle_cast(_Request, State) ->
{noreply, State}.
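
Both fetchstatus clauses get the same {noreply, ...} -> {reply, ok, ...} correction. Returning {noreply, ...} from handle_call/3 is only valid if the server answers later with gen_server:reply/2; otherwise the caller blocks in gen_server:call/2 until it times out. A minimal self-contained sketch of the failure mode (hypothetical module, not part of this change):

    -module(noreply_demo).
    -behaviour(gen_server).
    -export([start_link/0]).
    -export([init/1, handle_call/3, handle_cast/2]).

    start_link() -> gen_server:start_link(?MODULE, [], []).

    init([]) -> {ok, []}.

    %% Like the pre-fix clauses: no reply is ever sent, so
    %% gen_server:call(Pid, ping) exits with {timeout, _} after 5000 ms.
    handle_call(ping, _From, State) ->
        {noreply, State};
    %% Like the fixed clauses: gen_server:call(Pid, pong) returns ok.
    handle_call(pong, _From, State) ->
        {reply, ok, State}.

    handle_cast(_Msg, State) -> {noreply, State}.
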
@@ -97,8 +97,18 @@ terminate(_Reason, _State) ->
ok.
%%%%%%%%%%%%%%%%%%%%
+
+write_currentsize(CurrentSize, LastHash) ->
+ {ok, CurrentSizeFile} = application:get_env(plop, fetched_path),
+ CurrentSizeData = {struct, [{index, CurrentSize - 1}, {hash, list_to_binary(hex:bin_to_hexstr(LastHash))}]},
+ ok = atomic:replacefile(CurrentSizeFile, mochijson2:encode(CurrentSizeData)).
+
-spec fetch_success(list(), binary()) -> list().
fetch_success(ToFetch, Entry) ->
+ CurrentPos = db:indexsize(),
+ db:add_index_nosync_noreverse(Entry, CurrentPos),
+ db:index_sync(),
+ write_currentsize(CurrentPos + 1, Entry),
true = ets:insert(?MISSINGENTRIES_TABLE, {Entry, []}),
keydelete(Entry, 1, ToFetch).
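
write_currentsize/2 records fetch progress on disk so a restarted merge can resume where it left off. Assuming mochijson2's {struct, Proplist} term format and that plop's hex:bin_to_hexstr/1 yields a lowercase hex string, the file at fetched_path ends up holding something like (values illustrative):

    1> Data = {struct, [{index, 41},
                        {hash, list_to_binary(hex:bin_to_hexstr(<<16#ab, 16#12>>))}]}.
    2> iolist_to_binary(mochijson2:encode(Data)).
    <<"{\"index\":41,\"hash\":\"ab12\"}">>
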
@@ -116,7 +126,7 @@ fetch_failure(ToFetch, Entry) ->
etf(ToFetch, Node, NMax) ->
etf(ToFetch, Node, NMax, [], []).
etf(ToFetchRest, _Node, 0, AccToFetch, AccEntries) ->
- {[reverse(AccToFetch) | ToFetchRest], reverse(AccEntries)};
+ {reverse(AccToFetch) ++ ToFetchRest, reverse(AccEntries)};
etf([], Node, _NMax, AccToFetch, AccEntries) ->
etf([], Node, 0, AccToFetch, AccEntries);
etf([H|T], Node, NMax, AccToFetch, AccEntries) ->
@@ -142,8 +152,10 @@ etf([H|T], Node, NMax, AccToFetch, AccEntries) ->
not_fetched(List) ->
filter(fun(H) -> case ets:match(?MISSINGENTRIES_TABLE, {H, '$1'}) of
- [[[]]] -> false; % Match: Empty list.
- _ -> true
+ [] ->
+ true; % No match in table: not fetched yet.
+ _ ->
+ false % Present: already fetched.
end
end,
List).
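
The not_fetched/1 rewrite hinges on the two shapes ets:match/2 returns here. fetch_success/2 inserts {Entry, []} on success, so a present entry binds '$1' to [] and comes back as [[[]]], while an absent one yields []. Branching on absence ([] -> keep) rather than on the exact bound value means anything already in the table counts as fetched. In the shell:

    1> T = ets:new(missingentries, [set]).
    2> ets:insert(T, {<<"done">>, []}).
    true
    3> ets:match(T, {<<"done">>, '$1'}).
    [[[]]]      % present: one match, '$1' bound to the empty list
    4> ets:match(T, {<<"pending">>, '$1'}).
    []          % absent: no match at all, i.e. still to fetch
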
diff --git a/merge/src/merge_fetch_fetch.erl b/merge/src/merge_fetch_fetch.erl
index b2fadd9..2ba7e9a 100644
--- a/merge/src/merge_fetch_fetch.erl
+++ b/merge/src/merge_fetch_fetch.erl
@@ -7,19 +7,81 @@
-export([start_link/1]).
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
code_change/3]).
+-export([loop/2]).
start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).
-init({Name, _Address}) ->
- lager:info("~p:~p: starting", [?MODULE, Name]),
- {ok, []}.
+init({Name, Address}) ->
+ lager:info("~p:~p starting", [?MODULE, Name]),
+ ChildPid = spawn_link(?MODULE, loop, [Name, Address]),
+ {ok, ChildPid}.
+
+decode_entry({struct, EncodedEntry}) ->
+ Hash = base64:decode(proplists:get_value(<<"hash">>, EncodedEntry)),
+ Entry = base64:decode(proplists:get_value(<<"entry">>, EncodedEntry)),
+ {Hash, Entry}.
+
+get_entries(_, _, []) ->
+ {ok, []};
+get_entries(NodeName, NodeAddress, Hashes) ->
+ DebugTag = "getentry",
+ URL = NodeAddress ++ "getentry",
+ EncodedHashes = lists:map(fun (H) -> {"hash", base64:encode(H)} end, Hashes),
+ Params = hackney_url:qs(EncodedHashes),
+ URLWithParams = [URL, "?", Params],
+ case merge_util:request(DebugTag, binary_to_list(iolist_to_binary(URLWithParams)), NodeName) of
+ {<<"ok">>, PropList} ->
+ Entries = lists:map(fun (S) -> decode_entry(S) end, proplists:get_value(<<"entries">>, PropList)),
+ {ok, Entries};
+ Err ->
+ throw({request_error, result, DebugTag, Err})
+ end.
+
+loop(Name, Address) ->
+ receive after 1000 -> ok end,
+ lager:info("~p:~p: asking for entries to get from ~p",
+ [?MODULE, Name, Address]),
+ Hashes = merge_fetch_ctrl:entriestofetch(Name, 10),
+ {ok, Entries} = get_entries(Name, Address, Hashes),
+ lists:foreach(fun ({Hash, Entry}) ->
+
+ try
+ case plop:verify_entry(Entry) of
+ {ok, Hash} ->
+ ok;
+ {ok, DifferentLeafHash} ->
+ lager:error("leaf hash not correct: requested hash is ~p " ++
+ "and contents are ~p",
+ [hex:bin_to_hexstr(Hash),
+ hex:bin_to_hexstr(DifferentLeafHash)]),
+ throw({request_error, result, "", differentleafhash});
+ {error, Reason} ->
+ lager:error("verification failed: ~p", [Reason]),
+ throw({request_error, result, "", verificationfailed})
+ end
+ catch
+ Type:What ->
+ [CrashFunction | Stack] = erlang:get_stacktrace(),
+ lager:error("Crash: ~p ~p~n~p~n~p~n",
+ [Type, What, CrashFunction, Stack]),
+ throw(What)
+ end,
+
+ db:add_entry_sync(Hash, Entry),
+
+ merge_fetch_ctrl:fetchstatus(Hash, success)
+ end, Entries),
+ loop(Name, Address).
+
%% TODO: if we crash here, we restart all of fetch -- spawn child proc
%% for the actual fetching?
-handle_call(stop, _From, State) ->
- {stop, normal, stopped, State}.
+handle_call(stop, _From, ChildPid) ->
+ lager:info("~p: stopping child process ~p", [?MODULE, ChildPid]),
+ exit(ChildPid, stop),
+ {stop, normal, stopped, nil}.
handle_cast(_Request, State) ->
{noreply, State}.
handle_info(_Info, State) ->
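
Each request is a GET to NodeAddress ++ "getentry" with one hash query parameter per base64-encoded hash (built with hackney_url:qs/1). For reference, the response shape get_entries/3 expects once merge_util:request/3 has decoded the JSON body into mochijson2 terms (field values illustrative):

    {<<"ok">>, [{<<"entries">>,
                 [{struct, [{<<"hash">>,  <<"cUkx...">>},      % base64 leaf hash
                            {<<"entry">>, <<"AAEC...">>}]}]}]}  % base64 entry body
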
diff --git a/merge/src/merge_fetch_fetch_sup.erl b/merge/src/merge_fetch_fetch_sup.erl
new file mode 100644
index 0000000..fb89ab4
--- /dev/null
+++ b/merge/src/merge_fetch_fetch_sup.erl
@@ -0,0 +1,30 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_fetch_fetch_sup).
+-behaviour(supervisor).
+
+-export([start_link/1, init/1]).
+
+start_link(Nodes) ->
+ {ok, Pid} =
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []),
+ Children =
+ lists:map(fun({NodeName, NodeAddress}) ->
+ lager:info("starting fetch worker: ~p", [NodeName]),
+
+ {ok, Child} = supervisor:start_child(
+ ?MODULE,
+ [{NodeName, NodeAddress}]),
+ Child
+ end, Nodes),
+ lager:debug("~p started newentry workers: ~p", [Pid, Children]),
+ {ok, Pid}.
+
+init([]) ->
+ {ok,
+ {{simple_one_for_one, 3, 10},
+ [{ignored,
+ {merge_fetch_fetch, start_link, []},
+ permanent, 10000, worker,
+ [merge_fetch_fetch]}]}}.
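
Under simple_one_for_one the single child spec is a template: its id (ignored here) is unused, and each supervisor:start_child(Sup, ExtraArgs) call appends ExtraArgs to the template's empty argument list, so every worker starts as merge_fetch_fetch:start_link({NodeName, NodeAddress}). For example (node name and address hypothetical):

    {ok, _Pid} = supervisor:start_child(
                   merge_fetch_fetch_sup,
                   [{"storage-1", "https://storage-1.example.org/ct/storage/"}]).
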
diff --git a/merge/src/merge_fetch_newentries.erl b/merge/src/merge_fetch_newentries.erl
index b45aaec..befb3f7 100644
--- a/merge/src/merge_fetch_newentries.erl
+++ b/merge/src/merge_fetch_newentries.erl
@@ -21,10 +21,34 @@ handle_call(stop, _From, ChildPid) ->
exit(ChildPid, stop),
{stop, normal, stopped, nil}.
+get_newentries(NodeName, NodeAddress) ->
+ DebugTag = "fetchnewentries",
+ URL = NodeAddress ++ "fetchnewentries",
+ case merge_util:request(DebugTag, URL, NodeName) of
+ {<<"ok">>, PropList} ->
+ Entries = lists:map(fun (S) -> base64:decode(S) end, proplists:get_value(<<"entries">>, PropList)),
+ {ok, Entries};
+ Err ->
+ throw({request_error, result, DebugTag, Err})
+ end.
+
loop(Name, Address, Period) ->
+ receive after Period -> ok end,
lager:info("~p:~p: asking storage node at ~p for missing entries",
[?MODULE, Name, Address]),
- receive after Period -> ok end,
+ EntriesResult = try
+ get_newentries(Name, Address)
+ catch
+ throw:{request_error, request, Tag, Error2} ->
+ {error, Tag, Error2}
+ end,
+ case EntriesResult of
+ {ok, Entries} ->
+ lager:debug("got entries: ~p", [Entries]),
+ merge_fetch_ctrl:newentries(Entries, Name);
+ {error, _Tag, Error} ->
+ lager:info("failed to get entries from ~p: ~p", [Name, Error])
+ end,
loop(Name, Address, Period).
handle_cast(_Request, State) ->
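
The fetchnewentries response is assumed to carry base64-encoded leaf hashes, roughly (values illustrative):

    {<<"ok">>, [{<<"entries">>, [<<"cUkxZm9v...">>, <<"YmFyYmF6...">>]}]}
    %% each element is base64:decode/1'd into a raw leaf hash before
    %% being handed to merge_fetch_ctrl:newentries/2

Note that the catch clause only matches the request variant of request_error, presumably thrown inside merge_util:request/3; the result variant thrown by get_newentries/2 itself still crashes the worker and is left for the supervisor to restart.
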
diff --git a/merge/src/merge_sup.erl b/merge/src/merge_sup.erl
index a158077..7373de4 100644
--- a/merge/src/merge_sup.erl
+++ b/merge/src/merge_sup.erl
@@ -11,6 +11,7 @@ start_link(_Args) ->
init([]) ->
{ok, LogorderPath} = application:get_env(plop, index_path),
+ {ok, StorageNodes} = plopconfig:get_env(storage_nodes),
{ok,
{{one_for_one, 3, 10},
[
@@ -21,5 +22,7 @@ init([]) ->
{merge_dist_sup, {merge_dist_sup, start_link, [[]]},
transient, infinity, supervisor, [merge_dist_sup]},
{merge_sth, {merge_sth, start_link, [[]]},
- transient, 10000, worker, [merge_sth]}
+ transient, 10000, worker, [merge_sth]},
+ {merge_fetch_sup, {merge_fetch_sup, start_link, [StorageNodes]},
+ transient, infinity, supervisor, [merge_fetch_sup]}
]}}.
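
Putting the series together, the supervision tree looks roughly like this (a sketch; the internals of merge_fetch_sup are inferred from the modules added above, not shown in this diff):

    %% merge_sup (one_for_one)
    %% |-- merge_dist_sup
    %% |-- merge_sth
    %% `-- merge_fetch_sup([{NodeName, NodeAddress}, ...])
    %%     |-- merge_fetch_ctrl
    %%     |-- merge_fetch_newentries workers
    %%     `-- merge_fetch_fetch_sup (simple_one_for_one)
    %%         `-- one merge_fetch_fetch worker per storage node
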