diff --git a/ddtrace/contrib/internal/celery/utils.py b/ddtrace/contrib/internal/celery/utils.py index 80a4046f191..e18f83ba0dc 100644 --- a/ddtrace/contrib/internal/celery/utils.py +++ b/ddtrace/contrib/internal/celery/utils.py @@ -128,9 +128,12 @@ def retrieve_task_id(context): """ headers = context.get("headers") body = context.get("body") - if headers: + # Check if the id is in the headers, then check the body for it. + # If we don't check the id first, we could wrongly assume no task_id + # when the task_id is in the body. + if headers and "id" in headers: # Protocol Version 2 (default from Celery 4.0) return headers.get("id") - else: + elif body and "id" in body: # Protocol Version 1 return body.get("id") diff --git a/releasenotes/notes/fix-protocol-check-62ad7d096b1f36a9.yaml b/releasenotes/notes/fix-protocol-check-62ad7d096b1f36a9.yaml new file mode 100644 index 00000000000..e6dba7a3923 --- /dev/null +++ b/releasenotes/notes/fix-protocol-check-62ad7d096b1f36a9.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + tracing(celery): Fixes an issue where ``celery.apply`` spans using task_protocol 1 didn't close by improving the check for the task id in the body. \ No newline at end of file diff --git a/tests/contrib/celery/test_utils.py b/tests/contrib/celery/test_utils.py index 92a9405c99b..3bbeb959ce1 100644 --- a/tests/contrib/celery/test_utils.py +++ b/tests/contrib/celery/test_utils.py @@ -131,9 +131,11 @@ def test_tags_from_context_empty_keys(): assert {} == tags -def test_task_id_from_protocol_v1(): +def test_task_id_from_protocol_v1_no_headers(): # ensures a `task_id` is properly returned when Protocol v1 is used. # `context` is an example of an emitted Signal with Protocol v1 + # this test assumes the headers are blank + test_id = "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7" context = { "body": { "expires": None, @@ -143,7 +145,7 @@ def test_task_id_from_protocol_v1(): "callbacks": None, "errbacks": None, "taskset": None, - "id": "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7", + "id": test_id, "retries": 0, "task": "tests.contrib.celery.test_integration.fn_task_parameters", "timelimit": (None, None), @@ -159,12 +161,87 @@ def test_task_id_from_protocol_v1(): } task_id = retrieve_task_id(context) - assert task_id == "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7" + assert task_id == test_id -def test_task_id_from_protocol_v2(): +def test_task_id_from_protocol_v1_with_headers(): + # ensures a `task_id` is properly returned when Protocol v1 is used with headers. + # `context` is an example of an emitted Signal with Protocol v1 + # this tests ensures that the headers have other information + test_id = "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7" + context = { + "body": { + "expires": None, + "utc": True, + "args": ["user"], + "chord": None, + "callbacks": None, + "errbacks": None, + "taskset": None, + "id": test_id, + "retries": 0, + "task": "tests.contrib.celery.test_integration.fn_task_parameters", + "timelimit": (None, None), + "eta": None, + "kwargs": {"force_logout": True}, + }, + "sender": "tests.contrib.celery.test_integration.fn_task_parameters", + "exchange": "celery", + "routing_key": "celery", + "retry_policy": None, + "headers": { + "header1": "value", + "header2": "value", + }, + "properties": {}, + } + + task_id = retrieve_task_id(context) + assert task_id == test_id + + +def test_task_id_from_protocol_v2_no_body(): # ensures a `task_id` is properly returned when Protocol v2 is used. # `context` is an example of an emitted Signal with Protocol v2 + # this tests assumes the body has no data + test_id = "7e917b83-4018-431d-9832-73a28e1fb6c0" + context = { + "body": {}, + "sender": "tests.contrib.celery.test_integration.fn_task_parameters", + "exchange": "", + "routing_key": "celery", + "retry_policy": None, + "headers": { + "origin": "gen83744@hostname", + "root_id": test_id, + "expires": None, + "shadow": None, + "id": test_id, + "kwargsrepr": "{'force_logout': True}", + "lang": "py", + "retries": 0, + "task": "tests.contrib.celery.test_integration.fn_task_parameters", + "group": None, + "timelimit": [None, None], + "parent_id": None, + "argsrepr": "['user']", + "eta": None, + }, + "properties": { + "reply_to": "c3054a07-5b28-3855-b18c-1623a24aaeca", + "correlation_id": test_id, + }, + } + + task_id = retrieve_task_id(context) + assert task_id == test_id + + +def test_task_id_from_protocol_v2_with_body(): + # ensures a `task_id` is properly returned when Protocol v2 is used. + # `context` is an example of an emitted Signal with Protocol v2 + # this tests assumes the body has data + test_id = "7e917b83-4018-431d-9832-73a28e1fb6c0" context = { "body": ( ["user"], @@ -177,10 +254,10 @@ def test_task_id_from_protocol_v2(): "retry_policy": None, "headers": { "origin": "gen83744@hostname", - "root_id": "7e917b83-4018-431d-9832-73a28e1fb6c0", + "root_id": test_id, "expires": None, "shadow": None, - "id": "7e917b83-4018-431d-9832-73a28e1fb6c0", + "id": test_id, "kwargsrepr": "{'force_logout': True}", "lang": "py", "retries": 0, @@ -193,9 +270,18 @@ def test_task_id_from_protocol_v2(): }, "properties": { "reply_to": "c3054a07-5b28-3855-b18c-1623a24aaeca", - "correlation_id": "7e917b83-4018-431d-9832-73a28e1fb6c0", + "correlation_id": test_id, }, } task_id = retrieve_task_id(context) - assert task_id == "7e917b83-4018-431d-9832-73a28e1fb6c0" + assert task_id == test_id + + +def test_task_id_from_blank_context(): + # if there is no context (thus missing headers and body), + # no task_id is returned + context = {} + + task_id = retrieve_task_id(context) + assert task_id is None