summaryrefslogtreecommitdiff
path: root/merge/src/merge_dist.erl
blob: 25e13ec2f593fee7d7a8c7485c01f97eab8d5de5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
%%% Copyright (c) 2017, NORDUnet A/S.
%%% See LICENSE for licensing information.

-module(merge_dist).
-behaviour(gen_server).

-export([start_link/1]).
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
         code_change/3]).

-record(state, {
          timer :: reference(),
          node_address :: string(),
          sth_timestamp :: non_neg_integer()
         }).

start_link(Args) ->
    gen_server:start_link(?MODULE, Args, []).

init(Node) ->
    lager:info("~p:~p: starting", [?MODULE, Node]),
    Timer = erlang:start_timer(1000, self(), dist),
    {ok, #state{timer = Timer,
                node_address = Node,
                sth_timestamp = 0}}.

handle_call(stop, _From, State) ->
    {stop, normal, stopped, State}.
handle_cast(_Request, State) ->
    {noreply, State}.

handle_info({timeout, _Timer, dist}, State) ->
    dist(plop:sth(), State).

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

terminate(Reason, #state{timer = Timer}) ->
    lager:info("~p terminating: ~p", [?MODULE, Reason]),
    erlang:cancel_timer(Timer),
    ok.

%%%%%%%%%%%%%%%%%%%%

dist(noentry, State) ->
    Timer = erlang:start_timer(1000, self(), dist),
    {noreply, State#state{timer = Timer}};
dist({struct, PropList} = STH,
     #state{node_address = NodeAddress, sth_timestamp = LastTimestamp} = State) ->
    Treesize = proplists:get_value(<<"tree_size">>, PropList),
    Timestamp = proplists:get_value(<<"timestamp">>, PropList),
    RootHash = base64:decode(proplists:get_value(<<"sha256_root_hash">>, PropList)),
    Signature = base64:decode(proplists:get_value(<<"tree_head_signature">>, PropList)),
    Logordersize = index:indexsize(logorder),
    TS = case Timestamp > LastTimestamp of
             true ->
                 true = plop:verify_sth(Treesize, Timestamp, RootHash, Signature),
                 try
                     lager:info("~p: starting dist, sth at ~B, logorder at ~B",
                                [NodeAddress, Treesize, Logordersize]),
                     ok = do_dist(NodeAddress, min(Treesize, Logordersize)),
                     ok = publish_sth(NodeAddress, STH),
                     lager:info("~p: Published STH with size ~B and timestamp " ++
                                    "~p.", [NodeAddress, Treesize, Timestamp]),
                     Timestamp
                 catch
                     throw:{request_error, SubErrType, DebugTag, Error} ->
                         lager:error("~p: ~p: ~p", [DebugTag, SubErrType, Error]),
                         LastTimestamp
                 end;
             false ->
                 lager:debug("~p: STH timestamp ~p <= ~p, waiting.",
                             [NodeAddress, Timestamp, LastTimestamp]),
                 LastTimestamp
         end,
    Wait = max(1, round(application:get_env(plop, merge_delay, 600) / 60)),
    {noreply,
     State#state{timer = erlang:start_timer(Wait * 1000, self(), dist),
                 sth_timestamp = TS}}.

%% @doc Has nonlocal return because of throw further down in
%% merge_util:request/4.
do_dist(NodeAddress, Size) ->
    {ok, VerifiedSize} = frontend_get_verifiedsize(NodeAddress),
    lager:debug("~p: verifiedsize ~B", [NodeAddress, VerifiedSize]),
    true = VerifiedSize =< Size,
    do_dist(NodeAddress, VerifiedSize, Size - VerifiedSize).

do_dist(_, _, 0) ->
    ok;
do_dist(NodeAddress, Start, NTotal) ->
    DistMaxWindow = application:get_env(plop, merge_dist_winsize, 1000),
    N = min(DistMaxWindow, NTotal),
    Hashes = index:getrange(logorder, Start, Start + N - 1),
    SendlogChunksize = application:get_env(plop, merge_dist_sendlog_chunksize, 1000),
    SendentriesChunksize = application:get_env(plop, merge_dist_sendentries_chunksize, 100),
    ok = merge_util:sendlog(NodeAddress, Start, Hashes, SendlogChunksize),
    ok = merge_util:sendentries(NodeAddress, Hashes, SendentriesChunksize),
    {ok, NewSize} = frontend_verify_entries(NodeAddress, Start + N),
    lager:info("~p: Done distributing ~B entries.", [NodeAddress, NewSize-Start]),
    true = NTotal >= NewSize - Start,
    do_dist(NodeAddress, NewSize, NTotal - (NewSize - Start)).

frontend_get_verifiedsize(NodeAddress) ->
    frontend_verify_entries(NodeAddress, 0).

frontend_verify_entries(NodeAddress, Size) ->
    DebugTag = io_lib:format("verify-entries ~B", [Size]),
    URL = NodeAddress ++ "verify-entries",
    Headers = [{"Content-Type", "text/json"}],
    RequestBody = list_to_binary(mochijson2:encode({[{"verify_to", Size}]})),
    case merge_util:request(DebugTag, URL, Headers, RequestBody) of
        {<<"ok">>, PropList} ->
            {ok, proplists:get_value(<<"verified">>, PropList)};
        Err ->
            throw({request_error, result, DebugTag, Err})
    end.

publish_sth(NodeAddress, STH) ->
    DebugTag = "publish-sth",
    URL = NodeAddress ++ "publish-sth",
    Headers = [{"Content-Type", "text/json"}],
    RequestBody = list_to_binary(mochijson2:encode(STH)),
    case merge_util:request(DebugTag, URL, Headers, RequestBody) of
        {<<"ok">>, _} ->
            ok;
        Err ->
            throw({request_error, result, DebugTag, Err})
    end.