Skip to content

Commit

Permalink
fix(profiling): asyncio task fixes for stack v2 (#11493)
Browse files Browse the repository at this point in the history
This PR fixes 2 issues.

1. Associate `asyncio` tasks with spans

Task rendering begins with calling `render_task_begin`, and it is then
followed by call to `FrameStack::render()`. However, we only add span id
association via call to `render_thread_begin()` so tasks are missing
span id labels. To properly link spans to tasks, we add the label when
we create a new sample for task. And while debugging this issue,
discovered the following.


2. Propagate `asyncio` task names properly

Echion adds a dummy frame having task name, code
[here](https://github.com/taegyunkim/echion/blob/9d5bcc5867d7aefff73c837adcba4ef46eecebc6/echion/threads.h#L282).
When it tries to render tasks, it [peeks at the top of the
frame](https://github.com/taegyunkim/echion/blob/9d5bcc5867d7aefff73c837adcba4ef46eecebc6/echion/threads.h#L351)
stack's name and uses that. However, there could be frames preceding the
frame stack added by [this
line](https://github.com/taegyunkim/echion/blob/9d5bcc5867d7aefff73c837adcba4ef46eecebc6/echion/threads.h#L276).
Leading to using incorrect task names.



## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
  • Loading branch information
taegyunkim authored Dec 10, 2024
1 parent b875079 commit 80b648d
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class StackRenderer : public RendererInterface
{
Sample* sample = nullptr;
ThreadState thread_state = {};
// Whether task name has been pushed for the current sample. Whenever
// the sample is created, this has to be reset.
bool pushed_task_name = false;

virtual void render_message(std::string_view msg) override;
virtual void render_thread_begin(PyThreadState* tstate,
Expand Down
26 changes: 23 additions & 3 deletions ddtrace/internal/datadog/profiling/stack_v2/src/stack_renderer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ StackRenderer::render_thread_begin(PyThreadState* tstate,
thread_state.cpu_time_ns = 0; // Walltime samples are guaranteed, but CPU times are not. Initialize to 0
// since we don't know if we'll get a CPU time here.

pushed_task_name = false;

// Finalize the thread information we have
ddup_push_threadinfo(sample, static_cast<int64_t>(thread_id), static_cast<int64_t>(native_id), name);
ddup_push_walltime(sample, thread_state.wall_time_ns, 1);
Expand All @@ -64,7 +66,7 @@ StackRenderer::render_thread_begin(PyThreadState* tstate,
}

void
StackRenderer::render_task_begin(std::string_view name)
StackRenderer::render_task_begin(std::string_view)
{
static bool failed = false;
if (failed) {
Expand All @@ -89,9 +91,18 @@ StackRenderer::render_task_begin(std::string_view name)
ddup_push_walltime(sample, thread_state.wall_time_ns, 1);
ddup_push_cputime(sample, thread_state.cpu_time_ns, 1); // initialized to 0, so possibly a no-op
ddup_push_monotonic_ns(sample, thread_state.now_time_ns);
}

ddup_push_task_name(sample, name);
// We also want to make sure the tid -> span_id mapping is present in the sample for the task
const std::optional<Span> active_span =
ThreadSpanLinks::get_instance().get_active_span_from_thread_id(thread_state.id);
if (active_span) {
ddup_push_span_id(sample, active_span->span_id);
ddup_push_local_root_span_id(sample, active_span->local_root_span_id);
ddup_push_trace_type(sample, std::string_view(active_span->span_type));
}

pushed_task_name = false;
}
}

void
Expand Down Expand Up @@ -119,6 +130,15 @@ StackRenderer::render_python_frame(std::string_view name, std::string_view file,
if (!utf8_check_is_valid(file.data(), file.size())) {
file = invalid;
}
// DEV: Echion pushes a dummy frame containing task name, and its line
// number is set to 0.
if (!pushed_task_name and line == 0) {
ddup_push_task_name(sample, name);
pushed_task_name = true;
// And return early to avoid pushing task name as a frame
return;
}

ddup_push_frame(sample, name, file, 0, line);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
fixes:
- |
Fixes an issue where ``asyncio`` task names are not properly propagated
when using stack v2, i.e. when ``DD_PROFILING_STACK_V2_ENABLED`` is set.
Fixes an issue where ``asyncio`` tasks are not associated with spans
when using stack v2, i.e. when ``DD_PROFILING_STACK_V2_ENABLED`` is set.
8 changes: 8 additions & 0 deletions tests/profiling_v2/collector/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import pytest

import ddtrace


@pytest.fixture
def tracer():
return ddtrace.Tracer()
156 changes: 83 additions & 73 deletions tests/profiling_v2/collector/test_stack_asyncio.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
import asyncio
import glob
import os
import sys
import time

import pytest

from ddtrace.internal.datadog.profiling import stack_v2
from ddtrace.profiling import _asyncio
from ddtrace.profiling import profiler
from ddtrace.settings.profiling import config
from tests.profiling.collector import _asyncio_compat
from tests.profiling.collector import pprof_utils


@pytest.mark.skipif(sys.version_info < (3, 8), reason="stack v2 is available only on 3.8+ as echion does")
def test_asyncio(monkeypatch):
pprof_output_prefix = "/tmp/test_asyncio"
monkeypatch.setattr(config.stack, "v2_enabled", True)
monkeypatch.setattr(config, "output_pprof", pprof_output_prefix)
@pytest.mark.subprocess(
env=dict(
DD_PROFILING_OUTPUT_PPROF="/tmp/test_stack_asyncio",
DD_PROFILING_STACK_V2_ENABLED="true",
),
)
def test_asyncio():
import asyncio
import os
import time
import uuid

from ddtrace import ext
from ddtrace import tracer
from ddtrace.internal.datadog.profiling import stack_v2
from ddtrace.profiling import profiler
from tests.profiling.collector import _asyncio_compat
from tests.profiling.collector import pprof_utils

assert stack_v2.is_available, stack_v2.failure_msg

Expand All @@ -36,79 +39,86 @@ async def hello():
await stuff()
return (t1, t2)

p = profiler.Profiler()
resource = str(uuid.uuid4())
span_type = ext.SpanTypes.WEB

p = profiler.Profiler(tracer=tracer)
assert p._profiler._stack_v2_enabled
p.start()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if _asyncio_compat.PY38_AND_LATER:
with tracer.trace("test_asyncio", resource=resource, span_type=span_type) as span:
span_id = span.span_id
local_root_span_id = span._local_root.span_id

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
maintask = loop.create_task(hello(), name="main")
else:
maintask = loop.create_task(hello())

t1, t2 = loop.run_until_complete(maintask)
t1, t2 = loop.run_until_complete(maintask)
p.stop()

t1_name = _asyncio._task_get_name(t1)
t2_name = _asyncio._task_get_name(t2)
t1_name = t1.get_name()
t2_name = t2.get_name()

assert t1_name == "sleep 1"
assert t2_name == "sleep 2"

output_filename = pprof_output_prefix + "." + str(os.getpid())
output_filename = os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." + str(os.getpid())

profile = pprof_utils.parse_profile(output_filename)

samples_with_span_id = pprof_utils.get_samples_with_label_key(profile, "span id")
assert len(samples_with_span_id) > 0

# get samples with task_name
samples = pprof_utils.get_samples_with_label_key(profile, "task name")
# The next fails if stack_v2 is not properly configured with asyncio task
# tracking via ddtrace.profiling._asyncio
assert len(samples) > 0

# We'd like to check whether there exist samples with
# 1. task name label "main"
# - function name label "hello"
# - and line number is between
# 2. task name label t1_name or t2_name
# - function name label "stuff"
# And they all have thread name "MainThread"

checked_main = False
checked_t1 = False
checked_t2 = False

for sample in samples:
task_name_label = pprof_utils.get_label_with_key(profile.string_table, sample, "task name")
task_name = profile.string_table[task_name_label.str]

thread_name_label = pprof_utils.get_label_with_key(profile.string_table, sample, "thread name")
thread_name = profile.string_table[thread_name_label.str]

location_id = sample.location_id[0]
location = pprof_utils.get_location_with_id(profile, location_id)
line = location.line[0]
function = pprof_utils.get_function_with_id(profile, line.function_id)
function_name = profile.string_table[function.name]

if task_name == "main":
assert thread_name == "MainThread"
assert function_name == "hello"
checked_main = True
elif task_name == t1_name or task_name == t2_name:
assert thread_name == "MainThread"
assert function_name == "stuff"
if task_name == t1_name:
checked_t1 = True
if task_name == t2_name:
checked_t2 = True

assert checked_main
assert checked_t1
assert checked_t2

# cleanup output file
for f in glob.glob(pprof_output_prefix + ".*"):
try:
os.remove(f)
except Exception as e:
print("Error removing file: {}".format(e))
pass
pprof_utils.assert_profile_has_sample(
profile,
samples,
expected_sample=pprof_utils.StackEvent(
thread_name="MainThread",
task_name="main",
span_id=span_id,
local_root_span_id=local_root_span_id,
locations=[
pprof_utils.StackLocation(
function_name="hello", filename="test_stack_asyncio.py", line_no=hello.__code__.co_firstlineno + 3
)
],
),
)

pprof_utils.assert_profile_has_sample(
profile,
samples,
expected_sample=pprof_utils.StackEvent(
thread_name="MainThread",
task_name=t1_name,
span_id=span_id,
local_root_span_id=local_root_span_id,
locations=[
pprof_utils.StackLocation(
function_name="stuff", filename="test_stack_asyncio.py", line_no=stuff.__code__.co_firstlineno + 3
),
],
),
)

pprof_utils.assert_profile_has_sample(
profile,
samples,
expected_sample=pprof_utils.StackEvent(
thread_name="MainThread",
task_name=t2_name,
span_id=span_id,
local_root_span_id=local_root_span_id,
locations=[
pprof_utils.StackLocation(
function_name="stuff", filename="test_stack_asyncio.py", line_no=stuff.__code__.co_firstlineno + 3
),
],
),
)

0 comments on commit 80b648d

Please sign in to comment.