summaryrefslogtreecommitdiff
path: root/p11p-daemon/src/p11p_client.erl
diff options
context:
space:
mode:
Diffstat (limited to 'p11p-daemon/src/p11p_client.erl')
-rw-r--r--p11p-daemon/src/p11p_client.erl163
1 files changed, 163 insertions, 0 deletions
diff --git a/p11p-daemon/src/p11p_client.erl b/p11p-daemon/src/p11p_client.erl
new file mode 100644
index 0000000..1222505
--- /dev/null
+++ b/p11p-daemon/src/p11p_client.erl
@@ -0,0 +1,163 @@
+%%% Copyright (c) 2019, Sunet.
+%%% See LICENSE for licensing information.
+
+%% A client spawns an Erlang port running a proxy app, i.e. the
+%% 'remote' program from p11-kit.
+
+%% Receive p11 requests from p11p_server, forward them to the proxy app,
+%% wait for a reply. If a reply is received within a timeout period,
+%% forward the reply to the requesting p11p_server. If the request
+%% times out, inform the manager (our parent).
+
+-module(p11p_client).
+-behaviour(gen_server).
+
+%% API.
+-export([start_link/4]).
+-export([request/2, stop/2]).
+
+-include("p11p_rpc.hrl").
+
+%% Genserver callbacks.
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+%% Records and types.
+-record(state, {
+ port :: port(),
+ replyto :: pid() | undefined,
+ timer :: reference() | undefined,
+ token :: string(), % Token name.
+ msg :: p11rpc:msg() | undefined,
+ recv_count = 0 :: non_neg_integer(),
+ send_count = 0 :: non_neg_integer()
+ }).
+
+%% API.
+-spec start_link(atom(), string(), string(), list()) ->
+ {ok, pid()} | {error, term()}.
+start_link(ServName, TokName, ModPath, ModEnv) ->
+ lager:info("~p: p11p_client starting for ~s", [ServName, ModPath]),
+ gen_server:start_link({local, ServName}, ?MODULE,
+ [TokName, ModPath, ModEnv], []).
+
+-spec request(pid(), p11rpc_msg()) -> {ok, non_neg_integer()}.
+request(Client, Request) ->
+ gen_server:call(Client, {request, Request}).
+
+%% Use stop/1 instead of gen_server:stop/1 if you're uncertain whether
+%% we (Pid) are alive or not. An example of when that can happen is when the
+%% manager receives a server_event about a lost p11 app. If the server
+%% process terminated on request from us because we timed out on
+%% an rpc call, chances are that we have already terminated by
+%% the time the manager is to act on the lost app.
+stop(Pid, Reason) ->
+ gen_server:cast(Pid, {stop, Reason}).
+
+%% Genserver callbacks.
+init([TokName, ModPath, ModEnv]) ->
+ ProxyAppBinPath = p11p_config:proxyapp_bin_path(),
+ Port = open_port({spawn_executable, ProxyAppBinPath},
+ [stream,
+ exit_status,
+ {env, ModEnv},
+ {args, [ModPath, "-v"]} % FIXME: Remove -v
+ ]),
+ lager:debug("~p: ~s: new proxy app port: ~p", [self(), ProxyAppBinPath, Port]),
+ lager:debug("~p: ~s: module: ~s, env: ~p", [self(), ProxyAppBinPath, ModPath, ModEnv]),
+ {ok, #state{port = Port, token = TokName}}.
+
+handle_call({request, Request}, {FromPid, _Tag},
+ #state{port = Port, send_count = Sent} = S) ->
+ %%lager:debug("~p: sending request from ~p to prxoy app ~p", [self(), FromPid, Port]),
+ D = p11p_rpc:serialise(Request),
+ Buf = case Sent of
+ 0 -> <<?RPC_VERSION:8, D/binary>>;
+ _ -> D
+ end,
+ ok = do_send(Port, Buf),
+ {reply, {ok, sizeBuf}, S#state{replyto = FromPid, timer = start_timer(Port),
+ send_count = Sent + 1}};
+
+handle_call(Call, _From, State) ->
+ lager:debug("~p: Unhandled call: ~p~n", [self(), Call]),
+ {reply, unhandled, State}.
+
+handle_cast({stop, Reason}, State) ->
+ {stop, Reason, State};
+
+handle_cast(Cast, State) ->
+ lager:debug("~p: unhandled cast: ~p~n", [self(), Cast]),
+ {noreply, State}.
+
+%% Receiving the very first response from proxy app since it was started.
+handle_info({Port, {data, Data}}, State)
+ when Port == State#state.port, State#state.msg == undefined ->
+ case hd(Data) of % First octet is RPC protocol version.
+ ?RPC_VERSION ->
+ {noreply, handle_proxy_app_data(State, p11p_rpc:new(), tl(Data))};
+ BadVersion ->
+ lager:info("~p: ~p: invalid RPC version: ~p", [self(), Port,
+ BadVersion]),
+ {noreply, State}
+ end;
+
+%% Receiving more data from proxy app.
+handle_info({Port, {data, Data}}, #state{msg = Msg} = State)
+ when Port == State#state.port ->
+ {noreply, handle_proxy_app_data(State, Msg, Data)};
+
+%% Proxy app timed out.
+handle_info({timeout, Timer, Port}, #state{token = Tok, replyto = Server} = S)
+ when Port == S#state.port, Timer == S#state.timer ->
+ lager:info("~p: rpc request timed out, exiting", [self()]),
+ p11p_manager:server_event(timeout, [Tok, Server]),
+ State = S#state{timer = undefined},
+ {stop, normal, State};
+
+handle_info(Info, State) ->
+ lager:debug("~p: Unhandled info: ~p~n", [self(), Info]),
+ {noreply, State}.
+
+terminate(Reason, #state{port = Port}) ->
+ lager:debug("~p: client terminating with reason ~p", [self(), Reason]),
+ port_close(Port),
+ ok.
+
+code_change(_OldVersion, State, _Extra) ->
+ {ok, State}.
+
+%% Private
+do_send(Port, Buf) ->
+ %%lager:debug("~p: sending ~B octets to proxy app", [self(), size(Buf)]),
+
+ %% case rand:uniform(15) of
+ %% 1 ->
+ %% lager:debug("~p: faking unresponsive proxy app (~p) by not sending it any.", [self(), Port]);
+ %% _ ->
+ %% port_command(Port, Buf)
+ %% end,
+
+ true = port_command(Port, Buf),
+ ok.
+
+handle_proxy_app_data(#state{replyto = Pid, timer = Timer, recv_count = Recv} = S,
+ MsgIn, DataIn) ->
+ case p11p_rpc:parse(MsgIn, list_to_binary(DataIn)) of
+ {needmore, Msg} ->
+ S#state{msg = Msg};
+ {done, Msg} ->
+ cancel_timer(Timer),
+ {ok, _BytesSent} = p11p_server:reply(Pid, Msg),
+ %% Saving potential data not consumed by parse/2 in new message.
+ S#state{msg = p11p_rpc:new(Msg#p11rpc_msg.buffer),
+ recv_count = Recv + 1}
+ end.
+
+start_timer(Port) ->
+ %%lager:debug("~p: starting timer", [self()]),
+ erlang:start_timer(3000, self(), Port).
+
+cancel_timer(Timer) ->
+ %%lager:debug("~p: canceling timer", [self()]),
+ erlang:cancel_timer(Timer, [{async, true}, {info, false}]).