summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMagnus Ahltorp <map@kth.se>2017-03-17 01:24:44 +0100
committerMagnus Ahltorp <map@kth.se>2017-03-17 01:46:32 +0100
commit2b7dda1f9519b2b5d722ee68fa7da596ae9b0e4c (patch)
tree598b02751d5273c93adf392587006cd346fcdda9
parent28e24e48dcd6203f2c24e28f15aec8167697c9c0 (diff)
Added benchmark reporting.benchreport
-rw-r--r--merge/src/merge_backup.erl8
-rw-r--r--merge/src/merge_dist.erl8
-rw-r--r--src/bench.erl18
-rw-r--r--src/plop_compat.erl30
-rw-r--r--src/statusreport.erl66
-rw-r--r--statusserver/src/statusserver.erl50
6 files changed, 155 insertions, 25 deletions
diff --git a/merge/src/merge_backup.erl b/merge/src/merge_backup.erl
index 068725c..3c19527 100644
--- a/merge/src/merge_backup.erl
+++ b/merge/src/merge_backup.erl
@@ -20,6 +20,7 @@ start_link(Args) ->
init([Name, Address]) ->
lager:info("~p:~p: starting (~p)", [?MODULE, Name, Address]),
Timer = erlang:start_timer(1000, self(), backup),
+ bench:timingpoint("merge_backup", Name, "start"),
{ok, #state{timer = Timer, node_name = Name, node_address = Address}}.
handle_call(stop, _From, State) ->
@@ -42,14 +43,17 @@ terminate(Reason, #state{timer = Timer}) ->
backup(Size, #state{node_name = NodeName, node_address = NodeAddress} = State) ->
lager:debug("~p: logorder size ~B", [NodeName, Size]),
+ bench:timingpoint("merge_backup", NodeName, "idle"),
ht:load_tree(Size - 1), % TODO: Make sure this is OK to do from multiple processes and that it's not "moving backwards".
try
{ok, VerifiedSize} = verified_size(NodeName, NodeAddress),
+ bench:timingpoint("merge_backup", NodeName, "verifiedsize"),
lager:debug("~p: verifiedsize ~B", [NodeName, VerifiedSize]),
case VerifiedSize == Size of
true ->
TreeHead = ht:root(Size - 1),
ok = check_root(NodeName, NodeAddress, Size, TreeHead),
+ bench:timingpoint("merge_backup", NodeName, "verifyroot"),
ok = write_backupfile(NodeName, Size, TreeHead);
false ->
true = VerifiedSize < Size, % Secondary ahead of primary?
@@ -69,13 +73,17 @@ do_backup(NodeName, NodeAddress, Start, NTotal) ->
N = min(NTotal, plopconfig:get_env(merge_backup_winsize, 1000)),
Hashes = index:getrange(logorder, Start, Start + N - 1),
ok = merge_util:sendlog(NodeAddress, NodeName, Start, Hashes, plopconfig:get_env(merge_backup_sendlog_chunksize, 1000)),
+ bench:timingpoint("merge_backup", NodeName, "sendlog"),
{ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress, NodeName),
HashesMissing = lists:map(fun base64:decode/1, HashesMissingEncoded),
ok = merge_util:sendentries(NodeAddress, NodeName, HashesMissing, plopconfig:get_env(merge_backup_sendentries_chunksize, 100)),
+ bench:timingpoint("merge_backup", NodeName, "sendentries"),
Size = Start + N,
TreeHead = ht:root(Size - 1),
ok = check_root(NodeName, NodeAddress, Size, TreeHead),
+ bench:timingpoint("merge_backup", NodeName, "verifyroot"),
ok = setverifiedsize(NodeName, NodeAddress, Size),
+ bench:timingpoint("merge_backup", NodeName, "setverifiedsize"),
ok = write_backupfile(NodeName, Size, TreeHead),
true = NTotal >= N,
do_backup(NodeName, NodeAddress, Size, NTotal - N).
diff --git a/merge/src/merge_dist.erl b/merge/src/merge_dist.erl
index 3c38401..23c9d19 100644
--- a/merge/src/merge_dist.erl
+++ b/merge/src/merge_dist.erl
@@ -21,6 +21,7 @@ start_link(Args) ->
init([Name, Address]) ->
lager:info("~p:~p: starting (~p)", [?MODULE, Name, Address]),
Timer = erlang:start_timer(1000, self(), dist),
+ bench:timingpoint("merge_dist", Name, "start"),
{ok, #state{timer = Timer,
node_name = Name,
node_address = Address,
@@ -51,6 +52,7 @@ dist({struct, PropList} = STH,
#state{node_address = NodeAddress,
node_name = NodeName,
sth_timestamp = LastTimestamp} = State) ->
+ bench:timingpoint("merge_dist", NodeName, "idle"),
Treesize = proplists:get_value(<<"tree_size">>, PropList),
Timestamp = proplists:get_value(<<"timestamp">>, PropList),
RootHash = base64:decode(proplists:get_value(<<"sha256_root_hash">>, PropList)),
@@ -59,12 +61,14 @@ dist({struct, PropList} = STH,
TS = case Timestamp > LastTimestamp of
true ->
true = plop:verify_sth(Treesize, Timestamp, RootHash, Signature),
+ bench:timingpoint("merge_dist", NodeName, "verify_sth"),
try
lager:info("~p: starting dist, sth at ~B, logorder at ~B",
[NodeAddress, Treesize, Logordersize]),
statusreport:report("merge_dist", NodeName, "targetsth", Treesize),
ok = do_dist(NodeAddress, NodeName, min(Treesize, Logordersize)),
ok = publish_sth(NodeName, NodeAddress, STH),
+ bench:timingpoint("merge_dist", NodeName, "publish_sth"),
statusreport:report("merge_dist", NodeName, "sth", Treesize),
lager:info("~p: Published STH with size ~B and timestamp " ++
"~p.", [NodeAddress, Treesize, Timestamp]),
@@ -101,12 +105,16 @@ do_dist(NodeAddress, NodeName, Start, NTotal) ->
SendlogChunksize = application:get_env(plop, merge_dist_sendlog_chunksize, 1000),
SendentriesChunksize = application:get_env(plop, merge_dist_sendentries_chunksize, 100),
ok = merge_util:sendlog(NodeAddress, NodeName, Start, Hashes, SendlogChunksize),
+ bench:timingpoint("merge_dist", NodeName, "sendlog"),
statusreport:report("merge_dist", NodeName, "sendlog", Start + N),
{ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress, NodeName),
+ bench:timingpoint("merge_dist", NodeName, "missingentries"),
lager:debug("number of missing entries: ~B", [length(HashesMissingEncoded)]),
HashesMissing = lists:map(fun base64:decode/1, HashesMissingEncoded),
ok = merge_util:sendentries(NodeAddress, NodeName, HashesMissing, SendentriesChunksize),
+ bench:timingpoint("merge_dist", NodeName, "sendentries"),
{ok, NewSize} = frontend_verify_entries(NodeName, NodeAddress, Start + N),
+ bench:timingpoint("merge_dist", NodeName, "verifyentries"),
lager:info("~p: Done distributing ~B out of ~B entries.",
[NodeAddress, NewSize-Start, NTotal]),
statusreport:report("merge_dist", NodeName, "verified", Start + N),
diff --git a/src/bench.erl b/src/bench.erl
new file mode 100644
index 0000000..06e4777
--- /dev/null
+++ b/src/bench.erl
@@ -0,0 +1,18 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(bench).
+
+-export([timingpoint/3]).
+
+timingpoint(Service, Target, Tag) ->
+ Thispoint = plop_compat:monotonic_time(millisecond),
+ Seq = plop_compat:unique_integer([monotonic]),
+ case get(bench_lastpoint) of
+ undefined ->
+ statusreport:bench(Service, Target, Tag, Seq, Thispoint + plop_compat:time_offset(millisecond), null);
+ Lastpoint ->
+ statusreport:bench(Service, Target, Tag, Seq, Lastpoint + plop_compat:time_offset(millisecond), Thispoint - Lastpoint)
+ end,
+ put(bench_lastpoint, Thispoint),
+ ok.
diff --git a/src/plop_compat.erl b/src/plop_compat.erl
index 4d45590..5c1fa17 100644
--- a/src/plop_compat.erl
+++ b/src/plop_compat.erl
@@ -2,7 +2,7 @@
%%% See LICENSE for licensing information.
-module(plop_compat).
--export([unpack_spki/1, timestamp/0, monotonic_time/1, start_timer/4]).
+-export([unpack_spki/1, timestamp/0, monotonic_time/1, start_timer/4, unique_integer/1, time_offset/1]).
-include_lib("public_key/include/public_key.hrl").
unpack_spki(SPKI) ->
@@ -13,6 +13,10 @@ monotonic_time(Unit) ->
monotonic_time(erlang:system_info(otp_release), Unit).
start_timer(Time, Dest, Msg, Options) ->
start_timer(erlang:system_info(otp_release), Time, Dest, Msg, Options).
+unique_integer(Modifiers) ->
+ unique_integer(erlang:system_info(otp_release), Modifiers).
+time_offset(Unit) ->
+ time_offset(erlang:system_info(otp_release), Unit).
unpack_spki("R16" ++ _, SPKI) ->
#'SubjectPublicKeyInfo'{subjectPublicKey = {_, Octets},
@@ -43,15 +47,35 @@ timestamp("19") ->
monotonic_time("R16" ++ _, millisecond) ->
{MeS, S, MiS} = timestamp(),
- MeS * 1000000 + S * 1000 + MiS;
+ (MeS * 1000000 + S) * 1000 + MiS div 1000;
monotonic_time("17", millisecond) ->
{MeS, S, MiS} = timestamp(),
- MeS * 1000000 + S * 1000 + MiS;
+ (MeS * 1000000 + S) * 1000 + MiS div 1000;
monotonic_time("18", Unit) ->
erlang:monotonic_time(Unit);
monotonic_time("19", Unit) ->
erlang:monotonic_time(Unit).
+unique_integer("R16" ++ _, _Modifiers) ->
+ {MeS, S, MiS} = erlang:now(),
+ (MeS * 1000000 + S) * 1000000 + MiS;
+unique_integer("17", _Modifiers) ->
+ {MeS, S, MiS} = erlang:now(),
+ (MeS * 1000000 + S) * 1000000 + MiS;
+unique_integer("18", Modifiers) ->
+ erlang:unique_integer(Modifiers);
+unique_integer("19", Modifiers) ->
+ erlang:unique_integer(Modifiers).
+
+time_offset("R16" ++ _, _Unit) ->
+ 0;
+time_offset("17", _Unit) ->
+ 0;
+time_offset("18", Unit) ->
+ erlang:time_offset(Unit);
+time_offset("19", Unit) ->
+ erlang:time_offset(Unit).
+
start_timer("R16" ++ _, Time, Dest, Msg, [{abs, true}]) ->
erlang:start_timer(max(0, Time - monotonic_time(millisecond)), Dest, Msg);
start_timer("17", Time, Dest, Msg, [{abs, true}]) ->
diff --git a/src/statusreport.erl b/src/statusreport.erl
index db85b84..f010751 100644
--- a/src/statusreport.erl
+++ b/src/statusreport.erl
@@ -8,7 +8,7 @@
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
code_change/3]).
-export([report/4]).
--export([report_multi/4]).
+-export([report_multi/4, bench/6]).
-record(state, {
timer :: none|reference(),
@@ -32,7 +32,7 @@ init([]) ->
lastsent = plop_compat:monotonic_time(millisecond) - ReportInterval}}.
store_status(State, Service, Target, Variable, Status) ->
- Statusreports = dict:store({Service, Target, Variable},
+ Statusreports = dict:store({{status, Service}, {Target, Variable}},
{single, Status},
State#state.statusreports),
State#state{statusreports = Statusreports}.
@@ -45,13 +45,19 @@ dict_append_set(Key, Value, Dict) ->
{multi, AppendSet}, Dict).
store_multi_status(State, Service, Target, Variable, Status) ->
- Statusreports = dict_append_set({Service, Target, Variable},
+ Statusreports = dict_append_set({{status, Service}, {Target, Variable}},
Status,
State#state.statusreports),
State#state{statusreports = Statusreports}.
+store_bench(State, Service, Target, Tag, Seq, Starttime, Elapsed) ->
+ Statusreports = dict:store({{bench, Service}, {Target, Tag, Seq}},
+ {Starttime, Elapsed},
+ State#state.statusreports),
+ State#state{statusreports = Statusreports}.
+
store_set_status(State, Service, Target, Variable, Statuses) ->
- Statusreports = dict:store({Service, Target, Variable},
+ Statusreports = dict:store({{status, Service}, {Target, Variable}},
{multi, Statuses},
State#state.statusreports),
State#state{statusreports = Statusreports}.
@@ -72,6 +78,9 @@ handle_cast({report, Service, Target, Variable, Status}, State) ->
{noreply, try_send(NewState)};
handle_cast({report_multi, Service, Target, Variable, Status}, State) ->
NewState = store_multi_status(State, Service, Target, Variable, Status),
+ {noreply, try_send(NewState)};
+handle_cast({bench, Service, Target, Tag, Seq, Starttime, Elapsed}, State) ->
+ NewState = store_bench(State, Service, Target, Tag, Seq, Starttime, Elapsed),
{noreply, try_send(NewState)}.
handle_info({timeout, _Timer, force_send}, State) ->
@@ -123,8 +132,8 @@ terminate(Reason, State) ->
group_by_service(Statusreports) ->
dict:to_list(
lists:foldl(
- fun ({{Service, Target, Variable}, Status}, Acc) ->
- dict:append(Service, {Target, Variable, Status}, Acc)
+ fun ({{Service, Group}, Status}, Acc) ->
+ dict:append(Service, {Group, Status}, Acc)
end, dict:new(), dict:to_list(Statusreports))).
encode_one_status(Status) when is_number(Status) ->
@@ -141,22 +150,43 @@ encode_status({single, Status}) ->
encode_status({multi, Statuses}) ->
lists:map(fun encode_one_status/1, sets:to_list(Statuses)).
-send(Service, Statusreports, Nodename) ->
- lager:debug("reporting status to ~p: ~p", [Service, Statusreports]),
- [NodeAddress] = plopconfig:get_env(statusservers, []),
+encode_report(status, Nodename, {{Target, Variable}, Status}) ->
+ {struct,
+ [{"target", list_to_binary(Target)},
+ {"source", list_to_binary(Nodename)},
+ {"key", list_to_binary(Variable)},
+ {"value", encode_status(Status)}]};
+
+encode_report(bench, Nodename, {{Target, Tag, Seq}, {Starttime, Elapsed}}) ->
+ {struct,
+ [{"target", list_to_binary(Target)},
+ {"source", list_to_binary(Nodename)},
+ {"tag", list_to_binary(Tag)},
+ {"seq", Seq},
+ {"starttime", Starttime},
+ {"elapsed", Elapsed}]}.
+
+addresses_for_servicetype(status) ->
+ plopconfig:get_env(statusservers, []);
+addresses_for_servicetype(bench) ->
+ plopconfig:get_env(benchservers, []).
+
+send({ServiceType, Service}, Statusreports, Nodename) ->
+ lager:debug("reporting status to ~p ~p: ~p", [ServiceType, Service, Statusreports]),
+ NodeAddresses = addresses_for_servicetype(ServiceType),
DebugTag = "statusreport",
- URL = NodeAddress ++ Service,
Headers = [{"Content-Type", "text/json"}],
RequestBody = list_to_binary(
mochijson2:encode(
[
- {struct,
- [{"target", list_to_binary(Target)},
- {"source", list_to_binary(Nodename)},
- {"key", list_to_binary(Variable)},
- {"value", encode_status(Status)}]}
- || {Target, Variable, Status} <- Statusreports
+ encode_report(ServiceType, Nodename, Statusreport)
+ || Statusreport <- Statusreports
])),
+ lists:foreach(fun (NodeAddress) ->
+ send_one_server(DebugTag, NodeAddress ++ Service, Headers, RequestBody)
+ end, NodeAddresses).
+
+send_one_server(DebugTag, URL, Headers, RequestBody) ->
case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
{error, Err} ->
lager:debug("request error ~p ~p", [DebugTag, Err]);
@@ -198,3 +228,7 @@ report(Service, Target, Variable, Status) when is_number(Status); is_list(Status
report_multi(Service, Target, Variable, Status) when is_number(Status); is_list(Status); is_binary(Status) ->
lager:debug("reporting multi status ~p ~p ~p ~p", [Service, Target, Variable, Status]),
gen_server:cast(?MODULE, {report_multi, Service, Target, Variable, Status}).
+
+bench(Service, Target, Tag, Seq, Starttime, Elapsed) ->
+ lager:debug("reporting bench ~p ~p ~p ~p ~p", [Service, Target, Tag, Seq, Starttime, Elapsed]),
+ gen_server:cast(?MODULE, {bench, Service, Target, Tag, Seq, Starttime, Elapsed}).
diff --git a/statusserver/src/statusserver.erl b/statusserver/src/statusserver.erl
index 81fcd7a..ca11e7c 100644
--- a/statusserver/src/statusserver.erl
+++ b/statusserver/src/statusserver.erl
@@ -7,6 +7,7 @@
-export([request/4]).
-define(APPURL_PLOP_STATUS, "plop/v1/status").
+-define(APPURL_PLOP_BENCH, "plop/v1/bench").
request(post, ?APPURL_PLOP_STATUS, Service, Input) ->
case (catch mochijson2:decode(Input)) of
@@ -22,7 +23,23 @@ request(post, ?APPURL_PLOP_STATUS, Service, Input) ->
end, Entries)
end,
success({[{result, <<"ok">>}]});
-request(get, "", "status", Input) ->
+request(post, ?APPURL_PLOP_BENCH, Service, Input) ->
+ case (catch mochijson2:decode(Input)) of
+ {error, E} ->
+ html("bad input:", E);
+ Entries when is_list(Entries) ->
+ lists:foreach(fun ({struct, PropList}) ->
+ Target = proplists:get_value(<<"target">>, PropList),
+ Source = proplists:get_value(<<"source">>, PropList),
+ Tag = proplists:get_value(<<"tag">>, PropList),
+ Seq = proplists:get_value(<<"seq">>, PropList),
+ Starttime = proplists:get_value(<<"starttime">>, PropList),
+ Elapsed = proplists:get_value(<<"elapsed">>, PropList),
+ add_bench(Service, Source, Target, Tag, Seq, Starttime, Elapsed)
+ end, Entries)
+ end,
+ success({[{result, <<"ok">>}]});
+request(get, "", "status", _Input) ->
Now = plop_compat:monotonic_time(millisecond),
Variables = [{struct, [
{service, list_to_binary(Service)},
@@ -32,6 +49,16 @@ request(get, "", "status", Input) ->
{status, Status},
{age, (Now - Timestamp) / 1000}
]} || {{Service, Source, Target, Variable}, Status, Timestamp} <- get_status()],
+ success({[{result, Variables}]});
+request(get, "", "bench", _Input) ->
+ Variables = [{struct, [
+ {service, list_to_binary(Service)},
+ {source, Source},
+ {target, Target},
+ {tag, Tag},
+ {starttime, Starttime},
+ {elapsed, Elapsed}
+ ]} || {{Service, Source, Target, Tag, _Seq, _Starttime}, Starttime, Elapsed} <- get_bench()],
success({[{result, Variables}]}).
@@ -47,25 +74,36 @@ html(Text, Input) ->
"</body></html>~n", [Text, Input])}.
-define(STATUSDB_TABLE, statusserver_statusdb).
+-define(BENCHDB_TABLE, statusserver_benchdb).
init_module() ->
- create_statusdb().
+ create_table(?STATUSDB_TABLE),
+ create_table(?BENCHDB_TABLE).
-create_statusdb() ->
- case ets:info(?STATUSDB_TABLE) of
+create_table(Table) ->
+ case ets:info(Table) of
undefined ->
ok;
_ ->
- ets:delete(?STATUSDB_TABLE)
+ ets:delete(Table)
end,
- ets:new(?STATUSDB_TABLE, [set, public, named_table]).
+ ets:new(Table, [set, public, named_table]).
get_status() ->
[E || [E] <- ets:match(?STATUSDB_TABLE, '$1')].
+get_bench() ->
+ [E || [E] <- ets:match(?BENCHDB_TABLE, '$1')].
+
set_status(Service, Source, Target, Variable, Status) ->
lager:info("status: ~p ~p ~p ~p ~p", [Service, Source, Target, Variable, Status]),
Timestamp = plop_compat:monotonic_time(millisecond),
true = ets:insert(?STATUSDB_TABLE,
{{Service, Source, Target, Variable}, Status, Timestamp}),
ok.
+
+add_bench(Service, Source, Target, Tag, Seq, Starttime, Elapsed) ->
+ lager:info("bench: ~p ~p ~p ~p ~p ~p ~p", [Service, Source, Target, Tag, Seq, Starttime, Elapsed]),
+ true = ets:insert(?BENCHDB_TABLE,
+ {{Service, Source, Target, Tag, Seq, Starttime}, Starttime, Elapsed}),
+ ok.