From f285b51c6561dd3385a3800d1820ff5f5920ea6e Mon Sep 17 00:00:00 2001 From: wantsui Date: Tue, 1 Oct 2024 16:04:19 -0400 Subject: [PATCH] fix(celery): close `celery.apply` spans even without after_task_publish, when using apply_async (#10676) The instrumentation for the Celery integration relies on various [Celery signals ](https://docs.celeryq.dev/en/stable/userguide/signals.html) in order to start and end the span when calling on `apply_async`. The integration can fail if the expected signals don't trigger, which can lead to broken context propagation (and unexpected traces). **Example:** - dd-trace-py expects the signal `before_task_publish` to start the span then `after_task_publish` to close the span. If the `after_task_publish` signal never gets called (which can happen if a Celery exception occurs while processing the app), then the span won't finish. - The same thing above can also happen to `task_prerun` and `task_postrun`. **Solution** This PR patches `apply_async` so that there is a check to see if there is a span lingering around and closes it when `apply_task` is called. If an internal exception happens, the error will be marked on the `celery.apply` span. To track this, I added new logs in debug mode: > The after_task_publish signal was not called, so manually closing span and > The task_postrun signal was not called, so manually closing span There's a related PR https://github.com/DataDog/dd-trace-py/pull/10848 that works to improve how we extract information based on the protocols, that also affects when spans get closed or not. Special Thanks: - Thanks to @tabgok for going through this with me in great detail! - @timmc-edx for helping us track it down! [APMS-13158] ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) APMS-13158 [APMS-13158]: https://datadoghq.atlassian.net/browse/APMS-13158?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ --------- Co-authored-by: Emmett Butler <723615+emmettbutler@users.noreply.github.com> (cherry picked from commit 0d28e081c0567dfb68f32ba67dbe9c4e56f7c8ed) --- ddtrace/contrib/internal/celery/app.py | 59 +++++++++++++++++++ ddtrace/contrib/internal/celery/signals.py | 7 +++ ...ply-async-span-close-b7a8db188459f5b5.yaml | 4 ++ tests/contrib/celery/test_integration.py | 29 +++++++++ 4 files changed, 99 insertions(+) create mode 100644 releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml diff --git a/ddtrace/contrib/internal/celery/app.py b/ddtrace/contrib/internal/celery/app.py index 4e305eebec3..e222abd5c17 100644 --- a/ddtrace/contrib/internal/celery/app.py +++ b/ddtrace/contrib/internal/celery/app.py @@ -1,3 +1,5 @@ +import sys + import celery from celery import signals @@ -15,9 +17,14 @@ from ddtrace.contrib.internal.celery.signals import trace_retry from ddtrace.ext import SpanKind from ddtrace.ext import SpanTypes +from ddtrace.internal import core +from ddtrace.internal.logger import get_logger from ddtrace.pin import _DD_PIN_NAME +log = get_logger(__name__) + + def patch_app(app, pin=None): """Attach the Pin class to the application and connect our handlers to Celery signals. @@ -41,6 +48,9 @@ def patch_app(app, pin=None): trace_utils.wrap("celery.beat", "Scheduler.tick", _traced_beat_function(config.celery, "tick")) pin.onto(celery.beat.Scheduler) + # Patch apply_async + trace_utils.wrap("celery.app.task", "Task.apply_async", _traced_apply_async_function(config.celery, "apply_async")) + # connect to the Signal framework signals.task_prerun.connect(trace_prerun, weak=False) signals.task_postrun.connect(trace_postrun, weak=False) @@ -65,6 +75,7 @@ def unpatch_app(app): trace_utils.unwrap(celery.beat.Scheduler, "apply_entry") trace_utils.unwrap(celery.beat.Scheduler, "tick") + trace_utils.unwrap(celery.app.task.Task, "apply_async") signals.task_prerun.disconnect(trace_prerun) signals.task_postrun.disconnect(trace_postrun) @@ -96,3 +107,51 @@ def _traced_beat_inner(func, instance, args, kwargs): return func(*args, **kwargs) return _traced_beat_inner + + +def _traced_apply_async_function(integration_config, fn_name, resource_fn=None): + """ + When apply_async is called, it calls various Celery signals in order, which gets used + to start and close the span. + Example: before_task_publish starts the span while after_task_publish closes the span. + If an exception occurs anywhere inside Celery or its dependencies, this can interrupt the + closing signals. + The purpose of _traced_apply_async_function is to close the spans even if one of the closing + signals don't get called over the course of the apply_task lifecycle. + This is done by fetching the stored span and closing it if it hasn't already been closed by a + closing signal. + """ + + def _traced_apply_async_inner(func, instance, args, kwargs): + with core.context_with_data("task_context"): + try: + return func(*args, **kwargs) + except Exception: + # If an internal exception occurs, record the exception in the span, + # then raise the Celery error as usual + task_span = core.get_item("task_span") + if task_span: + task_span.set_exc_info(*sys.exc_info()) + + prerun_span = core.get_item("prerun_span") + if prerun_span: + prerun_span.set_exc_info(*sys.exc_info()) + + raise + finally: + task_span = core.get_item("task_span") + if task_span: + log.debug( + "The after_task_publish signal was not called, so manually closing span: %s", + task_span._pprint(), + ) + task_span.finish() + + prerun_span = core.get_item("prerun_span") + if prerun_span: + log.debug( + "The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint() + ) + prerun_span.finish() + + return _traced_apply_async_inner diff --git a/ddtrace/contrib/internal/celery/signals.py b/ddtrace/contrib/internal/celery/signals.py index 1f8dc12dce5..308724c6dae 100644 --- a/ddtrace/contrib/internal/celery/signals.py +++ b/ddtrace/contrib/internal/celery/signals.py @@ -16,6 +16,7 @@ from ddtrace.ext import SpanKind from ddtrace.ext import SpanTypes from ddtrace.ext import net +from ddtrace.internal import core from ddtrace.internal.constants import COMPONENT from ddtrace.internal.logger import get_logger from ddtrace.propagation.http import HTTPPropagator @@ -48,6 +49,9 @@ def trace_prerun(*args, **kwargs): service = config.celery["worker_service_name"] span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER) + # Store an item called "prerun span" in case task_postrun doesn't get called + core.set_item("prerun_span", span) + # set span.kind to the type of request being performed span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER) @@ -111,6 +115,9 @@ def trace_before_publish(*args, **kwargs): service = config.celery["producer_service_name"] span = pin.tracer.trace(c.PRODUCER_ROOT_SPAN, service=service, resource=task_name) + # Store an item called "task span" in case after_task_publish doesn't get called + core.set_item("task_span", span) + span.set_tag_str(COMPONENT, config.celery.integration_name) # set span.kind to the type of request being performed diff --git a/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml b/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml new file mode 100644 index 00000000000..4ca112a2cfb --- /dev/null +++ b/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + tracing(celery): Fixes an issue where ``celery.apply`` spans didn't close if the ``after_task_publish`` or ``task_postrun`` signals didn't get sent when using ``apply_async``, which can happen if there is an internal exception during the handling of the task. This update also marks the span as an error if an exception occurs. \ No newline at end of file diff --git a/tests/contrib/celery/test_integration.py b/tests/contrib/celery/test_integration.py index 267789d325a..3caace9e269 100644 --- a/tests/contrib/celery/test_integration.py +++ b/tests/contrib/celery/test_integration.py @@ -6,6 +6,7 @@ import celery from celery.exceptions import Retry +import mock import pytest from ddtrace import Pin @@ -442,6 +443,34 @@ def run(self): assert span.get_tag("span.kind") == "consumer" assert span.error == 0 + @mock.patch("kombu.messaging.Producer.publish", mock.Mock(side_effect=ValueError)) + def test_fn_task_apply_async_soft_exception(self): + # If the underlying library runs into an exception that doesn't crash the app + # while calling apply_async, we should still close the span even + # if the closing signals didn't get called and mark the span as an error + + @self.app.task + def fn_task_parameters(user, force_logout=False): + return (user, force_logout) + + t = None + try: + t = fn_task_parameters.apply_async(args=["user"], kwargs={"force_logout": True}) + except ValueError: + traces = self.pop_traces() + assert 1 == len(traces) + assert traces[0][0].name == "celery.apply" + assert traces[0][0].resource == "tests.contrib.celery.test_integration.fn_task_parameters" + assert traces[0][0].get_tag("celery.action") == "apply_async" + assert traces[0][0].get_tag("component") == "celery" + assert traces[0][0].get_tag("span.kind") == "producer" + # Internal library errors get recorded on the span + assert traces[0][0].error == 1 + assert traces[0][0].get_tag("error.type") == "builtins.ValueError" + assert "ValueError" in traces[0][0].get_tag("error.stack") + # apply_async runs into an internal error (ValueError) so nothing is returned to t + assert t is None + def test_shared_task(self): # Ensure Django Shared Task are supported @celery.shared_task