summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLinus Nordberg <linus@nordu.net>2017-07-06 18:15:08 +0200
committerLinus Nordberg <linus@nordu.net>2017-07-06 18:15:08 +0200
commit489df8ecaf16ca7429eb15b31ffbe6f686f5b0d1 (patch)
tree0bf75621b35d7cecd06f4c12d054e9f9ec47f7a6
parent05235680257a54f112c44c7854b3afe8300acd40 (diff)
-rw-r--r--merge/src/merge_fetch_ctrl.erl308
-rw-r--r--merge/src/merge_fetch_fetch.erl31
-rw-r--r--merge/src/merge_fetch_newentries.erl38
-rw-r--r--merge/src/merge_fetch_newentries_sup.erl30
-rw-r--r--merge/src/merge_fetch_sup.erl29
5 files changed, 436 insertions, 0 deletions
diff --git a/merge/src/merge_fetch_ctrl.erl b/merge/src/merge_fetch_ctrl.erl
new file mode 100644
index 0000000..6c055df
--- /dev/null
+++ b/merge/src/merge_fetch_ctrl.erl
@@ -0,0 +1,308 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_fetch_ctrl).
+-behaviour(gen_server).
+
+-export([start_link/1]).
+-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
+ code_change/3]).
+-export([newentries/2]).
+-export([entriestofetch/2, fetchstatus/2]).
+
+-import(lists, [keydelete/3, keymember/3, keyreplace/4, member/2, reverse/1,
+ filter/2, split/2, filtermap/2]).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(MISSINGENTRIES_TABLE, missingentries).
+
+%% tofetch contains entries that are not already fetched. Each list
+%% member is on the form {Hash, BeingFetchedAt} with BeingFetchedAt
+%% being a node or undefined.
+-record(state, {tofetch :: [{binary(), atom()}]}).
+
+%%%%%%%%%%%%%%%%%%%%
+%% @doc Update the missingentries table and the tofetch list with new
+%% entries in List (entry hashes), all available at storage node
+%% Node.  Already-fetched entries are filtered out before calling the
+%% server.
+newentries(List, Node) ->
+    %% not_fetched/1 reads the (protected) ETS table directly, so it
+    %% is called here, in caller context, before calling the server.
+    gen_server:call(?MODULE, {newentries, not_fetched(List), Node}).
+
+%% @doc Return list of entries for Node to fetch, but no more than
+%% NMax of them.
+-spec entriestofetch(atom(), non_neg_integer()) -> list().
+entriestofetch(Node, NMax) ->
+    gen_server:call(?MODULE, {entriestofetch, Node, NMax}).
+
+%% @doc Update the missingentries table and the tofetch list to
+%% reflect that the fetching of Entry has either been successful or
+%% has failed.
+-spec fetchstatus(binary(), success|failure) -> ok.
+fetchstatus(Entry, Status) -> % FIXME: Rename function.
+    gen_server:call(?MODULE, {fetchstatus, Entry, Status}).
+
+%%%%%%%%%%%%%%%%%%%%
+%% @doc Start the fetch controller, registered locally as ?MODULE.
+%% Args is the storage-node list passed on to init/1.
+start_link(Args) ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, Args, []).
+
+%% @doc Build the missingentries ETS table from the logorder index
+%% and spawn one fetcher per storage node.  Initial tofetch list is
+%% empty.
+init(StorageNodes) ->
+    lager:info("~p starting with storagenodes ~p", [?MODULE, StorageNodes]),
+    ok = init_missingentries(protected),
+    ok = init_fetchers(StorageNodes),
+    {ok, #state{tofetch = []}}.
+
+handle_call(stop, _From, State) ->
+    {stop, normal, stopped, State};
+
+%% Add entries in List to table (missingentries) and list (tofetch).
+%% NOTE: Traversing List twice.
+handle_call({newentries, List, Node}, _From, #state{tofetch = ToFetch} = State) ->
+    [missingentries_update(H, Node) || H <- List],
+    NewEntries =
+        filtermap(fun(Hash) ->
+                      case keymember(Hash, 1, ToFetch) of
+                          true ->
+                              false;
+                          false ->
+                              {true, {Hash, undefined}}
+                      end
+                  end, List),
+    %% BUGFIX: reply 'ok' (was 'noreply', which left the caller of
+    %% newentries/2 hanging in gen_server:call until timeout) and
+    %% append NewEntries with '++' (was [ToFetch | NewEntries], which
+    %% nested the old list as the head of an improper list, breaking
+    %% the later keymember/keydelete/keyreplace calls).
+    {reply, ok, State#state{tofetch = ToFetch ++ NewEntries}};
+
+handle_call({entriestofetch, Node, NMax}, _From,
+            #state{tofetch = ToFetch} = State) ->
+    {NewToFetch, Entries} = etf(ToFetch, Node, NMax),
+    {reply, Entries, State#state{tofetch = NewToFetch}};
+
+handle_call({fetchstatus, Entry, success}, _From,
+            #state{tofetch = ToFetch} = State) ->
+    NewToFetch = fetch_success(ToFetch, Entry),
+    %% BUGFIX: reply 'ok' as promised by the fetchstatus/2 spec (was
+    %% 'noreply', hanging the caller).
+    {reply, ok, State#state{tofetch = NewToFetch}};
+
+handle_call({fetchstatus, Entry, failure}, _From,
+            #state{tofetch = ToFetch} = State) ->
+    NewToFetch = fetch_failure(ToFetch, Entry),
+    {reply, ok, State#state{tofetch = NewToFetch}}.
+
+%% Standard gen_server boilerplate: no casts or info messages are
+%% expected; they are silently dropped.
+handle_cast(_Request, State) ->
+    {noreply, State}.
+handle_info(_Info, State) ->
+    {noreply, State}.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+terminate(_Reason, _State) ->
+    lager:info("~p terminating", [?MODULE]),
+    ok.
+
+%%%%%%%%%%%%%%%%%%%%
+%% @doc Mark Entry as successfully fetched: an empty node list in the
+%% missingentries table means "already fetched".  Returns the tofetch
+%% list with Entry removed.
+-spec fetch_success(list(), binary()) -> list().
+fetch_success(ToFetch, Entry) ->
+    Remaining = keydelete(Entry, 1, ToFetch),
+    true = ets:insert(?MISSINGENTRIES_TABLE, {Entry, []}),
+    Remaining.
+
+%% @doc Fetching of Entry failed: rotate its node list so the node we
+%% just failed against (the head) goes last, and mark Entry as no
+%% longer being fetched in the tofetch list.
+-spec fetch_failure(list(), binary()) -> list().
+fetch_failure(ToFetch, Entry) ->
+    [[Nodes]] = ets:match(?MISSINGENTRIES_TABLE, {Entry, '$1'}),
+    %% tl ++ [hd] replaces the equivalent but opaque
+    %% reverse([hd(Nodes) | reverse(tl(Nodes))]).
+    %% NOTE(review): crashes on an empty node list -- presumably an
+    %% entry being fetched always has at least one node; confirm.
+    true = ets:insert(?MISSINGENTRIES_TABLE,
+                      {Entry, tl(Nodes) ++ [hd(Nodes)]}),
+    keyreplace(Entry, 1, ToFetch, {Entry, undefined}).
+
+%% @doc Return list of entries to fetch for Node and an updated
+%% tofetch list in which those entries are marked as being fetched by
+%% Node.  NOTE(review): NMax is decremented for skipped entries too,
+%% so it bounds the number of entries *considered*, not returned --
+%% confirm that is intended.
+-spec etf([{binary(), atom()}], atom(), non_neg_integer()) ->
+          {[{binary(), atom()}], [binary()]}.
+etf(ToFetch, Node, NMax) ->
+    etf(ToFetch, Node, NMax, [], []).
+%% Budget exhausted: splice the processed prefix back onto the
+%% unprocessed rest.  BUGFIX: was [reverse(AccToFetch) | ToFetchRest],
+%% which nested the accumulator as the single (improper) head element
+%% of the new tofetch list.
+etf(ToFetchRest, _Node, 0, AccToFetch, AccEntries) ->
+    {reverse(AccToFetch) ++ ToFetchRest, reverse(AccEntries)};
+etf([], Node, _NMax, AccToFetch, AccEntries) ->
+    etf([], Node, 0, AccToFetch, AccEntries);
+etf([H|T], Node, NMax, AccToFetch, AccEntries) ->
+    {Entry, BeingFetchedBy} = H,
+    {Acc1, Acc2} =
+        case BeingFetchedBy of
+            undefined ->
+                [[PresentAtNodes]] = ets:match(?MISSINGENTRIES_TABLE,
+                                               {Entry, '$1'}),
+                case member(Node, PresentAtNodes) of
+                    true -> % Present at Node -- update and add.
+                        lager:debug("Good entry for node ~p: ~p", [Node, Entry]),
+                        {[{Entry, Node} | AccToFetch], [Entry | AccEntries]};
+                    false -> % Not present at Node -- move along.
+                        lager:debug("Ignoring entry not @ ~p: ~p", [Node, Entry]),
+                        {[H | AccToFetch], AccEntries}
+                end;
+            _ -> % Already being fetched -- move along.
+                lager:debug("Ignoring entry already being fetched: ~p", [Entry]),
+                {[H | AccToFetch], AccEntries}
+        end,
+    etf(T, Node, NMax - 1, Acc1, Acc2).
+
+%% @doc Filter List down to the entries that are not already fetched,
+%% i.e. those whose missingentries node list is non-empty or that are
+%% absent from the table entirely.
+not_fetched(List) ->
+    [Hash || Hash <- List,
+             ets:match(?MISSINGENTRIES_TABLE, {Hash, '$1'}) =/= [[[]]]].
+
+%% @doc Insert Elem at a uniformly random position in List, no
+%% earlier than position First (1-based).  If First is past the end
+%% of the list, Elem is simply appended.
+insert_at_random_pos(First, Elem, List) ->
+    Last = length(List) + 1,
+    case First >= Last of
+        true ->
+            List ++ [Elem];
+        false ->
+            %% rand:uniform/1 replaces the deprecated (removed in
+            %% later OTP releases) crypto:rand_uniform/2; picks a
+            %% position in [First, Last].  Cryptographic strength is
+            %% not needed here.
+            case First - 1 + rand:uniform(Last - First + 1) of
+                Last ->
+                    List ++ [Elem];
+                N ->
+                    {L1, L2} = split(N - 1, List),
+                    L1 ++ [Elem] ++ L2
+            end
+    end.
+
+%% @doc Record in the missingentries table that storage node Node has
+%% the entry Hash.  Creates the row if Hash is unknown; otherwise
+%% inserts Node at a random position (never first) in the existing
+%% node list.  Returns the resulting list of nodes.
+-spec missingentries_update(binary(), atom()) -> [atom()].
+missingentries_update(Hash, Node) ->
+    case ets:match(?MISSINGENTRIES_TABLE, {Hash, '$1'}) of
+        [[OldNodes]] ->
+            %% Keep the current head node in place; randomise Node in
+            %% among the rest.
+            Updated = insert_at_random_pos(2, Node, OldNodes),
+            true = ets:update_element(?MISSINGENTRIES_TABLE, Hash,
+                                      {2, Updated}),
+            Updated;
+        [] ->
+            missingentries_add(Hash, [Node])
+    end.
+
+%% @doc Add hash Hash to the missingentries table, with Nodes as the
+%% value ([] meaning the entry has already been fetched).  Returns
+%% Nodes.
+%% We're wrapping adding and testing existence of entries to the table
+%% since we're (soon to be) storing differently sized tuples, in order
+%% to save one word (8 octets on a 64-bit system) per entry (saves
+%% 800MB of RAM in a log with 100M entries).
+-spec missingentries_add(binary(), [atom()]) -> [atom()].
+missingentries_add(Hash, Nodes) when is_binary(Hash) ->
+    %% Assert the insert result, for consistency with fetch_success/2.
+    true = ets:insert(?MISSINGENTRIES_TABLE, {Hash, Nodes}),
+    Nodes.
+
+%% @doc (Re)create the missingentries ETS table with access mode
+%% ETSAccess, then seed it with every entry already present in the
+%% logorder index (an empty node list marks an entry as fetched).
+-spec init_missingentries(ets:access()) -> ok | {error, term()}.
+init_missingentries(ETSAccess) ->
+    case ets:info(?MISSINGENTRIES_TABLE) of
+        undefined -> ok;
+        _ -> ets:delete(?MISSINGENTRIES_TABLE)
+    end,
+    ets:new(?MISSINGENTRIES_TABLE, [set, named_table, ETSAccess]),
+    add_entries(0, index:indexsize(logorder)).
+
+%% @doc Load Count entries from the logorder index into the
+%% missingentries table, starting at Start, in chunks of at most
+%% 100k entries.
+add_entries(_Start, 0) ->
+    ok;
+add_entries(Start, Count) ->
+    ChunkSize = min(Count, 100000),
+    Chunk = index:getrange(logorder, Start, Start + ChunkSize - 1),
+    %% [] means entry has been fetched.
+    [missingentries_add(Entry, []) || Entry <- Chunk],
+    add_entries(Start + ChunkSize, Count - ChunkSize).
+
+%% @doc Spawn one merge_fetch_fetch server per storage node.
+%% NOTE(review): spawn_link(M, start_link, A) links each gen_server
+%% to a short-lived intermediate process rather than to this one;
+%% presumably these should be owned by a supervisor -- confirm.
+init_fetchers(StorageNodes) ->
+    lists:foreach(fun(StorageNode) ->
+                          spawn_link(merge_fetch_fetch, start_link,
+                                     [StorageNode])
+                  end, StorageNodes),
+    ok.
+
+%%%%%%%%%%%%%%%%%%%%
+%% eunit tests.
+
+%% First few entries in test/testdata/merge/logorder, in binary form.
+-define(LOGORDER_1, hex:hexstr_to_bin("5806F2A019465B1BCA6694DDFF05F99307D3A25CB842123110221802B1CD1813")).
+-define(LOGORDER_2, hex:hexstr_to_bin("E87519AE80C0E1385D114FAF7E5BC9C4811FD0E0E155F28C026E25F0F14C7715")).
+-define(LOGORDER_3, hex:hexstr_to_bin("224B10E05E3A1E38D34A0122E4B844E838A9D671288345FC77D245246A8D822F")).
+-define(LOGORDER_4, hex:hexstr_to_bin("06529FA6ADA25188C527D610BE2FD6C16ED57796ACAA4CCA9B90D1FF2BE8ACD7")).
+
+%% Eunit fixture: start the logorder index, build a public
+%% missingentries table and register which test nodes hold which
+%% entries.  Returns the ToFetch list passed to each test fun.
+test_setup() ->
+    index:init_module(),
+    index:start_link(logorder, "test/testdata/merge/logorder"),
+
+    %% Table needs to be public in tests since this is run in a
+    %% different process than the tests themselves.
+    ok = init_missingentries(public),
+
+    %% #1 is already fetched.
+    missingentries_update(?LOGORDER_2, node1), % #2 at node1.
+    missingentries_update(?LOGORDER_3, node1), % #3 at node1,
+    missingentries_update(?LOGORDER_3, node2), % ... node2,
+    missingentries_update(?LOGORDER_3, node3), % ... and node3.
+    missingentries_update(?LOGORDER_4, node1),
+    missingentries_update(?LOGORDER_4, node3),
+
+    %% lager:debug("Nodes for LOGORDER_3: ~p",
+    %%             [element(2, hd(ets:match_object(Tab, {?LOGORDER_3, '$1'})))]),
+    %% lager:debug("ETS size, bytes: ~p",
+    %%             [ets:info(Tab, memory) *
+    %%                  erlang:system_info(wordsize)]),
+
+    %% Return ToFetch list.
+    [
+     {?LOGORDER_4, undefined},  % Not being fetched.
+     {?LOGORDER_2, undefined},  % Not being fetched.
+     {?LOGORDER_3, node1}       % Currently being fetched by node1.
+    ].
+
+%% Eunit fixture teardown: drop the table and stop the index.
+test_cleanup(_) ->
+    true = ets:delete(?MISSINGENTRIES_TABLE),
+    index:stop(logorder).
+
+%% Checks table population from the test logorder file and the
+%% not_fetched/1 filtering logic.
+init_test_() ->
+    {setup, fun test_setup/0, fun test_cleanup/1,
+     fun(_ToFetch) ->
+             [
+              ?_assertEqual(1000, ets:info(?MISSINGENTRIES_TABLE, size)),
+              ?_assertEqual(1, length(not_fetched([crypto:hash(sha256, <<>>)]))),
+              ?_assertEqual(0, length(not_fetched([?LOGORDER_1]))),
+              ?_assertEqual(1, length(not_fetched([?LOGORDER_2]))),
+              ?_assertEqual(2, length(not_fetched([crypto:hash(sha256, <<>>), ?LOGORDER_1, ?LOGORDER_2]))),
+              ?_assert(member(node1, element(2, hd(ets:match_object(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'}))))),
+              ?_assert(member(node2, element(2, hd(ets:match_object(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'}))))),
+              ?_assert(member(node3, element(2, hd(ets:match_object(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'})))))
+             ]
+     end}.
+
+%% Checks which entries etf/3 hands out per node; note it only
+%% inspects the entry list (element 2), not the updated tofetch list.
+etf_test_() ->
+    {setup, fun test_setup/0, fun test_cleanup/1,
+     fun(ToFetch) ->
+             [
+              ?_assertEqual([?LOGORDER_4, ?LOGORDER_2], element(2, etf(ToFetch, node1, 10))), % node1 can fetch #2 and #4 (in reversed order).
+              ?_assertEqual([], element(2, etf(ToFetch, node1, 0))), % Limit == 0.
+              ?_assertEqual([], element(2, etf(ToFetch, node2, 10))), % node2 has nothing to fetch.
+              ?_assertEqual([?LOGORDER_4], element(2, etf(ToFetch, node3, 10))) % node3 can fetch #4 (but only because we don't remember that #2 got it above).
+             ]
+     end}.
+
+%% After a successful fetch of #3 its table value becomes [] and it
+%% disappears from the tofetch list.
+fetchstatus_success_test_() ->
+    {setup, fun test_setup/0, fun test_cleanup/1,
+     fun(ToFetch) ->
+             [[Nodes]] = ets:match(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'}),
+             [
+              ?_assert(ets:member(?MISSINGENTRIES_TABLE, ?LOGORDER_3)),
+              ?_assertEqual(3, length(Nodes)),
+              ?_assertEqual([{?LOGORDER_4, undefined}, {?LOGORDER_2, undefined}],
+                            fetch_success(ToFetch, ?LOGORDER_3)),
+              ?_assertEqual([[[]]], ets:match(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'}))
+             ]
+     end}.
+
+%% After a failed fetch of #3 its node list is rotated (head moved to
+%% the end) and it returns to "not being fetched" in tofetch.
+fetchstatus_failure_test_() ->
+    {setup, fun test_setup/0, fun test_cleanup/1,
+     fun(ToFetch) ->
+             [[Nodes]] = ets:match(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'}),
+             [
+              ?_assertEqual([{?LOGORDER_4, undefined}, {?LOGORDER_2, undefined}, {?LOGORDER_3, undefined}],
+                            fetch_failure(ToFetch, ?LOGORDER_3)),
+              ?_assertEqual([[tl(Nodes) ++ [hd(Nodes)]]],
+                            ets:match(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'}))
+             ]
+     end}.
diff --git a/merge/src/merge_fetch_fetch.erl b/merge/src/merge_fetch_fetch.erl
new file mode 100644
index 0000000..b2fadd9
--- /dev/null
+++ b/merge/src/merge_fetch_fetch.erl
@@ -0,0 +1,31 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_fetch_fetch).
+-behaviour(gen_server).
+
+-export([start_link/1]).
+-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
+ code_change/3]).
+
+%% @doc Start an (unregistered) fetcher; Args is {Name, Address} as
+%% matched by init/1.
+start_link(Args) ->
+    gen_server:start_link(?MODULE, Args, []).
+
+%% @doc Fetcher skeleton: only logs at startup; no fetching is
+%% implemented here yet.
+init({Name, _Address}) ->
+    lager:info("~p:~p: starting", [?MODULE, Name]),
+    {ok, []}.
+
+%% TODO: if we crash here, we restart all of fetch -- spawn child proc
+%% for the actual fetching?
+
+%% Standard gen_server boilerplate; only 'stop' is handled.
+handle_call(stop, _From, State) ->
+    {stop, normal, stopped, State}.
+handle_cast(_Request, State) ->
+    {noreply, State}.
+handle_info(_Info, State) ->
+    {noreply, State}.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+terminate(_Reason, _State) ->
+    lager:info("~p terminating", [?MODULE]),
+    ok.
diff --git a/merge/src/merge_fetch_newentries.erl b/merge/src/merge_fetch_newentries.erl
new file mode 100644
index 0000000..b45aaec
--- /dev/null
+++ b/merge/src/merge_fetch_newentries.erl
@@ -0,0 +1,38 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_fetch_newentries).
+-behaviour(gen_server).
+
+-export([start_link/1, loop/3]).
+-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
+ code_change/3]).
+
+%% @doc Start an (unregistered) newentries poller; Args is
+%% {Name, Address, Period} as matched by init/1.
+start_link(Args) ->
+    gen_server:start_link(?MODULE, Args, []).
+
+%% @doc Spawn the polling loop as a linked child process and keep its
+%% pid as our state.  NOTE(review): we do not trap exits, so a crash
+%% in the loop takes this server down with it -- confirm that is the
+%% intended restart behaviour.
+init({Name, Address, Period}) ->
+    lager:info("~p:~p starting", [?MODULE, Name]),
+    ChildPid = spawn_link(?MODULE, loop, [Name, Address, Period]),
+    {ok, ChildPid}.
+
+%% Stop: kill the polling loop (exit signal with reason 'stop'), then
+%% stop ourselves normally.
+handle_call(stop, _From, ChildPid) ->
+    lager:info("~p: stopping child process ~p", [?MODULE, ChildPid]),
+    exit(ChildPid, stop),
+    {stop, normal, stopped, nil}.
+
+%% @doc Polling loop body: every Period milliseconds, ask the storage
+%% node at Address for missing entries.  (Currently only the log line
+%% -- no request is actually made yet.)
+loop(Name, Address, Period) ->
+    lager:info("~p:~p: asking storage node at ~p for missing entries",
+               [?MODULE, Name, Address]),
+    timer:sleep(Period),
+    loop(Name, Address, Period).
+
+%% Standard gen_server boilerplate; no casts or info messages are
+%% expected.
+handle_cast(_Request, State) ->
+    {noreply, State}.
+handle_info(_Info, State) ->
+    {noreply, State}.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+terminate(_Reason, _State) ->
+    lager:info("~p terminating", [?MODULE]),
+    ok.
diff --git a/merge/src/merge_fetch_newentries_sup.erl b/merge/src/merge_fetch_newentries_sup.erl
new file mode 100644
index 0000000..bd33e60
--- /dev/null
+++ b/merge/src/merge_fetch_newentries_sup.erl
@@ -0,0 +1,30 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_fetch_newentries_sup).
+-behaviour(supervisor).
+
+-export([start_link/1, init/1]).
+
+%% @doc Start the simple_one_for_one supervisor, then dynamically
+%% start one merge_fetch_newentries worker per {NodeName, NodeAddress}
+%% in Nodes, each polling with a 3000 ms period.
+start_link(Nodes) ->
+    {ok, Pid} =
+        supervisor:start_link({local, ?MODULE}, ?MODULE, []),
+    Children =
+        lists:map(fun({NodeName, NodeAddress}) ->
+                          lager:info("starting newentry worker: ~p", [NodeName]),
+
+                          {ok, Child} = supervisor:start_child(
+                                          ?MODULE,
+                                          [{NodeName, NodeAddress, 3000}]),
+                          Child
+                  end, Nodes),
+    lager:debug("~p started newentry workers: ~p", [Pid, Children]),
+    {ok, Pid}.
+
+%% One simple_one_for_one child template; workers are started
+%% dynamically from start_link/1 with {Name, Address, Period} args.
+%% The 'ignored' id is conventional for simple_one_for_one specs.
+init([]) ->
+    {ok,
+     {{simple_one_for_one, 3, 10},
+      [{ignored,
+        {merge_fetch_newentries, start_link, []},
+        permanent, 10000, worker,
+        [merge_fetch_newentries]}]}}.
diff --git a/merge/src/merge_fetch_sup.erl b/merge/src/merge_fetch_sup.erl
new file mode 100644
index 0000000..6dd1735
--- /dev/null
+++ b/merge/src/merge_fetch_sup.erl
@@ -0,0 +1,29 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_fetch_sup).
+-behaviour(supervisor).
+
+-export([start_link/1, init/1]).
+
+%% @doc Start the top-level fetch supervisor; Args is the storage
+%% node list passed through to init/1.
+start_link(Args) ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, Args).
+
+%% @doc one_for_all supervision of the newentries supervisor, the
+%% fetch controller and the fetcher supervisor.
+%% NOTE(review): merge_fetch_fetch_sup is not among the files added
+%% in this commit (see diffstat), and merge_fetch_ctrl:init/1 also
+%% spawns fetchers itself -- confirm which mechanism is intended.
+init(StorageNodes) ->
+    lager:debug("starting with storage nodes: ~p", [StorageNodes]),
+    {ok,
+     {{one_for_all, 3, 10},
+      [
+       {merge_fetch_newentries_sup,
+        {merge_fetch_newentries_sup, start_link, [StorageNodes]},
+        transient, infinity, supervisor,
+        [merge_fetch_newentries_sup]},
+       {merge_fetch_ctrl,
+        {merge_fetch_ctrl, start_link, [StorageNodes]},
+        permanent, 10000, worker,
+        [merge_fetch_ctrl]},
+       {merge_fetch_fetch_sup,
+        {merge_fetch_fetch_sup, start_link, [StorageNodes]},
+        transient, infinity, supervisor,
+        [merge_fetch_fetch_sup]}
+      ]}}.