From 80c8ef847d996af04ec677a79555d640733641f2 Mon Sep 17 00:00:00 2001
From: Magnus Ahltorp
Date: Sun, 19 Oct 2014 01:37:29 +0200
Subject: db:get_by_leaf_hash(): Return notfound instead of crashing when no
 entry could be found. db:get_by_entry_hash(): Don't fetch index, isn't used
 and might not exist. index:add(): Allow writes at existing indices.

---
 src/db.erl    | 25 +++++++++++++++++------
 src/index.erl | 64 +++++++++++++++++++++++++++++++++++------------------------
 2 files changed, 57 insertions(+), 32 deletions(-)
(limited to 'src')

diff --git a/src/db.erl b/src/db.erl
index 6315ae5..6fce8a3 100644
--- a/src/db.erl
+++ b/src/db.erl
@@ -103,7 +103,12 @@ entry_for_leafhash(LeafHash) ->
     perm:readfile(entry_root_path(), LeafHash).
 
 index_for_leafhash(LeafHash) ->
-    binary_to_integer(perm:readfile(indexforhash_root_path(), LeafHash)).
+    case perm:readfile(indexforhash_root_path(), LeafHash) of
+        noentry ->
+            noentry;
+        Index ->
+            binary_to_integer(Index)
+    end.
 
 leafhash_for_index(Index) ->
     index:get(index_path(), Index).
@@ -148,9 +153,17 @@ handle_call({get_by_index, Index}, _From, State) ->
     {reply, R, State};
 
 handle_call({get_by_leaf_hash, LeafHash}, _From, State) ->
-    Entry = entry_for_leafhash(LeafHash),
-    Index = index_for_leafhash(LeafHash),
-    R = {Index, LeafHash, Entry},
+    R = case entry_for_leafhash(LeafHash) of
+            noentry ->
+                notfound;
+            Entry ->
+                case index_for_leafhash(LeafHash) of
+                    noentry ->
+                        notfound;
+                    Index ->
+                        {Index, LeafHash, Entry}
+                end
+        end,
     {reply, R, State};
 
 handle_call({get_by_entry_hash, EntryHash}, _From, State) ->
@@ -159,7 +172,7 @@ handle_call({get_by_entry_hash, EntryHash}, _From, State) ->
             notfound;
         LeafHash ->
             Entry = entry_for_leafhash(LeafHash),
-            Index = index_for_leafhash(LeafHash),
-            {Index, LeafHash, Entry}
+            %% Don't fetch index, isn't used and might not exist
+            {notfetched, LeafHash, Entry}
     end,
     {reply, R, State}.

diff --git a/src/index.erl b/src/index.erl
index 7871215..5169fbb 100644
--- a/src/index.erl
+++ b/src/index.erl
@@ -1,17 +1,18 @@
 %%% Copyright (c) 2014, NORDUnet A/S.
 %%% See LICENSE for licensing information.
 
-%% Implements an interface to a file pair (basename and basename.chksum)
-%% that stores an ordered list of fixed-size entries. Entries can be
-%% added at the end and are retrieved by index. The list can also be
-%% truncated.
+%% Implements an interface to a file pair (basename and
+%% basename.chksum) that stores an ordered list of fixed-size entries.
+%% Entries can be added at the end and are retrieved by index. Entries
+%% can also be added at already existing indices, but then the
+%% contents must be the same.
 %%
-%% Writes(add, truncate, addlast) need to be serialized.
+%% Writes(add, addlast) need to be serialized.
 
 %% TODO: Checksums
 
 -module(index).
--export([get/2, add/3, addlast/2, truncate/2]).
+-export([get/2, add/3, addlast/2]).
 
 -define(ENTRYSIZE, 32).
 -define(ENTRYSIZEINFILE, (?ENTRYSIZE*2+1)).
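
The layout described in the module comment is fixed: each 32-byte entry is stored as 64 hex characters plus a newline, so ?ENTRYSIZEINFILE is 65 and entry N always starts at byte offset N * 65. A minimal sketch of that arithmetic (entry_offset/1 is an illustrative helper, not part of index.erl):

    %% Byte offset of entry N: 32-byte entries are hex-encoded to 64
    %% characters and terminated by a newline, 65 bytes per line.
    entry_offset(N) when is_integer(N), N >= 0 ->
        N * ?ENTRYSIZEINFILE.
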
@@ -21,27 +22,38 @@ add(Basepath, Index, Entry) when is_binary(Entry), size(Entry) == ?ENTRYSIZE ->
     case file:open(Basepath, [read, write, binary]) of
         {ok, File} ->
             {ok, Position} = file:position(File, eof),
-            case Index of
-                last when Position rem ?ENTRYSIZEINFILE == 0 ->
-                    ok;
-                Index when is_integer(Index),
-                           Index * ?ENTRYSIZEINFILE == Position ->
-                    ok
-            end,
+            Mode = case Index of
+                       last when Position rem ?ENTRYSIZEINFILE == 0 ->
+                           write;
+                       Index when is_integer(Index),
+                                  Index * ?ENTRYSIZEINFILE == Position ->
+                           write;
+                       Index when is_integer(Index),
+                                  Index * ?ENTRYSIZEINFILE < Position ->
+                           read;
+                       _ ->
+                           util:exit_with_error(invalid, writefile,
+                                                "Index not valid")
+                   end,
             EntryText = hex:bin_to_hexstr(Entry) ++ "\n",
-            ok = file:write(File, EntryText),
-            ok = file:close(File),
-            util:fsync([Basepath, filename:dirname(Basepath)]);
-        {error, Error} ->
-            util:exit_with_error(Error, writefile,
-                                 "Error opening file for writing")
-    end.
-
-truncate(Basepath, Index) ->
-    case file:open(Basepath, [read, write, binary]) of
-        {ok, File} ->
-            {ok, _Position} = file:position(File, Index * ?ENTRYSIZEINFILE),
-            ok = file:truncate(File),
+            case Mode of
+                write ->
+                    ok = file:write(File, EntryText);
+                read ->
+                    {ok, _Position} =
+                        file:position(File, {bof, Index * ?ENTRYSIZEINFILE}),
+                    {ok, OldEntryText} = file:read(File, ?ENTRYSIZEINFILE),
+                    %% check that the written content is the same as
+                    %% the old content
+                    case binary_to_list(OldEntryText) of
+                        EntryText ->
+                            ok;
+                        _ ->
+                            util:exit_with_error(invalid, writefile,
+                                                 "Written content not the" ++
+                                                     " same as old content")
+                    end
+            end,
             ok = file:close(File),
             util:fsync([Basepath, filename:dirname(Basepath)]);
         {error, Error} ->
-- cgit v1.1

From 088e4de4f1e2499f6cc0e332ae8cb34b935a6425 Mon Sep 17 00:00:00 2001
From: Magnus Ahltorp
Date: Mon, 20 Oct 2014 12:22:50 +0200
Subject: Added HTTP APIs for external merge

---
 src/db.erl       |  38 +++++++++++++++++++--
 src/frontend.erl | 102 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/storage.erl  |  77 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 215 insertions(+), 2 deletions(-)
 create mode 100644 src/frontend.erl
 create mode 100644 src/storage.erl
(limited to 'src')

diff --git a/src/db.erl b/src/db.erl
index 6fce8a3..413f4b9 100644
--- a/src/db.erl
+++ b/src/db.erl
@@ -6,8 +6,8 @@
 %% API.
 -export([start_link/0, stop/0]).
--export([add/4, size/0]).
--export([get_by_index/1, get_by_indices/3, get_by_leaf_hash/1, get_by_entry_hash/1]).
+-export([add/4, add/2, add_entryhash/2, add_index/2, set_treesize/1, size/0]).
+-export([get_by_index/1, get_by_indices/3, get_by_leaf_hash/1, get_by_entry_hash/1, entry_for_leafhash/1, leafhash_for_index/1]).
 
 %% gen_server callbacks.
 -export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
          code_change/3]).
@@ -34,6 +34,22 @@ stop() ->
 add(LeafHash, EntryHash, Data, Index) ->
     gen_server:call(?MODULE, {add, {LeafHash, EntryHash, Data, Index}}).
 
+-spec add(binary(), binary()) -> ok.
+add(LeafHash, Data) ->
+    gen_server:call(?MODULE, {add, {LeafHash, Data}}).
+
+-spec add_entryhash(binary(), binary()) -> ok.
+add_entryhash(LeafHash, EntryHash) ->
+    gen_server:call(?MODULE, {add_entryhash, {LeafHash, EntryHash}}).
+
+-spec add_index(binary(), non_neg_integer()) -> ok.
+add_index(LeafHash, Index) ->
+    gen_server:call(?MODULE, {add_index, {LeafHash, Index}}).
+
+-spec set_treesize(non_neg_integer()) -> ok.
+set_treesize(Size) ->
+    gen_server:call(?MODULE, {set_treesize, Size}).
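
Taken together, these four calls split the old atomic add/4 into steps that an external merge process can drive separately. A hedged sketch of the intended sequence for committing one replicated entry (the bindings LeafHash, EntryHash, Data and Index are assumed to come from the merge process; this is illustration, not code from the patch):

    %% Illustrative call sequence for committing one externally merged entry.
    ok = db:add(LeafHash, Data),                 %% store the entry blob
    ok = db:add_entryhash(LeafHash, EntryHash),  %% map entry hash to leaf hash
    ok = db:add_index(LeafHash, Index),          %% bind leaf hash to a position
    ok = db:set_treesize(Index + 1).             %% publish the new tree size
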
+ -spec get_by_indices(integer(), integer(), {sorted, true|false}) -> [{non_neg_integer(), binary(), binary()}]. get_by_indices(Start, End, {sorted, Sorted}) -> @@ -143,6 +159,24 @@ handle_call({add, {LeafHash, EntryHash, Data, Index}}, _From, State) -> ok = atomic:replacefile(treesize_path(), integer_to_list(Index+1)), {reply, ok, State}; +handle_call({add, {LeafHash, Data}}, _From, State) -> + ok = perm:ensurefile(entry_root_path(), LeafHash, Data), + {reply, ok, State}; + +handle_call({add_entryhash, {LeafHash, EntryHash}}, _From, State) -> + ok = perm:ensurefile(entryhash_root_path(), EntryHash, LeafHash), + {reply, ok, State}; + +handle_call({add_index, {LeafHash, Index}}, _From, State) -> + ok = perm:ensurefile(indexforhash_root_path(), + LeafHash, integer_to_binary(Index)), + ok = index:add(index_path(), Index, LeafHash), + {reply, ok, State}; + +handle_call({set_treesize, Size}, _From, State) -> + ok = atomic:replacefile(treesize_path(), integer_to_list(Size)), + {reply, ok, State}; + handle_call({get_by_indices, {Start, End, _Sorted}}, _From, State) -> {reply, get_by_indices_helper(Start, End), State}; diff --git a/src/frontend.erl b/src/frontend.erl new file mode 100644 index 0000000..8d0eccd --- /dev/null +++ b/src/frontend.erl @@ -0,0 +1,102 @@ +%%% Copyright (c) 2014, NORDUnet A/S. +%%% See LICENSE for licensing information. + +%%% @doc Frontend node API + +-module(frontend). +%% API (URL) +-export([sendlog/3, missingentries/3, sendentry/3, sendsth/3, currentposition/3]). + +sendentry(SessionID, _Env, Input) -> + R = (catch case (catch jiffy:decode(Input)) of + {error, E} -> + html("sendentry: bad input:", E); + {PropList} -> + LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)), + TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)), + + ok = db:add(TreeLeafHash, LogEntry), + binary_to_list( + jiffy:encode( + {[{result, <<"ok">>}]})) + end), + deliver(SessionID, R). + +sendlog(SessionID, _Env, Input) -> + R = (catch case (catch jiffy:decode(Input)) of + {error, E} -> + html("sendentry: bad input:", E); + {PropList} -> + Start = proplists:get_value(<<"start">>, PropList), + Hashes = lists:map(fun (S) -> base64:decode(S) end, proplists:get_value(<<"hashes">>, PropList)), + + Indices = lists:seq(Start, Start + length(Hashes) - 1), + lists:foreach(fun ({Hash, Index}) -> + ok = db:add_index(Hash, Index) + end, lists:zip(Hashes, Indices)), + binary_to_list( + jiffy:encode( + {[{result, <<"ok">>}]})) + end), + deliver(SessionID, R). + +sendsth(SessionID, _Env, Input) -> + R = (catch case (catch jiffy:decode(Input)) of + {error, E} -> + html("sendentry: bad input:", E); + {PropList} -> + Treesize = proplists:get_value(<<"tree_size">>, PropList), + + ok = db:set_treesize(Treesize), + + ht:reset_tree([db:size() - 1]), + + binary_to_list( + jiffy:encode( + {[{result, <<"ok">>}]})) + end), + deliver(SessionID, R). + +currentposition(SessionID, _Env, _Input) -> + Size = db:size(), + R = binary_to_list( + jiffy:encode( + {[{result, <<"ok">>}, {position, Size}]})), + deliver(SessionID, R). + +fetchmissingentries(Index) -> + lists:reverse(fetchmissingentries(Index, [])). + +fetchmissingentries(Index, Acc) -> + case db:leafhash_for_index(Index) of + noentry -> + Acc; + Hash -> + case db:entry_for_leafhash(Hash) of + noentry -> + fetchmissingentries(Index + 1, [Hash | Acc]); + _ -> + fetchmissingentries(Index + 1, Acc) + end + end. 
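
A merge node drives these handlers by POSTing JSON. As a client-side sketch (illustrative only: the helper name send_log/3 is invented, but the field names match what the handler reads), a sendlog call in the same jiffy-and-httpc style as the rest of the code might look like:

    %% Hypothetical client for the sendlog endpoint.
    send_log(URLBase, Start, Hashes) ->
        Request = jiffy:encode(
                    {[{start, Start},
                      {hashes, [base64:encode(H) || H <- Hashes]}]}),
        httpc:request(post, {URLBase ++ "sendlog", [],
                             "text/json", Request},
                      [], []).
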
+ +missingentries(SessionID, _Env, _Input) -> + Size = db:size(), + Missing = fetchmissingentries(Size), + R = binary_to_list( + jiffy:encode( + {[{result, <<"ok">>}, {entries, Missing}]})), + deliver(SessionID, R). + +%% Private functions. +html(Text, Input) -> + io_lib:format( + "Content-Type: text/html\r\n\r\n" ++ + "
<html><body><p>
~n" ++ + "~s~n" ++ + "~p~n" ++ + "~n", [Text, Input]). + +-spec deliver(any(), string()) -> ok | {error, _Reason}. +deliver(Session, Data) -> + mod_esi:deliver(Session, Data). diff --git a/src/storage.erl b/src/storage.erl new file mode 100644 index 0000000..243cc6c --- /dev/null +++ b/src/storage.erl @@ -0,0 +1,77 @@ +%%% Copyright (c) 2014, NORDUnet A/S. +%%% See LICENSE for licensing information. + +%%% @doc Storage node API + +-module(storage). +%% API (URL) +-export([sendentry/3, entrycommitted/3, fetchnewentries/3]). + +newentries_path() -> + {ok, Value} = application:get_env(plop, newentries_path), + Value. + +sendentry(SessionID, _Env, Input) -> + R = (catch case (catch jiffy:decode(Input)) of + {error, E} -> + html("sendentry: bad input:", E); + {PropList} -> + LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)), + TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)), + + ok = db:add(TreeLeafHash, LogEntry), + ok = index:addlast(newentries_path(), TreeLeafHash), + binary_to_list( + jiffy:encode( + {[{result, <<"ok">>}]})) + end), + deliver(SessionID, R). + +entrycommitted(SessionID, _Env, Input) -> + R = (catch case (catch jiffy:decode(Input)) of + {error, E} -> + html("entrycommitted: bad input:", E); + {PropList} -> + EntryHash = base64:decode(proplists:get_value(<<"entryhash">>, PropList)), + LeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)), + ok = db:add_entryhash(LeafHash, EntryHash), + binary_to_list( + jiffy:encode( + {[{result, <<"ok">>}]})) + end), + deliver(SessionID, R). + +fetchnewhashes(Index) -> + lists:reverse(fetchnewhashes(Index, [])). + +fetchnewhashes(Index, Acc) -> + case index:get(newentries_path(), Index) of + noentry -> + Acc; + Entry -> + fetchnewhashes(Index + 1, [Entry | Acc]) + end. + +fetchnewentries(SessionID, _Env, _Input) -> + NewHashes = fetchnewhashes(0), + Entries = lists:map(fun(LeafHash) -> + {[{hash, base64:encode(LeafHash)}, + {entry, base64:encode(db:entry_for_leafhash(LeafHash))}]} + end, NewHashes), + R = (catch binary_to_list( + jiffy:encode( + {[{result, <<"ok">>}, {entries, Entries}]}))), + deliver(SessionID, R). + +%% Private functions. +html(Text, Input) -> + io_lib:format( + "Content-Type: text/html\r\n\r\n" ++ + "
<html><body><p>
~n" ++ + "~s~n" ++ + "~p~n" ++ + "~n", [Text, Input]). + +-spec deliver(any(), string()) -> ok | {error, _Reason}. +deliver(Session, Data) -> + mod_esi:deliver(Session, Data). -- cgit v1.1 From 653d3a1b047241ffda69cfc8601390591d63d295 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Mon, 20 Oct 2014 14:20:37 +0200 Subject: Make frontend send entries to storage nodes if storage_nodes configuration is set --- src/plop.erl | 133 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 127 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/plop.erl b/src/plop.erl index 0b101be..07042aa 100644 --- a/src/plop.erl +++ b/src/plop.erl @@ -45,7 +45,10 @@ -record(state, {pubkey :: public_key:rsa_public_key(), privkey :: public_key:rsa_private_key(), - logid :: binary()}). + logid :: binary(), + http_requests, + own_requests + }). %%%%% moved from plop.hrl, maybe remove -define(PLOPVERSION, 0). @@ -70,6 +73,22 @@ start_link(Keyfile, Passphrase) -> stop() -> gen_server:call(?MODULE, stop). +add_http_request(Plop, RequestId, Data) -> + Plop#state{http_requests = dict:store(RequestId, Data, + Plop#state.http_requests)}. + +add_own_request(Plop, RequestId, Data) -> + Plop#state{own_requests = dict:store(RequestId, Data, + Plop#state.own_requests)}. + +remove_http_request(Plop, RequestId) -> + Plop#state{http_requests = dict:erase(RequestId, + Plop#state.http_requests)}. + +remove_own_request(Plop, RequestId) -> + Plop#state{own_requests = dict:erase(RequestId, + Plop#state.own_requests)}. + %%%%%%%%%%%%%%%%%%%% init([PrivKeyfile, PubKeyfile]) -> %% Read RSA keypair. @@ -81,11 +100,48 @@ init([PrivKeyfile, PubKeyfile]) -> _Tree = ht:reset_tree([db:size() - 1]), {ok, #state{pubkey = Public_key, privkey = Private_key, - logid = LogID}}. + logid = LogID, + http_requests = dict:new(), + own_requests = dict:new()}}. handle_cast(_Request, State) -> {noreply, State}. +handle_http_reply(State, {storage_sendentry_http, {OwnRequestId}}, + StatusCode, Body) -> + {PropList} = (catch jiffy:decode(Body)), + Result = proplists:get_value(<<"result">>, PropList), + case dict:fetch(OwnRequestId, State#state.own_requests) of + undefined -> + {noreply, State}; + {storage_sendentry, {From, Completion, RepliesUntilQuorum}} + when Result == <<"ok">>, StatusCode == 200 -> + case RepliesUntilQuorum - 1 of + 0 -> + %% reached quorum + gen_server:reply(From, ok), + StateWithCompletion = Completion(State), + {noreply, remove_own_request(StateWithCompletion, + OwnRequestId)}; + NewRepliesUntilQuorum -> + {noreply, add_own_request(State, OwnRequestId, + {storage_sendentry, + {From, Completion, + NewRepliesUntilQuorum}})} + end + end. + +handle_info({http, {RequestId, {StatusLine, _Headers, Body}}}, Plop) -> + {_HttpVersion, StatusCode, _ReasonPhrase} = StatusLine, + case dict:fetch(RequestId, Plop#state.http_requests) of + undefined -> + {noreply, Plop}; + ignore -> + {noreply, Plop}; + HttpRequest -> + handle_http_reply(remove_http_request(Plop, RequestId), + HttpRequest, StatusCode, Body) + end; handle_info(_Info, State) -> {noreply, State}. @@ -131,6 +187,64 @@ get_logid() -> gen_server:call(?MODULE, {get, logid}). testing_get_pubkey() -> gen_server:call(?MODULE, {test, pubkey}). + +storage_nodes() -> + {ok, Value} = application:get_env(plop, storage_nodes, {ok, []}), + Value. + +storage_nodes_quorum() -> + {ok, Value} = application:get_env(plop, storage_nodes_quorum), + Value. 
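
Both getters above read deployment configuration from the application environment; a plop section along these lines (host names invented for the example) would replicate each entry to two storage nodes and accept one acknowledgement as quorum:

    %% Example sys.config fragment, illustrative values only.
    {plop, [{storage_nodes, ["https://storage-1.example.org/ct/storage/",
                             "https://storage-2.example.org/ct/storage/"]},
            {storage_nodes_quorum, 1}]}
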
+ +send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) -> + Request = jiffy:encode( + {[{plop_version, 1}, + {entry, base64:encode(LogEntry)}, + {treeleafhash, base64:encode(TreeLeafHash)} + ]}), + httpc:request(post, {URLBase ++ "sendentry", [], + "text/json", Request}, + [], [{sync, false}]). + +send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) -> + Request = jiffy:encode( + {[{plop_version, 1}, + {entryhash, base64:encode(EntryHash)}, + {treeleafhash, base64:encode(TreeLeafHash)} + ]}), + httpc:request(post, {URLBase ++ "entrycommitted", [], + "text/json", Request}, + [], [{sync, false}]). + +store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) -> + OwnRequestId = make_ref(), + + Completion = + fun(CompletionState) -> + RequestIds = [send_storage_entrycommitted(URLBase, EntryHash, + TreeLeafHash) + || URLBase <- Nodes], + lists:foldl(fun({ok, RequestId}, StateAcc) -> + add_http_request(StateAcc, RequestId, + ignore) + end, CompletionState, RequestIds) + end, + + PlopWithOwn = add_own_request(State, OwnRequestId, + {storage_sendentry, + {From, Completion, + storage_nodes_quorum()}}), + + RequestIds = [send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) + || URLBase <- Nodes], + PlopWithRequests = + lists:foldl(fun({ok, RequestId}, PlopAcc) -> + add_http_request(PlopAcc, RequestId, + {storage_sendentry_http, + {OwnRequestId}}) + end, PlopWithOwn, RequestIds), + PlopWithRequests. + %%%%%%%%%%%%%%%%%%%% handle_call(stop, _From, Plop) -> {stop, normal, stopped, Plop}; @@ -145,10 +259,17 @@ handle_call({get, logid}, _From, Plop = #state{logid = LogID}) -> {reply, LogID, Plop}; -handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, _From, Plop) -> - ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()), - ok = ht:add(TreeLeafHash), - {reply, ok, Plop}; +handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, From, Plop) -> + case storage_nodes() of + [] -> + ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()), + ok = ht:add(TreeLeafHash), + {reply, ok, Plop}; + Nodes -> + {noreply, + store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, + From, Plop)} + end; handle_call({sth, Data}, _From, Plop = #state{privkey = PrivKey}) -> -- cgit v1.1 From 729c7410504252d7c33e8fd2f43e662725186960 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Mon, 20 Oct 2014 19:00:39 +0200 Subject: Fix bug getting storage_nodes configuration variable --- src/plop.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src') diff --git a/src/plop.erl b/src/plop.erl index 07042aa..84d0920 100644 --- a/src/plop.erl +++ b/src/plop.erl @@ -189,8 +189,7 @@ testing_get_pubkey() -> gen_server:call(?MODULE, {test, pubkey}). storage_nodes() -> - {ok, Value} = application:get_env(plop, storage_nodes, {ok, []}), - Value. + application:get_env(plop, storage_nodes, []). 
storage_nodes_quorum() -> {ok, Value} = application:get_env(plop, storage_nodes_quorum), -- cgit v1.1 From b968cb1330ecb13f26e35d948c0511882b89ab2a Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Fri, 24 Oct 2014 15:32:58 +0200 Subject: Added lager for logging --- src/db.erl | 2 ++ src/fsyncport.erl | 7 +++++++ src/perm.erl | 11 +++++++++-- src/plop.erl | 4 ++++ src/storage.erl | 2 ++ 5 files changed, 24 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/db.erl b/src/db.erl index 413f4b9..fade7ce 100644 --- a/src/db.erl +++ b/src/db.erl @@ -160,7 +160,9 @@ handle_call({add, {LeafHash, EntryHash, Data, Index}}, _From, State) -> {reply, ok, State}; handle_call({add, {LeafHash, Data}}, _From, State) -> + lager:debug("add leafhash ~p", [LeafHash]), ok = perm:ensurefile(entry_root_path(), LeafHash, Data), + lager:debug("leafhash ~p added", [LeafHash]), {reply, ok, State}; handle_call({add_entryhash, {LeafHash, EntryHash}}, _From, State) -> diff --git a/src/fsyncport.erl b/src/fsyncport.erl index 7e2bf11..5084fdd 100644 --- a/src/fsyncport.erl +++ b/src/fsyncport.erl @@ -22,11 +22,13 @@ call_port(Msg) -> end. init(ExtPrg) -> + lager:debug("starting fsync service"), register(fsyncport, self()), process_flag(trap_exit, true), Ports = lists:map(fun(_N) -> open_port({spawn_executable, ExtPrg}, [{packet, 2}]) end, lists:seq(1, 32)), + lager:debug("fsync service started", []), loop(Ports). loop(Ports) -> @@ -34,12 +36,14 @@ loop(Ports) -> loop(IdlePorts, BusyPorts, Waiting) -> receive {call, Caller, {fsync, Path}} -> + lager:debug("fsync incoming request: ~p", [Path]), case IdlePorts of [] -> loop(IdlePorts, BusyPorts, queue:in({Caller, Path}, Waiting)); [Port | Rest] -> + lager:debug("fsync port ~p assigned to request ~p", [Port, Path]), Port ! {self(), {command, Path}}, loop(Rest, dict:store(Port, {Caller, os:timestamp()}, BusyPorts), @@ -47,6 +51,7 @@ loop(IdlePorts, BusyPorts, Waiting) -> end; {Port, {data, Data}} when is_port(Port) -> + lager:debug("fsync request finished: ~p", [Port]), {Caller, Starttime} = dict:fetch(Port, BusyPorts), Stoptime = os:timestamp(), statreport({fsync, Stoptime, Starttime}), @@ -65,6 +70,7 @@ loop(IdlePorts, BusyPorts, Waiting) -> NewWaiting) end; stop -> + lager:debug("fsync stop request received"), lists:foreach(fun (Port) -> Port ! {self(), close} end, @@ -78,6 +84,7 @@ loop(IdlePorts, BusyPorts, Waiting) -> exit(normal) %% XXX exits when first port is closed end; {'EXIT', Port, _Reason} when is_port(Port) -> + lager:debug("fsync port ~p exited, exiting", [Port]), %% XXX supervisor doesn't restart fsyncport, why? exit(port_terminated) end. diff --git a/src/perm.erl b/src/perm.erl index 466cc4f..c386d08 100644 --- a/src/perm.erl +++ b/src/perm.erl @@ -49,20 +49,27 @@ path_for_key(Rootdir, Key) -> -spec ensurefile(string(), binary(), binary()) -> ok | differ. 
ensurefile(Rootdir, Key, Content) -> + lager:debug("dir ~p key ~p", [Rootdir, Key]), {Dirs, Path} = path_for_key(Rootdir, Key), case readfile_and_verify(Path, Content) of ok -> - util:fsync([Path, Rootdir | Dirs]); + lager:debug("key ~p existed, fsync", [Key]), + util:fsync([Path, Rootdir | Dirs]), + lager:debug("key ~p fsynced", [Key]); differ -> + lager:debug("key ~p existed, was different", [Key]), differ; {error, enoent} -> + lager:debug("key ~p didn't exist, add", [Key]), util:check_error(make_dirs([Rootdir, Rootdir ++ "nursery/"] ++ Dirs), makedir, "Error creating directory"), NurseryName = Rootdir ++ "nursery/" ++ util:tempfilename(hex:bin_to_hexstr(Key)), util:write_tempfile_and_rename(Path, NurseryName, Content), - util:fsync([Path, Rootdir | Dirs]); + lager:debug("key ~p added, fsync", [Key]), + util:fsync([Path, Rootdir | Dirs]), + lager:debug("key ~p fsynced", [Key]); {error, Error} -> util:exit_with_error(Error, readfile, "Error reading file") end. diff --git a/src/plop.erl b/src/plop.erl index 84d0920..6e15781 100644 --- a/src/plop.erl +++ b/src/plop.erl @@ -109,6 +109,7 @@ handle_cast(_Request, State) -> handle_http_reply(State, {storage_sendentry_http, {OwnRequestId}}, StatusCode, Body) -> + lager:debug("http_reply: ~p", [Body]), {PropList} = (catch jiffy:decode(Body)), Result = proplists:get_value(<<"result">>, PropList), case dict:fetch(OwnRequestId, State#state.own_requests) of @@ -216,6 +217,7 @@ send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) -> [], [{sync, false}]). store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) -> + lager:debug("leafhash ~p", [TreeLeafHash]), OwnRequestId = make_ref(), Completion = @@ -234,6 +236,7 @@ store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) -> {From, Completion, storage_nodes_quorum()}}), + lager:debug("send requests to ~p", [Nodes]), RequestIds = [send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) || URLBase <- Nodes], PlopWithRequests = @@ -259,6 +262,7 @@ handle_call({get, logid}, _From, {reply, LogID, Plop}; handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, From, Plop) -> + lager:debug("add leafhash ~p", [TreeLeafHash]), case storage_nodes() of [] -> ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()), diff --git a/src/storage.erl b/src/storage.erl index 243cc6c..b966a12 100644 --- a/src/storage.erl +++ b/src/storage.erl @@ -12,6 +12,7 @@ newentries_path() -> Value. sendentry(SessionID, _Env, Input) -> + lager:debug("~p", [Input]), R = (catch case (catch jiffy:decode(Input)) of {error, E} -> html("sendentry: bad input:", E); @@ -25,6 +26,7 @@ sendentry(SessionID, _Env, Input) -> jiffy:encode( {[{result, <<"ok">>}]})) end), + lager:debug("result ~p", [R]), deliver(SessionID, R). entrycommitted(SessionID, _Env, Input) -> -- cgit v1.1 From 868a029e39ec8e9aa368da917146d088edee4d2f Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Sat, 25 Oct 2014 01:51:46 +0200 Subject: Move internal HTTP APIs to mochiweb. Stop using jiffy. --- src/frontend.erl | 135 +++++++++++++++++++++++++------------------------------ src/plop.erl | 19 ++++---- src/storage.erl | 89 ++++++++++++++++-------------------- 3 files changed, 109 insertions(+), 134 deletions(-) (limited to 'src') diff --git a/src/frontend.erl b/src/frontend.erl index 8d0eccd..a8a8b9e 100644 --- a/src/frontend.erl +++ b/src/frontend.erl @@ -5,64 +5,59 @@ -module(frontend). %% API (URL) --export([sendlog/3, missingentries/3, sendentry/3, sendsth/3, currentposition/3]). 
- -sendentry(SessionID, _Env, Input) -> - R = (catch case (catch jiffy:decode(Input)) of - {error, E} -> - html("sendentry: bad input:", E); - {PropList} -> - LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)), - TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)), - - ok = db:add(TreeLeafHash, LogEntry), - binary_to_list( - jiffy:encode( - {[{result, <<"ok">>}]})) - end), - deliver(SessionID, R). - -sendlog(SessionID, _Env, Input) -> - R = (catch case (catch jiffy:decode(Input)) of - {error, E} -> - html("sendentry: bad input:", E); - {PropList} -> - Start = proplists:get_value(<<"start">>, PropList), - Hashes = lists:map(fun (S) -> base64:decode(S) end, proplists:get_value(<<"hashes">>, PropList)), - - Indices = lists:seq(Start, Start + length(Hashes) - 1), - lists:foreach(fun ({Hash, Index}) -> - ok = db:add_index(Hash, Index) - end, lists:zip(Hashes, Indices)), - binary_to_list( - jiffy:encode( - {[{result, <<"ok">>}]})) - end), - deliver(SessionID, R). - -sendsth(SessionID, _Env, Input) -> - R = (catch case (catch jiffy:decode(Input)) of - {error, E} -> - html("sendentry: bad input:", E); - {PropList} -> - Treesize = proplists:get_value(<<"tree_size">>, PropList), - - ok = db:set_treesize(Treesize), - - ht:reset_tree([db:size() - 1]), - - binary_to_list( - jiffy:encode( - {[{result, <<"ok">>}]})) - end), - deliver(SessionID, R). - -currentposition(SessionID, _Env, _Input) -> +-export([request/3]). + +request(post, "ct/frontend/sendentry", Input) -> + case (catch mochijson2:decode(Input)) of + {error, E} -> + html("sendentry: bad input:", E); + {struct, PropList} -> + LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)), + TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)), + + ok = db:add(TreeLeafHash, LogEntry), + success({[{result, <<"ok">>}]}) + end; + +request(post, "ct/frontend/sendlog", Input) -> + case (catch mochijson2:decode(Input)) of + {error, E} -> + html("sendentry: bad input:", E); + {struct, PropList} -> + Start = proplists:get_value(<<"start">>, PropList), + Hashes = lists:map(fun (S) -> base64:decode(S) end, proplists:get_value(<<"hashes">>, PropList)), + + Indices = lists:seq(Start, Start + length(Hashes) - 1), + lists:foreach(fun ({Hash, Index}) -> + ok = db:add_index(Hash, Index) + end, lists:zip(Hashes, Indices)), + success({[{result, <<"ok">>}]}) + end; + +request(post, "ct/frontend/sendsth", Input) -> + case (catch mochijson2:decode(Input)) of + {error, E} -> + html("sendentry: bad input:", E); + {struct, PropList} -> + Treesize = proplists:get_value(<<"tree_size">>, PropList), + + ok = db:set_treesize(Treesize), + + ht:reset_tree([db:size() - 1]), + + success({[{result, <<"ok">>}]}) + end; + +request(get, "ct/frontend/currentposition", _Query) -> + Size = db:size(), + success({[{result, <<"ok">>}, + {position, Size}]}); + +request(get, "ct/frontend/missingentries", _Query) -> Size = db:size(), - R = binary_to_list( - jiffy:encode( - {[{result, <<"ok">>}, {position, Size}]})), - deliver(SessionID, R). + Missing = fetchmissingentries(Size), + success({[{result, <<"ok">>}, + {entries, Missing}]}). fetchmissingentries(Index) -> lists:reverse(fetchmissingentries(Index, [])). @@ -80,23 +75,15 @@ fetchmissingentries(Index, Acc) -> end end. -missingentries(SessionID, _Env, _Input) -> - Size = db:size(), - Missing = fetchmissingentries(Size), - R = binary_to_list( - jiffy:encode( - {[{result, <<"ok">>}, {entries, Missing}]})), - deliver(SessionID, R). %% Private functions. 
html(Text, Input) -> - io_lib:format( - "Content-Type: text/html\r\n\r\n" ++ - "
<html><body><p>
~n" ++ - "~s~n" ++ - "~p~n" ++ - "~n", [Text, Input]). - --spec deliver(any(), string()) -> ok | {error, _Reason}. -deliver(Session, Data) -> - mod_esi:deliver(Session, Data). + {400, [{"Content-Type", "text/html"}], + io_lib:format( + "
<html><body><p>
~n" ++ + "~s~n" ++ + "~p~n" ++ + "~n", [Text, Input])}. + +success(Data) -> + {200, [{"Content-Type", "text/json"}], mochijson2:encode(Data)}. diff --git a/src/plop.erl b/src/plop.erl index 6e15781..0523613 100644 --- a/src/plop.erl +++ b/src/plop.erl @@ -110,7 +110,7 @@ handle_cast(_Request, State) -> handle_http_reply(State, {storage_sendentry_http, {OwnRequestId}}, StatusCode, Body) -> lager:debug("http_reply: ~p", [Body]), - {PropList} = (catch jiffy:decode(Body)), + {struct, PropList} = mochijson2:decode(Body), Result = proplists:get_value(<<"result">>, PropList), case dict:fetch(OwnRequestId, State#state.own_requests) of undefined -> @@ -197,23 +197,24 @@ storage_nodes_quorum() -> Value. send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) -> - Request = jiffy:encode( + Request = mochijson2:encode( {[{plop_version, 1}, {entry, base64:encode(LogEntry)}, {treeleafhash, base64:encode(TreeLeafHash)} ]}), + lager:debug("send sendentry to storage node ~p: ~p", [URLBase, Request]), httpc:request(post, {URLBase ++ "sendentry", [], - "text/json", Request}, + "text/json", list_to_binary(Request)}, [], [{sync, false}]). send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) -> - Request = jiffy:encode( - {[{plop_version, 1}, - {entryhash, base64:encode(EntryHash)}, - {treeleafhash, base64:encode(TreeLeafHash)} - ]}), + Request = mochijson2:encode( + {[{plop_version, 1}, + {entryhash, base64:encode(EntryHash)}, + {treeleafhash, base64:encode(TreeLeafHash)} + ]}), httpc:request(post, {URLBase ++ "entrycommitted", [], - "text/json", Request}, + "text/json", list_to_binary(Request)}, [], [{sync, false}]). store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) -> diff --git a/src/storage.erl b/src/storage.erl index b966a12..e09acdb 100644 --- a/src/storage.erl +++ b/src/storage.erl @@ -5,43 +5,42 @@ -module(storage). %% API (URL) --export([sendentry/3, entrycommitted/3, fetchnewentries/3]). +-export([request/3]). newentries_path() -> {ok, Value} = application:get_env(plop, newentries_path), Value. -sendentry(SessionID, _Env, Input) -> - lager:debug("~p", [Input]), - R = (catch case (catch jiffy:decode(Input)) of - {error, E} -> - html("sendentry: bad input:", E); - {PropList} -> - LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)), - TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)), +request(post, "ct/storage/sendentry", Input) -> + case (catch mochijson2:decode(Input)) of + {error, E} -> + html("sendentry: bad input:", E); + {struct, PropList} -> + LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)), + TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)), - ok = db:add(TreeLeafHash, LogEntry), - ok = index:addlast(newentries_path(), TreeLeafHash), - binary_to_list( - jiffy:encode( - {[{result, <<"ok">>}]})) - end), - lager:debug("result ~p", [R]), - deliver(SessionID, R). - -entrycommitted(SessionID, _Env, Input) -> - R = (catch case (catch jiffy:decode(Input)) of - {error, E} -> - html("entrycommitted: bad input:", E); - {PropList} -> - EntryHash = base64:decode(proplists:get_value(<<"entryhash">>, PropList)), - LeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)), - ok = db:add_entryhash(LeafHash, EntryHash), - binary_to_list( - jiffy:encode( - {[{result, <<"ok">>}]})) - end), - deliver(SessionID, R). 
+ ok = db:add(TreeLeafHash, LogEntry), + ok = index:addlast(newentries_path(), TreeLeafHash), + success({[{result, <<"ok">>}]}) + end; +request(post, "ct/storage/entrycommitted", Input) -> + case (catch mochijson2:decode(Input)) of + {error, E} -> + html("entrycommitted: bad input:", E); + {struct, PropList} -> + EntryHash = base64:decode(proplists:get_value(<<"entryhash">>, PropList)), + LeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)), + ok = db:add_entryhash(LeafHash, EntryHash), + success({[{result, <<"ok">>}]}) + end; +request(get, "ct/storage/fetchnewentries", _Input) -> + NewHashes = fetchnewhashes(0), + Entries = lists:map(fun(LeafHash) -> + {[{hash, base64:encode(LeafHash)}, + {entry, base64:encode(db:entry_for_leafhash(LeafHash))}]} + end, NewHashes), + success({[{result, <<"ok">>}, + {entries, Entries}]}). fetchnewhashes(Index) -> lists:reverse(fetchnewhashes(Index, [])). @@ -54,26 +53,14 @@ fetchnewhashes(Index, Acc) -> fetchnewhashes(Index + 1, [Entry | Acc]) end. -fetchnewentries(SessionID, _Env, _Input) -> - NewHashes = fetchnewhashes(0), - Entries = lists:map(fun(LeafHash) -> - {[{hash, base64:encode(LeafHash)}, - {entry, base64:encode(db:entry_for_leafhash(LeafHash))}]} - end, NewHashes), - R = (catch binary_to_list( - jiffy:encode( - {[{result, <<"ok">>}, {entries, Entries}]}))), - deliver(SessionID, R). - %% Private functions. html(Text, Input) -> - io_lib:format( - "Content-Type: text/html\r\n\r\n" ++ - "
<html><body><p>
~n" ++ - "~s~n" ++ - "~p~n" ++ - "~n", [Text, Input]). + {400, [{"Content-Type", "text/html"}], + io_lib:format( + "
<html><body><p>
~n" ++ + "~s~n" ++ + "~p~n" ++ + "~n", [Text, Input])}. --spec deliver(any(), string()) -> ok | {error, _Reason}. -deliver(Session, Data) -> - mod_esi:deliver(Session, Data). +success(Data) -> + {200, [{"Content-Type", "text/json"}], mochijson2:encode(Data)}. -- cgit v1.1 From 2483f0cf09ccc4cf73558c7a85bbb51a72d29c3a Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Sat, 25 Oct 2014 15:22:09 +0200 Subject: Optimize db:get_by_indices by not fetching entry and implementing index:getrange --- src/db.erl | 12 +++++++----- src/index.erl | 41 ++++++++++++++++++++++++----------------- src/plop.erl | 7 ++++++- 3 files changed, 37 insertions(+), 23 deletions(-) (limited to 'src') diff --git a/src/db.erl b/src/db.erl index fade7ce..f7c2057 100644 --- a/src/db.erl +++ b/src/db.erl @@ -129,6 +129,9 @@ index_for_leafhash(LeafHash) -> leafhash_for_index(Index) -> index:get(index_path(), Index). +leafhash_for_indices(Start, End) -> + index:getrange(index_path(), Start, End). + leafhash_for_entryhash(EntryHash) -> perm:readfile(entryhash_root_path(), EntryHash). @@ -138,11 +141,10 @@ get_by_indices_helper(Start, End) -> EndBound = min(End, size() - 1), case Start =< EndBound of true -> - lists:map(fun (Index) -> - LeafHash = leafhash_for_index(Index), - Entry = entry_for_leafhash(LeafHash), - {Index, LeafHash, Entry} - end, lists:seq(Start, EndBound)); + lists:map(fun ({LeafHash, Index}) -> + {Index, LeafHash, notfetched} + end, lists:zip(leafhash_for_indices(Start, EndBound), + lists:seq(Start, EndBound))); false -> [] end. diff --git a/src/index.erl b/src/index.erl index 5169fbb..bbc9a10 100644 --- a/src/index.erl +++ b/src/index.erl @@ -12,7 +12,7 @@ %% TODO: Checksums -module(index). --export([get/2, add/3, addlast/2]). +-export([get/2, getrange/3, add/3, addlast/2]). -define(ENTRYSIZE, 32). -define(ENTRYSIZEINFILE, (?ENTRYSIZE*2+1)). @@ -66,31 +66,38 @@ add(Basepath, Index, Entry) when is_binary(Entry), size(Entry) == ?ENTRYSIZE -> addlast(Basepath, Entry) -> add(Basepath, last, Entry). -%% From lib/stdlib/src/lists.erl. For supporting < R17. --spec droplast(nonempty_list()) -> list(). -droplast([_T]) -> []; -droplast([H|T]) -> [H|droplast(T)]. +decodedata(Binary) -> + lists:reverse(decodedata(Binary, [])). -decodedata(EntryText) when length(EntryText) == ?ENTRYSIZEINFILE -> - case [lists:last(EntryText)] of - "\n" -> - hex:hexstr_to_bin(droplast(EntryText)); - _ -> - util:exit_with_error(badformat, readindex, - "Index line not ending with linefeed") - end. +decodedata(<<>>, Acc) -> + Acc; +decodedata(<>, Acc) -> + decodedata(Rest, [mochihex:to_bin(binary_to_list(Entry)) | Acc]); +decodedata(<<_:?ENTRYSIZE/binary-unit:16, _>>, _Acc) -> + util:exit_with_error(badformat, readindex, + "Index line not ending with linefeed"). -spec get(string(), integer()) -> binary(). get(Basepath, Index) -> + case getrange(Basepath, Index, Index) of + noentry -> + noentry; + [Entry] -> + Entry + end. + +-spec getrange(string(), integer(), integer()) -> [binary()]. 
+getrange(Basepath, Start, End) when Start =< End -> case file:open(Basepath, [read, binary]) of {ok, File} -> {ok, Filesize} = file:position(File, eof), if - Index * ?ENTRYSIZEINFILE + ?ENTRYSIZEINFILE =< Filesize -> + End * ?ENTRYSIZEINFILE + ?ENTRYSIZEINFILE =< Filesize -> {ok, _Position} = file:position(File, - Index * ?ENTRYSIZEINFILE), - {ok, EntryText} = file:read(File, ?ENTRYSIZEINFILE), - Entry = decodedata(binary_to_list(EntryText)), + Start * ?ENTRYSIZEINFILE), + {ok, EntryText} = + file:read(File, ?ENTRYSIZEINFILE * (End - Start + 1)), + Entry = decodedata(EntryText), file:close(File), Entry; true -> diff --git a/src/plop.erl b/src/plop.erl index 0523613..0c85b21 100644 --- a/src/plop.erl +++ b/src/plop.erl @@ -248,12 +248,17 @@ store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) -> end, PlopWithOwn, RequestIds), PlopWithRequests. +fill_in_entry({_Index, LeafHash, notfetched}) -> + db:get_by_leaf_hash(LeafHash). + %%%%%%%%%%%%%%%%%%%% handle_call(stop, _From, Plop) -> {stop, normal, stopped, Plop}; handle_call({get, {index, Start, End}}, _From, Plop) -> - {reply, db:get_by_indices(Start, End, {sorted, false}), Plop}; + {reply, lists:map(fun (E) -> fill_in_entry(E) end, + db:get_by_indices(Start, End, {sorted, false})), + Plop}; handle_call({get, {hash, EntryHash}}, _From, Plop) -> {reply, db:get_by_entry_hash(EntryHash), Plop}; -- cgit v1.1 From ebc9d5bac1a69ba25044a73674b8e9ea18217f60 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Sat, 25 Oct 2014 23:56:41 +0200 Subject: Optimize fetchnewentries --- src/index.erl | 11 +++++++++++ src/storage.erl | 11 ++--------- 2 files changed, 13 insertions(+), 9 deletions(-) (limited to 'src') diff --git a/src/index.erl b/src/index.erl index bbc9a10..c1e0352 100644 --- a/src/index.erl +++ b/src/index.erl @@ -77,6 +77,17 @@ decodedata(<<_:?ENTRYSIZE/binary-unit:16, _>>, _Acc) -> util:exit_with_error(badformat, readindex, "Index line not ending with linefeed"). +-spec size(string()) -> integer(). +size(Basepath) -> + case file:open(Basepath, [read, binary]) of + {ok, File} -> + {ok, Filesize} = file:position(File, eof), + Filesize mod ?ENTRYSIZEINFILE; + {error, Error} -> + util:exit_with_error(Error, readfile, + "Error opening file for reading") + end. + -spec get(string(), integer()) -> binary(). get(Basepath, Index) -> case getrange(Basepath, Index, Index) of diff --git a/src/storage.erl b/src/storage.erl index e09acdb..df6641a 100644 --- a/src/storage.erl +++ b/src/storage.erl @@ -43,15 +43,8 @@ request(get, "ct/storage/fetchnewentries", _Input) -> {entries, Entries}]}). fetchnewhashes(Index) -> - lists:reverse(fetchnewhashes(Index, [])). - -fetchnewhashes(Index, Acc) -> - case index:get(newentries_path(), Index) of - noentry -> - Acc; - Entry -> - fetchnewhashes(Index + 1, [Entry | Acc]) - end. + Size = index:size(newentries_path()), + index:getrange(newentries_path(), Index, Size - 1). %% Private functions. html(Text, Input) -> -- cgit v1.1 From dc8952f6fefc91e21bacf125f5414edf0d35db55 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Mon, 27 Oct 2014 01:15:20 +0100 Subject: Correct function specifications. --- src/atomic.erl | 2 +- src/perm.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/atomic.erl b/src/atomic.erl index 5ad48ba..36fba81 100644 --- a/src/atomic.erl +++ b/src/atomic.erl @@ -10,7 +10,7 @@ replacefile(Path, Content) -> util:write_tempfile_and_rename(Path, TempName, Content), util:fsync([Path, filename:dirname(Path)]). 
--spec readfile(string()) -> binary(). +-spec readfile(string()) -> binary() | noentry. readfile(Path) -> case file:read_file(Path) of {ok, Contents} -> diff --git a/src/perm.erl b/src/perm.erl index c386d08..9f02b55 100644 --- a/src/perm.erl +++ b/src/perm.erl @@ -74,7 +74,7 @@ ensurefile(Rootdir, Key, Content) -> util:exit_with_error(Error, readfile, "Error reading file") end. --spec readfile(string(), binary()) -> binary(). +-spec readfile(string(), binary()) -> binary() | noentry. readfile(Rootdir, Key) -> {_Dirs, Path} = path_for_key(Rootdir, Key), atomic:readfile(Path). -- cgit v1.1 From 80ea2ac6af8f993888444a4f75bbcc976ddd3973 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Mon, 27 Oct 2014 01:24:09 +0100 Subject: Parallel fsync --- src/fsyncport.erl | 22 +++++++++++++++++++++- src/util.erl | 16 +++++++--------- 2 files changed, 28 insertions(+), 10 deletions(-) (limited to 'src') diff --git a/src/fsyncport.erl b/src/fsyncport.erl index 5084fdd..c9be44d 100644 --- a/src/fsyncport.erl +++ b/src/fsyncport.erl @@ -3,7 +3,7 @@ -module(fsyncport). -export([start_link/0, stop/0, init/1]). --export([fsync/1]). +-export([fsync/1, fsyncall/1]). start_link() -> Pid = spawn(?MODULE, init, [code:priv_dir(plop) ++ "/fsynchelper"]), @@ -14,6 +14,9 @@ stop() -> fsync(Path) -> call_port({fsync, Path}). +fsyncall(Paths) -> + call_port_multi([{fsync, Path} || Path <- Paths]). + call_port(Msg) -> fsyncport ! {call, self(), Msg}, receive @@ -21,6 +24,23 @@ call_port(Msg) -> Result end. +call_port_multi(Msgs) -> + lists:foreach(fun (Msg) -> + fsyncport ! {call, self(), Msg} + end, Msgs), + lists:foldl(fun (_Msg, Acc) -> + R = receive + {fsyncport, Result} -> + Result + end, + case R of + ok -> + Acc; + Error -> + Error + end + end, ok, Msgs). + init(ExtPrg) -> lager:debug("starting fsync service"), register(fsyncport, self()), diff --git a/src/util.erl b/src/util.erl index dd42752..435dbc8 100644 --- a/src/util.erl +++ b/src/util.erl @@ -13,15 +13,13 @@ tempfilename(Base) -> Filename. -spec fsync([string()]) -> ok. -fsync([]) -> - ok; -fsync([Name | Rest]) -> - case fsyncport:fsync(Name) of - ok -> - fsync(Rest); - {error, Error} -> - exit_with_error(fsync, Error, "Error in fsync") - end. +fsync(Paths) -> + case fsyncport:fsyncall(Paths) of + ok -> + ok; + {error, Error} -> + exit_with_error(fsync, Error, "Error in fsync") + end. -spec exit_with_error(atom(), atom(), string()) -> no_return(). exit_with_error(Operation, Error, ErrorMessage) -> -- cgit v1.1 From fb3b9591cc81158824db13818cf6320d5f4a0f7b Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Mon, 27 Oct 2014 01:28:32 +0100 Subject: Fix mistake in ebc9d5ba (Optimize fetchnewentries) --- src/index.erl | 13 ++++++++----- src/storage.erl | 8 ++++++-- 2 files changed, 14 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/index.erl b/src/index.erl index c1e0352..96195e3 100644 --- a/src/index.erl +++ b/src/index.erl @@ -12,7 +12,7 @@ %% TODO: Checksums -module(index). --export([get/2, getrange/3, add/3, addlast/2]). +-export([get/2, getrange/3, add/3, addlast/2, indexsize/1]). -define(ENTRYSIZE, 32). -define(ENTRYSIZEINFILE, (?ENTRYSIZE*2+1)). @@ -77,18 +77,19 @@ decodedata(<<_:?ENTRYSIZE/binary-unit:16, _>>, _Acc) -> util:exit_with_error(badformat, readindex, "Index line not ending with linefeed"). --spec size(string()) -> integer(). -size(Basepath) -> +-spec indexsize(string()) -> integer(). 
+indexsize(Basepath) -> case file:open(Basepath, [read, binary]) of {ok, File} -> {ok, Filesize} = file:position(File, eof), - Filesize mod ?ENTRYSIZEINFILE; + lager:debug("file ~p size ~p", [Basepath, Filesize]), + Filesize div ?ENTRYSIZEINFILE; {error, Error} -> util:exit_with_error(Error, readfile, "Error opening file for reading") end. --spec get(string(), integer()) -> binary(). +-spec get(string(), integer()) -> binary() | noentry. get(Basepath, Index) -> case getrange(Basepath, Index, Index) of noentry -> @@ -99,6 +100,7 @@ get(Basepath, Index) -> -spec getrange(string(), integer(), integer()) -> [binary()]. getrange(Basepath, Start, End) when Start =< End -> + lager:debug("path ~p start ~p end ~p", [Basepath, Start, End]), case file:open(Basepath, [read, binary]) of {ok, File} -> {ok, Filesize} = file:position(File, eof), @@ -109,6 +111,7 @@ getrange(Basepath, Start, End) when Start =< End -> {ok, EntryText} = file:read(File, ?ENTRYSIZEINFILE * (End - Start + 1)), Entry = decodedata(EntryText), + lager:debug("entries ~p", [length(Entry)]), file:close(File), Entry; true -> diff --git a/src/storage.erl b/src/storage.erl index df6641a..8136308 100644 --- a/src/storage.erl +++ b/src/storage.erl @@ -43,8 +43,12 @@ request(get, "ct/storage/fetchnewentries", _Input) -> {entries, Entries}]}). fetchnewhashes(Index) -> - Size = index:size(newentries_path()), - index:getrange(newentries_path(), Index, Size - 1). + case index:indexsize(newentries_path()) of + 0 -> + []; + Size -> + index:getrange(newentries_path(), Index, Size - 1) + end. %% Private functions. html(Text, Input) -> -- cgit v1.1 From 79caa8decbb21228cf3f5cbe32fbf972c40e70dc Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Mon, 27 Oct 2014 01:30:15 +0100 Subject: Check that entries are actually present when receiving new STH from merge nodes --- src/db.erl | 9 ++++++--- src/frontend.erl | 55 ++++++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 54 insertions(+), 10 deletions(-) (limited to 'src') diff --git a/src/db.erl b/src/db.erl index f7c2057..2a64935 100644 --- a/src/db.erl +++ b/src/db.erl @@ -7,7 +7,7 @@ %% API. -export([start_link/0, stop/0]). -export([add/4, add/2, add_entryhash/2, add_index/2, set_treesize/1, size/0]). --export([get_by_index/1, get_by_indices/3, get_by_leaf_hash/1, get_by_entry_hash/1, entry_for_leafhash/1, leafhash_for_index/1]). +-export([get_by_index/1, get_by_indices/3, get_by_leaf_hash/1, get_by_entry_hash/1, entry_for_leafhash/1, leafhash_for_index/1, leafhash_for_indices/2, indexsize/0]). %% gen_server callbacks. -export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, code_change/3]). @@ -18,6 +18,9 @@ size() -> binary_to_integer(atomic:readfile(treesize_path())). +indexsize() -> + index:indexsize(index_path()). + init(_Args) -> {ok, []}. 
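
A note on the two sizes: size/0 is the published tree size read from the treesize file, while indexsize/0 counts leaf hashes present in the index file, so the index may legitimately run ahead of the tree while a merge is in flight. The sendsth check later in this commit leans on that ordering; restated as a one-line assertion (illustrative, not part of the patch):

    %% Committed tree entries are always a prefix of the index file.
    true = db:size() =< db:indexsize().
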
@@ -158,7 +161,7 @@ handle_call({add, {LeafHash, EntryHash, Data, Index}}, _From, State) -> ok = perm:ensurefile(indexforhash_root_path(), LeafHash, integer_to_binary(Index)), ok = index:add(index_path(), Index, LeafHash), - ok = atomic:replacefile(treesize_path(), integer_to_list(Index+1)), + ok = atomic:replacefile(treesize_path(), integer_to_binary(Index+1)), {reply, ok, State}; handle_call({add, {LeafHash, Data}}, _From, State) -> @@ -178,7 +181,7 @@ handle_call({add_index, {LeafHash, Index}}, _From, State) -> {reply, ok, State}; handle_call({set_treesize, Size}, _From, State) -> - ok = atomic:replacefile(treesize_path(), integer_to_list(Size)), + ok = atomic:replacefile(treesize_path(), integer_to_binary(Size)), {reply, ok, State}; handle_call({get_by_indices, {Start, End, _Sorted}}, _From, State) -> diff --git a/src/frontend.erl b/src/frontend.erl index a8a8b9e..9c69517 100644 --- a/src/frontend.erl +++ b/src/frontend.erl @@ -39,13 +39,33 @@ request(post, "ct/frontend/sendsth", Input) -> {error, E} -> html("sendentry: bad input:", E); {struct, PropList} -> + OldSize = db:size(), Treesize = proplists:get_value(<<"tree_size">>, PropList), - - ok = db:set_treesize(Treesize), - - ht:reset_tree([db:size() - 1]), - - success({[{result, <<"ok">>}]}) + Indexsize = db:indexsize(), + + if + Treesize < OldSize -> + html("Size is older than current size", OldSize); + Treesize == OldSize -> + success({[{result, <<"ok">>}]}); + Treesize > Indexsize -> + html("Has too few entries", Indexsize); + true -> + NewEntries = db:leafhash_for_indices(OldSize, Treesize - 1), + lager:debug("old size: ~p new size: ~p entries: ~p", + [OldSize, Treesize, NewEntries]), + + Errors = check_entries(NewEntries, OldSize, Treesize - 1), + + case Errors of + [] -> + ok = db:set_treesize(Treesize), + ht:reset_tree([db:size() - 1]), + success({[{result, <<"ok">>}]}); + _ -> + html("Database not complete", Errors) + end + end end; request(get, "ct/frontend/currentposition", _Query) -> @@ -56,19 +76,40 @@ request(get, "ct/frontend/currentposition", _Query) -> request(get, "ct/frontend/missingentries", _Query) -> Size = db:size(), Missing = fetchmissingentries(Size), + lager:debug("missingentries: ~p", [Missing]), success({[{result, <<"ok">>}, - {entries, Missing}]}). + {entries, lists:map(fun (Entry) -> base64:encode(Entry) end, + Missing)}]}). +check_entries(Entries, Start, End) -> + lists:foldl(fun ({Hash, Index}, Acc) -> + case check_entry(Hash, Index) of + ok -> + Acc; + Error -> + [Error | Acc] + end + end, [], lists:zip(Entries, lists:seq(Start, End))). + +check_entry(Hash, Index) -> + case db:get_by_leaf_hash(Hash) of + notfound -> + {notfound, Index}; + _ -> + ok + end. fetchmissingentries(Index) -> lists:reverse(fetchmissingentries(Index, [])). fetchmissingentries(Index, Acc) -> + lager:debug("index ~p", [Index]), case db:leafhash_for_index(Index) of noentry -> Acc; Hash -> case db:entry_for_leafhash(Hash) of noentry -> + lager:debug("didn't find hash ~p", [Hash]), fetchmissingentries(Index + 1, [Hash | Acc]); _ -> fetchmissingentries(Index + 1, Acc) -- cgit v1.1 From 9e97754f5e005f80c64b7889c280f78b63d47b5b Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Mon, 27 Oct 2014 01:32:24 +0100 Subject: Optimize ts by storing tree in array of arrays. 
--- src/ts.erl | 51 ++++++++++++++++++++++++--------------------------- 1 file changed, 24 insertions(+), 27 deletions(-) (limited to 'src') diff --git a/src/ts.erl b/src/ts.erl index cf71ff5..07edbf6 100644 --- a/src/ts.erl +++ b/src/ts.erl @@ -9,56 +9,53 @@ -export_type([tree_store/0]). -export([new/0, add/3, delete/2, retrieve/2, count/2]). -%% FIXME: Keep the entries in binaries instead of lists? Hashes do -%% have fixed lenght. --record(tree_store, {layers :: list()}). % orddict of lists, keyed on layer. +-record(tree_store, {layers :: array:array(array:array(binary()))}). % array of arrays, keyed on layer. -type tree_store() :: #tree_store{}. %%%%%%%%%%%%%%%%%%%% %% Public. new() -> - #tree_store{layers = orddict:new()}. + #tree_store{layers = array:new()}. -spec add(tree_store(), non_neg_integer(), binary()) -> tree_store(). add(S = #tree_store{layers = Layers}, Layer, Entry) -> - {NewLayers, List} = layer(Layers, rw, Layer), - NewList = [Entry | List], - S#tree_store{layers = orddict:store(Layer, NewList, NewLayers)}. + {NewLayers, List} = layer_rw(Layers, Layer), + NewList = array:set(array:size(List), Entry, List), + S#tree_store{layers = array:set(Layer, NewList, NewLayers)}. -spec delete(tree_store(), non_neg_integer()) -> tree_store(). delete(S = #tree_store{layers = Layers}, Layer) -> - List = layer(Layers, ro, Layer), - [_ | NewList] = List, - S#tree_store{layers = orddict:store(Layer, NewList, Layers)}. + List = layer_ro(Layers, Layer), + NewList = array:resize(array:size(List) - 1, List), + S#tree_store{layers = array:set(Layer, NewList, Layers)}. -spec retrieve(tree_store(), tuple()) -> binary() | undefined. retrieve(#tree_store{layers = Layers}, {Layer, Index}) -> - List = layer(Layers, ro, Layer), - Len = length(List), + List = layer_ro(Layers, Layer), + Len = array:size(List), case Index < Len of - true -> lists:nth(Len - Index, List); + true -> array:get(Index, List); false -> undefined end. -spec count(tree_store(), non_neg_integer()) -> non_neg_integer(). count(#tree_store{layers = Layers}, Layer) -> - length(layer(Layers, ro, Layer)). + array:size(layer_ro(Layers, Layer)). %%%%%%%%%%%%%%%%%%%% %% Private. --spec layer(list(), rw | ro, non_neg_integer()) -> list() | {list(), list()}. -layer(Layers, Access, Layer) -> - case Access of - rw -> - case orddict:find(Layer, Layers) of - error -> {orddict:store(Layer, [], Layers), []}; - {ok, List} -> {Layers, List} - end; - ro -> - case orddict:find(Layer, Layers) of - error -> []; - {ok, List} -> List - end +-spec layer_ro(array:array(array:array(binary())), non_neg_integer()) -> array:array(binary). +layer_ro(Layers, Layer) -> + case array:get(Layer, Layers) of + undefined -> array:new(); + List -> List + end. + +-spec layer_rw(array:array(array:array(binary())), non_neg_integer()) -> {array:array(), array:array(binary)}. +layer_rw(Layers, Layer) -> + case array:get(Layer, Layers) of + undefined -> {array:set(Layer, array:new(), Layers), array:new()}; + List -> {Layers, List} end. %%%%%%%%%%%%%%%%%%%% -- cgit v1.1 From cc2aaa2807bb13f4683c2d74a414d39d5b29a372 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Mon, 27 Oct 2014 01:33:09 +0100 Subject: Remove old code --- src/plop_app.erl | 4 ---- 1 file changed, 4 deletions(-) (limited to 'src') diff --git a/src/plop_app.erl b/src/plop_app.erl index f90792d..767bf06 100644 --- a/src/plop_app.erl +++ b/src/plop_app.erl @@ -4,10 +4,6 @@ -module(plop_app). -behaviour(application). -export([start/2, stop/1]). --export([install/1]). - -install(Nodes) -> - db:init_db(Nodes). 
start(normal, Args) -> plop_sup:start_link(Args). -- cgit v1.1
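
For intuition on the ts.erl rewrite above: an Erlang array grows when you set the element at its current size, so appending to a layer is array:set(array:size(A), Entry, A), and dropping the newest element is array:resize(array:size(A) - 1, A). A minimal sketch with invented values (cf. ts:add/3 and ts:delete/2):

    %% Append two hashes to an array-backed layer, then drop the newest.
    A0 = array:new(),
    A1 = array:set(array:size(A0), <<"hash0">>, A0),  %% append
    A2 = array:set(array:size(A1), <<"hash1">>, A1),  %% append
    2 = array:size(A2),
    A3 = array:resize(array:size(A2) - 1, A2),        %% drop newest
    <<"hash0">> = array:get(0, A3).
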