From 12e08090358383c5678417ae8929fca1f03ca8bc Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Thu, 2 Mar 2017 00:27:59 +0100 Subject: Statusserver --- Emakefile | 6 ++ Makefile | 1 + merge/src/merge_backup.erl | 1 + merge/src/merge_dist.erl | 20 +++-- merge/src/merge_sth.erl | 2 + merge/src/merge_util.erl | 4 + src/http_auth.erl | 6 +- src/plop_sup.erl | 1 + src/statusreport.erl | 169 ++++++++++++++++++++++++++++++++++++ src/storage.erl | 14 +++ statsserver/ebin/statsserver.app | 13 +++ statsserver/src/statsserver.erl | 71 +++++++++++++++ statsserver/src/statsserver_app.erl | 13 +++ statsserver/src/statsserver_sup.erl | 42 +++++++++ 14 files changed, 355 insertions(+), 8 deletions(-) create mode 100644 src/statusreport.erl create mode 100644 statsserver/ebin/statsserver.app create mode 100644 statsserver/src/statsserver.erl create mode 100644 statsserver/src/statsserver_app.erl create mode 100644 statsserver/src/statsserver_sup.erl diff --git a/Emakefile b/Emakefile index a40d624..0a41ae3 100644 --- a/Emakefile +++ b/Emakefile @@ -10,3 +10,9 @@ {i, "src/"}, % For plop.hrl. {outdir, "merge/ebin/"}, {parse_transform, lager_transform}]}. +{["statsserver/src/*"], + [debug_info, + {i, "../"}, % For hackney. + {i, "src/"}, % For plop.hrl. + {outdir, "statsserver/ebin/"}, + {parse_transform, lager_transform}]}. diff --git a/Makefile b/Makefile index 2b2f69d..4e5d5b2 100644 --- a/Makefile +++ b/Makefile @@ -11,6 +11,7 @@ clean: -rm priv/fsynchelper -rm ebin/*.beam -rm merge/ebin/*.beam + -rm statsserver/ebin/*.beam dialyze: build dialyzer ebin merge/ebin tags: diff --git a/merge/src/merge_backup.erl b/merge/src/merge_backup.erl index bf20f23..f1e9253 100644 --- a/merge/src/merge_backup.erl +++ b/merge/src/merge_backup.erl @@ -81,6 +81,7 @@ do_backup(NodeName, NodeAddress, Start, NTotal) -> do_backup(NodeName, NodeAddress, Size, NTotal - N). write_backupfile(NodeName, TreeSize, TreeHead) -> + statusreport:report("merge_backup", NodeName, "verified", TreeSize), {ok, BasePath} = application:get_env(plop, verified_path), Path = BasePath ++ "." ++ NodeName, Content = mochijson2:encode({[{"tree_size", TreeSize}, diff --git a/merge/src/merge_dist.erl b/merge/src/merge_dist.erl index f8f0c7c..da6b667 100644 --- a/merge/src/merge_dist.erl +++ b/merge/src/merge_dist.erl @@ -48,7 +48,9 @@ dist(noentry, State) -> Timer = erlang:start_timer(1000, self(), dist), {noreply, State#state{timer = Timer}}; dist({struct, PropList} = STH, - #state{node_address = NodeAddress, sth_timestamp = LastTimestamp} = State) -> + #state{node_address = NodeAddress, + node_name = NodeName, + sth_timestamp = LastTimestamp} = State) -> Treesize = proplists:get_value(<<"tree_size">>, PropList), Timestamp = proplists:get_value(<<"timestamp">>, PropList), RootHash = base64:decode(proplists:get_value(<<"sha256_root_hash">>, PropList)), @@ -60,8 +62,10 @@ dist({struct, PropList} = STH, try lager:info("~p: starting dist, sth at ~B, logorder at ~B", [NodeAddress, Treesize, Logordersize]), - ok = do_dist(NodeAddress, min(Treesize, Logordersize)), + statusreport:report("merge_dist", NodeName, "targetsth", Treesize), + ok = do_dist(NodeAddress, NodeName, min(Treesize, Logordersize)), ok = publish_sth(NodeAddress, STH), + statusreport:report("merge_dist", NodeName, "sth", Treesize), lager:info("~p: Published STH with size ~B and timestamp " ++ "~p.", [NodeAddress, Treesize, Timestamp]), Timestamp @@ -82,21 +86,22 @@ dist({struct, PropList} = STH, %% @doc Has nonlocal return because of throw further down in %% merge_util:request/4. 
-do_dist(NodeAddress, Size) -> +do_dist(NodeAddress, NodeName, Size) -> {ok, VerifiedSize} = frontend_get_verifiedsize(NodeAddress), lager:debug("~p: verifiedsize ~B", [NodeAddress, VerifiedSize]), true = VerifiedSize =< Size, - do_dist(NodeAddress, VerifiedSize, Size - VerifiedSize). + do_dist(NodeAddress, NodeName, VerifiedSize, Size - VerifiedSize). -do_dist(_, _, 0) -> +do_dist(_, _, _, 0) -> ok; -do_dist(NodeAddress, Start, NTotal) -> +do_dist(NodeAddress, NodeName, Start, NTotal) -> DistMaxWindow = application:get_env(plop, merge_dist_winsize, 1000), N = min(DistMaxWindow, NTotal), Hashes = index:getrange(logorder, Start, Start + N - 1), SendlogChunksize = application:get_env(plop, merge_dist_sendlog_chunksize, 1000), SendentriesChunksize = application:get_env(plop, merge_dist_sendentries_chunksize, 100), ok = merge_util:sendlog(NodeAddress, Start, Hashes, SendlogChunksize), + statusreport:report("merge_dist", NodeName, "sendlog", Start + N), {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress), lager:debug("number of missing entries: ~B", [length(HashesMissingEncoded)]), HashesMissing = lists:map(fun base64:decode/1, HashesMissingEncoded), @@ -104,8 +109,9 @@ do_dist(NodeAddress, Start, NTotal) -> {ok, NewSize} = frontend_verify_entries(NodeAddress, Start + N), lager:info("~p: Done distributing ~B out of ~B entries.", [NodeAddress, NewSize-Start, NTotal]), + statusreport:report("merge_dist", NodeName, "verified", Start + N), true = NTotal >= NewSize - Start, - do_dist(NodeAddress, NewSize, NTotal - (NewSize - Start)). + do_dist(NodeAddress, NodeName, NewSize, NTotal - (NewSize - Start)). frontend_get_verifiedsize(NodeAddress) -> frontend_verify_entries(NodeAddress, 0). diff --git a/merge/src/merge_sth.erl b/merge/src/merge_sth.erl index ab1cd8f..b8ae1f9 100644 --- a/merge/src/merge_sth.erl +++ b/merge/src/merge_sth.erl @@ -66,6 +66,7 @@ make_sth(CurSize, State) -> Wait = case NewSize < CurSize of true -> + statusreport:report("merge_sth", "sth", "sth", null), lager:debug("waiting for enough backups to reach ~B, now at ~B", [CurSize, NewSize]), 1; @@ -90,6 +91,7 @@ do_make_sth(Size) -> {"sha256_root_hash", base64:encode(NewRoot)}, {"tree_head_signature", base64:encode(PackedSignature)}], ok = plop:save_sth({struct, NewSTH}), + statusreport:report("merge_sth", "sth", "sth", Size), ok; false -> lager:error("The signature we got for new tree of size ~B doesn't " ++ diff --git a/merge/src/merge_util.erl b/merge/src/merge_util.erl index 7598e40..24eba60 100644 --- a/merge/src/merge_util.erl +++ b/merge/src/merge_util.erl @@ -12,14 +12,18 @@ request(DebugTag, URL) -> request(DebugTag, URL, Headers, RequestBody) -> case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of {error, Err} -> + statusreport:report_multi("merge_errors", URL, "http_error", list_to_binary(io_lib:format("~w", [Err]))), throw({request_error, request, DebugTag, Err}); {failure, {none, StatusCode, none}, _RespHeaders, _Body} -> + statusreport:report_multi("merge_errors", URL, "http_error", StatusCode), throw({request_error, failure, DebugTag, StatusCode}); {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 -> case (catch mochijson2:decode(Body)) of {error, Err} -> + statusreport:report_multi("merge_errors", URL, "http_error", list_to_binary(Err)), throw({request_error, decode, DebugTag, Err}); {struct, PropList} -> + statusreport:report_multi("merge_errors", URL, "http_error", 200), {proplists:get_value(<<"result">>, PropList), PropList} end end. 
diff --git a/src/http_auth.erl b/src/http_auth.erl index 2cee51f..e083a2c 100644 --- a/src/http_auth.erl +++ b/src/http_auth.erl @@ -2,7 +2,7 @@ %%% See LICENSE for licensing information. -module(http_auth). --export([verify_auth/4, create_auth/3, init_key_table/0, sign_stored/1, verify_stored/3]). +-export([verify_auth/4, create_auth/3, init_key_table/0, sign_stored/1, verify_stored/3, own_name/0]). -define(KEY_TABLE, http_auth_keys). @@ -26,6 +26,10 @@ read_key_table() -> end. +own_name() -> + {_Key, KeyName} = own_key(), + KeyName. + own_key() -> case application:get_env(plop, own_key, none) of {KeyName, _KeyFile} -> diff --git a/src/plop_sup.erl b/src/plop_sup.erl index 27f7680..15f3fe8 100644 --- a/src/plop_sup.erl +++ b/src/plop_sup.erl @@ -50,6 +50,7 @@ init([]) -> Children = [permanent_worker(the_db, {db, start_link, []}, [db]), permanent_worker(the_storagedb, {storagedb, start_link, []}), permanent_worker(fsync, {fsyncport, start_link, []}), + permanent_worker(the_statusreport, {statusreport, start_link, []}), permanent_worker(plopcontrol, {plopcontrol, start_link, []})], OptionalChildren = lists:map(fun (ServiceName) -> case ServiceName of diff --git a/src/statusreport.erl b/src/statusreport.erl new file mode 100644 index 0000000..cd5bb5a --- /dev/null +++ b/src/statusreport.erl @@ -0,0 +1,169 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(statusreport). +-behaviour(gen_server). + +-export([start_link/0]). +-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, + code_change/3]). +-export([report/4]). +-export([report_multi/4]). + +-record(state, { + timer :: none|reference(), + nodename :: string(), + statusreports :: dict:dict(), + lastsent :: integer() + }). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + process_flag(trap_exit, true), + ReportInterval = application:get_env(plop, status_report_interval, 1000), + lager:info("~p: starting", [?MODULE]), + %Timer = erlang:start_timer(1000, self(), dist), + {ok, #state{timer = none, + nodename = http_auth:own_name(), + statusreports = dict:new(), + lastsent = erlang:monotonic_time(millisecond) - ReportInterval}}. + +store_status(State, Service, Target, Variable, Status) -> + Statusreports = dict:store({Service, Target, Variable}, + {single, Status}, + State#state.statusreports), + State#state{statusreports = Statusreports}. + +dict_append_set(Key, Value, Dict) -> + AppendSet = sets:from_list([Value]), + dict:update(Key, fun ({multi, Old}) -> + {multi, sets:union(Old, AppendSet)} + end, + {multi, AppendSet}, Dict). + +store_multi_status(State, Service, Target, Variable, Status) -> + Statusreports = dict_append_set({Service, Target, Variable}, + Status, + State#state.statusreports), + State#state{statusreports = Statusreports}. + +handle_call(_, _From, State) -> + {noreply, State}. + +handle_cast({report, Service, Target, Variable, Status}, State) -> + NewState = store_status(State, Service, Target, Variable, Status), + {noreply, try_send(NewState)}; +handle_cast({report_multi, Service, Target, Variable, Status}, State) -> + NewState = store_multi_status(State, Service, Target, Variable, Status), + {noreply, try_send(NewState)}. + +handle_info({timeout, _Timer, force_send}, State) -> + lager:debug("statusreport timer timeout"), + {noreply, force_send(State)}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. 
+ +cancel_timer(State) -> + case State#state.timer of + none -> + none; + _ -> + erlang:cancel_timer(State#state.timer) + end, + State#state{timer = none}. + +set_timer(State) -> + case State#state.timer of + none -> + ReportInterval = application:get_env(plop, status_report_interval, 1000), + Timer = erlang:start_timer(State#state.lastsent + ReportInterval, self(), force_send, [{abs, true}]), + State#state{timer = Timer}; + _ -> + State + end. + +terminate(Reason, State) -> + lager:info("~p terminating: ~p", [?MODULE, Reason]), + NewState = cancel_timer(State), + case Reason of + shutdown -> + force_send(NewState); + _ -> + none + end, + ok. + + + +group_by_service(Statusreports) -> + dict:to_list( + lists:foldl( + fun ({{Service, Target, Variable}, Status}, Acc) -> + dict:append(Service, {Target, Variable, Status}, Acc) + end, dict:new(), dict:to_list(Statusreports))). + +encode_status({single, Status}) -> + Status; +encode_status({multi, Statuses}) -> + sets:to_list(Statuses). + +send(Service, Statusreports, Nodename) -> + lager:debug("reporting status to ~p: ~p", [Service, Statusreports]), + [NodeAddress] = plopconfig:get_env(statsservers, []), + DebugTag = "statusreport", + URL = NodeAddress ++ Service, + Headers = [{"Content-Type", "text/json"}], + RequestBody = list_to_binary( + mochijson2:encode( + [ + {struct, + [{"target", list_to_binary(Target)}, + {"source", list_to_binary(Nodename)}, + {"key", list_to_binary(Variable)}, + {"value", encode_status(Status)}]} + || {Target, Variable, Status} <- Statusreports + ])), + case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of + {error, Err} -> + lager:debug("request error ~p ~p", [DebugTag, Err]); + {failure, {none, StatusCode, none}, _RespHeaders, _Body} -> + lager:debug("request failure ~p ~p", [DebugTag, StatusCode]); + {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 -> + case (catch mochijson2:decode(Body)) of + {error, Err} -> + lager:debug("error returned ~p ~p", [DebugTag, Err]); + {struct, _PropList} -> + none + end + end. + +force_send(State) -> + lists:foreach(fun ({Service, Statusreports}) -> + send(Service, Statusreports, State#state.nodename) + end, group_by_service(State#state.statusreports)), + NewState = cancel_timer(State), + NewState#state{statusreports = dict:new(), lastsent = erlang:monotonic_time(millisecond)}. + +try_send(State) -> + ReportInterval = application:get_env(plop, status_report_interval, 1000), + NextSend = State#state.lastsent + ReportInterval, + Now = erlang:monotonic_time(millisecond), + if + NextSend > Now -> + lager:debug("status report sent ~p ms ago, setting timer", [NextSend - Now]), + set_timer(State); + true -> + lager:debug("status report send long enough ago"), + force_send(State) + end. + +report(Service, Target, Variable, Status) -> + lager:debug("reporting status ~p ~p ~p ~p", [Service, Target, Variable, Status]), + gen_server:cast(?MODULE, {report, Service, Target, Variable, Status}). + +report_multi(Service, Target, Variable, Status) -> + lager:debug("reporting multi status ~p ~p ~p ~p", [Service, Target, Variable, Status]), + gen_server:cast(?MODULE, {report_multi, Service, Target, Variable, Status}). diff --git a/src/storage.erl b/src/storage.erl index 7498635..6114a57 100644 --- a/src/storage.erl +++ b/src/storage.erl @@ -9,6 +9,18 @@ -define(APPURL_PLOP_STORAGE, "plop/v1/storage"). 
+reportnewentries() -> + NodeName = http_auth:own_name(), + {LastIndexOrZero, LastHash} = storagedb:lastverifiednewentry(), + VerifiedEntries = case LastHash of + none -> + 0; + _ -> + LastIndexOrZero + 1 + end, + NewEntries = index:indexsize(newentries_db) - VerifiedEntries, + statusreport:report("storage", NodeName, "newentries", NewEntries). + request(post, ?APPURL_PLOP_STORAGE, "sendentry", Input) -> case (catch mochijson2:decode(Input)) of {error, E} -> @@ -20,6 +32,7 @@ request(post, ?APPURL_PLOP_STORAGE, "sendentry", Input) -> ok = db:add_entry_sync(TreeLeafHash, LogEntry), ok = storagedb:add(TreeLeafHash), {KeyName, Sig} = http_auth:sign_stored(plop:spt_data_from_entry(LogEntry)), + reportnewentries(), success({[{result, <<"ok">>}, {"sig", KeyName ++ ":" ++ base64:encode_to_string(Sig)} ]}) @@ -55,6 +68,7 @@ request(get, ?APPURL_PLOP_STORAGE, "fetchnewentries", _Input) -> Entries = lists:map(fun(LeafHash) -> base64:encode(LeafHash) end, NewHashes), + reportnewentries(), success({[{result, <<"ok">>}, {entries, Entries}]}); request(get, ?APPURL_PLOP_STORAGE, "getentry", Query) -> diff --git a/statsserver/ebin/statsserver.app b/statsserver/ebin/statsserver.app new file mode 100644 index 0000000..9c642ed --- /dev/null +++ b/statsserver/ebin/statsserver.app @@ -0,0 +1,13 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +%%% Application resource file for statsserver, see app(5). + +{application, statsserver, + [{description, "Plop statsserver"}, + {vsn, "0.10.1"}, + {modules, [statsserver_app, statsserver_sup, statsserver]}, + {applications, [kernel, stdlib, lager, plop]}, + {registered, [statsserver_sup, statsserver]}, + {mod, {statsserver_app, []}} + ]}. diff --git a/statsserver/src/statsserver.erl b/statsserver/src/statsserver.erl new file mode 100644 index 0000000..1d42b27 --- /dev/null +++ b/statsserver/src/statsserver.erl @@ -0,0 +1,71 @@ +%%% Copyright (c) 2014-2016, NORDUnet A/S. +%%% See LICENSE for licensing information. + +%%% @doc Frontend node API + +-module(statsserver). +-export([init_module/0]). +%% API (URL) +-export([request/4]). + +-define(APPURL_PLOP_STATUS, "plop/v1/status"). + +request(post, ?APPURL_PLOP_STATUS, Service, Input) -> + case (catch mochijson2:decode(Input)) of + {error, E} -> + html("bad input:", E); + Entries when is_list(Entries) -> + lists:foreach(fun ({struct, PropList}) -> + Target = proplists:get_value(<<"target">>, PropList), + Variable = proplists:get_value(<<"key">>, PropList), + Status = proplists:get_value(<<"value">>, PropList), + set_status(Service, Target, Variable, Status) + end, Entries) + end, + success({[{result, <<"ok">>}]}); +request(get, "", "status", Input) -> + Now = erlang:monotonic_time(millisecond), + Variables = [{struct, [ + {service, list_to_binary(Service)}, + {target, Target}, + {variable, Variable}, + {status, Status}, + {age, (Now - Timestamp) / 1000} + ]} || {{Service, Target, Variable}, Status, Timestamp} <- get_status()], + success({[{result, Variables}]}). + + +success(Data) -> + {200, [{"Content-Type", "text/json"}, {"Access-Control-Allow-Origin", "*"}], mochijson2:encode(Data)}. + +html(Text, Input) -> + {400, [{"Content-Type", "text/html"}], + io_lib:format( + "

~n" ++ + "~s~n" ++ + "~p~n" ++ + "~n", [Text, Input])}. + +-define(STATUSDB_TABLE, statsserver_statusdb). + +init_module() -> + create_statusdb(). + +create_statusdb() -> + case ets:info(?STATUSDB_TABLE) of + undefined -> + ok; + _ -> + ets:delete(?STATUSDB_TABLE) + end, + ets:new(?STATUSDB_TABLE, [set, public, named_table]). + +get_status() -> + [E || [E] <- ets:match(?STATUSDB_TABLE, '$1')]. + +set_status(Service, Target, Variable, Status) -> + lager:info("status: ~p ~p ~p ~p", [Service, Target, Variable, Status]), + Timestamp = erlang:monotonic_time(millisecond), + true = ets:insert(?STATUSDB_TABLE, + {{Service, Target, Variable}, Status, Timestamp}), + ok. diff --git a/statsserver/src/statsserver_app.erl b/statsserver/src/statsserver_app.erl new file mode 100644 index 0000000..6caf2b7 --- /dev/null +++ b/statsserver/src/statsserver_app.erl @@ -0,0 +1,13 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(statsserver_app). +-behaviour(application). +-export([start/2, stop/1]). + +start(normal, Args) -> + statsserver:init_module(), + statsserver_sup:start_link(Args). + +stop(_State) -> + ok. diff --git a/statsserver/src/statsserver_sup.erl b/statsserver/src/statsserver_sup.erl new file mode 100644 index 0000000..6b92e35 --- /dev/null +++ b/statsserver/src/statsserver_sup.erl @@ -0,0 +1,42 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(statsserver_sup). +-behaviour(supervisor). + +-export([start_link/1, init/1]). + +start_link(_Args) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +gen_http_config(Config, SSLOptions, SSLFlag) -> + {ChildName, IpAddress, Port, Module} = Config, + {ok, IPv4Address} = inet:parse_ipv4strict_address(IpAddress), + WebConfig = [{ip, IPv4Address}, + {port, Port}, + {ssl, SSLFlag}, + {acceptor_pool_size, + application:get_env(catlfish, http_server_pool_size, 16)}, + {ssl_opts, SSLOptions} + ], + {ChildName, + {catlfish_web, start, [WebConfig, Module, ChildName]}, + permanent, 5000, worker, dynamic}. + + +init([]) -> + SSLOptions = + [{certfile, application:get_env(plop, https_certfile, none)}, + {keyfile, application:get_env(plop, https_keyfile, none)}, + {cacertfile, application:get_env(plop, https_cacertfile, none)}], + Servers = + lists:map(fun (Config) -> + gen_http_config(Config, SSLOptions, true) + end, application:get_env(plop, https_servers, [])) ++ + lists:map(fun (Config) -> + gen_http_config(Config, SSLOptions, false) + end, application:get_env(plop, http_servers, [])), + lager:debug("Starting servers ~p", [Servers]), + {ok, + {{one_for_one, 3, 10}, + Servers}}. 
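The reporting side and the new statsserver are wired together entirely through configuration: statusreport reads the list of status servers and the report interval, while statsserver_sup builds its HTTP(S) listeners from the same kind of server tuples the other plop services use. A minimal configuration sketch follows; every host, port and file path is an illustrative assumption, only the option names (statsservers, status_report_interval, https_servers, https_certfile, https_keyfile, https_cacertfile) come from the code above, and it assumes plopconfig:get_env/2 falls back to the plop application environment. The trailing plop/v1/status/ path is inferred from statusreport appending the service name to the configured base URL and statsserver:request/4 matching on that prefix.

    %% Illustrative sketch only -- hosts, ports and paths are made up.
    [{plop,
      [%% read by statusreport.erl on the reporting nodes
       {statsservers, ["https://status.example.net/plop/v1/status/"]},
       {status_report_interval, 1000},   % ms between batched reports
       %% read by statsserver_sup.erl on the status server node
       {https_servers, [{statsserver_https, "127.0.0.1", 8443, statsserver}]},
       {https_certfile, "/etc/plop/cert.pem"},
       {https_keyfile, "/etc/plop/key.pem"},
       {https_cacertfile, "/etc/plop/cacert.pem"}]}].

Note that statusreport:send/3 matches exactly one configured server ([NodeAddress] = plopconfig:get_env(statsservers, [])), so the list is expected to contain a single base URL.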
-- cgit v1.1 From 64daaf148cd59bf19942014bc754992b6bc6d86d Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Thu, 2 Mar 2017 12:52:16 +0100 Subject: Rename to statusserver --- Emakefile | 4 +- Makefile | 2 +- src/statusreport.erl | 2 +- statsserver/ebin/statsserver.app | 13 ------- statsserver/src/statsserver.erl | 71 ----------------------------------- statsserver/src/statsserver_app.erl | 13 ------- statsserver/src/statsserver_sup.erl | 42 --------------------- statusserver/ebin/statusserver.app | 13 +++++++ statusserver/src/statusserver.erl | 71 +++++++++++++++++++++++++++++++++++ statusserver/src/statusserver_app.erl | 13 +++++++ statusserver/src/statusserver_sup.erl | 42 +++++++++++++++++++++ 11 files changed, 143 insertions(+), 143 deletions(-) delete mode 100644 statsserver/ebin/statsserver.app delete mode 100644 statsserver/src/statsserver.erl delete mode 100644 statsserver/src/statsserver_app.erl delete mode 100644 statsserver/src/statsserver_sup.erl create mode 100644 statusserver/ebin/statusserver.app create mode 100644 statusserver/src/statusserver.erl create mode 100644 statusserver/src/statusserver_app.erl create mode 100644 statusserver/src/statusserver_sup.erl diff --git a/Emakefile b/Emakefile index 0a41ae3..f223dab 100644 --- a/Emakefile +++ b/Emakefile @@ -10,9 +10,9 @@ {i, "src/"}, % For plop.hrl. {outdir, "merge/ebin/"}, {parse_transform, lager_transform}]}. -{["statsserver/src/*"], +{["statusserver/src/*"], [debug_info, {i, "../"}, % For hackney. {i, "src/"}, % For plop.hrl. - {outdir, "statsserver/ebin/"}, + {outdir, "statusserver/ebin/"}, {parse_transform, lager_transform}]}. diff --git a/Makefile b/Makefile index 4e5d5b2..65f2355 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ clean: -rm priv/fsynchelper -rm ebin/*.beam -rm merge/ebin/*.beam - -rm statsserver/ebin/*.beam + -rm statusserver/ebin/*.beam dialyze: build dialyzer ebin merge/ebin tags: diff --git a/src/statusreport.erl b/src/statusreport.erl index cd5bb5a..63414cd 100644 --- a/src/statusreport.erl +++ b/src/statusreport.erl @@ -112,7 +112,7 @@ encode_status({multi, Statuses}) -> send(Service, Statusreports, Nodename) -> lager:debug("reporting status to ~p: ~p", [Service, Statusreports]), - [NodeAddress] = plopconfig:get_env(statsservers, []), + [NodeAddress] = plopconfig:get_env(statusservers, []), DebugTag = "statusreport", URL = NodeAddress ++ Service, Headers = [{"Content-Type", "text/json"}], diff --git a/statsserver/ebin/statsserver.app b/statsserver/ebin/statsserver.app deleted file mode 100644 index 9c642ed..0000000 --- a/statsserver/ebin/statsserver.app +++ /dev/null @@ -1,13 +0,0 @@ -%%% Copyright (c) 2017, NORDUnet A/S. -%%% See LICENSE for licensing information. - -%%% Application resource file for statsserver, see app(5). - -{application, statsserver, - [{description, "Plop statsserver"}, - {vsn, "0.10.1"}, - {modules, [statsserver_app, statsserver_sup, statsserver]}, - {applications, [kernel, stdlib, lager, plop]}, - {registered, [statsserver_sup, statsserver]}, - {mod, {statsserver_app, []}} - ]}. diff --git a/statsserver/src/statsserver.erl b/statsserver/src/statsserver.erl deleted file mode 100644 index 1d42b27..0000000 --- a/statsserver/src/statsserver.erl +++ /dev/null @@ -1,71 +0,0 @@ -%%% Copyright (c) 2014-2016, NORDUnet A/S. -%%% See LICENSE for licensing information. - -%%% @doc Frontend node API - --module(statsserver). --export([init_module/0]). -%% API (URL) --export([request/4]). - --define(APPURL_PLOP_STATUS, "plop/v1/status"). 
- -request(post, ?APPURL_PLOP_STATUS, Service, Input) -> - case (catch mochijson2:decode(Input)) of - {error, E} -> - html("bad input:", E); - Entries when is_list(Entries) -> - lists:foreach(fun ({struct, PropList}) -> - Target = proplists:get_value(<<"target">>, PropList), - Variable = proplists:get_value(<<"key">>, PropList), - Status = proplists:get_value(<<"value">>, PropList), - set_status(Service, Target, Variable, Status) - end, Entries) - end, - success({[{result, <<"ok">>}]}); -request(get, "", "status", Input) -> - Now = erlang:monotonic_time(millisecond), - Variables = [{struct, [ - {service, list_to_binary(Service)}, - {target, Target}, - {variable, Variable}, - {status, Status}, - {age, (Now - Timestamp) / 1000} - ]} || {{Service, Target, Variable}, Status, Timestamp} <- get_status()], - success({[{result, Variables}]}). - - -success(Data) -> - {200, [{"Content-Type", "text/json"}, {"Access-Control-Allow-Origin", "*"}], mochijson2:encode(Data)}. - -html(Text, Input) -> - {400, [{"Content-Type", "text/html"}], - io_lib:format( - "

~n" ++ - "~s~n" ++ - "~p~n" ++ - "~n", [Text, Input])}. - --define(STATUSDB_TABLE, statsserver_statusdb). - -init_module() -> - create_statusdb(). - -create_statusdb() -> - case ets:info(?STATUSDB_TABLE) of - undefined -> - ok; - _ -> - ets:delete(?STATUSDB_TABLE) - end, - ets:new(?STATUSDB_TABLE, [set, public, named_table]). - -get_status() -> - [E || [E] <- ets:match(?STATUSDB_TABLE, '$1')]. - -set_status(Service, Target, Variable, Status) -> - lager:info("status: ~p ~p ~p ~p", [Service, Target, Variable, Status]), - Timestamp = erlang:monotonic_time(millisecond), - true = ets:insert(?STATUSDB_TABLE, - {{Service, Target, Variable}, Status, Timestamp}), - ok. diff --git a/statsserver/src/statsserver_app.erl b/statsserver/src/statsserver_app.erl deleted file mode 100644 index 6caf2b7..0000000 --- a/statsserver/src/statsserver_app.erl +++ /dev/null @@ -1,13 +0,0 @@ -%%% Copyright (c) 2017, NORDUnet A/S. -%%% See LICENSE for licensing information. - --module(statsserver_app). --behaviour(application). --export([start/2, stop/1]). - -start(normal, Args) -> - statsserver:init_module(), - statsserver_sup:start_link(Args). - -stop(_State) -> - ok. diff --git a/statsserver/src/statsserver_sup.erl b/statsserver/src/statsserver_sup.erl deleted file mode 100644 index 6b92e35..0000000 --- a/statsserver/src/statsserver_sup.erl +++ /dev/null @@ -1,42 +0,0 @@ -%%% Copyright (c) 2017, NORDUnet A/S. -%%% See LICENSE for licensing information. - --module(statsserver_sup). --behaviour(supervisor). - --export([start_link/1, init/1]). - -start_link(_Args) -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -gen_http_config(Config, SSLOptions, SSLFlag) -> - {ChildName, IpAddress, Port, Module} = Config, - {ok, IPv4Address} = inet:parse_ipv4strict_address(IpAddress), - WebConfig = [{ip, IPv4Address}, - {port, Port}, - {ssl, SSLFlag}, - {acceptor_pool_size, - application:get_env(catlfish, http_server_pool_size, 16)}, - {ssl_opts, SSLOptions} - ], - {ChildName, - {catlfish_web, start, [WebConfig, Module, ChildName]}, - permanent, 5000, worker, dynamic}. - - -init([]) -> - SSLOptions = - [{certfile, application:get_env(plop, https_certfile, none)}, - {keyfile, application:get_env(plop, https_keyfile, none)}, - {cacertfile, application:get_env(plop, https_cacertfile, none)}], - Servers = - lists:map(fun (Config) -> - gen_http_config(Config, SSLOptions, true) - end, application:get_env(plop, https_servers, [])) ++ - lists:map(fun (Config) -> - gen_http_config(Config, SSLOptions, false) - end, application:get_env(plop, http_servers, [])), - lager:debug("Starting servers ~p", [Servers]), - {ok, - {{one_for_one, 3, 10}, - Servers}}. diff --git a/statusserver/ebin/statusserver.app b/statusserver/ebin/statusserver.app new file mode 100644 index 0000000..1a032f1 --- /dev/null +++ b/statusserver/ebin/statusserver.app @@ -0,0 +1,13 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +%%% Application resource file for statusserver, see app(5). + +{application, statusserver, + [{description, "Plop statusserver"}, + {vsn, "0.10.1"}, + {modules, [statusserver_app, statusserver_sup, statusserver]}, + {applications, [kernel, stdlib, lager, plop]}, + {registered, [statusserver_sup, statusserver]}, + {mod, {statusserver_app, []}} + ]}. diff --git a/statusserver/src/statusserver.erl b/statusserver/src/statusserver.erl new file mode 100644 index 0000000..323b28d --- /dev/null +++ b/statusserver/src/statusserver.erl @@ -0,0 +1,71 @@ +%%% Copyright (c) 2014-2016, NORDUnet A/S. 
+%%% See LICENSE for licensing information. + +%%% @doc Frontend node API + +-module(statusserver). +-export([init_module/0]). +%% API (URL) +-export([request/4]). + +-define(APPURL_PLOP_STATUS, "plop/v1/status"). + +request(post, ?APPURL_PLOP_STATUS, Service, Input) -> + case (catch mochijson2:decode(Input)) of + {error, E} -> + html("bad input:", E); + Entries when is_list(Entries) -> + lists:foreach(fun ({struct, PropList}) -> + Target = proplists:get_value(<<"target">>, PropList), + Variable = proplists:get_value(<<"key">>, PropList), + Status = proplists:get_value(<<"value">>, PropList), + set_status(Service, Target, Variable, Status) + end, Entries) + end, + success({[{result, <<"ok">>}]}); +request(get, "", "status", Input) -> + Now = erlang:monotonic_time(millisecond), + Variables = [{struct, [ + {service, list_to_binary(Service)}, + {target, Target}, + {variable, Variable}, + {status, Status}, + {age, (Now - Timestamp) / 1000} + ]} || {{Service, Target, Variable}, Status, Timestamp} <- get_status()], + success({[{result, Variables}]}). + + +success(Data) -> + {200, [{"Content-Type", "text/json"}, {"Access-Control-Allow-Origin", "*"}], mochijson2:encode(Data)}. + +html(Text, Input) -> + {400, [{"Content-Type", "text/html"}], + io_lib:format( + "

~n" ++ + "~s~n" ++ + "~p~n" ++ + "~n", [Text, Input])}. + +-define(STATUSDB_TABLE, statusserver_statusdb). + +init_module() -> + create_statusdb(). + +create_statusdb() -> + case ets:info(?STATUSDB_TABLE) of + undefined -> + ok; + _ -> + ets:delete(?STATUSDB_TABLE) + end, + ets:new(?STATUSDB_TABLE, [set, public, named_table]). + +get_status() -> + [E || [E] <- ets:match(?STATUSDB_TABLE, '$1')]. + +set_status(Service, Target, Variable, Status) -> + lager:info("status: ~p ~p ~p ~p", [Service, Target, Variable, Status]), + Timestamp = erlang:monotonic_time(millisecond), + true = ets:insert(?STATUSDB_TABLE, + {{Service, Target, Variable}, Status, Timestamp}), + ok. diff --git a/statusserver/src/statusserver_app.erl b/statusserver/src/statusserver_app.erl new file mode 100644 index 0000000..2fd8b8d --- /dev/null +++ b/statusserver/src/statusserver_app.erl @@ -0,0 +1,13 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(statusserver_app). +-behaviour(application). +-export([start/2, stop/1]). + +start(normal, Args) -> + statusserver:init_module(), + statusserver_sup:start_link(Args). + +stop(_State) -> + ok. diff --git a/statusserver/src/statusserver_sup.erl b/statusserver/src/statusserver_sup.erl new file mode 100644 index 0000000..5b0811a --- /dev/null +++ b/statusserver/src/statusserver_sup.erl @@ -0,0 +1,42 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(statusserver_sup). +-behaviour(supervisor). + +-export([start_link/1, init/1]). + +start_link(_Args) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +gen_http_config(Config, SSLOptions, SSLFlag) -> + {ChildName, IpAddress, Port, Module} = Config, + {ok, IPv4Address} = inet:parse_ipv4strict_address(IpAddress), + WebConfig = [{ip, IPv4Address}, + {port, Port}, + {ssl, SSLFlag}, + {acceptor_pool_size, + application:get_env(catlfish, http_server_pool_size, 16)}, + {ssl_opts, SSLOptions} + ], + {ChildName, + {catlfish_web, start, [WebConfig, Module, ChildName]}, + permanent, 5000, worker, dynamic}. + + +init([]) -> + SSLOptions = + [{certfile, application:get_env(plop, https_certfile, none)}, + {keyfile, application:get_env(plop, https_keyfile, none)}, + {cacertfile, application:get_env(plop, https_cacertfile, none)}], + Servers = + lists:map(fun (Config) -> + gen_http_config(Config, SSLOptions, true) + end, application:get_env(plop, https_servers, [])) ++ + lists:map(fun (Config) -> + gen_http_config(Config, SSLOptions, false) + end, application:get_env(plop, http_servers, [])), + lager:debug("Starting servers ~p", [Servers]), + {ok, + {{one_for_one, 3, 10}, + Servers}}. -- cgit v1.1 From 27b809c9525a876ecde0a5346e0264643197d934 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Wed, 8 Mar 2017 23:20:36 +0100 Subject: Added heartbeat service. Add source. Send better messages. 
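Every report is now keyed on the node's own key name (http_auth:own_name/0 reads it directly from the own_key application environment entry), and a periodic heartbeat stores the running configuration version together with the set of started applications, sent out with the usual batching logic. On the wire nothing changes in shape: each service still gets one JSON array of objects carrying "target", "source", "key" and "value". A rough sketch of a single report and the request it ends up in, with the node name and status server base URL invented for illustration:

    %% merge_dist reporting a published STH of size 12345 for node "merge-1":
    statusreport:report("merge_dist", "merge-1", "sth", 12345).
    %% After batching, statusreport POSTs to <statusserver base>/merge_dist
    %% (base taken from the statusservers setting) with a body roughly like
    %%   [{"target":"merge-1","source":"<own key name>","key":"sth","value":12345}]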
--- merge/src/merge_backup.erl | 30 +++++++++++----------- merge/src/merge_dist.erl | 24 ++++++++--------- merge/src/merge_sth.erl | 4 +-- merge/src/merge_util.erl | 54 +++++++++++++++++++-------------------- src/http_auth.erl | 3 +-- src/statusreport.erl | 41 ++++++++++++++++++++++++----- statusserver/src/statusserver.erl | 12 +++++---- 7 files changed, 99 insertions(+), 69 deletions(-) diff --git a/merge/src/merge_backup.erl b/merge/src/merge_backup.erl index f1e9253..068725c 100644 --- a/merge/src/merge_backup.erl +++ b/merge/src/merge_backup.erl @@ -44,12 +44,12 @@ backup(Size, #state{node_name = NodeName, node_address = NodeAddress} = State) - lager:debug("~p: logorder size ~B", [NodeName, Size]), ht:load_tree(Size - 1), % TODO: Make sure this is OK to do from multiple processes and that it's not "moving backwards". try - {ok, VerifiedSize} = verified_size(NodeAddress), + {ok, VerifiedSize} = verified_size(NodeName, NodeAddress), lager:debug("~p: verifiedsize ~B", [NodeName, VerifiedSize]), case VerifiedSize == Size of true -> TreeHead = ht:root(Size - 1), - ok = check_root(NodeAddress, Size, TreeHead), + ok = check_root(NodeName, NodeAddress, Size, TreeHead), ok = write_backupfile(NodeName, Size, TreeHead); false -> true = VerifiedSize < Size, % Secondary ahead of primary? @@ -68,14 +68,14 @@ do_backup(_, _, _, 0) -> do_backup(NodeName, NodeAddress, Start, NTotal) -> N = min(NTotal, plopconfig:get_env(merge_backup_winsize, 1000)), Hashes = index:getrange(logorder, Start, Start + N - 1), - ok = merge_util:sendlog(NodeAddress, Start, Hashes, plopconfig:get_env(merge_backup_sendlog_chunksize, 1000)), - {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress), + ok = merge_util:sendlog(NodeAddress, NodeName, Start, Hashes, plopconfig:get_env(merge_backup_sendlog_chunksize, 1000)), + {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress, NodeName), HashesMissing = lists:map(fun base64:decode/1, HashesMissingEncoded), - ok = merge_util:sendentries(NodeAddress, HashesMissing, plopconfig:get_env(merge_backup_sendentries_chunksize, 100)), + ok = merge_util:sendentries(NodeAddress, NodeName, HashesMissing, plopconfig:get_env(merge_backup_sendentries_chunksize, 100)), Size = Start + N, TreeHead = ht:root(Size - 1), - ok = check_root(NodeAddress, Size, TreeHead), - ok = setverifiedsize(NodeAddress, Size), + ok = check_root(NodeName, NodeAddress, Size, TreeHead), + ok = setverifiedsize(NodeName, NodeAddress, Size), ok = write_backupfile(NodeName, Size, TreeHead), true = NTotal >= N, do_backup(NodeName, NodeAddress, Size, NTotal - N). @@ -88,8 +88,8 @@ write_backupfile(NodeName, TreeSize, TreeHead) -> {"sha256_root_hash", list_to_binary(hex:bin_to_hexstr(TreeHead))}]}), atomic:replacefile(Path, Content). -check_root(NodeAddress, Size, TreeHead) -> - {ok, TreeHeadToVerify} = verifyroot(NodeAddress, Size), +check_root(NodeName, NodeAddress, Size, TreeHead) -> + {ok, TreeHeadToVerify} = verifyroot(NodeName, NodeAddress, Size), case TreeHeadToVerify == TreeHead of true -> ok; @@ -99,34 +99,34 @@ check_root(NodeAddress, Size, TreeHead) -> root_mismatch end. 
-verifyroot(NodeAddress, TreeSize) -> +verifyroot(NodeName, NodeAddress, TreeSize) -> DebugTag = io_lib:format("verifyroot ~B", [TreeSize]), URL = NodeAddress ++ "verifyroot", Headers = [{"Content-Type", "text/json"}], RequestBody = list_to_binary(mochijson2:encode({[{"tree_size", TreeSize}]})), - case merge_util:request(DebugTag, URL, Headers, RequestBody) of + case merge_util:request(DebugTag, URL, NodeName, Headers, RequestBody) of {<<"ok">>, PropList} -> {ok, base64:decode(proplists:get_value(<<"root_hash">>, PropList))}; Err -> throw({request_error, result, DebugTag, Err}) end. -verified_size(NodeAddress) -> +verified_size(NodeName, NodeAddress) -> DebugTag = "verifiedsize", URL = NodeAddress ++ "verifiedsize", - case merge_util:request(DebugTag, URL) of + case merge_util:request(DebugTag, URL, NodeName) of {<<"ok">>, PropList} -> {ok, proplists:get_value(<<"size">>, PropList)}; Err -> throw({request_error, result, DebugTag, Err}) end. -setverifiedsize(NodeAddress, Size) -> +setverifiedsize(NodeName, NodeAddress, Size) -> DebugTag = io_lib:format("setverifiedsize ~B", [Size]), URL = NodeAddress ++ "setverifiedsize", Headers = [{"Content-Type", "text/json"}], RequestBody = list_to_binary(mochijson2:encode({[{"size", Size}]})), - case merge_util:request(DebugTag, URL, Headers, RequestBody) of + case merge_util:request(DebugTag, URL, NodeName, Headers, RequestBody) of {<<"ok">>, _} -> ok; Err -> diff --git a/merge/src/merge_dist.erl b/merge/src/merge_dist.erl index da6b667..3c38401 100644 --- a/merge/src/merge_dist.erl +++ b/merge/src/merge_dist.erl @@ -64,7 +64,7 @@ dist({struct, PropList} = STH, [NodeAddress, Treesize, Logordersize]), statusreport:report("merge_dist", NodeName, "targetsth", Treesize), ok = do_dist(NodeAddress, NodeName, min(Treesize, Logordersize)), - ok = publish_sth(NodeAddress, STH), + ok = publish_sth(NodeName, NodeAddress, STH), statusreport:report("merge_dist", NodeName, "sth", Treesize), lager:info("~p: Published STH with size ~B and timestamp " ++ "~p.", [NodeAddress, Treesize, Timestamp]), @@ -87,7 +87,7 @@ dist({struct, PropList} = STH, %% @doc Has nonlocal return because of throw further down in %% merge_util:request/4. do_dist(NodeAddress, NodeName, Size) -> - {ok, VerifiedSize} = frontend_get_verifiedsize(NodeAddress), + {ok, VerifiedSize} = frontend_get_verifiedsize(NodeName, NodeAddress), lager:debug("~p: verifiedsize ~B", [NodeAddress, VerifiedSize]), true = VerifiedSize =< Size, do_dist(NodeAddress, NodeName, VerifiedSize, Size - VerifiedSize). 
@@ -100,40 +100,40 @@ do_dist(NodeAddress, NodeName, Start, NTotal) -> Hashes = index:getrange(logorder, Start, Start + N - 1), SendlogChunksize = application:get_env(plop, merge_dist_sendlog_chunksize, 1000), SendentriesChunksize = application:get_env(plop, merge_dist_sendentries_chunksize, 100), - ok = merge_util:sendlog(NodeAddress, Start, Hashes, SendlogChunksize), + ok = merge_util:sendlog(NodeAddress, NodeName, Start, Hashes, SendlogChunksize), statusreport:report("merge_dist", NodeName, "sendlog", Start + N), - {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress), + {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress, NodeName), lager:debug("number of missing entries: ~B", [length(HashesMissingEncoded)]), HashesMissing = lists:map(fun base64:decode/1, HashesMissingEncoded), - ok = merge_util:sendentries(NodeAddress, HashesMissing, SendentriesChunksize), - {ok, NewSize} = frontend_verify_entries(NodeAddress, Start + N), + ok = merge_util:sendentries(NodeAddress, NodeName, HashesMissing, SendentriesChunksize), + {ok, NewSize} = frontend_verify_entries(NodeName, NodeAddress, Start + N), lager:info("~p: Done distributing ~B out of ~B entries.", [NodeAddress, NewSize-Start, NTotal]), statusreport:report("merge_dist", NodeName, "verified", Start + N), true = NTotal >= NewSize - Start, do_dist(NodeAddress, NodeName, NewSize, NTotal - (NewSize - Start)). -frontend_get_verifiedsize(NodeAddress) -> - frontend_verify_entries(NodeAddress, 0). +frontend_get_verifiedsize(NodeName, NodeAddress) -> + frontend_verify_entries(NodeName, NodeAddress, 0). -frontend_verify_entries(NodeAddress, Size) -> +frontend_verify_entries(NodeName, NodeAddress, Size) -> DebugTag = io_lib:format("verify-entries ~B", [Size]), URL = NodeAddress ++ "verify-entries", Headers = [{"Content-Type", "text/json"}], RequestBody = list_to_binary(mochijson2:encode({[{"verify_to", Size}]})), - case merge_util:request(DebugTag, URL, Headers, RequestBody) of + case merge_util:request(DebugTag, URL, NodeName, Headers, RequestBody) of {<<"ok">>, PropList} -> {ok, proplists:get_value(<<"verified">>, PropList)}; Err -> throw({request_error, result, DebugTag, Err}) end. 
-publish_sth(NodeAddress, STH) -> +publish_sth(NodeName, NodeAddress, STH) -> DebugTag = "publish-sth", URL = NodeAddress ++ "publish-sth", Headers = [{"Content-Type", "text/json"}], RequestBody = list_to_binary(mochijson2:encode(STH)), - case merge_util:request(DebugTag, URL, Headers, RequestBody) of + case merge_util:request(DebugTag, URL, NodeName, Headers, RequestBody) of {<<"ok">>, _} -> ok; Err -> diff --git a/merge/src/merge_sth.erl b/merge/src/merge_sth.erl index b8ae1f9..4b77864 100644 --- a/merge/src/merge_sth.erl +++ b/merge/src/merge_sth.erl @@ -66,7 +66,7 @@ make_sth(CurSize, State) -> Wait = case NewSize < CurSize of true -> - statusreport:report("merge_sth", "sth", "sth", null), + statusreport:report("merge_sth", http_auth:own_name(), "sth", null), lager:debug("waiting for enough backups to reach ~B, now at ~B", [CurSize, NewSize]), 1; @@ -91,7 +91,7 @@ do_make_sth(Size) -> {"sha256_root_hash", base64:encode(NewRoot)}, {"tree_head_signature", base64:encode(PackedSignature)}], ok = plop:save_sth({struct, NewSTH}), - statusreport:report("merge_sth", "sth", "sth", Size), + statusreport:report("merge_sth", http_auth:own_name(), "sth", Size), ok; false -> lager:error("The signature we got for new tree of size ~B doesn't " ++ diff --git a/merge/src/merge_util.erl b/merge/src/merge_util.erl index 24eba60..c76d05f 100644 --- a/merge/src/merge_util.erl +++ b/merge/src/merge_util.erl @@ -2,82 +2,82 @@ %%% See LICENSE for licensing information. -module(merge_util). --export([sendlog/4, sendentries/3, missingentries/1]). --export([request/2, request/4]). +-export([sendlog/5, sendentries/4, missingentries/2]). +-export([request/3, request/5]). -export([readfile/1, nfetched/0]). -request(DebugTag, URL) -> - request(DebugTag, URL, [], <<>>). +request(DebugTag, URL, NodeName) -> + request(DebugTag, URL, NodeName, [], <<>>). -request(DebugTag, URL, Headers, RequestBody) -> +request(DebugTag, URL, NodeName, Headers, RequestBody) -> case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of {error, Err} -> - statusreport:report_multi("merge_errors", URL, "http_error", list_to_binary(io_lib:format("~w", [Err]))), + statusreport:report_multi("merge_errors", NodeName, "http_error", list_to_binary(io_lib:format("~w", [Err]))), throw({request_error, request, DebugTag, Err}); {failure, {none, StatusCode, none}, _RespHeaders, _Body} -> - statusreport:report_multi("merge_errors", URL, "http_error", StatusCode), + statusreport:report_multi("merge_errors", NodeName, "http_error", StatusCode), throw({request_error, failure, DebugTag, StatusCode}); {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 -> case (catch mochijson2:decode(Body)) of {error, Err} -> - statusreport:report_multi("merge_errors", URL, "http_error", list_to_binary(Err)), + statusreport:report_multi("merge_errors", NodeName, "http_error", list_to_binary(Err)), throw({request_error, decode, DebugTag, Err}); {struct, PropList} -> - statusreport:report_multi("merge_errors", URL, "http_error", 200), + statusreport:report_multi("merge_errors", NodeName, "http_error", 200), {proplists:get_value(<<"result">>, PropList), PropList} end end. -sendlog(NodeAddress, Start, Hashes, Chunksize) -> +sendlog(NodeAddress, NodeName, Start, Hashes, Chunksize) -> lager:debug("sending log: start=~B, N=~B, chunksize=~B", [Start, length(Hashes), Chunksize]), - sendlog_chunk(NodeAddress, Start, lists:split(min(Chunksize, length(Hashes)), Hashes), Chunksize). 
+ sendlog_chunk(NodeAddress, NodeName, Start, lists:split(min(Chunksize, length(Hashes)), Hashes), Chunksize). -sendlog_chunk(_, _, {[], _}, _) -> +sendlog_chunk(_, _, _, {[], _}, _) -> ok; -sendlog_chunk(NodeAddress, Start, {Chunk, Rest}, Chunksize) -> +sendlog_chunk(NodeAddress, NodeName, Start, {Chunk, Rest}, Chunksize) -> lager:debug("sending log chunk: start=~B, N=~B", [Start, length(Chunk)]), - ok = sendlog_request(NodeAddress, Start, Chunk), - sendlog_chunk(NodeAddress, Start + length(Chunk), + ok = sendlog_request(NodeAddress, NodeName, Start, Chunk), + sendlog_chunk(NodeAddress, NodeName, Start + length(Chunk), lists:split(min(Chunksize, length(Rest)), Rest), Chunksize). -sendlog_request(NodeAddress, Start, Hashes) -> +sendlog_request(NodeAddress, NodeName, Start, Hashes) -> DebugTag = io_lib:format("sendlog ~B:~B", [Start, length(Hashes)]), URL = NodeAddress ++ "sendlog", Headers = [{"Content-Type", "text/json"}], EncodedHashes = [base64:encode(H) || H <- Hashes], RequestBody = list_to_binary(mochijson2:encode({[{"start", Start}, {"hashes", EncodedHashes}]})), - case request(DebugTag, URL, Headers, RequestBody) of + case request(DebugTag, URL, NodeName, Headers, RequestBody) of {<<"ok">>, _} -> ok; Err -> throw({request_error, result, DebugTag, Err}) end. -missingentries(NodeAddress) -> +missingentries(NodeAddress, NodeName) -> DebugTag = "missingentries", URL = NodeAddress ++ "missingentries", - case request(DebugTag, URL) of + case request(DebugTag, URL, NodeName) of {<<"ok">>, PropList} -> {ok, proplists:get_value(<<"entries">>, PropList)}; Err -> throw({request_error, result, DebugTag, Err}) end. -sendentries(NodeAddress, Hashes, Chunksize) -> +sendentries(NodeAddress, NodeName, Hashes, Chunksize) -> lager:debug("sending entries: N=~B, chunksize=~B", [length(Hashes), Chunksize]), {ChunkOfHashes, RestOfHashes} = lists:split(min(Chunksize, length(Hashes)), Hashes), - sendentries_chunk(NodeAddress, {ChunkOfHashes, RestOfHashes}, Chunksize). + sendentries_chunk(NodeAddress, NodeName, {ChunkOfHashes, RestOfHashes}, Chunksize). -sendentries_chunk(_, {[], _}, _) -> +sendentries_chunk(_, _, {[], _}, _) -> ok; -sendentries_chunk(NodeAddress, {Chunk, Rest}, Chunksize) -> +sendentries_chunk(NodeAddress, NodeName, {Chunk, Rest}, Chunksize) -> lager:debug("sending entries chunk: N=~B", [length(Chunk)]), HashesAndEntries = lists:zip(Chunk, lists:map(fun db:entry_for_leafhash/1, Chunk)), case lists:keysearch(noentry, 2, HashesAndEntries) of false -> - ok = sendentries_request(NodeAddress, HashesAndEntries), - sendentries_chunk(NodeAddress, + ok = sendentries_request(NodeAddress, NodeName, HashesAndEntries), + sendentries_chunk(NodeAddress, NodeName, lists:split(min(Chunksize, length(Rest)), Rest), Chunksize); Missing -> @@ -85,13 +85,13 @@ sendentries_chunk(NodeAddress, {Chunk, Rest}, Chunksize) -> {error, entrynotindb} end. 
-sendentries_request(NodeAddress, HashesAndEntries) -> +sendentries_request(NodeAddress, NodeName, HashesAndEntries) -> DebugTag = io_lib:format("sendentry ~B", [length(HashesAndEntries)]), URL = NodeAddress ++ "sendentry", Headers = [{"Content-Type", "text/json"}], L = mochijson2:encode([[{"entry", base64:encode(E)}, {"treeleafhash", base64:encode(H)}] || {H, E} <- HashesAndEntries]), RequestBody = list_to_binary(L), - case request(DebugTag, URL, Headers, RequestBody) of + case request(DebugTag, URL, NodeName, Headers, RequestBody) of {<<"ok">>, _} -> ok; Err -> diff --git a/src/http_auth.erl b/src/http_auth.erl index e083a2c..c8a8389 100644 --- a/src/http_auth.erl +++ b/src/http_auth.erl @@ -25,9 +25,8 @@ read_key_table() -> none end. - own_name() -> - {_Key, KeyName} = own_key(), + {ok, {KeyName, _}} = application:get_env(plop, own_key), KeyName. own_key() -> diff --git a/src/statusreport.erl b/src/statusreport.erl index 63414cd..f0b7503 100644 --- a/src/statusreport.erl +++ b/src/statusreport.erl @@ -24,7 +24,8 @@ init([]) -> process_flag(trap_exit, true), ReportInterval = application:get_env(plop, status_report_interval, 1000), lager:info("~p: starting", [?MODULE]), - %Timer = erlang:start_timer(1000, self(), dist), + HeartbeatInterval = application:get_env(plop, heartbeat_interval, 1000), + erlang:start_timer(HeartbeatInterval, self(), heartbeat), {ok, #state{timer = none, nodename = http_auth:own_name(), statusreports = dict:new(), @@ -49,6 +50,20 @@ store_multi_status(State, Service, Target, Variable, Status) -> State#state.statusreports), State#state{statusreports = Statusreports}. +store_set_status(State, Service, Target, Variable, Statuses) -> + Statusreports = dict:store({Service, Target, Variable}, + {multi, Statuses}, + State#state.statusreports), + State#state{statusreports = Statusreports}. + +heartbeat(State) -> + {ok, ConfigVersion} = plopconfig:get_env(version), + RunningApps = [atom_to_list(App) ++ " " ++ Vsn || {App, _Desc, Vsn} <- application:which_applications()], + + NewState1 = store_status(State, "heartbeat", "", "configversion", ConfigVersion), + NewState2 = store_set_status(NewState1, "heartbeat", "", "applications", sets:from_list(RunningApps)), + NewState2. + handle_call(_, _From, State) -> {noreply, State}. @@ -61,7 +76,14 @@ handle_cast({report_multi, Service, Target, Variable, Status}, State) -> handle_info({timeout, _Timer, force_send}, State) -> lager:debug("statusreport timer timeout"), - {noreply, force_send(State)}. + {noreply, force_send(State)}; + +handle_info({timeout, _Timer, heartbeat}, State) -> + lager:debug("statusreport timer timeout"), + HeartbeatInterval = application:get_env(plop, heartbeat_interval, 1000), + erlang:start_timer(HeartbeatInterval, self(), heartbeat), + NewState = heartbeat(State), + {noreply, try_send(NewState)}. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -105,10 +127,17 @@ group_by_service(Statusreports) -> dict:append(Service, {Target, Variable, Status}, Acc) end, dict:new(), dict:to_list(Statusreports))). -encode_status({single, Status}) -> +encode_one_status(Status) when is_number(Status) -> Status; +encode_one_status(Status) when is_list(Status) -> + list_to_binary(Status); +encode_one_status(Status) when is_binary(Status) -> + Status. + +encode_status({single, Status}) -> + encode_one_status(Status); encode_status({multi, Statuses}) -> - sets:to_list(Statuses). + lists:map(fun encode_one_status/1, sets:to_list(Statuses)). 
send(Service, Statusreports, Nodename) -> lager:debug("reporting status to ~p: ~p", [Service, Statusreports]), @@ -160,10 +189,10 @@ try_send(State) -> force_send(State) end. -report(Service, Target, Variable, Status) -> +report(Service, Target, Variable, Status) when is_number(Status); is_list(Status); is_binary(Status) -> lager:debug("reporting status ~p ~p ~p ~p", [Service, Target, Variable, Status]), gen_server:cast(?MODULE, {report, Service, Target, Variable, Status}). -report_multi(Service, Target, Variable, Status) -> +report_multi(Service, Target, Variable, Status) when is_number(Status); is_list(Status); is_binary(Status) -> lager:debug("reporting multi status ~p ~p ~p ~p", [Service, Target, Variable, Status]), gen_server:cast(?MODULE, {report_multi, Service, Target, Variable, Status}). diff --git a/statusserver/src/statusserver.erl b/statusserver/src/statusserver.erl index 323b28d..f38bf69 100644 --- a/statusserver/src/statusserver.erl +++ b/statusserver/src/statusserver.erl @@ -17,9 +17,10 @@ request(post, ?APPURL_PLOP_STATUS, Service, Input) -> Entries when is_list(Entries) -> lists:foreach(fun ({struct, PropList}) -> Target = proplists:get_value(<<"target">>, PropList), + Source = proplists:get_value(<<"source">>, PropList), Variable = proplists:get_value(<<"key">>, PropList), Status = proplists:get_value(<<"value">>, PropList), - set_status(Service, Target, Variable, Status) + set_status(Service, Source, Target, Variable, Status) end, Entries) end, success({[{result, <<"ok">>}]}); @@ -27,11 +28,12 @@ request(get, "", "status", Input) -> Now = erlang:monotonic_time(millisecond), Variables = [{struct, [ {service, list_to_binary(Service)}, + {source, Source}, {target, Target}, {variable, Variable}, {status, Status}, {age, (Now - Timestamp) / 1000} - ]} || {{Service, Target, Variable}, Status, Timestamp} <- get_status()], + ]} || {{Service, Source, Target, Variable}, Status, Timestamp} <- get_status()], success({[{result, Variables}]}). @@ -63,9 +65,9 @@ create_statusdb() -> get_status() -> [E || [E] <- ets:match(?STATUSDB_TABLE, '$1')]. -set_status(Service, Target, Variable, Status) -> - lager:info("status: ~p ~p ~p ~p", [Service, Target, Variable, Status]), +set_status(Service, Source, Target, Variable, Status) -> + lager:info("status: ~p ~p ~p ~p ~p", [Service, Source, Target, Variable, Status]), Timestamp = erlang:monotonic_time(millisecond), true = ets:insert(?STATUSDB_TABLE, - {{Service, Target, Variable}, Status, Timestamp}), + {{Service, Source, Target, Variable}, Status, Timestamp}), ok. -- cgit v1.1 From 515973768e37214d861fe592f25f2907567f2444 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Thu, 9 Mar 2017 00:45:02 +0100 Subject: Added ht:dump --- src/ht.erl | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/src/ht.erl b/src/ht.erl index 7f52db0..4e0e4c5 100644 --- a/src/ht.erl +++ b/src/ht.erl @@ -29,7 +29,7 @@ -behaviour(gen_server). -export([reset_tree/1, load_tree/1, size/0, leaf_hash/1]). --export([add/1, root/0, root/1, path/2, consistency/2]). +-export([add/1, root/0, root/1, path/2, consistency/2, dump/1]). -export([start_link/0, start_link/1, stop/0]). -export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, code_change/3]). @@ -96,6 +96,8 @@ print_tree() -> call(?MODULE, {print_tree, 4}). print_tree(HashOutputLen) -> call(?MODULE, {print_tree, HashOutputLen}). +dump(Filename) -> + call(?MODULE, {dump, Filename}). 
%% gen_server callbacks init(Args) -> @@ -141,7 +143,9 @@ handle_call({consistency, Version1, Version2}, _From, State) -> handle_call(testing_get_state, _From, State) -> {reply, State, State}; handle_call({print_tree, HashOutputLen}, _From, State) -> - {reply, print_tree(State, HashOutputLen), State}. + {reply, print_tree(State, HashOutputLen), State}; +handle_call({dump, Filename}, _From, State) -> + {reply, dump(State, Filename), State}. %%%%%%%%%%%%%%%%%%%% %% Private. @@ -444,6 +448,26 @@ print_layer(Tree, HashOutputLen, Layer, ILast) -> lists:seq(0, ILast)), io:format("~n"). +dump(Tree, Filename) -> + {ok, File} = file:open(Filename, [read, write, binary]), + ok = file:write(File, "# evaluated " ++ integer_to_list(Tree#tree.evaluated) ++ "\n"), + lists:foreach(fun (R) -> + write_layer_header(File, R), + lists:foreach(fun (I) -> + Entry = ts:retrieve(Tree#tree.store, {R, I}), + dumpone(File, Entry) + end, lists:seq(0, ts:count(Tree#tree.store, R) - 1)) + end, lists:seq(0, depth(Tree) - 1)), + file:close(File). + +write_layer_header(File, Layer) -> + EntryText = "# layer " ++ integer_to_list(Layer) ++ "\n", + ok = file:write(File, EntryText). + +dumpone(File, Entry) -> + EntryText = hex:bin_to_hexstr(Entry) ++ "\n", + ok = file:write(File, EntryText). + %%%%%%%%%%%%%%%%%%%% %% Testing ht. -- cgit v1.1 From 9c05acc9ee261d29374cfcbe74844013ec06ad85 Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Tue, 14 Mar 2017 13:19:22 +0100 Subject: Fix copyright year. --- statusserver/src/statusserver.erl | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/statusserver/src/statusserver.erl b/statusserver/src/statusserver.erl index f38bf69..e4f38d5 100644 --- a/statusserver/src/statusserver.erl +++ b/statusserver/src/statusserver.erl @@ -1,8 +1,6 @@ -%%% Copyright (c) 2014-2016, NORDUnet A/S. +%%% Copyright (c) 2017, NORDUnet A/S. %%% See LICENSE for licensing information. -%%% @doc Frontend node API - -module(statusserver). -export([init_module/0]). %% API (URL) -- cgit v1.1 From 0e3e7676cd119ceace2686371fe6575cc95f19bf Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Tue, 14 Mar 2017 13:19:45 +0100 Subject: Fix statusserver version number. --- statusserver/ebin/statusserver.app | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/statusserver/ebin/statusserver.app b/statusserver/ebin/statusserver.app index 1a032f1..10a1ba8 100644 --- a/statusserver/ebin/statusserver.app +++ b/statusserver/ebin/statusserver.app @@ -5,7 +5,7 @@ {application, statusserver, [{description, "Plop statusserver"}, - {vsn, "0.10.1"}, + {vsn, "1.0.1-alpha-dev"}, {modules, [statusserver_app, statusserver_sup, statusserver]}, {applications, [kernel, stdlib, lager, plop]}, {registered, [statusserver_sup, statusserver]}, -- cgit v1.1 From 39cc6c19e79c19000d5e2174aa3b5f5c2ed2545b Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Wed, 15 Mar 2017 17:16:06 +0100 Subject: Add compat functions for timing functionality missing in R17. --- src/plop_compat.erl | 27 +++++++++++++++++++++++++-- src/statusreport.erl | 8 ++++---- statusserver/src/statusserver.erl | 4 ++-- 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/src/plop_compat.erl b/src/plop_compat.erl index 9a98e3d..4d45590 100644 --- a/src/plop_compat.erl +++ b/src/plop_compat.erl @@ -2,14 +2,17 @@ %%% See LICENSE for licensing information. -module(plop_compat). --export([unpack_spki/1, timestamp/0]). +-export([unpack_spki/1, timestamp/0, monotonic_time/1, start_timer/4]). -include_lib("public_key/include/public_key.hrl"). 
unpack_spki(SPKI) -> unpack_spki(erlang:system_info(otp_release), SPKI). timestamp() -> timestamp(erlang:system_info(otp_release)). - +monotonic_time(Unit) -> + monotonic_time(erlang:system_info(otp_release), Unit). +start_timer(Time, Dest, Msg, Options) -> + start_timer(erlang:system_info(otp_release), Time, Dest, Msg, Options). unpack_spki("R16" ++ _, SPKI) -> #'SubjectPublicKeyInfo'{subjectPublicKey = {_, Octets}, @@ -37,3 +40,23 @@ timestamp("18") -> erlang:timestamp(); timestamp("19") -> erlang:timestamp(). + +monotonic_time("R16" ++ _, millisecond) -> + {MeS, S, MiS} = timestamp(), + MeS * 1000000 + S * 1000 + MiS; +monotonic_time("17", millisecond) -> + {MeS, S, MiS} = timestamp(), + MeS * 1000000 + S * 1000 + MiS; +monotonic_time("18", Unit) -> + erlang:monotonic_time(Unit); +monotonic_time("19", Unit) -> + erlang:monotonic_time(Unit). + +start_timer("R16" ++ _, Time, Dest, Msg, [{abs, true}]) -> + erlang:start_timer(max(0, Time - monotonic_time(millisecond)), Dest, Msg); +start_timer("17", Time, Dest, Msg, [{abs, true}]) -> + erlang:start_timer(max(0, Time - monotonic_time(millisecond)), Dest, Msg); +start_timer("18", Time, Dest, Msg, Options) -> + erlang:start_timer(Time, Dest, Msg, Options); +start_timer("19", Time, Dest, Msg, Options) -> + erlang:start_timer(Time, Dest, Msg, Options). diff --git a/src/statusreport.erl b/src/statusreport.erl index f0b7503..a9fef7f 100644 --- a/src/statusreport.erl +++ b/src/statusreport.erl @@ -29,7 +29,7 @@ init([]) -> {ok, #state{timer = none, nodename = http_auth:own_name(), statusreports = dict:new(), - lastsent = erlang:monotonic_time(millisecond) - ReportInterval}}. + lastsent = plop_compat:monotonic_time(millisecond) - ReportInterval}}. store_status(State, Service, Target, Variable, Status) -> Statusreports = dict:store({Service, Target, Variable}, @@ -101,7 +101,7 @@ set_timer(State) -> case State#state.timer of none -> ReportInterval = application:get_env(plop, status_report_interval, 1000), - Timer = erlang:start_timer(State#state.lastsent + ReportInterval, self(), force_send, [{abs, true}]), + Timer = plop_compat:start_timer(State#state.lastsent + ReportInterval, self(), force_send, [{abs, true}]), State#state{timer = Timer}; _ -> State @@ -174,12 +174,12 @@ force_send(State) -> send(Service, Statusreports, State#state.nodename) end, group_by_service(State#state.statusreports)), NewState = cancel_timer(State), - NewState#state{statusreports = dict:new(), lastsent = erlang:monotonic_time(millisecond)}. + NewState#state{statusreports = dict:new(), lastsent = plop_compat:monotonic_time(millisecond)}. 
try_send(State) -> ReportInterval = application:get_env(plop, status_report_interval, 1000), NextSend = State#state.lastsent + ReportInterval, - Now = erlang:monotonic_time(millisecond), + Now = plop_compat:monotonic_time(millisecond), if NextSend > Now -> lager:debug("status report sent ~p ms ago, setting timer", [NextSend - Now]), diff --git a/statusserver/src/statusserver.erl b/statusserver/src/statusserver.erl index e4f38d5..81fcd7a 100644 --- a/statusserver/src/statusserver.erl +++ b/statusserver/src/statusserver.erl @@ -23,7 +23,7 @@ request(post, ?APPURL_PLOP_STATUS, Service, Input) -> end, success({[{result, <<"ok">>}]}); request(get, "", "status", Input) -> - Now = erlang:monotonic_time(millisecond), + Now = plop_compat:monotonic_time(millisecond), Variables = [{struct, [ {service, list_to_binary(Service)}, {source, Source}, @@ -65,7 +65,7 @@ get_status() -> set_status(Service, Source, Target, Variable, Status) -> lager:info("status: ~p ~p ~p ~p ~p", [Service, Source, Target, Variable, Status]), - Timestamp = erlang:monotonic_time(millisecond), + Timestamp = plop_compat:monotonic_time(millisecond), true = ets:insert(?STATUSDB_TABLE, {{Service, Source, Target, Variable}, Status, Timestamp}), ok. -- cgit v1.1 From 8bb572816040a8ecda50be9687cd1ddc76436f65 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Tue, 14 Mar 2017 14:58:41 +0100 Subject: Handle 'null' case in statusreport. --- src/statusreport.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/statusreport.erl b/src/statusreport.erl index a9fef7f..db85b84 100644 --- a/src/statusreport.erl +++ b/src/statusreport.erl @@ -132,7 +132,9 @@ encode_one_status(Status) when is_number(Status) -> encode_one_status(Status) when is_list(Status) -> list_to_binary(Status); encode_one_status(Status) when is_binary(Status) -> - Status. + Status; +encode_one_status(null) -> + null. encode_status({single, Status}) -> encode_one_status(Status); @@ -189,7 +191,7 @@ try_send(State) -> force_send(State) end. -report(Service, Target, Variable, Status) when is_number(Status); is_list(Status); is_binary(Status) -> +report(Service, Target, Variable, Status) when is_number(Status); is_list(Status); is_binary(Status); Status == null -> lager:debug("reporting status ~p ~p ~p ~p", [Service, Target, Variable, Status]), gen_server:cast(?MODULE, {report, Service, Target, Variable, Status}). -- cgit v1.1
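The plop_compat additions dispatch on erlang:system_info(otp_release): OTP 18 and 19 delegate straight to erlang:monotonic_time/1 and erlang:start_timer/4, while R16 and 17 emulate an absolute {abs, true} deadline by turning it into a relative delay against the shim's own clock. A small usage sketch in the style of statusreport's timer handling, assuming millisecond units (the only unit the R16/17 branches accept); the 1000 ms interval is illustrative:

    %% Schedule a force_send message roughly one report interval from now.
    Deadline = plop_compat:monotonic_time(millisecond) + 1000,
    TRef = plop_compat:start_timer(Deadline, self(), force_send, [{abs, true}]),
    receive
        {timeout, TRef, force_send} -> ok
    end.

Since both the deadline and the emulated delay are derived from the same plop_compat clock, the observed behaviour is the same on either branch.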