Support send/receive (WIP)
chrzaszcz committed Jan 11, 2024
1 parent 9be45d4 commit 4b795bb
Showing 3 changed files with 296 additions and 61 deletions.
7 changes: 5 additions & 2 deletions include/tr.hrl
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
-record(msg, {to :: pid(), exists :: boolean()}).

-record(tr, {index :: pos_integer(),
pid :: pid(),
event :: call | return_from | exception_from,
mfa :: {module(), atom(), non_neg_integer()},
mfa :: mfa() | undefined,
data :: term(),
ts :: integer()}).
ts :: integer(),
extra :: #msg{} | undefined}).

-record(node, {module :: module(),
function :: atom(),
164 changes: 124 additions & 40 deletions src/tr.erl
Original file line number Diff line number Diff line change
start/0, start/1,
trace/1, trace/2,
trace/1, trace/2, trace/3,
-record(msg, {to :: pid(), exists :: boolean()}).

%% copied, not included from tr.hrl to make it self-contained
-record(tr, {index :: pos_integer(),
-record(tr, {index :: index(),
pid :: pid(),
event :: call | return | exception,
mfa :: mfa(),
event :: call | return | exception | send | recv,
mfa :: mfa() | undefined,
data :: term(),
ts :: integer()}).
ts :: integer(),
extra :: #msg{} | undefined}).

-define(is_return(Event), (Event =:= return orelse Event =:= exception)).
-define(is_msg(Event), (Event =:= send orelse Event =:= recv)).

-type tr() :: #tr{}.
-type pred() :: fun((tr()) -> boolean()).
-type own_time() :: non_neg_integer().
-type pids() :: [pid()] | all.
-type limit() :: pos_integer() | infinity. % used for a few different limits
-type index() :: non_neg_integer().
-type index() :: pos_integer().

-type init_options() :: #{tab => atom(),
index => index(),
limit := limit(),
trace := none | trace_spec()}.
-type trace_spec() :: #{modules := module_spec(),
pids := pids()}.
pids := pids(),
opts := trace_options()}.
-type module_spec() :: [module() | mfa()].
-type trace_options() :: #{msg => send | recv | all | none,
msg_trigger => always | after_traced_call}.
-type erlang_trace_flags() :: [call | timestamp | send | 'receive'].

-type range_options() :: #{tab => atom() | [tr()],
max_depth => limit()}.
-spec trace(module_spec()) -> ok.
trace(Modules) ->
gen_server:call(?MODULE, {start_trace, call, #{modules => Modules}}).
trace(Modules, all).

-spec trace(module_spec(), pids()) -> ok.
trace(Modules, Pids) ->
gen_server:call(?MODULE, {start_trace, call, #{modules => Modules,
pids => Pids}}).
trace(Modules, Pids, #{}).

-spec trace(module_spec(), pids(), trace_options()) -> ok.
trace(Modules, Pids, Opts) ->
DefaultOpts = #{msg => none, msg_trigger => after_traced_call},
Spec = #{modules => Modules, pids => Pids, opts => maps:merge(DefaultOpts, Opts)},
gen_server:call(?MODULE, {start_trace, call, Spec}).

-spec stop_tracing() -> ok.
stop_tracing() ->
Traces = foldl(fun(Tr, State) -> filter_trace(F, Tr, State) end, [], Tab),

-spec traceback(pred() | tr() | pos_integer()) -> [tr()].
-spec traceback(pred() | tr() | index()) -> [tr()].
traceback(Index) when is_integer(Index) ->
traceback(fun(#tr{index = I}) -> Index =:= I end);
traceback(T = #tr{}) ->
foldl(fun(T, State) -> tb_step(PredF, T, State) end, InitialState, Tab),
finalize_tracebacks(TBs, Output, Format, Options).

-spec range(pred() | tr() | pos_integer()) -> [tr()].
-spec range(pred() | tr() | index()) -> [tr()].
range(Index) when is_integer(Index) ->
range(fun(#tr{index = I}) -> Index =:= I end);
range(T = #tr{}) ->
do(#tr{event = call, mfa = {M, F, Arity}, data = Args}) when length(Args) =:= Arity ->
apply(M, F, Args).

-spec lookup(pos_integer()) -> tr().
-spec lookup(index()) -> tr().
lookup(Index) when is_integer(Index) ->
[T] = ets:lookup(tab(), Index),
Expand Down Expand Up @@ -344,9 +357,8 @@ init(Opts) ->
-spec handle_call(any(), {pid(), any()}, state()) -> {reply, ok, state()}.
handle_call({start_trace, call, Opts}, _From, State = #{trace := none}) ->
DefaultOpts = #{modules => [], pids => all},
{reply, ok, start_trace(State, maps:merge(DefaultOpts, Opts))};
handle_call({start_trace, call, Spec}, _From, State = #{trace := none}) ->
{reply, ok, start_trace(State, Spec)};
handle_call({stop_trace, call}, _From, State = #{trace := #{}}) ->
{reply, ok, stop_trace(State)};
handle_call(clean, _From, State = #{tab := Tab}) ->
index(Tab) ->
case ets:last(Tab) of
I when is_integer(I) -> I;
I when is_integer(I) -> I + 1;
_ -> initial_index()

start_trace(State, Spec = #{modules := ModSpecs, pids := Pids}) ->
start_trace(State, Spec = #{modules := ModSpecs, pids := Pids, opts := Opts}) ->
TracedPids = setup_msg_tracing(Opts),
[enable_trace_pattern(ModSpec) || ModSpec <- ModSpecs],
trace_pids(Pids, true, [call, timestamp]),
State#{trace := Spec}.
set_tracing(Pids, true, trace_flags(Opts)),
State#{trace := Spec, traced_pids => TracedPids}.

stop_trace(State = #{trace := #{modules := ModSpecs, pids := Pids}}) ->
trace_pids(Pids, false, [call, timestamp]),
stop_trace(State = #{trace := #{modules := ModSpecs, pids := Pids, opts := Opts},
traced_pids := TracedPids}) ->
set_tracing(Pids, false, trace_flags(Opts)),
[disable_trace_pattern(ModSpec) || ModSpec <- ModSpecs],
State#{trace := none}.
teardown_msg_tracing(Opts, TracedPids),
State#{trace := none, traced_pids := none}.

trace_flags(Opts) ->
basic_trace_flags() ++ msg_trace_flags(Opts).

-spec msg_trace_flags(trace_options()) -> erlang_trace_flags().
msg_trace_flags(#{msg := all}) -> [send, 'receive'];
msg_trace_flags(#{msg := send}) -> [send];
msg_trace_flags(#{msg := recv}) -> ['receive'];
msg_trace_flags(#{msg := none}) -> [].

trace_pids(all, How, FlagList) ->
-spec set_tracing(pids(), boolean(), erlang_trace_flags()) -> ok.
set_tracing(all, How, FlagList) ->
erlang:trace(all, How, FlagList),
trace_pids(Pids, How, FlagList) when is_list(Pids) ->
set_tracing(Pids, How, FlagList) when is_list(Pids) ->
[trace_pid(Pid, How, FlagList) || Pid <- Pids],

Expand All @@ -456,6 +483,25 @@ trace_pid(Pid, How, FlagList) when is_pid(Pid) ->
[How, Pid, Class, Reason])

setup_msg_tracing(#{msg := none}) ->
setup_msg_tracing(#{msg := _, msg_trigger := Trigger}) ->
erlang:trace_pattern('receive', [{['_', '$1', '_'], [{is_pid, '$1'}], []}], []),
case Trigger of
after_traced_call -> ets:new(traced_pids, []);
always -> none

teardown_msg_tracing(#{msg := none}, _) ->
teardown_msg_tracing(#{msg := _}, TracedPids) ->
case TracedPids of
none -> ok;
_ -> ets:delete(TracedPids)
%% restore default trace pattern
erlang:trace_pattern('receive', true, []).

enable_trace_pattern(ModSpec) ->
{MFA = {M, _, _}, Opts} = trace_pattern_and_opts(ModSpec),
{module, _} = code:ensure_loaded(M),
trace_pattern_and_opts({{_M, _F, _A} = MFA, Opts}) -> {MFA, Opts}.

handle_trace({trace_ts, Pid, call, MFA = {_, _, Args}, TS}, #{tab := Tab, index := I} = State) ->
NextIndex = next_index(I),
ets:insert(Tab, #tr{index = NextIndex, pid = Pid, event = call, mfa = mfarity(MFA), data = Args,
case are_messages_skipped(Pid, State) of
true -> stop_skipping_messages(Pid, State);
false -> ok
ets:insert(Tab, #tr{index = I, pid = Pid, event = call, mfa = mfarity(MFA), data = Args,
ts = usec_from_now(TS)}),
State#{index := NextIndex};
State#{index := I + 1};
handle_trace({trace_ts, Pid, return_from, MFArity, Res, TS}, #{tab := Tab, index := I} = State) ->
NextIndex = next_index(I),
ets:insert(Tab, #tr{index = NextIndex, pid = Pid, event = return, mfa = MFArity, data = Res,
ets:insert(Tab, #tr{index = I, pid = Pid, event = return, mfa = MFArity, data = Res,
ts = usec_from_now(TS)}),
State#{index := NextIndex};
State#{index := I + 1};
handle_trace({trace_ts, Pid, exception_from, MFArity, {Class, Value}, TS}, #{tab := Tab, index := I} = State) ->
NextIndex = next_index(I),
ets:insert(Tab, #tr{index = NextIndex, pid = Pid, event = exception, mfa = MFArity, data = {Class, Value},
ets:insert(Tab, #tr{index = I, pid = Pid, event = exception, mfa = MFArity, data = {Class, Value},
ts = usec_from_now(TS)}),
State#{index := NextIndex};
State#{index := I + 1};
handle_trace({trace_ts, Pid, Event, Msg, To, TS}, State)
when Event =:= send orelse Event =:= send_to_non_existing_process ->
case are_messages_skipped(Pid, State) of
true -> State;
false -> handle_send_trace(Pid, Event, Msg, To, TS, State)
handle_trace({trace_ts, Pid, 'receive', Msg, TS}, State) ->
case are_messages_skipped(Pid, State) of
true -> State;
false -> handle_recv_trace(Pid, Msg, TS, State)
handle_trace(Trace, State) ->
handle_send_trace(Pid, Event, Msg, To, TS, #{tab := Tab, index := I} = State) ->
Extra = #msg{to = To, exists = Event =:= send},
ets:insert(Tab, #tr{index = I, pid = Pid, event = send, data = Msg,
ts = usec_from_now(TS), extra = Extra}),
State#{index := I + 1}.

handle_recv_trace(Pid, Msg, TS, #{tab := Tab, index := I} = State) ->
ets:insert(Tab, #tr{index = I, pid = Pid, event = recv, data = Msg,
ts = usec_from_now(TS)}),
State#{index := I + 1}.

are_messages_skipped(_Pid, #{traced_pids := none}) ->
are_messages_skipped(Pid, #{traced_pids := TracedPidsTab}) ->
ets:lookup(TracedPidsTab, Pid) =:= [].

stop_skipping_messages(Pid, #{traced_pids := TracedPidsTab}) ->
ets:insert(TracedPidsTab, {Pid}).

-spec tb_step(pred(), tr(), map()) -> map().
Expand All @@ -501,7 +578,7 @@ tb_step(PredF, T = #tr{pid = Pid, event = Event},
case catch PredF(T) of
true when Count < Limit ->
TB = if Event =:= call -> NewStack;
?is_return(Event) -> CallStack
?is_return(Event) orelse ?is_msg(Event) -> CallStack
NewState#{tbs := add_tb(lists:reverse(TB), TBs, Output, Format),
count := Count + 1};
Expand All @@ -516,6 +593,8 @@ update_call_stack(#tr{event = Event, mfa = MFArity}, [#tr{mfa = MFArity} | Stack
update_call_stack(#tr{event = Event, mfa = {M, F, Arity}}, Stack) when ?is_return(Event) ->
logger:warning("Found a return trace from ~p:~p/~p without a call trace", [M, F, Arity]),
update_call_stack(#tr{event = Event}, Stack) when ?is_msg(Event) ->

Expand Down Expand Up @@ -584,13 +663,20 @@ filter_range(_PredF, T = #tr{event = Event}, State = #{depth := Depth, trace :=
filter_range(_PredF, #tr{event = Event}, State = #{depth := Depth, trace := Trace}, _) when ?is_return(Event) ->
{incomplete, State#{depth => Depth - 1, trace => Trace}};
filter_range(_PredF, #tr{event = Event}, no_state, _) when ?is_return(Event) ->
filter_range(_PrefF, T = #tr{event = Event}, State = #{depth := Depth, trace := Trace}, MaxDepth)
when Depth < MaxDepth, ?is_msg(Event) ->
{incomplete, State#{depth => Depth, trace => [T|Trace]}};
filter_range(_PrefF, #tr{event = Event}, _State, _MaxDepth) when ?is_msg(Event) ->

sort_by_time(MapStat) ->
lists:keysort(3, [{Key, Count, AccTime, OwnTime} || {Key, {Count, AccTime, OwnTime}} <- maps:to_list(MapStat)]).

call_stat_step(_KeyF, #tr{event = Event}, State) when ?is_msg(Event) ->
call_stat_step(KeyF, Tr = #tr{pid = Pid}, {ProcessStates, TmpStat, Stat}) ->
{LastTr, LastKey, Stack} = maps:get(Pid, ProcessStates, {no_tr, no_key, []}),
{NewStack, Key} = get_key_and_update_stack(KeyF, Stack, Tr),
-spec call_tree_stat_step(tr(), call_tree_stat_state()) -> call_tree_stat_state().
call_tree_stat_step(#tr{event = Event}, State) when ?is_msg(Event) ->
call_tree_stat_step(Tr = #tr{pid = Pid, ts = TS}, State = #{pid_states := PidStates, tab := TreeTab}) ->
PidState = maps:get(Pid, PidStates, []),
Item = simplify_trace_item(Tr),
-spec initial_index() -> index().
initial_index() ->

-spec next_index(index()) -> index().
next_index(I) ->
I + 1.

default_tab() ->
