From c2c572713d6ed70759bad2290d93eb329f7c8cfd Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 1 Oct 2024 19:42:21 +0000 Subject: [PATCH] fix(celery): close celery.apply spans using task_protocol 1 [backport 2.14] (#10874) Backport 6346fcbc7290c686c81839add78e355aefa97955 from #10848 to 2.14. This PR fixes an issue where Celery's closing signals got triggered but dd-trace-py skipped closing the `celery.apply` span due to not finding the task id. In celery's `task_protocol: 1`, the id is in the body of the message: https://docs.celeryq.dev/en/main/internals/protocol.html#message-body . The issue with the previous logic is that if the headers do have information (even if the headers were unrelated to the id), it would skip the check of the id in the body: before: ``` if headers: ``` after (this PR): ``` if headers and 'id' in headers: ``` By doing this, we check the headers for the id, then check the body for the id. If it fails to find the task id in the body or header, then it still hits the debug log, `unable to extract the Task and the task_id. This version of Celery may not be supported.` . This PR relates to the goal of https://github.com/DataDog/dd-trace-py/pull/10676 , to close celery spans. If for some reason the logic in this PR fails to close an open `celery.apply` span, https://github.com/DataDog/dd-trace-py/pull/10676 will act as a fail-safe and close it. Special Thanks: @timmc-edx for helping us track this down! 
## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) Co-authored-by: wantsui --- ddtrace/contrib/internal/celery/utils.py | 7 +- .../fix-protocol-check-62ad7d096b1f36a9.yaml | 4 + tests/contrib/celery/test_utils.py | 102 ++++++++++++++++-- 3 files changed, 103 insertions(+), 10 deletions(-) create mode 100644 releasenotes/notes/fix-protocol-check-62ad7d096b1f36a9.yaml diff --git a/ddtrace/contrib/internal/celery/utils.py b/ddtrace/contrib/internal/celery/utils.py index 80a4046f191..e18f83ba0dc 100644 --- a/ddtrace/contrib/internal/celery/utils.py +++ b/ddtrace/contrib/internal/celery/utils.py @@ -128,9 +128,12 @@ def 
retrieve_task_id(context): """ headers = context.get("headers") body = context.get("body") - if headers: + # Check if the id is in the headers, then check the body for it. + # If we don't check the id first, we could wrongly assume no task_id + # when the task_id is in the body. + if headers and "id" in headers: # Protocol Version 2 (default from Celery 4.0) return headers.get("id") - else: + elif body and "id" in body: # Protocol Version 1 return body.get("id") diff --git a/releasenotes/notes/fix-protocol-check-62ad7d096b1f36a9.yaml b/releasenotes/notes/fix-protocol-check-62ad7d096b1f36a9.yaml new file mode 100644 index 00000000000..e6dba7a3923 --- /dev/null +++ b/releasenotes/notes/fix-protocol-check-62ad7d096b1f36a9.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + tracing(celery): Fixes an issue where ``celery.apply`` spans using task_protocol 1 didn't close by improving the check for the task id in the body. \ No newline at end of file diff --git a/tests/contrib/celery/test_utils.py b/tests/contrib/celery/test_utils.py index 92a9405c99b..3bbeb959ce1 100644 --- a/tests/contrib/celery/test_utils.py +++ b/tests/contrib/celery/test_utils.py @@ -131,9 +131,11 @@ def test_tags_from_context_empty_keys(): assert {} == tags -def test_task_id_from_protocol_v1(): +def test_task_id_from_protocol_v1_no_headers(): # ensures a `task_id` is properly returned when Protocol v1 is used. 
# `context` is an example of an emitted Signal with Protocol v1 + # this test assumes the headers are blank + test_id = "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7" context = { "body": { "expires": None, @@ -143,7 +145,7 @@ def test_task_id_from_protocol_v1(): "callbacks": None, "errbacks": None, "taskset": None, - "id": "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7", + "id": test_id, "retries": 0, "task": "tests.contrib.celery.test_integration.fn_task_parameters", "timelimit": (None, None), @@ -159,12 +161,87 @@ def test_task_id_from_protocol_v1(): } task_id = retrieve_task_id(context) - assert task_id == "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7" + assert task_id == test_id -def test_task_id_from_protocol_v2(): +def test_task_id_from_protocol_v1_with_headers(): + # ensures a `task_id` is properly returned when Protocol v1 is used with headers. + # `context` is an example of an emitted Signal with Protocol v1 + # this tests ensures that the headers have other information + test_id = "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7" + context = { + "body": { + "expires": None, + "utc": True, + "args": ["user"], + "chord": None, + "callbacks": None, + "errbacks": None, + "taskset": None, + "id": test_id, + "retries": 0, + "task": "tests.contrib.celery.test_integration.fn_task_parameters", + "timelimit": (None, None), + "eta": None, + "kwargs": {"force_logout": True}, + }, + "sender": "tests.contrib.celery.test_integration.fn_task_parameters", + "exchange": "celery", + "routing_key": "celery", + "retry_policy": None, + "headers": { + "header1": "value", + "header2": "value", + }, + "properties": {}, + } + + task_id = retrieve_task_id(context) + assert task_id == test_id + + +def test_task_id_from_protocol_v2_no_body(): # ensures a `task_id` is properly returned when Protocol v2 is used. 
# `context` is an example of an emitted Signal with Protocol v2 + # this tests assumes the body has no data + test_id = "7e917b83-4018-431d-9832-73a28e1fb6c0" + context = { + "body": {}, + "sender": "tests.contrib.celery.test_integration.fn_task_parameters", + "exchange": "", + "routing_key": "celery", + "retry_policy": None, + "headers": { + "origin": "gen83744@hostname", + "root_id": test_id, + "expires": None, + "shadow": None, + "id": test_id, + "kwargsrepr": "{'force_logout': True}", + "lang": "py", + "retries": 0, + "task": "tests.contrib.celery.test_integration.fn_task_parameters", + "group": None, + "timelimit": [None, None], + "parent_id": None, + "argsrepr": "['user']", + "eta": None, + }, + "properties": { + "reply_to": "c3054a07-5b28-3855-b18c-1623a24aaeca", + "correlation_id": test_id, + }, + } + + task_id = retrieve_task_id(context) + assert task_id == test_id + + +def test_task_id_from_protocol_v2_with_body(): + # ensures a `task_id` is properly returned when Protocol v2 is used. 
+ # `context` is an example of an emitted Signal with Protocol v2 + # this tests assumes the body has data + test_id = "7e917b83-4018-431d-9832-73a28e1fb6c0" context = { "body": ( ["user"], @@ -177,10 +254,10 @@ def test_task_id_from_protocol_v2(): "retry_policy": None, "headers": { "origin": "gen83744@hostname", - "root_id": "7e917b83-4018-431d-9832-73a28e1fb6c0", + "root_id": test_id, "expires": None, "shadow": None, - "id": "7e917b83-4018-431d-9832-73a28e1fb6c0", + "id": test_id, "kwargsrepr": "{'force_logout': True}", "lang": "py", "retries": 0, @@ -193,9 +270,18 @@ def test_task_id_from_protocol_v2(): }, "properties": { "reply_to": "c3054a07-5b28-3855-b18c-1623a24aaeca", - "correlation_id": "7e917b83-4018-431d-9832-73a28e1fb6c0", + "correlation_id": test_id, }, } task_id = retrieve_task_id(context) - assert task_id == "7e917b83-4018-431d-9832-73a28e1fb6c0" + assert task_id == test_id + + +def test_task_id_from_blank_context(): + # if there is no context (thus missing headers and body), + # no task_id is returned + context = {} + + task_id = retrieve_task_id(context) + assert task_id is None