summaryrefslogtreecommitdiff
path: root/merge/src/merge_util.erl
blob: c76d05f03a0b053aff9b08a138cd5fdef17d6296 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
%%% Copyright (c) 2017, NORDUnet A/S.
%%% See LICENSE for licensing information.

-module(merge_util).
-export([sendlog/5, sendentries/4, missingentries/2]).
-export([request/3, request/5]).
-export([readfile/1, nfetched/0]).

%% Convenience wrapper around request/5: perform a request with no
%% extra headers and an empty request body.
request(DebugTag, URL, NodeName) ->
    request(DebugTag, URL, NodeName, [], <<>>).

%% Perform an HTTP request against URL and decode the JSON response.
%%
%% DebugTag is iodata used in log messages and thrown error terms;
%% NodeName identifies the remote node in status reports.  On a 200
%% response with a JSON object body, returns {ResultValue, PropList}
%% where ResultValue is the value of the "result" member (or the atom
%% 'undefined' if absent).  Throws {request_error, Stage, DebugTag,
%% Reason} on transport errors, non-200 responses and undecodable
%% bodies.
request(DebugTag, URL, NodeName, Headers, RequestBody) ->
    case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
        {error, Err} ->
            statusreport:report_multi("merge_errors", NodeName, "http_error", list_to_binary(io_lib:format("~w", [Err]))),
            throw({request_error, request, DebugTag, Err});
        {failure, {none, StatusCode, none}, _RespHeaders, _Body}  ->
            statusreport:report_multi("merge_errors", NodeName, "http_error", StatusCode),
            throw({request_error, failure, DebugTag, StatusCode});
        {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->
            case (catch mochijson2:decode(Body)) of
                {error, Err} ->
                    %% Err may be an arbitrary term; format it instead of
                    %% assuming iodata (bare list_to_binary/1 would badarg).
                    statusreport:report_multi("merge_errors", NodeName, "http_error", list_to_binary(io_lib:format("~w", [Err]))),
                    throw({request_error, decode, DebugTag, Err});
                {'EXIT', Reason} ->
                    %% mochijson2:decode/1 raises on invalid JSON; the
                    %% old-style catch wraps that as {'EXIT', Reason}.
                    %% Without this clause a bad body crashed with
                    %% case_clause rather than the intended throw.
                    statusreport:report_multi("merge_errors", NodeName, "http_error", list_to_binary(io_lib:format("~w", [Reason]))),
                    throw({request_error, decode, DebugTag, Reason});
                {struct, PropList} ->
                    %% NOTE(review): reporting 200 here presumably clears any
                    %% earlier error state for this node -- confirm against
                    %% statusreport:report_multi/4.
                    statusreport:report_multi("merge_errors", NodeName, "http_error", 200),
                    {proplists:get_value(<<"result">>, PropList), PropList}
            end
    end.

%% Send Hashes to the remote node in pieces of at most Chunksize,
%% starting at position Start.
sendlog(NodeAddress, NodeName, Start, Hashes, Chunksize) ->
    lager:debug("sending log: start=~B, N=~B, chunksize=~B",
                [Start, length(Hashes), Chunksize]),
    FirstSplit = lists:split(min(Chunksize, length(Hashes)), Hashes),
    sendlog_chunk(NodeAddress, NodeName, Start, FirstSplit, Chunksize).

%% Transmit one chunk, then recurse on the remainder; done when the
%% current chunk is empty.
sendlog_chunk(_, _, _, {[], _}, _) ->
    ok;
sendlog_chunk(NodeAddress, NodeName, Start, {Chunk, Remaining}, Chunksize) ->
    lager:debug("sending log chunk: start=~B, N=~B", [Start, length(Chunk)]),
    ok = sendlog_request(NodeAddress, NodeName, Start, Chunk),
    NextStart = Start + length(Chunk),
    NextSplit = lists:split(min(Chunksize, length(Remaining)), Remaining),
    sendlog_chunk(NodeAddress, NodeName, NextStart, NextSplit, Chunksize).

%% POST one batch of base64-encoded hashes to the node's "sendlog"
%% endpoint.  Returns ok, or throws request_error unless the node
%% answers with result "ok".
sendlog_request(NodeAddress, NodeName, Start, Hashes) ->
    DebugTag = io_lib:format("sendlog ~B:~B", [Start, length(Hashes)]),
    URL = NodeAddress ++ "sendlog",
    Headers = [{"Content-Type", "text/json"}],
    JSON = mochijson2:encode({[{"start", Start},
                               {"hashes", [base64:encode(H) || H <- Hashes]}]}),
    case request(DebugTag, URL, NodeName, Headers, list_to_binary(JSON)) of
        {<<"ok">>, _} ->
            ok;
        Other ->
            throw({request_error, result, DebugTag, Other})
    end.

%% Ask the node which entries it is missing.  Returns {ok, Entries}
%% where Entries is the "entries" member of the JSON response
%% (or 'undefined' if that member is absent); throws request_error
%% on any other result.
missingentries(NodeAddress, NodeName) ->
    DebugTag = "missingentries",
    URL = NodeAddress ++ "missingentries",
    case request(DebugTag, URL, NodeName) of
        {<<"ok">>, PropList} ->
            {ok, proplists:get_value(<<"entries">>, PropList)};
        Other ->
            throw({request_error, result, DebugTag, Other})
    end.

%% Send the entries identified by Hashes to the node, at most
%% Chunksize entries per request.
sendentries(NodeAddress, NodeName, Hashes, Chunksize) ->
    lager:debug("sending entries: N=~B, chunksize=~B",
                [length(Hashes), Chunksize]),
    Split = lists:split(min(Chunksize, length(Hashes)), Hashes),
    sendentries_chunk(NodeAddress, NodeName, Split, Chunksize).

%% Transmit one chunk of entries, looking each hash up in the local db
%% first.  Returns ok when all chunks are sent, or {error, entrynotindb}
%% if any hash in the current chunk has no stored entry (i.e.
%% db:entry_for_leafhash/1 returned 'noentry').
sendentries_chunk(_, _, {[], _}, _) ->
    ok;
sendentries_chunk(NodeAddress, NodeName, {Chunk, Rest}, Chunksize) ->
    lager:debug("sending entries chunk: N=~B", [length(Chunk)]),
    HashesAndEntries = lists:zip(Chunk, lists:map(fun db:entry_for_leafhash/1, Chunk)),
    %% lists:keyfind/3 replaces lists:keysearch/3, which is retained in
    %% OTP only for backward compatibility; it returns the tuple itself
    %% rather than {value, Tuple}, so the logged Missing term is the
    %% first {Hash, noentry} pair directly.
    case lists:keyfind(noentry, 2, HashesAndEntries) of
        false ->
            ok = sendentries_request(NodeAddress, NodeName, HashesAndEntries),
            sendentries_chunk(NodeAddress, NodeName,
                              lists:split(min(Chunksize, length(Rest)), Rest),
                              Chunksize);
        Missing ->
            lager:error("Entries not in db: ~p", [Missing]),
            {error, entrynotindb}
    end.

%% POST one batch of {Hash, Entry} pairs to the node's "sendentry"
%% endpoint, base64-encoding both fields.  Returns ok, or throws
%% request_error unless the node answers with result "ok".
sendentries_request(NodeAddress, NodeName, HashesAndEntries) ->
    DebugTag = io_lib:format("sendentry ~B", [length(HashesAndEntries)]),
    URL = NodeAddress ++ "sendentry",
    Headers = [{"Content-Type", "text/json"}],
    Objects = [[{"entry", base64:encode(Entry)},
                {"treeleafhash", base64:encode(Hash)}]
               || {Hash, Entry} <- HashesAndEntries],
    RequestBody = list_to_binary(mochijson2:encode(Objects)),
    case request(DebugTag, URL, NodeName, Headers, RequestBody) of
        {<<"ok">>, _} ->
            ok;
        Other ->
            throw({request_error, result, DebugTag, Other})
    end.

%% Read and JSON-decode the file whose path is stored under the plop
%% application environment key FileInConfig.  Returns the decoded term,
%% or the atom 'noentry' when the key is unset or the file is absent.
readfile(FileInConfig) ->
    case application:get_env(plop, FileInConfig) of
        undefined ->
            noentry;
        {ok, Path} ->
            case atomic:readfile(Path) of
                noentry ->
                    noentry;
                Contents ->
                    mochijson2:decode(Contents)
            end
    end.

%% Number of entries fetched so far: recorded index + 1 (0 when no
%% fetched file exists).  When an index has been recorded, asserts via
%% 'ok = ...' (crashing with badmatch on disagreement) that the recorded
%% hash matches the logorder entry at that index.
nfetched() ->
    {Index, Hash} = read_fetched(),
    if
        Index >= 0 ->
            ok = verify_logorder_entry(Index, binary_to_list(Hash));
        true ->
            ok
    end,
    Index + 1.

%% Read the fetched file as {Index, Hash}; {-1, <<>>} when the file
%% (or its configured path) is missing.
read_fetched() ->
    case merge_util:readfile(fetched_path) of
        {struct, PropList} ->
            Index = proplists:get_value(<<"index">>, PropList),
            Hash = proplists:get_value(<<"hash">>, PropList),
            {Index, Hash};
        noentry ->
            {-1, <<>>}
    end.

%% Compare HashAsString (lowercase-hex expected, per hex:bin_to_hexstr)
%% against the logorder entry at Index.  Returns ok on match; logs and
%% returns the atom 'mismatch' otherwise.
verify_logorder_entry(Index, HashAsString) ->
    StoredHex = hex:bin_to_hexstr(index:get(logorder, Index)),
    case StoredHex of
        HashAsString ->
            ok;
        _ ->
            lager:error("hash in fetched file ~p doesn't match logorder[~B]=~p",
                        [HashAsString, Index, StoredHex]),
            mismatch
    end.