%%% Copyright (c) 2017, NORDUnet A/S.
%%% See LICENSE for licensing information.

%%% gen_server that, driven by a timer, pushes log entries and the current
%%% STH (as returned by plop:sth/0) to the frontend node whose base URL is
%%% given as the start argument.

-module(merge_dist).
-behaviour(gen_server).

-export([start_link/1]).
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
         code_change/3]).

%% timer         - reference of the currently armed `dist' timer.
%% node_address  - base URL of the frontend; endpoint names are appended to it.
%% sth_timestamp - timestamp of the last STH that was published successfully
%%                 (0 until the first successful publication).
-record(state, {
          timer :: reference(),
          node_address :: string(),
          sth_timestamp :: non_neg_integer()
         }).

%% Starts the server. Args is passed unchanged to init/1 and is used as the
%% frontend node address.
start_link(Args) ->
    gen_server:start_link(?MODULE, Args, []).

%% Arms the first `dist' timer (one second) and starts with timestamp 0.
init(Node) ->
    lager:info("~p:~p: starting", [?MODULE, Node]),
    Timer = erlang:start_timer(1000, self(), dist),
    {ok, #state{timer = Timer, node_address = Node, sth_timestamp = 0}}.

%% Only `stop' is supported; it terminates the server normally.
handle_call(stop, _From, State) ->
    {stop, normal, stopped, State}.

%% Casts are ignored.
handle_cast(_Request, State) ->
    {noreply, State}.

%% A `dist' timer expiry triggers one distribution round. Anything else is
%% logged and dropped so that a stray message cannot take the server down.
handle_info({timeout, _Timer, dist}, State) ->
    dist(plop:sth(), State);
handle_info(Info, State) ->
    lager:warning("~p: ignoring unexpected message: ~p", [?MODULE, Info]),
    {noreply, State}.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Cancels the pending timer on shutdown.
terminate(Reason, #state{timer = Timer}) ->
    lager:info("~p terminating: ~p", [?MODULE, Reason]),
    erlang:cancel_timer(Timer),
    ok.

%%%%%%%%%%%%%%%%%%%%

%% One distribution round.
%%
%% With no STH available (`noentry') the round is retried after one second.
%% Otherwise, if the STH timestamp is newer than the last published one, the
%% STH is verified, entries are distributed and the STH is published. A
%% thrown request_error is logged and leaves the stored timestamp unchanged,
%% so the same STH is retried on the next round. The next round is scheduled
%% after max(1, round(merge_delay / 60)) seconds, where merge_delay is read
%% from the plop application environment (default 600).
dist(noentry, State) ->
    Timer = erlang:start_timer(1000, self(), dist),
    {noreply, State#state{timer = Timer}};
dist({struct, PropList} = STH,
     #state{node_address = NodeAddress,
            sth_timestamp = LastTimestamp} = State) ->
    Treesize = proplists:get_value(<<"tree_size">>, PropList),
    Timestamp = proplists:get_value(<<"timestamp">>, PropList),
    RootHash =
        base64:decode(proplists:get_value(<<"sha256_root_hash">>, PropList)),
    Signature =
        base64:decode(proplists:get_value(<<"tree_head_signature">>, PropList)),
    TS = case Timestamp > LastTimestamp of
             true ->
                 %% A failed verification is not a request_error: it crashes
                 %% the server with badmatch.
                 true = plop:verify_sth(Treesize, Timestamp, RootHash,
                                        Signature),
                 try
                     ok = do_dist(NodeAddress,
                                  min(Treesize, index:indexsize(logorder))),
                     ok = publish_sth(NodeAddress, STH),
                     lager:info("~p: Published STH with size ~B and "
                                "timestamp ~p.",
                                [NodeAddress, Treesize, Timestamp]),
                     Timestamp
                 catch
                     throw:{request_error, SubErrType, DebugTag, Error} ->
                         %% DebugTag is character data (possibly a deep
                         %% list), so it is printed with ~s, not ~p.
                         lager:error("~s: ~p: ~p",
                                     [DebugTag, SubErrType, Error]),
                         LastTimestamp
                 end;
             false ->
                 lager:debug("~p: STH timestamp ~p <= ~p, waiting.",
                             [NodeAddress, Timestamp, LastTimestamp]),
                 LastTimestamp
         end,
    Wait = max(1, round(application:get_env(plop, merge_delay, 600) / 60)),
    {noreply,
     State#state{timer = erlang:start_timer(Wait * 1000, self(), dist),
                 sth_timestamp = TS}}.

%% @doc Has nonlocal return because of throw further down in do_request/4.
%% Asks the frontend for its verified size and distributes the entries from
%% there up to Size. Crashes with badmatch if the frontend reports a verified
%% size larger than Size.
do_dist(NodeAddress, Size) ->
    {ok, VerifiedSize} = frontend_get_verifiedsize(NodeAddress),
    true = Size >= VerifiedSize,
    do_dist(NodeAddress, VerifiedSize, Size - VerifiedSize).

%% Distributes NTotal entries starting at index Size, in windows of at most
%% merge_dist_winsize entries (plop application environment, default 1000).
%% After each window the frontend reports its new verified size, and the
%% next window starts there.
%% NOTE(review): if the frontend reports NewSize == Size this recurses with
%% unchanged arguments -- confirm that retrying without a delay is intended.
do_dist(_, _, 0) ->
    ok;
do_dist(NodeAddress, Size, NTotal) ->
    DistMaxWindow = application:get_env(plop, merge_dist_winsize, 1000),
    N = min(DistMaxWindow, NTotal),
    Hashes = index:getrange(logorder, Size, Size + N - 1),
    ok = frontend_sendlog(NodeAddress, Size, Hashes),
    ok = frontend_send_missing_entries(NodeAddress, Hashes),
    {ok, NewSize} = frontend_verify_entries(NodeAddress, Size + N),
    lager:info("~p: Done distributing ~B entries.",
               [NodeAddress, NewSize - Size]),
    %% The frontend must not have verified more than it was asked to.
    true = NTotal >= NewSize - Size,
    do_dist(NodeAddress, NewSize, NTotal - (NewSize - Size)).

%% Sends Hashes, which start at log index Start, to the frontend in chunks
%% of at most merge_dist_sendlog_chunksize hashes (default 1000).
frontend_sendlog(NodeAddress, Start, Hashes) ->
    SendlogChunksize =
        application:get_env(plop, merge_dist_sendlog_chunksize, 1000),
    frontend_sendlog_chunk(NodeAddress, Start,
                           lists:split(min(SendlogChunksize, length(Hashes)),
                                       Hashes),
                           SendlogChunksize).

%% Sends the current chunk, then splits the next one off the remainder.
%% Stops when the current chunk is empty.
frontend_sendlog_chunk(_, _, {[], _}, _) ->
    ok;
frontend_sendlog_chunk(NodeAddress, Start, {Chunk, Rest}, Chunksize) ->
    ok = frontend_sendlog_request(NodeAddress, Start, Chunk),
    frontend_sendlog_chunk(NodeAddress, Start + length(Chunk),
                           lists:split(min(Chunksize, length(Rest)), Rest),
                           Chunksize).
%% Posts one chunk of leaf hashes, starting at log index Start, to the
%% frontend's "sendlog" endpoint. Returns ok when the response carries
%% result "ok"; otherwise throws {request_error, result, DebugTag, Response}.
frontend_sendlog_request(NodeAddress, Start, Hashes) ->
    %% Flattened so the tag stays a readable string when it ends up inside
    %% the thrown error term.
    DebugTag = lists:flatten(io_lib:format("sendlog ~B:~B",
                                           [Start, length(Hashes)])),
    URL = NodeAddress ++ "sendlog",
    Headers = [{"Content-Type", "text/json"}],
    EncodedHashes = [base64:encode(H) || H <- Hashes],
    RequestBody = list_to_binary(
                    mochijson2:encode({[{"start", Start},
                                        {"hashes", EncodedHashes}]})),
    case do_request(DebugTag, URL, Headers, RequestBody) of
        {<<"ok">>, _} ->
            ok;
        Err ->
            throw({request_error, result, DebugTag, Err})
    end.

%% Sends the entries for Hashes to the frontend in chunks of at most
%% merge_dist_sendentries_chunksize hashes (plop application environment,
%% default 100).
frontend_send_missing_entries(NodeAddress, Hashes) ->
    SendentriesChunksize =
        application:get_env(plop, merge_dist_sendentries_chunksize, 100),
    FirstSplit = lists:split(min(SendentriesChunksize, length(Hashes)),
                             Hashes),
    frontend_send_entries_chunk(NodeAddress, FirstSplit,
                                SendentriesChunksize).

%% Looks up the entry for every hash in the current chunk, sends the pairs,
%% then splits the next chunk off the remainder. Stops when the current
%% chunk is empty.
frontend_send_entries_chunk(_, {[], _}, _) ->
    ok;
frontend_send_entries_chunk(NodeAddress, {Chunk, Rest}, Chunksize) ->
    HashesAndEntries = [{H, db:entry_for_leafhash(H)} || H <- Chunk],
    ok = frontend_send_entries_request(NodeAddress, HashesAndEntries),
    frontend_send_entries_chunk(NodeAddress,
                                lists:split(min(Chunksize, length(Rest)),
                                            Rest),
                                Chunksize).

%% Posts {Hash, Entry} pairs to the frontend's "sendentry" endpoint, each
%% pair encoded with base64 "entry" and "treeleafhash" fields. Returns ok
%% when the response carries result "ok"; otherwise throws
%% {request_error, result, DebugTag, Response}.
frontend_send_entries_request(NodeAddress, HashesAndEntries) ->
    DebugTag = lists:flatten(io_lib:format("sendentry ~B",
                                           [length(HashesAndEntries)])),
    URL = NodeAddress ++ "sendentry",
    Headers = [{"Content-Type", "text/json"}],
    L = mochijson2:encode([[{"entry", base64:encode(E)},
                            {"treeleafhash", base64:encode(H)}]
                           || {H, E} <- HashesAndEntries]),
    RequestBody = list_to_binary(L),
    case do_request(DebugTag, URL, Headers, RequestBody) of
        {<<"ok">>, _} ->
            ok;
        Err ->
            throw({request_error, result, DebugTag, Err})
    end.

%% Queries the frontend's verified size by issuing a verify-entries request
%% with verify_to set to 0.
frontend_get_verifiedsize(NodeAddress) ->
    frontend_verify_entries(NodeAddress, 0).
%% Asks the frontend to verify entries up to Size via its "verify-entries"
%% endpoint. Returns {ok, Verified}, where Verified is the non-negative
%% integer found under "verified" in the response. Throws
%% {request_error, result, DebugTag, _} if the result is not "ok" or if the
%% "verified" field is missing or is not a non-negative integer.
frontend_verify_entries(NodeAddress, Size) ->
    %% Flattened so the tag stays a readable string when it ends up inside
    %% the thrown error term.
    DebugTag = lists:flatten(io_lib:format("verify-entries ~B", [Size])),
    URL = NodeAddress ++ "verify-entries",
    Headers = [{"Content-Type", "text/json"}],
    RequestBody = list_to_binary(mochijson2:encode({[{"verify_to", Size}]})),
    case do_request(DebugTag, URL, Headers, RequestBody) of
        {<<"ok">>, PropList} ->
            case proplists:get_value(<<"verified">>, PropList) of
                Verified when is_integer(Verified), Verified >= 0 ->
                    {ok, Verified};
                Other ->
                    %% Callers do arithmetic on this value, so reject
                    %% anything unusable here as a request error.
                    throw({request_error, result, DebugTag,
                           {bad_verified, Other}})
            end;
        Err ->
            throw({request_error, result, DebugTag, Err})
    end.

%% Posts the STH to the frontend's "publish-sth" endpoint. Returns ok when
%% the response carries result "ok"; otherwise throws
%% {request_error, result, DebugTag, Response}.
publish_sth(NodeAddress, STH) ->
    DebugTag = "publish-sth",
    URL = NodeAddress ++ "publish-sth",
    Headers = [{"Content-Type", "text/json"}],
    RequestBody = list_to_binary(mochijson2:encode(STH)),
    case do_request(DebugTag, URL, Headers, RequestBody) of
        {<<"ok">>, _} ->
            ok;
        Err ->
            throw({request_error, result, DebugTag, Err})
    end.

%% Performs the HTTP request through plop_httputil:request/4 and decodes the
%% JSON response body.
%%
%% Returns {Result, PropList}, where PropList holds the members of the
%% response object and Result is the value of its "result" member
%% (undefined if absent).
%%
%% Throws {request_error, Kind, DebugTag, Detail} with Kind being
%%   request - the HTTP layer returned {error, Detail};
%%   failure - a failure response, or a success response whose status is
%%             not 200 (Detail is the status code);
%%   decode  - the body is not valid JSON or is not a JSON object.
do_request(DebugTag, URL, Headers, RequestBody) ->
    case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
        {error, Err} ->
            throw({request_error, request, DebugTag, Err});
        {failure, {none, StatusCode, none}, _RespHeaders, _Body} ->
            throw({request_error, failure, DebugTag, StatusCode});
        {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->
            decode_response(DebugTag, Body);
        {success, {_, StatusCode, _}, _, _Body} ->
            throw({request_error, failure, DebugTag, StatusCode})
    end.

%% Decodes a JSON response body. Only the decode call itself is protected,
%% so the request_error throws raised in the `of' clauses are not caught
%% again by this function.
decode_response(DebugTag, Body) ->
    try mochijson2:decode(Body) of
        {struct, PropList} ->
            {proplists:get_value(<<"result">>, PropList), PropList};
        {error, Err} ->
            throw({request_error, decode, DebugTag, Err});
        Other ->
            throw({request_error, decode, DebugTag, {not_an_object, Other}})
    catch
        throw:{error, Err} ->
            throw({request_error, decode, DebugTag, Err});
        Class:Reason ->
            throw({request_error, decode, DebugTag, {Class, Reason}})
    end.