diff options
Diffstat (limited to 'merge/src/merge_backup.erl')
-rw-r--r-- | merge/src/merge_backup.erl | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/merge/src/merge_backup.erl b/merge/src/merge_backup.erl new file mode 100644 index 0000000..bd75608 --- /dev/null +++ b/merge/src/merge_backup.erl @@ -0,0 +1,167 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(merge_backup). +-behaviour(gen_server). + +-export([start_link/1]). +-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, + code_change/3]). + +-record(state, { + timer :: reference(), + node_name :: string(), + node_address :: string() + }). + +start_link(Args) -> + gen_server:start_link(?MODULE, Args, []). + +init([Name, Address]) -> + lager:info("~p:~p: starting (~p)", [?MODULE, Name, Address]), + Timer = erlang:start_timer(1000, self(), backup), + {ok, #state{timer = Timer, node_name = Name, node_address = Address}}. + +handle_call(stop, _From, State) -> + {stop, normal, stopped, State}. +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info({timeout, _Timer, backup}, State) -> + backup(fetched(), State). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(Reason, #state{timer = Timer}) -> + lager:info("~p terminating: ~p", [?MODULE, Reason]), + erlang:cancel_timer(Timer), + ok. + +%%%%%%%%%%%%%%%%%%%% + +backup(noentry, State) -> + {noreply, State#state{timer = erlang:start_timer(1000, self(), backup)}}; +backup({struct, Fetched}, State) -> + Index = proplists:get_value(<<"index">>, Fetched), + Hash = proplists:get_value(<<"hash">>, Fetched), + backup(Index, Hash, State). + +backup(-1, _, State) -> + {noreply, State#state{timer = erlang:start_timer(1000, self(), backup)}}; +backup(Index, Hash, + #state{node_name = NodeName, node_address = NodeAddress} = State) -> + ok = verify_logorder_and_fetched_consistency(Index, Hash), + Size = index:indexsize(logorder), + lager:debug("~p: logorder size ~B", [NodeName, Size]), + ht:load_tree(Size - 1), + try + {ok, VerifiedSize} = verified_size(NodeAddress), + lager:debug("~p: verifiedsize ~B", [NodeName, VerifiedSize]), + case VerifiedSize == Size of + true -> + TreeHead = ht:root(Size - 1), + ok = check_root(NodeAddress, Size, TreeHead), + ok = write_backupfile(NodeName, Size, TreeHead); + false -> + true = VerifiedSize < Size, % Secondary ahead of primary? + ok = do_backup(NodeName, NodeAddress, VerifiedSize, Size - VerifiedSize) + end + catch + throw:{request_error, SubErrType, DebugTag, Error} -> + lager:error("~p: ~p: ~p", [DebugTag, SubErrType, Error]) + end, + Wait = max(1, round(application:get_env(plop, merge_delay, 600) / 10)), + {noreply, + State#state{timer = erlang:start_timer(Wait * 1000, self(), backup)}}. + +do_backup(_, _, _, 0) -> + ok; +do_backup(NodeName, NodeAddress, Start, NTotal) -> + N = min(NTotal, plopconfig:get_env(merge_backup_winsize, 1000)), + Hashes = index:getrange(logorder, Start, Start + N - 1), + ok = merge_util:sendlog(NodeAddress, Start, Hashes, plopconfig:get_env(merge_backup_sendlog_chunksize, 1000)), + ok = merge_util:sendentries(NodeAddress, Hashes, plopconfig:get_env(merge_backup_sendentries_chunksize, 100)), + Size = Start + N, + TreeHead = ht:root(Size - 1), + ok = check_root(NodeAddress, Size, TreeHead), + ok = setverifiedsize(NodeAddress, Size), + ok = write_backupfile(NodeName, Size, TreeHead), + true = NTotal >= N, + do_backup(NodeName, NodeAddress, Size, NTotal - N). + +write_backupfile(NodeName, TreeSize, TreeHead) -> + {ok, BasePath} = application:get_env(plop, verified_path), + Path = BasePath ++ "." ++ NodeName, + Content = mochijson2:encode({[{"tree_size", TreeSize}, + {"sha256_root_hash", list_to_binary(hex:bin_to_hexstr(TreeHead))}]}), + atomic:replacefile(Path, Content). + +check_root(NodeAddress, Size, TreeHead) -> + {ok, TreeHeadToVerify} = verifyroot(NodeAddress, Size), + case TreeHeadToVerify == TreeHead of + true -> + ok; + false -> + lager:error("~p: ~B: secondary merge root ~p != ~p", + [NodeAddress, Size, TreeHeadToVerify, TreeHead]), + root_mismatch + end. + +verifyroot(NodeAddress, TreeSize) -> + DebugTag = io_lib:format("verifyroot ~B", [TreeSize]), + URL = NodeAddress ++ "verifyroot", + Headers = [{"Content-Type", "text/json"}], + RequestBody = list_to_binary(mochijson2:encode({[{"tree_size", TreeSize}]})), + case merge_util:request(DebugTag, URL, Headers, RequestBody) of + {<<"ok">>, PropList} -> + {ok, base64:decode(proplists:get_value(<<"root_hash">>, PropList))}; + Err -> + throw({request_error, result, DebugTag, Err}) + end. + +verified_size(NodeAddress) -> + DebugTag = "verifiedsize", + URL = NodeAddress ++ "verifiedsize", + case merge_util:request(DebugTag, URL) of + {<<"ok">>, PropList} -> + {ok, proplists:get_value(<<"size">>, PropList)}; + Err -> + throw({request_error, result, DebugTag, Err}) + end. + +setverifiedsize(NodeAddress, Size) -> + DebugTag = io_lib:format("setverifiedsize ~B", [Size]), + URL = NodeAddress ++ "setverifiedsize", + Headers = [{"Content-Type", "text/json"}], + RequestBody = list_to_binary(mochijson2:encode({[{"size", Size}]})), + case merge_util:request(DebugTag, URL, Headers, RequestBody) of + {<<"ok">>, _} -> + ok; + Err -> + throw({request_error, result, DebugTag, Err}) + end. + +fetched() -> + case application:get_env(plop, fetched_path) of + {ok, FetchedFile} -> + case atomic:readfile(FetchedFile) of + noentry -> + noentry; + Contents -> + mochijson2:decode(Contents) + end; + undefined -> + noentry + end. + +verify_logorder_and_fetched_consistency(Index, Hash) -> + HashString = binary_to_list(Hash), + case hex:bin_to_hexstr(index:get(logorder, Index)) of + HashString -> + ok; + Mismatch -> + lager:error("fetched file hash=~p doesn't match logorder[~B]=~p", + [HashString, Index, Mismatch]), + fetched_mismatch + end. |