diff --git a/guides/telemetry.md b/guides/telemetry.md index d895c666..d4668b81 100644 --- a/guides/telemetry.md +++ b/guides/telemetry.md @@ -78,6 +78,22 @@ measurements: #{count := 1} metadata: #{monotonic_time := integer(), name := atom()} ``` +### Throttle process internals + +Events related to internals of the throttle processes, these might expose unstable conditions you +might want to log or reconfigure: +```erlang +event_name: [amoc, throttle, process] +measurements: #{logger:level() => 1} +metadata: #{monotonic_time := integer(), + log_level := logger:level(), + msg := binary(), + rate => non_neg_integer(), + interval => non_neg_integer(), + state => map(), + _ => _} +``` + ## Coordinator Indicates when a coordinating event was raised, like a process being added for coordination or a timeout being triggered @@ -88,3 +104,26 @@ event_name: [amoc, coordinator, start | stop | add | reset | timeout] measurements: #{count := 1} metadata: #{monotonic_time := integer(), name := atom()} ``` + +## Config + +### Internal events +There are related to bad configuration events, they might deserve logging +```erlang +event_name: [amoc, config, get | verify | env] +measurements: #{logger:level() => 1} +metadata: #{monotonic_time := integer(), + log_level => logger:level(), + setting => atom(), + msg => binary(), _ => _} +``` + +## Cluster + +### Internal events +There are related to clustering events +```erlang +event_name: [amoc, cluster, connect_nodes | nodedown | master_node_down] +measurements: #{count => non_neg_integer()}, +metadata: #{nodes => nodes(), state => map()} +``` diff --git a/src/amoc_config/amoc_config.erl b/src/amoc_config/amoc_config.erl index 5e0b90df..0f1792ec 100644 --- a/src/amoc_config/amoc_config.erl +++ b/src/amoc_config/amoc_config.erl @@ -3,7 +3,6 @@ %% @doc TODO -module(amoc_config). --include_lib("kernel/include/logger.hrl"). -include("amoc_config.hrl"). -export([get/1, get/2]). @@ -20,13 +19,16 @@ get(Name) -> get(Name, Default) when is_atom(Name) -> case ets:lookup(amoc_config, Name) of [] -> - ?LOG_ERROR("no scenario setting ~p", [Name]), + amoc_telemetry:execute_log( + error, [config, get], #{setting => Name}, <<"no scenario setting">>), throw({invalid_setting, Name}); [#module_parameter{name = Name, value = undefined}] -> Default; [#module_parameter{name = Name, value = Value}] -> Value; InvalidLookupRet -> - ?LOG_ERROR("invalid lookup return value ~p ~p", [Name, InvalidLookupRet]), + amoc_telemetry:execute_log( + error, [config, get], #{setting => Name, return => InvalidLookupRet}, + <<"invalid lookup return value">>), throw({invalid_lookup_ret_value, InvalidLookupRet}) end. diff --git a/src/amoc_config/amoc_config_env.erl b/src/amoc_config/amoc_config_env.erl index d5a38e8d..f3512d2f 100644 --- a/src/amoc_config/amoc_config_env.erl +++ b/src/amoc_config/amoc_config_env.erl @@ -14,8 +14,6 @@ -export([get/1, get/2]). --include_lib("kernel/include/logger.hrl"). - -define(DEFAULT_PARSER_MODULE, amoc_config_parser). -callback(parse_value(string()) -> {ok, amoc_config:value()} | {error, any()}). @@ -41,12 +39,11 @@ get_os_env(Name, Default) -> case parse_value(Value, Default) of {ok, Term} -> Term; {error, Error} -> - ?LOG_ERROR("cannot parse environment variable, using default value.~n" - " parsing error: '~p'~n" - " variable name: '$~s'~n" - " variable value: '~s'~n" - " default value: '~p'~n", - [Error, EnvName, Value, Default]), + amoc_telemetry:execute_log( + error, [config, env], + #{error => Error, variable_name => EnvName, + variable_value => Value, default_value => Default}, + <<"cannot parse environment variable, using default value">>), Default end. diff --git a/src/amoc_config/amoc_config_verification.erl b/src/amoc_config/amoc_config_verification.erl index 94514c05..7e86cbb4 100644 --- a/src/amoc_config/amoc_config_verification.erl +++ b/src/amoc_config/amoc_config_verification.erl @@ -8,7 +8,6 @@ %% API -export([process_scenario_config/2]). --include_lib("kernel/include/logger.hrl"). -include("amoc_config.hrl"). %% @doc Applies the processing as provided by the `required_variable' list to the provided scenario config @@ -40,12 +39,17 @@ verify(Fun, Value) -> {true, NewValue} -> {true, NewValue}; {false, Reason} -> {false, {verification_failed, Reason}}; Ret -> - ?LOG_ERROR("invalid verification method ~p(~p), return value : ~p", - [Fun, Value, Ret]), + amoc_telemetry:execute_log( + error, [config, verify], + #{verification_method => Fun, verification_arg => Value, verification_return => Ret}, + <<"invalid verification method">>), {false, {invalid_verification_return_value, Ret}} catch C:E:S -> - ?LOG_ERROR("invalid verification method ~p(~p), exception: ~p ~p ~p", - [Fun, Value, C, E, S]), + amoc_telemetry:execute_log( + error, [config, verify], + #{verification_method => Fun, verification_arg => Value, + kind => C, reason => E, stacktrace => S}, + <<"invalid verification method">>), {false, {exception_during_verification, {C, E, S}}} end. diff --git a/src/amoc_distribution/amoc_cluster.erl b/src/amoc_distribution/amoc_cluster.erl index 3415d8d7..8266603c 100644 --- a/src/amoc_distribution/amoc_cluster.erl +++ b/src/amoc_distribution/amoc_cluster.erl @@ -5,8 +5,6 @@ -behaviour(gen_server). -define(SERVER, ?MODULE). --include_lib("kernel/include/logger.hrl"). - %% ------------------------------------------------------------------ %% API Function Exports %% ------------------------------------------------------------------ @@ -134,7 +132,7 @@ handle_call(_Request, _From, State) -> -spec handle_cast(any(), state()) -> {noreply, state()}. handle_cast({connect_nodes, Nodes}, State) -> - ?LOG_INFO("{connect_nodes, ~p}, state: ~p", [Nodes, state_to_map(State)]), + raise_nodes_event(connect_nodes, Nodes, state_to_map(State)), NewState = handle_connect_nodes(Nodes, State), schedule_timer(NewState), {noreply, NewState}; @@ -149,11 +147,11 @@ handle_info(timeout, State) -> schedule_timer(NewState), {noreply, NewState}; handle_info({nodedown, Node}, #state{master = Node} = State) -> - ?LOG_ERROR("Master node ~p is down. Halting.", [Node]), + raise_nodes_event(master_node_down, [Node], state_to_map(State)), erlang:halt(), {noreply, State}; handle_info({nodedown, Node}, State) -> - ?LOG_ERROR("node ~p is down.", [Node]), + raise_nodes_event(nodedown, [Node], state_to_map(State)), {noreply, merge(connection_lost, [Node], State)}; handle_info(_Info, State) -> {noreply, State}. @@ -282,3 +280,8 @@ maybe_set_master(Node, #state{new_connection_action = Action}) -> %% to avoid a possibility of the amoc_cluster deadlock while %% running the Action call set_master_node/2 asynchronously spawn(fun() -> set_master_node(Node, Action) end). + +-spec raise_nodes_event(atom(), [node()], #{any() => any()}) -> ok. +raise_nodes_event(Name, Nodes, State) -> + amoc_telemetry:execute( + [cluster, Name], #{count => length(Nodes)}, #{nodes => Nodes, state => State}). diff --git a/src/amoc_telemetry.erl b/src/amoc_telemetry.erl index 56ec5df1..29e848da 100644 --- a/src/amoc_telemetry.erl +++ b/src/amoc_telemetry.erl @@ -2,7 +2,7 @@ %% @copyright 2023 Erlang Solutions Ltd. -module(amoc_telemetry). --export([execute/3]). +-export([execute/3, execute_log/4]). -spec execute(EventName, Measurements, Metadata) -> ok when EventName :: telemetry:event_name(), @@ -10,6 +10,15 @@ Metadata :: telemetry:event_metadata(). execute(Name, Measurements, Metadata) -> TimeStamp = erlang:monotonic_time(), - NameWithAmocPrefix = [amoc | Name], + PrefixedName = [amoc | Name], MetadataWithTS = Metadata#{monotonic_time => TimeStamp}, - telemetry:execute(NameWithAmocPrefix, Measurements, MetadataWithTS). + telemetry:execute(PrefixedName, Measurements, MetadataWithTS). + +-spec execute_log(Level, EventName, Metadata, Msg) -> ok when + Level :: logger:level(), + EventName :: telemetry:event_name(), + Metadata :: telemetry:event_metadata(), + Msg :: binary(). +execute_log(Level, Name, Metadata, Message) -> + MetadataWithLog = Metadata#{log_level => Level, msg => Message}, + execute(Name, #{Level => 1}, MetadataWithLog). diff --git a/src/amoc_throttle/amoc_throttle_process.erl b/src/amoc_throttle/amoc_throttle_process.erl index a144823a..9a526384 100644 --- a/src/amoc_throttle/amoc_throttle_process.erl +++ b/src/amoc_throttle/amoc_throttle_process.erl @@ -21,8 +21,6 @@ handle_continue/2, format_status/2]). --include_lib("kernel/include/logger.hrl"). - -define(DEFAULT_MSG_TIMEOUT, 60000).%% one minute -record(state, {can_run_fn = true :: boolean(), @@ -76,7 +74,7 @@ get_state(Pid) -> -spec init(list()) -> {ok, state(), timeout()}. init([Name, Interval, Rate]) -> - InitialState = initial_state(Interval, Rate), + InitialState = initial_state(Name, Interval, Rate), StateWithTimer = maybe_start_timer(InitialState), {ok, StateWithTimer#state{name = Name}, timeout(InitialState)}. @@ -86,7 +84,7 @@ handle_info({'DOWN', _, process, _, _}, State) -> handle_info(delay_between_executions, State) -> {noreply, State#state{can_run_fn = true}, {continue, maybe_run_fn}}; handle_info(timeout, State) -> - log_state("is inactive", State), + internal_event(<<"is inactive">>, State), {noreply, State, {continue, maybe_run_fn}}. -spec handle_cast(term(), state()) -> @@ -100,9 +98,9 @@ handle_cast(resume_process, State) -> handle_cast({schedule, RunnerPid}, #state{schedule_reversed = SchRev, name = Name} = State) -> amoc_throttle_controller:telemetry_event(Name, request), {noreply, State#state{schedule_reversed = [RunnerPid | SchRev]}, {continue, maybe_run_fn}}; -handle_cast({update, Interval, Rate}, State) -> - NewState = merge_state(initial_state(Interval, Rate), State), - log_state("state update", NewState), +handle_cast({update, Interval, Rate}, #state{name = Name} = State) -> + NewState = merge_state(initial_state(Name, Interval, Rate), State), + internal_event(<<"state update">>, NewState), {noreply, NewState, {continue, maybe_run_fn}}. -spec handle_call(term(), term(), state()) -> @@ -128,23 +126,29 @@ format_status(_Opt, [_PDict, State]) -> %% internal functions %%------------------------------------------------------------------------------ -initial_state(Interval, 0) -> - ?LOG_ERROR("invalid rate, must be higher than zero"), - initial_state(Interval, 1); -initial_state(Interval, Rate) when Rate > 0 -> - case Rate < 5 of - true -> ?LOG_ERROR("too low rate, please reduce NoOfProcesses"); - false -> ok - end, - Delay = case {Interval, Interval div Rate, Interval rem Rate} of +initial_state(Name, Interval, Rate) when Rate >= 0 -> + NewRate = case {Rate =:= 0, Rate < 5} of + {true, _} -> + Msg = <<"invalid rate, must be higher than zero">>, + internal_error(Msg, Name, Rate, Interval), + 1; + {_, true} -> + Msg = <<"too low rate, please reduce NoOfProcesses">>, + internal_error(Msg, Name, Rate, Interval), + Rate; + {_, false} -> + Rate + end, + Delay = case {Interval, Interval div NewRate, Interval rem NewRate} of {0, _, _} -> 0; %% limit only No of simultaneous executions {_, I, _} when I < 10 -> - ?LOG_ERROR("too high rate, please increase NoOfProcesses"), + Message = <<"too high rate, please increase NoOfProcesses">>, + internal_error(Message, Name, Rate, Interval), 10; {_, DelayBetweenExecutions, 0} -> DelayBetweenExecutions; {_, DelayBetweenExecutions, _} -> DelayBetweenExecutions + 1 end, - #state{interval = Interval, n = Rate, max_n = Rate, delay_between_executions = Delay}. + #state{interval = Interval, n = NewRate, max_n = NewRate, delay_between_executions = Delay}. merge_state(#state{interval = I, delay_between_executions = D, n = N, max_n = MaxN}, #state{n = OldN, max_n = OldMaxN} = OldState) -> @@ -201,25 +205,34 @@ async_runner(Fun) -> timeout(State) -> State#state.interval + ?DEFAULT_MSG_TIMEOUT. -inc_n(#state{n = N, max_n = MaxN} = State) -> +inc_n(#state{name = Name, n = N, max_n = MaxN} = State) -> NewN = N + 1, case MaxN < NewN of true -> PrintableState = printable_state(State), - ?LOG_ERROR("~nthrottle process ~p: invalid N (~p)~n", [self(), PrintableState]), + Msg = <<"throttle proccess has invalid N">>, + amoc_telemetry:execute_log( + error, [throttle, process], #{name => Name, n => NewN, state => PrintableState}, Msg), State#state{n = MaxN}; false -> State#state{n = NewN} end. -log_state(Msg, State) -> +-spec internal_event(binary(), state()) -> any(). +internal_event(Msg, #state{name = Name} = State) -> PrintableState = printable_state(State), - ?LOG_DEBUG("~nthrottle process ~p: ~s (~p)~n", [self(), Msg, PrintableState]). + amoc_telemetry:execute_log( + debug, [throttle, process], #{self => self(), name => Name, state => PrintableState}, Msg). + +-spec internal_error(binary(), atom(), amoc_throttle:rate(), amoc_throttle:interval()) -> any(). +internal_error(Msg, Name, Rate, Interval) -> + amoc_telemetry:execute_log( + error, [throttle, process], #{name => Name, rate => Rate, interval => Interval}, Msg). printable_state(#state{} = State) -> Fields = record_info(fields, state), [_ | Values] = tuple_to_list(State#state{schedule = [], schedule_reversed = []}), StateMap = maps:from_list(lists:zip(Fields, Values)), StateMap#{ - schedule:=length(State#state.schedule), - schedule_reversed:=length(State#state.schedule_reversed)}. + schedule := length(State#state.schedule), + schedule_reversed := length(State#state.schedule_reversed)}.