author    Linus Nordberg <linus@nordu.net>  2017-03-15 17:17:58 +0100
committer Linus Nordberg <linus@nordu.net>  2017-03-15 17:17:58 +0100
commit    598d6ae6d00644c7f6e318cf5a4928ee5a8eb9ca (patch)
tree      7653518afa13e7ccc1a3925603d36d85570cb88c
parent    7e41c7c7630c4a96567029e6b4d7688a7df6ccee (diff)
parent    8bb572816040a8ecda50be9687cd1ddc76436f65 (diff)
Merge branch 'map-statusserver'
-rw-r--r--  Emakefile                               6
-rw-r--r--  Makefile                                1
-rw-r--r--  merge/src/merge_backup.erl             31
-rw-r--r--  merge/src/merge_dist.erl               44
-rw-r--r--  merge/src/merge_sth.erl                 2
-rw-r--r--  merge/src/merge_util.erl               50
-rw-r--r--  src/ht.erl                             28
-rw-r--r--  src/http_auth.erl                       5
-rw-r--r--  src/plop_compat.erl                    27
-rw-r--r--  src/plop_sup.erl                        1
-rw-r--r--  src/statusreport.erl                  200
-rw-r--r--  src/storage.erl                        14
-rw-r--r--  statusserver/ebin/statusserver.app     13
-rw-r--r--  statusserver/src/statusserver.erl      71
-rw-r--r--  statusserver/src/statusserver_app.erl  13
-rw-r--r--  statusserver/src/statusserver_sup.erl  42
16 files changed, 486 insertions, 62 deletions
diff --git a/Emakefile b/Emakefile
index a40d624..f223dab 100644
--- a/Emakefile
+++ b/Emakefile
@@ -10,3 +10,9 @@
{i, "src/"}, % For plop.hrl.
{outdir, "merge/ebin/"},
{parse_transform, lager_transform}]}.
+{["statusserver/src/*"],
+ [debug_info,
+ {i, "../"}, % For hackney.
+ {i, "src/"}, % For plop.hrl.
+ {outdir, "statusserver/ebin/"},
+ {parse_transform, lager_transform}]}.
diff --git a/Makefile b/Makefile
index 2b2f69d..65f2355 100644
--- a/Makefile
+++ b/Makefile
@@ -11,6 +11,7 @@ clean:
-rm priv/fsynchelper
-rm ebin/*.beam
-rm merge/ebin/*.beam
+ -rm statusserver/ebin/*.beam
dialyze: build
dialyzer ebin merge/ebin
tags:
diff --git a/merge/src/merge_backup.erl b/merge/src/merge_backup.erl
index bf20f23..068725c 100644
--- a/merge/src/merge_backup.erl
+++ b/merge/src/merge_backup.erl
@@ -44,12 +44,12 @@ backup(Size, #state{node_name = NodeName, node_address = NodeAddress} = State) -
lager:debug("~p: logorder size ~B", [NodeName, Size]),
ht:load_tree(Size - 1), % TODO: Make sure this is OK to do from multiple processes and that it's not "moving backwards".
try
- {ok, VerifiedSize} = verified_size(NodeAddress),
+ {ok, VerifiedSize} = verified_size(NodeName, NodeAddress),
lager:debug("~p: verifiedsize ~B", [NodeName, VerifiedSize]),
case VerifiedSize == Size of
true ->
TreeHead = ht:root(Size - 1),
- ok = check_root(NodeAddress, Size, TreeHead),
+ ok = check_root(NodeName, NodeAddress, Size, TreeHead),
ok = write_backupfile(NodeName, Size, TreeHead);
false ->
true = VerifiedSize < Size, % Secondary ahead of primary?
@@ -68,27 +68,28 @@ do_backup(_, _, _, 0) ->
do_backup(NodeName, NodeAddress, Start, NTotal) ->
N = min(NTotal, plopconfig:get_env(merge_backup_winsize, 1000)),
Hashes = index:getrange(logorder, Start, Start + N - 1),
- ok = merge_util:sendlog(NodeAddress, Start, Hashes, plopconfig:get_env(merge_backup_sendlog_chunksize, 1000)),
- {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress),
+ ok = merge_util:sendlog(NodeAddress, NodeName, Start, Hashes, plopconfig:get_env(merge_backup_sendlog_chunksize, 1000)),
+ {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress, NodeName),
HashesMissing = lists:map(fun base64:decode/1, HashesMissingEncoded),
- ok = merge_util:sendentries(NodeAddress, HashesMissing, plopconfig:get_env(merge_backup_sendentries_chunksize, 100)),
+ ok = merge_util:sendentries(NodeAddress, NodeName, HashesMissing, plopconfig:get_env(merge_backup_sendentries_chunksize, 100)),
Size = Start + N,
TreeHead = ht:root(Size - 1),
- ok = check_root(NodeAddress, Size, TreeHead),
- ok = setverifiedsize(NodeAddress, Size),
+ ok = check_root(NodeName, NodeAddress, Size, TreeHead),
+ ok = setverifiedsize(NodeName, NodeAddress, Size),
ok = write_backupfile(NodeName, Size, TreeHead),
true = NTotal >= N,
do_backup(NodeName, NodeAddress, Size, NTotal - N).
write_backupfile(NodeName, TreeSize, TreeHead) ->
+ statusreport:report("merge_backup", NodeName, "verified", TreeSize),
{ok, BasePath} = application:get_env(plop, verified_path),
Path = BasePath ++ "." ++ NodeName,
Content = mochijson2:encode({[{"tree_size", TreeSize},
{"sha256_root_hash", list_to_binary(hex:bin_to_hexstr(TreeHead))}]}),
atomic:replacefile(Path, Content).
-check_root(NodeAddress, Size, TreeHead) ->
- {ok, TreeHeadToVerify} = verifyroot(NodeAddress, Size),
+check_root(NodeName, NodeAddress, Size, TreeHead) ->
+ {ok, TreeHeadToVerify} = verifyroot(NodeName, NodeAddress, Size),
case TreeHeadToVerify == TreeHead of
true ->
ok;
@@ -98,34 +99,34 @@ check_root(NodeAddress, Size, TreeHead) ->
root_mismatch
end.
-verifyroot(NodeAddress, TreeSize) ->
+verifyroot(NodeName, NodeAddress, TreeSize) ->
DebugTag = io_lib:format("verifyroot ~B", [TreeSize]),
URL = NodeAddress ++ "verifyroot",
Headers = [{"Content-Type", "text/json"}],
RequestBody = list_to_binary(mochijson2:encode({[{"tree_size", TreeSize}]})),
- case merge_util:request(DebugTag, URL, Headers, RequestBody) of
+ case merge_util:request(DebugTag, URL, NodeName, Headers, RequestBody) of
{<<"ok">>, PropList} ->
{ok, base64:decode(proplists:get_value(<<"root_hash">>, PropList))};
Err ->
throw({request_error, result, DebugTag, Err})
end.
-verified_size(NodeAddress) ->
+verified_size(NodeName, NodeAddress) ->
DebugTag = "verifiedsize",
URL = NodeAddress ++ "verifiedsize",
- case merge_util:request(DebugTag, URL) of
+ case merge_util:request(DebugTag, URL, NodeName) of
{<<"ok">>, PropList} ->
{ok, proplists:get_value(<<"size">>, PropList)};
Err ->
throw({request_error, result, DebugTag, Err})
end.
-setverifiedsize(NodeAddress, Size) ->
+setverifiedsize(NodeName, NodeAddress, Size) ->
DebugTag = io_lib:format("setverifiedsize ~B", [Size]),
URL = NodeAddress ++ "setverifiedsize",
Headers = [{"Content-Type", "text/json"}],
RequestBody = list_to_binary(mochijson2:encode({[{"size", Size}]})),
- case merge_util:request(DebugTag, URL, Headers, RequestBody) of
+ case merge_util:request(DebugTag, URL, NodeName, Headers, RequestBody) of
{<<"ok">>, _} ->
ok;
Err ->
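
The net effect of the merge_backup changes: the node name is threaded through every request so HTTP outcomes and backup progress can be attributed to a node. A minimal sketch of the new call shape, with placeholder name and address (in the module these come from #state{}; check_root/4 and write_backupfile/3 are internal to merge_backup):

    NodeName = "secondary-1",                                        % illustrative
    NodeAddress = "https://secondary-1.example.net/plop/v1/merge/",  % illustrative
    Size = index:indexsize(logorder),
    ht:load_tree(Size - 1),
    TreeHead = ht:root(Size - 1),
    ok = check_root(NodeName, NodeAddress, Size, TreeHead),
    ok = write_backupfile(NodeName, Size, TreeHead).
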
diff --git a/merge/src/merge_dist.erl b/merge/src/merge_dist.erl
index f8f0c7c..3c38401 100644
--- a/merge/src/merge_dist.erl
+++ b/merge/src/merge_dist.erl
@@ -48,7 +48,9 @@ dist(noentry, State) ->
Timer = erlang:start_timer(1000, self(), dist),
{noreply, State#state{timer = Timer}};
dist({struct, PropList} = STH,
- #state{node_address = NodeAddress, sth_timestamp = LastTimestamp} = State) ->
+ #state{node_address = NodeAddress,
+ node_name = NodeName,
+ sth_timestamp = LastTimestamp} = State) ->
Treesize = proplists:get_value(<<"tree_size">>, PropList),
Timestamp = proplists:get_value(<<"timestamp">>, PropList),
RootHash = base64:decode(proplists:get_value(<<"sha256_root_hash">>, PropList)),
@@ -60,8 +62,10 @@ dist({struct, PropList} = STH,
try
lager:info("~p: starting dist, sth at ~B, logorder at ~B",
[NodeAddress, Treesize, Logordersize]),
- ok = do_dist(NodeAddress, min(Treesize, Logordersize)),
- ok = publish_sth(NodeAddress, STH),
+ statusreport:report("merge_dist", NodeName, "targetsth", Treesize),
+ ok = do_dist(NodeAddress, NodeName, min(Treesize, Logordersize)),
+ ok = publish_sth(NodeName, NodeAddress, STH),
+ statusreport:report("merge_dist", NodeName, "sth", Treesize),
lager:info("~p: Published STH with size ~B and timestamp " ++
"~p.", [NodeAddress, Treesize, Timestamp]),
Timestamp
@@ -82,52 +86,54 @@ dist({struct, PropList} = STH,
%% @doc Has nonlocal return because of throw further down in
%% merge_util:request/4.
-do_dist(NodeAddress, Size) ->
- {ok, VerifiedSize} = frontend_get_verifiedsize(NodeAddress),
+do_dist(NodeAddress, NodeName, Size) ->
+ {ok, VerifiedSize} = frontend_get_verifiedsize(NodeName, NodeAddress),
lager:debug("~p: verifiedsize ~B", [NodeAddress, VerifiedSize]),
true = VerifiedSize =< Size,
- do_dist(NodeAddress, VerifiedSize, Size - VerifiedSize).
+ do_dist(NodeAddress, NodeName, VerifiedSize, Size - VerifiedSize).
-do_dist(_, _, 0) ->
+do_dist(_, _, _, 0) ->
ok;
-do_dist(NodeAddress, Start, NTotal) ->
+do_dist(NodeAddress, NodeName, Start, NTotal) ->
DistMaxWindow = application:get_env(plop, merge_dist_winsize, 1000),
N = min(DistMaxWindow, NTotal),
Hashes = index:getrange(logorder, Start, Start + N - 1),
SendlogChunksize = application:get_env(plop, merge_dist_sendlog_chunksize, 1000),
SendentriesChunksize = application:get_env(plop, merge_dist_sendentries_chunksize, 100),
- ok = merge_util:sendlog(NodeAddress, Start, Hashes, SendlogChunksize),
- {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress),
+ ok = merge_util:sendlog(NodeAddress, NodeName, Start, Hashes, SendlogChunksize),
+ statusreport:report("merge_dist", NodeName, "sendlog", Start + N),
+ {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress, NodeName),
lager:debug("number of missing entries: ~B", [length(HashesMissingEncoded)]),
HashesMissing = lists:map(fun base64:decode/1, HashesMissingEncoded),
- ok = merge_util:sendentries(NodeAddress, HashesMissing, SendentriesChunksize),
- {ok, NewSize} = frontend_verify_entries(NodeAddress, Start + N),
+ ok = merge_util:sendentries(NodeAddress, NodeName, HashesMissing, SendentriesChunksize),
+ {ok, NewSize} = frontend_verify_entries(NodeName, NodeAddress, Start + N),
lager:info("~p: Done distributing ~B out of ~B entries.",
[NodeAddress, NewSize-Start, NTotal]),
+ statusreport:report("merge_dist", NodeName, "verified", Start + N),
true = NTotal >= NewSize - Start,
- do_dist(NodeAddress, NewSize, NTotal - (NewSize - Start)).
+ do_dist(NodeAddress, NodeName, NewSize, NTotal - (NewSize - Start)).
-frontend_get_verifiedsize(NodeAddress) ->
- frontend_verify_entries(NodeAddress, 0).
+frontend_get_verifiedsize(NodeName, NodeAddress) ->
+ frontend_verify_entries(NodeName, NodeAddress, 0).
-frontend_verify_entries(NodeAddress, Size) ->
+frontend_verify_entries(NodeName, NodeAddress, Size) ->
DebugTag = io_lib:format("verify-entries ~B", [Size]),
URL = NodeAddress ++ "verify-entries",
Headers = [{"Content-Type", "text/json"}],
RequestBody = list_to_binary(mochijson2:encode({[{"verify_to", Size}]})),
- case merge_util:request(DebugTag, URL, Headers, RequestBody) of
+ case merge_util:request(DebugTag, URL, NodeName, Headers, RequestBody) of
{<<"ok">>, PropList} ->
{ok, proplists:get_value(<<"verified">>, PropList)};
Err ->
throw({request_error, result, DebugTag, Err})
end.
-publish_sth(NodeAddress, STH) ->
+publish_sth(NodeName, NodeAddress, STH) ->
DebugTag = "publish-sth",
URL = NodeAddress ++ "publish-sth",
Headers = [{"Content-Type", "text/json"}],
RequestBody = list_to_binary(mochijson2:encode(STH)),
- case merge_util:request(DebugTag, URL, Headers, RequestBody) of
+ case merge_util:request(DebugTag, URL, NodeName, Headers, RequestBody) of
{<<"ok">>, _} ->
ok;
Err ->
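
In merge_dist, each round now checkpoints its stages to the status server. For one successful round the reports land roughly in this order (node name and sizes are illustrative):

    NodeName = "frontend-1",
    statusreport:report("merge_dist", NodeName, "targetsth", 120000),  % STH we aim for
    statusreport:report("merge_dist", NodeName, "sendlog", 119000),    % log sent up to here
    statusreport:report("merge_dist", NodeName, "verified", 119000),   % frontend verified
    statusreport:report("merge_dist", NodeName, "sth", 120000).        % STH published
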
diff --git a/merge/src/merge_sth.erl b/merge/src/merge_sth.erl
index ab1cd8f..4b77864 100644
--- a/merge/src/merge_sth.erl
+++ b/merge/src/merge_sth.erl
@@ -66,6 +66,7 @@ make_sth(CurSize, State) ->
Wait =
case NewSize < CurSize of
true ->
+ statusreport:report("merge_sth", http_auth:own_name(), "sth", null),
lager:debug("waiting for enough backups to reach ~B, now at ~B",
[CurSize, NewSize]),
1;
@@ -90,6 +91,7 @@ do_make_sth(Size) ->
{"sha256_root_hash", base64:encode(NewRoot)},
{"tree_head_signature", base64:encode(PackedSignature)}],
ok = plop:save_sth({struct, NewSTH}),
+ statusreport:report("merge_sth", http_auth:own_name(), "sth", Size),
ok;
false ->
lager:error("The signature we got for new tree of size ~B doesn't " ++
diff --git a/merge/src/merge_util.erl b/merge/src/merge_util.erl
index 7598e40..c76d05f 100644
--- a/merge/src/merge_util.erl
+++ b/merge/src/merge_util.erl
@@ -2,78 +2,82 @@
%%% See LICENSE for licensing information.
-module(merge_util).
--export([sendlog/4, sendentries/3, missingentries/1]).
--export([request/2, request/4]).
+-export([sendlog/5, sendentries/4, missingentries/2]).
+-export([request/3, request/5]).
-export([readfile/1, nfetched/0]).
-request(DebugTag, URL) ->
- request(DebugTag, URL, [], <<>>).
+request(DebugTag, URL, NodeName) ->
+ request(DebugTag, URL, NodeName, [], <<>>).
-request(DebugTag, URL, Headers, RequestBody) ->
+request(DebugTag, URL, NodeName, Headers, RequestBody) ->
case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
{error, Err} ->
+ statusreport:report_multi("merge_errors", NodeName, "http_error", list_to_binary(io_lib:format("~w", [Err]))),
throw({request_error, request, DebugTag, Err});
{failure, {none, StatusCode, none}, _RespHeaders, _Body} ->
+ statusreport:report_multi("merge_errors", NodeName, "http_error", StatusCode),
throw({request_error, failure, DebugTag, StatusCode});
{success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->
case (catch mochijson2:decode(Body)) of
{error, Err} ->
+ statusreport:report_multi("merge_errors", NodeName, "http_error", list_to_binary(Err)),
throw({request_error, decode, DebugTag, Err});
{struct, PropList} ->
+ statusreport:report_multi("merge_errors", NodeName, "http_error", 200),
{proplists:get_value(<<"result">>, PropList), PropList}
end
end.
-sendlog(NodeAddress, Start, Hashes, Chunksize) ->
+sendlog(NodeAddress, NodeName, Start, Hashes, Chunksize) ->
lager:debug("sending log: start=~B, N=~B, chunksize=~B", [Start, length(Hashes), Chunksize]),
- sendlog_chunk(NodeAddress, Start, lists:split(min(Chunksize, length(Hashes)), Hashes), Chunksize).
+ sendlog_chunk(NodeAddress, NodeName, Start, lists:split(min(Chunksize, length(Hashes)), Hashes), Chunksize).
-sendlog_chunk(_, _, {[], _}, _) ->
+sendlog_chunk(_, _, _, {[], _}, _) ->
ok;
-sendlog_chunk(NodeAddress, Start, {Chunk, Rest}, Chunksize) ->
+sendlog_chunk(NodeAddress, NodeName, Start, {Chunk, Rest}, Chunksize) ->
lager:debug("sending log chunk: start=~B, N=~B", [Start, length(Chunk)]),
- ok = sendlog_request(NodeAddress, Start, Chunk),
- sendlog_chunk(NodeAddress, Start + length(Chunk),
+ ok = sendlog_request(NodeAddress, NodeName, Start, Chunk),
+ sendlog_chunk(NodeAddress, NodeName, Start + length(Chunk),
lists:split(min(Chunksize, length(Rest)), Rest), Chunksize).
-sendlog_request(NodeAddress, Start, Hashes) ->
+sendlog_request(NodeAddress, NodeName, Start, Hashes) ->
DebugTag = io_lib:format("sendlog ~B:~B", [Start, length(Hashes)]),
URL = NodeAddress ++ "sendlog",
Headers = [{"Content-Type", "text/json"}],
EncodedHashes = [base64:encode(H) || H <- Hashes],
RequestBody = list_to_binary(mochijson2:encode({[{"start", Start},
{"hashes", EncodedHashes}]})),
- case request(DebugTag, URL, Headers, RequestBody) of
+ case request(DebugTag, URL, NodeName, Headers, RequestBody) of
{<<"ok">>, _} ->
ok;
Err ->
throw({request_error, result, DebugTag, Err})
end.
-missingentries(NodeAddress) ->
+missingentries(NodeAddress, NodeName) ->
DebugTag = "missingentries",
URL = NodeAddress ++ "missingentries",
- case request(DebugTag, URL) of
+ case request(DebugTag, URL, NodeName) of
{<<"ok">>, PropList} ->
{ok, proplists:get_value(<<"entries">>, PropList)};
Err ->
throw({request_error, result, DebugTag, Err})
end.
-sendentries(NodeAddress, Hashes, Chunksize) ->
+sendentries(NodeAddress, NodeName, Hashes, Chunksize) ->
lager:debug("sending entries: N=~B, chunksize=~B", [length(Hashes), Chunksize]),
{ChunkOfHashes, RestOfHashes} = lists:split(min(Chunksize, length(Hashes)), Hashes),
- sendentries_chunk(NodeAddress, {ChunkOfHashes, RestOfHashes}, Chunksize).
+ sendentries_chunk(NodeAddress, NodeName, {ChunkOfHashes, RestOfHashes}, Chunksize).
-sendentries_chunk(_, {[], _}, _) ->
+sendentries_chunk(_, _, {[], _}, _) ->
ok;
-sendentries_chunk(NodeAddress, {Chunk, Rest}, Chunksize) ->
+sendentries_chunk(NodeAddress, NodeName, {Chunk, Rest}, Chunksize) ->
lager:debug("sending entries chunk: N=~B", [length(Chunk)]),
HashesAndEntries = lists:zip(Chunk, lists:map(fun db:entry_for_leafhash/1, Chunk)),
case lists:keysearch(noentry, 2, HashesAndEntries) of
false ->
- ok = sendentries_request(NodeAddress, HashesAndEntries),
- sendentries_chunk(NodeAddress,
+ ok = sendentries_request(NodeAddress, NodeName, HashesAndEntries),
+ sendentries_chunk(NodeAddress, NodeName,
lists:split(min(Chunksize, length(Rest)), Rest),
Chunksize);
Missing ->
@@ -81,13 +85,13 @@ sendentries_chunk(NodeAddress, {Chunk, Rest}, Chunksize) ->
{error, entrynotindb}
end.
-sendentries_request(NodeAddress, HashesAndEntries) ->
+sendentries_request(NodeAddress, NodeName, HashesAndEntries) ->
DebugTag = io_lib:format("sendentry ~B", [length(HashesAndEntries)]),
URL = NodeAddress ++ "sendentry",
Headers = [{"Content-Type", "text/json"}],
L = mochijson2:encode([[{"entry", base64:encode(E)}, {"treeleafhash", base64:encode(H)}] || {H, E} <- HashesAndEntries]),
RequestBody = list_to_binary(L),
- case request(DebugTag, URL, Headers, RequestBody) of
+ case request(DebugTag, URL, NodeName, Headers, RequestBody) of
{<<"ok">>, _} ->
ok;
Err ->
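
With the extra NodeName argument, every HTTP call routed through merge_util doubles as a health probe: request/3 and request/5 report 200 under the "merge_errors" service on success, and the status code or formatted error term on failure. A caller sketch (URL and node name are placeholders):

    URL = "https://secondary-1.example.net/plop/v1/merge/verifiedsize",  % illustrative
    {<<"ok">>, PropList} = merge_util:request("verifiedsize", URL, "secondary-1"),
    Size = proplists:get_value(<<"size">>, PropList).
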
diff --git a/src/ht.erl b/src/ht.erl
index 7f52db0..4e0e4c5 100644
--- a/src/ht.erl
+++ b/src/ht.erl
@@ -29,7 +29,7 @@
-behaviour(gen_server).
-export([reset_tree/1, load_tree/1, size/0, leaf_hash/1]).
--export([add/1, root/0, root/1, path/2, consistency/2]).
+-export([add/1, root/0, root/1, path/2, consistency/2, dump/1]).
-export([start_link/0, start_link/1, stop/0]).
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
code_change/3]).
@@ -96,6 +96,8 @@ print_tree() ->
call(?MODULE, {print_tree, 4}).
print_tree(HashOutputLen) ->
call(?MODULE, {print_tree, HashOutputLen}).
+dump(Filename) ->
+ call(?MODULE, {dump, Filename}).
%% gen_server callbacks
init(Args) ->
@@ -141,7 +143,9 @@ handle_call({consistency, Version1, Version2}, _From, State) ->
handle_call(testing_get_state, _From, State) ->
{reply, State, State};
handle_call({print_tree, HashOutputLen}, _From, State) ->
- {reply, print_tree(State, HashOutputLen), State}.
+ {reply, print_tree(State, HashOutputLen), State};
+handle_call({dump, Filename}, _From, State) ->
+ {reply, dump(State, Filename), State}.
%%%%%%%%%%%%%%%%%%%%
%% Private.
@@ -444,6 +448,26 @@ print_layer(Tree, HashOutputLen, Layer, ILast) ->
lists:seq(0, ILast)),
io:format("~n").
+dump(Tree, Filename) ->
+ {ok, File} = file:open(Filename, [read, write, binary]),
+ ok = file:write(File, "# evaluated " ++ integer_to_list(Tree#tree.evaluated) ++ "\n"),
+ lists:foreach(fun (R) ->
+ write_layer_header(File, R),
+ lists:foreach(fun (I) ->
+ Entry = ts:retrieve(Tree#tree.store, {R, I}),
+ dumpone(File, Entry)
+ end, lists:seq(0, ts:count(Tree#tree.store, R) - 1))
+ end, lists:seq(0, depth(Tree) - 1)),
+ file:close(File).
+
+write_layer_header(File, Layer) ->
+ EntryText = "# layer " ++ integer_to_list(Layer) ++ "\n",
+ ok = file:write(File, EntryText).
+
+dumpone(File, Entry) ->
+ EntryText = hex:bin_to_hexstr(Entry) ++ "\n",
+ ok = file:write(File, EntryText).
+
%%%%%%%%%%%%%%%%%%%%
%% Testing ht.
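
The new ht:dump/1 serializes the evaluated part of the tree store layer by layer, one hex hash per line under per-layer comment headers, which is handy for comparing tree state between nodes. Illustrative use and output (hashes shortened):

    1> ht:dump("/tmp/ht.dump").
    ok

    # evaluated 4
    # layer 0
    2e7d2c03a950...
    ...
    # layer 1
    ...
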
diff --git a/src/http_auth.erl b/src/http_auth.erl
index 2cee51f..c8a8389 100644
--- a/src/http_auth.erl
+++ b/src/http_auth.erl
@@ -2,7 +2,7 @@
%%% See LICENSE for licensing information.
-module(http_auth).
--export([verify_auth/4, create_auth/3, init_key_table/0, sign_stored/1, verify_stored/3]).
+-export([verify_auth/4, create_auth/3, init_key_table/0, sign_stored/1, verify_stored/3, own_name/0]).
-define(KEY_TABLE, http_auth_keys).
@@ -25,6 +25,9 @@ read_key_table() ->
none
end.
+own_name() ->
+ {ok, {KeyName, _}} = application:get_env(plop, own_key),
+ KeyName.
own_key() ->
case application:get_env(plop, own_key, none) of
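
own_name/0 exposes the key-name half of plop's own_key setting; the status reporting uses it as the node's identity. A sketch of the matching release config (key name and path are placeholders):

    %% sys.config
    [{plop, [{own_key, {"merge-1", "/etc/plop/privkeys/merge-1.pem"}}]}].
    %% http_auth:own_name() -> "merge-1"
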
diff --git a/src/plop_compat.erl b/src/plop_compat.erl
index 9a98e3d..4d45590 100644
--- a/src/plop_compat.erl
+++ b/src/plop_compat.erl
@@ -2,14 +2,17 @@
%%% See LICENSE for licensing information.
-module(plop_compat).
--export([unpack_spki/1, timestamp/0]).
+-export([unpack_spki/1, timestamp/0, monotonic_time/1, start_timer/4]).
-include_lib("public_key/include/public_key.hrl").
unpack_spki(SPKI) ->
unpack_spki(erlang:system_info(otp_release), SPKI).
timestamp() ->
timestamp(erlang:system_info(otp_release)).
-
+monotonic_time(Unit) ->
+ monotonic_time(erlang:system_info(otp_release), Unit).
+start_timer(Time, Dest, Msg, Options) ->
+ start_timer(erlang:system_info(otp_release), Time, Dest, Msg, Options).
unpack_spki("R16" ++ _, SPKI) ->
#'SubjectPublicKeyInfo'{subjectPublicKey = {_, Octets},
@@ -37,3 +40,23 @@ timestamp("18") ->
erlang:timestamp();
timestamp("19") ->
erlang:timestamp().
+
+monotonic_time("R16" ++ _, millisecond) ->
+ {MeS, S, MiS} = timestamp(),
+ MeS * 1000000 + S * 1000 + MiS;
+monotonic_time("17", millisecond) ->
+ {MeS, S, MiS} = timestamp(),
+ MeS * 1000000 + S * 1000 + MiS;
+monotonic_time("18", Unit) ->
+ erlang:monotonic_time(Unit);
+monotonic_time("19", Unit) ->
+ erlang:monotonic_time(Unit).
+
+start_timer("R16" ++ _, Time, Dest, Msg, [{abs, true}]) ->
+ erlang:start_timer(max(0, Time - monotonic_time(millisecond)), Dest, Msg);
+start_timer("17", Time, Dest, Msg, [{abs, true}]) ->
+ erlang:start_timer(max(0, Time - monotonic_time(millisecond)), Dest, Msg);
+start_timer("18", Time, Dest, Msg, Options) ->
+ erlang:start_timer(Time, Dest, Msg, Options);
+start_timer("19", Time, Dest, Msg, Options) ->
+ erlang:start_timer(Time, Dest, Msg, Options).
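
These shims give statusreport monotonic, absolute-deadline timers uniformly from R16 through OTP 19. Note that the R16/17 fallback derives its value from the wall clock via timestamp/0, so it is monotonic only as long as the clock is not stepped. A sketch of the intended use:

    Now = plop_compat:monotonic_time(millisecond),
    Timer = plop_compat:start_timer(Now + 5000, self(), force_send, [{abs, true}]),
    receive {timeout, Timer, force_send} -> ok end.  % fires ~5 s after Now
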
diff --git a/src/plop_sup.erl b/src/plop_sup.erl
index 27f7680..15f3fe8 100644
--- a/src/plop_sup.erl
+++ b/src/plop_sup.erl
@@ -50,6 +50,7 @@ init([]) ->
Children = [permanent_worker(the_db, {db, start_link, []}, [db]),
permanent_worker(the_storagedb, {storagedb, start_link, []}),
permanent_worker(fsync, {fsyncport, start_link, []}),
+ permanent_worker(the_statusreport, {statusreport, start_link, []}),
permanent_worker(plopcontrol, {plopcontrol, start_link, []})],
OptionalChildren = lists:map(fun (ServiceName) ->
case ServiceName of
diff --git a/src/statusreport.erl b/src/statusreport.erl
new file mode 100644
index 0000000..db85b84
--- /dev/null
+++ b/src/statusreport.erl
@@ -0,0 +1,200 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(statusreport).
+-behaviour(gen_server).
+
+-export([start_link/0]).
+-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
+ code_change/3]).
+-export([report/4]).
+-export([report_multi/4]).
+
+-record(state, {
+ timer :: none|reference(),
+ nodename :: string(),
+ statusreports :: dict:dict(),
+ lastsent :: integer()
+ }).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init([]) ->
+ process_flag(trap_exit, true),
+ ReportInterval = application:get_env(plop, status_report_interval, 1000),
+ lager:info("~p: starting", [?MODULE]),
+ HeartbeatInterval = application:get_env(plop, heartbeat_interval, 1000),
+ erlang:start_timer(HeartbeatInterval, self(), heartbeat),
+ {ok, #state{timer = none,
+ nodename = http_auth:own_name(),
+ statusreports = dict:new(),
+ lastsent = plop_compat:monotonic_time(millisecond) - ReportInterval}}.
+
+store_status(State, Service, Target, Variable, Status) ->
+ Statusreports = dict:store({Service, Target, Variable},
+ {single, Status},
+ State#state.statusreports),
+ State#state{statusreports = Statusreports}.
+
+dict_append_set(Key, Value, Dict) ->
+ AppendSet = sets:from_list([Value]),
+ dict:update(Key, fun ({multi, Old}) ->
+ {multi, sets:union(Old, AppendSet)}
+ end,
+ {multi, AppendSet}, Dict).
+
+store_multi_status(State, Service, Target, Variable, Status) ->
+ Statusreports = dict_append_set({Service, Target, Variable},
+ Status,
+ State#state.statusreports),
+ State#state{statusreports = Statusreports}.
+
+store_set_status(State, Service, Target, Variable, Statuses) ->
+ Statusreports = dict:store({Service, Target, Variable},
+ {multi, Statuses},
+ State#state.statusreports),
+ State#state{statusreports = Statusreports}.
+
+heartbeat(State) ->
+ {ok, ConfigVersion} = plopconfig:get_env(version),
+ RunningApps = [atom_to_list(App) ++ " " ++ Vsn || {App, _Desc, Vsn} <- application:which_applications()],
+
+ NewState1 = store_status(State, "heartbeat", "", "configversion", ConfigVersion),
+ NewState2 = store_set_status(NewState1, "heartbeat", "", "applications", sets:from_list(RunningApps)),
+ NewState2.
+
+handle_call(_, _From, State) ->
+ {noreply, State}.
+
+handle_cast({report, Service, Target, Variable, Status}, State) ->
+ NewState = store_status(State, Service, Target, Variable, Status),
+ {noreply, try_send(NewState)};
+handle_cast({report_multi, Service, Target, Variable, Status}, State) ->
+ NewState = store_multi_status(State, Service, Target, Variable, Status),
+ {noreply, try_send(NewState)}.
+
+handle_info({timeout, _Timer, force_send}, State) ->
+ lager:debug("statusreport timer timeout"),
+ {noreply, force_send(State)};
+
+handle_info({timeout, _Timer, heartbeat}, State) ->
+ lager:debug("statusreport timer timeout"),
+ HeartbeatInterval = application:get_env(plop, heartbeat_interval, 1000),
+ erlang:start_timer(HeartbeatInterval, self(), heartbeat),
+ NewState = heartbeat(State),
+ {noreply, try_send(NewState)}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+cancel_timer(State) ->
+ case State#state.timer of
+ none ->
+ none;
+ _ ->
+ erlang:cancel_timer(State#state.timer)
+ end,
+ State#state{timer = none}.
+
+set_timer(State) ->
+ case State#state.timer of
+ none ->
+ ReportInterval = application:get_env(plop, status_report_interval, 1000),
+ Timer = plop_compat:start_timer(State#state.lastsent + ReportInterval, self(), force_send, [{abs, true}]),
+ State#state{timer = Timer};
+ _ ->
+ State
+ end.
+
+terminate(Reason, State) ->
+ lager:info("~p terminating: ~p", [?MODULE, Reason]),
+ NewState = cancel_timer(State),
+ case Reason of
+ shutdown ->
+ force_send(NewState);
+ _ ->
+ none
+ end,
+ ok.
+
+
+
+group_by_service(Statusreports) ->
+ dict:to_list(
+ lists:foldl(
+ fun ({{Service, Target, Variable}, Status}, Acc) ->
+ dict:append(Service, {Target, Variable, Status}, Acc)
+ end, dict:new(), dict:to_list(Statusreports))).
+
+encode_one_status(Status) when is_number(Status) ->
+ Status;
+encode_one_status(Status) when is_list(Status) ->
+ list_to_binary(Status);
+encode_one_status(Status) when is_binary(Status) ->
+ Status;
+encode_one_status(null) ->
+ null.
+
+encode_status({single, Status}) ->
+ encode_one_status(Status);
+encode_status({multi, Statuses}) ->
+ lists:map(fun encode_one_status/1, sets:to_list(Statuses)).
+
+send(Service, Statusreports, Nodename) ->
+ lager:debug("reporting status to ~p: ~p", [Service, Statusreports]),
+ [NodeAddress] = plopconfig:get_env(statusservers, []),
+ DebugTag = "statusreport",
+ URL = NodeAddress ++ Service,
+ Headers = [{"Content-Type", "text/json"}],
+ RequestBody = list_to_binary(
+ mochijson2:encode(
+ [
+ {struct,
+ [{"target", list_to_binary(Target)},
+ {"source", list_to_binary(Nodename)},
+ {"key", list_to_binary(Variable)},
+ {"value", encode_status(Status)}]}
+ || {Target, Variable, Status} <- Statusreports
+ ])),
+ case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
+ {error, Err} ->
+ lager:debug("request error ~p ~p", [DebugTag, Err]);
+ {failure, {none, StatusCode, none}, _RespHeaders, _Body} ->
+ lager:debug("request failure ~p ~p", [DebugTag, StatusCode]);
+ {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->
+ case (catch mochijson2:decode(Body)) of
+ {error, Err} ->
+ lager:debug("error returned ~p ~p", [DebugTag, Err]);
+ {struct, _PropList} ->
+ none
+ end
+ end.
+
+force_send(State) ->
+ lists:foreach(fun ({Service, Statusreports}) ->
+ send(Service, Statusreports, State#state.nodename)
+ end, group_by_service(State#state.statusreports)),
+ NewState = cancel_timer(State),
+ NewState#state{statusreports = dict:new(), lastsent = plop_compat:monotonic_time(millisecond)}.
+
+try_send(State) ->
+ ReportInterval = application:get_env(plop, status_report_interval, 1000),
+ NextSend = State#state.lastsent + ReportInterval,
+ Now = plop_compat:monotonic_time(millisecond),
+ if
+ NextSend > Now ->
+ lager:debug("status report sent ~p ms ago, setting timer", [NextSend - Now]),
+ set_timer(State);
+ true ->
+ lager:debug("status report send long enough ago"),
+ force_send(State)
+ end.
+
+report(Service, Target, Variable, Status) when is_number(Status); is_list(Status); is_binary(Status); Status == null ->
+ lager:debug("reporting status ~p ~p ~p ~p", [Service, Target, Variable, Status]),
+ gen_server:cast(?MODULE, {report, Service, Target, Variable, Status}).
+
+report_multi(Service, Target, Variable, Status) when is_number(Status); is_list(Status); is_binary(Status) ->
+ lager:debug("reporting multi status ~p ~p ~p ~p", [Service, Target, Variable, Status]),
+ gen_server:cast(?MODULE, {report_multi, Service, Target, Variable, Status}).
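
statusreport is fire-and-forget: report/4 and report_multi/4 cast to the gen_server, which coalesces values per {Service, Target, Variable} key and flushes a batch at most once per status_report_interval milliseconds (default 1000), with an absolute timer covering quiet periods. Usage, matching the call sites added elsewhere in this commit:

    statusreport:report("merge_sth", http_auth:own_name(), "sth", 120000),
    %% report/4 keeps only the latest value per key;
    %% report_multi/4 accumulates a set of distinct values:
    statusreport:report_multi("merge_errors", "frontend-1", "http_error", 200).
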
diff --git a/src/storage.erl b/src/storage.erl
index 7498635..6114a57 100644
--- a/src/storage.erl
+++ b/src/storage.erl
@@ -9,6 +9,18 @@
-define(APPURL_PLOP_STORAGE, "plop/v1/storage").
+reportnewentries() ->
+ NodeName = http_auth:own_name(),
+ {LastIndexOrZero, LastHash} = storagedb:lastverifiednewentry(),
+ VerifiedEntries = case LastHash of
+ none ->
+ 0;
+ _ ->
+ LastIndexOrZero + 1
+ end,
+ NewEntries = index:indexsize(newentries_db) - VerifiedEntries,
+ statusreport:report("storage", NodeName, "newentries", NewEntries).
+
request(post, ?APPURL_PLOP_STORAGE, "sendentry", Input) ->
case (catch mochijson2:decode(Input)) of
{error, E} ->
@@ -20,6 +32,7 @@ request(post, ?APPURL_PLOP_STORAGE, "sendentry", Input) ->
ok = db:add_entry_sync(TreeLeafHash, LogEntry),
ok = storagedb:add(TreeLeafHash),
{KeyName, Sig} = http_auth:sign_stored(plop:spt_data_from_entry(LogEntry)),
+ reportnewentries(),
success({[{result, <<"ok">>},
{"sig", KeyName ++ ":" ++ base64:encode_to_string(Sig)}
]})
@@ -55,6 +68,7 @@ request(get, ?APPURL_PLOP_STORAGE, "fetchnewentries", _Input) ->
Entries = lists:map(fun(LeafHash) ->
base64:encode(LeafHash)
end, NewHashes),
+ reportnewentries(),
success({[{result, <<"ok">>},
{entries, Entries}]});
request(get, ?APPURL_PLOP_STORAGE, "getentry", Query) ->
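
reportnewentries/0 publishes the storage node's backlog: entries present in newentries_db but not yet covered by the last verified entry. Worked numbers (illustrative):

    %% storagedb:lastverifiednewentry() = {999, Hash} -> VerifiedEntries = 1000
    %% storagedb:lastverifiednewentry() = {0, none}   -> VerifiedEntries = 0
    %% with index:indexsize(newentries_db) = 1200:
    %% NewEntries = 1200 - 1000 = 200
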
diff --git a/statusserver/ebin/statusserver.app b/statusserver/ebin/statusserver.app
new file mode 100644
index 0000000..10a1ba8
--- /dev/null
+++ b/statusserver/ebin/statusserver.app
@@ -0,0 +1,13 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+%%% Application resource file for statusserver, see app(5).
+
+{application, statusserver,
+ [{description, "Plop statusserver"},
+ {vsn, "1.0.1-alpha-dev"},
+ {modules, [statusserver_app, statusserver_sup, statusserver]},
+ {applications, [kernel, stdlib, lager, plop]},
+ {registered, [statusserver_sup, statusserver]},
+ {mod, {statusserver_app, []}}
+ ]}.
diff --git a/statusserver/src/statusserver.erl b/statusserver/src/statusserver.erl
new file mode 100644
index 0000000..81fcd7a
--- /dev/null
+++ b/statusserver/src/statusserver.erl
@@ -0,0 +1,71 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(statusserver).
+-export([init_module/0]).
+%% API (URL)
+-export([request/4]).
+
+-define(APPURL_PLOP_STATUS, "plop/v1/status").
+
+request(post, ?APPURL_PLOP_STATUS, Service, Input) ->
+ case (catch mochijson2:decode(Input)) of
+ {error, E} ->
+ html("bad input:", E);
+ Entries when is_list(Entries) ->
+ lists:foreach(fun ({struct, PropList}) ->
+ Target = proplists:get_value(<<"target">>, PropList),
+ Source = proplists:get_value(<<"source">>, PropList),
+ Variable = proplists:get_value(<<"key">>, PropList),
+ Status = proplists:get_value(<<"value">>, PropList),
+ set_status(Service, Source, Target, Variable, Status)
+ end, Entries)
+ end,
+ success({[{result, <<"ok">>}]});
+request(get, "", "status", Input) ->
+ Now = plop_compat:monotonic_time(millisecond),
+ Variables = [{struct, [
+ {service, list_to_binary(Service)},
+ {source, Source},
+ {target, Target},
+ {variable, Variable},
+ {status, Status},
+ {age, (Now - Timestamp) / 1000}
+ ]} || {{Service, Source, Target, Variable}, Status, Timestamp} <- get_status()],
+ success({[{result, Variables}]}).
+
+
+success(Data) ->
+ {200, [{"Content-Type", "text/json"}, {"Access-Control-Allow-Origin", "*"}], mochijson2:encode(Data)}.
+
+html(Text, Input) ->
+ {400, [{"Content-Type", "text/html"}],
+ io_lib:format(
+ "<html><body><p>~n" ++
+ "~s~n" ++
+ "~p~n" ++
+ "</body></html>~n", [Text, Input])}.
+
+-define(STATUSDB_TABLE, statusserver_statusdb).
+
+init_module() ->
+ create_statusdb().
+
+create_statusdb() ->
+ case ets:info(?STATUSDB_TABLE) of
+ undefined ->
+ ok;
+ _ ->
+ ets:delete(?STATUSDB_TABLE)
+ end,
+ ets:new(?STATUSDB_TABLE, [set, public, named_table]).
+
+get_status() ->
+ [E || [E] <- ets:match(?STATUSDB_TABLE, '$1')].
+
+set_status(Service, Source, Target, Variable, Status) ->
+ lager:info("status: ~p ~p ~p ~p ~p", [Service, Source, Target, Variable, Status]),
+ Timestamp = plop_compat:monotonic_time(millisecond),
+ true = ets:insert(?STATUSDB_TABLE,
+ {{Service, Source, Target, Variable}, Status, Timestamp}),
+ ok.
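
On the wire, a statusreport batch is a JSON list POSTed to plop/v1/status/<Service>, and GET /status returns everything stored, with age in seconds since the last report. Sketched with illustrative values:

    POST plop/v1/status/merge_dist
    [{"target": "frontend-1", "source": "merge-1",
      "key": "sendlog", "value": 119000}]
    -> {"result": "ok"}

    GET /status
    {"result": [{"service": "merge_dist", "source": "merge-1",
                 "target": "frontend-1", "variable": "sendlog",
                 "status": 119000, "age": 2.345}]}
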
diff --git a/statusserver/src/statusserver_app.erl b/statusserver/src/statusserver_app.erl
new file mode 100644
index 0000000..2fd8b8d
--- /dev/null
+++ b/statusserver/src/statusserver_app.erl
@@ -0,0 +1,13 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(statusserver_app).
+-behaviour(application).
+-export([start/2, stop/1]).
+
+start(normal, Args) ->
+ statusserver:init_module(),
+ statusserver_sup:start_link(Args).
+
+stop(_State) ->
+ ok.
diff --git a/statusserver/src/statusserver_sup.erl b/statusserver/src/statusserver_sup.erl
new file mode 100644
index 0000000..5b0811a
--- /dev/null
+++ b/statusserver/src/statusserver_sup.erl
@@ -0,0 +1,42 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(statusserver_sup).
+-behaviour(supervisor).
+
+-export([start_link/1, init/1]).
+
+start_link(_Args) ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+gen_http_config(Config, SSLOptions, SSLFlag) ->
+ {ChildName, IpAddress, Port, Module} = Config,
+ {ok, IPv4Address} = inet:parse_ipv4strict_address(IpAddress),
+ WebConfig = [{ip, IPv4Address},
+ {port, Port},
+ {ssl, SSLFlag},
+ {acceptor_pool_size,
+ application:get_env(catlfish, http_server_pool_size, 16)},
+ {ssl_opts, SSLOptions}
+ ],
+ {ChildName,
+ {catlfish_web, start, [WebConfig, Module, ChildName]},
+ permanent, 5000, worker, dynamic}.
+
+
+init([]) ->
+ SSLOptions =
+ [{certfile, application:get_env(plop, https_certfile, none)},
+ {keyfile, application:get_env(plop, https_keyfile, none)},
+ {cacertfile, application:get_env(plop, https_cacertfile, none)}],
+ Servers =
+ lists:map(fun (Config) ->
+ gen_http_config(Config, SSLOptions, true)
+ end, application:get_env(plop, https_servers, [])) ++
+ lists:map(fun (Config) ->
+ gen_http_config(Config, SSLOptions, false)
+ end, application:get_env(plop, http_servers, [])),
+ lager:debug("Starting servers ~p", [Servers]),
+ {ok,
+ {{one_for_one, 3, 10},
+ Servers}}.
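
The supervisor starts one catlfish_web listener per entry in plop's https_servers and http_servers lists, each entry being {ChildName, IpAddress, Port, Module}; the acceptor pool size is read from catlfish's http_server_pool_size. A minimal plain-HTTP config sketch (child name, address, and port are placeholders):

    %% sys.config
    [{plop, [{http_servers,
              [{statusserver_http, "127.0.0.1", 8080, statusserver}]}]}].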