%%% Copyright (c) 2014, NORDUnet A/S. %%% See LICENSE for licensing information. -module(fsyncport). -behaviour(gen_server). -export([start_link/0, stop/0]). -export([fsync/1, fsyncall/2]). %% gen_server callbacks. -export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, code_change/3]). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [code:priv_dir(plop) ++ "/fsynchelper"], []). stop() -> gen_server:call(?MODULE, stop). fsync(Path) -> gen_server:call(?MODULE, {fsync, [Path]}). fsyncall([], _Timeout) -> ok; fsyncall(Paths, Timeout) -> gen_server:call(?MODULE, {fsync, Paths}, Timeout). -record(state, {idleports, busyports, waiting, requests}). init(ExtPrg) -> lager:debug("starting fsync service"), process_flag(trap_exit, true), Ports = lists:map(fun(_N) -> open_port({spawn_executable, ExtPrg}, [{packet, 2}]) end, lists:seq(1, 32)), lager:debug("fsync service started", []), {ok, #state{idleports = Ports, busyports = dict:new(), waiting = queue:new(), requests = dict:new()}}. handle_call(stop, _From, State) -> lager:debug("fsync stop request received"), lists:foreach(fun (Port) -> Port ! {self(), close} end, State#state.idleports), lists:foreach(fun ({Port, {_Caller, _Starttime}}) -> Port ! {self(), close} end, dict:to_list(State#state.busyports)), receive {Port, closed} when is_port(Port) -> exit(normal) %% XXX exits when first port is closed end, {stop, normal, stopped, State}; handle_call({fsync, Paths}, From, State) -> lager:debug("fsync incoming request: ~p", [Paths]), AddedState = lists:foldl( fun(Path, StateAcc) -> add_subrequest(StateAcc, From, Path) end, State, Paths), NewState = dequeue(AddedState), {noreply, NewState}. check_overload(State) -> case queue:is_empty(State#state.waiting) of true -> none; false -> lager:info("~p fsync requests waiting", [queue:len(State#state.waiting)]) end. add_subrequest(State, From, Path) -> State#state{ waiting = queue:in({From, Path}, State#state.waiting), requests = dict:update_counter(From, 1, State#state.requests) }. dequeue(State) -> case try_dequeue(State) of {last, NewState} -> check_overload(NewState), NewState; {continue, NewState} -> dequeue(NewState) end. try_dequeue(State) -> case State#state.idleports of [] -> {last, State}; [Port | Rest] -> case queue:out(State#state.waiting) of {empty, _} -> {last, State}; {{value, {Caller, Path}}, Waiting} -> lager:debug("fsync port ~p assigned to request ~p", [Port, Path]), Port ! {self(), {command, Path}}, {continue, State#state{ idleports = Rest, busyports = dict:store(Port, {Caller, os:timestamp()}, State#state.busyports), waiting = Waiting }} end end. subrequest_finished(State, Caller, Data) -> NewRequests = case dict:fetch(Caller, State#state.requests) of 1 -> gen_server:reply(Caller, list_to_atom(Data)), dict:erase(Caller, State#state.requests); _ -> dict:update_counter(Caller, -1, State#state.requests) end, State#state{requests = NewRequests}. handle_info({Port, {data, Data}}, State) when is_port(Port) -> lager:debug("fsync request finished: ~p", [Port]), {Caller, Starttime} = dict:fetch(Port, State#state.busyports), Stoptime = os:timestamp(), statreport({fsync, Stoptime, Starttime}), State2 = subrequest_finished(State, Caller, Data), State3 = State2#state{ busyports = dict:erase(Port, State2#state.busyports), idleports = [Port | State2#state.idleports] }, State4 = dequeue(State3), {noreply, State4}; handle_info({'EXIT', Port, _Reason}, State) when is_port(Port) -> lager:debug("fsync port ~p exited, exiting", [Port]), {stop, portexit, State}. code_change(_OldVsn, State, _Extra) -> {ok, State}. handle_cast(_Request, State) -> {noreply, State}. terminate(Reason, _State) -> lager:info("fsyncport terminating: ~p", [Reason]), ok. statreport(_Entry) -> none.