author    Linus Nordberg <linus@nordu.net>  2017-01-27 15:11:42 +0100
committer Linus Nordberg <linus@nordu.net>  2017-02-01 11:14:41 +0100
commit    829ab97fccb991832445862ec8246197a225ecec
tree      8152c9e6855537661e7b1521ab45df62fd825e15
parent    9d3d360e9bde66c0b2c37a196635bcfe26872ebe
Parallelised merge, distribution phase.
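
Add a new OTP application, merge, whose supervision tree runs the
distribution phase in parallel: merge_dist_sup starts one merge_dist
gen_server per entry in the plop frontend_nodes environment, and each
worker periodically fetches the current STH from plop, sends any new
log entries to its frontend node (sendlog, sendentry), waits for them
to be verified (verify-entries) and then publishes the STH
(publish-sth).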
Diffstat (limited to 'merge')
-rw-r--r--  merge/ebin/merge.app           13
-rw-r--r--  merge/src/merge_app.erl        12
-rw-r--r--  merge/src/merge_dist.erl      191
-rw-r--r--  merge/src/merge_dist_sup.erl   29
-rw-r--r--  merge/src/merge_sup.erl        21
5 files changed, 266 insertions, 0 deletions
diff --git a/merge/ebin/merge.app b/merge/ebin/merge.app
new file mode 100644
index 0000000..2dfbda7
--- /dev/null
+++ b/merge/ebin/merge.app
@@ -0,0 +1,13 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+%%% Application resource file for merge, see app(5).
+
+{application, merge,
+ [{description, "Plop merge"},
+ {vsn, "0.10.0-dev"},
+ {modules, [merge_app, merge_dist, merge_dist_sup, merge_sup]},
+ {applications, [kernel, stdlib, lager, plop]},
+ {registered, [merge_dist, merge_dist_sup, merge_sup]},
+ {mod, {merge_app, []}}
+ ]}.
diff --git a/merge/src/merge_app.erl b/merge/src/merge_app.erl
new file mode 100644
index 0000000..cd68d81
--- /dev/null
+++ b/merge/src/merge_app.erl
@@ -0,0 +1,12 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_app).
+-behaviour(application).
+-export([start/2, stop/1]).
+
+start(normal, Args) ->
+ merge_sup:start_link(Args).
+
+stop(_State) ->
+ ok.
diff --git a/merge/src/merge_dist.erl b/merge/src/merge_dist.erl
new file mode 100644
index 0000000..8d3dc2b
--- /dev/null
+++ b/merge/src/merge_dist.erl
@@ -0,0 +1,191 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_dist).
+-behaviour(gen_server).
+
+-export([start_link/1]).
+-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
+ code_change/3]).
+
+-record(state, {
+ timer :: reference(),
+ node_address :: string(),
+ sth_timestamp :: non_neg_integer()
+ }).
+
+start_link(Args) ->
+ gen_server:start_link(?MODULE, Args, []).
+
+init(Node) ->
+ lager:info("~p:~p: starting", [?MODULE, Node]),
+ Timer = erlang:start_timer(1000, self(), dist),
+ {ok, #state{timer = Timer,
+ node_address = Node,
+ sth_timestamp = 0}}.
+
+handle_call(stop, _From, State) ->
+ {stop, normal, stopped, State}.
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info({timeout, _Timer, dist}, State) ->
+ dist(plop:sth(), State).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(Reason, #state{timer = Timer}) ->
+ lager:info("~p terminating: ~p", [?MODULE, Reason]),
+ erlang:cancel_timer(Timer),
+ ok.
+
+%%%%%%%%%%%%%%%%%%%%
+
+dist(noentry, State) ->
+ Timer = erlang:start_timer(1000, self(), dist),
+ {noreply, State#state{timer = Timer}};
+dist({struct, PropList} = STH,
+ #state{node_address = NodeAddress, sth_timestamp = LastTimestamp} = State) ->
+ Treesize = proplists:get_value(<<"tree_size">>, PropList),
+ Timestamp = proplists:get_value(<<"timestamp">>, PropList),
+ RootHash = base64:decode(proplists:get_value(<<"sha256_root_hash">>, PropList)),
+ Signature = base64:decode(proplists:get_value(<<"tree_head_signature">>, PropList)),
+ TS = case Timestamp > LastTimestamp of
+ true ->
+ true = plop:verify_sth(Treesize, Timestamp, RootHash, Signature),
+ try
+ ok = do_dist(NodeAddress, min(Treesize, index:indexsize(logorder))),
+ ok = publish_sth(NodeAddress, STH),
+ lager:info("~p: Published STH with size ~B and timestamp " ++
+ "~p.", [NodeAddress, Treesize, Timestamp]),
+ Timestamp
+ catch
+ throw:{request_error, SubErrType, DebugTag, Error} ->
+ lager:error("~p: ~p: ~p", [DebugTag, SubErrType, Error]),
+ LastTimestamp
+ end;
+ false ->
+ lager:debug("~p: STH timestamp ~p <= ~p, waiting.",
+ [NodeAddress, Timestamp, LastTimestamp]),
+ LastTimestamp
+ end,
+ Wait = max(1, round(application:get_env(plop, merge_delay, 600) / 60)),
+ {noreply,
+ State#state{timer = erlang:start_timer(Wait * 1000, self(), dist),
+ sth_timestamp = TS}}.
+
+%% @doc Has a nonlocal return because of the throws in do_request/4 further down.
+do_dist(NodeAddress, Size) ->
+ {ok, VerifiedSize} = frontend_get_verifiedsize(NodeAddress),
+ true = Size >= VerifiedSize,
+ do_dist(NodeAddress, VerifiedSize, Size - VerifiedSize).
+
+do_dist(_, _, 0) ->
+ ok;
+do_dist(NodeAddress, Size, NTotal) ->
+ DistMaxWindow = application:get_env(plop, merge_dist_winsize, 1000),
+ N = min(DistMaxWindow, NTotal),
+ Hashes = index:getrange(logorder, Size, Size + N - 1),
+ ok = frontend_sendlog(NodeAddress, Size, Hashes),
+ ok = frontend_send_missing_entries(NodeAddress, Hashes),
+ {ok, NewSize} = frontend_verify_entries(NodeAddress, Size + N),
+ lager:info("~p: Done distributing ~B entries.", [NodeAddress, NewSize-Size]),
+ true = NTotal >= NewSize - Size,
+ do_dist(NodeAddress, NewSize, NTotal - (NewSize - Size)).
+
+frontend_sendlog(NodeAddress, Start, Hashes) ->
+ SendlogChunksize = application:get_env(plop, merge_dist_sendlog_chunksize, 1000),
+ frontend_sendlog_chunk(NodeAddress, Start, lists:split(min(SendlogChunksize, length(Hashes)), Hashes), SendlogChunksize).
+
+frontend_sendlog_chunk(_, _, {[], _}, _) ->
+ ok;
+frontend_sendlog_chunk(NodeAddress, Start, {Chunk, Rest}, Chunksize) ->
+ ok = frontend_sendlog_request(NodeAddress, Start, Chunk),
+ frontend_sendlog_chunk(NodeAddress, Start + length(Chunk),
+ lists:split(min(Chunksize, length(Rest)), Rest), Chunksize).
+
+frontend_sendlog_request(NodeAddress, Start, Hashes) ->
+ DebugTag = io_lib:format("sendlog ~B:~B", [Start, length(Hashes)]),
+ URL = NodeAddress ++ "sendlog",
+ Headers = [{"Content-Type", "text/json"}],
+ EncodedHashes = [base64:encode(H) || H <- Hashes],
+ RequestBody = list_to_binary(mochijson2:encode({[{"start", Start},
+ {"hashes", EncodedHashes}]})),
+ case do_request(DebugTag, URL, Headers, RequestBody) of
+ {<<"ok">>, _} ->
+ ok;
+ Err ->
+ throw({request_error, result, DebugTag, Err})
+ end.
+
+frontend_send_missing_entries(NodeAddress, Hashes) ->
+ SendentriesChunksize = application:get_env(plop, merge_dist_sendentries_chunksize, 100),
+ {ChunkOfHashes, RestOfHashes} = lists:split(min(SendentriesChunksize, length(Hashes)), Hashes),
+ frontend_send_entries_chunk(NodeAddress,
+ {ChunkOfHashes, RestOfHashes},
+ SendentriesChunksize).
+
+frontend_send_entries_chunk(_, {[], _}, _) ->
+ ok;
+frontend_send_entries_chunk(NodeAddress, {Chunk, Rest}, Chunksize) ->
+ HashesAndEntries = lists:zip(Chunk, [db:entry_for_leafhash(H) || H <- Chunk]),
+ ok = frontend_send_entries_request(NodeAddress, HashesAndEntries),
+ frontend_send_entries_chunk(NodeAddress,
+ lists:split(min(Chunksize, length(Rest)), Rest),
+ Chunksize).
+
+frontend_send_entries_request(NodeAddress, HashesAndEntries) ->
+ DebugTag = io_lib:format("sendentry ~B", [length(HashesAndEntries)]),
+ URL = NodeAddress ++ "sendentry",
+ Headers = [{"Content-Type", "text/json"}],
+ L = mochijson2:encode([[{"entry", base64:encode(E)}, {"treeleafhash", base64:encode(H)}] || {H, E} <- HashesAndEntries]),
+ RequestBody = list_to_binary(L),
+ case do_request(DebugTag, URL, Headers, RequestBody) of
+ {<<"ok">>, _} ->
+ ok;
+ Err ->
+ throw({request_error, result, DebugTag, Err})
+ end.
+
+frontend_get_verifiedsize(NodeAddress) ->
+ frontend_verify_entries(NodeAddress, 0).
+
+frontend_verify_entries(NodeAddress, Size) ->
+ DebugTag = io_lib:format("verify-entries ~B", [Size]),
+ URL = NodeAddress ++ "verify-entries",
+ Headers = [{"Content-Type", "text/json"}],
+ RequestBody = list_to_binary(mochijson2:encode({[{"verify_to", Size}]})),
+ case do_request(DebugTag, URL, Headers, RequestBody) of
+ {<<"ok">>, PropList} ->
+ {ok, proplists:get_value(<<"verified">>, PropList)};
+ Err ->
+ throw({request_error, result, DebugTag, Err})
+ end.
+
+publish_sth(NodeAddress, STH) ->
+ DebugTag = "publish-sth",
+ URL = NodeAddress ++ "publish-sth",
+ Headers = [{"Content-Type", "text/json"}],
+ RequestBody = list_to_binary(mochijson2:encode(STH)),
+ case do_request(DebugTag, URL, Headers, RequestBody) of
+ {<<"ok">>, _} ->
+ ok;
+ Err ->
+ throw({request_error, result, DebugTag, Err})
+ end.
+
+do_request(DebugTag, URL, Headers, RequestBody) ->
+ case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
+ {error, Err} ->
+ throw({request_error, request, DebugTag, Err});
+ {failure, {none, StatusCode, none}, _RespHeaders, _Body} ->
+ throw({request_error, failure, DebugTag, StatusCode});
+ {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->
+            case (catch mochijson2:decode(Body)) of
+                {struct, PropList} ->
+                    {proplists:get_value(<<"result">>, PropList), PropList};
+                Err -> % mochijson2:decode/1 throws on invalid JSON
+                    throw({request_error, decode, DebugTag, Err})
+            end
+ end.
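
All four request helpers above funnel through do_request/4. For
reference, the JSON bodies they build with mochijson2 look roughly like
this (all values hypothetical):

    %% sendlog:        {"start": 100, "hashes": ["<base64 leaf hash>", ...]}
    %% sendentry:      [{"entry": "<base64>", "treeleafhash": "<base64>"}, ...]
    %% verify-entries: {"verify_to": 1234}
    %% publish-sth:    the STH proplist from plop:sth(), re-encoded as is
    %%
    %% A successful response decodes to an object with "result": "ok";
    %% verify-entries additionally carries "verified": N.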
diff --git a/merge/src/merge_dist_sup.erl b/merge/src/merge_dist_sup.erl
new file mode 100644
index 0000000..050ddc5
--- /dev/null
+++ b/merge/src/merge_dist_sup.erl
@@ -0,0 +1,29 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_dist_sup).
+-behaviour(supervisor).
+
+-export([start_link/1, init/1]).
+
+start_link([]) ->
+ {ok, Nodes} = application:get_env(plop, frontend_nodes),
+ lager:info("starting merge dist for frontend nodes: ~p", [Nodes]),
+ {ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, []),
+ lists:map(fun(Node) ->
+ lager:debug("starting dist worker: ~p", [Node]),
+ {ok, Child} =
+ supervisor:start_child(?MODULE, [Node]),
+ Child
+ end, Nodes),
+ {ok, Pid}.
+
+init(_Args) ->
+ {ok,
+ {{simple_one_for_one, 3, 10},
+ [
+ {ignored,
+ {merge_dist, start_link, []},
+ permanent, 10000, worker,
+ [merge_dist]}
+ ]}}.
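
Note that merge_dist concatenates node_address directly with path names
such as "sendlog", so each entry in the plop frontend_nodes environment
is expected to be a base URL ending in a slash. A minimal sketch
(addresses hypothetical):

    %% In sys.config, under the plop application:
    {frontend_nodes, ["https://frontend-1.example.net/",
                      "https://frontend-2.example.net/"]}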
diff --git a/merge/src/merge_sup.erl b/merge/src/merge_sup.erl
new file mode 100644
index 0000000..124fb12
--- /dev/null
+++ b/merge/src/merge_sup.erl
@@ -0,0 +1,21 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_sup).
+-behaviour(supervisor).
+
+-export([start_link/1, init/1]).
+
+start_link(_Args) ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+ {ok, LogorderPath} = application:get_env(plop, index_path),
+ {ok,
+ {{one_for_one, 3, 10},
+ [
+ {the_logorder, {index, start_link, [logorder, LogorderPath]},
+ permanent, 10000, worker, [index]},
+ {merge_dist_sup, {merge_dist_sup, start_link, [[]]},
+ transient, infinity, supervisor, [merge_dist_sup]}
+ ]}}.
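
For reference, these are all the application environment keys the new
code reads, with the defaults from the code above; the path and URLs
are hypothetical examples:

    %% sys.config sketch covering every key used by this patch.
    [{plop,
      [{index_path, "/var/db/merge/logorder"},    % logorder index (merge_sup)
       {frontend_nodes, ["https://frontend-1.example.net/"]}, % merge_dist_sup
       {merge_delay, 600},                   % seconds; each worker re-arms its
                                             % timer every max(1, round(600/60)) s
       {merge_dist_winsize, 1000},             % entries per verification window
       {merge_dist_sendlog_chunksize, 1000},   % hashes per sendlog request
       {merge_dist_sendentries_chunksize, 100} % entries per sendentry request
      ]}].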