From ad84cb6f4d5d6e7154afd9eb05de9bdeb67ca753 Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Thu, 30 Jan 2020 14:00:50 +0100 Subject: WIP parts from transparent-failover + half baked toml config The transparent failover experiment, see branch transparent-failover, resulted in a bunch of changes that we want regardless of failover implementation. This commit incorporates these. This commit also has a half baked implementation of TOML file based configuration, to not expose the operator for Erlang syntax when configuring the daemon. TODO: sort this out! --- p11p-daemon/src/p11p_client.erl | 76 +++++++-------- p11p-daemon/src/p11p_config.erl | 188 +++++++++++++++++++++----------------- p11p-daemon/src/p11p_manager.erl | 193 ++++++++++++++++++++++++--------------- p11p-daemon/src/p11p_rpc.erl | 68 +++++++++++++- p11p-daemon/src/p11p_rpc.hrl | 160 ++++++++++++++++++++++++++++++++ p11p-daemon/src/p11p_server.erl | 81 +++++++++++----- 6 files changed, 547 insertions(+), 219 deletions(-) (limited to 'p11p-daemon/src') diff --git a/p11p-daemon/src/p11p_client.erl b/p11p-daemon/src/p11p_client.erl index 1222505..7dc3457 100644 --- a/p11p-daemon/src/p11p_client.erl +++ b/p11p-daemon/src/p11p_client.erl @@ -6,14 +6,14 @@ %% Receive p11 requests from p11p_server, forward them to the proxy app, %% wait for a reply. If a reply is received within a timeout period, -%% forward the reply to the requesting p11p_server. If the request +%% proxy the reply to the requesting p11p_server. If the request %% times out, inform the manager (our parent). -module(p11p_client). -behaviour(gen_server). %% API. --export([start_link/4]). +-export([start_link/6]). -export([request/2, stop/2]). -include("p11p_rpc.hrl"). @@ -24,38 +24,40 @@ %% Records and types. -record(state, { + token :: string(), % Token name. + timeout :: non_neg_integer(), + port :: port(), replyto :: pid() | undefined, timer :: reference() | undefined, - token :: string(), % Token name. msg :: p11rpc:msg() | undefined, recv_count = 0 :: non_neg_integer(), send_count = 0 :: non_neg_integer() }). %% API. --spec start_link(atom(), string(), string(), list()) -> +-spec start_link(atom(), string(), pid(), string(), list(), non_neg_integer()) -> {ok, pid()} | {error, term()}. -start_link(ServName, TokName, ModPath, ModEnv) -> - lager:info("~p: p11p_client starting for ~s", [ServName, ModPath]), +start_link(ServName, TokName, Server, ModPath, ModEnv, Timeout) -> + lager:info("~p: starting p11p_client for ~s", [self(), TokName]), gen_server:start_link({local, ServName}, ?MODULE, - [TokName, ModPath, ModEnv], []). + [TokName, Server, ModPath, ModEnv, Timeout], []). -spec request(pid(), p11rpc_msg()) -> {ok, non_neg_integer()}. request(Client, Request) -> gen_server:call(Client, {request, Request}). %% Use stop/1 instead of gen_server:stop/1 if you're uncertain whether -%% we (Pid) are alive or not. An example of when that can happen is when the -%% manager receives a server_event about a lost p11 app. If the server -%% process terminated on request from us because we timed out on -%% an rpc call, chances are that we have already terminated by -%% the time the manager is to act on the lost app. +%% we (Pid) are alive or not. An example of when that can happen is +%% when the manager receives a server_event about a lost p11 app. If +%% the server process terminated on request from us because we timed +%% out on an rpc call, chances are that we have already terminated by +%% the time the manager acts on the information about the lost app. stop(Pid, Reason) -> gen_server:cast(Pid, {stop, Reason}). %% Genserver callbacks. -init([TokName, ModPath, ModEnv]) -> +init([TokName, Server, ModPath, ModEnv, Timeout]) -> ProxyAppBinPath = p11p_config:proxyapp_bin_path(), Port = open_port({spawn_executable, ProxyAppBinPath}, [stream, @@ -63,9 +65,10 @@ init([TokName, ModPath, ModEnv]) -> {env, ModEnv}, {args, [ModPath, "-v"]} % FIXME: Remove -v ]), + true = is_port(Port), lager:debug("~p: ~s: new proxy app port: ~p", [self(), ProxyAppBinPath, Port]), lager:debug("~p: ~s: module: ~s, env: ~p", [self(), ProxyAppBinPath, ModPath, ModEnv]), - {ok, #state{port = Port, token = TokName}}. + {ok, #state{port = Port, token = TokName, replyto = Server, timeout = Timeout}}. handle_call({request, Request}, {FromPid, _Tag}, #state{port = Port, send_count = Sent} = S) -> @@ -75,8 +78,9 @@ handle_call({request, Request}, {FromPid, _Tag}, 0 -> <>; _ -> D end, - ok = do_send(Port, Buf), - {reply, {ok, sizeBuf}, S#state{replyto = FromPid, timer = start_timer(Port), + {ok, _} = do_send(Port, Buf), + {reply, {ok, sizeBuf}, S#state{replyto = FromPid, + timer = start_timer(S#state.timeout, Port), send_count = Sent + 1}}; handle_call(Call, _From, State) -> @@ -95,7 +99,7 @@ handle_info({Port, {data, Data}}, State) when Port == State#state.port, State#state.msg == undefined -> case hd(Data) of % First octet is RPC protocol version. ?RPC_VERSION -> - {noreply, handle_proxy_app_data(State, p11p_rpc:new(), tl(Data))}; + {noreply, handle_token_data(State, p11p_rpc:new(), tl(Data))}; BadVersion -> lager:info("~p: ~p: invalid RPC version: ~p", [self(), Port, BadVersion]), @@ -105,13 +109,13 @@ handle_info({Port, {data, Data}}, State) %% Receiving more data from proxy app. handle_info({Port, {data, Data}}, #state{msg = Msg} = State) when Port == State#state.port -> - {noreply, handle_proxy_app_data(State, Msg, Data)}; + {noreply, handle_token_data(State, Msg, Data)}; %% Proxy app timed out. -handle_info({timeout, Timer, Port}, #state{token = Tok, replyto = Server} = S) +handle_info({timeout, Timer, Port}, S = #state{token = Tok}) when Port == S#state.port, Timer == S#state.timer -> - lager:info("~p: rpc request timed out, exiting", [self()]), - p11p_manager:server_event(timeout, [Tok, Server]), + lager:info("~p: rpc request for ~s timed out, exiting", [self(), Tok]), + p11p_manager:client_event(timeout, Tok), State = S#state{timer = undefined}, {stop, normal, State}; @@ -129,34 +133,34 @@ code_change(_OldVersion, State, _Extra) -> %% Private do_send(Port, Buf) -> - %%lager:debug("~p: sending ~B octets to proxy app", [self(), size(Buf)]), - - %% case rand:uniform(15) of - %% 1 -> - %% lager:debug("~p: faking unresponsive proxy app (~p) by not sending it any.", [self(), Port]); - %% _ -> - %% port_command(Port, Buf) - %% end, - - true = port_command(Port, Buf), - ok. - -handle_proxy_app_data(#state{replyto = Pid, timer = Timer, recv_count = Recv} = S, + Rand = rand:uniform(100), %% + 10, + if + Rand =< 10 -> + lager:debug("~p: faking unresponsive token (~p) by not sending", + [self(), Port]); + true -> + lager:debug("~p: sending ~B octets to token", [self(), size(Buf)]), + true = port_command(Port, Buf) + end, + {ok, size(Buf)}. + +handle_token_data(#state{replyto = Pid, timer = Timer, recv_count = Recv} = S, MsgIn, DataIn) -> case p11p_rpc:parse(MsgIn, list_to_binary(DataIn)) of {needmore, Msg} -> S#state{msg = Msg}; {done, Msg} -> cancel_timer(Timer), + lager:debug("~p: <- ~s", [self(), p11p_rpc:dump(Msg)]), {ok, _BytesSent} = p11p_server:reply(Pid, Msg), %% Saving potential data not consumed by parse/2 in new message. S#state{msg = p11p_rpc:new(Msg#p11rpc_msg.buffer), recv_count = Recv + 1} end. -start_timer(Port) -> +start_timer(Timeout, Port) -> %%lager:debug("~p: starting timer", [self()]), - erlang:start_timer(3000, self(), Port). + erlang:start_timer(Timeout, self(), Port). cancel_timer(Timer) -> %%lager:debug("~p: canceling timer", [self()]), diff --git a/p11p-daemon/src/p11p_config.erl b/p11p-daemon/src/p11p_config.erl index 330c490..d24aad6 100644 --- a/p11p-daemon/src/p11p_config.erl +++ b/p11p-daemon/src/p11p_config.erl @@ -4,50 +4,85 @@ -module(p11p_config). -behaviour(gen_server). -%% API +%%% API %%% +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). -export([start_link/0]). -%%-export([config/0]). -export([nameof/1]). -export([tokens/0]). -export([proxyapp_bin_path/0, modules_for_token/1, module_path/1, module_env/1, - token_mode/1]). --export_type([token_mode_t/0]). - -%% Genserver callbacks. --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + token_balance/1, token_retries/1, token_timeout/1]). -%% Records and types. +%%% Records and types %%% -record(p11module, { name :: string(), path :: string(), - env :: [{string(), string()}] %FIXME: maches [] too? + env :: [{string(), string()}] }). -type p11module() :: #p11module{}. --type token_mode_t() :: {failover, [timeout]} | {balance, [non_neg_integer()]}. - -record(token, { name :: string(), - mode :: token_mode_t(), + timeout :: non_neg_integer(), + failover :: non_neg_integer(), % How many failover attempts. + balance :: [non_neg_integer()], modules = #{} :: #{string() => p11module()} }). -type token() :: #token{}. -%% Genserver state. -record(state, { proxyapp_bin_path :: string(), tokens :: #{string() => token()} }). -%%%%%%%%%%%%%%%%%%%% -%% API. +%%% Genserver callbacks %%% +init(_Args) -> + case application:get_env(p11p, config_file) of + {ok, ConfigFile} -> + {ok, init_state(ConfigFile)}; + _ -> + {ok, init_state()} + end. + +handle_call(proxyapp_bin_path, _From, S = #state{proxyapp_bin_path = Path}) -> + {reply, Path, S}; +handle_call(tokens, _From, State = #state{tokens = Tokens}) -> + {reply, maps:values(Tokens), State}; +handle_call({modules_for_token, TokName}, _, S = #state{tokens = Tokens}) -> + #{TokName := Token} = Tokens, + {reply, maps:values(Token#token.modules), S}; +handle_call({token_balance, TokName}, _, State = #state{tokens = Tokens}) -> + #{TokName := Token} = Tokens, + {reply, Token#token.balance, State}; +handle_call({token_retries, TokName}, _, State = #state{tokens = Tokens}) -> + #{TokName := Token} = Tokens, + {reply, Token#token.failover, State}; +handle_call({token_timeout, TokName}, _, State = #state{tokens = Tokens}) -> + #{TokName := Token} = Tokens, + {reply, Token#token.timeout, State}; +handle_call(Request, _From, State) -> + lager:warning("Unhandled call: ~p", [Request]), + {reply, unhandled, State}. + +handle_cast(Message, State) -> + lager:warning("Unhandled cast: ~p", [Message]), + {noreply, State}. + +handle_info(Info, State) -> + lager:warning("Unhandled info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVersion, State, _Extra) -> + {ok, State}. + + +%%% External functions %%% start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -%% config() -> -%% gen_server:call(?MODULE, config). - proxyapp_bin_path() -> gen_server:call(?MODULE, proxyapp_bin_path). @@ -55,9 +90,17 @@ proxyapp_bin_path() -> tokens() -> gen_server:call(?MODULE, tokens). --spec token_mode(string()) -> token_mode_t(). -token_mode(TokName) -> - gen_server:call(?MODULE, {token_mode, TokName}). +-spec token_balance(string()) -> [integer()]. +token_balance(TokName) -> + gen_server:call(?MODULE, {token_balance, TokName}). + +-spec token_retries(string()) -> non_neg_integer(). +token_retries(TokName) -> + gen_server:call(?MODULE, {token_retries, TokName}). + +-spec token_timeout(string()) -> non_neg_integer(). +token_timeout(TokName) -> + gen_server:call(?MODULE, {token_timeout, TokName}). -spec modules_for_token(string()) -> [p11module()]. modules_for_token(TokName) -> @@ -78,52 +121,28 @@ nameof(#p11module{name = Name}) -> nameof(List) -> [nameof(E) || E <- List]. -%%%%%%%%%%%%%%%%%%%% -%% Genserver callbacks. -init(_Args) -> - State = init_state(), - {ok, State}. - -%% handle_call(config, _From, State) -> -%% {reply, State, State}; -handle_call(proxyapp_bin_path, _From, #state{proxyapp_bin_path = Path} = State) -> - {reply, Path, State}; -handle_call(tokens, _From, #state{tokens = Tokens} = State) -> - {reply, maps:values(Tokens), State}; -handle_call({modules_for_token, TokName}, _, #state{tokens = Tokens} = S) -> - #{TokName := Token} = Tokens, - {reply, maps:values(Token#token.modules), S}; -handle_call({token_mode, TokName}, _, #state{tokens = Tokens} = State) -> - #{TokName := Token} = Tokens, - {reply, Token#token.mode, State}; -handle_call(Request, _From, State) -> - lager:warning("Unhandled call: ~p", [Request]), - {reply, unhandled, State}. - -handle_cast(Message, State) -> - lager:warning("Unhandled cast: ~p", [Message]), - {noreply, State}. - -handle_info(Info, State) -> - lager:warning("Unhandled info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVersion, State, _Extra) -> - {ok, State}. - -%%%%%%%%%%%%%%%%%%%% -%% Private. +%%% Private functions %%% +-define(PROXYAPP_DEFAULT, "/usr/local/libexec/p11-kit/p11-kit-remote"). init_state() -> - #state { - proxyapp_bin_path = - application:get_env(p11p, proxyapp_bin_path, - "/usr/local/libexec/p11-kit/p11-kit-remote"), - tokens = conf_tokens(application:get_env(p11p, groups, [])) - }. + #state{ + proxyapp_bin_path = application:get_env(p11p, + proxyapp_bin_path, + ?PROXYAPP_DEFAULT), + tokens = conf_tokens(application:get_env(p11p, + vtokens, + []))}. +init_state(Filename) -> + {ok, Config} = p11p_config_file:load_config(Filename), + #state{ + proxyapp_bin_path = p11p_config_file:get(Config, + string, + "proxyapp_bin_path", + ?PROXYAPP_DEFAULT), + tokens = conf_tokens(p11p_config_file:get(Config, + section, + "vtokens", + []))}. conf_tokens(L) -> conf_tokens(L, #{}). @@ -135,14 +154,20 @@ conf_tokens([H = {Name, _}|T], Acc) -> -spec new_token({string(), [tuple()]}) -> token(). new_token({Name, Settings}) -> Modules = conf_modules(proplists:get_value(modules, Settings)), - Mode = mode(proplists:get_value(mode, Settings, {failover, [timeout]}), %FIXME: s/[timeout]/[10]/g or some other sane default? - maps:size(Modules)), #token{ name = Name, - mode = Mode, + timeout = proplists:get_value(timeout, Settings, 25000), + failover = proplists:get_value(failover, Settings, maps:size(Modules) - 1), + balance = balance(proplists:get_value(balance, Settings, []), + maps:size(Modules)), modules = Modules }. +balance([], _) -> + []; +balance(List, NModules) -> + List ++ [1 || _ <- lists:seq(1, NModules - length(List))]. + conf_modules(L) -> conf_modules(L, #{}). conf_modules([], Acc) -> @@ -159,30 +184,23 @@ new_module(Name, Path, Env) -> env = Env }. --spec mode(p11p_config:token_mode_t(), non_neg_integer()) -> - p11p_config:token_mode_t(). -mode({balance, Args}, NModules) -> - {balance, Args ++ [1 || _ <- lists:seq(1, NModules - length(Args))]}; -mode(Conf, _) -> - Conf. - -%%%%%%%%%%%%%% -%% Unit tests. +%%% Unit tests %%% -include_lib("eunit/include/eunit.hrl"). - tokens_init_test_() -> {setup, fun() -> conf_tokens( [ {"vtoken0", - [{mode, {balance, [3]}}, + [{balance, [3]}, {modules, [{"bogusmod0_0", "/path/to/bogusmod0_0"}, {"bogusmod0_1", "/path/to/bogusmod0_1"} ]}]}, {"vtoken1", - [{modules, [{"bogusmod1_0", "/path/to/bogusmod1_0"}, + [{timeout, 12000}, + {failover, 3}, + {modules, [{"bogusmod1_0", "/path/to/bogusmod1_0"}, {"bogusmod1_1", "/path/to/bogusmod1_1", [{"MYENV", "myenv"}]} ]}]} ]) end, @@ -191,14 +209,18 @@ tokens_init_test_() -> [?_assertEqual( #{"vtoken0" => {token,"vtoken0", - {balance,[3,1]}, + 25000, + 1, + [3,1], #{"bogusmod0_0" => {p11module,"bogusmod0_0", "/path/to/bogusmod0_0", []}, "bogusmod0_1" => {p11module,"bogusmod0_1", "/path/to/bogusmod0_1", []}}}, "vtoken1" => {token,"vtoken1", - {failover,[timeout]}, + 12000, + 3, + [], #{"bogusmod1_0" => {p11module,"bogusmod1_0", "/path/to/bogusmod1_0", []}, "bogusmod1_1" => diff --git a/p11p-daemon/src/p11p_manager.erl b/p11p-daemon/src/p11p_manager.erl index 7c3bdb9..2dbdf6c 100644 --- a/p11p-daemon/src/p11p_manager.erl +++ b/p11p-daemon/src/p11p_manager.erl @@ -25,8 +25,8 @@ %% API. -export([start_link/0]). --export([client_for_token/1, client_event/2]). % For servers. --export([server_event/2]). % For clients. +-export([client_for_token/1, server_event/2]). % For servers. +-export([client_event/2]). % For clients. %% Genserver callbacks. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -43,9 +43,16 @@ }). -record(vtoken, { - mode :: p11p_config:token_mode_t(), - balance_count :: integer(), - clients :: [#client{}] % Active client in hd(). + clients :: [#client{}], % Current client in hd(). + + %% Invokations left for current client or -1 for no + %% balancing. + balance_count = -1 :: integer(), + + timeout :: non_neg_integer(), + retries :: non_neg_integer(), + + server :: pid() | undefined % Active server, if any. }). -record(state, { @@ -59,23 +66,25 @@ start_link() -> -spec client_for_token(string()) -> pid(). client_for_token(TokName) -> - gen_server:call(?MODULE, {client_for_token, TokName}). -client_event(Event, Args) -> - gen_server:cast(?MODULE, {client_event, Event, Args}). + gen_server:call(?MODULE, {client_for_token, self(), TokName}). server_event(Event, Args) -> gen_server:cast(?MODULE, {server_event, Event, Args}). +client_event(Event, Args) -> + gen_server:cast(?MODULE, {client_event, Event, Args}). + %% Genserver callbacks. init([]) -> {ok, #state{vtokens = init_vtokens(p11p_config:tokens())}}. -handle_call({client_for_token, TokNameIn}, _, #state{vtokens = Tokens} = S) -> - #{TokNameIn := TokenIn} = Tokens, - ClientsIn = TokenIn#vtoken.clients, +handle_call({client_for_token, Server, TokNameIn}, _From, + S = #state{vtokens = VTokensIn}) -> + #{TokNameIn := VTokenIn} = VTokensIn, + ClientsIn = VTokenIn#vtoken.clients, lager:debug("all clients: ~p", [ClientsIn]), {Clients, BalanceCount} = - case TokenIn#vtoken.balance_count of + case VTokenIn#vtoken.balance_count of 0 -> lager:debug("~p: balancing: next client", [self()]), Rotated = rotate_clients(ClientsIn), @@ -87,52 +96,51 @@ handle_call({client_for_token, TokNameIn}, _, #state{vtokens = Tokens} = S) -> -1 -> {ClientsIn, -1} end, - #client{tokname = TokNameIn, - servid = ServId, - modpath = ModPath, - modenv = ModEnv, - pid = PidIn} = SelectedClient = hd(Clients), - case PidIn of + Current = hd(Clients), + case Current#client.pid of undefined -> - {ok, Pid} = - p11p_client:start_link(ServId, TokNameIn, ModPath, ModEnv), - Client = SelectedClient#client{pid = Pid}, - Token = TokenIn#vtoken{clients = [Client | tl(Clients)], - balance_count = BalanceCount}, - {reply, Pid, S#state{vtokens = Tokens#{TokNameIn := Token}}}; - _ -> - {reply, PidIn, S} + Client = start_client(hd(Clients), Server, VTokenIn#vtoken.timeout), + VToken = VTokenIn#vtoken{clients = [Client | tl(Clients)], + server = Server, + balance_count = BalanceCount}, + {reply, Client#client.pid, S#state{vtokens = VTokensIn#{TokNameIn := VToken}}}; + Pid -> + {reply, Pid, S} end; handle_call(Call, _From, State) -> lager:debug("Unhandled call: ~p~n", [Call]), {reply, unhandled, State}. -handle_cast({server_event, timeout, [TokNameIn, Server]}, - #state{vtokens = Tokens} = S) -> - lager:debug("~p: ~s: timed out, stopping ~p", [self(), TokNameIn, Server]), - gen_server:stop(Server), % Hang up on p11 client. - %% TODO: do some code dedup with client_for_token? +%% Server done with client. +handle_cast({server_event, server_gone, TokNameIn}, S = #state{vtokens = Tokens}) -> #{TokNameIn := TokenIn} = Tokens, - Clients = TokenIn#vtoken.clients, - SelectedClient = hd(Clients), - Client = SelectedClient#client{pid = undefined}, - Token = TokenIn#vtoken{clients = tl(Clients) ++ [Client]}, - lager:debug("~p: ~s: updated token: ~p", [self(), TokNameIn, Token]), - {noreply, S#state{vtokens = Tokens#{TokNameIn := Token}}}; - -handle_cast({client_event, client_gone, [TokName, Pid]}, - #state{vtokens = Tokens} = S) -> - lager:debug("~p: asking client ~p to stop", [self(), Pid]), - p11p_client:stop(Pid, normal), - #{TokName := TokenIn} = Tokens, - Clients = lists:map(fun(E) -> + CurClient = hd(TokenIn#vtoken.clients), + ClientPid = CurClient#client.pid, + ok = p11p_client:stop(ClientPid, normal), + Clients = lists:map(fun(E) -> % Find and update. case E#client.pid of - Pid -> E#client{pid = undefined}; + ClientPid -> E#client{pid = undefined}; _ -> E - end - end, TokenIn#vtoken.clients), - Token = TokenIn#vtoken{clients = Clients}, - {noreply, S#state{vtokens = Tokens#{TokName := Token}}}; + end end, + TokenIn#vtoken.clients), + Token = TokenIn#vtoken{clients = Clients, server = undefined}, + {noreply, S#state{vtokens = Tokens#{TokNameIn := Token}}}; + +%% Client reporting that a token has timed out -- mark current client +%% not running, inform server, rotate client list and start new +%% client. +handle_cast({client_event, timeout, TokName}, State) -> + #{TokName := VToken} = State#state.vtokens, + client_timeout(TokName, VToken, State); + +handle_cast({start_client, VTokName}, State = #state{vtokens = VTokens}) -> + #{VTokName := VToken} = VTokens, + Client = start_client(hd(VToken#vtoken.clients), + VToken#vtoken.server, + VToken#vtoken.timeout), + NewVToken = VToken#vtoken{clients = [Client | tl(VToken#vtoken.clients)]}, + lager:debug("~p: vtoken updated: ~p", [self(), NewVToken]), + {noreply, State#state{vtokens = VTokens#{VTokName := NewVToken}}}; handle_cast(Cast, State) -> lager:debug("Unhandled cast: ~p~n", [Cast]), @@ -164,40 +172,40 @@ init_vtokens([H|T], Acc)-> new_vtoken(Conf) -> Name = p11p_config:nameof(Conf), - Mode = p11p_config:token_mode(Name), + Balances = p11p_config:token_balance(Name), Clients = clients(Name, p11p_config:modules_for_token(Name), - Mode), - R0 = hd(Clients), + Balances), + CurrentClient = hd(Clients), #vtoken{ - mode = p11p_config:token_mode(Name), - balance_count = R0#client.balance, - clients = Clients + clients = Clients, + balance_count = CurrentClient#client.balance, + timeout = p11p_config:token_timeout(Name), + retries = p11p_config:token_retries(Name) }. -clients(TokName, ConfModules, ConfMode) -> - clients(TokName, ConfModules, ConfMode, []). +clients(TokName, Modules, []) -> + clients(TokName, Modules, [-1 || _ <- lists:seq(1, length(Modules))]); +clients(TokName, ConfModules, ConfBalance) -> + clients(TokName, ConfModules, ConfBalance, []). + clients(_, [], _, Acc) -> Acc; -clients(TokName, [H|T], ConfMode, Acc) -> - ModName = p11p_config:nameof(H), +clients(TokName, [Module|Modules], [Balance|Balances], Acc) -> + ModName = p11p_config:nameof(Module), ServName = "p11p_client:" ++ TokName ++ ":" ++ ModName, - ModPath = p11p_config:module_path(H), - ModEnv = p11p_config:module_env(H), - clients(TokName, T, ConfMode, [#client{ - tokname = TokName, - servid = list_to_atom(ServName), - modpath = ModPath, - modenv = ModEnv, - balance = balance(ConfMode, length(T) + 1) - } - | Acc]). - --spec balance(p11p_config:token_mode_t(), non_neg_integer()) -> integer(). -balance({balance, Ratios}, N) -> - lists:nth(N, Ratios); -balance(_, _) -> - -1. + clients(TokName, Modules, Balances, + [#client{tokname = TokName, + servid = list_to_atom(ServName), + modpath = p11p_config:module_path(Module), + modenv = p11p_config:module_env(Module), + balance = Balance} | Acc]). + +%% -spec balance(p11p_config:token_mode_t(), non_neg_integer()) -> integer(). +%% balance({balance, Ratios}, N) -> +%% lists:nth(N, Ratios); +%% balance(_, _) -> +%% -1. %% -spec balance_count(p11p_config:token_mode_t()) -> integer(). %% balance_count(#vtoken{mode = {balance, _}, balance_count = C}) -> @@ -207,3 +215,38 @@ balance(_, _) -> rotate_clients(L) -> lists:reverse([hd(L) | lists:reverse(tl(L))]). + +rotate_clients(L, UpdatedCurr) -> + lists:reverse([UpdatedCurr | lists:reverse(tl(L))]). + +next_client(VToken = #vtoken{clients = Clients}) -> + OldC = hd(Clients), + NewClients = rotate_clients(Clients, OldC#client{pid = undefined}), + gen_server:cast(self(), {start_client, OldC#client.tokname}), + VToken#vtoken{clients = NewClients}. + +client_timeout(TokName, + VToken = #vtoken{retries = Retries}, + State = #state{vtokens = VTokens}) + when Retries > 0 -> + lager:debug("~p: ~s: token timed out, switching token", [self(), TokName]), + p11p_server:token_gone(VToken#vtoken.server, false), + NewToken = next_client(VToken), + NewVTokens = VTokens#{TokName := NewToken#vtoken{retries = Retries - 1}}, + {noreply, State#state{vtokens = NewVTokens}}; + +client_timeout(TokName, + VToken, + State) -> + lager:debug("~p: ~s: token timed out, disconnecting app", [self(), TokName]), + p11p_server:token_gone(VToken#vtoken.server, true), + {stop, State}. + +start_client(Client, Server, Timeout) -> + {ok, Pid} = p11p_client:start_link(Client#client.servid, + Client#client.tokname, + Server, + Client#client.modpath, + Client#client.modenv, + Timeout), + Client#client{pid = Pid}. diff --git a/p11p-daemon/src/p11p_rpc.erl b/p11p-daemon/src/p11p_rpc.erl index a775d30..03a476c 100644 --- a/p11p-daemon/src/p11p_rpc.erl +++ b/p11p-daemon/src/p11p_rpc.erl @@ -5,10 +5,37 @@ -module(p11p_rpc). --export([parse/2, new/0, new/1, serialise/1]). +-export([ + dump/1, + error/2, + new/0, new/1, + parse/2, + serialise/1 + ]). -include("p11p_rpc.hrl"). +dump(Msg = #p11rpc_msg{data = Data}) -> + {ReqId, Data2} = parse_req_id(Data), + {ArgsDesc, Data3} = parse_args_desc(Data2), + {Name, _ReqArgs, _RespArgs} = lists:nth(ReqId + 1, ?REQIDS), + io_lib:format("RPC [~B]: ~s (~B), args \"~s\":~n~p", + [Msg#p11rpc_msg.call_code, + Name, + ReqId, + ArgsDesc, + Data3 + ]). + +error(CallCode, ErrorCode) -> + DataBuf = serialise_error(ErrorCode), + #p11rpc_msg{ + state = done, + call_code = CallCode, + opt_len = 0, + data_len = size(DataBuf), + data = DataBuf}. + parse(M) -> parse(M, <<>>). @@ -100,6 +127,44 @@ move_between_binaries(DstIn, SrcIn, NBytes) -> Src = binary:part(SrcIn, N, size(SrcIn) - N), {Dst, Src}. +serialise_byte_array(Bin) -> + Len = size(Bin), + <>. + +serialise_error(ErrCode) -> + ReqId = ?P11_RPC_CALL_ERROR, + ArgsDescString = "u", % TODO: look this up and generalise. + + ReqIdBin = serialise_uint32(ReqId), + ArgsDescBin = serialise_byte_array(list_to_binary(ArgsDescString)), + ArgBin = serialise_uint64(ErrCode), + + <>. + +serialise_uint32(U32) -> + <>. + +serialise_uint64(U64) -> + <>. + +-spec parse_req_id(binary()) -> {integer(), binary()}. +parse_req_id(Data) -> + {binary:decode_unsigned(binary:part(Data, 0, 4)), + binary:part(Data, 4, size(Data) - 4)}. + +parse_args_desc(Data) -> + parse_byte_array(Data). + +-spec parse_byte_array(binary()) -> {binary(), binary()}. +parse_byte_array(Data) -> + case binary:decode_unsigned(binary:part(Data, 0, 4)) of + 16#ffffffff -> + {<<>>, binary:part(Data, 4, size(Data) - 4)}; + Len -> % TODO: refuse Len >= 0x7fffffff. + {binary:part(Data, 4, Len), binary:part(Data, 4 + Len, + size(Data) - 4 - Len)} + end. + %%%%%%%%%%%%%% %% Unit tests. @@ -173,3 +238,4 @@ parse3_test_() -> {p11rpc_msg, 47, 2, 3, <<"o1">>, <<"d12">>, <<"rest">>, done}, Msg)] end}. + diff --git a/p11p-daemon/src/p11p_rpc.hrl b/p11p-daemon/src/p11p_rpc.hrl index c511e20..0014f57 100644 --- a/p11p-daemon/src/p11p_rpc.hrl +++ b/p11p-daemon/src/p11p_rpc.hrl @@ -16,3 +16,163 @@ state = header :: header | opts | data | done }). -type p11rpc_msg() :: #p11rpc_msg{}. + +%% From p11-kit/rpc-message.h. +-define(P11_RPC_CALL_ERROR, 0). +-define(P11_RPC_CALL_C_Initialize, 1). +-define(P11_RPC_CALL_C_Finalize, 2). +-define(P11_RPC_CALL_C_GetInfo, 3). +-define(P11_RPC_CALL_C_GetSlotList, 4). +-define(P11_RPC_CALL_C_GetSlotInfo, 5). +-define(P11_RPC_CALL_C_GetTokenInfo, 6). +-define(P11_RPC_CALL_C_GetMechanismList, 7). +-define(P11_RPC_CALL_C_GetMechanismInfo, 8). +-define(P11_RPC_CALL_C_InitToken, 9). +-define(P11_RPC_CALL_C_OpenSession, 10). +-define(P11_RPC_CALL_C_CloseSession, 11). +-define(P11_RPC_CALL_C_CloseAllSessions, 12). +-define(P11_RPC_CALL_C_GetSessionInfo, 13). +-define(P11_RPC_CALL_C_InitPIN, 14). +-define(P11_RPC_CALL_C_SetPIN, 15). +-define(P11_RPC_CALL_C_GetOperationState, 16). +-define(P11_RPC_CALL_C_SetOperationState, 17). +-define(P11_RPC_CALL_C_Login, 18). +-define(P11_RPC_CALL_C_Logout, 19). +-define(P11_RPC_CALL_C_CreateObject, 20). +-define(P11_RPC_CALL_C_CopyObject, 21). +-define(P11_RPC_CALL_C_DestroyObject, 22). +-define(P11_RPC_CALL_C_GetObjectSize, 23). +-define(P11_RPC_CALL_C_GetAttributeValue, 24). +-define(P11_RPC_CALL_C_SetAttributeValue, 25). +-define(P11_RPC_CALL_C_FindObjectsInit, 26). +-define(P11_RPC_CALL_C_FindObjects, 27). +-define(P11_RPC_CALL_C_FindObjectsFinal, 28). +-define(P11_RPC_CALL_C_EncryptInit, 29). +-define(P11_RPC_CALL_C_Encrypt, 30). +-define(P11_RPC_CALL_C_EncryptUpdate, 31). +-define(P11_RPC_CALL_C_EncryptFinal, 32). +-define(P11_RPC_CALL_C_DecryptInit, 33). +-define(P11_RPC_CALL_C_Decrypt, 34). +-define(P11_RPC_CALL_C_DecryptUpdate, 35). +-define(P11_RPC_CALL_C_DecryptFinal, 36). +-define(P11_RPC_CALL_C_DigestInit, 37). +-define(P11_RPC_CALL_C_Digest, 38). +-define(P11_RPC_CALL_C_DigestUpdate, 39). +-define(P11_RPC_CALL_C_DigestKey, 40). +-define(P11_RPC_CALL_C_DigestFinal, 41). +-define(P11_RPC_CALL_C_SignInit, 42). +-define(P11_RPC_CALL_C_Sign, 43). +-define(P11_RPC_CALL_C_SignUpdate, 44). +-define(P11_RPC_CALL_C_SignFinal, 45). +-define(P11_RPC_CALL_C_SignRecoverInit, 46). +-define(P11_RPC_CALL_C_SignRecover, 47). +-define(P11_RPC_CALL_C_VerifyInit, 48). +-define(P11_RPC_CALL_C_Verify, 49). +-define(P11_RPC_CALL_C_VerifyUpdate, 50). +-define(P11_RPC_CALL_C_VerifyFinal, 51). +-define(P11_RPC_CALL_C_VerifyRecoverInit, 52). +-define(P11_RPC_CALL_C_VerifyRecover, 53). +-define(P11_RPC_CALL_C_DigestEncryptUpdate, 54). +-define(P11_RPC_CALL_C_DecryptDigestUpdate, 55). +-define(P11_RPC_CALL_C_SignEncryptUpdate, 60). +-define(P11_RPC_CALL_C_DecryptVerifyUpdate, 61). +-define(P11_RPC_CALL_C_GenerateKey, 62). +-define(P11_RPC_CALL_C_GenerateKeyPair, 63). +-define(P11_RPC_CALL_C_WrapKey, 64). +-define(P11_RPC_CALL_C_UnwrapKey, 65). +-define(P11_RPC_CALL_C_DeriveKey, 66). +-define(P11_RPC_CALL_C_SeedRandom, 67). +-define(P11_RPC_CALL_C_GenerateRandom, 68). +-define(P11_RPC_CALL_C_WaitForSlotEvent, 69). +-define(P11_RPC_CALL_MAX, 70). + +%% Return values, some of them. From pcks11.h. +-define(CKR_OK, 0). +-define(CKR_GENERAL_ERROR, 5). +-define(CKR_FUNCTION_FAILED, 6). +-define(CKR_DEVICE_ERROR, 16#30). % 48 +-define(CKR_SESSION_CLOSED, 16#B0). % 176 + +%% Argument descriptions. From p11-kit/rpc-message.h p11_rpc_calls[]. +%% * a_ = prefix denotes array of _ +%% * A = CK_ATTRIBUTE +%% * f_ = prefix denotes buffer for _ +%% * M = CK_MECHANISM +%% * u = CK_ULONG +%% * s = space padded string +%% * v = CK_VERSION +%% * y = CK_BYTE +%% * z = null terminated string +%% Needed for generating our own messages, like ERROR. +%% They're being sent in the messages, after the request id. +%% TOOD: Complete argument descrptions, at least for messages +%% we generate. +-define(REQIDS, + [ % {name, request argdesc, response argdesc} + {"ERROR", "", "u"}, + {"C_Initialize", "ayyay", ""}, + {"C_Finalize", "", ""}, + {"C_GetInfo", "", "vsusv"}, + {"C_GetSlotList", "TODO", "TODO"}, + {"C_GetSlotInfo", "TODO", "TODO"}, + {"C_GetTokenInfo", "TODO", "TODO"}, + {"C_GetMechanismList", "TODO", "TODO"}, + {"C_GetMechanismInfo", "TODO", "TODO"}, + {"C_InitToken", "TODO", "TODO"}, + {"C_OpenSession", "uu", "u"}, + {"C_CloseSession", "u", ""}, + {"C_CloseAllSessions", "TODO", "TODO"}, + {"C_GetSessionInfo", "TODO", "TODO"}, + {"C_InitPIN", "TODO", "TODO"}, + {"C_SetPIN", "TODO", "TODO"}, + {"C_GetOperationState", "TODO", "TODO"}, + {"C_SetOperationState", "TODO", "TODO"}, + {"C_Login", "uuay", ""}, + {"C_Logout", "u", ""}, + {"C_CreateObject", "TODO", "TODO"}, + {"C_CopyObject", "TODO", "TODO"}, + {"C_DestroyObject", "TODO", "TODO"}, + {"C_GetObjectSize", "TODO", "TODO"}, + {"C_GetAttributeValue", "TODO", "TODO"}, + {"C_SetAttributeValue", "TODO", "TODO"}, + {"C_FindObjectsInit", "TODO", "TODO"}, + {"C_FindObjects", "TODO", "TODO"}, + {"C_FindObjectsFinal", "TODO", "TODO"}, + {"C_EncryptInit", "TODO", "TODO"}, + {"C_Encrypt", "TODO", "TODO"}, + {"C_EncryptUpdate", "TODO", "TODO"}, + {"C_EncryptFinal", "TODO", "TODO"}, + {"C_DecryptInit", "TODO", "TODO"}, + {"C_Decrypt", "TODO", "TODO"}, + {"C_DecryptUpdate", "TODO", "TODO"}, + {"C_DecryptFinal", "TODO", "TODO"}, + {"C_DigestInit", "TODO", "TODO"}, + {"C_Digest", "TODO", "TODO"}, + {"C_DigestUpdate", "TODO", "TODO"}, + {"C_DigestKey", "TODO", "TODO"}, + {"C_DigestFinal", "TODO", "TODO"}, + {"C_SignInit", "TODO", "TODO"}, + {"C_Sign", "TODO", "TODO"}, + {"C_SignUpdate", "TODO", "TODO"}, + {"C_SignFinal", "TODO", "TODO"}, + {"C_SignRecoverInit", "TODO", "TODO"}, + {"C_SignRecover", "TODO", "TODO"}, + {"C_VerifyInit", "TODO", "TODO"}, + {"C_Verify", "TODO", "TODO"}, + {"C_VerifyUpdate", "TODO", "TODO"}, + {"C_VerifyFinal", "TODO", "TODO"}, + {"C_VerifyRecoverInit", "TODO", "TODO"}, + {"C_VerifyRecover", "TODO", "TODO"}, + {"C_DigestEncryptUpdate", "TODO", "TODO"}, + {"C_DecryptDigestUpdate", "TODO", "TODO"}, + {"C_SignEncryptUpdate", "TODO", "TODO"}, + {"C_DecryptVerifyUpdate", "TODO", "TODO"}, + {"C_GenerateKey", "TODO", "TODO"}, + {"C_GenerateKeyPair", "TODO", "TODO"}, + {"C_WrapKey", "TODO", "TODO"}, + {"C_UnwrapKey", "TODO", "TODO"}, + {"C_DeriveKey", "TODO", "TODO"}, + {"C_SeedRandom", "TODO", "TODO"}, + {"C_GenerateRandom", "TODO", "TODO"}, + {"C_WaitForSlotEvent" "TODO", "TODO"} + ]). diff --git a/p11p-daemon/src/p11p_server.erl b/p11p-daemon/src/p11p_server.erl index cbc00df..ef8877d 100644 --- a/p11p-daemon/src/p11p_server.erl +++ b/p11p-daemon/src/p11p_server.erl @@ -11,7 +11,7 @@ %% API. -export([start_link/1]). --export([reply/2]). +-export([reply/2, token_gone/2]). %% Genserver callbacks. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -22,10 +22,10 @@ tokname :: string(), client :: pid() | undefined, socket :: gen_tcp:socket(), - msg :: p11rpc_msg() | undefined, - recv_count = 0 :: non_neg_integer(), - send_count = 0 :: non_neg_integer() - %%clientbuf = <<>> :: binary() + req_in :: p11rpc_msg() | undefined, + req_out :: p11rpc_msg() | undefined, + recv_count = 0 :: non_neg_integer(), % received from app + send_count = 0 :: non_neg_integer() % sent to token }). %% API. @@ -37,22 +37,24 @@ start_link(Args) -> reply(Pid, Response) -> gen_server:call(Pid, {respond, Response}). +-spec token_gone(pid(), boolean()) -> ok. +token_gone(Pid, Hangup) -> + case process_info(Pid) of undefined -> error(bad_server_pid); _ -> nop end, + gen_server:cast(Pid, {token_gone, Hangup}). + + %% Genserver callbacks. init([Token, Socket]) -> - lager:debug("~p: p11p_server:init", [self()]), + lager:debug("~p: p11p_server starting for ~s", [self(), Token]), process_flag(trap_exit, true), % Need terminate/2. gen_server:cast(self(), accept), % Invoke accept, returning a socket in state. {ok, #state{tokname = Token, socket = Socket}}. -handle_call({respond, R}, _, #state{socket = Sock, send_count = Sent} = S) -> - D = p11p_rpc:serialise(R), - Buf = case Sent of - 0 -> <>; - _ -> D - end, - %%lager:debug("~p: sending ~B octets as response", [self(), size(Buf)]), - ok = gen_tcp:send(Sock, Buf), % TODO: what about short writes? - {reply, {ok, size(Buf)}, S#state{send_count = Sent + 1}}; +%% FIXME: make this a cast +handle_call({respond, Resp}, _, State = #state{send_count = Sent}) -> + N = send_response(State#state.socket, p11p_rpc:serialise(Resp), Sent), + {reply, {ok, N}, State#state{req_out = undefined, + send_count = Sent + 1}}; handle_call(Call, _, S) -> lager:debug("~p: Unhandled call: ~p~n", [self(), Call]), @@ -77,6 +79,21 @@ handle_cast(accept, State = #state{tokname = TokName, socket = ListenSocket}) -> {stop, normal, State} end; + +handle_cast({token_gone, Hangup}, State = #state{send_count = Sent}) -> + Resp = p11p_rpc:error(State#state.req_out#p11rpc_msg.call_code, + ?CKR_DEVICE_ERROR), + {ok, _} = send_response(State#state.socket, p11p_rpc:serialise(Resp), Sent), + NewState = State#state{client = undefined, + req_out = undefined, + send_count = Sent + 1}, + case Hangup of + true -> + {close, NewState}; + false -> + {noreply, NewState} + end; + handle_cast(Cast, State) -> lager:debug("~p: Unhandled cast: ~p~n", [self(), Cast]), {noreply, State}. @@ -89,7 +106,7 @@ handle_info({tcp, Port, DataIn}, #state{tokname = TokName} = S) case RPCVersion of ?RPC_VERSION -> {noreply, - p11_client_data( + p11_app_data( S#state{client = p11p_manager:client_for_token(TokName)}, p11p_rpc:new(), Data)}; @@ -100,9 +117,9 @@ handle_info({tcp, Port, DataIn}, #state{tokname = TokName} = S) end; %% Subsequent packages from P11 client. -handle_info({tcp, _Port, DataIn}, #state{msg = Msg} = S) -> +handle_info({tcp, _Port, DataIn}, #state{req_in = Msg} = S) -> %%lager:debug("~p: received ~B octets from client on socket ~p, with ~B octets already in buffer", [self(), size(Data), Port, size(Msg#p11rpc_msg.buffer)]), - {noreply, p11_client_data(S, Msg, DataIn)}; + {noreply, p11_app_data(S, Msg, DataIn)}; handle_info({tcp_closed, Port}, S) -> lager:debug("~p: socket ~p closed", [self(), Port]), @@ -112,9 +129,14 @@ handle_info(Info, S) -> lager:debug("~p: Unhandled info: ~p~n", [self(), Info]), {noreply, S}. -terminate(Reason, #state{socket = Sock, tokname = TokName, client = Client}) -> - gen_tcp:close(Sock), - p11p_manager:client_event(client_gone, [TokName, Client]), +terminate(Reason, #state{socket = Sock, tokname = TokName}) -> + ok = gen_tcp:close(Sock), + + %% FIXME: tell manager, so that the client can be stopped. we + %% don't want to risk that another app (socket client) uses it + + p11p_manager:server_event(server_gone, TokName), + lager:debug("~p: terminated with reason ~p", [self(), Reason]), ignored. @@ -122,13 +144,24 @@ code_change(_OldVersion, State, _Extra) -> {ok, State}. %% Private functions. -p11_client_data(#state{client = Client, recv_count = Recv} = S, MsgIn, +p11_app_data(#state{client = Client, recv_count = Recv} = S, MsgIn, DataIn) -> case p11p_rpc:parse(MsgIn, DataIn) of {needmore, Msg} -> - S#state{msg = Msg}; + S#state{req_in = Msg}; {done, Msg} -> + lager:debug("~p: -> ~s", [self(), p11p_rpc:dump(Msg)]), {ok, _BytesSent} = p11p_client:request(Client, Msg), - S#state{msg = p11p_rpc:new(Msg#p11rpc_msg.buffer), + S#state{req_out = Msg, + req_in = p11p_rpc:new(Msg#p11rpc_msg.buffer), recv_count = Recv + 1} end. + +send_response(Sock, Inbuf, Sent) -> + Buf = case Sent of + 0 -> <>; + _ -> Inbuf + end, + %%lager:debug("~p: sending ~B octets as response", [self(), size(Inbuf)]), + ok = gen_tcp:send(Sock, Buf), + {ok, size(Inbuf)}. -- cgit v1.1 From 9f50fa4e8d7d82605116e07ea376da7ebedb8a57 Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Tue, 11 Feb 2020 11:03:47 +0100 Subject: WIP track p11 state and shortcut responses when needed --- p11p-daemon/src/p11p_client.erl | 117 +++++++++++++++++++++++++++++++-------- p11p-daemon/src/p11p_manager.erl | 8 +-- p11p-daemon/src/p11p_rpc.erl | 14 +++++ p11p-daemon/src/p11p_rpc.hrl | 4 +- p11p-daemon/src/p11p_server.erl | 46 +++++++++++---- 5 files changed, 149 insertions(+), 40 deletions(-) (limited to 'p11p-daemon/src') diff --git a/p11p-daemon/src/p11p_client.erl b/p11p-daemon/src/p11p_client.erl index 7dc3457..87c2949 100644 --- a/p11p-daemon/src/p11p_client.erl +++ b/p11p-daemon/src/p11p_client.erl @@ -23,14 +23,18 @@ code_change/3]). %% Records and types. +-type token_state() :: started | initialized | session | loggedin | opact | finalized. + -record(state, { token :: string(), % Token name. timeout :: non_neg_integer(), port :: port(), replyto :: pid() | undefined, + + p11state = started :: token_state(), timer :: reference() | undefined, - msg :: p11rpc:msg() | undefined, + response :: p11rpc:msg() | undefined, recv_count = 0 :: non_neg_integer(), send_count = 0 :: non_neg_integer() }). @@ -43,7 +47,7 @@ start_link(ServName, TokName, Server, ModPath, ModEnv, Timeout) -> gen_server:start_link({local, ServName}, ?MODULE, [TokName, Server, ModPath, ModEnv, Timeout], []). --spec request(pid(), p11rpc_msg()) -> {ok, non_neg_integer()}. +-spec request(pid(), p11rpc_msg()) -> ack | nack | {ok, non_neg_integer()}. request(Client, Request) -> gen_server:call(Client, {request, Request}). @@ -70,18 +74,46 @@ init([TokName, Server, ModPath, ModEnv, Timeout]) -> lager:debug("~p: ~s: module: ~s, env: ~p", [self(), ProxyAppBinPath, ModPath, ModEnv]), {ok, #state{port = Port, token = TokName, replyto = Server, timeout = Timeout}}. -handle_call({request, Request}, {FromPid, _Tag}, - #state{port = Port, send_count = Sent} = S) -> - %%lager:debug("~p: sending request from ~p to prxoy app ~p", [self(), FromPid, Port]), - D = p11p_rpc:serialise(Request), - Buf = case Sent of - 0 -> <>; - _ -> D - end, - {ok, _} = do_send(Port, Buf), - {reply, {ok, sizeBuf}, S#state{replyto = FromPid, - timer = start_timer(S#state.timeout, Port), - send_count = Sent + 1}}; +handle_call({request, Request}, + {FromPid, _Tag}, + S = #state{port = Port, send_count = Sent}) -> + case + case S#state.p11state of + started -> + case p11p_rpc:req_id(Request) of + ?P11_RPC_CALL_C_Logout -> ack; + ?P11_RPC_CALL_C_CloseSession -> ack; + ?P11_RPC_CALL_C_Finalize -> ack; + + ?P11_RPC_CALL_C_Initialize -> pass; + ?P11_RPC_CALL_C_OpenSession -> pass; + ?P11_RPC_CALL_C_Login -> pass; + + _ -> nack + end; + _ -> + pass + end + of + ack -> + {reply, ack, S}; + nack -> + {reply, nack, S}; + pass -> + lager:debug("~p: sending request from ~p to prxoy app ~p", [self(), FromPid, Port]), + D = p11p_rpc:serialise(Request), + Buf = case Sent of + 0 -> <>; + _ -> D + end, + {ok, _} = do_send(Port, Buf), + + {reply, + {ok, size(Buf)}, + S#state{replyto = FromPid, + timer = start_timer(S#state.timeout, Port), + send_count = Sent + 1}} + end; handle_call(Call, _From, State) -> lager:debug("~p: Unhandled call: ~p~n", [self(), Call]), @@ -96,10 +128,10 @@ handle_cast(Cast, State) -> %% Receiving the very first response from proxy app since it was started. handle_info({Port, {data, Data}}, State) - when Port == State#state.port, State#state.msg == undefined -> + when Port == State#state.port, State#state.response == undefined -> case hd(Data) of % First octet is RPC protocol version. ?RPC_VERSION -> - {noreply, handle_token_data(State, p11p_rpc:new(), tl(Data))}; + {noreply, response_in(State, p11p_rpc:new(), tl(Data))}; BadVersion -> lager:info("~p: ~p: invalid RPC version: ~p", [self(), Port, BadVersion]), @@ -107,9 +139,9 @@ handle_info({Port, {data, Data}}, State) end; %% Receiving more data from proxy app. -handle_info({Port, {data, Data}}, #state{msg = Msg} = State) +handle_info({Port, {data, Data}}, #state{response = Msg} = State) when Port == State#state.port -> - {noreply, handle_token_data(State, Msg, Data)}; + {noreply, response_in(State, Msg, Data)}; %% Proxy app timed out. handle_info({timeout, Timer, Port}, S = #state{token = Tok}) @@ -144,17 +176,18 @@ do_send(Port, Buf) -> end, {ok, size(Buf)}. -handle_token_data(#state{replyto = Pid, timer = Timer, recv_count = Recv} = S, - MsgIn, DataIn) -> +response_in(#state{replyto = Pid, timer = Timer, recv_count = Recv} = S, + MsgIn, DataIn) -> case p11p_rpc:parse(MsgIn, list_to_binary(DataIn)) of {needmore, Msg} -> - S#state{msg = Msg}; + S#state{response = Msg}; {done, Msg} -> cancel_timer(Timer), lager:debug("~p: <- ~s", [self(), p11p_rpc:dump(Msg)]), {ok, _BytesSent} = p11p_server:reply(Pid, Msg), %% Saving potential data not consumed by parse/2 in new message. - S#state{msg = p11p_rpc:new(Msg#p11rpc_msg.buffer), + S#state{response = p11p_rpc:new(Msg#p11rpc_msg.buffer), + p11state = runstate(S#state.p11state, p11p_rpc:req_id(Msg)), recv_count = Recv + 1} end. @@ -165,3 +198,43 @@ start_timer(Timeout, Port) -> cancel_timer(Timer) -> %%lager:debug("~p: canceling timer", [self()]), erlang:cancel_timer(Timer, [{async, true}, {info, false}]). + +-spec runstate(token_state(), non_neg_integer()) -> token_state(). +runstate(started, ReqId) -> + case ReqId of + ?P11_RPC_CALL_C_Initialize -> + initialized; + _ -> + started + end; +runstate(initialized, ReqId) -> + case ReqId of + ?P11_RPC_CALL_C_OpenSession -> + session; + ?P11_RPC_CALL_C_Finalize -> + finalized; + _ -> + initialized + end; +runstate(session, ReqId) -> + case ReqId of + ?P11_RPC_CALL_C_Login -> + loggedin; + ?P11_RPC_CALL_C_CloseSession -> + initialized; + ?P11_RPC_CALL_C_Finalize -> + finalized; + _ -> + session + end; +runstate(loggedin, ReqId) -> + case ReqId of + ?P11_RPC_CALL_C_Logout -> + session; + ?P11_RPC_CALL_C_CloseSession -> + initialized; + ?P11_RPC_CALL_C_Finalize -> + finalized; + _ -> + loggedin + end. diff --git a/p11p-daemon/src/p11p_manager.erl b/p11p-daemon/src/p11p_manager.erl index 2dbdf6c..209d08e 100644 --- a/p11p-daemon/src/p11p_manager.erl +++ b/p11p-daemon/src/p11p_manager.erl @@ -126,9 +126,7 @@ handle_cast({server_event, server_gone, TokNameIn}, S = #state{vtokens = Tokens} Token = TokenIn#vtoken{clients = Clients, server = undefined}, {noreply, S#state{vtokens = Tokens#{TokNameIn := Token}}}; -%% Client reporting that a token has timed out -- mark current client -%% not running, inform server, rotate client list and start new -%% client. +%% Client reporting that a token has timed out. handle_cast({client_event, timeout, TokName}, State) -> #{TokName := VToken} = State#state.vtokens, client_timeout(TokName, VToken, State); @@ -225,6 +223,8 @@ next_client(VToken = #vtoken{clients = Clients}) -> gen_server:cast(self(), {start_client, OldC#client.tokname}), VToken#vtoken{clients = NewClients}. +%% Mark current client not running, inform its server, rotate client +%% list and start a new client. client_timeout(TokName, VToken = #vtoken{retries = Retries}, State = #state{vtokens = VTokens}) @@ -240,7 +240,7 @@ client_timeout(TokName, State) -> lager:debug("~p: ~s: token timed out, disconnecting app", [self(), TokName]), p11p_server:token_gone(VToken#vtoken.server, true), - {stop, State}. + {stop, normal, State}. start_client(Client, Server, Timeout) -> {ok, Pid} = p11p_client:start_link(Client#client.servid, diff --git a/p11p-daemon/src/p11p_rpc.erl b/p11p-daemon/src/p11p_rpc.erl index 03a476c..0e52bc5 100644 --- a/p11p-daemon/src/p11p_rpc.erl +++ b/p11p-daemon/src/p11p_rpc.erl @@ -9,7 +9,9 @@ dump/1, error/2, new/0, new/1, + ok/1, parse/2, + req_id/1, serialise/1 ]). @@ -36,6 +38,13 @@ error(CallCode, ErrorCode) -> data_len = size(DataBuf), data = DataBuf}. +ok(CallCode) -> + #p11rpc_msg{ + state = done, + call_code = CallCode, + opt_len = 0, + data_len = 0}. + parse(M) -> parse(M, <<>>). @@ -71,6 +80,11 @@ parse(#p11rpc_msg{buffer = MsgBuf} = M, DataIn) {done, Msg} end. +req_id(Msg) + when Msg#p11rpc_msg.data_len >= 4 -> + {ReqId, _} = parse_req_id(Msg#p11rpc_msg.data), + ReqId. + -spec serialise(p11rpc_msg()) -> binary(). serialise(M) when M#p11rpc_msg.state == done, M#p11rpc_msg.call_code > -1, diff --git a/p11p-daemon/src/p11p_rpc.hrl b/p11p-daemon/src/p11p_rpc.hrl index 0014f57..9d2b3f8 100644 --- a/p11p-daemon/src/p11p_rpc.hrl +++ b/p11p-daemon/src/p11p_rpc.hrl @@ -9,8 +9,8 @@ opt_len = -1 :: integer(), % Length is 4 data_len = -1 :: integer(), % Length is 4 - options = <<>> :: binary(), % Length is header.opt_len - data = <<>> :: binary(), % Length is header.buf_len + options = <<>> :: binary(), % Length is opt_len + data = <<>> :: binary(), % Length is data_len buffer = <<>> :: binary(), state = header :: header | opts | data | done diff --git a/p11p-daemon/src/p11p_server.erl b/p11p-daemon/src/p11p_server.erl index ef8877d..7b05da7 100644 --- a/p11p-daemon/src/p11p_server.erl +++ b/p11p-daemon/src/p11p_server.erl @@ -84,14 +84,17 @@ handle_cast({token_gone, Hangup}, State = #state{send_count = Sent}) -> Resp = p11p_rpc:error(State#state.req_out#p11rpc_msg.call_code, ?CKR_DEVICE_ERROR), {ok, _} = send_response(State#state.socket, p11p_rpc:serialise(Resp), Sent), - NewState = State#state{client = undefined, - req_out = undefined, + NewState = State#state{req_out = undefined, send_count = Sent + 1}, case Hangup of true -> - {close, NewState}; + lager:info("~p: Token reported gone, no more retries, closing.", [self()]), + {stop, normal, NewState}; %FIXME: no need to update state, i think false -> - {noreply, NewState} + lager:info("~p: Token reported gone, retrying with new token.", [self()]), + {noreply, + NewState#state{client = + p11p_manager:client_for_token(State#state.tokname)}} end; handle_cast(Cast, State) -> @@ -101,7 +104,7 @@ handle_cast(Cast, State) -> %% First packet from P11 client. handle_info({tcp, Port, DataIn}, #state{tokname = TokName} = S) when S#state.client == undefined -> - %%lager:debug("~p: received ~B octets from client on socket ~p, from new client", [self(), size(Data), Port]), + lager:debug("~p: received ~B octets from client on socket ~p, from new client", [self(), size(DataIn), Port]), <> = DataIn, case RPCVersion of ?RPC_VERSION -> @@ -117,8 +120,8 @@ handle_info({tcp, Port, DataIn}, #state{tokname = TokName} = S) end; %% Subsequent packages from P11 client. -handle_info({tcp, _Port, DataIn}, #state{req_in = Msg} = S) -> - %%lager:debug("~p: received ~B octets from client on socket ~p, with ~B octets already in buffer", [self(), size(Data), Port, size(Msg#p11rpc_msg.buffer)]), +handle_info({tcp, Port, DataIn}, #state{req_in = Msg} = S) -> + lager:debug("~p: received ~B octets from client on socket ~p, with ~B octets already in buffer", [self(), size(DataIn), Port, size(Msg#p11rpc_msg.buffer)]), {noreply, p11_app_data(S, Msg, DataIn)}; handle_info({tcp_closed, Port}, S) -> @@ -151,10 +154,29 @@ p11_app_data(#state{client = Client, recv_count = Recv} = S, MsgIn, S#state{req_in = Msg}; {done, Msg} -> lager:debug("~p: -> ~s", [self(), p11p_rpc:dump(Msg)]), - {ok, _BytesSent} = p11p_client:request(Client, Msg), - S#state{req_out = Msg, - req_in = p11p_rpc:new(Msg#p11rpc_msg.buffer), - recv_count = Recv + 1} + case p11p_client:request(Client, Msg) of + ack -> + lager:debug("~p: acking request", [self()]), + Resp = p11p_rpc:ok(Msg#p11rpc_msg.call_code), + {ok, _} = send_response(S#state.socket, + p11p_rpc:serialise(Resp), + S#state.send_count), + S#state{req_in = p11p_rpc:new(Msg#p11rpc_msg.buffer), + send_count = S#state.send_count + 1}; + nack -> + lager:debug("~p: nacking request", [self()]), + Resp = p11p_rpc:error(Msg#p11rpc_msg.call_code, + ?CKR_DEVICE_ERROR), + {ok, _} = send_response(S#state.socket, + p11p_rpc:serialise(Resp), + S#state.send_count), + S#state{req_in = p11p_rpc:new(Msg#p11rpc_msg.buffer), + send_count = S#state.send_count + 1}; + {ok, _BytesSent} -> + S#state{req_out = Msg, + req_in = p11p_rpc:new(Msg#p11rpc_msg.buffer), + recv_count = Recv + 1} + end end. send_response(Sock, Inbuf, Sent) -> @@ -162,6 +184,6 @@ send_response(Sock, Inbuf, Sent) -> 0 -> <>; _ -> Inbuf end, - %%lager:debug("~p: sending ~B octets as response", [self(), size(Inbuf)]), + lager:debug("~p: sending ~B octets as response", [self(), size(Inbuf)]), ok = gen_tcp:send(Sock, Buf), {ok, size(Inbuf)}. -- cgit v1.1 From 37277c3ba0119f50af8ffff014ce13b93f225557 Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Tue, 11 Feb 2020 13:14:28 +0100 Subject: Some more documentation and cosmetic changes --- p11p-daemon/src/p11p_client.erl | 94 ++++++++++++++++++------------ p11p-daemon/src/p11p_rpc.erl | 12 ++-- p11p-daemon/src/p11p_server.erl | 126 +++++++++++++++++++++------------------- 3 files changed, 133 insertions(+), 99 deletions(-) (limited to 'p11p-daemon/src') diff --git a/p11p-daemon/src/p11p_client.erl b/p11p-daemon/src/p11p_client.erl index 87c2949..d6c73ac 100644 --- a/p11p-daemon/src/p11p_client.erl +++ b/p11p-daemon/src/p11p_client.erl @@ -1,20 +1,31 @@ %%% Copyright (c) 2019, Sunet. %%% See LICENSE for licensing information. -%% A client spawns an Erlang port running a proxy app, i.e. the -%% 'remote' program from p11-kit. - -%% Receive p11 requests from p11p_server, forward them to the proxy app, -%% wait for a reply. If a reply is received within a timeout period, -%% proxy the reply to the requesting p11p_server. If the request -%% times out, inform the manager (our parent). +%% Spawn an Erlang port running a proxy app. We use the 'remote' +%% program from p11-kit as the proxy app. + +%% Receive PKCS#11 requests from a p11p_server, forward them to the +%% proxy app, wait for a reply. If a reply is received within a +%% timeout period, proxy the reply to the requesting p11p_server. If +%% the request times out, inform the manager (our parent) and exit. + +%% Track a subset of the PKCS#11 state in order to handle token +%% restarts. We start in state 'started'. While in 'started', we allow +%% only a few "opening" calls (Initialize, OpenSession and Login) +%% through to the token. Corresponding "closing" calls (Finalize, +%% CloseSession and Logout) are sent an immediate OK response without +%% forwarding them to the token. Any other call is rejected by +%% responding with an error. This should make well behaving P11 +%% applications be able to deal with us switching the token under +%% their feet. -module(p11p_client). -behaviour(gen_server). %% API. -export([start_link/6]). --export([request/2, stop/2]). +-export([request/2, % Request from p11p-server. + stop/2]). % Manager stopping us. -include("p11p_rpc.hrl"). @@ -23,7 +34,12 @@ code_change/3]). %% Records and types. --type token_state() :: started | initialized | session | loggedin | opact | finalized. +-type token_state() :: started | + initialized | + session | + loggedin | + opact | + finalized. -record(state, { token :: string(), % Token name. @@ -40,8 +56,8 @@ }). %% API. --spec start_link(atom(), string(), pid(), string(), list(), non_neg_integer()) -> - {ok, pid()} | {error, term()}. +-spec start_link(atom(), string(), pid(), string(), list(), + non_neg_integer()) -> {ok, pid()} | {error, term()}. start_link(ServName, TokName, Server, ModPath, ModEnv, Timeout) -> lager:info("~p: starting p11p_client for ~s", [self(), TokName]), gen_server:start_link({local, ServName}, ?MODULE, @@ -51,12 +67,13 @@ start_link(ServName, TokName, Server, ModPath, ModEnv, Timeout) -> request(Client, Request) -> gen_server:call(Client, {request, Request}). -%% Use stop/1 instead of gen_server:stop/1 if you're uncertain whether -%% we (Pid) are alive or not. An example of when that can happen is -%% when the manager receives a server_event about a lost p11 app. If -%% the server process terminated on request from us because we timed -%% out on an rpc call, chances are that we have already terminated by -%% the time the manager acts on the information about the lost app. +%% You should invoke stop/1 instead of gen_server:stop/1 if you're +%% uncertain whether we (Pid) are alive or not. An example of when +%% that can happen is when the manager receives a server_event about a +%% lost P11 app -- if the server process terminated on request from us +%% because we timed out on an RPC call, chances are that we have +%% already terminated by the time the manager acts on the information +%% about the lost app. stop(Pid, Reason) -> gen_server:cast(Pid, {stop, Reason}). @@ -72,13 +89,16 @@ init([TokName, Server, ModPath, ModEnv, Timeout]) -> true = is_port(Port), lager:debug("~p: ~s: new proxy app port: ~p", [self(), ProxyAppBinPath, Port]), lager:debug("~p: ~s: module: ~s, env: ~p", [self(), ProxyAppBinPath, ModPath, ModEnv]), - {ok, #state{port = Port, token = TokName, replyto = Server, timeout = Timeout}}. + {ok, #state{port = Port, + token = TokName, + replyto = Server, + timeout = Timeout}}. handle_call({request, Request}, {FromPid, _Tag}, - S = #state{port = Port, send_count = Sent}) -> + State = #state{port = Port, send_count = Sent}) -> case - case S#state.p11state of + case State#state.p11state of started -> case p11p_rpc:req_id(Request) of ?P11_RPC_CALL_C_Logout -> ack; @@ -96,9 +116,9 @@ handle_call({request, Request}, end of ack -> - {reply, ack, S}; + {reply, ack, State}; nack -> - {reply, nack, S}; + {reply, nack, State}; pass -> lager:debug("~p: sending request from ~p to prxoy app ~p", [self(), FromPid, Port]), D = p11p_rpc:serialise(Request), @@ -110,9 +130,9 @@ handle_call({request, Request}, {reply, {ok, size(Buf)}, - S#state{replyto = FromPid, - timer = start_timer(S#state.timeout, Port), - send_count = Sent + 1}} + State#state{replyto = FromPid, + timer = start_timer(State#state.timeout, Port), + send_count = Sent + 1}} end; handle_call(Call, _From, State) -> @@ -126,12 +146,13 @@ handle_cast(Cast, State) -> lager:debug("~p: unhandled cast: ~p~n", [self(), Cast]), {noreply, State}. -%% Receiving the very first response from proxy app since it was started. +%% Receiving the very first octets from proxy app since it was started. handle_info({Port, {data, Data}}, State) when Port == State#state.port, State#state.response == undefined -> case hd(Data) of % First octet is RPC protocol version. ?RPC_VERSION -> - {noreply, response_in(State, p11p_rpc:new(), tl(Data))}; + NewState = response_in(State, p11p_rpc:new(), tl(Data)), + {noreply, NewState}; BadVersion -> lager:info("~p: ~p: invalid RPC version: ~p", [self(), Port, BadVersion]), @@ -139,17 +160,18 @@ handle_info({Port, {data, Data}}, State) end; %% Receiving more data from proxy app. -handle_info({Port, {data, Data}}, #state{response = Msg} = State) +handle_info({Port, {data, Data}}, State) when Port == State#state.port -> - {noreply, response_in(State, Msg, Data)}; + NewState = response_in(State, State#state.response, Data), + {noreply, NewState}; %% Proxy app timed out. -handle_info({timeout, Timer, Port}, S = #state{token = Tok}) - when Port == S#state.port, Timer == S#state.timer -> - lager:info("~p: rpc request for ~s timed out, exiting", [self(), Tok]), - p11p_manager:client_event(timeout, Tok), - State = S#state{timer = undefined}, - {stop, normal, State}; +handle_info({timeout, Timer, Port}, State) + when Port == State#state.port, Timer == State#state.timer -> + lager:info("~p: rpc request for ~s timed out, exiting", [self(), State#state.token]), + p11p_manager:client_event(timeout, State#state.token), + NewState = State#state{timer = undefined}, + {stop, normal, NewState}; handle_info(Info, State) -> lager:debug("~p: Unhandled info: ~p~n", [self(), Info]), @@ -176,7 +198,7 @@ do_send(Port, Buf) -> end, {ok, size(Buf)}. -response_in(#state{replyto = Pid, timer = Timer, recv_count = Recv} = S, +response_in(S = #state{replyto = Pid, timer = Timer, recv_count = Recv}, MsgIn, DataIn) -> case p11p_rpc:parse(MsgIn, list_to_binary(DataIn)) of {needmore, Msg} -> diff --git a/p11p-daemon/src/p11p_rpc.erl b/p11p-daemon/src/p11p_rpc.erl index 0e52bc5..b04cbbf 100644 --- a/p11p-daemon/src/p11p_rpc.erl +++ b/p11p-daemon/src/p11p_rpc.erl @@ -6,10 +6,11 @@ -module(p11p_rpc). -export([ + call_code/1, dump/1, - error/2, + msg_error/2, + msg_ok/1, new/0, new/1, - ok/1, parse/2, req_id/1, serialise/1 @@ -17,6 +18,9 @@ -include("p11p_rpc.hrl"). +call_code(Msg) -> + Msg#p11rpc_msg.call_code. + dump(Msg = #p11rpc_msg{data = Data}) -> {ReqId, Data2} = parse_req_id(Data), {ArgsDesc, Data3} = parse_args_desc(Data2), @@ -29,7 +33,7 @@ dump(Msg = #p11rpc_msg{data = Data}) -> Data3 ]). -error(CallCode, ErrorCode) -> +msg_error(CallCode, ErrorCode) -> DataBuf = serialise_error(ErrorCode), #p11rpc_msg{ state = done, @@ -38,7 +42,7 @@ error(CallCode, ErrorCode) -> data_len = size(DataBuf), data = DataBuf}. -ok(CallCode) -> +msg_ok(CallCode) -> #p11rpc_msg{ state = done, call_code = CallCode, diff --git a/p11p-daemon/src/p11p_server.erl b/p11p-daemon/src/p11p_server.erl index 7b05da7..c27d825 100644 --- a/p11p-daemon/src/p11p_server.erl +++ b/p11p-daemon/src/p11p_server.erl @@ -1,8 +1,14 @@ %%% Copyright (c) 2019, Sunet. %%% See LICENSE for licensing information. -%% Create an AF_UNIX socket and accept connections. On connect, spawn -%% another p11p_server process. +%% Create an AF_UNIX socket and accept connections from a P11 app. On +%% connect, spawn another p11p_server process. + +%% Recevie PKCS#11 requests on the socket and forward them to a +%% p11p-client. + +%% Receive responses from our p11p-client and forward them to the P11 +%% app. -module(p11p_server). -behaviour(gen_server). @@ -11,7 +17,8 @@ %% API. -export([start_link/1]). --export([reply/2, token_gone/2]). +-export([reply/2, % Replies from p11p-client. + token_gone/2]). % p11p-client disappeared. %% Genserver callbacks. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -19,13 +26,13 @@ %% Records and types. -record(state, { - tokname :: string(), - client :: pid() | undefined, - socket :: gen_tcp:socket(), - req_in :: p11rpc_msg() | undefined, - req_out :: p11rpc_msg() | undefined, - recv_count = 0 :: non_neg_integer(), % received from app - send_count = 0 :: non_neg_integer() % sent to token + tokname :: string(), % Virtual token being served. + socket :: gen_tcp:socket(), % AF_UNIX socket. + client :: pid() | undefined, % Our p11p-client. + req_in :: p11rpc_msg() | undefined, % Request received from P11 app. + req_out :: p11rpc_msg() | undefined, % Request sent to p11p-client. + recv_count = 0 :: non_neg_integer(), % Counting requests from P11 app. + send_count = 0 :: non_neg_integer() % Conting requests to p11p-client. }). %% API. @@ -46,15 +53,17 @@ token_gone(Pid, Hangup) -> %% Genserver callbacks. init([Token, Socket]) -> lager:debug("~p: p11p_server starting for ~s", [self(), Token]), - process_flag(trap_exit, true), % Need terminate/2. - gen_server:cast(self(), accept), % Invoke accept, returning a socket in state. + process_flag(trap_exit, true), % Call terminate/2 on exit. + %% Invoking gen_tcp:accept(), updating state with a new socket. + gen_server:cast(self(), accept), {ok, #state{tokname = Token, socket = Socket}}. -%% FIXME: make this a cast +%% FIXME: make this a cast? handle_call({respond, Resp}, _, State = #state{send_count = Sent}) -> - N = send_response(State#state.socket, p11p_rpc:serialise(Resp), Sent), - {reply, {ok, N}, State#state{req_out = undefined, - send_count = Sent + 1}}; + {reply, + {ok, send_response(State#state.socket, p11p_rpc:serialise(Resp), Sent)}, + State#state{req_out = undefined, + send_count = Sent + 1}}; handle_call(Call, _, S) -> lager:debug("~p: Unhandled call: ~p~n", [self(), Call]), @@ -62,13 +71,13 @@ handle_call(Call, _, S) -> %% Wait for new connection. handle_cast(accept, State = #state{tokname = TokName, socket = ListenSocket}) -> - %% Blocking until client connects or timeout fires. + %% Blocking until P11 app connects or the timeout fires. %% Without a timeout our supervisor cannot terminate us. %% On timeout, just invoke ourselves again. case gen_tcp:accept(ListenSocket, 900) of {ok, Sock} -> lager:debug("~p: ~p: new connection accepted", [self(), Sock]), - %% Start a new acceptor and return with new socket in state. + %% Start a new acceptor and return with the new socket in state. p11p_server_sup:start_server([TokName, ListenSocket]), {noreply, State#state{socket = Sock}}; {error, timeout} -> @@ -79,65 +88,65 @@ handle_cast(accept, State = #state{tokname = TokName, socket = ListenSocket}) -> {stop, normal, State} end; - handle_cast({token_gone, Hangup}, State = #state{send_count = Sent}) -> - Resp = p11p_rpc:error(State#state.req_out#p11rpc_msg.call_code, - ?CKR_DEVICE_ERROR), + Resp = p11p_rpc:msg_error(p11p_rpc:call_code(State#state.req_out), + ?CKR_DEVICE_ERROR), {ok, _} = send_response(State#state.socket, p11p_rpc:serialise(Resp), Sent), NewState = State#state{req_out = undefined, send_count = Sent + 1}, case Hangup of true -> - lager:info("~p: Token reported gone, no more retries, closing.", [self()]), + lager:info("~p: Token reported gone, no more retries, closing.", + [self()]), {stop, normal, NewState}; %FIXME: no need to update state, i think false -> - lager:info("~p: Token reported gone, retrying with new token.", [self()]), - {noreply, - NewState#state{client = - p11p_manager:client_for_token(State#state.tokname)}} + lager:info("~p: Token reported gone, retrying with new token.", + [self()]), + NewClient = p11p_manager:client_for_token(State#state.tokname), + {noreply, NewState#state{client = NewClient}} end; handle_cast(Cast, State) -> lager:debug("~p: Unhandled cast: ~p~n", [self(), Cast]), {noreply, State}. -%% First packet from P11 client. -handle_info({tcp, Port, DataIn}, #state{tokname = TokName} = S) - when S#state.client == undefined -> +%% First chunk from P11 app. +handle_info({tcp, Port, DataIn}, State) + when State#state.client == undefined -> lager:debug("~p: received ~B octets from client on socket ~p, from new client", [self(), size(DataIn), Port]), <> = DataIn, case RPCVersion of ?RPC_VERSION -> - {noreply, - p11_app_data( - S#state{client = p11p_manager:client_for_token(TokName)}, - p11p_rpc:new(), - Data)}; + NewClient = p11p_manager:client_for_token(State#state.tokname), + NewState = request_in(State#state{client = NewClient}, + p11p_rpc:new(), Data), + {noreply, NewState}; BadVersion -> lager:info("~p: ~p: invalid RPC version: ~p", [self(), Port, BadVersion]), - {stop, bad_proto, S} + {stop, bad_proto, State} end; -%% Subsequent packages from P11 client. -handle_info({tcp, Port, DataIn}, #state{req_in = Msg} = S) -> +%% Subsequent packages from P11 app. +handle_info({tcp, Port, DataIn}, State) -> + Msg = State#state.req_in, lager:debug("~p: received ~B octets from client on socket ~p, with ~B octets already in buffer", [self(), size(DataIn), Port, size(Msg#p11rpc_msg.buffer)]), - {noreply, p11_app_data(S, Msg, DataIn)}; + NewState = request_in(State, State#state.req_in, DataIn), + {noreply, NewState}; -handle_info({tcp_closed, Port}, S) -> +handle_info({tcp_closed, Port}, State) -> lager:debug("~p: socket ~p closed", [self(), Port]), - {stop, normal, S}; + {stop, normal, State}; -handle_info(Info, S) -> +handle_info(Info, State) -> lager:debug("~p: Unhandled info: ~p~n", [self(), Info]), - {noreply, S}. + {noreply, State}. terminate(Reason, #state{socket = Sock, tokname = TokName}) -> ok = gen_tcp:close(Sock), - %% FIXME: tell manager, so that the client can be stopped. we - %% don't want to risk that another app (socket client) uses it - + %% Let manager know, so that the client can be stopped. We don't + %% want to risk that another P11 app uses it. p11p_manager:server_event(server_gone, TokName), lager:debug("~p: terminated with reason ~p", [self(), Reason]), @@ -147,17 +156,16 @@ code_change(_OldVersion, State, _Extra) -> {ok, State}. %% Private functions. -p11_app_data(#state{client = Client, recv_count = Recv} = S, MsgIn, - DataIn) -> +request_in(S, MsgIn, DataIn) -> case p11p_rpc:parse(MsgIn, DataIn) of {needmore, Msg} -> S#state{req_in = Msg}; {done, Msg} -> lager:debug("~p: -> ~s", [self(), p11p_rpc:dump(Msg)]), - case p11p_client:request(Client, Msg) of + case p11p_client:request(S#state.client, Msg) of ack -> lager:debug("~p: acking request", [self()]), - Resp = p11p_rpc:ok(Msg#p11rpc_msg.call_code), + Resp = p11p_rpc:msg_ok(p11p_rpc:call_code(Msg)), {ok, _} = send_response(S#state.socket, p11p_rpc:serialise(Resp), S#state.send_count), @@ -165,8 +173,8 @@ p11_app_data(#state{client = Client, recv_count = Recv} = S, MsgIn, send_count = S#state.send_count + 1}; nack -> lager:debug("~p: nacking request", [self()]), - Resp = p11p_rpc:error(Msg#p11rpc_msg.call_code, - ?CKR_DEVICE_ERROR), + Resp = p11p_rpc:msg_error(p11p_rpc:call_code(Msg), + ?CKR_DEVICE_ERROR), {ok, _} = send_response(S#state.socket, p11p_rpc:serialise(Resp), S#state.send_count), @@ -175,15 +183,15 @@ p11_app_data(#state{client = Client, recv_count = Recv} = S, MsgIn, {ok, _BytesSent} -> S#state{req_out = Msg, req_in = p11p_rpc:new(Msg#p11rpc_msg.buffer), - recv_count = Recv + 1} + recv_count = S#state.recv_count + 1} end end. send_response(Sock, Inbuf, Sent) -> - Buf = case Sent of - 0 -> <>; - _ -> Inbuf - end, - lager:debug("~p: sending ~B octets as response", [self(), size(Inbuf)]), - ok = gen_tcp:send(Sock, Buf), - {ok, size(Inbuf)}. + Outbuf = case Sent of + 0 -> <>; + _ -> Inbuf + end, + lager:debug("~p: sending ~B octets as response", [self(), size(Outbuf)]), + ok = gen_tcp:send(Sock, Outbuf), + {ok, size(Outbuf)}. -- cgit v1.1 From 920995ca6e2ef7c4993b0196c2556409eade04d1 Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Wed, 12 Feb 2020 17:12:43 +0100 Subject: Make probability of dropping a request, for testing, configurable --- p11p-daemon/src/p11p_client.erl | 7 ++++--- p11p-daemon/src/p11p_config.erl | 27 +++++++++++++++++++++++---- 2 files changed, 27 insertions(+), 7 deletions(-) (limited to 'p11p-daemon/src') diff --git a/p11p-daemon/src/p11p_client.erl b/p11p-daemon/src/p11p_client.erl index d6c73ac..fd101c5 100644 --- a/p11p-daemon/src/p11p_client.erl +++ b/p11p-daemon/src/p11p_client.erl @@ -186,10 +186,11 @@ code_change(_OldVersion, State, _Extra) -> {ok, State}. %% Private -do_send(Port, Buf) -> - Rand = rand:uniform(100), %% + 10, +send_request(Port, Buf) -> + Rand = rand:uniform(100), + Prob = p11p_config:testing_drop_prob(), if - Rand =< 10 -> + Rand =< Prob -> lager:debug("~p: faking unresponsive token (~p) by not sending", [self(), Port]); true -> diff --git a/p11p-daemon/src/p11p_config.erl b/p11p-daemon/src/p11p_config.erl index d24aad6..c4bfbcd 100644 --- a/p11p-daemon/src/p11p_config.erl +++ b/p11p-daemon/src/p11p_config.erl @@ -8,10 +8,16 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([start_link/0]). --export([nameof/1]). --export([tokens/0]). --export([proxyapp_bin_path/0, modules_for_token/1, module_path/1, module_env/1, - token_balance/1, token_retries/1, token_timeout/1]). +-export([modules_for_token/1, + module_path/1, + module_env/1, + nameof/1, + proxyapp_bin_path/0, + testing_drop_prob/0, + tokens/0, + token_balance/1, + token_retries/1, + token_timeout/1]). %%% Records and types %%% -record(p11module, { @@ -32,6 +38,7 @@ -record(state, { proxyapp_bin_path :: string(), + testing_drop_prob :: non_neg_integer(), tokens :: #{string() => token()} }). @@ -46,6 +53,8 @@ init(_Args) -> handle_call(proxyapp_bin_path, _From, S = #state{proxyapp_bin_path = Path}) -> {reply, Path, S}; +handle_call(testing_drop_prob, _From, S = #state{testing_drop_prob = P}) -> + {reply, P, S}; handle_call(tokens, _From, State = #state{tokens = Tokens}) -> {reply, maps:values(Tokens), State}; handle_call({modules_for_token, TokName}, _, S = #state{tokens = Tokens}) -> @@ -86,6 +95,9 @@ start_link() -> proxyapp_bin_path() -> gen_server:call(?MODULE, proxyapp_bin_path). +testing_drop_prob() -> + gen_server:call(?MODULE, testing_drop_prob). + -spec tokens() -> [token()]. tokens() -> gen_server:call(?MODULE, tokens). @@ -129,6 +141,9 @@ init_state() -> proxyapp_bin_path = application:get_env(p11p, proxyapp_bin_path, ?PROXYAPP_DEFAULT), + testing_drop_prob = application:get_env(p11p, + testing_drop_prob, + 0), tokens = conf_tokens(application:get_env(p11p, vtokens, []))}. @@ -139,6 +154,10 @@ init_state(Filename) -> string, "proxyapp_bin_path", ?PROXYAPP_DEFAULT), + testing_drop_prob = p11p_config_file:get(Config, + integer, + "testing_drop_prob", + 0), tokens = conf_tokens(p11p_config_file:get(Config, section, "vtokens", -- cgit v1.1 From 089ae7c716352ba6690aa701deee8f5aeaa06655 Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Fri, 14 Feb 2020 12:18:31 +0100 Subject: better balance settings --- p11p-daemon/src/p11p_client.erl | 11 ++++------- p11p-daemon/src/p11p_config.erl | 20 +++++++++----------- p11p-daemon/src/p11p_manager.erl | 6 +++--- 3 files changed, 16 insertions(+), 21 deletions(-) (limited to 'p11p-daemon/src') diff --git a/p11p-daemon/src/p11p_client.erl b/p11p-daemon/src/p11p_client.erl index fd101c5..5fd3ff1 100644 --- a/p11p-daemon/src/p11p_client.erl +++ b/p11p-daemon/src/p11p_client.erl @@ -115,10 +115,6 @@ handle_call({request, Request}, pass end of - ack -> - {reply, ack, State}; - nack -> - {reply, nack, State}; pass -> lager:debug("~p: sending request from ~p to prxoy app ~p", [self(), FromPid, Port]), D = p11p_rpc:serialise(Request), @@ -126,13 +122,14 @@ handle_call({request, Request}, 0 -> <>; _ -> D end, - {ok, _} = do_send(Port, Buf), - + {ok, _} = send_request(Port, Buf), {reply, {ok, size(Buf)}, State#state{replyto = FromPid, timer = start_timer(State#state.timeout, Port), - send_count = Sent + 1}} + send_count = Sent + 1}}; + Ret -> + {reply, Ret, State} end; handle_call(Call, _From, State) -> diff --git a/p11p-daemon/src/p11p_config.erl b/p11p-daemon/src/p11p_config.erl index c4bfbcd..13723ce 100644 --- a/p11p-daemon/src/p11p_config.erl +++ b/p11p-daemon/src/p11p_config.erl @@ -31,7 +31,7 @@ name :: string(), timeout :: non_neg_integer(), failover :: non_neg_integer(), % How many failover attempts. - balance :: [non_neg_integer()], + balance :: [integer()], modules = #{} :: #{string() => p11module()} }). -type token() :: #token{}. @@ -177,15 +177,16 @@ new_token({Name, Settings}) -> name = Name, timeout = proplists:get_value(timeout, Settings, 25000), failover = proplists:get_value(failover, Settings, maps:size(Modules) - 1), - balance = balance(proplists:get_value(balance, Settings, []), - maps:size(Modules)), + balance = lists:map(fun(N) -> case N of 0 -> -1; _ -> N end end, + balance(proplists:get_value(balance, Settings, []), + maps:size(Modules))), modules = Modules }. -balance([], _) -> - []; +balance([], NModules) -> + balance([0], NModules - 1); balance(List, NModules) -> - List ++ [1 || _ <- lists:seq(1, NModules - length(List))]. + List ++ [0 || _ <- lists:seq(1, NModules - length(List))]. conf_modules(L) -> conf_modules(L, #{}). @@ -230,7 +231,7 @@ tokens_init_test_() -> {token,"vtoken0", 25000, 1, - [3,1], + [3,-1], #{"bogusmod0_0" => {p11module,"bogusmod0_0", "/path/to/bogusmod0_0", []}, "bogusmod0_1" => @@ -239,13 +240,10 @@ tokens_init_test_() -> {token,"vtoken1", 12000, 3, - [], + [-1], #{"bogusmod1_0" => {p11module,"bogusmod1_0", "/path/to/bogusmod1_0", []}, "bogusmod1_1" => {p11module,"bogusmod1_1", "/path/to/bogusmod1_1", [{"MYENV", "myenv"}]}}} }, Conf)] end}. -%% modules_for_token_test_() -> -%% {setup, -%% fun() -> diff --git a/p11p-daemon/src/p11p_manager.erl b/p11p-daemon/src/p11p_manager.erl index 209d08e..6f9e977 100644 --- a/p11p-daemon/src/p11p_manager.erl +++ b/p11p-daemon/src/p11p_manager.erl @@ -85,6 +85,8 @@ handle_call({client_for_token, Server, TokNameIn}, _From, lager:debug("all clients: ~p", [ClientsIn]), {Clients, BalanceCount} = case VTokenIn#vtoken.balance_count of + -1 -> + {ClientsIn, -1}; 0 -> lager:debug("~p: balancing: next client", [self()]), Rotated = rotate_clients(ClientsIn), @@ -92,9 +94,7 @@ handle_call({client_for_token, Server, TokNameIn}, _From, {Rotated, First#client.balance - 1}; N when N > 0 -> lager:debug("~p: balancing: ~B more invocations", [self(), N]), - {ClientsIn, N - 1}; - -1 -> - {ClientsIn, -1} + {ClientsIn, N - 1} end, Current = hd(Clients), case Current#client.pid of -- cgit v1.1