%%% Copyright (c) 2017, NORDUnet A/S.
%%% See LICENSE for licensing information.

%%% gen_server that periodically fetches the current STH from plop and,
%%% when its timestamp is newer than the last one handled, distributes
%%% log entries to the node at `node_address' and then publishes the STH
%%% there.

-module(merge_dist).
-behaviour(gen_server).

-export([start_link/1]).
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
         code_change/3]).

-record(state, {
          %% Reference of the currently pending `dist' timer.
          timer :: reference(),
          %% Base URL of the node; request paths are appended to it.
          node_address :: string(),
          %% Timestamp of the last STH that was successfully published
          %% (0 until the first successful round).
          sth_timestamp :: non_neg_integer()
         }).

%% @doc Start a distribution server for the node address given in Args.
start_link(Args) ->
    gen_server:start_link(?MODULE, Args, []).

%% @doc Log the start and schedule the first distribution round in one
%% second.
init(Node) ->
    lager:info("~p:~p: starting", [?MODULE, Node]),
    Timer = erlang:start_timer(1000, self(), dist),
    {ok, #state{timer = Timer,
                node_address = Node,
                sth_timestamp = 0}}.

%% Only `stop' is a supported call; it shuts the server down normally.
handle_call(stop, _From, State) ->
    {stop, normal, stopped, State}.

%% Casts are not part of the protocol and are silently ignored.
handle_cast(_Request, State) ->
    {noreply, State}.

%% The `dist' timer fired: run one distribution round against the
%% current STH.
handle_info({timeout, _Timer, dist}, State) ->
    dist(plop:sth(), State);
%% Anything else is not part of the protocol. Log and drop it instead of
%% crashing with function_clause, which would restart the server and
%% reset sth_timestamp to 0.
handle_info(Info, State) ->
    lager:warning("~p: ignoring unexpected message: ~p", [?MODULE, Info]),
    {noreply, State}.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Log the termination reason and cancel the pending timer.
terminate(Reason, #state{timer = Timer}) ->
    lager:info("~p terminating: ~p", [?MODULE, Reason]),
    erlang:cancel_timer(Timer),
    ok.

%%%%%%%%%%%%%%%%%%%%

%% @doc Run one distribution round and schedule the next one.
%%
%% With `noentry' (no STH available) the round is retried after one
%% second. With an STH, distribution and publishing only happen when the
%% STH timestamp is greater than the last handled one. A thrown
%% `request_error' is logged and leaves `sth_timestamp' unchanged, so the
%% same STH is retried on the next round. The delay before the next round
%% is the `plop' application's `merge_delay' setting (default 600)
%% divided by 60, in seconds, with a minimum of one second.
dist(noentry, State) ->
    Timer = erlang:start_timer(1000, self(), dist),
    {noreply, State#state{timer = Timer}};
dist({struct, PropList} = STH,
     #state{node_address = NodeAddress,
            sth_timestamp = LastTimestamp} = State) ->
    Treesize = proplists:get_value(<<"tree_size">>, PropList),
    Timestamp = proplists:get_value(<<"timestamp">>, PropList),
    RootHash = base64:decode(proplists:get_value(<<"sha256_root_hash">>,
                                                 PropList)),
    Signature = base64:decode(proplists:get_value(<<"tree_head_signature">>,
                                                  PropList)),
    Logordersize = index:indexsize(logorder),
    TS = case Timestamp > LastTimestamp of
             true ->
                 %% A failed verification is not caught: it crashes the
                 %% server with badmatch.
                 true = plop:verify_sth(Treesize, Timestamp, RootHash,
                                        Signature),
                 try
                     lager:info("~p: starting dist, sth at ~B, logorder at ~B",
                                [NodeAddress, Treesize, Logordersize]),
                     %% Never distribute past what logorder holds locally.
                     ok = do_dist(NodeAddress, min(Treesize, Logordersize)),
                     ok = publish_sth(NodeAddress, STH),
                     lager:info("~p: Published STH with size ~B and timestamp " ++
                                    "~p.", [NodeAddress, Treesize, Timestamp]),
                     Timestamp
                 catch
                     throw:{request_error, SubErrType, DebugTag, Error} ->
                         lager:error("~p: ~p: ~p",
                                     [DebugTag, SubErrType, Error]),
                         LastTimestamp
                 end;
             false ->
                 lager:debug("~p: STH timestamp ~p <= ~p, waiting.",
                             [NodeAddress, Timestamp, LastTimestamp]),
                 LastTimestamp
         end,
    Wait = max(1, round(application:get_env(plop, merge_delay, 600) / 60)),
    {noreply, State#state{timer = erlang:start_timer(Wait * 1000, self(), dist),
                          sth_timestamp = TS}}.

%% @doc Has nonlocal return because of throw further down in
%% merge_util:request/4.
%%
%% Distributes entries to NodeAddress starting from the size the node
%% reports as verified, up to Size. Crashes with badmatch if the node
%% reports more verified entries than Size.
do_dist(NodeAddress, Size) ->
    {ok, VerifiedSize} = frontend_get_verifiedsize(NodeAddress),
    lager:debug("~p: verifiedsize ~B", [NodeAddress, VerifiedSize]),
    true = VerifiedSize =< Size,
    do_dist(NodeAddress, VerifiedSize, Size - VerifiedSize).

%% Send NTotal entries starting at index Start, one window at a time.
%% The window is at most `merge_dist_winsize' (default 1000) entries.
%% After each window the node is asked to verify up to Start + N, and the
%% next window starts at the size it reports back.
%% NOTE(review): if the node reports NewSize =:= Start, the recursion is
%% entered again with the same arguments -- confirm that retrying without
%% a delay is intended.
do_dist(_, _, 0) ->
    ok;
do_dist(NodeAddress, Start, NTotal) ->
    DistMaxWindow = application:get_env(plop, merge_dist_winsize, 1000),
    N = min(DistMaxWindow, NTotal),
    Hashes = index:getrange(logorder, Start, Start + N - 1),
    SendlogChunksize = application:get_env(plop, merge_dist_sendlog_chunksize,
                                           1000),
    SendentriesChunksize = application:get_env(plop,
                                               merge_dist_sendentries_chunksize,
                                               100),
    ok = merge_util:sendlog(NodeAddress, Start, Hashes, SendlogChunksize),
    ok = merge_util:sendentries(NodeAddress, Hashes, SendentriesChunksize),
    {ok, NewSize} = frontend_verify_entries(NodeAddress, Start + N),
    lager:info("~p: Done distributing ~B entries.",
               [NodeAddress, NewSize - Start]),
    %% The node must not report more progress than was asked for overall.
    true = NTotal >= NewSize - Start,
    do_dist(NodeAddress, NewSize, NTotal - (NewSize - Start)).

%% Query the node's verified size by sending a verify-entries request
%% with `verify_to' set to 0.
frontend_get_verifiedsize(NodeAddress) ->
    frontend_verify_entries(NodeAddress, 0).
%% @doc POST a verify-entries request with `verify_to' set to Size to the
%% node at NodeAddress and return `{ok, Verified}', where Verified is the
%% `verified' field of the reply.
%%
%% Throws `{request_error, result, DebugTag, Reply}' when the reply is
%% not `{<<"ok">>, _}', or when it is but carries no non-negative integer
%% `verified' field.
frontend_verify_entries(NodeAddress, Size) ->
    %% Flattened so that logging the tag with ~p prints a readable string
    %% rather than the deep character list io_lib:format/2 returns.
    DebugTag = lists:flatten(io_lib:format("verify-entries ~B", [Size])),
    URL = NodeAddress ++ "verify-entries",
    Headers = [{"Content-Type", "text/json"}],
    RequestBody = iolist_to_binary(mochijson2:encode({[{"verify_to", Size}]})),
    case merge_util:request(DebugTag, URL, Headers, RequestBody) of
        {<<"ok">>, PropList} = Reply ->
            case proplists:get_value(<<"verified">>, PropList) of
                Verified when is_integer(Verified), Verified >= 0 ->
                    {ok, Verified};
                _ ->
                    %% A malformed success reply is reported as a request
                    %% error instead of handing a non-integer size to the
                    %% caller.
                    throw({request_error, result, DebugTag, Reply})
            end;
        Err ->
            throw({request_error, result, DebugTag, Err})
    end.

%% @doc POST the JSON-encoded STH to the publish-sth endpoint of the node
%% at NodeAddress. Returns `ok' on an `{<<"ok">>, _}' reply and throws
%% `{request_error, result, "publish-sth", Reply}' otherwise.
publish_sth(NodeAddress, STH) ->
    DebugTag = "publish-sth",
    URL = NodeAddress ++ "publish-sth",
    Headers = [{"Content-Type", "text/json"}],
    RequestBody = iolist_to_binary(mochijson2:encode(STH)),
    case merge_util:request(DebugTag, URL, Headers, RequestBody) of
        {<<"ok">>, _} ->
            ok;
        Err ->
            throw({request_error, result, DebugTag, Err})
    end.