%%% Copyright (c) 2014-2015, NORDUnet A/S.
%%% See LICENSE for licensing information.

%% Implements an interface to a file pair (basename and
%% basename.chksum) that stores an ordered list of fixed-size entries.
%% Entries can be added at the end and are retrieved by index. Entries
%% can also be added at already existing indices, but then the
%% contents must be the same.
%%
%% Writes (add, addlast) need to be serialized.

%% TODO: Checksums

-module(index).
-behaviour(gen_server).

-export([start_link/2, stop/1, init_module/0]).
-export([get/2, getrange/3, add/3, add_nosync/3,
         addlast_nosync/2, indexsize/1, sync/1]).
%% gen_server callbacks.
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
         code_change/3]).

%% Server state: the registered name of the index and the file handle
%% used for adding (and verifying) entries.
-record(state, {name, add_file}).

%% Named ETS table mapping a registered index name to its file name.
-define(DIRECTORY_TABLE, index_directory).

%% (Re)creates the directory table. An existing table is deleted first,
%% so every previously registered name -> file name mapping is dropped.
init_module() ->
    case ets:info(?DIRECTORY_TABLE) of
        undefined ->
            ok;
        _ ->
            ets:delete(?DIRECTORY_TABLE)
    end,
    ets:new(?DIRECTORY_TABLE, [set, public, named_table]).

%% Starts an index server registered locally as Name, backed by Filename.
start_link(Name, Filename) ->
    gen_server:start_link({local, Name}, ?MODULE, [Name, Filename], []).

%% Asks the server registered as Name to stop.
stop(Name) ->
    gen_server:call(Name, stop).

%% Records Name -> Filename in the directory table and opens the file
%% for reading and writing. The directory table must already exist
%% (see init_module/0); ets:insert/2 fails otherwise.
init([Name, Filename]) ->
    lager:debug("registering ~p with file name ~p", [Name, Filename]),
    true = ets:insert(?DIRECTORY_TABLE, {Name, Filename}),
    case file:open(Filename, [read, write, binary]) of
        {ok, File} ->
            {ok, #state{name = Name, add_file = File}};
        {error, Error} ->
            {stop, Error}
    end.

%% Casts are not part of the protocol; they are ignored.
handle_cast(_Request, State) ->
    {noreply, State}.

%% Unexpected messages are ignored.
handle_info(_Info, State) ->
    {noreply, State}.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Closes the file opened in init/1 explicitly instead of relying on
%% the owner process exiting.
terminate(_Reason, #state{add_file = File}) ->
    io:format("~p terminating~n", [?MODULE]),
    _ = file:close(File),
    ok.

%% Adds Entry at Index (or at the end when Index is `last') through the
%% server registered as Name, then calls sync/1 for that name.
%% Name is the locally registered server name and therefore an atom.
-spec add(atom(), integer() | last, binary()) -> ok.
add(Name, Index, Entry) ->
    ok = gen_server:call(Name, {add, Index, Entry}),
    sync(Name).

%% Same as add/3 but without the call to sync/1.
-spec add_nosync(atom(), integer() | last, binary()) -> ok.
add_nosync(Name, Index, Entry) ->
    gen_server:call(Name, {add, Index, Entry}).
%% gen_server call handler.
%%   {add, Index, Entry} -> replies with the result of add_internal/3.
%%   stop                -> replies `stopped' and stops the server normally.
handle_call({add, Index, Entry}, _From, State) ->
    Result = add_internal(State#state.add_file, Index, Entry),
    {reply, Result, State};
handle_call(stop, _From, State) ->
    {stop, normal, stopped, State}.

%% Size in bytes of one entry as passed to add/3.
-define(ENTRYSIZE, 32).
%% Size in bytes of one entry in the file: the hex text of the entry
%% (two characters per byte) followed by a linefeed.
-define(ENTRYSIZEINFILE, (?ENTRYSIZE*2+1)).

%% Writes Entry at Index, or verifies it against what is already stored.
%%   Index =:= last, or Index pointing exactly at end of file:
%%       the entry is appended.
%%   Index pointing at an existing entry:
%%       the stored text is read back and must equal the new entry.
%%   Anything else (a gap past the end of file, a negative index, a file
%%   whose size is not a multiple of the entry size for `last'):
%%       util:exit_with_error/3 is called.
%% Returns ok on success. Only ?ENTRYSIZE-byte binaries are accepted.
add_internal(File, Index, Entry)
  when is_binary(Entry), byte_size(Entry) =:= ?ENTRYSIZE ->
    %% Always start from the end of file; a previous verifying read may
    %% have left the file position in the middle of the file.
    {ok, Position} = file:position(File, eof),
    Mode = case Index of
               last when Position rem ?ENTRYSIZEINFILE =:= 0 ->
                   write;
               Index when is_integer(Index),
                          Index * ?ENTRYSIZEINFILE =:= Position ->
                   write;
               %% Index >= 0: a negative index is not an existing entry
               %% and would otherwise fail later in file:position/2.
               Index when is_integer(Index), Index >= 0,
                          Index * ?ENTRYSIZEINFILE < Position ->
                   read;
               _ ->
                   util:exit_with_error(invalid, writefile,
                                        "Index not valid")
           end,
    EntryText = hex:bin_to_hexstr(Entry) ++ "\n",
    case Mode of
        write ->
            ok = file:write(File, EntryText);
        read ->
            {ok, _Position} =
                file:position(File, {bof, Index * ?ENTRYSIZEINFILE}),
            {ok, OldEntryText} = file:read(File, ?ENTRYSIZEINFILE),
            %% check that the written content is the same as
            %% the old content
            case binary_to_list(OldEntryText) of
                EntryText ->
                    ok;
                _ ->
                    util:exit_with_error(invalid, writefile,
                                         "Written content not the" ++
                                             " same as old content")
            end
    end.

%% Passes the index file registered under Name, and the directory
%% containing it, to util:fsync/1.
-spec sync(atom()) -> ok.
sync(Name) ->
    [{_, Basepath}] = ets:lookup(?DIRECTORY_TABLE, Name),
    util:fsync([Basepath, filename:dirname(Basepath)]).

%% Appends Entry at the end of the index without calling sync/1.
-spec addlast_nosync(atom(), binary()) -> ok.
addlast_nosync(Name, Entry) ->
    add_nosync(Name, last, Entry).

%% Decodes the raw text of zero or more consecutive entries, as read
%% from the index file, into a list of entries in file order.
decodedata(Binary) ->
    lists:reverse(decodedata(Binary, [])).

%% Accumulates decoded entries in reverse order.
decodedata(<<>>, Acc) ->
    Acc;
%% One entry is ?ENTRYSIZE*2 bytes of hex text followed by a linefeed.
%% NOTE(review): this clause head was damaged in the reviewed source and
%% has been reconstructed from the layout written by add_internal/3 and
%% from the error clause below -- confirm against version control.
decodedata(<<Entry:?ENTRYSIZE/binary-unit:16, "\n", Rest/binary>>, Acc) ->
    decodedata(Rest, [mochihex:to_bin(binary_to_list(Entry)) | Acc]);
%% The byte after the hex text is not a linefeed. Trailing data is
%% accepted here so that a malformed entry is reported wherever it
%% occurs in the buffer, not only when it is the last entry.
decodedata(<<_:?ENTRYSIZE/binary-unit:16, _, _/binary>>, _Acc) ->
    util:exit_with_error(badformat, readindex,
                         "Index line not ending with linefeed").

-spec indexsize(atom()) -> integer().
%% Returns the number of complete entries in the index file registered
%% under Name, computed from the file size. Calls util:exit_with_error/3
%% if the file cannot be opened.
indexsize(Name) ->
    [{_, Basepath}] = ets:lookup(?DIRECTORY_TABLE, Name),
    case file:open(Basepath, [read, binary]) of
        {ok, File} ->
            %% Close the file even if determining the size fails.
            Filesize = try
                           {ok, Size} = file:position(File, eof),
                           Size
                       after
                           file:close(File)
                       end,
            lager:debug("file ~p size ~p", [Basepath, Filesize]),
            Filesize div ?ENTRYSIZEINFILE;
        {error, Error} ->
            util:exit_with_error(Error, readfile,
                                 "Error opening file for reading")
    end.

%% Returns the entry stored at Index, or `noentry' if the index file
%% does not contain that many entries.
-spec get(atom(), integer()) -> binary() | noentry.
get(Name, Index) ->
    case getrange(Name, Index, Index) of
        noentry ->
            noentry;
        [Entry] ->
            Entry
    end.

%% Returns the entries Start..End (inclusive) in file order, or
%% `noentry' if the file does not contain entry End. Requires
%% Start =< End. The file is opened separately for every call rather
%% than going through the server process. Calls util:exit_with_error/3
%% if the file cannot be opened.
-spec getrange(atom(), integer(), integer()) -> [binary()] | noentry.
getrange(Name, Start, End) when Start =< End ->
    lager:debug("db ~p", [Name]),
    [{_, Basepath}] = ets:lookup(?DIRECTORY_TABLE, Name),
    lager:debug("path ~p start ~p end ~p", [Basepath, Start, End]),
    case file:open(Basepath, [read, binary]) of
        {ok, File} ->
            %% Close the file on every path, including when a read or
            %% the decoding of the data fails.
            try
                getrange_from_file(File, Start, End)
            after
                file:close(File)
            end;
        {error, Error} ->
            util:exit_with_error(Error, readfile,
                                 "Error opening file for reading")
    end.

%% Reads and decodes entries Start..End from the open File; returns
%% `noentry' when entry End lies beyond the end of the file.
getrange_from_file(File, Start, End) ->
    {ok, Filesize} = file:position(File, eof),
    if
        End * ?ENTRYSIZEINFILE + ?ENTRYSIZEINFILE =< Filesize ->
            {ok, _Position} =
                file:position(File, Start * ?ENTRYSIZEINFILE),
            {ok, EntryText} =
                file:read(File, ?ENTRYSIZEINFILE * (End - Start + 1)),
            Entries = decodedata(EntryText),
            lager:debug("entries ~p", [length(Entries)]),
            Entries;
        true ->
            noentry
    end.