Skip to content

Commit

Permalink
feat: support baggage (#10389)
Browse files Browse the repository at this point in the history
First PR introducing baggage support

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: Emmett Butler <[email protected]>
Co-authored-by: Zachary Groves <[email protected]>
Co-authored-by: Munir Abdinur <[email protected]>
  • Loading branch information
4 people authored Oct 23, 2024
1 parent 2a4d8ed commit 2b410f9
Show file tree
Hide file tree
Showing 9 changed files with 357 additions and 43 deletions.
17 changes: 15 additions & 2 deletions ddtrace/_trace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def _trace_id_64bits(self):
else:
return _MAX_UINT_64BITS & self.trace_id

def _set_baggage_item(self, key: str, value: Any) -> None:
def set_baggage_item(self, key: str, value: Any) -> None:
"""Sets a baggage item in this span context.
Note that this operation mutates the baggage of this span context
"""
Expand All @@ -237,10 +237,23 @@ def _with_baggage_item(self, key: str, value: Any) -> "Context":
ctx._baggage = new_baggage
return ctx

def _get_baggage_item(self, key: str) -> Optional[Any]:
def get_baggage_item(self, key: str) -> Optional[Any]:
"""Gets a baggage item in this span context."""
return self._baggage.get(key, None)

def get_all_baggage_items(self) -> Dict[str, Any]:
"""Returns all baggage items in this span context."""
return self._baggage

def remove_baggage_item(self, key: str) -> None:
"""Remove a baggage item from this span context."""
if key in self._baggage:
del self._baggage[key]

def remove_all_baggage_items(self) -> None:
"""Removes all baggage items from this span context."""
self._baggage.clear()

def __eq__(self, other: Any) -> bool:
if isinstance(other, Context):
with self._lock:
Expand Down
11 changes: 0 additions & 11 deletions ddtrace/_trace/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,23 +497,12 @@ def get_metric(self, key: _TagNameType) -> Optional[NumericType]:
"""Return the given metric or None if it doesn't exist."""
return self._metrics.get(key)

def _set_baggage_item(self, key: str, value: Any) -> "Span":
"""Sets a baggage item in the span context of this span.
Baggage is used to propagate state between spans (in-process, http/https).
"""
self._context = self.context._with_baggage_item(key, value)
return self

def _add_event(
self, name: str, attributes: Optional[Dict[str, str]] = None, timestamp: Optional[int] = None
) -> None:
"""Add an event to the span."""
self._events.append(SpanEvent(name, attributes, timestamp))

def _get_baggage_item(self, key: str) -> Optional[Any]:
"""Gets a baggage item from the span context of this span."""
return self.context._get_baggage_item(key)

def get_metrics(self) -> _MetricDictType:
"""Return all metrics."""
return self._metrics.copy()
Expand Down
8 changes: 7 additions & 1 deletion ddtrace/internal/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
PROPAGATION_STYLE_B3_SINGLE = "b3"
_PROPAGATION_STYLE_W3C_TRACECONTEXT = "tracecontext"
_PROPAGATION_STYLE_NONE = "none"
_PROPAGATION_STYLE_DEFAULT = "datadog,tracecontext"
_PROPAGATION_STYLE_DEFAULT = "datadog,tracecontext,baggage"
_PROPAGATION_STYLE_BAGGAGE = "baggage"
PROPAGATION_STYLE_ALL = (
_PROPAGATION_STYLE_W3C_TRACECONTEXT,
PROPAGATION_STYLE_DATADOG,
PROPAGATION_STYLE_B3_MULTI,
PROPAGATION_STYLE_B3_SINGLE,
_PROPAGATION_STYLE_NONE,
_PROPAGATION_STYLE_BAGGAGE,
)
W3C_TRACESTATE_KEY = "tracestate"
W3C_TRACEPARENT_KEY = "traceparent"
Expand Down Expand Up @@ -67,6 +69,10 @@
_HTTPLIB_NO_TRACE_REQUEST = "_dd_no_trace"
DEFAULT_TIMEOUT = 2.0

# baggage
DD_TRACE_BAGGAGE_MAX_ITEMS = 64
DD_TRACE_BAGGAGE_MAX_BYTES = 8192


class _PRIORITY_CATEGORY:
USER = "user"
Expand Down
10 changes: 5 additions & 5 deletions ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def _do_annotations(self, span):
if span.span_type != SpanTypes.LLM: # do this check to avoid the warning log in `annotate`
return
current_context = self._instance.tracer.current_trace_context()
current_context_id = current_context._get_baggage_item(ANNOTATIONS_CONTEXT_ID)
current_context_id = current_context.get_baggage_item(ANNOTATIONS_CONTEXT_ID)
with self._annotation_context_lock:
for _, context_id, annotation_kwargs in self._instance._annotations:
if current_context_id == context_id:
Expand Down Expand Up @@ -301,12 +301,12 @@ def get_annotations_context_id():
ctx_id = annotation_id
if current_ctx is None:
current_ctx = Context(is_remote=False)
current_ctx._set_baggage_item(ANNOTATIONS_CONTEXT_ID, ctx_id)
current_ctx.set_baggage_item(ANNOTATIONS_CONTEXT_ID, ctx_id)
cls._instance.tracer.context_provider.activate(current_ctx)
elif not current_ctx._get_baggage_item(ANNOTATIONS_CONTEXT_ID):
current_ctx._set_baggage_item(ANNOTATIONS_CONTEXT_ID, ctx_id)
elif not current_ctx.get_baggage_item(ANNOTATIONS_CONTEXT_ID):
current_ctx.set_baggage_item(ANNOTATIONS_CONTEXT_ID, ctx_id)
else:
ctx_id = current_ctx._get_baggage_item(ANNOTATIONS_CONTEXT_ID)
ctx_id = current_ctx.get_baggage_item(ANNOTATIONS_CONTEXT_ID)
return ctx_id

def register_annotation():
Expand Down
111 changes: 101 additions & 10 deletions ddtrace/propagation/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Text # noqa:F401
from typing import Tuple # noqa:F401
from typing import cast # noqa:F401
import urllib.parse

import ddtrace
from ddtrace._trace.span import Span # noqa:F401
Expand Down Expand Up @@ -38,8 +39,11 @@
from ..internal._tagset import decode_tagset_string
from ..internal._tagset import encode_tagset_values
from ..internal.compat import ensure_text
from ..internal.constants import _PROPAGATION_STYLE_BAGGAGE
from ..internal.constants import _PROPAGATION_STYLE_NONE
from ..internal.constants import _PROPAGATION_STYLE_W3C_TRACECONTEXT
from ..internal.constants import DD_TRACE_BAGGAGE_MAX_BYTES
from ..internal.constants import DD_TRACE_BAGGAGE_MAX_ITEMS
from ..internal.constants import HIGHER_ORDER_TRACE_ID_BITS as _HIGHER_ORDER_TRACE_ID_BITS
from ..internal.constants import LAST_DD_PARENT_ID_KEY
from ..internal.constants import MAX_UINT_64BITS as _MAX_UINT_64BITS
Expand Down Expand Up @@ -74,6 +78,7 @@
_HTTP_HEADER_TAGS: Literal["x-datadog-tags"] = "x-datadog-tags"
_HTTP_HEADER_TRACEPARENT: Literal["traceparent"] = "traceparent"
_HTTP_HEADER_TRACESTATE: Literal["tracestate"] = "tracestate"
_HTTP_HEADER_BAGGAGE: Literal["baggage"] = "baggage"


def _possible_header(header):
Expand Down Expand Up @@ -127,7 +132,7 @@ def _attach_baggage_to_context(headers: Dict[str, str], context: Context):
if context is not None:
for key, value in headers.items():
if key[: len(_HTTP_BAGGAGE_PREFIX)] == _HTTP_BAGGAGE_PREFIX:
context._set_baggage_item(key[len(_HTTP_BAGGAGE_PREFIX) :], value)
context.set_baggage_item(key[len(_HTTP_BAGGAGE_PREFIX) :], value)


def _hex_id_to_dd_id(hex_id):
Expand Down Expand Up @@ -885,12 +890,77 @@ def _inject(span_context, headers):
return headers


class _BaggageHeader:
"""Helper class to inject/extract Baggage Headers"""

SAFE_CHARACTERS_KEY = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz" "0123456789" "!#$%&'*+-.^_`|~"
SAFE_CHARACTERS_VALUE = (
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz" "0123456789" "!#$%&'()*+-./:<>?@[]^_`{|}~"
)

@staticmethod
def _encode_key(key: str) -> str:
return urllib.parse.quote(str(key).strip(), safe=_BaggageHeader.SAFE_CHARACTERS_KEY)

@staticmethod
def _encode_value(value: str) -> str:
return urllib.parse.quote(str(value).strip(), safe=_BaggageHeader.SAFE_CHARACTERS_VALUE)

@staticmethod
def _inject(span_context: Context, headers: Dict[str, str]) -> None:
baggage_items = span_context._baggage.items()
if not baggage_items:
return

if len(baggage_items) > DD_TRACE_BAGGAGE_MAX_ITEMS:
log.warning("Baggage item limit exceeded")
return

try:
header_value = ",".join(
f"{_BaggageHeader._encode_key(key)}={_BaggageHeader._encode_value(value)}"
for key, value in baggage_items
)

buf = bytes(header_value, "utf-8")
if len(buf) > DD_TRACE_BAGGAGE_MAX_BYTES:
log.warning("Baggage header size exceeded")
return

headers[_HTTP_HEADER_BAGGAGE] = header_value

except Exception:
log.warning("Failed to encode and inject baggage header")

@staticmethod
def _extract(headers: Dict[str, str]) -> Context:
header_value = headers.get(_HTTP_HEADER_BAGGAGE)

if not header_value:
return Context(baggage={})

baggage = {}
baggages = header_value.split(",")
for key_value in baggages:
if "=" not in key_value:
return Context(baggage={})
key, value = key_value.split("=", 1)
key = urllib.parse.unquote(key.strip())
value = urllib.parse.unquote(value.strip())
if not key or not value:
return Context(baggage={})
baggage[key] = value

return Context(baggage=baggage)


_PROP_STYLES = {
PROPAGATION_STYLE_DATADOG: _DatadogMultiHeader,
PROPAGATION_STYLE_B3_MULTI: _B3MultiHeader,
PROPAGATION_STYLE_B3_SINGLE: _B3SingleHeader,
_PROPAGATION_STYLE_W3C_TRACECONTEXT: _TraceContext,
_PROPAGATION_STYLE_NONE: _NOP_Propagator,
_PROPAGATION_STYLE_BAGGAGE: _BaggageHeader,
}


Expand All @@ -906,6 +976,9 @@ def _extract_configured_contexts_avail(normalized_headers):
for prop_style in config._propagation_style_extract:
propagator = _PROP_STYLES[prop_style]
context = propagator._extract(normalized_headers)
# baggage is handled separately
if prop_style == _PROPAGATION_STYLE_BAGGAGE:
continue
if context:
contexts.append(context)
styles_w_ctx.append(prop_style)
Expand All @@ -915,6 +988,7 @@ def _extract_configured_contexts_avail(normalized_headers):
def _resolve_contexts(contexts, styles_w_ctx, normalized_headers):
primary_context = contexts[0]
links = []

for context in contexts[1:]:
style_w_ctx = styles_w_ctx[contexts.index(context)]
# encoding expects at least trace_id and span_id
Expand All @@ -924,9 +998,11 @@ def _resolve_contexts(contexts, styles_w_ctx, normalized_headers):
context.trace_id,
context.span_id,
flags=1 if context.sampling_priority and context.sampling_priority > 0 else 0,
tracestate=context._meta.get(W3C_TRACESTATE_KEY, "")
if style_w_ctx == _PROPAGATION_STYLE_W3C_TRACECONTEXT
else None,
tracestate=(
context._meta.get(W3C_TRACESTATE_KEY, "")
if style_w_ctx == _PROPAGATION_STYLE_W3C_TRACECONTEXT
else None
),
attributes={
"reason": "terminated_context",
"context_headers": style_w_ctx,
Expand All @@ -952,6 +1028,7 @@ def _resolve_contexts(contexts, styles_w_ctx, normalized_headers):
primary_context._meta[LAST_DD_PARENT_ID_KEY] = "{:016x}".format(dd_context.span_id)
# the span_id in tracecontext takes precedence over the first extracted propagation style
primary_context.span_id = context.span_id

primary_context._span_links = links
return primary_context

Expand Down Expand Up @@ -999,6 +1076,10 @@ def parent_call():
else:
log.error("ddtrace.tracer.sample is not available, unable to sample span.")

# baggage should be injected regardless of existing span or trace id
if _PROPAGATION_STYLE_BAGGAGE in config._propagation_style_inject:
_BaggageHeader._inject(span_context, headers)

# Not a valid context to propagate
if span_context.trace_id is None or span_context.span_id is None:
log.debug("tried to inject invalid context %r", span_context)
Expand All @@ -1024,7 +1105,6 @@ def parent_call():

@staticmethod
def extract(headers):
# type: (Dict[str,str]) -> Context
"""Extract a Context from HTTP headers into a new Context.
For tracecontext propagation we extract tracestate headers for
propagation even if another propagation style is specified before tracecontext,
Expand All @@ -1050,16 +1130,17 @@ def my_controller(url, headers):
return Context()
try:
normalized_headers = {name.lower(): v for name, v in headers.items()}

context = Context()
# tracer configured to extract first only
if config._propagation_extract_first:
# loop through the extract propagation styles specified in order, return whatever context we get first
for prop_style in config._propagation_style_extract:
propagator = _PROP_STYLES[prop_style]
context = propagator._extract(normalized_headers) # type: ignore
if config._propagation_http_baggage_enabled is True:
context = propagator._extract(normalized_headers)
if config.propagation_http_baggage_enabled is True:
_attach_baggage_to_context(normalized_headers, context)
return context
break

# loop through all extract propagation styles
else:
contexts, styles_w_ctx = HTTPPropagator._extract_configured_contexts_avail(normalized_headers)
Expand All @@ -1068,7 +1149,17 @@ def my_controller(url, headers):
context = HTTPPropagator._resolve_contexts(contexts, styles_w_ctx, normalized_headers)
if config._propagation_http_baggage_enabled is True:
_attach_baggage_to_context(normalized_headers, context)
return context

# baggage headers are handled separately from the other propagation styles
if _PROPAGATION_STYLE_BAGGAGE in config._propagation_style_extract:
baggage_context = _BaggageHeader._extract(normalized_headers)
if baggage_context._baggage != {}:
if context:
context._baggage = baggage_context._baggage
else:
context = baggage_context

return context

except Exception:
log.debug("error while extracting context propagation headers", exc_info=True)
Expand Down
3 changes: 2 additions & 1 deletion ddtrace/settings/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ def _parse_propagation_styles(styles_str):
- "b3" (formerly 'b3 single header')
- "b3 single header (deprecated for 'b3')"
- "tracecontext"
- "baggage"
- "none"
The default value is ``"datadog,tracecontext"``.
The default value is ``"datadog,tracecontext,baggage"``.
Examples::
Expand Down
3 changes: 3 additions & 0 deletions releasenotes/notes/baggage-support-be7eed26293f1216.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
features:
- |
tracing: Introduces support for Baggage as defined by the `OpenTelemetry specification <https://opentelemetry.io/docs/specs/otel/baggage/api/>`_.
Loading

0 comments on commit 2b410f9

Please sign in to comment.