summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Magnus Ahltorp <map@kth.se> 2015-10-01 12:39:28 +0200
committer: Linus Nordberg <linus@nordu.net> 2015-11-11 13:32:37 +0100
commit: 0a3e6aafee314eaf9e5343c4cad89a9e2ae1d913 (patch)
tree: 2ceb97ebf656a26ac384e0e550dc2070d1b7ec72
parent: 55820add0bda7ac926f11ee49b232dc11d6fe39c (diff)
Change index.erl to use gen_server and named databases.
Prefetch indices in frontend:fetchmissingentries/2.
-rw-r--r-- src/db.erl 17
-rw-r--r-- src/frontend.erl 31
-rw-r--r-- src/index.erl 167
-rw-r--r-- src/plop_app.erl 1
-rw-r--r-- src/plop_sup.erl 18
-rw-r--r-- src/storagedb.erl 12
6 files changed, 151 insertions, 95 deletions
diff --git a/src/db.erl b/src/db.erl
index f204f76..5ad49ec 100644
--- a/src/db.erl
+++ b/src/db.erl
@@ -52,7 +52,7 @@ sendsth_verified() ->
end.
indexsize() ->
- index:indexsize(index_path()).
+ index:indexsize(index_db).
init(_Args) ->
{ok, []}.
@@ -182,11 +182,6 @@ terminate(_Reason, _State) ->
%%%%%%%%%%%%%%%%%%%%
%% The meat.
-% Table for Index -> Leaf hash
-index_path() ->
- {ok, Value} = application:get_env(plop, index_path),
- Value.
-
% File that stores the number of verified entries
verifiedsize_path() ->
{ok, Value} = application:get_env(plop, verifiedsize_path),
@@ -209,10 +204,10 @@ index_for_leafhash(LeafHash) ->
end.
leafhash_for_index(Index) ->
- index:get(index_path(), Index).
+ index:get(index_db, Index).
leafhash_for_indices(Start, End) ->
- index:getrange(index_path(), Start, End).
+ index:getrange(index_db, Start, End).
leafhash_for_entryhash(EntryHash) ->
perm:getvalue(entryhash_db, EntryHash).
@@ -235,7 +230,7 @@ handle_call(stop, _From, State) ->
{stop, normal, stopped, State};
handle_call({add_index_nosync_noreverse, {LeafHash, Index}}, _From, State) ->
- ok = index:add_nosync(index_path(), Index, LeafHash),
+ ok = index:add_nosync(index_db, Index, LeafHash),
{reply, ok, State}.
indexforhash_nosync(LeafHash, Index) ->
@@ -248,6 +243,4 @@ indexforhash_dosync() ->
ok.
index_sync() ->
- Basepath = index_path(),
- ok = util:fsync([Basepath, filename:dirname(Basepath)]),
- ok.
+ index:sync(index_db).
diff --git a/src/frontend.erl b/src/frontend.erl
index 3b7c15b..b7fa4b1 100644
--- a/src/frontend.erl
+++ b/src/frontend.erl
@@ -302,26 +302,41 @@ check_entry_noreverse(LeafHash, Index) ->
end
end.
+prefetchindices(Index, []) ->
+ case db:leafhash_for_indices(Index, Index + 1000) of
+ noentry ->
+ case db:leafhash_for_index(Index) of
+ noentry ->
+ noentry;
+ Hash ->
+ [Hash]
+ end;
+ Hashes ->
+ Hashes
+ end;
+prefetchindices(_Index, PrefetchList) ->
+ PrefetchList.
+
-spec fetchmissingentries(non_neg_integer(), non_neg_integer()) -> [binary() | noentry].
fetchmissingentries(Index, MaxEntries) ->
- lists:reverse(fetchmissingentries(Index, [], MaxEntries)).
+ lists:reverse(fetchmissingentries(Index, [], [], MaxEntries)).
--spec fetchmissingentries(non_neg_integer(), [binary() | noentry], non_neg_integer()) ->
+-spec fetchmissingentries(non_neg_integer(), [binary() | noentry], [binary()], non_neg_integer()) ->
[binary() | noentry].
-fetchmissingentries(_Index, Acc, 0) ->
+fetchmissingentries(_Index, Acc, _PrefetchList, 0) ->
Acc;
-fetchmissingentries(Index, Acc, MaxEntries) ->
+fetchmissingentries(Index, Acc, PrefetchList, MaxEntries) ->
lager:debug("index ~p", [Index]),
- case db:leafhash_for_index(Index) of
+ case prefetchindices(Index, PrefetchList) of
noentry ->
Acc;
- Hash ->
+ [Hash|PrefetchRest] ->
case db:entry_for_leafhash(Hash) of
noentry ->
lager:debug("didn't find hash ~p", [Hash]),
- fetchmissingentries(Index + 1, [Hash | Acc], MaxEntries - 1);
+ fetchmissingentries(Index + 1, [Hash | Acc], PrefetchRest, MaxEntries - 1);
_ ->
- fetchmissingentries(Index + 1, Acc, MaxEntries)
+ fetchmissingentries(Index + 1, Acc, PrefetchRest, MaxEntries)
end
end.
diff --git a/src/index.erl b/src/index.erl
index a91c17c..fe47f59 100644
--- a/src/index.erl
+++ b/src/index.erl
@@ -12,75 +12,121 @@
%% TODO: Checksums
-module(index).
+-behaviour(gen_server).
+
+-export([start_link/2, stop/1, init_module/0]).
-export([get/2, getrange/3, add/3, add_nosync/3, addlast_nosync/2, indexsize/1, sync/1]).
--define(ENTRYSIZE, 32).
--define(ENTRYSIZEINFILE, (?ENTRYSIZE*2+1)).
+%% gen_server callbacks.
+-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
+ code_change/3]).
--spec add(string(), integer() | last, binary()) -> ok.
-add(Basepath, Index, Entry) ->
- add(Basepath, Index, Entry, sync).
+-record(state, {name, add_file}).
--spec add_nosync(string(), integer() | last, binary()) -> ok.
-add_nosync(Basepath, Index, Entry) ->
- add(Basepath, Index, Entry, nosync).
+-define(DIRECTORY_TABLE, index_directory).
+
+init_module() ->
+ case ets:info(?DIRECTORY_TABLE) of
+ undefined -> ok;
+ _ -> ets:delete(?DIRECTORY_TABLE)
+ end,
+ ets:new(?DIRECTORY_TABLE, [set, public, named_table]).
+
+start_link(Name, Filename) ->
+ gen_server:start_link({local, Name}, ?MODULE,
+ [Name, Filename], []).
-add(Basepath, Index, Entry, Syncflag) when is_binary(Entry), size(Entry) == ?ENTRYSIZE ->
- case file:open(Basepath, [read, write, binary]) of
+stop(Name) ->
+ gen_server:call(Name, stop).
+
+init([Name, Filename]) ->
+ lager:debug("registering ~p with file name ~p", [Name, Filename]),
+ true = ets:insert(?DIRECTORY_TABLE, {Name, Filename}),
+ case file:open(Filename, [read, write, binary]) of
{ok, File} ->
- {ok, Position} = file:position(File, eof),
- Mode = case Index of
- last when Position rem ?ENTRYSIZEINFILE == 0 ->
- write;
- Index when is_integer(Index),
- Index * ?ENTRYSIZEINFILE == Position ->
- write;
- Index when is_integer(Index),
- Index * ?ENTRYSIZEINFILE < Position ->
- read;
- _ ->
- util:exit_with_error(invalid, writefile,
- "Index not valid")
- end,
- EntryText = hex:bin_to_hexstr(Entry) ++ "\n",
- case Mode of
- write ->
- ok = file:write(File, EntryText);
- read ->
- {ok, _Position} =
- file:position(File, {bof, Index * ?ENTRYSIZEINFILE}),
- {ok, OldEntryText} = file:read(File, ?ENTRYSIZEINFILE),
- %% check that the written content is the same as
- %% the old content
- case binary_to_list(OldEntryText) of
- EntryText ->
- ok;
- _ ->
- util:exit_with_error(invalid, writefile,
- "Written content not the" ++
- " same as old content")
- end
- end,
- ok = file:close(File),
- case Syncflag of
- sync ->
- sync(Basepath);
- nosync ->
- ok
- end;
+ {ok, #state{name = Name, add_file = File}};
{error, Error} ->
- util:exit_with_error(Error, writefile,
- "Error opening file for writing")
+ {stop, Error}
+ end.
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, _State) ->
+ io:format("~p terminating~n", [?MODULE]),
+ ok.
+
+
+-spec add(string(), integer() | last, binary()) -> ok.
+add(Name, Index, Entry) ->
+ ok = gen_server:call(Name, {add, Index, Entry}),
+ sync(Name).
+
+-spec add_nosync(string(), integer() | last, binary()) -> ok.
+add_nosync(Name, Index, Entry) ->
+ gen_server:call(Name, {add, Index, Entry}).
+
+handle_call({add, Index, Entry}, _From, State) ->
+ Result = add_internal(State#state.add_file, Index, Entry),
+ {reply, Result, State};
+handle_call(stop, _From, State) ->
+ {stop, normal, stopped, State}.
+
+
+-define(ENTRYSIZE, 32).
+-define(ENTRYSIZEINFILE, (?ENTRYSIZE*2+1)).
+
+
+add_internal(File, Index, Entry) when is_binary(Entry), size(Entry) == ?ENTRYSIZE ->
+ {ok, Position} = file:position(File, eof),
+ Mode = case Index of
+ last when Position rem ?ENTRYSIZEINFILE == 0 ->
+ write;
+ Index when is_integer(Index),
+ Index * ?ENTRYSIZEINFILE == Position ->
+ write;
+ Index when is_integer(Index),
+ Index * ?ENTRYSIZEINFILE < Position ->
+ read;
+ _ ->
+ util:exit_with_error(invalid, writefile,
+ "Index not valid")
+ end,
+ EntryText = hex:bin_to_hexstr(Entry) ++ "\n",
+ case Mode of
+ write ->
+ ok = file:write(File, EntryText);
+ read ->
+ {ok, _Position} =
+ file:position(File, {bof, Index * ?ENTRYSIZEINFILE}),
+ {ok, OldEntryText} = file:read(File, ?ENTRYSIZEINFILE),
+ %% check that the written content is the same as
+ %% the old content
+ case binary_to_list(OldEntryText) of
+ EntryText ->
+ ok;
+ _ ->
+ util:exit_with_error(invalid, writefile,
+ "Written content not the" ++
+ " same as old content")
+ end
end.
-spec sync(string()) -> ok.
-sync(Basepath) ->
+sync(Name) ->
+ [{_, Basepath}] = ets:lookup(?DIRECTORY_TABLE, Name),
util:fsync([Basepath, filename:dirname(Basepath)]).
-spec addlast_nosync(string(), binary()) -> ok.
-addlast_nosync(Basepath, Entry) ->
- add_nosync(Basepath, last, Entry).
+addlast_nosync(Name, Entry) ->
+ add_nosync(Name, last, Entry).
decodedata(Binary) ->
lists:reverse(decodedata(Binary, [])).
@@ -94,7 +140,8 @@ decodedata(<<_:?ENTRYSIZE/binary-unit:16, _>>, _Acc) ->
"Index line not ending with linefeed").
-spec indexsize(string()) -> integer().
-indexsize(Basepath) ->
+indexsize(Name) ->
+ [{_, Basepath}] = ets:lookup(?DIRECTORY_TABLE, Name),
case file:open(Basepath, [read, binary]) of
{ok, File} ->
{ok, Filesize} = file:position(File, eof),
@@ -107,8 +154,8 @@ indexsize(Basepath) ->
end.
-spec get(string(), integer()) -> binary() | noentry.
-get(Basepath, Index) ->
- case getrange(Basepath, Index, Index) of
+get(Name, Index) ->
+ case getrange(Name, Index, Index) of
noentry ->
noentry;
[Entry] ->
@@ -116,7 +163,9 @@ get(Basepath, Index) ->
end.
-spec getrange(string(), integer(), integer()) -> [binary()] | noentry.
-getrange(Basepath, Start, End) when Start =< End ->
+getrange(Name, Start, End) when Start =< End ->
+ lager:debug("db ~p", [Name]),
+ [{_, Basepath}] = ets:lookup(?DIRECTORY_TABLE, Name),
lager:debug("path ~p start ~p end ~p", [Basepath, Start, End]),
case file:open(Basepath, [read, binary]) of
{ok, File} ->
diff --git a/src/plop_app.erl b/src/plop_app.erl
index dc896e2..611012a 100644
--- a/src/plop_app.erl
+++ b/src/plop_app.erl
@@ -10,6 +10,7 @@ start(normal, Args) ->
http_auth:init_key_table(),
plop:initsize(),
perm:init_module(),
+ index:init_module(),
plop_sup:start_link(Args).
stop(_State) ->
diff --git a/src/plop_sup.erl b/src/plop_sup.erl
index 442eabc..55d6e9a 100644
--- a/src/plop_sup.erl
+++ b/src/plop_sup.erl
@@ -26,11 +26,11 @@ permanent_worker(Name, StartFunc, Modules) ->
10000,
worker, Modules}.
-perm_database_children(DB) ->
- lists:filtermap(fun ({Name, DBName, ConfigName}) ->
+database_children(DB) ->
+ lists:filtermap(fun ({Module, Name, DBName, ConfigName}) ->
case application:get_env(plop, ConfigName) of
{ok, Path} ->
- {true, permanent_worker(Name, {perm, start_link, [DBName, Path]})};
+ {true, permanent_worker(Name, {Module, start_link, [DBName, Path]})};
undefined ->
false
end
@@ -39,11 +39,13 @@ perm_database_children(DB) ->
%% Supervisor callback
init([]) ->
Services = application:get_env(plop, services, []),
- DBChildren = perm_database_children([
- {the_entryhash_db, entryhash_db, entryhash_root_path},
- {the_indexforhash_db, indexforhash_db, indexforhash_root_path},
- {the_entry_db, entry_db, entry_root_path}
- ]),
+ DBChildren = database_children([
+ {perm, the_entryhash_db, entryhash_db, entryhash_root_path},
+ {perm, the_indexforhash_db, indexforhash_db, indexforhash_root_path},
+ {perm, the_entry_db, entry_db, entry_root_path},
+ {index, the_index_db, index_db, index_path},
+ {index, the_newentries_db, newentries_db, newentries_path}
+ ]),
Children = [permanent_worker(the_db, {db, start_link, []}, [db]),
permanent_worker(the_storagedb, {storagedb, start_link, []}),
permanent_worker(fsync, {fsyncport, start_link, []})],
diff --git a/src/storagedb.erl b/src/storagedb.erl
index d781033..9f7da37 100644
--- a/src/storagedb.erl
+++ b/src/storagedb.erl
@@ -31,11 +31,11 @@ lastverifiednewentry_path() ->
%% Public API.
fetchnewhashes(Index) ->
- case index:indexsize(newentries_path()) of
+ case index:indexsize(newentries_db) of
0 ->
[];
Size ->
- index:getrange(newentries_path(), Index, Size - 1)
+ index:getrange(newentries_db, Index, Size - 1)
end.
lastverifiednewentry() ->
@@ -52,7 +52,7 @@ lastverifiednewentry() ->
-spec add(binary()) -> ok.
add(LeafHash) ->
ok = call(?MODULE, {add_nosync, LeafHash}),
- ok = index:sync(newentries_path()),
+ ok = index:sync(newentries_db),
ok.
%%%%%%%%%%%%%%%%%%%%
@@ -73,13 +73,9 @@ terminate(_Reason, _State) ->
%%%%%%%%%%%%%%%%%%%%
-newentries_path() ->
- {ok, Value} = application:get_env(plop, newentries_path),
- Value.
-
handle_call(stop, _From, State) ->
{stop, normal, stopped, State};
handle_call({add_nosync, LeafHash}, _From, State) ->
- ok = index:addlast_nosync(newentries_path(), LeafHash),
+ ok = index:addlast_nosync(newentries_db, LeafHash),
{reply, ok, State}.