From 829ab97fccb991832445862ec8246197a225ecec Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Fri, 27 Jan 2017 15:11:42 +0100 Subject: Parallelised merge, distribution phase. --- Emakefile | 7 +- Makefile | 2 +- README | 3 - merge/ebin/merge.app | 13 +++ merge/src/merge_app.erl | 12 +++ merge/src/merge_dist.erl | 191 +++++++++++++++++++++++++++++++++++++++++++ merge/src/merge_dist_sup.erl | 29 +++++++ merge/src/merge_sup.erl | 21 +++++ src/frontend.erl | 6 +- src/fsdb.erl | 6 +- src/perm.erl | 5 +- src/permdb.erl | 10 +-- src/plop_httputil.erl | 4 +- test/permdbtest.erl | 4 +- 14 files changed, 290 insertions(+), 23 deletions(-) create mode 100644 merge/ebin/merge.app create mode 100644 merge/src/merge_app.erl create mode 100644 merge/src/merge_dist.erl create mode 100644 merge/src/merge_dist_sup.erl create mode 100644 merge/src/merge_sup.erl diff --git a/Emakefile b/Emakefile index 0b2c07a..37b8c8d 100644 --- a/Emakefile +++ b/Emakefile @@ -1,6 +1,11 @@ %% erl -make (-*- erlang -*-) {["src/*"], [debug_info, - {i, "../"}, % For hackney. + {i, "../"}, % For hackney. {outdir, "ebin/"}, {parse_transform, lager_transform}]}. +{["merge/src/*"], + [debug_info, + {i, "../"}, % For hackney. + {outdir, "merge/ebin/"}, + {parse_transform, lager_transform}]}. diff --git a/Makefile b/Makefile index 55821c4..968c489 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ clean: -rm priv/fsynchelper -rm ebin/*.beam dialyze: build - dialyzer ebin + dialyzer ebin merge/ebin tags: etags src/*.[he]rl c_src/*.[ch] diff --git a/README b/README index d90daca..52523f7 100644 --- a/README +++ b/README @@ -1,9 +1,6 @@ plop is a public log based on a Merkle tree. It can be used for implementing Certificate Transparency (RFC 6962). -The first implementation is in Erlang. The only interface supported -initially is Erlang messages. - Requires Erlang/OTP 17 [erts-6.0] or later. 
Compile the application
diff --git a/merge/ebin/merge.app b/merge/ebin/merge.app
new file mode 100644
index 0000000..2dfbda7
--- /dev/null
+++ b/merge/ebin/merge.app
@@ -0,0 +1,13 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+%%% Application resource file for merge, see app(5).
+
+{application, merge,
+ [{description, "Plop merge"},
+  {vsn, "0.10.0-dev"},
+  {modules, [merge_app, merge_dist, merge_dist_sup, merge_sup]},
+  {applications, [kernel, stdlib, lager, plop]},
+  {registered, [merge_dist_sup, merge_sup]},  % merge_dist workers are unnamed
+  {mod, {merge_app, []}}
+ ]}.
diff --git a/merge/src/merge_app.erl b/merge/src/merge_app.erl
new file mode 100644
index 0000000..cd68d81
--- /dev/null
+++ b/merge/src/merge_app.erl
@@ -0,0 +1,12 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_app).
+-behaviour(application).
+-export([start/2, stop/1]).
+
+%% Start the top supervisor. The start type is ignored so that takeover and
+%% failover starts work as well as normal ones.
+start(_StartType, Args) -> merge_sup:start_link(Args).
+
+stop(_State) -> ok.
diff --git a/merge/src/merge_dist.erl b/merge/src/merge_dist.erl
new file mode 100644
index 0000000..8d3dc2b
--- /dev/null
+++ b/merge/src/merge_dist.erl
@@ -0,0 +1,191 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_dist).
+-behaviour(gen_server).
+
+-export([start_link/1]).
+-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
+         code_change/3]).
+
+-record(state, {
+          timer :: reference(),               % pending poll timer
+          node_address :: string(),           % URL prefix of the frontend node
+          sth_timestamp :: non_neg_integer()  % timestamp of last published STH
+         }).
+
+%% Start an unregistered worker; Args is the frontend node address.
+start_link(Args) ->
+    gen_server:start_link(?MODULE, Args, []).
+
+%% Schedule the first poll one second after startup.
+init(Node) ->
+    lager:info("~p:~p: starting", [?MODULE, Node]),
+    Timer = erlang:start_timer(1000, self(), dist),
+    {ok, #state{timer = Timer, node_address = Node, sth_timestamp = 0}}.
+
+handle_call(stop, _From, State) ->
+    {stop, normal, stopped, State}.
+handle_cast(_Request, State) ->
+    {noreply, State}.
+
+handle_info({timeout, _Timer, dist}, State) ->  % periodic poll tick
+    dist(plop:sth(), State);
+handle_info(Info, State) ->  % drain stray messages instead of crashing
+    lager:debug("~p: ignoring unexpected message ~p", [?MODULE, Info]),
+    {noreply, State}.
+
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
+
+terminate(Reason, #state{timer = Timer}) ->
+    lager:info("~p terminating: ~p", [?MODULE, Reason]),
+    erlang:cancel_timer(Timer),  % drop the pending poll
+    ok.
+
+%%%%%%%%%%%%%%%%%%%%
+
+%% Handle one poll: verify, distribute and publish an STH newer than the last
+%% published one; a failed request keeps the old timestamp so the next poll retries.
+dist(noentry, State) ->  % no STH yet: look again in one second
+    {noreply, State#state{timer = erlang:start_timer(1000, self(), dist)}};
+dist({struct, PropList} = STH,
+     #state{node_address = NodeAddress, sth_timestamp = LastTimestamp} = State) ->
+    Treesize = proplists:get_value(<<"tree_size">>, PropList),
+    Timestamp = proplists:get_value(<<"timestamp">>, PropList),
+    RootHash = base64:decode(proplists:get_value(<<"sha256_root_hash">>, PropList)),
+    Signature = base64:decode(proplists:get_value(<<"tree_head_signature">>, PropList)),
+    TS = case Timestamp > LastTimestamp of
+             true ->
+                 true = plop:verify_sth(Treesize, Timestamp, RootHash, Signature),
+                 try
+                     ok = do_dist(NodeAddress, min(Treesize, index:indexsize(logorder))),
+                     ok = publish_sth(NodeAddress, STH),
+                     lager:info("~p: Published STH with size ~B and timestamp ~p.",
+                                [NodeAddress, Treesize, Timestamp]),
+                     Timestamp
+                 catch
+                     throw:{request_error, SubErrType, DebugTag, Error} ->
+                         lager:error("~s: ~p: ~p", [DebugTag, SubErrType, Error]),  % tag is chardata
+                         LastTimestamp
+                 end;
+             false ->
+                 lager:debug("~p: STH timestamp ~p <= ~p, waiting.", [NodeAddress, Timestamp, LastTimestamp]),
+                 LastTimestamp
+         end,
+    Wait = max(1, round(application:get_env(plop, merge_delay, 600) / 60)),
+    {noreply, State#state{timer = erlang:start_timer(Wait * 1000, self(), dist), sth_timestamp = TS}}.
+
+%% @doc Has nonlocal return because of throw further down in do_request/4.
+do_dist(NodeAddress, Size) ->
+    {ok, VerifiedSize} = frontend_get_verifiedsize(NodeAddress),
+    true = Size >= VerifiedSize,
+    do_dist(NodeAddress, VerifiedSize, Size - VerifiedSize).
+
+%% Distribute NTotal entries starting at index Size, one window at a time.
+%% Throws request_error when the frontend verifies nothing new, because
+%% recursing on an unchanged size would never terminate.
+do_dist(_, _, 0) -> ok;
+do_dist(NodeAddress, Size, NTotal) ->
+    DistMaxWindow = application:get_env(plop, merge_dist_winsize, 1000),
+    N = min(DistMaxWindow, NTotal),
+    Hashes = index:getrange(logorder, Size, Size + N - 1),
+    ok = frontend_sendlog(NodeAddress, Size, Hashes),
+    ok = frontend_send_missing_entries(NodeAddress, Hashes),
+    {ok, NewSize} = frontend_verify_entries(NodeAddress, Size + N),
+    NewSize > Size orelse
+        throw({request_error, result, "verify-entries", {no_progress, NewSize}}),
+    lager:info("~p: Done distributing ~B entries.", [NodeAddress, NewSize-Size]),
+    true = NTotal >= NewSize - Size,
+    do_dist(NodeAddress, NewSize, NTotal - (NewSize - Size)).
+
+%% Send Hashes (the first one has log index Start) to the frontend in chunks.
+frontend_sendlog(NodeAddress, Start, Hashes) ->
+    Chunksize = application:get_env(plop, merge_dist_sendlog_chunksize, 1000),
+    frontend_sendlog_chunk(NodeAddress, Start, lists:split(min(Chunksize, length(Hashes)), Hashes), Chunksize).
+
+frontend_sendlog_chunk(_, _, {[], _}, _) -> ok;
+frontend_sendlog_chunk(NodeAddress, Start, {Chunk, Rest}, Chunksize) ->
+    ok = frontend_sendlog_request(NodeAddress, Start, Chunk),
+    frontend_sendlog_chunk(NodeAddress, Start + length(Chunk),
+                           lists:split(min(Chunksize, length(Rest)), Rest), Chunksize).
+
+%% POST one chunk to <NodeAddress>sendlog; throws request_error unless the result is "ok".
+frontend_sendlog_request(NodeAddress, Start, Hashes) ->
+    DebugTag = io_lib:format("sendlog ~B:~B", [Start, length(Hashes)]),
+    Body = {[{"start", Start}, {"hashes", [base64:encode(H) || H <- Hashes]}]},
+    case do_request(DebugTag, NodeAddress ++ "sendlog", [{"Content-Type", "text/json"}],
+                    list_to_binary(mochijson2:encode(Body))) of
+        {<<"ok">>, _} -> ok;
+        Err -> throw({request_error, result, DebugTag, Err})
+    end.
+
+%% Send the entry behind every hash in Hashes, in chunks. Despite the name,
+%% nothing here asks the frontend which entries it is actually missing.
+frontend_send_missing_entries(NodeAddress, Hashes) ->
+    Chunksize = application:get_env(plop, merge_dist_sendentries_chunksize, 100),
+    frontend_send_entries_chunk(NodeAddress,
+                                lists:split(min(Chunksize, length(Hashes)), Hashes), Chunksize).
+
+frontend_send_entries_chunk(_, {[], _}, _) -> ok;
+frontend_send_entries_chunk(NodeAddress, {Chunk, Rest}, Chunksize) ->
+    HashesAndEntries = [{H, db:entry_for_leafhash(H)} || H <- Chunk],
+    ok = frontend_send_entries_request(NodeAddress, HashesAndEntries),
+    frontend_send_entries_chunk(NodeAddress,
+                                lists:split(min(Chunksize, length(Rest)), Rest), Chunksize).
+
+%% POST one chunk to <NodeAddress>sendentry; throws request_error unless the result is "ok".
+frontend_send_entries_request(NodeAddress, HashesAndEntries) ->
+    DebugTag = io_lib:format("sendentry ~B", [length(HashesAndEntries)]),
+    Entries = [[{"entry", base64:encode(E)}, {"treeleafhash", base64:encode(H)}]
+               || {H, E} <- HashesAndEntries],
+    case do_request(DebugTag, NodeAddress ++ "sendentry", [{"Content-Type", "text/json"}],
+                    list_to_binary(mochijson2:encode(Entries))) of
+        {<<"ok">>, _} -> ok;
+        Err -> throw({request_error, result, DebugTag, Err})
+    end.
+
+%% Relies on the frontend reporting its current verified size when asked to verify up to 0.
+frontend_get_verifiedsize(NodeAddress) ->
+    frontend_verify_entries(NodeAddress, 0).
+
+%% Ask the frontend to verify entries up to Size; returns {ok, VerifiedSize}.
+%% An "ok" reply without a non-negative integer "verified" throws request_error.
+frontend_verify_entries(NodeAddress, Size) ->
+    DebugTag = io_lib:format("verify-entries ~B", [Size]),
+    RequestBody = list_to_binary(mochijson2:encode({[{"verify_to", Size}]})),
+    case do_request(DebugTag, NodeAddress ++ "verify-entries",
+                    [{"Content-Type", "text/json"}], RequestBody) of
+        {<<"ok">>, PropList} ->
+            case proplists:get_value(<<"verified">>, PropList) of
+                Verified when is_integer(Verified), Verified >= 0 -> {ok, Verified};
+                Other -> throw({request_error, result, DebugTag, {bad_verified, Other}})
+            end;
+        Err -> throw({request_error, result, DebugTag, Err})
+    end.
+
+%% POST STH to <NodeAddress>publish-sth; throws request_error unless the result is "ok".
+publish_sth(NodeAddress, STH) ->
+    DebugTag = "publish-sth",
+    RequestBody = list_to_binary(mochijson2:encode(STH)),
+    case do_request(DebugTag, NodeAddress ++ "publish-sth",
+                    [{"Content-Type", "text/json"}], RequestBody) of
+        {<<"ok">>, _} -> ok;
+        Err -> throw({request_error, result, DebugTag, Err})
+    end.
+
+%% Returns {Result, PropList} for a 200 reply holding a JSON object. Every other
+%% outcome (transport error, other status, undecodable body) throws request_error.
+do_request(DebugTag, URL, Headers, RequestBody) ->
+    case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
+        {error, Err} -> throw({request_error, request, DebugTag, Err});
+        {failure, {none, StatusCode, none}, _RespHeaders, _Body} ->
+            throw({request_error, failure, DebugTag, StatusCode});
+        {success, {_, 200, _}, _, Body} ->
+            try mochijson2:decode(Body) of
+                {struct, PropList} -> {proplists:get_value(<<"result">>, PropList), PropList};
+                Other -> throw({request_error, decode, DebugTag, Other})
+            catch
+                _:Reason -> throw({request_error, decode, DebugTag, Reason})
+            end;
+        {success, {_, Status, _}, _, _} -> throw({request_error, failure, DebugTag, Status})
+    end.
diff --git a/merge/src/merge_dist_sup.erl b/merge/src/merge_dist_sup.erl
new file mode 100644
index 0000000..050ddc5
--- /dev/null
+++ b/merge/src/merge_dist_sup.erl
@@ -0,0 +1,29 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_dist_sup).
+-behaviour(supervisor).
+
+-export([start_link/1, init/1]).
+
+%% Start the supervisor and one merge_dist worker per configured
+%% frontend node (plop application variable frontend_nodes).
+start_link([]) ->
+    {ok, Nodes} = application:get_env(plop, frontend_nodes),
+    lager:info("starting merge dist for frontend nodes: ~p", [Nodes]),
+    {ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, []),
+    lists:foreach(fun(Node) ->
+                          lager:debug("starting dist worker: ~p", [Node]),
+                          {ok, _Child} = supervisor:start_child(?MODULE, [Node])
+                  end, Nodes),
+    {ok, Pid}.
+
+%% simple_one_for_one: every child is a merge_dist worker, started with
+%% the node address appended to the (empty) argument list below.
+init(_Args) ->
+    {ok,
+     {{simple_one_for_one, 3, 10},
+      [{ignored,
+        {merge_dist, start_link, []},
+        permanent, 10000, worker,
+        [merge_dist]}]}}.
diff --git a/merge/src/merge_sup.erl b/merge/src/merge_sup.erl
new file mode 100644
index 0000000..124fb12
--- /dev/null
+++ b/merge/src/merge_sup.erl
@@ -0,0 +1,21 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_sup).
+-behaviour(supervisor).
+
+-export([start_link/1, init/1]).
+
+%% Start the top-level supervisor under the local name merge_sup.
+start_link(_Args) ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% Children: the logorder index (path read from the plop application
+%% variable index_path) and the supervisor of the merge_dist workers.
+init([]) ->
+    {ok, LogorderPath} = application:get_env(plop, index_path),
+    Logorder = {the_logorder, {index, start_link, [logorder, LogorderPath]},
+                permanent, 10000, worker, [index]},
+    DistSup = {merge_dist_sup, {merge_dist_sup, start_link, [[]]},
+               transient, infinity, supervisor, [merge_dist_sup]},
+    {ok, {{one_for_one, 3, 10}, [Logorder, DistSup]}}.
diff --git a/src/frontend.erl b/src/frontend.erl index e4d8e40..8a593b4 100644 --- a/src/frontend.erl +++ b/src/frontend.erl @@ -33,7 +33,7 @@ request(post, ?APPURL_PLOP_FRONTEND, "sendentry", Input) -> request(post, ?APPURL_PLOP_FRONTEND, "sendlog", Input) -> case (catch mochijson2:decode(Input)) of {error, E} -> - html("sendentry: bad input:", E); + html("sendlog: bad input:", E); {struct, PropList} -> Start = proplists:get_value(<<"start">>, PropList), Hashes = lists:map(fun (S) -> base64:decode(S) end, proplists:get_value(<<"hashes">>, PropList)), @@ -142,7 +142,7 @@ request(post, ?APPURL_PLOP_MERGE, "sendentry", Input) -> request(post, ?APPURL_PLOP_MERGE, "sendlog", Input) -> case (catch mochijson2:decode(Input)) of {error, E} -> - html("sendentry: bad input:", E); + html("sendlog: bad input:", E); {struct, PropList} -> Start = proplists:get_value(<<"start">>, PropList), Hashes = lists:map(fun (S) -> base64:decode(S) end, proplists:get_value(<<"hashes">>, PropList)), @@ -152,7 +152,7 @@ request(post, ?APPURL_PLOP_MERGE, "sendlog", Input) -> request(post, ?APPURL_PLOP_MERGE, "verifyroot", Input) -> case (catch mochijson2:decode(Input)) of {error, E} -> - html("sendentry: bad input:", E); + html("verifyroot: bad input:", E);
{struct, PropList} -> OldSize = db:verifiedsize(), Treesize = proplists:get_value(<<"tree_size">>, PropList), diff --git a/src/fsdb.erl b/src/fsdb.erl index 4a85cdc..d644128 100644 --- a/src/fsdb.erl +++ b/src/fsdb.erl @@ -1,10 +1,10 @@ -%%% Copyright (c) 2014-2015, NORDUnet A/S. +%%% Copyright (c) 2014-2015,2017, NORDUnet A/S. %%% See LICENSE for licensing information. -module(fsdb). -behaviour(gen_server). --export([start_link/2, stop/1, init_module/0]). +-export([start_link/3, stop/1, init_module/0]). -export([getvalue/2, addvalue/3, commit/1, commit/2]). %% gen_server callbacks. @@ -24,7 +24,7 @@ init_module() -> end, ets:new(?DIRECTORY_TABLE, [set, public, named_table]). -start_link(Name, Filename) -> +start_link(Name, Filename, _Options) -> gen_server:start_link({local, Name}, ?MODULE, [Name, Filename], []). diff --git a/src/perm.erl b/src/perm.erl index 2e12fdf..e571d23 100644 --- a/src/perm.erl +++ b/src/perm.erl @@ -1,4 +1,4 @@ -%%% Copyright (c) 2015, NORDUnet A/S. +%%% Copyright (c) 2015,2017, NORDUnet A/S. %%% See LICENSE for licensing information. -module(perm). @@ -8,7 +8,8 @@ start_link(Name, Filename) -> Module = application:get_env(plop, db_backend, fsdb), - Module:start_link(Name, Filename). + Options = application:get_env(plop, db_backend_opt, []), + Module:start_link(Name, Filename, Options). stop(Name) -> Module = application:get_env(plop, db_backend, fsdb), diff --git a/src/permdb.erl b/src/permdb.erl index 0a1765e..faca986 100644 --- a/src/permdb.erl +++ b/src/permdb.erl @@ -1,11 +1,11 @@ -%%% Copyright (c) 2015-2016, NORDUnet A/S. +%%% Copyright (c) 2015-2017, NORDUnet A/S. %%% See LICENSE for licensing information. -module(permdb). -behaviour(gen_server). --export([start_link/2, start_link/3, stop/1, init_module/0]). +-export([start_link/3, stop/1, init_module/0]). -export([getvalue/2, addvalue/3, commit/1, commit/2, keyexists/2]). %% gen_server callbacks. @@ -61,10 +61,8 @@ init([Name, Filename, WriteFlag]) -> init_module() -> ok. 
-start_link(Name, Filename) -> - start_link(Name, Filename, write). - -start_link(Name, Filename, WriteFlag) -> +start_link(Name, Filename, Options) -> + WriteFlag = proplists:get_value(write_flag, Options, write), gen_server:start_link({local, Name}, ?MODULE, [Name, Filename, WriteFlag], []). diff --git a/src/plop_httputil.erl b/src/plop_httputil.erl index af4a5d1..b4188e7 100644 --- a/src/plop_httputil.erl +++ b/src/plop_httputil.erl @@ -64,14 +64,14 @@ read_and_verify_cacertfile(Filename) -> [KeyPem] = public_key:pem_decode(PemBin), {'Certificate', Der, _} = KeyPem, CalculatedHash = crypto:hash(sha256, Der), - CorrectHash = application:get_env(catlfish, https_cacert_fingerprint, none), + CorrectHash = application:get_env(plop, https_cacert_fingerprint, none), CorrectHash = CalculatedHash, Der. request(DebugTag, URL, Headers, RequestBody) -> Starttime = os:timestamp(), ParsedURL = hackney_url:parse_url(URL), - CACertFile = application:get_env(catlfish, https_cacertfile, none), + CACertFile = application:get_env(plop, https_cacertfile, none), CACert = read_and_verify_cacertfile(CACertFile), #hackney_url{path = Path, host = Host} = ParsedURL, lager:debug("~s: sending http request to ~p", diff --git a/test/permdbtest.erl b/test/permdbtest.erl index 5d0453b..e48158f 100755 --- a/test/permdbtest.erl +++ b/test/permdbtest.erl @@ -14,7 +14,7 @@ timeprint(Time) -> io_lib:format("~.2fs", [Time/1000000]). testinit(Filename) -> - permdb:start_link(testdb, Filename, write). + permdb:start_link(testdb, Filename, [{write_flag, write}]). teststop() -> permdb:stop(testdb). @@ -145,7 +145,7 @@ main([]) -> testinit(Filename), - permdb:start_link(testdb_ro, Filename, read), + permdb:start_link(testdb_ro, Filename, [{write_flag, read}]), testget(Filename, gentestdata(1+2+3+4), 99), testadd(Filename, gentestdata(1+2+3+4+5), 99), -- cgit v1.1