author     Linus Nordberg <linus@nordu.net>  2017-01-27 15:11:42 +0100
committer  Linus Nordberg <linus@nordu.net>  2017-01-30 09:54:40 +0100
commit     e99900eb05cdb2f5fecb01c987396b49a0a31aa0
tree       17b3f480f2086e692e091337c53be9e558c59e06
parent     85b20a1a07e0eb9e4a7c0cedc169b2ad210b30b6
Parallelised merge, distribution phase.
Diffstat (limited to 'merge/src')
-rw-r--r--   merge/src/merge_app.erl      |  12
-rw-r--r--   merge/src/merge_dist.erl     | 198
-rw-r--r--   merge/src/merge_dist_sup.erl |  31
-rw-r--r--   merge/src/merge_sup.erl      |  21
4 files changed, 262 insertions(+), 0 deletions(-)
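The new processes are configured through the plop application environment. As a rough sketch of the knobs the code below reads (the node URLs and the index path are made-up placeholders; the numeric values shown are the defaults hard-coded in the patch), a sys.config entry could look like:

    {plop,
     [{frontend_nodes, ["https://frontend-1.example.net/ct/frontend/",
                        "https://frontend-2.example.net/ct/frontend/"]},  %% placeholder URLs
      {index_path, "/var/db/merge/logorder"},    %% placeholder path to the logorder index
      {merge_delay, 600},                        %% seconds; each dist round waits merge_delay/60 s
      {merge_dist_winsize, 1000},                %% max entries distributed per window
      {merge_dist_sendlog_chunksize, 1000},      %% hashes per sendlog request
      {merge_dist_sendentries_chunksize, 100}    %% entries per sendentry request
     ]}

Note the trailing slash on the node addresses: merge_dist builds URLs by plain concatenation, e.g. NodeAddress ++ "sendlog".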
diff --git a/merge/src/merge_app.erl b/merge/src/merge_app.erl
new file mode 100644
index 0000000..cd68d81
--- /dev/null
+++ b/merge/src/merge_app.erl
@@ -0,0 +1,12 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_app).
+-behaviour(application).
+-export([start/2, stop/1]).
+
+start(normal, Args) ->
+    merge_sup:start_link(Args).
+
+stop(_State) ->
+    ok.
diff --git a/merge/src/merge_dist.erl b/merge/src/merge_dist.erl
new file mode 100644
index 0000000..54aaa00
--- /dev/null
+++ b/merge/src/merge_dist.erl
@@ -0,0 +1,198 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_dist).
+-behaviour(gen_server).
+
+-export([start_link/1]).
+-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
+         code_change/3]).
+
+-record(state, {
+          timer :: reference(),
+          node_address :: string(),
+          sth_timestamp :: non_neg_integer()
+         }).
+
+start_link(Args) ->
+    gen_server:start_link(?MODULE, Args, []).
+
+init(Node) ->
+    lager:info("~p:~p: starting", [?MODULE, Node]),
+    Timer = erlang:start_timer(1000, self(), dist),
+    {ok, #state{timer = Timer,
+                node_address = Node,
+                sth_timestamp = 0}}.
+
+handle_call(stop, _From, State) ->
+    {stop, normal, stopped, State}.
+handle_cast(_Request, State) ->
+    {noreply, State}.
+
+handle_info({timeout, _Timer, dist},
+            #state{node_address = NodeAddress,
+                   sth_timestamp = PrevTimestamp} = State) ->
+    case plop:sth() of
+        noentry ->
+            Timer = erlang:start_timer(1000, self(), dist),
+            {noreply, State#state{timer = Timer}};
+        {struct, PropList} = STH ->
+            Treesize = proplists:get_value(<<"tree_size">>, PropList),
+            Timestamp = proplists:get_value(<<"timestamp">>, PropList),
+            RootHash = base64:decode(proplists:get_value(<<"sha256_root_hash">>, PropList)),
+            Signature = base64:decode(proplists:get_value(<<"tree_head_signature">>, PropList)),
+            Wait = max(1, round(application:get_env(plop, merge_delay, 600) / 60)),
+            case Timestamp > PrevTimestamp of
+                true ->
+                    true = plop:verify_sth(Treesize, Timestamp, RootHash, Signature),
+                    ok = dist(NodeAddress, min(Treesize, index:indexsize(logorder))),
+                    ok = publish_sth(NodeAddress, STH),
+                    lager:info("~p: Published STH with size ~B and timestamp ~p.", [NodeAddress, Treesize, Timestamp]),
+                    {noreply, State#state{timer = erlang:start_timer(Wait * 1000, self(), dist),
+                                          sth_timestamp = Timestamp}};
+                false ->
+                    lager:debug("~p: STH timestamp ~p <= ~p, waiting ~B seconds.", [NodeAddress, Timestamp, PrevTimestamp, Wait]),
+                    {noreply, State#state{timer = erlang:start_timer(Wait * 1000, self(), dist)}}
+            end
+    end.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+terminate(Reason, #state{timer = Timer}) ->
+    lager:info("~p terminating: ~p", [?MODULE, Reason]),
+    erlang:cancel_timer(Timer),
+    ok.
+
+publish_sth(NodeAddress, STH) ->
+    DebugTag = "publish-sth",
+    URL = NodeAddress ++ "publish-sth",
+    Headers = [{"Content-Type", "text/json"}],
+    RequestBody = list_to_binary(mochijson2:encode(STH)),
+    case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
+        {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->
+            case (catch mochijson2:decode(Body)) of
+                {error, E} ->
+                    lager:error("json parse error: ~p", [E]),
+                    {error, E};
+                {struct, PropList} ->
+                    case proplists:get_value(<<"result">>, PropList) of
+                        <<"ok">> -> ok;
+                        Error -> {error, Error}
+                    end
+            end
+    end.
+
+dist(NodeAddress, Size) ->
+    %% TODO: Lots of things here might fail bc node is not reachable,
+    %% in which case crashing is ok but restarting should wait some,
+    %% so what now?
+    {ok, VerifiedSize} = frontend_get_verifiedsize(NodeAddress),
+    true = Size >= VerifiedSize,
+    dist(NodeAddress, VerifiedSize, Size - VerifiedSize).
+
+dist(_, _, 0) ->
+    ok;
+dist(NodeAddress, Size, NTotal) ->
+    DistMaxWindow = application:get_env(plop, merge_dist_winsize, 1000),
+    N = min(DistMaxWindow, NTotal),
+    Hashes = index:getrange(logorder, Size, Size + N - 1),
+    ok = frontend_sendlog(NodeAddress, Size, Hashes),
+    ok = frontend_send_missing_entries(NodeAddress, Hashes),
+    {ok, NewSize} = frontend_verify_entries(NodeAddress, Size + N),
+    lager:info("~p: Done distributing ~B entries.", [NodeAddress, NewSize-Size]),
+    true = NTotal >= NewSize - Size,
+    dist(NodeAddress, NewSize, NTotal - (NewSize - Size)).
+
+frontend_sendlog(NodeAddress, Start, Hashes) ->
+    SendlogChunksize = application:get_env(plop, merge_dist_sendlog_chunksize, 1000),
+    frontend_sendlog_chunk(NodeAddress, Start,
+                           lists:split(min(SendlogChunksize, length(Hashes)), Hashes),
+                           SendlogChunksize).
+
+frontend_sendlog_chunk(_, _, {[], _}, _) ->
+    ok;
+frontend_sendlog_chunk(NodeAddress, Start, {Chunk, Rest}, Chunksize) ->
+    ok = frontend_sendlog_request(NodeAddress, Start, Chunk),
+    frontend_sendlog_chunk(NodeAddress, Start + length(Chunk),
+                           lists:split(min(Chunksize, length(Rest)), Rest),
+                           Chunksize).
+
+frontend_sendlog_request(NodeAddress, Start, Hashes) ->
+    DebugTag = io_lib:format("frontend sendlog ~B:~B", [Start, length(Hashes)]),
+    URL = NodeAddress ++ "sendlog",
+    Headers = [{"Content-Type", "text/json"}],
+    EncodedHashes = [base64:encode(H) || H <- Hashes],
+    RequestBody = list_to_binary(mochijson2:encode({[{"start", Start},
+                                                     {"hashes", EncodedHashes}]})),
+    case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
+        {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->
+            case (catch mochijson2:decode(Body)) of
+                {error, E} ->
+                    lager:error("json parse error: ~p", [E]),
+                    {error, E};
+                {struct, PropList} ->
+                    case proplists:get_value(<<"result">>, PropList) of
+                        <<"ok">> -> ok;
+                        Error -> {error, Error}
+                    end
+            end
+    end.
+
+frontend_send_missing_entries(NodeAddress, Hashes) ->
+    SendentriesChunksize = application:get_env(plop, merge_dist_sendentries_chunksize, 100),
+    {ChunkOfHashes, RestOfHashes} = lists:split(min(SendentriesChunksize, length(Hashes)), Hashes),
+    frontend_send_entries_chunk(NodeAddress, {ChunkOfHashes, RestOfHashes}, SendentriesChunksize).
+
+frontend_send_entries_chunk(_, {[], _}, _) ->
+    ok;
+frontend_send_entries_chunk(NodeAddress, {Chunk, Rest}, Chunksize) ->
+    HashesAndEntries = lists:zip(Chunk, [db:entry_for_leafhash(H) || H <- Chunk]),
+    ok = frontend_send_entries_request(NodeAddress, HashesAndEntries),
+    frontend_send_entries_chunk(NodeAddress,
+                                lists:split(min(Chunksize, length(Rest)), Rest),
+                                Chunksize).
+
+frontend_send_entries_request(NodeAddress, HashesAndEntries) ->
+    DebugTag = io_lib:format("sendentry ~B", [length(HashesAndEntries)]),
+    URL = NodeAddress ++ "sendentry",
+    Headers = [{"Content-Type", "text/json"}],
+    L = mochijson2:encode(
+          [[{"entry", base64:encode(E)}, {"treeleafhash", base64:encode(H)}] ||
+              {H, E} <- HashesAndEntries]),
+    RequestBody = list_to_binary(L),
+    case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
+        {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->
+            case (catch mochijson2:decode(Body)) of
+                {error, E} ->
+                    lager:error("json parse error: ~p", [E]),
+                    {error, E};
+                {struct, PropList} ->
+                    case proplists:get_value(<<"result">>, PropList) of
+                        <<"ok">> -> ok;
+                        Error -> {error, Error}
+                    end
+            end
+    end.
+
+frontend_get_verifiedsize(NodeAddress) ->
+    frontend_verify_entries(NodeAddress, 0).
+
+frontend_verify_entries(NodeAddress, Size) ->
+    DebugTag = io_lib:format("verify-entries ~B", [Size]),
+    URL = NodeAddress ++ "verify-entries",
+    Headers = [{"Content-Type", "text/json"}],
+    RequestBody = list_to_binary(mochijson2:encode({[{"verify_to", Size}]})),
+    case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
+        {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->
+            case (catch mochijson2:decode(Body)) of
+                {error, E} ->
+                    lager:error("json parse error: ~p", [E]),
+                    {error, E};
+                {struct, PropList} ->
+                    case proplists:get_value(<<"result">>, PropList) of
+                        <<"ok">> -> {ok, proplists:get_value(<<"verified">>, PropList)};
+                        Error -> {error, Error}
+                    end
+            end
+    end.
diff --git a/merge/src/merge_dist_sup.erl b/merge/src/merge_dist_sup.erl
new file mode 100644
index 0000000..a4f7237
--- /dev/null
+++ b/merge/src/merge_dist_sup.erl
@@ -0,0 +1,31 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_dist_sup).
+-behaviour(supervisor).
+
+-export([start_link/1, init/1]).
+
+start_link([]) ->
+    {ok, Nodes} = application:get_env(plop, frontend_nodes),
+    lager:info("starting merge dist for frontend nodes: ~p", [Nodes]),
+    {ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, []),
+    Children =
+        lists:map(fun(Node) ->
+                          lager:debug("starting dist worker: ~p", [Node]),
+                          {ok, Child} =
+                              supervisor:start_child(?MODULE, [Node]),
+                          Child
+                  end, Nodes),
+    lager:debug("supervisor ~p started dist workers: ~p", [Pid, Children]),
+    {ok, Pid}.
+
+init(_Args) ->
+    {ok,
+     {{simple_one_for_one, 3, 10},
+      [
+       {ignored,
+        {merge_dist, start_link, []},
+        permanent, 10000, worker,
+        [merge_dist]}
+      ]}}.
diff --git a/merge/src/merge_sup.erl b/merge/src/merge_sup.erl
new file mode 100644
index 0000000..124fb12
--- /dev/null
+++ b/merge/src/merge_sup.erl
@@ -0,0 +1,21 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_sup).
+-behaviour(supervisor).
+
+-export([start_link/1, init/1]).
+
+start_link(_Args) ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    {ok, LogorderPath} = application:get_env(plop, index_path),
+    {ok,
+     {{one_for_one, 3, 10},
+      [
+       {the_logorder, {index, start_link, [logorder, LogorderPath]},
+        permanent, 10000, worker, [index]},
+       {merge_dist_sup, {merge_dist_sup, start_link, [[]]},
+        transient, infinity, supervisor, [merge_dist_sup]}
+      ]}}.
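For orientation, each merge_dist worker drives four HTTP endpoints on its frontend node. The bodies below are illustrative only (the indices and base64 strings are invented), with the shapes reconstructed from the encode/decode calls in the patch:

    POST <node>publish-sth      the STH exactly as returned by plop:sth()
    POST <node>sendlog          {"start": 4711, "hashes": ["<base64 leaf hash>", ...]}
    POST <node>sendentry        [{"entry": "<base64 entry>", "treeleafhash": "<base64 leaf hash>"}, ...]
    POST <node>verify-entries   {"verify_to": 4721}

Each call expects HTTP 200 and a JSON body carrying "result": "ok"; verify-entries additionally returns "verified": N, which dist/3 uses both to find its starting point (frontend_get_verifiedsize/1 simply verifies to 0) and to advance its window. Any other status code falls through the single case clause and crashes the worker; per the in-code TODO, crashing on an unreachable node is accepted for now, and merge_dist_sup handles restarts.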