%%% Copyright (c) 2015, NORDUnet A/S.
%%% See LICENSE for licensing information.

-module(permdb).

-behaviour(gen_server).

-export([start_link/2, stop/1, init_module/0]).
-export([getvalue/2, addvalue/3, commit/1, commit/2]).

%% gen_server callbacks.
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
         code_change/3]).

%% Size of a key in bytes.
-define(KEYSIZE, 32).
%% Number of key bits consumed per index node when walking the tree.
-define(BITSPERLEVEL, 2).
%% Number of entries in one index node.
-define(ENTRIESPERNODE, 4).
%% Size in bytes of one index entry (also the size of a record length field).
-define(BYTESPERENTRY, 8).
%% Payload bits of an entry; the remaining leading bit is a flag.
-define(BITSPERENTRY, (?BYTESPERENTRY*8-1)).
%% Size in bytes of the magic prefix of nodes and data records.
-define(MAGICSIZE, 2).
-define(NODEMAGIC, <<16#8a, 16#44>>).
-define(DATAMAGIC, <<16#cb, 16#0e>>).
%% Size in bytes of a complete index node including its magic.
-define(NODESIZE, (?MAGICSIZE+(?ENTRIESPERNODE*?BYTESPERENTRY))).

%% Server state:
%%   cachename - name of the ETS table caching index nodes
%%   port      - port connected to the external permdb program
%%   datafile  - handle for direct data file reads (or 'none')
%%   indexfile - handle for direct index file reads (or 'none')
%%   requests  - queue of {From, Action} awaiting a port response
-record(state, {cachename, port, datafile, indexfile, requests}).

%% Open Filename for raw binary read/write access and return the handle.
%% Crashes with badmatch if the file cannot be opened.
openfile(Filename) ->
    {ok, File} = file:open(Filename, [read, write, binary, raw]),
    File.

%% Send a "get value" command (opcode 0) for Key to the port.
getvalue_port_command(Port, Key) ->
    Port ! {self(), {command, <<0:8, Key/binary>>}}.

%% Send an "add value" command (opcode 1) to the port. Key must be exactly
%% ?KEYSIZE bytes, otherwise building the command fails with badarg.
addvalue_port_command(Port, Key, Value) ->
    Port ! {self(), {command, <<1:8, Key:?KEYSIZE/binary, Value/binary>>}}.

%% Send a "commit" command (opcode 2) to the port.
commit_port_command(Port) ->
    Port ! {self(), {command, <<2:8>>}}.

%% Read the header of the data record at Offset in the data file and
%% return {Key, Length}. Crashes with badmatch if the read fails or the
%% header does not start with ?DATAMAGIC.
%%
%% NOTE(review): the binary pattern below was missing from the source and
%% has been reconstructed from the number of bytes read (magic + key +
%% length field) and the variables used afterwards - confirm against the
%% on-disk format.
getdatakey(State, Offset) ->
    HeaderSize = ?MAGICSIZE + ?KEYSIZE + ?BYTESPERENTRY,
    {ok, DataBinary} = file:pread(State#state.datafile, Offset, HeaderSize),
    %% Datamagic is bound first so the pattern below asserts the magic.
    Datamagic = ?DATAMAGIC,
    <<Datamagic:?MAGICSIZE/binary,
      Key:?KEYSIZE/binary,
      Length:?BYTESPERENTRY/integer-unit:8>> = DataBinary,
    {Key, Length}.

%% Read Length bytes of payload from the data record at Offset, skipping
%% the record header (magic, key and length field).
getdata(State, Offset, Length) ->
    PayloadOffset = Offset + ?MAGICSIZE + ?KEYSIZE + ?BYTESPERENTRY,
    {ok, DataBinary} = file:pread(State#state.datafile, PayloadOffset, Length),
    DataBinary.

%% Return the index node stored at Offset, without its magic prefix.
%% Nodes are cached in the ETS table keyed by their offset; on a cache
%% miss the node is read from the index file, its magic is asserted and
%% the result is inserted into the cache.
%%
%% NOTE(review): the binary pattern below was missing from the source and
%% has been reconstructed from the read size and from how Node is indexed
%% by getendentry/3 - confirm against the on-disk format.
getnode(State, Offset) ->
    case ets:lookup(State#state.cachename, Offset) of
        [] ->
            {ok, NodeBinary} =
                file:pread(State#state.indexfile, Offset, ?NODESIZE),
            %% Nodemagic is bound first so the pattern asserts the magic.
            Nodemagic = ?NODEMAGIC,
            <<Nodemagic:?MAGICSIZE/binary, Node/binary>> = NodeBinary,
            ets:insert(State#state.cachename, {Offset, Node}),
            Node;
        [{_, Node}] ->
            Node
    end.
%% Return the root index node, without its magic prefix. The root is the
%% last ?NODESIZE bytes of the index file; once read it is cached in the
%% ETS table under the key 'root'.
%%
%% NOTE(review): the binary pattern below was missing from the source and
%% has been reconstructed from the read size and from how Node is indexed
%% by getendentry/3 - confirm against the on-disk format.
getroot(State) ->
    case ets:lookup(State#state.cachename, root) of
        [] ->
            {ok, _Position} =
                file:position(State#state.indexfile, {eof, -?NODESIZE}),
            {ok, RootNodeBinary} =
                file:read(State#state.indexfile, ?NODESIZE),
            %% Nodemagic is bound first so the pattern asserts the magic.
            Nodemagic = ?NODEMAGIC,
            <<Nodemagic:?MAGICSIZE/binary, Node/binary>> = RootNodeBinary,
            ets:insert(State#state.cachename, {root, Node}),
            Node;
        [{root, Node}] ->
            Node
    end.

%% Walk the index from the root following the bits of Key. Returns 'none'
%% if an empty entry is reached, otherwise the value of the first entry
%% whose flag bit is set.
getendentry(State, Key) ->
    Root = getroot(State),
    getendentry(State, Key, Root).

%% Consume ?BITSPERLEVEL bits of the key to select one entry of Node:
%%   all zero bits  -> 'none'
%%   flag bit clear -> the entry is the offset of a child node; descend
%%   flag bit set   -> the entry is returned to the caller
%%
%% NOTE(review): the key pattern in the function head was missing from the
%% source and has been reconstructed from ?BITSPERLEVEL and the way
%% KeyHead selects one of ?ENTRIESPERNODE entries - confirm.
getendentry(State, <<KeyHead:?BITSPERLEVEL, KeyRest/bitstring>>, Node) ->
    case binary_part(Node, KeyHead*?BYTESPERENTRY, ?BYTESPERENTRY) of
        <<0:?BYTESPERENTRY/integer-unit:8>> ->
            none;
        <<0:1, Entry:?BITSPERENTRY>> ->
            NewNode = getnode(State, Entry),
            getendentry(State, KeyRest, NewNode);
        <<1:1, Entry:?BITSPERENTRY>> ->
            Entry
    end.

%% Look Key up directly in the index and data files. Returns the stored
%% value, or 'none' when the index has no entry for the key or the data
%% record found belongs to a different key.
getvalue_file(State, Key) ->
    case getendentry(State, Key) of
        none ->
            none;
        Entry ->
            case getdatakey(State, Entry) of
                {Key, Length} ->
                    getdata(State, Entry, Length);
                _ ->
                    none
            end
    end.

%% Fetch the value stored under Key from the server registered as Name.
%% The reply is 'noentry' when the port answers with an empty binary.
getvalue(Name, Key) ->
    gen_server:call(Name, {getvalue, Key}).

%% Store Value under Key via the server registered as Name.
addvalue(Name, Key, Value) ->
    gen_server:call(Name, {addvalue, Key, Value}).

%% Ask the server registered as Name to commit; the reply is the raw
%% response from the port.
commit(Name) ->
    gen_server:call(Name, {commit}).

%% Same as commit/1 but with an explicit gen_server call timeout.
commit(Name, Timeout) ->
    gen_server:call(Name, {commit}, Timeout).

%% gen_server init callback. Creates the public named ETS cache table
%% "<Name>_cache", enables exit trapping and starts the external
%% "permdbport" program from the priv directory of the plop application,
%% passing Filename as its only argument.
init([Name, Filename]) ->
    Cachename = list_to_atom(atom_to_list(Name) ++ "_cache"),
    ets:new(Cachename, [set, public, named_table]),
    process_flag(trap_exit, true),
    PortProgram = filename:join(code:priv_dir(plop), "permdbport"),
    Port = open_port({spawn_executable, PortProgram},
                     [{packet, 4}, {args, [Filename]}, binary]),
    %% Direct file access is disabled: the data and index files are not
    %% opened here, so both handles are 'none'.
    DataFile = none,
    IndexFile = none,
    {ok, #state{cachename = Cachename,
                port = Port,
                datafile = DataFile,
                indexfile = IndexFile,
                requests = queue:new()}}.

%% No module-level initialisation is needed; always returns ok.
init_module() ->
    ok.

%% Start the server, locally registered as Name, for the database Filename.
start_link(Name, Filename) ->
    gen_server:start_link({local, Name}, ?MODULE, [Name, Filename], []).

%% Stop the server registered as Name.
stop(Name) ->
    gen_server:call(Name, stop).

%% gen_server cast callback. All casts are ignored.
handle_cast(_Request, State) ->
    {noreply, State}.
%% gen_server info callback.
%%
%% A data message from a port is the response to the oldest queued
%% request: that request is dequeued and its caller is answered with the
%% translated response (see port_reply/2).
%%
%% An exit signal from the server's own port (delivered as a message
%% because init/1 enables trap_exit) stops the server. Without this the
%% server would stay alive with a dead port and every queued caller would
%% wait until its call timed out.
%%
%% Any other message is ignored.
handle_info({Port, {data, Data}}, State) when is_port(Port) ->
    lager:debug("response: ~p", [Data]),
    {{value, {From, Action}}, Requests} = queue:out(State#state.requests),
    lager:debug("response: ~p", [Action]),
    %% The reply is computed before gen_server:reply/2 is called, so a
    %% failed addvalue exits without answering the caller.
    Reply = port_reply(Action, Data),
    gen_server:reply(From, Reply),
    {noreply, State#state{requests = Requests}};
handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
    lager:error("permdb port ~p exited: ~p", [Port, Reason]),
    {stop, {port_exit, Reason}, State};
handle_info(_Info, State) ->
    {noreply, State}.

%% Translate the raw port response Data for a request of type Action:
%%   getvalue - empty response means 'noentry', otherwise the data itself
%%   addvalue - empty response is an error (util:exit_with_error/2),
%%              otherwise 'ok'
%%   commit   - the response is passed through unchanged
port_reply(getvalue, <<>>) ->
    noentry;
port_reply(getvalue, Data) ->
    Data;
port_reply(addvalue, <<>>) ->
    util:exit_with_error(putvalue, "Error in putvalue");
port_reply(addvalue, _Data) ->
    ok;
port_reply(commit, Data) ->
    Data.

%% gen_server code change callback. The state is kept as is.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% gen_server terminate callback. Sends an empty command to the port
%% before the server goes away.
%% NOTE(review): presumably the empty command tells the port program to
%% shut down - confirm against the permdbport implementation.
terminate(_Reason, State) ->
    io:format("~p terminating~n", [?MODULE]),
    State#state.port ! {self(), {command, <<>>}},
    ok.

%% Append {From, Action} to the queue of requests awaiting a port
%% response and return the updated state.
add_request(State, From, Action) ->
    Pending = queue:in({From, Action}, State#state.requests),
    State#state{requests = Pending}.

%% gen_server call callback.
%%
%% 'stop' stops the server with the reply 'stopped'. The other requests
%% are forwarded to the port and answered later from handle_info/2, once
%% the port has responded; until then the caller is kept in the request
%% queue.
handle_call(stop, _From, State) ->
    {stop, normal, stopped, State};
handle_call({getvalue, Key}, From, State) ->
    lager:debug("getvalue: ~p", [Key]),
    %% The lookup backend is hard-wired to the port; the 'file' branch is
    %% kept for the direct file reader but is not selected.
    Method = port,
    case Method of
        port ->
            getvalue_port_command(State#state.port, Key),
            {noreply, add_request(State, From, getvalue)};
        file ->
            Value = getvalue_file(State, Key),
            {reply, Value, State}
    end;
handle_call({addvalue, Key, Value}, From, State) ->
    lager:debug("addvalue: ~p ~p", [Key, Value]),
    addvalue_port_command(State#state.port, Key, Value),
    {noreply, add_request(State, From, addvalue)};
handle_call({commit}, From, State) ->
    lager:debug("commit", []),
    commit_port_command(State#state.port),
    {noreply, add_request(State, From, commit)}.