Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(celery): close celery.apply spans even without after_task_publish, when using apply_async [backport 2.12] #10891

Merged
merged 2 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions ddtrace/contrib/internal/celery/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import sys

import celery
from celery import signals

Expand All @@ -15,9 +17,14 @@
from ddtrace.contrib.internal.celery.signals import trace_retry
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.internal import core
from ddtrace.internal.logger import get_logger
from ddtrace.pin import _DD_PIN_NAME


log = get_logger(__name__)


def patch_app(app, pin=None):
"""Attach the Pin class to the application and connect
our handlers to Celery signals.
Expand All @@ -41,6 +48,9 @@ def patch_app(app, pin=None):
trace_utils.wrap("celery.beat", "Scheduler.tick", _traced_beat_function(config.celery, "tick"))
pin.onto(celery.beat.Scheduler)

# Patch apply_async
trace_utils.wrap("celery.app.task", "Task.apply_async", _traced_apply_async_function(config.celery, "apply_async"))

# connect to the Signal framework
signals.task_prerun.connect(trace_prerun, weak=False)
signals.task_postrun.connect(trace_postrun, weak=False)
Expand All @@ -65,6 +75,7 @@ def unpatch_app(app):

trace_utils.unwrap(celery.beat.Scheduler, "apply_entry")
trace_utils.unwrap(celery.beat.Scheduler, "tick")
trace_utils.unwrap(celery.app.task.Task, "apply_async")

signals.task_prerun.disconnect(trace_prerun)
signals.task_postrun.disconnect(trace_postrun)
Expand Down Expand Up @@ -96,3 +107,51 @@ def _traced_beat_inner(func, instance, args, kwargs):
return func(*args, **kwargs)

return _traced_beat_inner


def _traced_apply_async_function(integration_config, fn_name, resource_fn=None):
"""
When apply_async is called, it calls various Celery signals in order, which gets used
to start and close the span.
Example: before_task_publish starts the span while after_task_publish closes the span.
If an exception occurs anywhere inside Celery or its dependencies, this can interrupt the
closing signals.
The purpose of _traced_apply_async_function is to close the spans even if one of the closing
signals don't get called over the course of the apply_task lifecycle.
This is done by fetching the stored span and closing it if it hasn't already been closed by a
closing signal.
"""

def _traced_apply_async_inner(func, instance, args, kwargs):
with core.context_with_data("task_context"):
try:
return func(*args, **kwargs)
except Exception:
# If an internal exception occurs, record the exception in the span,
# then raise the Celery error as usual
task_span = core.get_item("task_span")
if task_span:
task_span.set_exc_info(*sys.exc_info())

prerun_span = core.get_item("prerun_span")
if prerun_span:
prerun_span.set_exc_info(*sys.exc_info())

raise
finally:
task_span = core.get_item("task_span")
if task_span:
log.debug(
"The after_task_publish signal was not called, so manually closing span: %s",
task_span._pprint(),
)
task_span.finish()

prerun_span = core.get_item("prerun_span")
if prerun_span:
log.debug(
"The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint()
)
prerun_span.finish()

return _traced_apply_async_inner
7 changes: 7 additions & 0 deletions ddtrace/contrib/internal/celery/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import net
from ddtrace.internal import core
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.logger import get_logger
from ddtrace.propagation.http import HTTPPropagator
Expand Down Expand Up @@ -48,6 +49,9 @@ def trace_prerun(*args, **kwargs):
service = config.celery["worker_service_name"]
span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER)

# Store an item called "prerun span" in case task_postrun doesn't get called
core.set_item("prerun_span", span)

# set span.kind to the type of request being performed
span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)

Expand Down Expand Up @@ -111,6 +115,9 @@ def trace_before_publish(*args, **kwargs):
service = config.celery["producer_service_name"]
span = pin.tracer.trace(c.PRODUCER_ROOT_SPAN, service=service, resource=task_name)

# Store an item called "task span" in case after_task_publish doesn't get called
core.set_item("task_span", span)

span.set_tag_str(COMPONENT, config.celery.integration_name)

# set span.kind to the type of request being performed
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
tracing(celery): Fixes an issue where ``celery.apply`` spans didn't close if the ``after_task_publish`` or ``task_postrun`` signals didn't get sent when using ``apply_async``, which can happen if there is an internal exception during the handling of the task. This update also marks the span as an error if an exception occurs.
29 changes: 29 additions & 0 deletions tests/contrib/celery/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import celery
from celery.exceptions import Retry
import mock
import pytest

from ddtrace import Pin
Expand Down Expand Up @@ -442,6 +443,34 @@ def run(self):
assert span.get_tag("span.kind") == "consumer"
assert span.error == 0

@mock.patch("kombu.messaging.Producer.publish", mock.Mock(side_effect=ValueError))
def test_fn_task_apply_async_soft_exception(self):
# If the underlying library runs into an exception that doesn't crash the app
# while calling apply_async, we should still close the span even
# if the closing signals didn't get called and mark the span as an error

@self.app.task
def fn_task_parameters(user, force_logout=False):
return (user, force_logout)

t = None
try:
t = fn_task_parameters.apply_async(args=["user"], kwargs={"force_logout": True})
except ValueError:
traces = self.pop_traces()
assert 1 == len(traces)
assert traces[0][0].name == "celery.apply"
assert traces[0][0].resource == "tests.contrib.celery.test_integration.fn_task_parameters"
assert traces[0][0].get_tag("celery.action") == "apply_async"
assert traces[0][0].get_tag("component") == "celery"
assert traces[0][0].get_tag("span.kind") == "producer"
# Internal library errors get recorded on the span
assert traces[0][0].error == 1
assert traces[0][0].get_tag("error.type") == "builtins.ValueError"
assert "ValueError" in traces[0][0].get_tag("error.stack")
# apply_async runs into an internal error (ValueError) so nothing is returned to t
assert t is None

def test_shared_task(self):
# Ensure Django Shared Task are supported
@celery.shared_task
Expand Down
Loading