%%% Copyright (c) 2017, NORDUnet A/S.
%%% See LICENSE for licensing information.

-module(statusreport).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
         code_change/3]).
-export([report/4]).
-export([report_multi/4, bench/6]).

-record(state, {
          timer :: none|reference(),
          nodename :: string(),
          statusreports :: dict:dict(),
          lastsent :: integer()
         }).

%% @doc Start the status report server, registered locally under ?MODULE.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% @doc gen_server init callback.
%%
%% Traps exits so that terminate/2 runs when the supervisor shuts the server
%% down, and arms the first heartbeat timer. `lastsent' is back-dated by one
%% report interval so the first report reaching try_send/1 is sent at once
%% instead of being delayed by the rate limit.
init([]) ->
    process_flag(trap_exit, true),
    ReportInterval = application:get_env(plop, status_report_interval, 1000),
    lager:info("~p: starting", [?MODULE]),
    erlang:start_timer(application:get_env(plop, heartbeat_interval, 1000),
                       self(), heartbeat),
    Now = plop_compat:monotonic_time(millisecond),
    {ok, #state{timer = none,
                nodename = http_auth:own_name(),
                statusreports = dict:new(),
                lastsent = Now - ReportInterval}}.

%% @doc Overwrite the single value for status variable {Target, Variable}
%% of Service.
store_status(#state{statusreports = Reports} = State,
             Service, Target, Variable, Status) ->
    Key = {{status, Service}, {Target, Variable}},
    State#state{statusreports = dict:store(Key, {single, Status}, Reports)}.

%% @doc Add Value to the `multi' set stored under Key, creating the set if
%% Key is absent. An existing non-`multi' value under Key is not handled
%% and raises function_clause.
dict_append_set(Key, Value, Dict) ->
    Singleton = sets:from_list([Value]),
    Merge = fun ({multi, Existing}) -> {multi, sets:union(Existing, Singleton)} end,
    dict:update(Key, Merge, {multi, Singleton}, Dict).

%% @doc Accumulate Status into the set of values for status variable
%% {Target, Variable} of Service.
store_multi_status(#state{statusreports = Reports} = State,
                   Service, Target, Variable, Status) ->
    Key = {{status, Service}, {Target, Variable}},
    State#state{statusreports = dict_append_set(Key, Status, Reports)}.

%% @doc Record one benchmark sample, keyed by {Target, Tag, Seq} under
%% Service.
store_bench(#state{statusreports = Reports} = State,
            Service, Target, Tag, Seq, Starttime, Elapsed) ->
    Key = {{bench, Service}, {Target, Tag, Seq}},
    State#state{statusreports = dict:store(Key, {Starttime, Elapsed}, Reports)}.
%% @doc Replace the value for status variable {Target, Variable} of Service
%% with the given set of values, tagged `multi'.
store_set_status(State, Service, Target, Variable, Statuses) ->
    Statusreports = dict:store({{status, Service}, {Target, Variable}},
                               {multi, Statuses},
                               State#state.statusreports),
    State#state{statusreports = Statusreports}.

%% @doc Store the heartbeat statuses: the configured `version' and the set
%% of "App Vsn" strings for all running applications.
%% Crashes (badmatch) if plopconfig:get_env(version) does not return {ok, _}.
heartbeat(State) ->
    {ok, ConfigVersion} = plopconfig:get_env(version),
    RunningApps = [atom_to_list(App) ++ " " ++ Vsn
                   || {App, _Desc, Vsn} <- application:which_applications()],
    NewState1 = store_status(State, "heartbeat", "", "configversion", ConfigVersion),
    NewState2 = store_set_status(NewState1, "heartbeat", "", "applications",
                                 sets:from_list(RunningApps)),
    NewState2.

%% @doc No synchronous API is defined. Requests are left unanswered, so a
%% gen_server:call/2 to this server blocks until the caller's own timeout.
handle_call(_, _From, State) ->
    {noreply, State}.

%% @doc Buffer an incoming report, then send now or arm the send timer,
%% depending on the rate limit in try_send/1.
handle_cast({report, Service, Target, Variable, Status}, State) ->
    NewState = store_status(State, Service, Target, Variable, Status),
    {noreply, try_send(NewState)};
handle_cast({report_multi, Service, Target, Variable, Status}, State) ->
    NewState = store_multi_status(State, Service, Target, Variable, Status),
    {noreply, try_send(NewState)};
handle_cast({bench, Service, Target, Tag, Seq, Starttime, Elapsed}, State) ->
    NewState = store_bench(State, Service, Target, Tag, Seq, Starttime, Elapsed),
    {noreply, try_send(NewState)}.

%% @doc Timer and stray-message handling.
%%
%% A `force_send' timeout is acted on only when it belongs to the currently
%% armed timer. try_send/1 may call force_send/1 after the armed timer has
%% already fired. In that case erlang:cancel_timer/1 cannot retract the
%% queued message, and acting on it would trigger a second, immediate send
%% that bypasses the rate limit.
handle_info({timeout, Timer, force_send}, #state{timer = Timer} = State) ->
    lager:debug("statusreport timer timeout"),
    {noreply, force_send(State)};
handle_info({timeout, _StaleTimer, force_send}, State) ->
    lager:debug("ignoring stale statusreport timer"),
    {noreply, State};
handle_info({timeout, _Timer, heartbeat}, State) ->
    lager:debug("statusreport timer timeout"),
    HeartbeatInterval = application:get_env(plop, heartbeat_interval, 1000),
    erlang:start_timer(HeartbeatInterval, self(), heartbeat),
    NewState = heartbeat(State),
    {noreply, try_send(NewState)};
handle_info(Msg, State) ->
    %% init/1 traps exits, so 'EXIT' messages from linked processes (and any
    %% other stray message) land here. Log them instead of crashing.
    lager:warning("~p: ignoring unexpected message ~p", [?MODULE, Msg]),
    {noreply, State}.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% @doc Cancel the armed `force_send' timer, if any, and mark none armed.
%% A timeout message that was already queued is not removed here. The
%% stale-ref clause of handle_info/2 discards it.
cancel_timer(#state{timer = none} = State) ->
    State;
cancel_timer(#state{timer = Timer} = State) ->
    erlang:cancel_timer(Timer),
    State#state{timer = none}.
%% @doc Arm the `force_send' timer for lastsent + status_report_interval,
%% unless a timer is already armed. The deadline is absolute, since
%% plop_compat:start_timer/4 is passed [{abs, true}].
set_timer(State) ->
    case State#state.timer of
        none ->
            ReportInterval = application:get_env(plop, status_report_interval, 1000),
            Timer = plop_compat:start_timer(State#state.lastsent + ReportInterval,
                                            self(), force_send, [{abs, true}]),
            State#state{timer = Timer};
        _ ->
            State
    end.

%% @doc gen_server terminate callback.
%% Cancels the send timer and, on any orderly stop, flushes buffered
%% reports. Previously only `shutdown' flushed, so gen_server:stop/1
%% (reason `normal') or a `{shutdown, _}' stop silently dropped them.
%% Abnormal reasons skip the flush.
terminate(Reason, State) ->
    lager:info("~p terminating: ~p", [?MODULE, Reason]),
    NewState = cancel_timer(State),
    case is_orderly_stop(Reason) of
        true -> force_send(NewState);
        false -> none
    end,
    ok.

%% True for the exit reasons OTP treats as a normal, non-error stop.
is_orderly_stop(normal) -> true;
is_orderly_stop(shutdown) -> true;
is_orderly_stop({shutdown, _}) -> true;
is_orderly_stop(_) -> false.

%% @doc Regroup the flat report dict, whose keys are {{Type, Service}, Group},
%% into [{{Type, Service}, [{Group, Status}]}].
group_by_service(Statusreports) ->
    dict:to_list(
      lists:foldl(
        fun ({{Service, Group}, Status}, Acc) ->
                dict:append(Service, {Group, Status}, Acc)
        end,
        dict:new(),
        dict:to_list(Statusreports))).

%% @doc Convert one status value into a mochijson2-encodable term.
%% Lists are converted with list_to_binary/1, so they must be byte lists.
encode_one_status(Status) when is_number(Status) -> Status;
encode_one_status(Status) when is_list(Status) -> list_to_binary(Status);
encode_one_status(Status) when is_binary(Status) -> Status;
encode_one_status(null) -> null.

%% @doc Encode a stored value: `single' becomes one JSON value, `multi'
%% becomes a JSON array (in sets:to_list/1 order).
encode_status({single, Status}) ->
    encode_one_status(Status);
encode_status({multi, Statuses}) ->
    lists:map(fun encode_one_status/1, sets:to_list(Statuses)).

%% @doc Build the mochijson2 object for one status or bench entry.
encode_report(status, Nodename, {{Target, Variable}, Status}) ->
    {struct, [{"target", list_to_binary(Target)},
              {"source", list_to_binary(Nodename)},
              {"key", list_to_binary(Variable)},
              {"value", encode_status(Status)}]};
encode_report(bench, Nodename, {{Target, Tag, Seq}, {Starttime, Elapsed}}) ->
    {struct, [{"target", list_to_binary(Target)},
              {"source", list_to_binary(Nodename)},
              {"tag", list_to_binary(Tag)},
              {"seq", Seq},
              {"starttime", Starttime},
              {"elapsed", Elapsed}]}.

%% @doc Configured base addresses for the given report type (default []).
addresses_for_servicetype(status) ->
    plopconfig:get_env(statusservers, []);
addresses_for_servicetype(bench) ->
    plopconfig:get_env(benchservers, []).
%% @doc Encode all reports of one {ServiceType, Service} group as a single
%% JSON array and POST it to every configured server for ServiceType,
%% at NodeAddress ++ Service.
send({ServiceType, Service}, Statusreports, Nodename) ->
    lager:debug("reporting status to ~p ~p: ~p", [ServiceType, Service, Statusreports]),
    NodeAddresses = addresses_for_servicetype(ServiceType),
    DebugTag = "statusreport",
    Headers = [{"Content-Type", "text/json"}],
    RequestBody = list_to_binary(
                    mochijson2:encode(
                      [encode_report(ServiceType, Nodename, Statusreport)
                       || Statusreport <- Statusreports])),
    lists:foreach(fun (NodeAddress) ->
                          send_one_server(DebugTag, NodeAddress ++ Service,
                                          Headers, RequestBody)
                  end,
                  NodeAddresses).

%% @doc Send one request and log the outcome at debug level.
%%
%% Status reporting is best-effort, and the response comes from a remote
%% server. Every response shape is therefore handled here: an unexpected
%% reply is logged rather than raising case_clause, which would crash this
%% gen_server and drop all buffered reports.
send_one_server(DebugTag, URL, Headers, RequestBody) ->
    case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
        {error, Err} ->
            lager:debug("request error ~p ~p", [DebugTag, Err]);
        {failure, {_, StatusCode, _}, _RespHeaders, _Body} ->
            lager:debug("request failure ~p ~p", [DebugTag, StatusCode]);
        {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->
            check_response_body(DebugTag, Body);
        {success, {_, StatusCode, _}, _, _Body} ->
            lager:debug("unexpected status ~p ~p", [DebugTag, StatusCode]);
        Other ->
            lager:debug("unexpected response ~p ~p", [DebugTag, Other])
    end.

%% Decode a 200 response body. A JSON object means success; anything else,
%% including an undecodable body, is only logged.
check_response_body(DebugTag, Body) ->
    try mochijson2:decode(Body) of
        {struct, _PropList} ->
            none;
        {error, Err} ->
            lager:debug("error returned ~p ~p", [DebugTag, Err]);
        Other ->
            lager:debug("unexpected response body ~p ~p", [DebugTag, Other])
    catch
        throw:{error, Err} ->
            lager:debug("error returned ~p ~p", [DebugTag, Err]);
        Class:Reason ->
            lager:debug("undecodable response body ~p ~p:~p",
                        [DebugTag, Class, Reason])
    end.

%% @doc Send every buffered report now, grouped by {Type, Service}. Then
%% cancel the timer, clear the buffer and record the send time.
force_send(State) ->
    lists:foreach(fun ({Service, Statusreports}) ->
                          send(Service, Statusreports, State#state.nodename)
                  end,
                  group_by_service(State#state.statusreports)),
    NewState = cancel_timer(State),
    NewState#state{statusreports = dict:new(),
                   lastsent = plop_compat:monotonic_time(millisecond)}.

%% @doc Rate-limited send.
%% If the last send was less than status_report_interval ms ago, arm (or
%% keep) the timer. Otherwise send immediately.
try_send(State) ->
    ReportInterval = application:get_env(plop, status_report_interval, 1000),
    LastSent = State#state.lastsent,
    NextSend = LastSent + ReportInterval,
    Now = plop_compat:monotonic_time(millisecond),
    if
        NextSend > Now ->
            %% Log the elapsed time since the last send, as the message says.
            %% This used to print NextSend - Now, the time remaining.
            lager:debug("status report sent ~p ms ago, setting timer",
                        [Now - LastSent]),
            set_timer(State);
        true ->
            lager:debug("status report sent long enough ago"),
            force_send(State)
    end.
%% @doc Asynchronously report a single status value. A later report for the
%% same Service/Target/Variable overwrites the earlier one until it is sent.
%% Status must be a number, list, binary or the atom `null'.
report(Service, Target, Variable, Status)
  when is_number(Status); is_list(Status); is_binary(Status); Status == null ->
    lager:debug("reporting status ~p ~p ~p ~p",
                [Service, Target, Variable, Status]),
    Msg = {report, Service, Target, Variable, Status},
    gen_server:cast(?MODULE, Msg).

%% @doc Asynchronously add Status to the set of values reported for
%% Service/Target/Variable. Status must be a number, list or binary.
report_multi(Service, Target, Variable, Status)
  when is_number(Status); is_list(Status); is_binary(Status) ->
    lager:debug("reporting multi status ~p ~p ~p ~p",
                [Service, Target, Variable, Status]),
    Msg = {report_multi, Service, Target, Variable, Status},
    gen_server:cast(?MODULE, Msg).

%% @doc Asynchronously report one benchmark sample.
bench(Service, Target, Tag, Seq, Starttime, Elapsed) ->
    lager:debug("reporting bench ~p ~p ~p ~p ~p ~p",
                [Service, Target, Tag, Seq, Starttime, Elapsed]),
    Msg = {bench, Service, Target, Tag, Seq, Starttime, Elapsed},
    gen_server:cast(?MODULE, Msg).