From 0070a4f70dd78f1f8aacb0657c741a2c311a7f32 Mon Sep 17 00:00:00 2001
From: Linus Nordberg
Date: Thu, 2 Feb 2017 02:25:42 +0100
Subject: Parallelised merge, backup phase.

---
 merge/src/merge_backup.erl     | 167 +++++++++++++++++++++++++++++++++++++++++
 merge/src/merge_backup_sup.erl |  23 ++++++
 merge/src/merge_dist.erl       |  90 +++-------------------
 merge/src/merge_sup.erl        |   2 +
 merge/src/merge_util.erl       |  74 ++++++++++++++++++
 src/plop_httputil.erl          |  13 +++-
 6 files changed, 289 insertions(+), 80 deletions(-)
 create mode 100644 merge/src/merge_backup.erl
 create mode 100644 merge/src/merge_backup_sup.erl
 create mode 100644 merge/src/merge_util.erl

diff --git a/merge/src/merge_backup.erl b/merge/src/merge_backup.erl
new file mode 100644
index 0000000..bd75608
--- /dev/null
+++ b/merge/src/merge_backup.erl
@@ -0,0 +1,167 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_backup).
+-behaviour(gen_server).
+
+-export([start_link/1]).
+-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
+         code_change/3]).
+
+-record(state, {
+          timer :: reference(),
+          node_name :: string(),
+          node_address :: string()
+         }).
+
+start_link(Args) ->
+    gen_server:start_link(?MODULE, Args, []).
+
+init([Name, Address]) ->
+    lager:info("~p:~p: starting (~p)", [?MODULE, Name, Address]),
+    Timer = erlang:start_timer(1000, self(), backup),
+    {ok, #state{timer = Timer, node_name = Name, node_address = Address}}.
+
+handle_call(stop, _From, State) ->
+    {stop, normal, stopped, State}.
+handle_cast(_Request, State) ->
+    {noreply, State}.
+
+handle_info({timeout, _Timer, backup}, State) ->
+    backup(fetched(), State).
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+terminate(Reason, #state{timer = Timer}) ->
+    lager:info("~p terminating: ~p", [?MODULE, Reason]),
+    erlang:cancel_timer(Timer),
+    ok.
+
+%%%%%%%%%%%%%%%%%%%%
+
+backup(noentry, State) ->
+    {noreply, State#state{timer = erlang:start_timer(1000, self(), backup)}};
+backup({struct, Fetched}, State) ->
+    Index = proplists:get_value(<<"index">>, Fetched),
+    Hash = proplists:get_value(<<"hash">>, Fetched),
+    backup(Index, Hash, State).
+
+backup(-1, _, State) ->
+    {noreply, State#state{timer = erlang:start_timer(1000, self(), backup)}};
+backup(Index, Hash,
+       #state{node_name = NodeName, node_address = NodeAddress} = State) ->
+    ok = verify_logorder_and_fetched_consistency(Index, Hash),
+    Size = index:indexsize(logorder),
+    lager:debug("~p: logorder size ~B", [NodeName, Size]),
+    ht:load_tree(Size - 1),
+    try
+        {ok, VerifiedSize} = verified_size(NodeAddress),
+        lager:debug("~p: verifiedsize ~B", [NodeName, VerifiedSize]),
+        case VerifiedSize == Size of
+            true ->
+                TreeHead = ht:root(Size - 1),
+                ok = check_root(NodeAddress, Size, TreeHead),
+                ok = write_backupfile(NodeName, Size, TreeHead);
+            false ->
+                true = VerifiedSize < Size, % Secondary ahead of primary?
+                ok = do_backup(NodeName, NodeAddress, VerifiedSize, Size - VerifiedSize)
+        end
+    catch
+        throw:{request_error, SubErrType, DebugTag, Error} ->
+            lager:error("~p: ~p: ~p", [DebugTag, SubErrType, Error])
+    end,
+    Wait = max(1, round(application:get_env(plop, merge_delay, 600) / 10)),
+    {noreply,
+     State#state{timer = erlang:start_timer(Wait * 1000, self(), backup)}}.
+
+do_backup(_, _, _, 0) ->
+    ok;
+do_backup(NodeName, NodeAddress, Start, NTotal) ->
+    N = min(NTotal, plopconfig:get_env(merge_backup_winsize, 1000)),
+    Hashes = index:getrange(logorder, Start, Start + N - 1),
+    ok = merge_util:sendlog(NodeAddress, Start, Hashes, plopconfig:get_env(merge_backup_sendlog_chunksize, 1000)),
+    ok = merge_util:sendentries(NodeAddress, Hashes, plopconfig:get_env(merge_backup_sendentries_chunksize, 100)),
+    Size = Start + N,
+    TreeHead = ht:root(Size - 1),
+    ok = check_root(NodeAddress, Size, TreeHead),
+    ok = setverifiedsize(NodeAddress, Size),
+    ok = write_backupfile(NodeName, Size, TreeHead),
+    true = NTotal >= N,
+    do_backup(NodeName, NodeAddress, Size, NTotal - N).
+
+write_backupfile(NodeName, TreeSize, TreeHead) ->
+    {ok, BasePath} = application:get_env(plop, verified_path),
+    Path = BasePath ++ "." ++ NodeName,
+    Content = mochijson2:encode({[{"tree_size", TreeSize},
+                                  {"sha256_root_hash", list_to_binary(hex:bin_to_hexstr(TreeHead))}]}),
+    atomic:replacefile(Path, Content).
+
+check_root(NodeAddress, Size, TreeHead) ->
+    {ok, TreeHeadToVerify} = verifyroot(NodeAddress, Size),
+    case TreeHeadToVerify == TreeHead of
+        true ->
+            ok;
+        false ->
+            lager:error("~p: ~B: secondary merge root ~p != ~p",
+                        [NodeAddress, Size, TreeHeadToVerify, TreeHead]),
+            root_mismatch
+    end.
+
+verifyroot(NodeAddress, TreeSize) ->
+    DebugTag = io_lib:format("verifyroot ~B", [TreeSize]),
+    URL = NodeAddress ++ "verifyroot",
+    Headers = [{"Content-Type", "text/json"}],
+    RequestBody = list_to_binary(mochijson2:encode({[{"tree_size", TreeSize}]})),
+    case merge_util:request(DebugTag, URL, Headers, RequestBody) of
+        {<<"ok">>, PropList} ->
+            {ok, base64:decode(proplists:get_value(<<"root_hash">>, PropList))};
+        Err ->
+            throw({request_error, result, DebugTag, Err})
+    end.
+
+verified_size(NodeAddress) ->
+    DebugTag = "verifiedsize",
+    URL = NodeAddress ++ "verifiedsize",
+    case merge_util:request(DebugTag, URL) of
+        {<<"ok">>, PropList} ->
+            {ok, proplists:get_value(<<"size">>, PropList)};
+        Err ->
+            throw({request_error, result, DebugTag, Err})
+    end.
+
+setverifiedsize(NodeAddress, Size) ->
+    DebugTag = io_lib:format("setverifiedsize ~B", [Size]),
+    URL = NodeAddress ++ "setverifiedsize",
+    Headers = [{"Content-Type", "text/json"}],
+    RequestBody = list_to_binary(mochijson2:encode({[{"size", Size}]})),
+    case merge_util:request(DebugTag, URL, Headers, RequestBody) of
+        {<<"ok">>, _} ->
+            ok;
+        Err ->
+            throw({request_error, result, DebugTag, Err})
+    end.
+
+fetched() ->
+    case application:get_env(plop, fetched_path) of
+        {ok, FetchedFile} ->
+            case atomic:readfile(FetchedFile) of
+                noentry ->
+                    noentry;
+                Contents ->
+                    mochijson2:decode(Contents)
+            end;
+        undefined ->
+            noentry
+    end.
+
+verify_logorder_and_fetched_consistency(Index, Hash) ->
+    HashString = binary_to_list(Hash),
+    case hex:bin_to_hexstr(index:get(logorder, Index)) of
+        HashString ->
+            ok;
+        Mismatch ->
+            lager:error("fetched file hash=~p doesn't match logorder[~B]=~p",
+                        [HashString, Index, Mismatch]),
+            fetched_mismatch
+    end.
diff --git a/merge/src/merge_backup_sup.erl b/merge/src/merge_backup_sup.erl
new file mode 100644
index 0000000..3c77a8f
--- /dev/null
+++ b/merge/src/merge_backup_sup.erl
@@ -0,0 +1,23 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_backup_sup).
+-behaviour(supervisor).
+
+-export([start_link/1, init/1]).
+
+start_link([]) ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init(_Args) ->
+    {ok, {{one_for_one, 3, 10}, children()}}.
+
+child_spec({Name, Address}) ->
+    {Name, {merge_backup, start_link, [[Name, Address]]},
+     permanent, 10000, worker, [merge_backup]}.
+
+children() ->
+    {ok, Nodes} = plopconfig:get_env(merge_secondaries),
+    {Names, _Addrs} = lists:unzip(Nodes),
+    lager:info("Starting merge backup for secondary merge nodes: ~p", [Names]),
+    [child_spec(Node) || Node <- Nodes].
diff --git a/merge/src/merge_dist.erl b/merge/src/merge_dist.erl
index 8d3dc2b..4aa94aa 100644
--- a/merge/src/merge_dist.erl
+++ b/merge/src/merge_dist.erl
@@ -75,7 +75,8 @@ dist({struct, PropList} = STH,
      State#state{timer = erlang:start_timer(Wait * 1000, self(), dist),
                  sth_timestamp = TS}}.
 
-%% @doc Has nonlocal return because of throw further down in do_request/4.
+%% @doc Has nonlocal return because of throw further down in
+%% merge_util:request/4.
 do_dist(NodeAddress, Size) ->
     {ok, VerifiedSize} = frontend_get_verifiedsize(NodeAddress),
     true = Size >= VerifiedSize,
@@ -83,70 +84,18 @@ do_dist(NodeAddress, Size) ->
 
 do_dist(_, _, 0) ->
     ok;
-do_dist(NodeAddress, Size, NTotal) ->
+do_dist(NodeAddress, Start, NTotal) ->
     DistMaxWindow = application:get_env(plop, merge_dist_winsize, 1000),
     N = min(DistMaxWindow, NTotal),
-    Hashes = index:getrange(logorder, Size, Size + N - 1),
-    ok = frontend_sendlog(NodeAddress, Size, Hashes),
-    ok = frontend_send_missing_entries(NodeAddress, Hashes),
-    {ok, NewSize} = frontend_verify_entries(NodeAddress, Size + N),
-    lager:info("~p: Done distributing ~B entries.", [NodeAddress, NewSize-Size]),
-    true = NTotal >= NewSize - Size,
-    do_dist(NodeAddress, NewSize, NTotal - (NewSize - Size)).
-
-frontend_sendlog(NodeAddress, Start, Hashes) ->
+    Hashes = index:getrange(logorder, Start, Start + N - 1),
     SendlogChunksize = application:get_env(plop, merge_dist_sendlog_chunksize, 1000),
-    frontend_sendlog_chunk(NodeAddress, Start, lists:split(min(SendlogChunksize, length(Hashes)), Hashes), SendlogChunksize).
-
-frontend_sendlog_chunk(_, _, {[], _}, _) ->
-    ok;
-frontend_sendlog_chunk(NodeAddress, Start, {Chunk, Rest}, Chunksize) ->
-    ok = frontend_sendlog_request(NodeAddress, Start, Chunk),
-    frontend_sendlog_chunk(NodeAddress, Start + length(Chunk),
-                           lists:split(min(Chunksize, length(Rest)), Rest), Chunksize).
-
-frontend_sendlog_request(NodeAddress, Start, Hashes) ->
-    DebugTag = io_lib:format("sendlog ~B:~B", [Start, length(Hashes)]),
-    URL = NodeAddress ++ "sendlog",
-    Headers = [{"Content-Type", "text/json"}],
-    EncodedHashes = [base64:encode(H) || H <- Hashes],
-    RequestBody = list_to_binary(mochijson2:encode({[{"start", Start},
-                                                     {"hashes", EncodedHashes}]})),
-    case do_request(DebugTag, URL, Headers, RequestBody) of
-        {<<"ok">>, _} ->
-            ok;
-        Err ->
-            throw({request_error, result, DebugTag, Err})
-    end.
-
-frontend_send_missing_entries(NodeAddress, Hashes) ->
     SendentriesChunksize = application:get_env(plop, merge_dist_sendentries_chunksize, 100),
-    {ChunkOfHashes, RestOfHashes} = lists:split(min(SendentriesChunksize, length(Hashes)), Hashes),
-    frontend_send_entries_chunk(NodeAddress,
-                                {ChunkOfHashes, RestOfHashes},
-                                SendentriesChunksize).
-
-frontend_send_entries_chunk(_, {[], _}, _) ->
-    ok;
-frontend_send_entries_chunk(NodeAddress, {Chunk, Rest}, Chunksize) ->
-    HashesAndEntries = lists:zip(Chunk, [db:entry_for_leafhash(H) || H <- Chunk]),
-    ok = frontend_send_entries_request(NodeAddress, HashesAndEntries),
-    frontend_send_entries_chunk(NodeAddress,
-                                lists:split(min(Chunksize, length(Rest)), Rest),
-                                Chunksize).
-
-frontend_send_entries_request(NodeAddress, HashesAndEntries) ->
-    DebugTag = io_lib:format("sendentry ~B", [length(HashesAndEntries)]),
-    URL = NodeAddress ++ "sendentry",
-    Headers = [{"Content-Type", "text/json"}],
-    L = mochijson2:encode([[{"entry", base64:encode(E)}, {"treeleafhash", base64:encode(H)}] || {H, E} <- HashesAndEntries]),
-    RequestBody = list_to_binary(L),
-    case do_request(DebugTag, URL, Headers, RequestBody) of
-        {<<"ok">>, _} ->
-            ok;
-        Err ->
-            throw({request_error, result, DebugTag, Err})
-    end.
+    ok = merge_util:sendlog(NodeAddress, Start, Hashes, SendlogChunksize),
+    ok = merge_util:sendentries(NodeAddress, Hashes, SendentriesChunksize),
+    {ok, NewSize} = frontend_verify_entries(NodeAddress, Start + N),
+    lager:info("~p: Done distributing ~B entries.", [NodeAddress, NewSize-Start]),
+    true = NTotal >= NewSize - Start,
+    do_dist(NodeAddress, NewSize, NTotal - (NewSize - Start)).
 
 frontend_get_verifiedsize(NodeAddress) ->
     frontend_verify_entries(NodeAddress, 0).
@@ -156,7 +105,7 @@ frontend_verify_entries(NodeAddress, Size) ->
     URL = NodeAddress ++ "verify-entries",
     Headers = [{"Content-Type", "text/json"}],
     RequestBody = list_to_binary(mochijson2:encode({[{"verify_to", Size}]})),
-    case do_request(DebugTag, URL, Headers, RequestBody) of
+    case merge_util:request(DebugTag, URL, Headers, RequestBody) of
         {<<"ok">>, PropList} ->
            {ok, proplists:get_value(<<"verified">>, PropList)};
         Err ->
@@ -168,24 +117,9 @@ publish_sth(NodeAddress, STH) ->
     URL = NodeAddress ++ "publish-sth",
     Headers = [{"Content-Type", "text/json"}],
     RequestBody = list_to_binary(mochijson2:encode(STH)),
-    case do_request(DebugTag, URL, Headers, RequestBody) of
+    case merge_util:request(DebugTag, URL, Headers, RequestBody) of
         {<<"ok">>, _} ->
             ok;
         Err ->
             throw({request_error, result, DebugTag, Err})
     end.
-
-do_request(DebugTag, URL, Headers, RequestBody) ->
-    case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
-        {error, Err} ->
-            throw({request_error, request, DebugTag, Err});
-        {failure, {none, StatusCode, none}, _RespHeaders, _Body} ->
-            throw({request_error, failure, DebugTag, StatusCode});
-        {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->
-            case (catch mochijson2:decode(Body)) of
-                {error, Err} ->
-                    throw({request_error, decode, DebugTag, Err});
-                {struct, PropList} ->
-                    {proplists:get_value(<<"result">>, PropList), PropList}
-            end
-    end.
diff --git a/merge/src/merge_sup.erl b/merge/src/merge_sup.erl
index 124fb12..d20abf9 100644
--- a/merge/src/merge_sup.erl
+++ b/merge/src/merge_sup.erl
@@ -16,6 +16,8 @@ init([]) ->
       [
        {the_logorder, {index, start_link, [logorder, LogorderPath]},
         permanent, 10000, worker, [index]},
+       {merge_backup_sup, {merge_backup_sup, start_link, [[]]},
+        transient, infinity, supervisor, [merge_backup_sup]},
        {merge_dist_sup, {merge_dist_sup, start_link, [[]]},
         transient, infinity, supervisor, [merge_dist_sup]}
       ]}}.
diff --git a/merge/src/merge_util.erl b/merge/src/merge_util.erl
new file mode 100644
index 0000000..a6b3ac9
--- /dev/null
+++ b/merge/src/merge_util.erl
@@ -0,0 +1,74 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_util).
+-export([sendlog/4, sendentries/3]).
+-export([request/2, request/4]).
+
+request(DebugTag, URL) ->
+    request(DebugTag, URL, [], <<>>).
+
+request(DebugTag, URL, Headers, RequestBody) ->
+    case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
+        {error, Err} ->
+            throw({request_error, request, DebugTag, Err});
+        {failure, {none, StatusCode, none}, _RespHeaders, _Body} ->
+            throw({request_error, failure, DebugTag, StatusCode});
+        {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->
+            case (catch mochijson2:decode(Body)) of
+                {error, Err} ->
+                    throw({request_error, decode, DebugTag, Err});
+                {struct, PropList} ->
+                    {proplists:get_value(<<"result">>, PropList), PropList}
+            end
+    end.
+
+sendlog(NodeAddress, Start, Hashes, Chunksize) ->
+    sendlog_chunk(NodeAddress, Start, lists:split(min(Chunksize, length(Hashes)), Hashes), Chunksize).
+
+sendlog_chunk(_, _, {[], _}, _) ->
+    ok;
+sendlog_chunk(NodeAddress, Start, {Chunk, Rest}, Chunksize) ->
+    ok = sendlog_request(NodeAddress, Start, Chunk),
+    sendlog_chunk(NodeAddress, Start + length(Chunk),
+                  lists:split(min(Chunksize, length(Rest)), Rest), Chunksize).
+
+sendlog_request(NodeAddress, Start, Hashes) ->
+    DebugTag = io_lib:format("sendlog ~B:~B", [Start, length(Hashes)]),
+    URL = NodeAddress ++ "sendlog",
+    Headers = [{"Content-Type", "text/json"}],
+    EncodedHashes = [base64:encode(H) || H <- Hashes],
+    RequestBody = list_to_binary(mochijson2:encode({[{"start", Start},
+                                                     {"hashes", EncodedHashes}]})),
+    case request(DebugTag, URL, Headers, RequestBody) of
+        {<<"ok">>, _} ->
+            ok;
+        Err ->
+            throw({request_error, result, DebugTag, Err})
+    end.
+
+sendentries(NodeAddress, Hashes, Chunksize) ->
+    {ChunkOfHashes, RestOfHashes} = lists:split(min(Chunksize, length(Hashes)), Hashes),
+    sendentries_chunk(NodeAddress, {ChunkOfHashes, RestOfHashes}, Chunksize).
+
+sendentries_chunk(_, {[], _}, _) ->
+    ok;
+sendentries_chunk(NodeAddress, {Chunk, Rest}, Chunksize) ->
+    HashesAndEntries = lists:zip(Chunk, [db:entry_for_leafhash(H) || H <- Chunk]),
+    ok = sendentries_request(NodeAddress, HashesAndEntries),
+    sendentries_chunk(NodeAddress,
+                      lists:split(min(Chunksize, length(Rest)), Rest),
+                      Chunksize).
+
+sendentries_request(NodeAddress, HashesAndEntries) ->
+    DebugTag = io_lib:format("sendentry ~B", [length(HashesAndEntries)]),
+    URL = NodeAddress ++ "sendentry",
+    Headers = [{"Content-Type", "text/json"}],
+    L = mochijson2:encode([[{"entry", base64:encode(E)}, {"treeleafhash", base64:encode(H)}] || {H, E} <- HashesAndEntries]),
+    RequestBody = list_to_binary(L),
+    case request(DebugTag, URL, Headers, RequestBody) of
+        {<<"ok">>, _} ->
+            ok;
+        Err ->
+            throw({request_error, result, DebugTag, Err})
+    end.
diff --git a/src/plop_httputil.erl b/src/plop_httputil.erl
index b4188e7..e428297 100644
--- a/src/plop_httputil.erl
+++ b/src/plop_httputil.erl
@@ -68,11 +68,20 @@ read_and_verify_cacertfile(Filename) ->
     CorrectHash = CalculatedHash,
     Der.
 
+request(DebugTag, URL, Headers, <<>>) ->
+    request(DebugTag, URL, Headers, <<>>, get);
 request(DebugTag, URL, Headers, RequestBody) ->
+    request(DebugTag, URL, Headers, RequestBody, post).
+
+request(DebugTag, URL, Headers, RequestBody, Method) ->
     Starttime = os:timestamp(),
     ParsedURL = hackney_url:parse_url(URL),
     CACertFile = application:get_env(plop, https_cacertfile, none),
     CACert = read_and_verify_cacertfile(CACertFile),
+    MethodString = case Method of
+                       get -> "GET";
+                       post -> "POST"
+                   end,
     #hackney_url{path = Path,
                  host = Host} = ParsedURL,
     lager:debug("~s: sending http request to ~p", [DebugTag, URL]),
@@ -87,8 +96,8 @@ request(DebugTag, URL, Headers, RequestBody) ->
                 [DebugTag, URL]),
     {ok, StatusCode, RespHeaders, ClientRef} =
         hackney:send_request(ConnRef,
-                             {post, Path,
-                              add_auth("POST", Path, Headers,
+                             {Method, Path,
+                              add_auth(MethodString, Path, Headers,
                                        RequestBody),
                               RequestBody}),
     lager:debug("~s: received headers for ~p: ~p",
--
cgit v1.1
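
Editor's note (not part of the commit): the new backup phase is driven by
configuration. merge_backup_sup reads merge_secondaries and starts one
merge_backup worker per {Name, Address} pair; each worker polls its secondary
every merge_delay/10 seconds and pushes log entries in merge_backup_winsize
chunks over the sendlog, sendentry, verifyroot and setverifiedsize endpoints.
The sketch below lists the settings the code consults, as Erlang terms. The
node name, address and file paths are invented for illustration, and where the
plopconfig-managed keys actually live is an assumption the patch does not
settle.

    %% Hypothetical example -- illustrative values only.
    %% application:get_env(plop, ...) keys:
    [{plop,
      [{merge_delay, 600},                       % backup timer fires every merge_delay/10 seconds
       {verified_path, "/var/db/plop/verified"}, % written per secondary as "verified.<Name>"
       {fetched_path,  "/var/db/plop/fetched"}   % JSON {"index": N, "hash": HexHash} from the fetch phase
      ]}].
    %% plopconfig:get_env(...) keys (names from the patch; chunk sizes shown are
    %% the code defaults, the secondary entry is invented):
    %%   {merge_secondaries, [{"merge-2", "https://merge-2.example.org/plop/v1/"}]}
    %%   {merge_backup_winsize, 1000}               % entries backed up per do_backup round
    %%   {merge_backup_sendlog_chunksize, 1000}     % hashes per "sendlog" request
    %%   {merge_backup_sendentries_chunksize, 100}  % entries per "sendentry" request

    %% With the plop application running, a single worker can also be started
    %% by hand for testing (hypothetical name and base address):
    {ok, Pid} = merge_backup:start_link(["merge-2", "https://merge-2.example.org/plop/v1/"]).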