Skip to content

Commit

Permalink
Use a separate tracer process
Browse files Browse the repository at this point in the history
The main motivation is to quickly stop tracing when needed without
waiting for the queued trace messages to be handled.
This way the tracer can be stopped more reliably.
  • Loading branch information
chrzaszcz committed Feb 7, 2024
1 parent e1c7ca5 commit 84b98ab
Showing 1 changed file with 80 additions and 69 deletions.
149 changes: 80 additions & 69 deletions src/tr.erl
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
index := index(),
limit := limit(),
trace := none | trace_spec(),
traced_pids := traced_pids_tab()}.
tracer_pid := none | pid()}.

-type trace_spec() :: #{modules := module_spec(),
pids := pids(),
Expand Down Expand Up @@ -509,9 +509,10 @@ ts(#tr{ts = TS}) -> calendar:system_time_to_rfc3339(TS, [{unit, microsecond}]).
%% @private
-spec init(init_options()) -> {ok, state()}.
init(Opts) ->
process_flag(trap_exit, true),
Defaults = #{tab => default_tab(), index => initial_index(), limit => infinity},
FinalOpts = #{tab := Tab} = maps:merge(Defaults, Opts),
State = maps:merge(FinalOpts, #{trace => none, traced_pids => none}),
State = maps:merge(FinalOpts, #{trace => none, tracer_pid => none}),
create_tab(Tab),
{ok, State}.

Expand Down Expand Up @@ -558,20 +559,24 @@ handle_cast(Msg, State) ->

%% @private
-spec handle_info(any(), state()) -> {noreply, state()}.
handle_info(Trace, State = #{trace := none}) when ?is_trace(Trace) ->
{noreply, State};
handle_info(Trace, State = #{index := I, limit := Limit}) when ?is_trace(Trace), I >= Limit ->
logger:warning("Reached trace limit ~p, stopping the tracer.", [Limit]),
{noreply, stop_trace(State)};
handle_info(Trace, State) when ?is_trace(Trace) ->
{noreply, handle_trace(Trace, State)};
handle_info({'EXIT', TracerPid, Reason}, State = #{tracer_pid := TracerPid}) ->
handle_tracer_exit(Reason),
{noreply, stop_trace(State#{tracer_pid := none})};
handle_info(Msg, State) ->
logger:error("Unexpected message ~p.", [Msg]),
{noreply, State}.

handle_tracer_exit(shutdown) ->
ok;
handle_tracer_exit(Reason) ->
logger:error("Tracer exited with reason ~p.", [Reason]).

%% @private
-spec terminate(any(), state()) -> ok.
terminate(_Reason, #{}) ->
terminate(_Reason, #{trace := none}) ->
ok;
terminate(_, State) ->
stop_trace(State),
ok.

%% @private
Expand Down Expand Up @@ -614,22 +619,27 @@ index(Tab) ->

-spec start_trace(state(), trace_spec()) -> state().
start_trace(State, Spec = #{modules := ModSpecs, pids := Pids}) ->
TracedPidsTab = setup_msg_tracing(Spec),
TracerPid = spawn_link(fun() -> start_tracer_loop(State, Spec) end),
[enable_trace_pattern(ModSpec) || ModSpec <- ModSpecs],
set_tracing(Pids, true, trace_flags(Spec)),
State#{trace := Spec, traced_pids := TracedPidsTab}.
set_tracing(Pids, true, trace_flags(TracerPid, Spec)),
State#{trace := Spec, tracer_pid := TracerPid}.

-spec stop_trace(state()) -> state().
stop_trace(State = #{trace := Spec = #{modules := ModSpecs, pids := Pids},
traced_pids := TracedPidsTab}) ->
set_tracing(Pids, false, trace_flags(Spec)),
tracer_pid := TracerPid}) ->
case TracerPid of
none ->
ok;
_ ->
exit(TracerPid, shutdown),
receive {'EXIT', TracerPid, Reason} -> handle_tracer_exit(Reason) end
end,
[disable_trace_pattern(ModSpec) || ModSpec <- ModSpecs],
teardown_msg_tracing(Spec, TracedPidsTab),
State#{trace := none, traced_pids := none}.
State#{trace := none, tracer_pid := none}.

-spec trace_flags(trace_spec()) -> erlang_trace_flags().
trace_flags(Spec) ->
basic_trace_flags() ++ msg_trace_flags(Spec).
-spec trace_flags(pid(), trace_spec()) -> erlang_trace_flags().
trace_flags(TracerPid, Spec) ->
[{tracer, TracerPid} | basic_trace_flags()] ++ msg_trace_flags(Spec).

-spec msg_trace_flags(trace_spec()) -> erlang_trace_flags().
msg_trace_flags(#{msg := all}) -> [send, 'receive'];
Expand Down Expand Up @@ -664,15 +674,6 @@ setup_msg_tracing(#{msg := _, msg_trigger := Trigger}) ->
always -> none
end.

-spec teardown_msg_tracing(trace_spec(), traced_pids_tab()) -> any().
teardown_msg_tracing(#{msg := none}, _) ->
ok;
teardown_msg_tracing(#{msg := _}, TracedPidsTab) ->
case TracedPidsTab of
none -> ok;
_ -> ets:delete(TracedPidsTab)
end.

enable_trace_pattern(ModSpec) ->
{MFA = {M, _, _}, Opts} = trace_pattern_and_opts(ModSpec),
{module, _} = code:ensure_loaded(M),
Expand All @@ -687,55 +688,65 @@ trace_pattern_and_opts({_, _, _} = MFA) -> {MFA, [local, call_time]};
trace_pattern_and_opts({Mod, Opts}) when is_atom(Mod) -> {{Mod, '_', '_'}, Opts};
trace_pattern_and_opts({{_M, _F, _A} = MFA, Opts}) -> {MFA, Opts}.

handle_trace({trace_ts, Pid, call, MFA = {_, _, Args}, TS}, #{tab := Tab, index := I} = State) ->
case are_messages_skipped(Pid, State) of
true -> stop_skipping_messages(Pid, State);
-spec start_tracer_loop(state(), trace_spec()) -> no_return().
start_tracer_loop(#{tab := Tab, index := Index, limit := Limit}, Spec) ->
PidsTab = setup_msg_tracing(Spec),
loop(Tab, Index, Limit, PidsTab).

-spec loop(table(), index(), limit(), traced_pids_tab()) -> no_return().
loop(_Tab, Index, Limit, _PidsTab) when Index > Limit ->
logger:warning("Reached trace limit ~p, stopping the tracer.", [Limit]),
exit(shutdown);
loop(Tab, Index, Limit, PidsTab) ->
receive
Msg ->
NewIndex = case handle_trace(Msg, Index, PidsTab) of
skip ->
Index;
Tr ->
ets:insert(Tab, Tr),
Index + 1
end,
loop(Tab, NewIndex, Limit, PidsTab)
end.

-spec handle_trace(term(), index(), traced_pids_tab()) -> tr() | skip.
handle_trace({trace_ts, Pid, call, MFA = {_, _, Args}, TS}, Index, PidsTab) ->
case are_messages_skipped(Pid, PidsTab) of
true -> stop_skipping_messages(Pid, PidsTab);
false -> ok
end,
ets:insert(Tab, #tr{index = I, pid = Pid, event = call, mfa = mfarity(MFA), data = Args,
ts = usec_from_now(TS)}),
State#{index := I + 1};
handle_trace({trace_ts, Pid, return_from, MFArity, Res, TS}, #{tab := Tab, index := I} = State) ->
ets:insert(Tab, #tr{index = I, pid = Pid, event = return, mfa = MFArity, data = Res,
ts = usec_from_now(TS)}),
State#{index := I + 1};
handle_trace({trace_ts, Pid, exception_from, MFArity, {Class, Value}, TS}, #{tab := Tab, index := I} = State) ->
ets:insert(Tab, #tr{index = I, pid = Pid, event = exception, mfa = MFArity, data = {Class, Value},
ts = usec_from_now(TS)}),
State#{index := I + 1};
handle_trace({trace_ts, Pid, Event, Msg, To, TS}, State)
#tr{index = Index, pid = Pid, event = call, mfa = mfarity(MFA), data = Args,
ts = usec_from_now(TS)};
handle_trace({trace_ts, Pid, return_from, MFArity, Res, TS}, Index, _PidsTab) ->
#tr{index = Index, pid = Pid, event = return, mfa = MFArity, data = Res,
ts = usec_from_now(TS)};
handle_trace({trace_ts, Pid, exception_from, MFArity, {Class, Value}, TS}, Index, _PidsTab) ->
#tr{index = Index, pid = Pid, event = exception, mfa = MFArity, data = {Class, Value},
ts = usec_from_now(TS)};
handle_trace({trace_ts, Pid, Event, Msg, To, TS}, Index, PidsTab)
when Event =:= send orelse Event =:= send_to_non_existing_process ->
case are_messages_skipped(Pid, State) of
true -> State;
false -> handle_send_trace(Pid, Event, Msg, To, TS, State)
case are_messages_skipped(Pid, PidsTab) of
true -> skip;
false -> #tr{index = Index, pid = Pid, event = send, data = Msg, ts = usec_from_now(TS),
info = {To, Event =:= send}}
end;
handle_trace({trace_ts, Pid, 'receive', Msg, TS}, State) ->
case are_messages_skipped(Pid, State) of
true -> State;
false -> handle_recv_trace(Pid, Msg, TS, State)
handle_trace({trace_ts, Pid, 'receive', Msg, TS}, Index, PidsTab) ->
case are_messages_skipped(Pid, PidsTab) of
true -> skip;
false -> #tr{index = Index, pid = Pid, event = recv, data = Msg, ts = usec_from_now(TS)}
end;
handle_trace(Trace, State) ->
handle_trace(Trace, _Index, _State) ->
logger:error("Unexpected trace message ~p.", [Trace]),
State.

handle_send_trace(Pid, Event, Msg, To, TS, #{tab := Tab, index := I} = State) ->
Info = {To, Event =:= send},
ets:insert(Tab, #tr{index = I, pid = Pid, event = send, data = Msg,
ts = usec_from_now(TS), info = Info}),
State#{index := I + 1}.

handle_recv_trace(Pid, Msg, TS, #{tab := Tab, index := I} = State) ->
ets:insert(Tab, #tr{index = I, pid = Pid, event = recv, data = Msg,
ts = usec_from_now(TS)}),
State#{index := I + 1}.
skip.

are_messages_skipped(_Pid, #{traced_pids := none}) ->
are_messages_skipped(_Pid, none) ->
false;
are_messages_skipped(Pid, #{traced_pids := TracedPidsTab}) ->
ets:lookup(TracedPidsTab, Pid) =:= [].
are_messages_skipped(Pid, PidsTab) ->
ets:lookup(PidsTab, Pid) =:= [].

stop_skipping_messages(Pid, #{traced_pids := TracedPidsTab}) ->
ets:insert(TracedPidsTab, {Pid}).
stop_skipping_messages(Pid, PidsTab) ->
ets:insert(PidsTab, {Pid}).

%% Filter tracebacks

Expand Down

0 comments on commit 84b98ab

Please sign in to comment.