Skip to content

Commit

Permalink
fix(celery): close celery.apply spans using task_protocol 1 [backport…
Browse files Browse the repository at this point in the history
… 2.12] (#10872)

Backport 6346fcb from #10848 to 2.12.

This PR fixes an issue where Celery's closing signals got triggered but
dd-trace-py skipped closing the `celery.apply` span due to not finding
the task id.

In Celery's `task_protocol: 1`, the task id is in the body of the message:
https://docs.celeryq.dev/en/main/internals/protocol.html#message-body .

The issue with the previous logic is that if the headers contained any
information (even information unrelated to the id), the code would never
check the body for the id:

before:

```
if headers:
```

after (this PR): 

```
if headers and 'id' in headers:
```

By doing this, we check the headers for the id, then check the body for
the id.

If it fails to find the task id in the body or header, then it still
hits the debug log, `unable to extract the Task and the task_id. This
version of Celery may not be supported.` .

This PR relates to the goal of
#10676 , to close celery
spans. If for some reason the logic in this PR fails to close an open
`celery.apply` span, #10676
will act as a fail safe and close it.

Special Thanks: @timmc-edx for helping us track this down!

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

Co-authored-by: wantsui <[email protected]>
  • Loading branch information
github-actions[bot] and wantsui authored Oct 1, 2024
1 parent e672046 commit dc64a56
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 10 deletions.
7 changes: 5 additions & 2 deletions ddtrace/contrib/internal/celery/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,12 @@ def retrieve_task_id(context):
"""
headers = context.get("headers")
body = context.get("body")
if headers:
# Check if the id is in the headers, then check the body for it.
# If we don't check the id first, we could wrongly assume no task_id
# when the task_id is in the body.
if headers and "id" in headers:
# Protocol Version 2 (default from Celery 4.0)
return headers.get("id")
else:
elif body and "id" in body:
# Protocol Version 1
return body.get("id")
4 changes: 4 additions & 0 deletions releasenotes/notes/fix-protocol-check-62ad7d096b1f36a9.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
tracing(celery): Fixes an issue where ``celery.apply`` spans using task_protocol 1 didn't close, by improving the check for the task id in the body.
102 changes: 94 additions & 8 deletions tests/contrib/celery/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,11 @@ def test_tags_from_context_empty_keys():
assert {} == tags


def test_task_id_from_protocol_v1():
def test_task_id_from_protocol_v1_no_headers():
# ensures a `task_id` is properly returned when Protocol v1 is used.
# `context` is an example of an emitted Signal with Protocol v1
# this test assumes the headers are blank
test_id = "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7"
context = {
"body": {
"expires": None,
Expand All @@ -143,7 +145,7 @@ def test_task_id_from_protocol_v1():
"callbacks": None,
"errbacks": None,
"taskset": None,
"id": "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7",
"id": test_id,
"retries": 0,
"task": "tests.contrib.celery.test_integration.fn_task_parameters",
"timelimit": (None, None),
Expand All @@ -159,12 +161,87 @@ def test_task_id_from_protocol_v1():
}

task_id = retrieve_task_id(context)
assert task_id == "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7"
assert task_id == test_id


def test_task_id_from_protocol_v2():
def test_task_id_from_protocol_v1_with_headers():
    """Protocol v1 with populated headers: the id is still read from the body.

    ``context`` is an example of a signal emitted with Protocol v1. The
    headers carry other information but no ``id`` key, so the task id must
    come from the message body.
    """
    expected_id = "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7"
    task_name = "tests.contrib.celery.test_integration.fn_task_parameters"

    # Protocol v1 keeps the task metadata, including the id, in the body.
    body = {
        "expires": None,
        "utc": True,
        "args": ["user"],
        "chord": None,
        "callbacks": None,
        "errbacks": None,
        "taskset": None,
        "id": expected_id,
        "retries": 0,
        "task": task_name,
        "timelimit": (None, None),
        "eta": None,
        "kwargs": {"force_logout": True},
    }
    # Non-empty headers that do not contain the task id.
    headers = {
        "header1": "value",
        "header2": "value",
    }
    context = {
        "body": body,
        "sender": task_name,
        "exchange": "celery",
        "routing_key": "celery",
        "retry_policy": None,
        "headers": headers,
        "properties": {},
    }

    assert retrieve_task_id(context) == expected_id


def test_task_id_from_protocol_v2_no_body():
    """Protocol v2 with an empty body: the id is read from the headers.

    ``context`` is an example of a signal emitted with Protocol v2 where the
    body holds no data.
    """
    expected_id = "7e917b83-4018-431d-9832-73a28e1fb6c0"
    task_name = "tests.contrib.celery.test_integration.fn_task_parameters"

    # Protocol v2 carries the task metadata, including the id, in the headers.
    headers = {
        "origin": "gen83744@hostname",
        "root_id": expected_id,
        "expires": None,
        "shadow": None,
        "id": expected_id,
        "kwargsrepr": "{'force_logout': True}",
        "lang": "py",
        "retries": 0,
        "task": task_name,
        "group": None,
        "timelimit": [None, None],
        "parent_id": None,
        "argsrepr": "['user']",
        "eta": None,
    }
    properties = {
        "reply_to": "c3054a07-5b28-3855-b18c-1623a24aaeca",
        "correlation_id": expected_id,
    }
    context = {
        "body": {},
        "sender": task_name,
        "exchange": "",
        "routing_key": "celery",
        "retry_policy": None,
        "headers": headers,
        "properties": properties,
    }

    assert retrieve_task_id(context) == expected_id


def test_task_id_from_protocol_v2_with_body():
# ensures a `task_id` is properly returned when Protocol v2 is used.
# `context` is an example of an emitted Signal with Protocol v2
# this test assumes the body has data
test_id = "7e917b83-4018-431d-9832-73a28e1fb6c0"
context = {
"body": (
["user"],
Expand All @@ -177,10 +254,10 @@ def test_task_id_from_protocol_v2():
"retry_policy": None,
"headers": {
"origin": "gen83744@hostname",
"root_id": "7e917b83-4018-431d-9832-73a28e1fb6c0",
"root_id": test_id,
"expires": None,
"shadow": None,
"id": "7e917b83-4018-431d-9832-73a28e1fb6c0",
"id": test_id,
"kwargsrepr": "{'force_logout': True}",
"lang": "py",
"retries": 0,
Expand All @@ -193,9 +270,18 @@ def test_task_id_from_protocol_v2():
},
"properties": {
"reply_to": "c3054a07-5b28-3855-b18c-1623a24aaeca",
"correlation_id": "7e917b83-4018-431d-9832-73a28e1fb6c0",
"correlation_id": test_id,
},
}

task_id = retrieve_task_id(context)
assert task_id == "7e917b83-4018-431d-9832-73a28e1fb6c0"
assert task_id == test_id


def test_task_id_from_blank_context():
    """An empty context (no headers, no body) yields no task id."""
    empty_context = {}

    assert retrieve_task_id(empty_context) is None

0 comments on commit dc64a56

Please sign in to comment.