%%% Copyright (c) 2017, NORDUnet A/S.
%%% See LICENSE for licensing information.

-module(merge_fetch_ctrl).
-behaviour(gen_server).

-export([start_link/1]).
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
         code_change/3]).
-export([newentries/2]).
-export([entriestofetch/2, fetchstatus/2]).

-import(lists, [keydelete/3, keymember/3, keyreplace/4, member/2, reverse/1,
                filter/2, split/2, filtermap/2]).

-include_lib("eunit/include/eunit.hrl").

-define(MISSINGENTRIES_TABLE, missingentries).

%% tofetch contains entries that are not already fetched. Each list
%% member is on the form {Hash, BeingFetchedAt} with BeingFetchedAt
%% being a node or undefined.
-record(state, {tofetch :: [{binary(), atom()}]}).

%%%%%%%%%%%%%%%%%%%%
%% @doc Update the missingentries table and the tofetch list with new
%% entries in List, available at Node.
-spec newentries([binary()], atom()) -> ok.
newentries(List, Node) ->
    %% not_fetched() is called here, in the caller's context, before
    %% calling the server. This works because the missingentries table
    %% is created protected (see init/1), so other processes may read it.
    gen_server:call(?MODULE, {newentries, not_fetched(List), Node}).

%% @doc Return the list of entries (hashes) for Node to fetch, but no
%% more than NMax of them.
-spec entriestofetch(atom(), non_neg_integer()) -> [binary()].
entriestofetch(Node, NMax) ->
    gen_server:call(?MODULE, {entriestofetch, Node, NMax}).

%% @doc Update the missingentries table and the tofetch list to reflect
%% that the fetching of Entry has either been successful or has failed.
-spec fetchstatus(binary(), success | failure) -> ok.
fetchstatus(Entry, Status) ->                   % FIXME: Rename function.
    gen_server:call(?MODULE, {fetchstatus, Entry, Status}).

%%%%%%%%%%%%%%%%%%%%
%% @doc Start the server, registered locally as ?MODULE. Args is the
%% list of storage nodes handed to init/1.
-spec start_link(list()) -> {ok, pid()} | ignore | {error, term()}.
start_link(Args) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, Args, []).

%% @doc Build the missingentries table (protected: only this process
%% writes to it) from the logorder index, then start the fetchers for
%% StorageNodes. The tofetch list starts out empty.
init(StorageNodes) ->
    lager:info("~p starting with storagenodes ~p", [?MODULE, StorageNodes]),
    ok = init_missingentries(protected),
    ok = init_fetchers(StorageNodes),
    {ok, #state{tofetch = []}}.
%% @doc Handle synchronous requests. Every request is answered: the
%% newentries/2, entriestofetch/2 and fetchstatus/2 API functions use
%% gen_server:call/2, so returning {noreply, _} without a later
%% gen_server:reply/2 would make the caller time out.
handle_call(stop, _From, State) ->
    {stop, normal, stopped, State};
handle_call({newentries, List, Node}, _From,
            #state{tofetch = ToFetch} = State) ->
    %% Add entries in List to the table (missingentries) and to the list
    %% (tofetch). NOTE: Traverses List twice.
    %% NOTE(review): an entry that got fetched after the caller's
    %% not_fetched/1 filter ran goes from [] to [Node] here, i.e. it is
    %% marked as missing again.
    [missingentries_update(Hash, Node) || Hash <- List],
    NewEntries = [{Hash, undefined} || Hash <- List,
                                       not lists:keymember(Hash, 1, ToFetch)],
    %% Append the new entries, keeping tofetch a flat list of
    %% {Hash, BeingFetchedAt} tuples.
    {reply, ok, State#state{tofetch = ToFetch ++ NewEntries}};
handle_call({entriestofetch, Node, NMax}, _From,
            #state{tofetch = ToFetch} = State) ->
    {NewToFetch, Entries} = etf(ToFetch, Node, NMax),
    {reply, Entries, State#state{tofetch = NewToFetch}};
handle_call({fetchstatus, Entry, success}, _From,
            #state{tofetch = ToFetch} = State) ->
    {reply, ok, State#state{tofetch = fetch_success(ToFetch, Entry)}};
handle_call({fetchstatus, Entry, failure}, _From,
            #state{tofetch = ToFetch} = State) ->
    {reply, ok, State#state{tofetch = fetch_failure(ToFetch, Entry)}}.

handle_cast(_Request, State) ->
    {noreply, State}.

handle_info(_Info, State) ->
    {noreply, State}.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

terminate(_Reason, _State) ->
    lager:info("~p terminating", [?MODULE]),
    ok.

%%%%%%%%%%%%%%%%%%%%
%% @doc Mark Entry as fetched (an empty node list in missingentries)
%% and drop it from the tofetch list.
-spec fetch_success(list(), binary()) -> list().
fetch_success(ToFetch, Entry) ->
    true = ets:insert(?MISSINGENTRIES_TABLE, {Entry, []}),
    lists:keydelete(Entry, 1, ToFetch).

%% @doc Record a failed fetch of Entry: move the first node in Entry's
%% node list to the end of that list, and mark Entry in tofetch as no
%% longer being fetched (undefined) so it can be handed out again.
%% Crashes (badmatch) if Entry has no nodes, i.e. is already fetched.
-spec fetch_failure(list(), binary()) -> list().
fetch_failure(ToFetch, Entry) ->
    [[[First | Rest]]] = ets:match(?MISSINGENTRIES_TABLE, {Entry, '$1'}),
    true = ets:insert(?MISSINGENTRIES_TABLE, {Entry, Rest ++ [First]}),
    lists:keyreplace(Entry, 1, ToFetch, {Entry, undefined}).

%% @doc Return an updated tofetch list and the list of entries for Node
%% to fetch.
-spec etf([{binary(), atom()}], atom(), non_neg_integer()) ->
                 {[{binary(), atom()}], [binary()]}.
etf(ToFetch, Node, NMax) ->
    etf(ToFetch, Node, NMax, [], []).
%% @doc Worker for etf/3. Walks the tofetch list and hands out at most
%% NMax entries that are present at Node and not already being fetched,
%% marking each handed-out entry as being fetched at Node. NMax counts
%% handed-out entries, not visited ones, so busy entries at the head of
%% the list cannot starve Node. The returned tofetch list is flat and
%% keeps the original order.
-spec etf([{binary(), atom()}], atom(), integer(),
          [{binary(), atom()}], [binary()]) ->
                 {[{binary(), atom()}], [binary()]}.
etf(ToFetchRest, _Node, NMax, AccToFetch, AccEntries) when NMax =< 0 ->
    %% Quota used up: put the visited (reversed) prefix back in front of
    %% the unvisited tail. lists:reverse/2 keeps the result flat.
    {lists:reverse(AccToFetch, ToFetchRest), lists:reverse(AccEntries)};
etf([], _Node, _NMax, AccToFetch, AccEntries) ->
    {lists:reverse(AccToFetch), lists:reverse(AccEntries)};
etf([{Entry, undefined} = H | T], Node, NMax, AccToFetch, AccEntries) ->
    [[PresentAtNodes]] = ets:match(?MISSINGENTRIES_TABLE, {Entry, '$1'}),
    case lists:member(Node, PresentAtNodes) of
        true ->
            %% Present at Node -- update and add.
            lager:debug("Good entry for node ~p: ~p", [Node, Entry]),
            etf(T, Node, NMax - 1,
                [{Entry, Node} | AccToFetch], [Entry | AccEntries]);
        false ->
            %% Not present at Node -- move along.
            lager:debug("Ignoring entry not @ ~p: ~p", [Node, Entry]),
            etf(T, Node, NMax, [H | AccToFetch], AccEntries)
    end;
etf([{Entry, _BeingFetchedBy} = H | T], Node, NMax, AccToFetch, AccEntries) ->
    %% Already being fetched -- move along.
    lager:debug("Ignoring entry already being fetched: ~p", [Entry]),
    etf(T, Node, NMax, [H | AccToFetch], AccEntries).

%% @doc Return the hashes in List that are not recorded as fetched, i.e.
%% whose node list in missingentries is not []. Hashes missing from the
%% table are kept.
-spec not_fetched([binary()]) -> [binary()].
not_fetched(List) ->
    [Hash || Hash <- List,
             ets:match(?MISSINGENTRIES_TABLE, {Hash, '$1'}) =/= [[[]]]].

%% @doc Insert Elem into List at a uniformly random 1-based position in
%% [First, length(List) + 1]. The last position means appending. If
%% First is beyond the end of List, Elem is appended.
-spec insert_at_random_pos(pos_integer(), term(), list()) -> list().
insert_at_random_pos(First, Elem, List) ->
    Last = length(List) + 1,
    case First >= Last of
        true ->
            List ++ [Elem];
        false ->
            %% rand:uniform(K) is uniform in [1, K], so Pos is in
            %% [First, Last]. (Replaces deprecated crypto:rand_uniform/2.)
            Pos = First - 1 + rand:uniform(Last - First + 1),
            {Before, After} = lists:split(Pos - 1, List),
            Before ++ [Elem | After]
    end.

%% @doc Record that Hash is available at Node in the missingentries
%% table, and return the list of nodes having the entry. Node is
%% inserted at a random position (never first) unless it is already
%% listed, which keeps the node list free of duplicates.
-spec missingentries_update(binary(), atom()) -> [atom()].
missingentries_update(Hash, Node) ->
    case ets:match(?MISSINGENTRIES_TABLE, {Hash, '$1'}) of
        [] ->
            missingentries_add(Hash, [Node]);
        [[Nodes]] ->
            case lists:member(Node, Nodes) of
                true ->
                    Nodes;
                false ->
                    NewNodes = insert_at_random_pos(2, Node, Nodes),
                    true = ets:update_element(?MISSINGENTRIES_TABLE, Hash,
                                              {2, NewNodes}),
                    NewNodes
            end
    end.

%% @doc Add hash (Hash) to the missingentries table, with Nodes being
%% the value.
%% Adding entries to, and testing for entries in, the table goes
%% through wrappers because we're (soon to be) storing differently
%% sized tuples, in order to save one word (8 octets on a 64-bit
%% system) per entry (saves 800MB of RAM in a log with 100M entries).
-spec missingentries_add(binary(), [atom()]) -> [atom()].
missingentries_add(Hash, Nodes) when is_binary(Hash) ->
    Row = {Hash, Nodes},
    ets:insert(?MISSINGENTRIES_TABLE, Row),
    Nodes.

%% Recreate the missingentries table with the given access mode and
%% fill it with every entry of the logorder index, all marked fetched.
-spec init_missingentries(ets:access()) -> ok | {error, term()}.
init_missingentries(Access) ->
    drop_missingentries(),
    ets:new(?MISSINGENTRIES_TABLE, [set, named_table, Access]),
    add_entries(0, index:indexsize(logorder)).

%% Delete the missingentries table if it exists.
drop_missingentries() ->
    case ets:info(?MISSINGENTRIES_TABLE) of
        undefined ->
            ok;
        _Info ->
            ets:delete(?MISSINGENTRIES_TABLE),
            ok
    end.

%% Add Count entries of the logorder index, beginning at index Start,
%% with an empty node list. Reads the index in chunks of at most
%% 100000 entries.
add_entries(_Start, 0) ->
    ok;
add_entries(Start, Count) ->
    Chunk = min(Count, 100000),
    Hashes = index:getrange(logorder, Start, Start + Chunk - 1),
    %% [] means entry has been fetched.
    [missingentries_add(Hash, []) || Hash <- Hashes],
    add_entries(Start + Chunk, Count - Chunk).

%% Spawn one linked process per storage node, each running
%% merge_fetch_fetch:start_link(StorageNode).
%% NOTE(review): the spawned process exits once start_link/1 returns, so
%% whatever start_link/1 links to is linked to that short-lived process
%% and not to this server -- confirm this is intended.
init_fetchers(StorageNodes) ->
    [spawn_link(merge_fetch_fetch, start_link, [StorageNode])
     || StorageNode <- StorageNodes],
    ok.

%%%%%%%%%%%%%%%%%%%%
%% eunit tests.

%% The first entries of test/testdata/merge/logorder, as binaries.
-define(LOGORDER_1, hex:hexstr_to_bin("5806F2A019465B1BCA6694DDFF05F99307D3A25CB842123110221802B1CD1813")).
-define(LOGORDER_2, hex:hexstr_to_bin("E87519AE80C0E1385D114FAF7E5BC9C4811FD0E0E155F28C026E25F0F14C7715")).
-define(LOGORDER_3, hex:hexstr_to_bin("224B10E05E3A1E38D34A0122E4B844E838A9D671288345FC77D245246A8D822F")).
-define(LOGORDER_4, hex:hexstr_to_bin("06529FA6ADA25188C527D610BE2FD6C16ED57796ACAA4CCA9B90D1FF2BE8ACD7")).

%% Fixture: load the test logorder index, build a public missingentries
%% table and make #2-#4 available at some nodes. Returns the ToFetch
%% list handed to each test instantiator.
test_setup() ->
    index:init_module(),
    index:start_link(logorder, "test/testdata/merge/logorder"),
    %% Public, since the tests don't run in the process doing the setup.
    ok = init_missingentries(public),
    %% #1 stays fetched; #2 is at node1; #3 at node1, node2 and node3;
    %% #4 at node1 and node3.
    missingentries_update(?LOGORDER_2, node1),
    lists:foreach(fun(N) -> missingentries_update(?LOGORDER_3, N) end,
                  [node1, node2, node3]),
    lists:foreach(fun(N) -> missingentries_update(?LOGORDER_4, N) end,
                  [node1, node3]),
    [{?LOGORDER_4, undefined},                  % Not being fetched.
     {?LOGORDER_2, undefined},                  % Not being fetched.
     {?LOGORDER_3, node1}].                     % Being fetched by node1.

test_cleanup(_ToFetch) ->
    true = ets:delete(?MISSINGENTRIES_TABLE),
    index:stop(logorder).

init_test_() ->
    {setup, fun test_setup/0, fun test_cleanup/1,
     fun(_ToFetch) ->
             EmptyHash = crypto:hash(sha256, <<>>),
             NodesOf3 =
                 fun() ->
                         element(2, hd(ets:match_object(?MISSINGENTRIES_TABLE,
                                                        {?LOGORDER_3, '$1'})))
                 end,
             [?_assertEqual(1000, ets:info(?MISSINGENTRIES_TABLE, size)),
              ?_assertEqual(1, length(not_fetched([EmptyHash]))),
              ?_assertEqual(0, length(not_fetched([?LOGORDER_1]))),
              ?_assertEqual(1, length(not_fetched([?LOGORDER_2]))),
              ?_assertEqual(2, length(not_fetched([EmptyHash, ?LOGORDER_1,
                                                   ?LOGORDER_2])))]
                 ++ [?_assert(lists:member(N, NodesOf3()))
                     || N <- [node1, node2, node3]]
     end}.

etf_test_() ->
    {setup, fun test_setup/0, fun test_cleanup/1,
     fun(ToFetch) ->
             Fetch = fun(Node, Max) -> element(2, etf(ToFetch, Node, Max)) end,
             [%% node1 can fetch #2 and #4, in ToFetch order (#4 first).
              ?_assertEqual([?LOGORDER_4, ?LOGORDER_2], Fetch(node1, 10)),
              %% A limit of 0 yields nothing.
              ?_assertEqual([], Fetch(node1, 0)),
              %% node2 only has #3, which node1 is already fetching.
              ?_assertEqual([], Fetch(node2, 10)),
              %% node3 can fetch #4 -- only because ToFetch is not threaded
              %% between these assertions, so #4 still looks unassigned.
              ?_assertEqual([?LOGORDER_4], Fetch(node3, 10))]
     end}.

fetchstatus_success_test_() ->
    {setup, fun test_setup/0, fun test_cleanup/1,
     fun(ToFetch) ->
             NodesRow = fun() ->
                                ets:match(?MISSINGENTRIES_TABLE,
                                          {?LOGORDER_3, '$1'})
                        end,
             [[Nodes]] = NodesRow(),
             [?_assert(ets:member(?MISSINGENTRIES_TABLE, ?LOGORDER_3)),
              ?_assertEqual(3, length(Nodes)),
              ?_assertEqual([{?LOGORDER_4, undefined},
                             {?LOGORDER_2, undefined}],
                            fetch_success(ToFetch, ?LOGORDER_3)),
              ?_assertEqual([[[]]], NodesRow())]
     end}.

fetchstatus_failure_test_() ->
    {setup, fun test_setup/0, fun test_cleanup/1,
     fun(ToFetch) ->
             NodesRow = fun() ->
                                ets:match(?MISSINGENTRIES_TABLE,
                                          {?LOGORDER_3, '$1'})
                        end,
             [[Nodes]] = NodesRow(),
             Rotated = tl(Nodes) ++ [hd(Nodes)],
             [?_assertEqual([{?LOGORDER_4, undefined},
                             {?LOGORDER_2, undefined},
                             {?LOGORDER_3, undefined}],
                            fetch_failure(ToFetch, ?LOGORDER_3)),
              ?_assertEqual([[Rotated]], NodesRow())]
     end}.