%%% Copyright (c) 2019, Sunet. %%% See LICENSE for licensing information. %% Create an AF_UNIX socket and accept connections. On connect, spawn %% another p11p_server process. -module(p11p_server). -behaviour(gen_server). -include("p11p_rpc.hrl"). %% API. -export([start_link/1]). -export([reply/2]). %% Genserver callbacks. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). %% Records and types. -record(state, { tokname :: string(), client :: pid() | undefined, socket :: gen_tcp:socket(), msg :: p11rpc_msg() | undefined, recv_count = 0 :: non_neg_integer(), send_count = 0 :: non_neg_integer() %%clientbuf = <<>> :: binary() }). %% API. -spec start_link(gen_tcp:socket()) -> {ok, pid()} | {error, term()}. start_link(Args) -> gen_server:start_link(?MODULE, Args, []). -spec reply(pid(), p11rpc_msg()) -> {ok, non_neg_integer()}. reply(Pid, Response) -> gen_server:call(Pid, {respond, Response}). %% Genserver callbacks. init([Token, Socket]) -> lager:debug("~p: p11p_server:init", [self()]), process_flag(trap_exit, true), % Need terminate/2. gen_server:cast(self(), accept), % Invoke accept, returning a socket in state. {ok, #state{tokname = Token, socket = Socket}}. handle_call({respond, R}, _, #state{socket = Sock, send_count = Sent} = S) -> D = p11p_rpc:serialise(R), Buf = case Sent of 0 -> <>; _ -> D end, %%lager:debug("~p: sending ~B octets as response", [self(), size(Buf)]), ok = gen_tcp:send(Sock, Buf), % TODO: what about short writes? {reply, {ok, size(Buf)}, S#state{send_count = Sent + 1}}; handle_call(Call, _, S) -> lager:debug("~p: Unhandled call: ~p~n", [self(), Call]), {reply, unhandled, S}. %% Wait for new connection. handle_cast(accept, State = #state{tokname = TokName, socket = ListenSocket}) -> %% Blocking until client connects or timeout fires. %% Without a timeout our supervisor cannot terminate us. %% On timeout, just invoke ourselves again. case gen_tcp:accept(ListenSocket, 900) of {ok, Sock} -> lager:debug("~p: ~p: new connection accepted", [self(), Sock]), %% Start a new acceptor and return with new socket in state. p11p_server_sup:start_server([TokName, ListenSocket]), {noreply, State#state{socket = Sock}}; {error, timeout} -> gen_server:cast(self(), accept), {noreply, State}; {error, closed} -> lager:debug("~p: listening socket closed", [self()]), {stop, normal, State} end; handle_cast(Cast, State) -> lager:debug("~p: Unhandled cast: ~p~n", [self(), Cast]), {noreply, State}. %% First packet from P11 client. handle_info({tcp, Port, DataIn}, #state{tokname = TokName} = S) when S#state.client == undefined -> %%lager:debug("~p: received ~B octets from client on socket ~p, from new client", [self(), size(Data), Port]), <> = DataIn, case RPCVersion of ?RPC_VERSION -> {noreply, p11_client_data( S#state{client = p11p_manager:client_for_token(TokName)}, p11p_rpc:new(), Data)}; BadVersion -> lager:info("~p: ~p: invalid RPC version: ~p", [self(), Port, BadVersion]), {stop, bad_proto, S} end; %% Subsequent packages from P11 client. handle_info({tcp, _Port, DataIn}, #state{msg = Msg} = S) -> %%lager:debug("~p: received ~B octets from client on socket ~p, with ~B octets already in buffer", [self(), size(Data), Port, size(Msg#p11rpc_msg.buffer)]), {noreply, p11_client_data(S, Msg, DataIn)}; handle_info({tcp_closed, Port}, S) -> lager:debug("~p: socket ~p closed", [self(), Port]), {stop, normal, S}; handle_info(Info, S) -> lager:debug("~p: Unhandled info: ~p~n", [self(), Info]), {noreply, S}. terminate(Reason, #state{socket = Sock, tokname = TokName, client = Client}) -> gen_tcp:close(Sock), p11p_manager:client_event(client_gone, [TokName, Client]), lager:debug("~p: terminated with reason ~p", [self(), Reason]), ignored. code_change(_OldVersion, State, _Extra) -> {ok, State}. %% Private functions. p11_client_data(#state{client = Client, recv_count = Recv} = S, MsgIn, DataIn) -> case p11p_rpc:parse(MsgIn, DataIn) of {needmore, Msg} -> S#state{msg = Msg}; {done, Msg} -> {ok, _BytesSent} = p11p_client:request(Client, Msg), S#state{msg = p11p_rpc:new(Msg#p11rpc_msg.buffer), recv_count = Recv + 1} end.