diff --git a/tests/profiling/collector/test_stack.py b/tests/profiling/collector/test_stack.py index 690be4b183c..84f3ad60ea6 100644 --- a/tests/profiling/collector/test_stack.py +++ b/tests/profiling/collector/test_stack.py @@ -254,13 +254,12 @@ def test_ignore_profiler_single(): @pytest.mark.skipif(not TESTING_GEVENT, reason="Not testing gevent") -@pytest.mark.subprocess(ddtrace_run=True) +@pytest.mark.subprocess(ddtrace_run=True, env=dict(DD_PROFILING_IGNORE_PROFILER="1", DD_PROFILING_API_TIMEOUT="0.1")) def test_ignore_profiler_gevent_task(): import gevent.monkey gevent.monkey.patch_all() - import os import time from ddtrace.profiling import collector # noqa:F401 @@ -282,28 +281,22 @@ def collect(self): _fib(22) return [] - for ignore in (True, False): - os.environ["DD_PROFILING_API_TIMEOUT"] = "0.1" - os.environ["DD_PROFILING_IGNORE_PROFILER"] = str(ignore) - p = profiler.Profiler() - p.start() - # This test is particularly useful with gevent enabled: create a test collector that run often and for long - # we're sure to catch it with the StackProfiler and that it's not ignored. - c = CollectorTest(p._profiler._recorder, interval=0.00001) - c.start() + p = profiler.Profiler() + p.start() + # This test is particularly useful with gevent enabled: create a test collector that run often and for long + # we're sure to catch it with the StackProfiler and that it's not ignored. 
+ c = CollectorTest(p._profiler._recorder, interval=0.00001) + c.start() - for _ in range(100): - events = p._profiler._recorder.reset() - ids = {e.task_id for e in events[stack_event.StackSampleEvent]} - if (c._worker.ident in ids) != str(ignore): - break - # Give some time for gevent to switch greenlets - time.sleep(0.1) - else: - raise AssertionError("ignore == " + ignore) + for _ in range(100): + events = p._profiler._recorder.reset() + ids = {e.task_id for e in events[stack_event.StackSampleEvent]} + if c._worker.ident in ids: + raise AssertionError("Collector thread found") + time.sleep(0.1) - c.stop() - p.stop(flush=False) + c.stop() + p.stop(flush=False) def test_collect(): diff --git a/tests/profiling/test_accuracy.py b/tests/profiling/test_accuracy.py index d5fcc030ef9..a332068e12b 100644 --- a/tests/profiling/test_accuracy.py +++ b/tests/profiling/test_accuracy.py @@ -31,16 +31,16 @@ def spend_16(): def spend_cpu_2(): - now = time.monotonic_ns() + now = time.process_time_ns() # Active wait for 2 seconds - while time.monotonic_ns() - now < 2e9: + while time.process_time_ns() - now < 2e9: pass def spend_cpu_3(): # Active wait for 3 seconds - now = time.monotonic_ns() - while time.monotonic_ns() - now < 3e9: + now = time.process_time_ns() + while time.process_time_ns() - now < 3e9: pass @@ -51,8 +51,12 @@ def spend_cpu_3(): CPU_TOLERANCE = 0.05 -def almost_equal(value, target, tolerance=TOLERANCE): - return abs(value - target) / target <= tolerance +def assert_almost_equal(value, target, tolerance=TOLERANCE): + if abs(value - target) / target > tolerance: + raise AssertionError( + f"Assertion failed: {value} is not approximately equal to {target} " + f"within tolerance={tolerance}, actual error={abs(value - target) / target}" + ) def total_time(time_data, funcname): @@ -66,7 +70,7 @@ def test_accuracy(): from ddtrace.profiling import profiler from ddtrace.profiling.collector import stack_event from tests.profiling.test_accuracy import CPU_TOLERANCE - from 
tests.profiling.test_accuracy import almost_equal + from tests.profiling.test_accuracy import assert_almost_equal from tests.profiling.test_accuracy import spend_16 from tests.profiling.test_accuracy import total_time @@ -85,20 +89,13 @@ def test_accuracy(): time_spent_ns[idx][frame[2]] += event.wall_time_ns cpu_spent_ns[idx][frame[2]] += event.cpu_time_ns - assert almost_equal(total_time(time_spent_ns, "spend_3"), 9e9) - assert almost_equal(total_time(time_spent_ns, "spend_1"), 2e9) - assert almost_equal(total_time(time_spent_ns, "spend_4"), 4e9) - assert almost_equal(total_time(time_spent_ns, "spend_16"), 16e9) - assert almost_equal(total_time(time_spent_ns, "spend_7"), 7e9) - - try: - from time import monotonic_ns # noqa:F401 - except ImportError: - # If we don't have access to high resolution clocks, we can't really test accurately things as it's spread in - # various Python implementation of monotonic, etc. - pass - else: - assert almost_equal(total_time(time_spent_ns, "spend_cpu_2"), 2e9) - assert almost_equal(total_time(time_spent_ns, "spend_cpu_3"), 3e9) - assert almost_equal(total_time(time_spent_ns, "spend_cpu_2"), 2e9, CPU_TOLERANCE) - assert almost_equal(total_time(time_spent_ns, "spend_cpu_3"), 3e9, CPU_TOLERANCE) + assert_almost_equal(total_time(time_spent_ns, "spend_3"), 9e9) + assert_almost_equal(total_time(time_spent_ns, "spend_1"), 2e9) + assert_almost_equal(total_time(time_spent_ns, "spend_4"), 4e9) + assert_almost_equal(total_time(time_spent_ns, "spend_16"), 16e9) + assert_almost_equal(total_time(time_spent_ns, "spend_7"), 7e9) + + assert_almost_equal(total_time(time_spent_ns, "spend_cpu_2"), 2e9) + assert_almost_equal(total_time(time_spent_ns, "spend_cpu_3"), 3e9) + assert_almost_equal(total_time(cpu_spent_ns, "spend_cpu_2"), 2e9, CPU_TOLERANCE) + assert_almost_equal(total_time(cpu_spent_ns, "spend_cpu_3"), 3e9, CPU_TOLERANCE) diff --git a/tests/profiling_v2/collector/test_stack.py b/tests/profiling_v2/collector/test_stack.py index 
af13a1ea237..74def22ed50 100644 --- a/tests/profiling_v2/collector/test_stack.py +++ b/tests/profiling_v2/collector/test_stack.py @@ -13,6 +13,7 @@ from ddtrace.profiling.collector import stack from ddtrace.settings.profiling import config from tests.profiling.collector import pprof_utils +from tests.profiling.collector import test_collector # Python 3.11.9 is not compatible with gevent, https://github.com/gevent/gevent/issues/2040 @@ -24,6 +25,43 @@ ) +# Use subprocess as ddup config persists across tests. +@pytest.mark.subprocess( + env=dict( + DD_PROFILING_MAX_FRAMES="5", + DD_PROFILING_OUTPUT_PPROF="/tmp/test_collect_truncate", + DD_PROFILING_STACK_V2_ENABLED="1", + ) +) +@pytest.mark.skipif(sys.version_info[:2] == (3, 7), reason="stack_v2 is not supported on Python 3.7") +def test_collect_truncate(): + import os + + from ddtrace.profiling import profiler + from tests.profiling.collector import pprof_utils + from tests.profiling.collector.test_stack import func1 + + pprof_prefix = os.environ["DD_PROFILING_OUTPUT_PPROF"] + output_filename = pprof_prefix + "." + str(os.getpid()) + + max_nframes = int(os.environ["DD_PROFILING_MAX_FRAMES"]) + + p = profiler.Profiler() + p.start() + + func1() + + p.stop() + + profile = pprof_utils.parse_profile(output_filename) + samples = pprof_utils.get_samples_with_value_type(profile, "wall-time") + assert len(samples) > 0 + for sample in samples: + # stack v2 adds one extra frame for "%d frames omitted" message + # Also, it allows max_nframes + 1 frames, so we add 2 here. 
+ assert len(sample.location_id) <= max_nframes + 2, len(sample.location_id) + + @pytest.mark.parametrize("stack_v2_enabled", [True, False]) def test_stack_locations(stack_v2_enabled, tmp_path): if sys.version_info[:2] == (3, 7) and stack_v2_enabled: @@ -651,8 +689,23 @@ def _dofib(): assert checked_thread, "No samples found for the expected threads" +def test_max_time_usage(): + with pytest.raises(ValueError): + stack.StackCollector(None, max_time_usage_pct=0) + + +def test_max_time_usage_over(): + with pytest.raises(ValueError): + stack.StackCollector(None, max_time_usage_pct=200) + + @pytest.mark.parametrize( - ("stack_v2_enabled", "ignore_profiler"), [(True, True), (True, False), (False, True), (False, False)] + "stack_v2_enabled", + [True, False], +) +@pytest.mark.parametrize( + "ignore_profiler", + [True, False], ) def test_ignore_profiler(stack_v2_enabled, ignore_profiler, tmp_path): if sys.version_info[:2] == (3, 7) and stack_v2_enabled: @@ -691,3 +744,79 @@ def test_ignore_profiler(stack_v2_enabled, ignore_profiler, tmp_path): assert collector_worker_thread_id in thread_ids else: assert collector_worker_thread_id not in thread_ids + + +# TODO: support ignore profiler with stack_v2 and update this test +@pytest.mark.skipif(not TESTING_GEVENT, reason="Not testing gevent") +@pytest.mark.subprocess( + ddtrace_run=True, + env=dict(DD_PROFILING_IGNORE_PROFILER="1", DD_PROFILING_OUTPUT_PPROF="/tmp/test_ignore_profiler_gevent_task"), +) +def test_ignore_profiler_gevent_task(): + import gevent.monkey + + gevent.monkey.patch_all() + + import os + import time + import typing + + from ddtrace.profiling import collector + from ddtrace.profiling import event as event_mod + from ddtrace.profiling import profiler + from ddtrace.profiling.collector import stack + from tests.profiling.collector import pprof_utils + + def _fib(n): + if n == 1: + return 1 + elif n == 0: + return 0 + else: + return _fib(n - 1) + _fib(n - 2) + + class CollectorTest(collector.PeriodicCollector): 
+ def collect(self) -> typing.Iterable[typing.Iterable[event_mod.Event]]: + _fib(22) + return [] + + output_filename = os.environ["DD_PROFILING_OUTPUT_PPROF"] + + p = profiler.Profiler() + + p.start() + + for c in p._profiler._collectors: + if isinstance(c, stack.StackCollector): + assert c.ignore_profiler + + c = CollectorTest(None, interval=0.00001) + c.start() + + time.sleep(3) + + worker_ident = c._worker.ident + + c.stop() + p.stop() + + profile = pprof_utils.parse_profile(output_filename + "." + str(os.getpid())) + + samples = pprof_utils.get_samples_with_value_type(profile, "cpu-time") + + thread_ids = set() + for sample in samples: + thread_id_label = pprof_utils.get_label_with_key(profile.string_table, sample, "thread id") + thread_id = int(thread_id_label.num) + thread_ids.add(thread_id) + + assert worker_ident not in thread_ids + + +def test_repr(): + test_collector._test_repr( + stack.StackCollector, + "StackCollector(status=<ServiceStatus.STOPPED: 'stopped'>, " + "recorder=Recorder(default_max_events=16384, max_events={}), min_interval_time=0.01, max_time_usage_pct=1.0, " + "nframes=64, ignore_profiler=False, endpoint_collection_enabled=None, tracer=None)", + ) diff --git a/tests/profiling_v2/simple_program.py b/tests/profiling_v2/simple_program.py new file mode 100755 index 00000000000..ed07bc5a402 --- /dev/null +++ b/tests/profiling_v2/simple_program.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python +import os +import sys +import time + +from ddtrace.internal import service +from ddtrace.profiling import bootstrap +from ddtrace.profiling.collector import stack + + +for running_collector in bootstrap.profiler._profiler._collectors: + if isinstance(running_collector, stack.StackCollector): + break +else: + raise AssertionError("Unable to find stack collector") + + +print("hello world") +assert running_collector.status == service.ServiceStatus.RUNNING +print(running_collector.interval) + +t0 = time.time() +while time.time() - t0 < (running_collector.interval * 10): + pass + +# Do some serious memory 
allocations! +for _ in range(5000000): + object() + +print(os.getpid()) +print(bootstrap.profiler._profiler._stack_v2_enabled) +sys.exit(42) diff --git a/tests/profiling_v2/simple_program_fork.py b/tests/profiling_v2/simple_program_fork.py new file mode 100644 index 00000000000..ad8c0541ccd --- /dev/null +++ b/tests/profiling_v2/simple_program_fork.py @@ -0,0 +1,32 @@ +import os +import sys +import threading + +from ddtrace.internal import service +import ddtrace.profiling.auto +import ddtrace.profiling.bootstrap +import ddtrace.profiling.profiler + + +lock = threading.Lock() +lock.acquire() + + +assert ddtrace.profiling.bootstrap.profiler.status == service.ServiceStatus.RUNNING + + +child_pid = os.fork() +if child_pid == 0: + # Release it + lock.release() + + # We track this one though + lock = threading.Lock() + lock.acquire() + lock.release() +else: + lock.release() + assert ddtrace.profiling.bootstrap.profiler.status == service.ServiceStatus.RUNNING + print(child_pid) + pid, status = os.waitpid(child_pid, 0) + sys.exit(os.WEXITSTATUS(status)) diff --git a/tests/profiling_v2/simple_program_gevent.py b/tests/profiling_v2/simple_program_gevent.py new file mode 100644 index 00000000000..f50fa3aa2e0 --- /dev/null +++ b/tests/profiling_v2/simple_program_gevent.py @@ -0,0 +1,34 @@ +# Import from ddtrace before monkey patching to ensure that we grab all the +# necessary references to the unpatched modules. 
+import ddtrace.auto # noqa: F401, I001 +import ddtrace.profiling.auto # noqa:F401 + + +import gevent.monkey # noqa:F402 + +gevent.monkey.patch_all() + +import threading # noqa: E402, F402, I001 +import time # noqa: E402, F402 + + +def fibonacci(n): + if n == 0: + return 0 + elif n == 1: + return 1 + else: + return fibonacci(n - 1) + fibonacci(n - 2) + + +i = 1 +for _ in range(20): + threads = [] + for _ in range(10): + t = threading.Thread(target=fibonacci, args=(i,)) + t.start() + threads.append(t) + i += 1 + for t in threads: + t.join() + time.sleep(0.1) diff --git a/tests/profiling_v2/test_accuracy.py b/tests/profiling_v2/test_accuracy.py new file mode 100644 index 00000000000..61fbe3322ff --- /dev/null +++ b/tests/profiling_v2/test_accuracy.py @@ -0,0 +1,101 @@ +# -*- encoding: utf-8 -*- +import sys + +import pytest + + +@pytest.mark.subprocess( + env=dict(DD_PROFILING_MAX_TIME_USAGE_PCT="100", DD_PROFILING_OUTPUT_PPROF="/tmp/test_accuracy_libdd.pprof") +) +def test_accuracy_libdd(): + import collections + import os + + from ddtrace.profiling import profiler + from tests.profiling.collector import pprof_utils + from tests.profiling.test_accuracy import assert_almost_equal + from tests.profiling.test_accuracy import spend_16 + + # Set this to 100 so we don't sleep too often and mess with the precision. + p = profiler.Profiler() + p.start() + spend_16() + p.stop() + wall_times = collections.defaultdict(lambda: 0) + cpu_times = collections.defaultdict(lambda: 0) + profile = pprof_utils.parse_profile(os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." 
+ str(os.getpid())) + + for sample in profile.sample: + wall_time_index = pprof_utils.get_sample_type_index(profile, "wall-time") + + wall_time_spent_ns = sample.value[wall_time_index] + cpu_time_index = pprof_utils.get_sample_type_index(profile, "cpu-time") + cpu_time_spent_ns = sample.value[cpu_time_index] + + for location_id in sample.location_id: + location = pprof_utils.get_location_with_id(profile, location_id) + line = location.line[0] + function = pprof_utils.get_function_with_id(profile, line.function_id) + function_name = profile.string_table[function.name] + wall_times[function_name] += wall_time_spent_ns + cpu_times[function_name] += cpu_time_spent_ns + + assert_almost_equal(wall_times["spend_3"], 9e9) + assert_almost_equal(wall_times["spend_1"], 2e9) + assert_almost_equal(wall_times["spend_4"], 4e9) + assert_almost_equal(wall_times["spend_16"], 16e9) + assert_almost_equal(wall_times["spend_7"], 7e9) + + assert_almost_equal(wall_times["spend_cpu_2"], 2e9, tolerance=0.07) + assert_almost_equal(wall_times["spend_cpu_3"], 3e9, tolerance=0.07) + assert_almost_equal(cpu_times["spend_cpu_2"], 2e9, tolerance=0.07) + assert_almost_equal(cpu_times["spend_cpu_3"], 3e9, tolerance=0.07) + + +@pytest.mark.subprocess( + env=dict(DD_PROFILING_STACK_V2_ENABLED="1", DD_PROFILING_OUTPUT_PPROF="/tmp/test_accuracy_stack_v2.pprof") +) +@pytest.mark.skipif(sys.version_info[:2] == (3, 7), reason="stack_v2 is not supported on Python 3.7") +def test_accuracy_stack_v2(): + import collections + import os + + from ddtrace.profiling import profiler + from tests.profiling.collector import pprof_utils + from tests.profiling.test_accuracy import assert_almost_equal + from tests.profiling.test_accuracy import spend_16 + + # Set this to 100 so we don't sleep too often and mess with the precision. 
+ p = profiler.Profiler() + p.start() + spend_16() + p.stop() + wall_times = collections.defaultdict(lambda: 0) + cpu_times = collections.defaultdict(lambda: 0) + profile = pprof_utils.parse_profile(os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." + str(os.getpid())) + + for sample in profile.sample: + wall_time_index = pprof_utils.get_sample_type_index(profile, "wall-time") + + wall_time_spent_ns = sample.value[wall_time_index] + cpu_time_index = pprof_utils.get_sample_type_index(profile, "cpu-time") + cpu_time_spent_ns = sample.value[cpu_time_index] + + for location_id in sample.location_id: + location = pprof_utils.get_location_with_id(profile, location_id) + line = location.line[0] + function = pprof_utils.get_function_with_id(profile, line.function_id) + function_name = profile.string_table[function.name] + wall_times[function_name] += wall_time_spent_ns + cpu_times[function_name] += cpu_time_spent_ns + + assert_almost_equal(wall_times["spend_3"], 9e9) + assert_almost_equal(wall_times["spend_1"], 2e9) + assert_almost_equal(wall_times["spend_4"], 4e9) + assert_almost_equal(wall_times["spend_16"], 16e9) + assert_almost_equal(wall_times["spend_7"], 7e9) + + assert_almost_equal(wall_times["spend_cpu_2"], 2e9, tolerance=0.07) + assert_almost_equal(wall_times["spend_cpu_3"], 3e9, tolerance=0.07) + assert_almost_equal(cpu_times["spend_cpu_2"], 2e9, tolerance=0.07) + assert_almost_equal(cpu_times["spend_cpu_3"], 3e9, tolerance=0.07) diff --git a/tests/profiling_v2/test_main.py b/tests/profiling_v2/test_main.py new file mode 100644 index 00000000000..3142a1fbba8 --- /dev/null +++ b/tests/profiling_v2/test_main.py @@ -0,0 +1,227 @@ +# -*- encoding: utf-8 -*- +import multiprocessing +import os +import sys + +import pytest + +from tests.profiling.collector import lock_utils +from tests.profiling.collector import pprof_utils +from tests.utils import call_program +from tests.utils import flaky + + +@pytest.mark.parametrize("stack_v2_enabled", [True, False]) +def 
test_call_script(stack_v2_enabled): + if sys.version_info[:2] == (3, 7) and stack_v2_enabled: + pytest.skip("stack_v2 is not supported on Python 3.7") + env = os.environ.copy() + env["DD_PROFILING_ENABLED"] = "1" + env["DD_PROFILING_STACK_V2_ENABLED"] = "1" if stack_v2_enabled else "0" + stdout, stderr, exitcode, _ = call_program( + "ddtrace-run", sys.executable, os.path.join(os.path.dirname(__file__), "simple_program.py"), env=env + ) + if sys.platform == "win32": + assert exitcode == 0, (stdout, stderr) + else: + assert exitcode == 42, (stdout, stderr) + hello, interval, pid, stack_v2 = list(s.strip() for s in stdout.decode().strip().split("\n")) + assert hello == "hello world", stdout.decode().strip() + assert float(interval) >= 0.01, stdout.decode().strip() + assert stack_v2 == str(stack_v2_enabled) + + +@pytest.mark.skipif(not os.getenv("DD_PROFILE_TEST_GEVENT", False), reason="Not testing gevent") +@pytest.mark.parametrize("stack_v2_enabled", [True, False]) +def test_call_script_gevent(stack_v2_enabled): + if sys.version_info[:2] == (3, 7) and stack_v2_enabled: + pytest.skip("stack_v2 is not supported on Python 3.7") + if sys.version_info[:2] == (3, 8) and stack_v2_enabled: + pytest.skip("this test is flaky on 3.8 with stack v2") + env = os.environ.copy() + env["DD_PROFILING_ENABLED"] = "1" + env["DD_PROFILING_STACK_V2_ENABLED"] = "1" if stack_v2_enabled else "0" + stdout, stderr, exitcode, pid = call_program( + sys.executable, os.path.join(os.path.dirname(__file__), "simple_program_gevent.py"), env=env + ) + assert exitcode == 0, (stdout, stderr) + + +@pytest.mark.parametrize("stack_v2_enabled", [True, False]) +def test_call_script_pprof_output(stack_v2_enabled, tmp_path): + if sys.version_info[:2] == (3, 7) and stack_v2_enabled: + pytest.skip("stack_v2 is not supported on Python 3.7") + + """This checks if the pprof output and atexit register work correctly. 
+ + The script does not run for one minute, so if the `stop_on_exit` flag is broken, this test will fail. + """ + filename = str(tmp_path / "pprof") + env = os.environ.copy() + env["DD_PROFILING_OUTPUT_PPROF"] = filename + env["DD_PROFILING_CAPTURE_PCT"] = "1" + env["DD_PROFILING_ENABLED"] = "1" + env["DD_PROFILING_STACK_V2_ENABLED"] = "1" if stack_v2_enabled else "0" + stdout, stderr, exitcode, _ = call_program( + "ddtrace-run", + sys.executable, + os.path.join(os.path.dirname(__file__), "../profiling", "simple_program.py"), + env=env, + ) + if sys.platform == "win32": + assert exitcode == 0, (stdout, stderr) + else: + assert exitcode == 42, (stdout, stderr) + _, _, _, pid = list(s.strip() for s in stdout.decode().strip().split("\n")) + profile = pprof_utils.parse_profile(filename + "." + str(pid)) + samples = pprof_utils.get_samples_with_value_type(profile, "cpu-time") + assert len(samples) > 0 + + +@pytest.mark.parametrize("stack_v2_enabled", [True, False]) +@pytest.mark.skipif(sys.platform == "win32", reason="fork only available on Unix") +def test_fork(stack_v2_enabled, tmp_path): + if sys.version_info[:2] == (3, 7) and stack_v2_enabled: + pytest.skip("stack_v2 is not supported on Python 3.7") + + filename = str(tmp_path / "pprof") + env = os.environ.copy() + env["DD_PROFILING_OUTPUT_PPROF"] = filename + env["DD_PROFILING_CAPTURE_PCT"] = "100" + env["DD_PROFILING_STACK_V2_ENABLED"] = "1" if stack_v2_enabled else "0" + stdout, stderr, exitcode, pid = call_program( + "python", os.path.join(os.path.dirname(__file__), "simple_program_fork.py"), env=env + ) + assert exitcode == 0 + child_pid = stdout.decode().strip() + profile = pprof_utils.parse_profile(filename + "." 
+ str(pid)) + pprof_utils.assert_lock_events( + profile, + expected_acquire_events=[ + pprof_utils.LockAcquireEvent( + caller_name="<module>", + filename="simple_program_fork.py", + linenos=lock_utils.LineNo(create=11, acquire=12, release=28), + lock_name="lock", + ), + ], + expected_release_events=[ + pprof_utils.LockReleaseEvent( + caller_name="<module>", + filename="simple_program_fork.py", + linenos=lock_utils.LineNo(create=11, acquire=12, release=28), + lock_name="lock", + ), + ], + ) + child_profile = pprof_utils.parse_profile(filename + "." + str(child_pid)) + pprof_utils.assert_lock_events( + child_profile, + expected_acquire_events=[ + # After fork(), we clear the samples in child, so we only have one + # lock acquire event + pprof_utils.LockAcquireEvent( + caller_name="<module>", + filename="simple_program_fork.py", + linenos=lock_utils.LineNo(create=24, acquire=25, release=26), + lock_name="lock", + ), + ], + expected_release_events=[ + pprof_utils.LockReleaseEvent( + caller_name="<module>", + filename="simple_program_fork.py", + linenos=lock_utils.LineNo(create=11, acquire=12, release=21), + lock_name="lock", + ), + pprof_utils.LockReleaseEvent( + caller_name="<module>", + filename="simple_program_fork.py", + linenos=lock_utils.LineNo(create=24, acquire=25, release=26), + lock_name="lock", + ), + ], + ) + + +@pytest.mark.parametrize("stack_v2_enabled", [True, False]) +@pytest.mark.skipif(sys.platform == "win32", reason="fork only available on Unix") +@pytest.mark.skipif(not os.getenv("DD_PROFILE_TEST_GEVENT", False), reason="Not testing gevent") +def test_fork_gevent(stack_v2_enabled): + if sys.version_info[:2] == (3, 7) and stack_v2_enabled: + pytest.skip("stack_v2 is not supported on Python 3.7") + env = os.environ.copy() + env["DD_PROFILING_STACK_V2_ENABLED"] = "1" if stack_v2_enabled else "0" + stdout, stderr, exitcode, pid = call_program( + "python", os.path.join(os.path.dirname(__file__), "../profiling", "gevent_fork.py"), env=env + ) + assert exitcode == 0 + + +methods = 
multiprocessing.get_all_start_methods() + + +@pytest.mark.parametrize("stack_v2_enabled", [True, False]) +@pytest.mark.parametrize( + "method", + set(methods) - {"forkserver", "fork"}, +) +def test_multiprocessing(stack_v2_enabled, method, tmp_path): + if sys.version_info[:2] == (3, 7) and stack_v2_enabled: + pytest.skip("stack_v2 is not supported on Python 3.7") + filename = str(tmp_path / "pprof") + env = os.environ.copy() + env["DD_PROFILING_OUTPUT_PPROF"] = filename + env["DD_PROFILING_ENABLED"] = "1" + env["DD_PROFILING_CAPTURE_PCT"] = "1" + env["DD_PROFILING_STACK_V2_ENABLED"] = "1" if stack_v2_enabled else "0" + stdout, stderr, exitcode, _ = call_program( + "ddtrace-run", + sys.executable, + os.path.join(os.path.dirname(__file__), "../profiling", "_test_multiprocessing.py"), + method, + env=env, + ) + assert exitcode == 0, (stdout, stderr) + pid, child_pid = list(s.strip() for s in stdout.decode().strip().split("\n")) + profile = pprof_utils.parse_profile(filename + "." + str(pid)) + samples = pprof_utils.get_samples_with_value_type(profile, "cpu-time") + assert len(samples) > 0 + child_profile = pprof_utils.parse_profile(filename + "." + str(child_pid)) + child_samples = pprof_utils.get_samples_with_value_type(child_profile, "cpu-time") + assert len(child_samples) > 0 + + +@flaky(1731959126) # Marking as flaky so it will show up in flaky reports +@pytest.mark.skipif(os.environ.get("GITLAB_CI") == "true", reason="Hanging and failing in GitLab CI") +@pytest.mark.subprocess( + ddtrace_run=True, + env=dict(DD_PROFILING_ENABLED="1"), + err=lambda _: "RuntimeError: the memalloc module is already started" not in _, +) +def test_memalloc_no_init_error_on_fork(): + import os + + pid = os.fork() + if not pid: + exit(0) + os.waitpid(pid, 0) + + +# Not parametrizing with stack_v2_enabled as subprocess mark doesn't support +# parametrized tests and this only tests our start up code. 
+@pytest.mark.subprocess( + ddtrace_run=True, + env=dict( + DD_PROFILING_ENABLED="1", + DD_UNLOAD_MODULES_FROM_SITECUSTOMIZE="1", + ), + out="OK\n", + err=None, +) +def test_profiler_start_up_with_module_clean_up_in_protobuf_app(): + # This can cause segfaults if we do module clean up with later versions of + # protobuf. This is a regression test. + from google.protobuf import empty_pb2 # noqa:F401 + + print("OK") diff --git a/tests/profiling_v2/test_profiler.py b/tests/profiling_v2/test_profiler.py new file mode 100644 index 00000000000..b5a2bb4bae8 --- /dev/null +++ b/tests/profiling_v2/test_profiler.py @@ -0,0 +1,204 @@ +import logging +import sys +import time + +import mock +import pytest + +import ddtrace +from ddtrace.profiling import collector +from ddtrace.profiling import exporter +from ddtrace.profiling import profiler +from ddtrace.profiling import scheduler +from ddtrace.profiling.collector import asyncio +from ddtrace.profiling.collector import stack +from ddtrace.profiling.collector import threading + + +def test_status(): + p = profiler.Profiler() + assert repr(p.status) == "<ServiceStatus.STOPPED: 'stopped'>" + p.start() + assert repr(p.status) == "<ServiceStatus.RUNNING: 'running'>" + p.stop(flush=False) + assert repr(p.status) == "<ServiceStatus.STOPPED: 'stopped'>" + + +def test_restart(): + p = profiler.Profiler() + p.start() + p.stop(flush=False) + p.start() + p.stop(flush=False) + + +def test_multiple_stop(): + """Check that the profiler can be stopped twice.""" + p = profiler.Profiler() + p.start() + p.stop(flush=False) + p.stop(flush=False) + + +def test_tracer_api(monkeypatch): + monkeypatch.setenv("DD_API_KEY", "foobar") + prof = profiler.Profiler(tracer=ddtrace.tracer) + assert prof.tracer == ddtrace.tracer + for col in prof._profiler._collectors: + if isinstance(col, stack.StackCollector): + assert col.tracer == ddtrace.tracer + break + else: + pytest.fail("Unable to find stack collector") + + +def test_profiler_init_float_division_regression(run_python_code_in_subprocess): + """ + Regression test for 
https://github.com/DataDog/dd-trace-py/pull/3751 + When float division is enabled, the value of `max_events` can be a `float`, + this is then passed as `deque(maxlen=float)` which is a type error + + File "/var/task/ddtrace/profiling/recorder.py", line 80, in _get_deque_for_event_type + return collections.deque(maxlen=self.max_events.get(event_type, self.default_max_events)) + TypeError: an integer is required + """ + code = """ +from ddtrace.profiling import profiler +from ddtrace.profiling.collector import stack_event + +prof = profiler.Profiler() + +# The error only happened for this specific kind of event +# DEV: Yes, this is likely a brittle way to test, but quickest/easiest way to trigger the error +prof._recorder.push_event(stack_event.StackExceptionSampleEvent()) + """ + + out, err, status, _ = run_python_code_in_subprocess(code) + assert status == 0, err + assert out == b"", err + assert err == b"" + + +@pytest.mark.subprocess() +def test_default_memory(): + from ddtrace.profiling import profiler + from ddtrace.profiling.collector import memalloc + + assert any(isinstance(col, memalloc.MemoryCollector) for col in profiler.Profiler()._profiler._collectors) + + +@pytest.mark.subprocess(env=dict(DD_PROFILING_MEMORY_ENABLED="true")) +def test_enable_memory(): + from ddtrace.profiling import profiler + from ddtrace.profiling.collector import memalloc + + assert any(isinstance(col, memalloc.MemoryCollector) for col in profiler.Profiler()._profiler._collectors) + + +@pytest.mark.subprocess(env=dict(DD_PROFILING_MEMORY_ENABLED="false")) +def test_disable_memory(): + from ddtrace.profiling import profiler + from ddtrace.profiling.collector import memalloc + + assert all(not isinstance(col, memalloc.MemoryCollector) for col in profiler.Profiler()._profiler._collectors) + + +def test_copy(): + p = profiler._ProfilerInstance(env="123", version="dwq", service="foobar") + c = p.copy() + assert c == p + assert p.env == c.env + assert p.version == c.version + assert 
p.service == c.service + assert p.tracer == c.tracer + assert p.tags == c.tags + + +def test_failed_start_collector(caplog, monkeypatch): + class ErrCollect(collector.Collector): + def _start_service(self): + raise RuntimeError("could not import required module") + + def _stop_service(self): + pass + + @staticmethod + def collect(): + pass + + @staticmethod + def snapshot(): + raise Exception("error!") + + monkeypatch.setenv("DD_PROFILING_UPLOAD_INTERVAL", "1") + + class Exporter(exporter.Exporter): + def export(self, events, *args, **kwargs): + pass + + class TestProfiler(profiler._ProfilerInstance): + def _build_default_exporters(self, *args, **kargs): + return [Exporter()] + + p = TestProfiler() + err_collector = mock.MagicMock(wraps=ErrCollect(p._recorder)) + p._collectors = [err_collector] + p.start() + + def profiling_tuples(tuples): + return [t for t in tuples if t[0].startswith("ddtrace.profiling")] + + assert profiling_tuples(caplog.record_tuples) == [ + ("ddtrace.profiling.profiler", logging.ERROR, "Failed to start collector %r, disabling." % err_collector) + ] + time.sleep(2) + p.stop() + assert err_collector.snapshot.call_count == 0 + assert profiling_tuples(caplog.record_tuples) == [ + ("ddtrace.profiling.profiler", logging.ERROR, "Failed to start collector %r, disabling." % err_collector) + ] + + +def test_default_collectors(): + p = profiler.Profiler() + assert any(isinstance(c, stack.StackCollector) for c in p._profiler._collectors) + assert any(isinstance(c, threading.ThreadingLockCollector) for c in p._profiler._collectors) + try: + import asyncio as _ # noqa: F401 + except ImportError: + pass + else: + assert any(isinstance(c, asyncio.AsyncioLockCollector) for c in p._profiler._collectors) + p.stop(flush=False) + + +def test_profiler_serverless(monkeypatch): + # type: (...) 
-> None + monkeypatch.setenv("AWS_LAMBDA_FUNCTION_NAME", "foobar") + p = profiler.Profiler() + assert isinstance(p._scheduler, scheduler.ServerlessScheduler) + assert p.tags["functionname"] == "foobar" + + +@pytest.mark.skipif(sys.version_info < (3, 8), reason="Python 3.7 deprecation warning") +@pytest.mark.subprocess() +def test_profiler_ddtrace_deprecation(): + """ + ddtrace interfaces loaded by the profiler can be marked deprecated, and we should update + them when this happens. As reported by https://github.com/DataDog/dd-trace-py/issues/8881 + """ + import warnings + + with warnings.catch_warnings(): + warnings.simplefilter("error", DeprecationWarning) + from ddtrace.profiling import _threading # noqa:F401 + from ddtrace.profiling import event # noqa:F401 + from ddtrace.profiling import profiler # noqa:F401 + from ddtrace.profiling import recorder # noqa:F401 + from ddtrace.profiling import scheduler # noqa:F401 + from ddtrace.profiling.collector import _lock # noqa:F401 + from ddtrace.profiling.collector import _task # noqa:F401 + from ddtrace.profiling.collector import _traceback # noqa:F401 + from ddtrace.profiling.collector import memalloc # noqa:F401 + from ddtrace.profiling.collector import stack # noqa:F401 + from ddtrace.profiling.collector import stack_event # noqa:F401 diff --git a/tests/profiling_v2/test_scheduler.py b/tests/profiling_v2/test_scheduler.py new file mode 100644 index 00000000000..dc3c2c0d7d1 --- /dev/null +++ b/tests/profiling_v2/test_scheduler.py @@ -0,0 +1,54 @@ +# -*- encoding: utf-8 -*- +import logging +import time + +import mock + +from ddtrace.profiling import exporter +from ddtrace.profiling import scheduler + + +def test_thread_name(): + exp = exporter.NullExporter() + s = scheduler.Scheduler(None, [exp]) + s.start() + assert s._worker.name == "ddtrace.profiling.scheduler:Scheduler" + s.stop() + + +def test_before_flush(): + x = {} + + def call_me(): + x["OK"] = True + + s = scheduler.Scheduler(None, [exporter.NullExporter()], 
before_flush=call_me) + s.flush() + assert x["OK"] + + +def test_before_flush_failure(caplog): + def call_me(): + raise Exception("LOL") + + s = scheduler.Scheduler(None, [exporter.NullExporter()], before_flush=call_me) + s.flush() + assert caplog.record_tuples == [ + (("ddtrace.profiling.scheduler", logging.ERROR, "Scheduler before_flush hook failed")) + ] + + +@mock.patch("ddtrace.profiling.scheduler.Scheduler.periodic") +def test_serverless_periodic(mock_periodic): + s = scheduler.ServerlessScheduler(None, [exporter.NullExporter()]) + # Fake start() + s._last_export = time.time_ns() + s.periodic() + assert s._profiled_intervals == 1 + mock_periodic.assert_not_called() + s._last_export = time.time_ns() - 65 + s._profiled_intervals = 65 + s.periodic() + assert s._profiled_intervals == 0 + assert s.interval == 1 + mock_periodic.assert_called()