diff options
Diffstat (limited to 'p11p-daemon/src/p11p_client.erl')
-rw-r--r-- | p11p-daemon/src/p11p_client.erl | 163 |
1 files changed, 163 insertions, 0 deletions
diff --git a/p11p-daemon/src/p11p_client.erl b/p11p-daemon/src/p11p_client.erl new file mode 100644 index 0000000..1222505 --- /dev/null +++ b/p11p-daemon/src/p11p_client.erl @@ -0,0 +1,163 @@ +%%% Copyright (c) 2019, Sunet. +%%% See LICENSE for licensing information. + +%% A client spawns an Erlang port running a proxy app, i.e. the +%% 'remote' program from p11-kit. + +%% Receive p11 requests from p11p_server, forward them to the proxy app, +%% wait for a reply. If a reply is received within a timeout period, +%% forward the reply to the requesting p11p_server. If the request +%% times out, inform the manager (our parent). + +-module(p11p_client). +-behaviour(gen_server). + +%% API. +-export([start_link/4]). +-export([request/2, stop/2]). + +-include("p11p_rpc.hrl"). + +%% Genserver callbacks. +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +%% Records and types. +-record(state, { + port :: port(), + replyto :: pid() | undefined, + timer :: reference() | undefined, + token :: string(), % Token name. + msg :: p11rpc:msg() | undefined, + recv_count = 0 :: non_neg_integer(), + send_count = 0 :: non_neg_integer() + }). + +%% API. +-spec start_link(atom(), string(), string(), list()) -> + {ok, pid()} | {error, term()}. +start_link(ServName, TokName, ModPath, ModEnv) -> + lager:info("~p: p11p_client starting for ~s", [ServName, ModPath]), + gen_server:start_link({local, ServName}, ?MODULE, + [TokName, ModPath, ModEnv], []). + +-spec request(pid(), p11rpc_msg()) -> {ok, non_neg_integer()}. +request(Client, Request) -> + gen_server:call(Client, {request, Request}). + +%% Use stop/1 instead of gen_server:stop/1 if you're uncertain whether +%% we (Pid) are alive or not. An example of when that can happen is when the +%% manager receives a server_event about a lost p11 app. If the server +%% process terminated on request from us because we timed out on +%% an rpc call, chances are that we have already terminated by +%% the time the manager is to act on the lost app. +stop(Pid, Reason) -> + gen_server:cast(Pid, {stop, Reason}). + +%% Genserver callbacks. +init([TokName, ModPath, ModEnv]) -> + ProxyAppBinPath = p11p_config:proxyapp_bin_path(), + Port = open_port({spawn_executable, ProxyAppBinPath}, + [stream, + exit_status, + {env, ModEnv}, + {args, [ModPath, "-v"]} % FIXME: Remove -v + ]), + lager:debug("~p: ~s: new proxy app port: ~p", [self(), ProxyAppBinPath, Port]), + lager:debug("~p: ~s: module: ~s, env: ~p", [self(), ProxyAppBinPath, ModPath, ModEnv]), + {ok, #state{port = Port, token = TokName}}. + +handle_call({request, Request}, {FromPid, _Tag}, + #state{port = Port, send_count = Sent} = S) -> + %%lager:debug("~p: sending request from ~p to prxoy app ~p", [self(), FromPid, Port]), + D = p11p_rpc:serialise(Request), + Buf = case Sent of + 0 -> <<?RPC_VERSION:8, D/binary>>; + _ -> D + end, + ok = do_send(Port, Buf), + {reply, {ok, sizeBuf}, S#state{replyto = FromPid, timer = start_timer(Port), + send_count = Sent + 1}}; + +handle_call(Call, _From, State) -> + lager:debug("~p: Unhandled call: ~p~n", [self(), Call]), + {reply, unhandled, State}. + +handle_cast({stop, Reason}, State) -> + {stop, Reason, State}; + +handle_cast(Cast, State) -> + lager:debug("~p: unhandled cast: ~p~n", [self(), Cast]), + {noreply, State}. + +%% Receiving the very first response from proxy app since it was started. +handle_info({Port, {data, Data}}, State) + when Port == State#state.port, State#state.msg == undefined -> + case hd(Data) of % First octet is RPC protocol version. + ?RPC_VERSION -> + {noreply, handle_proxy_app_data(State, p11p_rpc:new(), tl(Data))}; + BadVersion -> + lager:info("~p: ~p: invalid RPC version: ~p", [self(), Port, + BadVersion]), + {noreply, State} + end; + +%% Receiving more data from proxy app. +handle_info({Port, {data, Data}}, #state{msg = Msg} = State) + when Port == State#state.port -> + {noreply, handle_proxy_app_data(State, Msg, Data)}; + +%% Proxy app timed out. +handle_info({timeout, Timer, Port}, #state{token = Tok, replyto = Server} = S) + when Port == S#state.port, Timer == S#state.timer -> + lager:info("~p: rpc request timed out, exiting", [self()]), + p11p_manager:server_event(timeout, [Tok, Server]), + State = S#state{timer = undefined}, + {stop, normal, State}; + +handle_info(Info, State) -> + lager:debug("~p: Unhandled info: ~p~n", [self(), Info]), + {noreply, State}. + +terminate(Reason, #state{port = Port}) -> + lager:debug("~p: client terminating with reason ~p", [self(), Reason]), + port_close(Port), + ok. + +code_change(_OldVersion, State, _Extra) -> + {ok, State}. + +%% Private +do_send(Port, Buf) -> + %%lager:debug("~p: sending ~B octets to proxy app", [self(), size(Buf)]), + + %% case rand:uniform(15) of + %% 1 -> + %% lager:debug("~p: faking unresponsive proxy app (~p) by not sending it any.", [self(), Port]); + %% _ -> + %% port_command(Port, Buf) + %% end, + + true = port_command(Port, Buf), + ok. + +handle_proxy_app_data(#state{replyto = Pid, timer = Timer, recv_count = Recv} = S, + MsgIn, DataIn) -> + case p11p_rpc:parse(MsgIn, list_to_binary(DataIn)) of + {needmore, Msg} -> + S#state{msg = Msg}; + {done, Msg} -> + cancel_timer(Timer), + {ok, _BytesSent} = p11p_server:reply(Pid, Msg), + %% Saving potential data not consumed by parse/2 in new message. + S#state{msg = p11p_rpc:new(Msg#p11rpc_msg.buffer), + recv_count = Recv + 1} + end. + +start_timer(Port) -> + %%lager:debug("~p: starting timer", [self()]), + erlang:start_timer(3000, self(), Port). + +cancel_timer(Timer) -> + %%lager:debug("~p: canceling timer", [self()]), + erlang:cancel_timer(Timer, [{async, true}, {info, false}]). |