diff --git a/guides/telemetry.md b/guides/telemetry.md index b80acfdb..f822f540 100644 --- a/guides/telemetry.md +++ b/guides/telemetry.md @@ -84,8 +84,12 @@ Events related to internals of the throttle processes, these might expose unstab might want to log or reconfigure: ```erlang event_name: [amoc, throttle, process] -measurements: #{msg := binary(), process := pid()} -metadata: #{monotonic_time := integer(), name := atom(), printable_state => map()} +measurements: #{logger:level() => 1} +metadata: #{log_level := logger:level(), + msg := binary(), + rate => non_neg_integer(), + state => map(), + _ => _} ``` ## Coordinator @@ -105,8 +109,8 @@ metadata: #{monotonic_time := integer(), name := atom()} There are related to bad configuration events, they might deserve logging ```erlang event_name: [amoc, config, get | verify | env] -measurements: #{} -metadata: #{log_class => syslog_level(), _ => _} +measurements: #{logger:level() => 1} +metadata: #{log_level => logger:level(), setting => atom(), msg => binary(), _ => _} ``` ## Cluster @@ -116,5 +120,5 @@ There are related to clustering events ```erlang event_name: [amoc, cluster, connect_nodes | nodedown | master_node_down] measurements: #{count => non_neg_integer()}, -metadata: #{node => node(), nodes => nodes(), state => map()} +metadata: #{nodes => nodes(), state => map()} ``` diff --git a/src/amoc_config/amoc_config.erl b/src/amoc_config/amoc_config.erl index c20f3375..0f1792ec 100644 --- a/src/amoc_config/amoc_config.erl +++ b/src/amoc_config/amoc_config.erl @@ -19,17 +19,16 @@ get(Name) -> get(Name, Default) when is_atom(Name) -> case ets:lookup(amoc_config, Name) of [] -> - telemetry:execute([amoc, config, get], #{}, - #{log_class => error, msg => <<"no scenario setting">>, - scenario => Name}), + amoc_telemetry:execute_log( + error, [config, get], #{setting => Name}, <<"no scenario setting">>), throw({invalid_setting, Name}); [#module_parameter{name = Name, value = undefined}] -> Default; [#module_parameter{name = Name, value = Value}] -> Value; InvalidLookupRet -> - telemetry:execute([amoc, config, get], #{}, - #{log_class => error, msg => <<"invalid lookup return value">>, - scenario => Name, return => InvalidLookupRet}), + amoc_telemetry:execute_log( + error, [config, get], #{setting => Name, return => InvalidLookupRet}, + <<"invalid lookup return value">>), throw({invalid_lookup_ret_value, InvalidLookupRet}) end. diff --git a/src/amoc_config/amoc_config_env.erl b/src/amoc_config/amoc_config_env.erl index dc0fb811..f3512d2f 100644 --- a/src/amoc_config/amoc_config_env.erl +++ b/src/amoc_config/amoc_config_env.erl @@ -39,11 +39,11 @@ get_os_env(Name, Default) -> case parse_value(Value, Default) of {ok, Term} -> Term; {error, Error} -> - telemetry:execute( - [amoc, config, env], #{error => 1}, - #{log_class => error, error => Error, variable_name => EnvName, - variable_value => Value, default_value => Default, - msg => <<"cannot parse environment variable, using default value">>}), + amoc_telemetry:execute_log( + error, [config, env], + #{error => Error, variable_name => EnvName, + variable_value => Value, default_value => Default}, + <<"cannot parse environment variable, using default value">>), Default end. diff --git a/src/amoc_config/amoc_config_verification.erl b/src/amoc_config/amoc_config_verification.erl index 71dc93d2..7e86cbb4 100644 --- a/src/amoc_config/amoc_config_verification.erl +++ b/src/amoc_config/amoc_config_verification.erl @@ -39,17 +39,17 @@ verify(Fun, Value) -> {true, NewValue} -> {true, NewValue}; {false, Reason} -> {false, {verification_failed, Reason}}; Ret -> - telemetry:execute([amoc, config, verify], #{error => 1}, - #{log_class => error, verification_method => Fun, - verification_arg => Value, verification_return => Ret, - msg => <<"invalid verification method">>}), + amoc_telemetry:execute_log( + error, [config, verify], + #{verification_method => Fun, verification_arg => Value, verification_return => Ret}, + <<"invalid verification method">>), {false, {invalid_verification_return_value, Ret}} catch C:E:S -> - telemetry:execute([amoc, config, verify], #{error => 1}, - #{log_class => error, verification_method => Fun, - verification_arg => Value, - kind => C, reason => E, stacktrace => S, - msg => <<"invalid verification method">>}), + amoc_telemetry:execute_log( + error, [config, verify], + #{verification_method => Fun, verification_arg => Value, + kind => C, reason => E, stacktrace => S}, + <<"invalid verification method">>), {false, {exception_during_verification, {C, E, S}}} end. diff --git a/src/amoc_distribution/amoc_cluster.erl b/src/amoc_distribution/amoc_cluster.erl index e736eca3..e9d95171 100644 --- a/src/amoc_distribution/amoc_cluster.erl +++ b/src/amoc_distribution/amoc_cluster.erl @@ -132,8 +132,7 @@ handle_call(_Request, _From, State) -> -spec handle_cast(any(), state()) -> {noreply, state()}. handle_cast({connect_nodes, Nodes}, State) -> - telemetry:execute([amoc, cluster, connect_nodes], #{count => length(Nodes)}, - #{nodes => Nodes, state => state_to_map(State)}), + execute_nodes(connect_nodes, Nodes, state_to_map(State)), NewState = handle_connect_nodes(Nodes, State), schedule_timer(NewState), {noreply, NewState}; @@ -148,14 +147,11 @@ handle_info(timeout, State) -> schedule_timer(NewState), {noreply, NewState}; handle_info({nodedown, Node}, #state{master = Node} = State) -> - telemetry:execute([amoc, cluster, master_node_down], - #{count => 1}, - #{node => Node, state => state_to_map(State)}), + execute_nodes(master_node_down, [Node], state_to_map(State)), erlang:halt(), {noreply, State}; handle_info({nodedown, Node}, State) -> - telemetry:execute([amoc, cluster, nodedown], #{count => 1}, - #{node => Node, state => state_to_map(State)}), + execute_nodes(nodedown, [Node], state_to_map(State)), {noreply, merge(connection_lost, [Node], State)}; handle_info(_Info, State) -> {noreply, State}. @@ -284,3 +280,8 @@ maybe_set_master(Node, #state{new_connection_action = Action}) -> %% to avoid a possibility of the amoc_cluster deadlock while %% running the Action call set_master_node/2 asynchronously spawn(fun() -> set_master_node(Node, Action) end). + +-spec execute_nodes(atom(), [node()], #{any() => any()}) -> ok. +execute_nodes(Name, Nodes, State) -> + PrefixedName = [amoc, cluster, Name], + telemetry:execute(PrefixedName, #{count => length(Nodes)}, #{nodes => Nodes, state => State}). diff --git a/src/amoc_telemetry.erl b/src/amoc_telemetry.erl index 56ec5df1..26534c94 100644 --- a/src/amoc_telemetry.erl +++ b/src/amoc_telemetry.erl @@ -2,7 +2,7 @@ %% @copyright 2023 Erlang Solutions Ltd. -module(amoc_telemetry). --export([execute/3]). +-export([execute/3, execute_log/4]). -spec execute(EventName, Measurements, Metadata) -> ok when EventName :: telemetry:event_name(), @@ -10,6 +10,16 @@ Metadata :: telemetry:event_metadata(). execute(Name, Measurements, Metadata) -> TimeStamp = erlang:monotonic_time(), - NameWithAmocPrefix = [amoc | Name], + PrefixedName = [amoc | Name], MetadataWithTS = Metadata#{monotonic_time => TimeStamp}, - telemetry:execute(NameWithAmocPrefix, Measurements, MetadataWithTS). + telemetry:execute(PrefixedName, Measurements, MetadataWithTS). + +-spec execute_log(Level, EventName, Metadata, Msg) -> ok when + Level :: logger:level(), + EventName :: telemetry:event_name(), + Metadata :: telemetry:event_metadata(), + Msg :: binary(). +execute_log(Level, Name, Metadata, Message) -> + PrefixedName = [amoc | Name], + MetadataWithLog = Metadata#{log_level => Level, msg => Message}, + telemetry:execute(PrefixedName, #{Level => 1}, MetadataWithLog). diff --git a/src/amoc_throttle/amoc_throttle_process.erl b/src/amoc_throttle/amoc_throttle_process.erl index 4271bb66..7503e15b 100644 --- a/src/amoc_throttle/amoc_throttle_process.erl +++ b/src/amoc_throttle/amoc_throttle_process.erl @@ -129,10 +129,14 @@ format_status(_Opt, [_PDict, State]) -> initial_state(Name, Interval, Rate) when Rate >= 0 -> NewRate = case {Rate =:= 0, Rate < 5} of {true, _} -> - internal_event(<<"invalid rate, must be higher than zero">>, Name), + Msg = <<"invalid rate, must be higher than zero">>, + amoc_telemetry:execute_log( + error, [throttle, process], #{name => Name, rate => Rate}, Msg), 1; {_, true} -> - internal_event(<<"too low rate, please reduce NoOfProcesses">>, Name), + Msg = <<"too low rate, please reduce NoOfProcesses">>, + amoc_telemetry:execute_log( + error, [throttle, process], #{name => Name, rate => Rate}, Msg), Rate; {_, false} -> Rate @@ -140,7 +144,9 @@ initial_state(Name, Interval, Rate) when Rate >= 0 -> Delay = case {Interval, Interval div NewRate, Interval rem NewRate} of {0, _, _} -> 0; %% limit only No of simultaneous executions {_, I, _} when I < 10 -> - internal_event(<<"too high rate, please increase NoOfProcesses">>, Name), + Message = <<"too high rate, please increase NoOfProcesses">>, + amoc_telemetry:execute_log( + error, [throttle, process], #{name => Name, rate => Rate}, Message), 10; {_, DelayBetweenExecutions, 0} -> DelayBetweenExecutions; {_, DelayBetweenExecutions, _} -> DelayBetweenExecutions + 1 @@ -202,26 +208,24 @@ async_runner(Fun) -> timeout(State) -> State#state.interval + ?DEFAULT_MSG_TIMEOUT. -inc_n(#state{n = N, max_n = MaxN} = State) -> +inc_n(#state{name = Name, n = N, max_n = MaxN} = State) -> NewN = N + 1, case MaxN < NewN of true -> - internal_event(<<"throttle proccess has invalid N">>, State), + PrintableState = printable_state(State), + Msg = <<"throttle proccess has invalid N">>, + amoc_telemetry:execute_log( + error, [throttle, process], #{name => Name, n => NewN, state => PrintableState}, Msg), State#state{n = MaxN}; false -> State#state{n = NewN} end. +-spec internal_event(binary(), state()) -> any(). internal_event(Msg, #state{name = Name} = State) -> PrintableState = printable_state(State), - telemetry:execute([amoc, throttle, process], - #{msg => Msg, process => self()}, - #{printable_state => PrintableState, - monotonic_time => erlang:monotonic_time(), name => Name}); -internal_event(Msg, Name) when is_atom(Name) -> - telemetry:execute([amoc, throttle, process], - #{msg => Msg, process => self()}, - #{monotonic_time => erlang:monotonic_time(), name => Name}). + amoc_telemetry:execute_log( + debug, [throttle, process], #{self => self(), name => Name, state => PrintableState}, Msg). printable_state(#state{} = State) -> Fields = record_info(fields, state),