Diffstat (limited to 'merge/src/merge_backup.erl')
-rw-r--r--  merge/src/merge_backup.erl  167
1 file changed, 167 insertions(+), 0 deletions(-)
diff --git a/merge/src/merge_backup.erl b/merge/src/merge_backup.erl
new file mode 100644
index 0000000..bd75608
--- /dev/null
+++ b/merge/src/merge_backup.erl
@@ -0,0 +1,167 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
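+%%%
+%%% merge_backup: gen_server that pushes newly merged log entries to a
+%%% secondary merge node and verifies, window by window, that the
+%%% secondary computes the same tree head.  The verified tree size and
+%%% root hash are recorded in a per-node backup file.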
+
+-module(merge_backup).
+-behaviour(gen_server).
+
+-export([start_link/1]).
+-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
+ code_change/3]).
+
+-record(state, {
+ timer :: reference(),
+ node_name :: string(),
+ node_address :: string()
+ }).
+
+start_link(Args) ->
+ gen_server:start_link(?MODULE, Args, []).
+
+init([Name, Address]) ->
+ lager:info("~p:~p: starting (~p)", [?MODULE, Name, Address]),
+ Timer = erlang:start_timer(1000, self(), backup),
+ {ok, #state{timer = Timer, node_name = Name, node_address = Address}}.
+
+handle_call(stop, _From, State) ->
+    {stop, normal, stopped, State}.
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
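+%% The only message we expect is our own backup timer firing; each
+%% firing triggers one backup round, which re-arms the timer.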
+handle_info({timeout, _Timer, backup}, State) ->
+ backup(fetched(), State).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(Reason, #state{timer = Timer}) ->
+ lager:info("~p terminating: ~p", [?MODULE, Reason]),
+ erlang:cancel_timer(Timer),
+ ok.
+
+%%%%%%%%%%%%%%%%%%%%
+
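+%% One backup round.  The argument is the decoded contents of the
+%% fetched file (see fetched/0): 'noentry' when it is missing, else a
+%% mochijson2 struct with the index and hash of the last fetched log
+%% entry.  With nothing fetched, just re-arm the timer and retry in a
+%% second.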
+backup(noentry, State) ->
+ {noreply, State#state{timer = erlang:start_timer(1000, self(), backup)}};
+backup({struct, Fetched}, State) ->
+ Index = proplists:get_value(<<"index">>, Fetched),
+ Hash = proplists:get_value(<<"hash">>, Fetched),
+ backup(Index, Hash, State).
+
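+%% An index of -1 means no entries have been fetched yet, so there is
+%% nothing to back up.  Otherwise, check that the fetched hash agrees
+%% with logorder, then compare our log size with the secondary's
+%% verified size: when equal, just re-verify the root and rewrite the
+%% backup file; when the secondary is behind, push it the missing
+%% entries.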
+backup(-1, _, State) ->
+ {noreply, State#state{timer = erlang:start_timer(1000, self(), backup)}};
+backup(Index, Hash,
+ #state{node_name = NodeName, node_address = NodeAddress} = State) ->
+ ok = verify_logorder_and_fetched_consistency(Index, Hash),
+ Size = index:indexsize(logorder),
+ lager:debug("~p: logorder size ~B", [NodeName, Size]),
+ ht:load_tree(Size - 1),
+ try
+ {ok, VerifiedSize} = verified_size(NodeAddress),
+ lager:debug("~p: verifiedsize ~B", [NodeName, VerifiedSize]),
+ case VerifiedSize == Size of
+ true ->
+ TreeHead = ht:root(Size - 1),
+ ok = check_root(NodeAddress, Size, TreeHead),
+ ok = write_backupfile(NodeName, Size, TreeHead);
+ false ->
+            true = VerifiedSize < Size, % Assert the secondary is not ahead of the primary.
+ ok = do_backup(NodeName, NodeAddress, VerifiedSize, Size - VerifiedSize)
+ end
+ catch
+ throw:{request_error, SubErrType, DebugTag, Error} ->
+ lager:error("~p: ~p: ~p", [DebugTag, SubErrType, Error])
+ end,
+ Wait = max(1, round(application:get_env(plop, merge_delay, 600) / 10)),
+ {noreply,
+ State#state{timer = erlang:start_timer(Wait * 1000, self(), backup)}}.
+
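+%% Send entries [Start, Start+NTotal) to the secondary in windows of
+%% at most merge_backup_winsize hashes.  After each window, require
+%% the secondary's root for the new size to match ours, advance its
+%% verified size, and update the local backup file before recursing on
+%% the remainder.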
+do_backup(_, _, _, 0) ->
+ ok;
+do_backup(NodeName, NodeAddress, Start, NTotal) ->
+ N = min(NTotal, plopconfig:get_env(merge_backup_winsize, 1000)),
+ Hashes = index:getrange(logorder, Start, Start + N - 1),
+ ok = merge_util:sendlog(NodeAddress, Start, Hashes, plopconfig:get_env(merge_backup_sendlog_chunksize, 1000)),
+ ok = merge_util:sendentries(NodeAddress, Hashes, plopconfig:get_env(merge_backup_sendentries_chunksize, 100)),
+ Size = Start + N,
+ TreeHead = ht:root(Size - 1),
+ ok = check_root(NodeAddress, Size, TreeHead),
+ ok = setverifiedsize(NodeAddress, Size),
+ ok = write_backupfile(NodeName, Size, TreeHead),
+ true = NTotal >= N,
+ do_backup(NodeName, NodeAddress, Size, NTotal - N).
+
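+%% Atomically rewrite this node's backup file ("<verified_path>.Name")
+%% with the tree size and hex-encoded root hash as JSON.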
+write_backupfile(NodeName, TreeSize, TreeHead) ->
+ {ok, BasePath} = application:get_env(plop, verified_path),
+ Path = BasePath ++ "." ++ NodeName,
+ Content = mochijson2:encode({[{"tree_size", TreeSize},
+ {"sha256_root_hash", list_to_binary(hex:bin_to_hexstr(TreeHead))}]}),
+ atomic:replacefile(Path, Content).
+
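+%% Ask the secondary for its root hash at Size and compare with ours.
+%% Returns ok on agreement, otherwise logs and returns root_mismatch,
+%% which fails the caller's ok-match.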
+check_root(NodeAddress, Size, TreeHead) ->
+ {ok, TreeHeadToVerify} = verifyroot(NodeAddress, Size),
+ case TreeHeadToVerify == TreeHead of
+ true ->
+ ok;
+ false ->
+ lager:error("~p: ~B: secondary merge root ~p != ~p",
+ [NodeAddress, Size, TreeHeadToVerify, TreeHead]),
+ root_mismatch
+ end.
+
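+%% Request the secondary's root hash for TreeSize from its
+%% "verifyroot" endpoint; the root_hash in the reply is
+%% base64-encoded.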
+verifyroot(NodeAddress, TreeSize) ->
+ DebugTag = io_lib:format("verifyroot ~B", [TreeSize]),
+ URL = NodeAddress ++ "verifyroot",
+ Headers = [{"Content-Type", "text/json"}],
+ RequestBody = list_to_binary(mochijson2:encode({[{"tree_size", TreeSize}]})),
+ case merge_util:request(DebugTag, URL, Headers, RequestBody) of
+ {<<"ok">>, PropList} ->
+ {ok, base64:decode(proplists:get_value(<<"root_hash">>, PropList))};
+ Err ->
+ throw({request_error, result, DebugTag, Err})
+ end.
+
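+%% Fetch the secondary's current verified tree size from its
+%% "verifiedsize" endpoint.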
+verified_size(NodeAddress) ->
+ DebugTag = "verifiedsize",
+ URL = NodeAddress ++ "verifiedsize",
+ case merge_util:request(DebugTag, URL) of
+ {<<"ok">>, PropList} ->
+ {ok, proplists:get_value(<<"size">>, PropList)};
+ Err ->
+ throw({request_error, result, DebugTag, Err})
+ end.
+
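+%% Tell the secondary to advance its verified tree size to Size.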
+setverifiedsize(NodeAddress, Size) ->
+ DebugTag = io_lib:format("setverifiedsize ~B", [Size]),
+ URL = NodeAddress ++ "setverifiedsize",
+ Headers = [{"Content-Type", "text/json"}],
+ RequestBody = list_to_binary(mochijson2:encode({[{"size", Size}]})),
+ case merge_util:request(DebugTag, URL, Headers, RequestBody) of
+ {<<"ok">>, _} ->
+ ok;
+ Err ->
+ throw({request_error, result, DebugTag, Err})
+ end.
+
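+%% Read and decode the fetched file, which records the index and hash
+%% of the last log entry fetched (written by another part of the merge
+%% pipeline).  Returns 'noentry' if plop's fetched_path is unset or
+%% the file does not exist.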
+fetched() ->
+ case application:get_env(plop, fetched_path) of
+ {ok, FetchedFile} ->
+ case atomic:readfile(FetchedFile) of
+ noentry ->
+ noentry;
+ Contents ->
+ mochijson2:decode(Contents)
+ end;
+ undefined ->
+ noentry
+ end.
+
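+%% Cross-check the fetched file against logorder: the hash stored at
+%% Index must equal the fetched hash.  Returns ok, or logs and returns
+%% fetched_mismatch (failing the caller's ok-match) on disagreement.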
+verify_logorder_and_fetched_consistency(Index, Hash) ->
+ HashString = binary_to_list(Hash),
+ case hex:bin_to_hexstr(index:get(logorder, Index)) of
+ HashString ->
+ ok;
+ Mismatch ->
+ lager:error("fetched file hash=~p doesn't match logorder[~B]=~p",
+ [HashString, Index, Mismatch]),
+ fetched_mismatch
+ end.