summaryrefslogtreecommitdiff
path: root/merge/src/merge_backup.erl
blob: bf20f238fe44c6870a303f60f3c35791391fe44b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
%%% Copyright (c) 2017, NORDUnet A/S.
%%% See LICENSE for licensing information.

-module(merge_backup).
-behaviour(gen_server).

-export([start_link/1]).
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
         code_change/3]).

%% Per-process state of a merge_backup worker.
-record(state, {
          %% Reference of the pending `backup' timer; set by init/1, replaced
          %% by backup/2 after every round and cancelled in terminate/2.
          timer :: reference(),
          %% Node name; used in log messages and as the suffix of the
          %% backup file path written by write_backupfile/3.
          node_name :: string(),
          %% Address of the node; the request URLs are built by appending
          %% an endpoint name such as "verifiedsize" directly to it.
          node_address :: string()
         }).

%% @doc Start a merge_backup worker linked to the calling process.
%% `Args' is passed unchanged to init/1, which expects `[Name, Address]'.
%% No gen_server options are set and the process is not registered.
start_link(Args) ->
    ServerOptions = [],
    gen_server:start_link(?MODULE, Args, ServerOptions).

%% @doc gen_server callback: log the start and arm the first `backup'
%% timer so that the first backup round fires after one second.
init([Name, Address]) ->
    lager:info("~p:~p: starting (~p)", [?MODULE, Name, Address]),
    InitialState = #state{node_name = Name,
                          node_address = Address,
                          timer = erlang:start_timer(1000, self(), backup)},
    {ok, InitialState}.

%% @doc gen_server callback. The only supported call is `stop': the caller
%% gets the reply `stopped' and the server terminates with reason `normal'.
handle_call(stop, _Caller, ServerState) ->
    Reply = stopped,
    {stop, normal, Reply, ServerState}.
%% @doc gen_server callback. Casts carry no meaning for this server: every
%% cast is dropped and the state is returned untouched.
handle_cast(_Ignored, ServerState) ->
    {noreply, ServerState}.

%% @doc gen_server callback for raw messages.
%%
%% `{timeout, TimerRef, backup}' is the message delivered by the timers
%% armed in init/1 and backup/2; it triggers one backup round against the
%% number of entries currently reported by merge_util:nfetched/0.
%%
%% Any other message is logged and dropped. Without this second clause a
%% single stray message would crash the server with `function_clause'.
handle_info({timeout, _Timer, backup}, State) ->
    backup(merge_util:nfetched(), State);
handle_info(Unexpected, State) ->
    lager:warning("~p: ignoring unexpected message: ~p", [?MODULE, Unexpected]),
    {noreply, State}.

%% @doc gen_server callback for hot code upgrades. The state layout needs
%% no conversion, so the current state is handed back as is.
code_change(_FromVsn, CurrentState, _UpgradeExtra) ->
    {ok, CurrentState}.

%% @doc gen_server callback: log why the server is going down and cancel
%% the pending `backup' timer held in the state.
terminate(Reason, #state{timer = PendingTimer}) ->
    lager:info("~p terminating: ~p", [?MODULE, Reason]),
    erlang:cancel_timer(PendingTimer),
    ok.

%%%%%%%%%%%%%%%%%%%%

%% Run one backup round for the node in State and re-arm the timer.
%%
%% Size is the local logorder size. The node is asked for its verified
%% size; when it equals Size only the root is re-checked and the backup
%% file rewritten, otherwise do_backup/4 sends the Size - VerifiedSize
%% remaining entries.
%%
%% Only thrown `request_error' tuples are caught (and logged); a failed
%% `ok =' / `true =' match still crashes the process. The next round is
%% scheduled after a tenth of plop's `merge_delay' (default 600), at least
%% one second. Returns {noreply, NewState} for use from handle_info/2.
backup(Size, #state{node_name = NodeName, node_address = NodeAddress} = State) ->
    lager:debug("~p: logorder size ~B", [NodeName, Size]),
    ht:load_tree(Size - 1), % TODO: Make sure this is OK to do from multiple processes and that it's not "moving backwards".
    try
        {ok, Verified} = verified_size(NodeAddress),
        lager:debug("~p: verifiedsize ~B", [NodeName, Verified]),
        if
            Verified == Size ->
                %% Nothing new to send; confirm the root and refresh the file.
                Root = ht:root(Size - 1),
                ok = check_root(NodeAddress, Size, Root),
                ok = write_backupfile(NodeName, Size, Root);
            true ->
                true = Verified < Size, % Secondary ahead of primary?
                ok = do_backup(NodeName, NodeAddress, Verified, Size - Verified)
        end
    catch
        throw:{request_error, Kind, Tag, Reason} ->
            lager:error("~s: ~p: ~p", [Tag, Kind, Reason])
    end,
    MergeDelay = application:get_env(plop, merge_delay, 600),
    DelaySeconds = max(1, round(MergeDelay / 10)),
    NextTimer = erlang:start_timer(DelaySeconds * 1000, self(), backup),
    {noreply, State#state{timer = NextTimer}}.

%% Send NTotal log entries, beginning at index Start, to the node at
%% NodeAddress, one window at a time.
%%
%% A window holds at most `merge_backup_winsize' entries (default 1000).
%% For each window: the hashes are read from the `logorder' index and
%% passed to merge_util:sendlog/4, the hashes returned by
%% merge_util:missingentries/1 are base64-decoded and passed to
%% merge_util:sendentries/3, the root for the new size is checked with
%% check_root/3, the verified size is advanced on the node and the local
%% backup file is rewritten. Returns ok once nothing is left to send.
do_backup(_NodeName, _NodeAddress, _Start, 0) ->
    ok;
do_backup(NodeName, NodeAddress, Start, NTotal) ->
    WindowLimit = plopconfig:get_env(merge_backup_winsize, 1000),
    N = min(NTotal, WindowLimit),
    LastIndex = Start + N - 1,
    Hashes = index:getrange(logorder, Start, LastIndex),
    LogChunk = plopconfig:get_env(merge_backup_sendlog_chunksize, 1000),
    ok = merge_util:sendlog(NodeAddress, Start, Hashes, LogChunk),
    {ok, EncodedMissing} = merge_util:missingentries(NodeAddress),
    Missing = lists:map(fun base64:decode/1, EncodedMissing),
    EntriesChunk = plopconfig:get_env(merge_backup_sendentries_chunksize, 100),
    ok = merge_util:sendentries(NodeAddress, Missing, EntriesChunk),
    NewSize = Start + N,
    Root = ht:root(NewSize - 1),
    ok = check_root(NodeAddress, NewSize, Root),
    ok = setverifiedsize(NodeAddress, NewSize),
    ok = write_backupfile(NodeName, NewSize, Root),
    true = NTotal >= N,
    Remaining = NTotal - N,
    do_backup(NodeName, NodeAddress, NewSize, Remaining).

%% Record the verified tree size and root hash for NodeName on disk.
%%
%% The file lives at plop's `verified_path' with "." and the node name
%% appended, and holds a JSON object with the keys "tree_size" and
%% "sha256_root_hash" (the root as a hex string). The file is replaced
%% through atomic:replacefile/2, whose result is returned.
write_backupfile(NodeName, TreeSize, TreeHead) ->
    {ok, BasePath} = application:get_env(plop, verified_path),
    Suffix = "." ++ NodeName,
    Path = BasePath ++ Suffix,
    HexRoot = list_to_binary(hex:bin_to_hexstr(TreeHead)),
    Fields = [{"tree_size", TreeSize},
              {"sha256_root_hash", HexRoot}],
    Content = mochijson2:encode({Fields}),
    atomic:replacefile(Path, Content).

%% Compare the root hash the node reports for a tree of Size entries with
%% the locally computed TreeHead.
%%
%% Returns ok when they are equal. On a mismatch the difference is logged
%% and the atom root_mismatch is returned; callers in this module match
%% the result against ok, so a mismatch surfaces there as a badmatch.
check_root(NodeAddress, Size, TreeHead) ->
    {ok, RemoteRoot} = verifyroot(NodeAddress, Size),
    if
        RemoteRoot == TreeHead ->
            ok;
        true ->
            lager:error("~p: ~B: secondary merge root ~p != ~p",
                        [NodeAddress, Size, RemoteRoot, TreeHead]),
            root_mismatch
    end.

%% Ask the node at NodeAddress for its root hash at TreeSize.
%%
%% Sends a JSON body {"tree_size": TreeSize} to NodeAddress ++ "verifyroot"
%% and returns {ok, Root}, where Root is the base64-decoded "root_hash"
%% field of an "ok" reply. Any other reply is thrown as
%% {request_error, result, DebugTag, Reply}.
verifyroot(NodeAddress, TreeSize) ->
    DebugTag = io_lib:format("verifyroot ~B", [TreeSize]),
    URL = NodeAddress ++ "verifyroot",
    Headers = [{"Content-Type", "text/json"}],
    Json = mochijson2:encode({[{"tree_size", TreeSize}]}),
    RequestBody = list_to_binary(Json),
    Reply = merge_util:request(DebugTag, URL, Headers, RequestBody),
    case Reply of
        {<<"ok">>, Fields} ->
            EncodedRoot = proplists:get_value(<<"root_hash">>, Fields),
            {ok, base64:decode(EncodedRoot)};
        Failure ->
            throw({request_error, result, DebugTag, Failure})
    end.

%% Fetch the verified size currently stored on the node at NodeAddress.
%%
%% Requests NodeAddress ++ "verifiedsize" and returns {ok, Size}, where
%% Size is the "size" field of an "ok" reply (undefined if the field is
%% absent). Any other reply is thrown as
%% {request_error, result, "verifiedsize", Reply}.
verified_size(NodeAddress) ->
    DebugTag = "verifiedsize",
    URL = NodeAddress ++ "verifiedsize",
    Reply = merge_util:request(DebugTag, URL),
    case Reply of
        {<<"ok">>, Fields} ->
            Size = proplists:get_value(<<"size">>, Fields),
            {ok, Size};
        Failure ->
            throw({request_error, result, DebugTag, Failure})
    end.

%% Tell the node at NodeAddress that the tree is verified up to Size.
%%
%% Sends a JSON body {"size": Size} to NodeAddress ++ "setverifiedsize"
%% and returns ok on an "ok" reply. Any other reply is thrown as
%% {request_error, result, DebugTag, Reply}.
setverifiedsize(NodeAddress, Size) ->
    DebugTag = io_lib:format("setverifiedsize ~B", [Size]),
    URL = NodeAddress ++ "setverifiedsize",
    Headers = [{"Content-Type", "text/json"}],
    Json = mochijson2:encode({[{"size", Size}]}),
    RequestBody = list_to_binary(Json),
    Reply = merge_util:request(DebugTag, URL, Headers, RequestBody),
    case Reply of
        {<<"ok">>, _Fields} ->
            ok;
        Failure ->
            throw({request_error, result, DebugTag, Failure})
    end.