author    Magnus Ahltorp <map@kth.se>  2017-07-28 13:38:43 +0200
committer Magnus Ahltorp <map@kth.se>  2017-07-28 13:38:43 +0200
commit    11e0bdd000dad72ed4c46d419e253ee0caefbc2b (patch)
tree      3c886c080433ce70bd23d682c730d232f2e87a3b
parent    489df8ecaf16ca7429eb15b31ffbe6f686f5b0d1 (diff)

Still WIP (mergefetch)
-rw-r--r--  merge/ebin/merge.app                 |  4
-rw-r--r--  merge/src/merge_fetch_ctrl.erl       | 24
-rw-r--r--  merge/src/merge_fetch_fetch.erl      | 72
-rw-r--r--  merge/src/merge_fetch_fetch_sup.erl  | 30
-rw-r--r--  merge/src/merge_fetch_newentries.erl | 26
-rw-r--r--  merge/src/merge_sup.erl              |  5
-rw-r--r--  src/plop.erl                         |  8
-rw-r--r--  src/plop_httputil.erl                | 21
8 files changed, 168 insertions(+), 22 deletions(-)
diff --git a/merge/ebin/merge.app b/merge/ebin/merge.app
index b34334a..c195185 100644
--- a/merge/ebin/merge.app
+++ b/merge/ebin/merge.app
@@ -6,8 +6,8 @@
{application, merge,
[{description, "Plop merge"},
{vsn, "1.0.1-alpha-dev"},
- {modules, [merge_app, merge_dist, merge_dist_sup, merge_sup]},
- {applications, [kernel, stdlib, lager, plop]},
+ {modules, [merge_app, merge_dist, merge_dist_sup, merge_sup, merge_fetch]},
+ {applications, [kernel, stdlib, lager, catlfish, plop]},
{registered, [merge_dist, merge_dist_sup, merge_sup]},
{mod, {merge_app, []}}
]}.
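
The new catlfish entry in applications declares a start-time dependency: OTP will refuse to start merge unless catlfish is already running. A minimal sketch of checking this from an Erlang shell, using only stock OTP calls (nothing here beyond what the .app file above declares):

    %% Start merge together with everything it depends on:
    {ok, Started} = application:ensure_all_started(merge),
    %% Read back the dependency list from merge.app:
    {ok, Deps} = application:get_key(merge, applications),
    %% Deps =:= [kernel, stdlib, lager, catlfish, plop]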
diff --git a/merge/src/merge_fetch_ctrl.erl b/merge/src/merge_fetch_ctrl.erl
index 6c055df..49fe043 100644
--- a/merge/src/merge_fetch_ctrl.erl
+++ b/merge/src/merge_fetch_ctrl.erl
@@ -69,7 +69,7 @@ handle_call({newentries, List, Node}, _From, #state{tofetch = ToFetch} = State)
{true, {Hash, undefined}}
end
end, List),
- {noreply, State#state{tofetch = [ToFetch | NewEntries]}};
+ {reply, ok, State#state{tofetch = ToFetch ++ NewEntries}};
handle_call({entriestofetch, Node, NMax}, _From,
#state{tofetch = ToFetch} = State) ->
@@ -79,12 +79,12 @@ handle_call({entriestofetch, Node, NMax}, _From,
handle_call({fetchstatus, Entry, success}, _From,
#state{tofetch = ToFetch} = State) ->
NewToFetch = fetch_success(ToFetch, Entry),
- {noreply, State#state{tofetch = NewToFetch}};
+ {reply, ok, State#state{tofetch = NewToFetch}};
handle_call({fetchstatus, Entry, failure}, _From,
#state{tofetch = ToFetch} = State) ->
NewToFetch = fetch_failure(ToFetch, Entry),
- {noreply, State#state{tofetch = NewToFetch}}.
+ {reply, ok, State#state{tofetch = NewToFetch}}.
handle_cast(_Request, State) ->
{noreply, State}.
@@ -97,8 +97,18 @@ terminate(_Reason, _State) ->
ok.
%%%%%%%%%%%%%%%%%%%%
+
+write_currentsize(CurrentSize, LastHash) ->
+ {ok, CurrentSizeFile} = application:get_env(plop, fetched_path),
+ CurrentSizeData = {struct, [{index, CurrentSize - 1}, {hash, list_to_binary(hex:bin_to_hexstr(LastHash))}]},
+ ok = atomic:replacefile(CurrentSizeFile, mochijson2:encode(CurrentSizeData)).
+
-spec fetch_success(list(), binary()) -> list().
fetch_success(ToFetch, Entry) ->
+ CurrentPos = db:indexsize(),
+ db:add_index_nosync_noreverse(Entry, CurrentPos),
+ db:index_sync(),
+ write_currentsize(CurrentPos + 1, Entry),
true = ets:insert(?MISSINGENTRIES_TABLE, {Entry, []}),
keydelete(Entry, 1, ToFetch).
@@ -116,7 +126,7 @@ fetch_failure(ToFetch, Entry) ->
etf(ToFetch, Node, NMax) ->
etf(ToFetch, Node, NMax, [], []).
etf(ToFetchRest, _Node, 0, AccToFetch, AccEntries) ->
- {[reverse(AccToFetch) | ToFetchRest], reverse(AccEntries)};
+ {reverse(AccToFetch) ++ ToFetchRest, reverse(AccEntries)};
etf([], Node, _NMax, AccToFetch, AccEntries) ->
etf([], Node, 0, AccToFetch, AccEntries);
etf([H|T], Node, NMax, AccToFetch, AccEntries) ->
@@ -142,8 +152,10 @@ etf([H|T], Node, NMax, AccToFetch, AccEntries) ->
not_fetched(List) ->
filter(fun(H) -> case ets:match(?MISSINGENTRIES_TABLE, {H, '$1'}) of
- [[[]]] -> false; % Match: Empty list.
- _ -> true
+                        [] ->
+                            true;  % No match in the table.
+                        _ ->
+                            false  % Entry already present.
end
end,
List).
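
write_currentsize/2 persists the fetch position as JSON through mochijson2, with the hash hex-encoded by plop's hex module. A sketch of what ends up in the fetched_path file, using made-up values (the 32-byte hash below is purely illustrative):

    CurrentSize = 18,
    LastHash = binary:copy(<<16#ab>>, 32),
    Json = mochijson2:encode({struct, [{index, CurrentSize - 1},
                                       {hash, list_to_binary(hex:bin_to_hexstr(LastHash))}]}),
    %% iolist_to_binary(Json) is then roughly:
    %%   <<"{\"index\":17,\"hash\":\"abab...ab\"}">>

Note that the file stores index = CurrentSize - 1, i.e. the index of the last fetched entry rather than the count.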
diff --git a/merge/src/merge_fetch_fetch.erl b/merge/src/merge_fetch_fetch.erl
index b2fadd9..2ba7e9a 100644
--- a/merge/src/merge_fetch_fetch.erl
+++ b/merge/src/merge_fetch_fetch.erl
@@ -7,19 +7,81 @@
-export([start_link/1]).
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
code_change/3]).
+-export([loop/2]).
start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).
-init({Name, _Address}) ->
- lager:info("~p:~p: starting", [?MODULE, Name]),
- {ok, []}.
+init({Name, Address}) ->
+ lager:info("~p:~p starting", [?MODULE, Name]),
+ ChildPid = spawn_link(?MODULE, loop, [Name, Address]),
+ {ok, ChildPid}.
+
+decode_entry({struct, EncodedEntry}) ->
+ Hash = base64:decode(proplists:get_value(<<"hash">>, EncodedEntry)),
+ Entry = base64:decode(proplists:get_value(<<"entry">>, EncodedEntry)),
+ {Hash, Entry}.
+
+get_entries(_, _, []) ->
+ {ok, []};
+get_entries(NodeName, NodeAddress, Hashes) ->
+ DebugTag = "getentry",
+ URL = NodeAddress ++ "getentry",
+ EncodedHashes = lists:map(fun (H) -> {"hash", base64:encode(H)} end, Hashes),
+ Params = hackney_url:qs(EncodedHashes),
+ URLWithParams = [URL, "?", Params],
+ case merge_util:request(DebugTag, binary_to_list(iolist_to_binary(URLWithParams)), NodeName) of
+ {<<"ok">>, PropList} ->
+ Entries = lists:map(fun (S) -> decode_entry(S) end, proplists:get_value(<<"entries">>, PropList)),
+ {ok, Entries};
+ Err ->
+ throw({request_error, result, DebugTag, Err})
+ end.
+
+loop(Name, Address) ->
+ receive after 1000 -> ok end,
+ lager:info("~p:~p: asking for entries to get from ~p",
+ [?MODULE, Name, Address]),
+ Hashes = merge_fetch_ctrl:entriestofetch(Name, 10),
+ {ok, Entries} = get_entries(Name, Address, Hashes),
+ lists:foreach(fun ({Hash, Entry}) ->
+
+ try
+ case plop:verify_entry(Entry) of
+ {ok, Hash} ->
+ ok;
+ {ok, DifferentLeafHash} ->
+ lager:error("leaf hash not correct: requested hash is ~p " ++
+ "and contents are ~p",
+ [hex:bin_to_hexstr(Hash),
+ hex:bin_to_hexstr(DifferentLeafHash)]),
+ throw({request_error, result, "", differentleafhash});
+ {error, Reason} ->
+ lager:error("verification failed: ~p", [Reason]),
+ throw({request_error, result, "", verificationfailed})
+ end
+ catch
+ Type:What ->
+ [CrashFunction | Stack] = erlang:get_stacktrace(),
+ lager:error("Crash: ~p ~p~n~p~n~p~n",
+ [Type, What, CrashFunction, Stack]),
+ throw(What)
+ end,
+
+ db:add_entry_sync(Hash, Entry),
+
+                          merge_fetch_ctrl:fetchstatus(Hash, success)
+ end, Entries),
+ loop(Name, Address).
+
%% TODO: if we crash here, we restart all of fetch -- spawn child proc
%% for the actual fetching?
-handle_call(stop, _From, State) ->
- {stop, normal, stopped, State}.
+handle_call(stop, _From, ChildPid) ->
+ lager:info("~p: stopping child process ~p", [?MODULE, ChildPid]),
+ exit(ChildPid, stop),
+ {stop, normal, stopped, nil}.
handle_cast(_Request, State) ->
{noreply, State}.
handle_info(_Info, State) ->
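
get_entries/3 fans a list of leaf hashes out into repeated hash= query parameters via hackney_url:qs/1. A sketch of the URL construction with two made-up hashes (binaries are used for the keys here; hackney_url:qs/1 percent-encodes both keys and values):

    Hashes = [<<"first-hash-bytes">>, <<"second-hash-bytes">>],
    EncodedHashes = [{<<"hash">>, base64:encode(H)} || H <- Hashes],
    Params = hackney_url:qs(EncodedHashes),
    %% Params is a binary like <<"hash=Zmlyc3QtaGFzaC1ieXRlcw%3D%3D&hash=...">>,
    %% with the base64 '=' padding percent-encoded.
    URL = iolist_to_binary(["https://storage.example.net/ct/storage/getentry",
                            "?", Params])

The host and path above are hypothetical; the code takes them from the configured NodeAddress.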
diff --git a/merge/src/merge_fetch_fetch_sup.erl b/merge/src/merge_fetch_fetch_sup.erl
new file mode 100644
index 0000000..fb89ab4
--- /dev/null
+++ b/merge/src/merge_fetch_fetch_sup.erl
@@ -0,0 +1,30 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(merge_fetch_fetch_sup).
+-behaviour(supervisor).
+
+-export([start_link/1, init/1]).
+
+start_link(Nodes) ->
+ {ok, Pid} =
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []),
+ Children =
+ lists:map(fun({NodeName, NodeAddress}) ->
+ lager:info("starting fetch worker: ~p", [NodeName]),
+
+ {ok, Child} = supervisor:start_child(
+ ?MODULE,
+ [{NodeName, NodeAddress}]),
+ Child
+ end, Nodes),
+    lager:debug("~p started fetch workers: ~p", [Pid, Children]),
+ {ok, Pid}.
+
+init([]) ->
+ {ok,
+ {{simple_one_for_one, 3, 10},
+ [{ignored,
+ {merge_fetch_fetch, start_link, []},
+ permanent, 10000, worker,
+ [merge_fetch_fetch]}]}}.
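
With a simple_one_for_one strategy, the argument list in the child spec ([] here) is appended with whatever supervisor:start_child/2 passes, so each worker receives exactly one {NodeName, NodeAddress} tuple. A sketch of adding one more fetch worker at runtime (the node name and address are hypothetical):

    {ok, Pid} = supervisor:start_child(merge_fetch_fetch_sup,
                                       [{"storage-1", "https://storage-1.example.net/ct/storage/"}]),
    %% Equivalent to merge_fetch_fetch:start_link({"storage-1", "https://..."}).

The 'ignored' child id is conventional for simple_one_for_one supervisors, where the id is never used.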
diff --git a/merge/src/merge_fetch_newentries.erl b/merge/src/merge_fetch_newentries.erl
index b45aaec..befb3f7 100644
--- a/merge/src/merge_fetch_newentries.erl
+++ b/merge/src/merge_fetch_newentries.erl
@@ -21,10 +21,34 @@ handle_call(stop, _From, ChildPid) ->
exit(ChildPid, stop),
{stop, normal, stopped, nil}.
+get_newentries(NodeName, NodeAddress) ->
+ DebugTag = "fetchnewentries",
+ URL = NodeAddress ++ "fetchnewentries",
+ case merge_util:request(DebugTag, URL, NodeName) of
+ {<<"ok">>, PropList} ->
+ Entries = lists:map(fun (S) -> base64:decode(S) end, proplists:get_value(<<"entries">>, PropList)),
+ {ok, Entries};
+ Err ->
+ throw({request_error, result, DebugTag, Err})
+ end.
+
loop(Name, Address, Period) ->
+ receive after Period -> ok end,
lager:info("~p:~p: asking storage node at ~p for missing entries",
[?MODULE, Name, Address]),
- receive after Period -> ok end,
+ EntriesResult = try
+ get_newentries(Name, Address)
+ catch
+ throw:{request_error,request,Tag,Error2} ->
+ {error, Tag, Error2}
+ end,
+ case EntriesResult of
+ {ok, Entries} ->
+ lager:debug("got entries: ~p", [Entries]),
+ merge_fetch_ctrl:newentries(Entries, Name);
+ {error, _Tag, Error} ->
+ lager:info("failed to get entries from ~p: ~p", [Name, Error])
+ end,
loop(Name, Address, Period).
handle_cast(_Request, State) ->
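
loop/3 now sleeps first, then polls the storage node and hands any new leaf hashes to merge_fetch_ctrl. The response decoding in get_newentries/2 can be exercised on its own; the proplist below is a hypothetical decoded body whose shape is inferred from this code, not an authoritative description of the storage API:

    PropList = [{<<"result">>, <<"ok">>},
                {<<"entries">>, [base64:encode(<<"leafhash-one">>),
                                 base64:encode(<<"leafhash-two">>)]}],
    Entries = [base64:decode(S) || S <- proplists:get_value(<<"entries">>, PropList)],
    %% Entries =:= [<<"leafhash-one">>, <<"leafhash-two">>]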
diff --git a/merge/src/merge_sup.erl b/merge/src/merge_sup.erl
index a158077..7373de4 100644
--- a/merge/src/merge_sup.erl
+++ b/merge/src/merge_sup.erl
@@ -11,6 +11,7 @@ start_link(_Args) ->
init([]) ->
{ok, LogorderPath} = application:get_env(plop, index_path),
+ {ok, StorageNodes} = plopconfig:get_env(storage_nodes),
{ok,
{{one_for_one, 3, 10},
[
@@ -21,5 +22,7 @@ init([]) ->
{merge_dist_sup, {merge_dist_sup, start_link, [[]]},
transient, infinity, supervisor, [merge_dist_sup]},
{merge_sth, {merge_sth, start_link, [[]]},
- transient, 10000, worker, [merge_sth]}
+ transient, 10000, worker, [merge_sth]},
+     {merge_fetch_sup, {merge_fetch_sup, start_link, [StorageNodes]},
+      transient, infinity, supervisor, [merge_fetch_sup]}
]}}.
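
merge_sup now reads storage_nodes at init and passes the list down to merge_fetch_sup. A hypothetical sys.config fragment consistent with how the tuples are consumed downstream (merge_fetch_fetch_sup expects {NodeName, NodeAddress} pairs); the key location assumes plopconfig:get_env/1 reads the plop application environment:

    [{plop,
      [{storage_nodes,
        [{"storage-1", "https://storage-1.example.net/ct/storage/"},
         {"storage-2", "https://storage-2.example.net/ct/storage/"}]}
      ]}].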
diff --git a/src/plop.erl b/src/plop.erl
index 7c7ded7..770c494 100644
--- a/src/plop.erl
+++ b/src/plop.erl
@@ -29,7 +29,7 @@
consistency/2, inclusion/2, inclusion_and_entry/2]).
-export([generate_timestamp/0, save_sth/1, verify_sth/4]).
-export([get_by_leaf_hash/1, entry_for_leafhash/1, spt_data_from_entry/1,
- get_spt_sig/1, add_spt_sig/2]).
+ get_spt_sig/1, add_spt_sig/2, verify_entry/1]).
%% API for tests.
-export([testing_get_pubkey/0]).
@@ -360,6 +360,12 @@ entry_for_leafhash(LeafHash) ->
unwrap_entry(Entry)
end.
+verify_entry(Entry) ->
+ UnwrappedEntry = unwrap_entry(Entry),
+ {ok, {Module, Function}} = application:get_env(plop, verify_entry),
+ Module:Function(UnwrappedEntry).
+
+
fill_in_entry({_Index, LeafHash, notfetched}) ->
get_by_leaf_hash(LeafHash).
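
verify_entry/1 unwraps the entry and dispatches to an {Module, Function} pair taken from the application environment, keeping the actual verification logic pluggable. A minimal sketch of wiring it up in a test shell; the callback module here is hypothetical:

    ok = application:set_env(plop, verify_entry, {my_verifier, verify}),
    %% my_verifier:verify/1 must return {ok, LeafHash} or {error, Reason},
    %% since those are the shapes merge_fetch_fetch:loop/2 matches on.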
diff --git a/src/plop_httputil.erl b/src/plop_httputil.erl
index 81a99b1..b6bbf8e 100644
--- a/src/plop_httputil.erl
+++ b/src/plop_httputil.erl
@@ -92,7 +92,7 @@ request(DebugTag, URL, Headers, RequestBody, Method) ->
get -> "GET";
post -> "POST"
end,
- #hackney_url{path = Path, host = Host} = ParsedURL,
+ #hackney_url{path = Path, host = Host, qs = QueryString, raw_path = RawPath} = ParsedURL,
lager:debug("~s: sending http request to ~p",
[DebugTag, URL]),
case hackney:connect(ParsedURL,
@@ -105,11 +105,20 @@ request(DebugTag, URL, Headers, RequestBody, Method) ->
lager:debug("~s: connected to ~p",
[DebugTag, URL]),
{ok, StatusCode, RespHeaders, ClientRef} =
- hackney:send_request(ConnRef,
- {Method, Path,
- add_auth(MethodString, Path, Headers,
- RequestBody),
- {fun chunk_data/1, RequestBody}}),
+ case Method of
+ post ->
+ hackney:send_request(ConnRef,
+ {Method, Path,
+ add_auth(MethodString, Path, Headers,
+ RequestBody),
+ {fun chunk_data/1, RequestBody}});
+ get ->
+ hackney:send_request(ConnRef,
+ {Method, RawPath,
+ add_auth(MethodString, Path, Headers,
+ QueryString),
+ <<>>})
+ end,
lager:debug("~s: received headers for ~p: ~p",
[DebugTag, URL, RespHeaders]),
{ok, Body} = hackney:body(ClientRef),
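
The GET branch sends raw_path (path plus query string) on the wire, while add_auth/4 receives QueryString in the slot where the POST branch passes the request body, so the signature covers the query parameters. A sketch of where those fields come from, using hackney's URL parser on a made-up URL (the record definition ships in hackney_lib.hrl):

    %% -include_lib("hackney/include/hackney_lib.hrl").
    ParsedURL = hackney_url:parse_url(<<"https://storage.example.net/ct/storage/getentry?hash=aGVq">>),
    #hackney_url{path = Path, qs = QueryString, raw_path = RawPath} = ParsedURL,
    %% Path        : <<"/ct/storage/getentry">>
    %% QueryString : <<"hash=aGVq">>
    %% RawPath     : <<"/ct/storage/getentry?hash=aGVq">>  (what the GET sends)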