summaryrefslogtreecommitdiff
path: root/src/permdb.erl
blob: faca986111d73a3a5f098b80fe6fee9f0923d50a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
%%% Copyright (c) 2015-2017, NORDUnet A/S.
%%% See LICENSE for licensing information.

-module(permdb).

-behaviour(gen_server).

-export([start_link/3, stop/1, init_module/0]).
-export([getvalue/2, addvalue/3, commit/1, commit/2, keyexists/2]).

%% gen_server callbacks.
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
         code_change/3]).

%% Server state.
%%   cachename      - name of the ETS table created in init/1
%%   name           - name given to start_link/3 (also the registered name)
%%   port           - port connected to the external permdbport program
%%   requests       - queue of {From, Action} for commands that have been
%%                    sent to the port and not yet answered
%%   requestcounter - number of requests enqueued so far
-record(state, {cachename, name, port, requests, requestcounter}).

%% @doc Send a "get value" command to Port: opcode byte 0 followed by Key.
%% Returns the message that was sent.
getvalue_port_command(Port, Key) ->
    Payload = <<0:8, Key/binary>>,
    Command = {self(), {command, Payload}},
    Port ! Command.

%% @doc Send an "add value" command to Port: opcode byte 1, then Key encoded
%% as a 32-byte binary segment, then Value.
%% NOTE(review): the fixed 32-byte key field is presumably what lets the port
%% program find where Value starts -- confirm against the port protocol.
%% Returns the message that was sent.
addvalue_port_command(Port, Key, Value) ->
    Payload = <<1:8, Key:32/binary, Value/binary>>,
    Command = {self(), {command, Payload}},
    Port ! Command.

%% @doc Send a "commit" command to Port: the single opcode byte 2.
%% Returns the message that was sent.
commit_port_command(Port) ->
    Command = {self(), {command, <<2:8>>}},
    Port ! Command.

%% @doc Send a "key exists" command to Port: opcode byte 3 followed by Key.
%% Returns the message that was sent.
keyexists_port_command(Port, Key) ->
    Payload = <<3:8, Key/binary>>,
    Command = {self(), {command, Payload}},
    Port ! Command.

%% @doc Look up Key in the permdb server Name.
%% Returns the atom noentry when the port answers with an empty binary,
%% otherwise the binary the port returned. Waits up to 600000 ms for the
%% reply.
getvalue(Name, Key) ->
    Request = {getvalue, Key},
    gen_server:call(Name, Request, 600000).

%% @doc Ask the permdb server Name whether Key is present.
%% Returns true or false. Waits up to 600000 ms for the reply.
keyexists(Name, Key) ->
    Request = {keyexists, Key},
    gen_server:call(Name, Request, 600000).

%% @doc Store Value under Key in the permdb server Name.
%% Returns ok when the port answers with a non-empty binary. Uses the
%% default gen_server:call/2 timeout.
addvalue(Name, Key, Value) ->
    Request = {addvalue, Key, Value},
    gen_server:call(Name, Request).

%% @doc Ask the permdb server Name to commit, using the default
%% gen_server:call/2 timeout. Returns the binary the port answered with.
commit(Name) ->
    Request = {commit},
    gen_server:call(Name, Request).
%% @doc Ask the permdb server Name to commit, waiting at most Timeout for
%% the reply. Returns the binary the port answered with.
commit(Name, Timeout) ->
    Request = {commit},
    gen_server:call(Name, Request, Timeout).

%% @doc gen_server init callback.
%%
%% Creates a public named ETS table called "<Name>_cache", turns on exit
%% trapping, and starts the permdbport executable from the plop priv
%% directory as a port using 4-byte length-prefixed binary packets.
%% Filename is passed as the first program argument; "nolock" is appended
%% unless WriteFlag is the atom write.
init([Name, Filename, WriteFlag]) ->
    Cachename = list_to_atom(atom_to_list(Name) ++ "_cache"),
    ets:new(Cachename, [set, public, named_table]),
    process_flag(trap_exit, true),
    Executable = code:priv_dir(plop) ++ "/permdbport",
    ProgramArgs = [Filename | write_flag_args(WriteFlag)],
    PortOptions = [{packet, 4}, {args, ProgramArgs}, binary],
    Port = open_port({spawn_executable, Executable}, PortOptions),
    InitialState = #state{cachename = Cachename,
                          name = Name,
                          port = Port,
                          requestcounter = 0,
                          requests = queue:new()},
    {ok, InitialState}.

%% Extra program arguments implied by the write flag.
write_flag_args(write) ->
    [];
write_flag_args(_Other) ->
    ["nolock"].

%% No-op that always returns ok.
%% NOTE(review): presumably kept so callers can initialise this module the
%% same way as modules that need real setup -- confirm against callers.
init_module() ->
    ok.

%% @doc Start a permdb server linked to the caller and registered locally
%% as Name, backed by Filename.
%% Options is a property list; write_flag (default write) is forwarded to
%% init/1.
start_link(Name, Filename, Options) ->
    WriteFlag = proplists:get_value(write_flag, Options, write),
    InitArgs = [Name, Filename, WriteFlag],
    ServerName = {local, Name},
    gen_server:start_link(ServerName, ?MODULE, InitArgs, []).

%% Synchronously stop the permdb server Name.
%% The server answers the stop request with the atom stopped and then
%% terminates with reason normal.
stop(Name) ->
    gen_server:call(Name, stop).

%% gen_server handle_cast callback. No casts are defined for this server,
%% so every cast is ignored and the state is returned unchanged.
handle_cast(_Request, State) ->
    {noreply, State}.

%% @doc gen_server handle_info callback.
%%
%% {Port, {data, Data}}: a response from a port. Responses are paired with
%% callers in FIFO order: the oldest queued {From, Action} is removed and
%% From receives the reply computed by port_reply/2.
%%
%% {'EXIT', Port, Reason} for this server's own port: the port has gone
%% away. Exits are trapped by this process, so without this clause the
%% message would be dropped and every pending or future caller would block
%% until its gen_server:call timeout. The server stops with reason
%% {port_exit, Reason} instead, so callers fail promptly.
%%
%% Any other message is ignored.
handle_info({Port, {data, Data}}, State) when is_port(Port) ->
    lager:debug("response: ~p", [Data]),
    {{value, {From, Action}}, Requests} = queue:out(State#state.requests),
    lager:debug("response ~p ~p: ~p",  [State#state.name, State#state.requestcounter - queue:len(State#state.requests), Action]),
    gen_server:reply(From, port_reply(Action, Data)),
    {noreply, State#state{requests = Requests}};
handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
    lager:error("~p: port exited: ~p", [State#state.name, Reason]),
    {stop, {port_exit, Reason}, State};
handle_info(_Info, State) ->
    {noreply, State}.

%% Translate the raw port response Data for the given Action into the value
%% returned to the caller.
port_reply(getvalue, <<>>) ->
    noentry;
port_reply(getvalue, Data) ->
    Data;
port_reply(addvalue, <<>>) ->
    util:exit_with_error(addvalue, unknown, "Error in addvalue");
port_reply(addvalue, _Data) ->
    ok;
port_reply(commit, Data) ->
    Data;
port_reply(keyexists, <<0>>) ->
    false;
port_reply(keyexists, <<1>>) ->
    true.

%% gen_server code_change callback. The state needs no conversion between
%% versions, so it is returned as is.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% @doc gen_server terminate callback.
%% Prints a termination notice and sends an empty command to the port.
%% NOTE(review): the empty command is presumably the port program's
%% shutdown request -- confirm against the port protocol.
terminate(_Reason, State) ->
    io:format("~p terminating~n", [?MODULE]),
    Port = State#state.port,
    EmptyCommand = {self(), {command, <<>>}},
    Port ! EmptyCommand,
    ok.

%% @doc Record that a command has been sent to the port on behalf of From.
%% Appends {From, Action} to the pending-request queue and increments the
%% request counter; returns the updated state.
add_request(State, From, Action) ->
    #state{requests = Pending, requestcounter = Count} = State,
    Entry = {From, Action},
    State#state{requests = queue:in(Entry, Pending),
                requestcounter = Count + 1}.

%% @doc gen_server handle_call callback.
%%
%% stop replies stopped and terminates the server with reason normal.
%% Every other request is logged at debug level, written to the port as the
%% corresponding command, and queued via add_request/3 without replying;
%% the reply is sent later from handle_info/2 when the port answers.
handle_call(stop, _From, State) ->
    {stop, normal, stopped, State};

handle_call({getvalue, Key}, From,
            #state{name = Name, port = Port, requestcounter = Seq} = State) ->
    lager:debug("getvalue ~p ~p: ~p", [Name, Seq, Key]),
    getvalue_port_command(Port, Key),
    Queued = add_request(State, From, getvalue),
    {noreply, Queued};

handle_call({addvalue, Key, Value}, From,
            #state{name = Name, port = Port, requestcounter = Seq} = State) ->
    lager:debug("addvalue ~p ~p: ~p ~p", [Name, Seq, Key, Value]),
    addvalue_port_command(Port, Key, Value),
    Queued = add_request(State, From, addvalue),
    {noreply, Queued};

handle_call({commit}, From,
            #state{name = Name, port = Port, requestcounter = Seq} = State) ->
    lager:debug("commit ~p ~p", [Name, Seq]),
    commit_port_command(Port),
    Queued = add_request(State, From, commit),
    {noreply, Queued};

handle_call({keyexists, Key}, From,
            #state{name = Name, port = Port, requestcounter = Seq} = State) ->
    lager:debug("keyexists ~p ~p: ~p", [Name, Seq, Key]),
    keyexists_port_command(Port, Key),
    Queued = add_request(State, From, keyexists),
    {noreply, Queued}.