diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/leveldb.erl | 122 |
1 files changed, 122 insertions, 0 deletions
diff --git a/src/leveldb.erl b/src/leveldb.erl new file mode 100644 index 0000000..b1dc02c --- /dev/null +++ b/src/leveldb.erl @@ -0,0 +1,122 @@ +%%% Copyright (c) 2016, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(leveldb). + +-behaviour(gen_server). + +-export([start_link/2, stop/1, init_module/0]). +-export([getvalue/2, addvalue/3, commit/1, commit/2]). + +%% gen_server callbacks. +-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, + code_change/3]). + +-record(state, + {cachename, name, port, requests, requestcounter}). + +getvalue(Name, Key) -> + gen_server:call(Name, {getvalue, Key}). + +addvalue(Name, Key, Value) -> + gen_server:call(Name, {addvalue, Key, Value}). + +commit(Name) -> + gen_server:call(Name, {commit}). +commit(Name, Timeout) -> + gen_server:call(Name, {commit}, Timeout). + +init([Name, Filename]) -> + lager:info("starting leveldb server '~p'", [Name]), + process_flag(trap_exit, true), + Port = open_port({spawn_executable, code:priv_dir(plop) ++ "/leveldbport"}, + [{packet, 4}, {args, [Filename]}, binary]), + {ok, #state{name = Name, + port = Port, + requestcounter = 0, + requests = queue:new()}}. + +init_module() -> + ok. + +start_link(Name, Filename) -> + gen_server:start_link({local, Name}, ?MODULE, [Name, Filename], []). + +stop(Name) -> + gen_server:call(Name, stop). + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info({Port, {data, Data}}, State) when is_port(Port) -> + lager:debug("response: ~p", [Data]), + {{value, {From, Action}}, Requests} = queue:out(State#state.requests), + lager:debug("response ~p ~p: ~p", + [State#state.name, + State#state.requestcounter - queue:len(State#state.requests), + Action]), + gen_server:reply(From, case Action of + getvalue -> + case Data of + <<>> -> + noentry; + _ -> + Data + end; + addvalue -> + case Data of + <<>> -> + util:exit_with_error( + putvalue, "Error in putvalue"); + _ -> + ok + end; + commit -> + Data + end), + {noreply, State#state{requests = Requests}}; +handle_info(_Info, State) -> + {noreply, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + io:format("~p terminating~n", [?MODULE]), + State#state.port ! {self(), {command, <<>>}}, + ok. + +add_request(State, From, Action) -> + State#state{ + requests = queue:in({From, Action}, State#state.requests), + requestcounter = State#state.requestcounter + 1 + }. + +handle_call(stop, _From, State) -> + {stop, normal, stopped, State}; + +handle_call({getvalue, Key}, From, State) -> + lager:debug("getvalue ~p ~p: ~p", [State#state.name, + State#state.requestcounter, Key]), + getvalue_port_command(State#state.port, Key), + {noreply, add_request(State, From, getvalue)}; + +handle_call({addvalue, Key, Value}, From, State) -> + lager:debug("addvalue ~p ~p: ~p ~p", [State#state.name, + State#state.requestcounter, Key, Value]), + addvalue_port_command(State#state.port, Key, Value), + {noreply, add_request(State, From, addvalue)}; + +handle_call({commit}, From, State) -> + lager:debug("commit ~p ~p", [State#state.name, State#state.requestcounter]), + commit_port_command(State#state.port), + {noreply, add_request(State, From, commit)}. + +getvalue_port_command(Port, Key) -> + Port ! {self(), {command, <<0:8, Key/binary>>}}. + +addvalue_port_command(Port, Key, Value) -> + Port ! {self(), {command, <<1:8, Key:32/binary, Value/binary>>}}. + +commit_port_command(Port) -> + Port ! {self(), {command, <<2:8>>}}. |