From b8196365aa5ceb7401d75907d50c282a3eab7315 Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Mon, 18 Mar 2024 18:46:25 -0400 Subject: [PATCH] chore(asyncio): parent spans on task creation (#8671) - Reverts deprecating the asyncio integration. - Adds support for propagating the current active trace context to newly created tasks (currently context is propagated when a task is executed and not when it is created). Resolves: https://github.com/DataDog/dd-trace-py/issues/8637 ## Checklist - [x] Change(s) are motivated and described in the PR description - [x] Testing strategy is described if automated tests are not included in the PR - [x] Risks are described (performance impact, potential for breakage, maintainability) - [x] Change is maintainable (easy to change, telemetry, documentation) - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed or label `changelog/no-changelog` is set - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)) - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) - [x] If this PR changes the public interface, I've notified `@DataDog/apm-tees`. - [x] If change touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`. 
## Reviewer Checklist - [x] Title is accurate - [x] All changes are related to the pull request's stated goal - [x] Description motivates each change - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - [x] Testing strategy adequately addresses listed risks - [x] Change is maintainable (easy to change, telemetry, documentation) - [x] Release note makes sense to a user of the library - [x] Author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --------- Co-authored-by: Emmett Butler <723615+emmettbutler@users.noreply.github.com> --- ddtrace/_monkey.py | 2 +- ddtrace/contrib/asyncio/__init__.py | 47 +------------------ ddtrace/contrib/asyncio/compat.py | 12 +++++ ddtrace/contrib/asyncio/helpers.py | 21 +++++++++ ddtrace/contrib/asyncio/patch.py | 33 ++++++++++++- ...asyncontext-provider-f72ea7035a630063.yaml | 2 +- tests/contrib/asyncio/test_tracer.py | 30 ++++++++++++ 7 files changed, 98 insertions(+), 49 deletions(-) diff --git a/ddtrace/_monkey.py b/ddtrace/_monkey.py index 5d25fef52ad..6090e9e923d 100644 --- a/ddtrace/_monkey.py +++ b/ddtrace/_monkey.py @@ -25,7 +25,7 @@ "aioredis": True, "aiomysql": True, "aredis": True, - "asyncio": False, + "asyncio": True, "boto": True, "botocore": True, "bottle": True, diff --git a/ddtrace/contrib/asyncio/__init__.py b/ddtrace/contrib/asyncio/__init__.py index 922f842ec8e..776159605ca 100644 --- a/ddtrace/contrib/asyncio/__init__.py +++ b/ddtrace/contrib/asyncio/__init__.py @@ -1,47 +1,7 @@ """ This integration provides context management for tracing the execution flow of concurrent execution of ``asyncio.Task``. - -This integration is only necessary in Python < 3.7 (where contextvars is not supported). 
-For Python > 3.7 this works automatically without configuration. - -For asynchronous execution tracing to work properly the tracer must -be configured as follows:: - - import asyncio - from ddtrace import tracer - from ddtrace.contrib.asyncio import context_provider - - # enable asyncio support - tracer.configure(context_provider=context_provider) - - async def some_work(): - with tracer.trace('asyncio.some_work'): - # do something - - # launch your coroutines as usual - loop = asyncio.get_event_loop() - loop.run_until_complete(some_work()) - loop.close() - -In addition, helpers are provided to simplify how the tracing ``Context`` is -handled between scheduled coroutines and ``Future`` invoked in separated -threads: - - * ``set_call_context(task, ctx)``: attach the context to the given ``Task`` - so that it will be available from the ``tracer.current_trace_context()`` - * ``ensure_future(coro_or_future, *, loop=None)``: wrapper for the - ``asyncio.ensure_future`` that attaches the current context to a new - ``Task`` instance - * ``run_in_executor(loop, executor, func, *args)``: wrapper for the - ``loop.run_in_executor`` that attaches the current context to the new - thread so that the trace can be resumed regardless when it's executed - * ``create_task(coro)``: creates a new asyncio ``Task`` that inherits the - current active ``Context`` so that generated traces in the new task are - attached to the main trace """ -from ddtrace.vendor.debtcollector import deprecate - from ...internal.utils.importlib import require_modules @@ -51,12 +11,6 @@ async def some_work(): if not missing_modules: from ddtrace._trace.provider import DefaultContextProvider - deprecate( - "The ddtrace asyncio integration is deprecated." - " The ddtrace library fully supports propagating trace contextes to async tasks." 
- " No additional configurations are required.", - version="3.0.0", - ) context_provider = DefaultContextProvider() from .helpers import ensure_future @@ -64,5 +18,6 @@ async def some_work(): from .helpers import set_call_context from .patch import get_version from .patch import patch + from .patch import unpatch # noqa: F401 __all__ = ["context_provider", "set_call_context", "ensure_future", "run_in_executor", "patch", "get_version"] diff --git a/ddtrace/contrib/asyncio/compat.py b/ddtrace/contrib/asyncio/compat.py index 4389a36f400..2328d57c6b8 100644 --- a/ddtrace/contrib/asyncio/compat.py +++ b/ddtrace/contrib/asyncio/compat.py @@ -1,9 +1,16 @@ import asyncio +from ddtrace.vendor.debtcollector import deprecate + if hasattr(asyncio, "current_task"): def asyncio_current_task(): + deprecate( + "ddtrace.contrib.asyncio.create_task(..) is deprecated. The ddtrace library fully supports propagating" + " trace contextes to async tasks. No additional configurations are required.", + version="3.0.0", + ) try: return asyncio.current_task() except RuntimeError: @@ -12,4 +19,9 @@ def asyncio_current_task(): else: def asyncio_current_task(): + deprecate( + "ddtrace.contrib.asyncio.create_task(..) is deprecated. The ddtrace library fully supports propagating" + " trace contextes to async tasks. No additional configurations are required.", + version="3.0.0", + ) return asyncio.Task.current_task() diff --git a/ddtrace/contrib/asyncio/helpers.py b/ddtrace/contrib/asyncio/helpers.py index f1803ee40ca..58dfc92d744 100644 --- a/ddtrace/contrib/asyncio/helpers.py +++ b/ddtrace/contrib/asyncio/helpers.py @@ -6,6 +6,7 @@ import asyncio import ddtrace +from ddtrace.vendor.debtcollector import deprecate from .provider import AsyncioContextProvider from .wrappers import wrapped_create_task @@ -19,6 +20,11 @@ def set_call_context(task, ctx): This method is available for backward-compatibility. Use the ``AsyncioContextProvider`` API to set the current active ``Context``. 
""" + deprecate( + "ddtrace.contrib.asyncio.set_call_context(..) is deprecated. The ddtrace library fully supports propagating" + " trace contextes to async tasks. No additional configurations are required.", + version="3.0.0", + ) setattr(task, AsyncioContextProvider._CONTEXT_ATTR, ctx) @@ -27,6 +33,11 @@ def ensure_future(coro_or_future, *, loop=None, tracer=None): If the current task already has a Context, it will be attached to the new Task so the Trace list will be preserved. """ + deprecate( + "ddtrace.contrib.asyncio.ensure_future(..) is deprecated. The ddtrace library fully supports propagating" + " trace contextes to async tasks. No additional configurations are required.", + version="3.0.0", + ) tracer = tracer or ddtrace.tracer current_ctx = tracer.current_trace_context() task = asyncio.ensure_future(coro_or_future, loop=loop) @@ -51,6 +62,11 @@ def run_in_executor(loop, executor, func, *args, tracer=None): we fallback to the thread-local ``Context`` storage. """ + deprecate( + "ddtrace.contrib.asyncio.run_in_executor(..) is deprecated. The ddtrace library fully supports propagating" + " trace contexts to async tasks. No additional configurations are required.", + version="3.0.0", + ) tracer = tracer or ddtrace.tracer current_ctx = tracer.current_trace_context() @@ -77,5 +93,10 @@ def create_task(*args, **kwargs): """This function spawns a task with a Context that inherits the `trace_id` and the `parent_id` from the current active one if available. """ + deprecate( + "ddtrace.contrib.asyncio.create_task(..) is deprecated. The ddtrace library fully supports propagating" + " trace contextes to async tasks. 
No additional configurations are required.", + version="3.0.0", + ) loop = asyncio.get_event_loop() return wrapped_create_task(loop.create_task, None, args, kwargs) diff --git a/ddtrace/contrib/asyncio/patch.py b/ddtrace/contrib/asyncio/patch.py index 24620a71bf0..83f1918e9eb 100644 --- a/ddtrace/contrib/asyncio/patch.py +++ b/ddtrace/contrib/asyncio/patch.py @@ -1,5 +1,11 @@ import asyncio +from ddtrace import Pin +from ddtrace.internal.utils import get_argument_value +from ddtrace.internal.utils import set_argument_value +from ddtrace.internal.wrapping import unwrap +from ddtrace.internal.wrapping import wrap + def get_version(): # type: () -> str @@ -13,10 +19,35 @@ def patch(): if getattr(asyncio, "_datadog_patch", False): return asyncio._datadog_patch = True + Pin().onto(asyncio) + wrap(asyncio.BaseEventLoop.create_task, _wrapped_create_task_py37) def unpatch(): """Remove tracing from patched modules.""" if getattr(asyncio, "_datadog_patch", False): - asyncio._datadog_patch = False + return + asyncio._datadog_patch = False + unwrap(asyncio.BaseEventLoop.create_task, _wrapped_create_task_py37) + + +def _wrapped_create_task_py37(wrapped, args, kwargs): + """This function ensures the current active trace context is propagated to scheduled tasks. + By default the trace context is propagated when a task is executed and NOT when it is created. 
+ """ + pin = Pin.get_from(asyncio) + if not pin or not pin.enabled(): + return wrapped(*args, **kwargs) + + # override existing co-rountine to ensure the current trace context is propagated + coro = get_argument_value(args, kwargs, 1, "coro") + dd_active = pin.tracer.current_trace_context() + + async def traced_coro(*args_c, **kwargs_c): + if dd_active and dd_active != pin.tracer.current_trace_context(): + pin.tracer.context_provider.activate(dd_active) + return await coro + + args, kwargs = set_argument_value(args, kwargs, 1, "coro", traced_coro()) + return wrapped(*args, **kwargs) diff --git a/releasenotes/notes/deprecate-asyncontext-provider-f72ea7035a630063.yaml b/releasenotes/notes/deprecate-asyncontext-provider-f72ea7035a630063.yaml index 1ccb97dc57d..5bec8c1fa80 100644 --- a/releasenotes/notes/deprecate-asyncontext-provider-f72ea7035a630063.yaml +++ b/releasenotes/notes/deprecate-asyncontext-provider-f72ea7035a630063.yaml @@ -1,5 +1,5 @@ --- deprecations: - | - tracing: Deprecates the asyncio integration and the ``ddtrace.contrib.asyncio.AsyncioContextProvider`` class. + tracing: Deprecates support for ``ddtrace.contrib.asyncio.AsyncioContextProvider``. ddtrace fully support tracing across asyncio tasks. Asyncio no longer requires additional configurations. 
diff --git a/tests/contrib/asyncio/test_tracer.py b/tests/contrib/asyncio/test_tracer.py
index 87baf2b5979..16d16215b5e 100644
--- a/tests/contrib/asyncio/test_tracer.py
+++ b/tests/contrib/asyncio/test_tracer.py
@@ -4,9 +4,18 @@
 import pytest
 
 from ddtrace.constants import ERROR_MSG
+from ddtrace.contrib.asyncio import patch
+from ddtrace.contrib.asyncio import unpatch
 from ddtrace.contrib.asyncio.compat import asyncio_current_task
 
 
+@pytest.fixture(autouse=True)
+def patch_asyncio():
+    patch()
+    yield
+    unpatch()
+
+
 def test_get_call_context_twice(tracer):
     # it should return the same Context if called twice
     assert tracer.current_trace_context() == tracer.current_trace_context()
@@ -178,3 +187,24 @@ async def f1():
     assert 1 == len(spans)
     span = spans[0]
     assert span.duration > 0.25, "span.duration={}".format(span.duration)
+
+
+def test_asyncio_scheduled_tasks_parenting(tracer):
+    async def task(i):
+        with tracer.trace(f"task {i}"):
+            await asyncio.sleep(0.1)
+
+    @tracer.wrap()
+    async def runner():
+        await task(1)
+        return asyncio.create_task(task(2))
+
+    async def test():
+        # Await the scheduled task so its span closes normally instead of via shutdown cancellation.
+        await (await runner())
+
+    asyncio.run(test())
+
+    spans = tracer.get_spans()
+    assert len(spans) == 3
+    assert spans[0].trace_id == spans[1].trace_id == spans[2].trace_id