From 1b8923afc59715c9779a936e5fac4b4e8a8a5a19 Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Tue, 4 Mar 2025 17:16:55 -0500 Subject: [PATCH] chore(sampling): simplify sampling (#12581) ## Description With ddtrace v3.0 many sampling components are now internal to the library. This allows to refactor and simplify how sampling rules and agent service based sample rates are applied. This PR also improves the performance of creating spans by 9%. ## Changes - Removes all samplers from `ddtrace._trace.sampling` except for the `RateSampler` and `DatadogSampler`. - These are the only two samplers that are used by ddtrace components (all other samplers just add unnecessary complexity). - Updates the types of `ddtrace._tracer._sampler` and `ddtrace._tracer._user_sampler` from `BaseSampler` to `DatadogSampler`. - The tracer is only compatible with the `DatadogSampler`, the Datadog sampler is required to enable dynamic sampling and priority sampling. This also fixes some typing issues and removes unnecessary type checks and try-except blocks. - Removes `sample_rate` field from `ddtrace._trace.sampling.DatadogSampler` - Moving forward tracing spans must be sampled via a sampling rule, priority sampling (agent service based sampling), a RateLimiter or use the default sample rate (1.0 + auto_keep). - Cleans up the sampling rules parsing logic in `DatadogSampler._parse_rules_from_str` - Also fixes a subtle bug where a `NameError` is raised when we failed to parse SamplingRules (even when `DD_TESTING_RAISE` is false). [here](https://github.com/DataDog/dd-trace-py/blob/v3.1.0/ddtrace/_trace/sampler.py#L293) json_rules can be undefined. - Replaces `_PRIORITY_CATEGORY` with `_MECHANISM_TO_PRIORITIES` and `DatadogSampler._choose_priority_category_with_rule(...) 
with `DatadogSampler._get_sampling_mechanism(...)` - Previously a sampling outcome was mapped to a "priority category", the priority category was then mapped to a sampling mechanism and then the sampling mechanism was mapped to a sampling_priority. This indirection is a bit unnecessary. This PR maps sampling outcomes directly to sampling mechanisms and sampling_priority (via `ddtrace.constants._MECHANISM_TO_PRIORITIES` map) - Renames constants in `ddtrace.internal.constants.SamplingMechanism` to improve clarity: - AUTO -> AGENT_RATE_BY_SERVICE - TRACE_SAMPLING_RULE -> LOCAL_USER_TRACE_SAMPLING_RULE - REMOTE_USER_RULE -> REMOTE_USER_TRACE_SAMPLING_RULE - REMOTE_DYNAMIC_RULE -> REMOTE_DYNAMIC_TRACE_SAMPLING_RULE - Removes `DatadogSampler._make_sampling_decision(...)`, `DatadogSampler._default_sampler`, and `DatadogSampler.set_sample_rate(...)`. - These attributes are no longer used. Follow up: Remove [ddtrace.config._trace_sample_rate](https://github.com/DataDog/dd-trace-py/pull/12582) ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed
risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --------- Co-authored-by: Brett Langdon --- ddtrace/_trace/processor/__init__.py | 4 +- ddtrace/_trace/sampler.py | 329 ++++++++------------------- ddtrace/_trace/tracer.py | 36 ++- ddtrace/internal/constants.py | 38 ++-- ddtrace/internal/debug.py | 5 +- ddtrace/internal/sampling.py | 54 ++--- ddtrace/propagation/http.py | 2 +- tests/integration/test_debug.py | 12 - tests/integration/test_sampling.py | 9 - tests/tracer/test_sampler.py | 133 +++-------- 10 files changed, 185 insertions(+), 437 deletions(-) diff --git a/ddtrace/_trace/processor/__init__.py b/ddtrace/_trace/processor/__init__.py index 0437b65b364..b6f056f4166 100644 --- a/ddtrace/_trace/processor/__init__.py +++ b/ddtrace/_trace/processor/__init__.py @@ -9,7 +9,7 @@ from typing import Union from ddtrace import config -from ddtrace._trace.sampler import BaseSampler +from ddtrace._trace.sampler import DatadogSampler from ddtrace._trace.span import Span from ddtrace._trace.span import _get_64_highest_order_bits_as_hex from ddtrace._trace.span import _is_top_level @@ -120,7 +120,7 @@ class TraceSamplingProcessor(TraceProcessor): def __init__( self, compute_stats_enabled: bool, - sampler: BaseSampler, + sampler: DatadogSampler, single_span_rules: List[SpanSamplingRule], apm_opt_out: bool, ): diff --git a/ddtrace/_trace/sampler.py b/ddtrace/_trace/sampler.py index 62ae06d1c7b..4a131e0e1e5 100644 --- a/ddtrace/_trace/sampler.py +++ b/ddtrace/_trace/sampler.py @@ -2,22 +2,19 @@ Any `sampled = False` trace won't be written, and can be ignored by the instrumentation. 
""" - -import abc import json -from typing import TYPE_CHECKING # noqa:F401 -from typing import Dict # noqa:F401 -from typing import List # noqa:F401 -from typing import Optional # noqa:F401 -from typing import Tuple # noqa:F401 +from json.decoder import JSONDecodeError +from typing import Dict +from typing import List +from typing import Optional from ddtrace import config +from ddtrace._trace.span import Span from ddtrace.constants import _SAMPLING_LIMIT_DECISION from ..constants import ENV_KEY -from ..internal.constants import _PRIORITY_CATEGORY -from ..internal.constants import DEFAULT_SAMPLING_RATE_LIMIT -from ..internal.constants import MAX_UINT_64BITS as _MAX_UINT_64BITS +from ..internal.constants import MAX_UINT_64BITS +from ..internal.constants import SamplingMechanism from ..internal.logger import get_logger from ..internal.rate_limiter import RateLimiter from ..internal.sampling import _get_highest_precedence_rule_matching @@ -27,56 +24,14 @@ PROVENANCE_ORDER = ["customer", "dynamic", "default"] -try: - from json.decoder import JSONDecodeError -except ImportError: - # handling python 2.X import error - JSONDecodeError = ValueError # type: ignore - -if TYPE_CHECKING: # pragma: no cover - from ddtrace._trace.span import Span # noqa:F401 - log = get_logger(__name__) -# All references to MAX_TRACE_ID were replaced with _MAX_UINT_64BITS. -# Now that ddtrace supports generating 128bit trace_ids, -# the max trace id should be 2**128 - 1 (not 2**64 -1) -# MAX_TRACE_ID is no longer used and should be removed. 
-MAX_TRACE_ID = _MAX_UINT_64BITS # Has to be the same factor and key as the Agent to allow chained sampling KNUTH_FACTOR = 1111111111111111111 -class SamplingError(Exception): - pass - - -class BaseSampler(metaclass=abc.ABCMeta): - __slots__ = () - - @abc.abstractmethod - def sample(self, span): - # type: (Span) -> bool - pass - - -class BasePrioritySampler(BaseSampler): - __slots__ = () - - @abc.abstractmethod - def update_rate_by_service_sample_rates(self, sample_rates): - pass - - -class AllSampler(BaseSampler): - """Sampler sampling all the traces""" - - def sample(self, span): - return True - - -class RateSampler(BaseSampler): +class RateSampler: """Sampler based on a rate Keep (100 * `sample_rate`)% of the traces. @@ -91,165 +46,64 @@ def __init__(self, sample_rate: float = 1.0) -> None: def set_sample_rate(self, sample_rate: float) -> None: self.sample_rate = float(sample_rate) - self.sampling_id_threshold = self.sample_rate * _MAX_UINT_64BITS - - def sample(self, span): - sampled = ((span._trace_id_64bits * KNUTH_FACTOR) % _MAX_UINT_64BITS) <= self.sampling_id_threshold - return sampled - - -class _AgentRateSampler(RateSampler): - pass - - -class RateByServiceSampler(BasePrioritySampler): - """Sampler based on a rate, by service - - Keep (100 * `sample_rate`)% of the traces. - The sample rate is kept independently for each service/env tuple. - """ - - __slots__ = ("sample_rate", "_by_service_samplers", "_default_sampler") - - _default_key = "service:,env:" - - @staticmethod - def _key( - service=None, # type: Optional[str] - env=None, # type: Optional[str] - ): - # type: (...) 
-> str - """Compute a key with the same format used by the Datadog agent API.""" - service = service or "" - env = env or "" - return "service:" + service + ",env:" + env - - def __init__(self, sample_rate=1.0): - # type: (float) -> None - self.sample_rate = sample_rate - self._default_sampler = RateSampler(self.sample_rate) - self._by_service_samplers = {} # type: Dict[str, RateSampler] - - def set_sample_rate( - self, - sample_rate, # type: float - service=None, # type: Optional[str] - env=None, # type: Optional[str] - ): - # type: (...) -> None - - # if we have a blank service, we need to match it to the config.service - if service is None: - service = config.service - if env is None: - env = config.env - - self._by_service_samplers[self._key(service, env)] = _AgentRateSampler(sample_rate) + self.sampling_id_threshold = self.sample_rate * MAX_UINT_64BITS - def sample(self, span): - sampled, sampler = self._make_sampling_decision(span) - _set_sampling_tags( - span, - sampled, - sampler.sample_rate, - self._choose_priority_category(sampler), - ) + def sample(self, span: Span) -> bool: + sampled = ((span._trace_id_64bits * KNUTH_FACTOR) % MAX_UINT_64BITS) <= self.sampling_id_threshold return sampled - def _choose_priority_category(self, sampler): - # type: (BaseSampler) -> str - if sampler is self._default_sampler: - return _PRIORITY_CATEGORY.DEFAULT - elif isinstance(sampler, _AgentRateSampler): - return _PRIORITY_CATEGORY.AUTO - else: - return _PRIORITY_CATEGORY.RULE_DEF - - def _make_sampling_decision(self, span): - # type: (Span) -> Tuple[bool, BaseSampler] - env = span.get_tag(ENV_KEY) - key = self._key(span.service, env) - sampler = self._by_service_samplers.get(key) or self._default_sampler - sampled = sampler.sample(span) - return sampled, sampler - - def update_rate_by_service_sample_rates(self, rate_by_service): - # type: (Dict[str, float]) -> None - samplers = {} # type: Dict[str, RateSampler] - for key, sample_rate in rate_by_service.items(): - 
samplers[key] = _AgentRateSampler(sample_rate) - - self._by_service_samplers = samplers - -class DatadogSampler(RateByServiceSampler): +class DatadogSampler: """ - By default, this sampler relies on dynamic sample rates provided by the trace agent - to determine which traces are kept or dropped. + The DatadogSampler samples traces based on the following (in order of precedence): + - A list of sampling rules, applied in the order they are provided. The first matching rule is used. + - A default sample rate, stored as the final sampling rule (lowest precedence sampling rule). + - A global rate limit, applied only if a rule is matched or if `rate_limit_always_on` is set to `True`. + - Sample rates provided by the agent (priority sampling, maps sample rates to service and env tags). + - By default, spans are sampled at a rate of 1.0 and assigned an `AUTO_KEEP` priority, allowing + the agent to determine the final sample rate and sampling decision. - You can also configure a static sample rate via ``default_sample_rate`` to use for sampling. - When a ``default_sample_rate`` is configured, that is the only sample rate used, and the agent - provided rates are ignored. - - You may also supply a list of ``SamplingRule`` instances to set sample rates for specific - services. - - Example rules:: + Example sampling rules:: DatadogSampler(rules=[ SamplingRule(sample_rate=1.0, service="my-svc"), SamplingRule(sample_rate=0.0, service="less-important"), + SamplingRule(sample_rate=0.5), # sample all remaining services at 50% ]) - - Rules are evaluated in the order they are provided, and the first rule that matches is used. - If no rule matches, then the agent sample rates are used. - - This sampler can be configured with a rate limit. This will ensure the max number of - sampled traces per second does not exceed the supplied limit. The default is 100 traces kept - per second. 
""" - __slots__ = ("limiter", "rules", "default_sample_rate", "_rate_limit_always_on") - - NO_RATE_LIMIT = -1 - # deprecate and remove the DEFAULT_RATE_LIMIT field from DatadogSampler - DEFAULT_RATE_LIMIT = DEFAULT_SAMPLING_RATE_LIMIT + __slots__ = ( + "limiter", + "rules", + "default_sample_rate", + "_rate_limit_always_on", + "_by_service_samplers", + ) + _default_key = "service:,env:" def __init__( self, - rules=None, # type: Optional[List[SamplingRule]] - default_sample_rate=None, # type: Optional[float] - rate_limit=None, # type: Optional[int] - rate_limit_window=1e9, # type: float - rate_limit_always_on=False, # type: bool + rules: Optional[List[SamplingRule]] = None, + default_sample_rate: Optional[float] = None, + rate_limit: Optional[int] = None, + rate_limit_window: float = 1e9, + rate_limit_always_on: bool = False, ): - # type: (...) -> None """ Constructor for DatadogSampler sampler :param rules: List of :class:`SamplingRule` rules to apply to the root span of every trace, default no rules - :param default_sample_rate: The default sample rate to apply if no rules matched (default: ``None`` / - Use :class:`RateByServiceSampler` only) + :param default_sample_rate: The default sample rate to apply if no rules matched :param rate_limit: Global rate limit (traces per second) to apply to all traces regardless of the rules applied to them, (default: ``100``) """ - # Use default sample rate of 1.0 - super(DatadogSampler, self).__init__() - - if rate_limit is None: - rate_limit = int(config._trace_rate_limit) - - self._rate_limit_always_on = rate_limit_always_on - + # Set sampling rules + self.rules: List[SamplingRule] = [] if rules is None: env_sampling_rules = config._trace_sampling_rules if env_sampling_rules: - rules = self._parse_rules_from_str(env_sampling_rules) - else: - rules = [] - self.rules = rules + self.rules = self._parse_rules_from_str(env_sampling_rules) else: - self.rules = [] # Validate that rules is a list of SampleRules for rule in rules: if 
isinstance(rule, SamplingRule): @@ -258,28 +112,46 @@ def __init__( raise TypeError( "Rule {!r} must be a sub-class of type ddtrace._trace.sampler.SamplingRules".format(rule) ) - - # DEV: sampling rule must come last if default_sample_rate is not None: + # DEV: sampling rule must come last self.rules.append(SamplingRule(sample_rate=default_sample_rate)) - - # Configure rate limiter - self.limiter = RateLimiter(rate_limit, rate_limit_window) + # Set Agent based samplers + self._by_service_samplers: Dict[str, RateSampler] = {} + # Set rate limiter + self._rate_limit_always_on: bool = rate_limit_always_on + if rate_limit is None: + rate_limit = int(config._trace_rate_limit) + self.limiter: RateLimiter = RateLimiter(rate_limit, rate_limit_window) log.debug("initialized %r", self) + @staticmethod + def _key(service: Optional[str], env: Optional[str]): + """Compute a key with the same format used by the Datadog agent API.""" + return f"service:{service or ''},env:{env or ''}" + + def update_rate_by_service_sample_rates(self, rate_by_service: Dict[str, float]) -> None: + samplers: Dict[str, RateSampler] = {} + for key, sample_rate in rate_by_service.items(): + samplers[key] = RateSampler(sample_rate) + self._by_service_samplers = samplers + def __str__(self): rates = {key: sampler.sample_rate for key, sampler in self._by_service_samplers.items()} - return "{}(agent_rates={!r}, limiter={!r}, rules={!r})".format( - self.__class__.__name__, rates, self.limiter, self.rules + return "{}(agent_rates={!r}, limiter={!r}, rules={!r}), rate_limit_always_on={!r}".format( + self.__class__.__name__, + rates, + self.limiter, + self.rules, + self._rate_limit_always_on, ) __repr__ = __str__ @staticmethod - def _parse_rules_from_str(rules): - # type: (str) -> List[SamplingRule] + def _parse_rules_from_str(rules: str) -> List[SamplingRule]: sampling_rules = [] + json_rules = [] try: json_rules = json.loads(rules) except JSONDecodeError: @@ -290,47 +162,42 @@ def 
_parse_rules_from_str(rules): if config._raise: raise KeyError("No sample_rate provided for sampling rule: {}".format(json.dumps(rule))) continue - sample_rate = float(rule["sample_rate"]) - service = rule.get("service", SamplingRule.NO_RULE) - name = rule.get("name", SamplingRule.NO_RULE) - resource = rule.get("resource", SamplingRule.NO_RULE) - tags = rule.get("tags", SamplingRule.NO_RULE) - provenance = rule.get("provenance", "default") try: sampling_rule = SamplingRule( - sample_rate=sample_rate, - service=service, - name=name, - resource=resource, - tags=tags, - provenance=provenance, + sample_rate=float(rule["sample_rate"]), + service=rule.get("service", SamplingRule.NO_RULE), + name=rule.get("name", SamplingRule.NO_RULE), + resource=rule.get("resource", SamplingRule.NO_RULE), + tags=rule.get("tags", SamplingRule.NO_RULE), + provenance=rule.get("provenance", "default"), ) + sampling_rules.append(sampling_rule) except ValueError as e: if config._raise: raise ValueError("Error creating sampling rule {}: {}".format(json.dumps(rule), e)) - continue - sampling_rules.append(sampling_rule) # Sort the sampling_rules list using a lambda function as the key sampling_rules = sorted(sampling_rules, key=lambda rule: PROVENANCE_ORDER.index(rule.provenance)) return sampling_rules - def sample(self, span): + def sample(self, span: Span) -> bool: span.context._update_tags(span) - matched_rule = _get_highest_precedence_rule_matching(span, self.rules) - - sampler = self._default_sampler # type: BaseSampler - sample_rate = self.sample_rate + # Default sampling + agent_service_based = False + sampled = True + sample_rate = 1.0 if matched_rule: - # Client based sampling + # Rules based sampling (set via env_var or remote config) sampled = matched_rule.sample(span) sample_rate = matched_rule.sample_rate else: - # Agent based sampling - sampled, sampler = super(DatadogSampler, self)._make_sampling_decision(span) - if isinstance(sampler, RateSampler): - sample_rate = 
sampler.sample_rate + key = self._key(span.service, span.get_tag(ENV_KEY)) + if key in self._by_service_samplers: + # Agent service based sampling + agent_service_based = True + sampled = self._by_service_samplers[key].sample(span) + sample_rate = self._by_service_samplers[key].sample_rate if matched_rule or self._rate_limit_always_on: # Avoid rate limiting when trace sample rules and/or sample rates are NOT provided @@ -339,26 +206,28 @@ def sample(self, span): if sampled: sampled = self.limiter.is_allowed() span.set_metric(_SAMPLING_LIMIT_DECISION, self.limiter.effective_rate) + + sampling_mechanism = self._get_sampling_mechanism(matched_rule, agent_service_based) _set_sampling_tags( span, sampled, sample_rate, - self._choose_priority_category_with_rule(matched_rule, sampler), + sampling_mechanism, ) - return sampled - def _choose_priority_category_with_rule(self, rule, sampler): - # type: (Optional[SamplingRule], BaseSampler) -> str - if rule: - provenance = rule.provenance - if provenance == "customer": - return _PRIORITY_CATEGORY.RULE_CUSTOMER - if provenance == "dynamic": - return _PRIORITY_CATEGORY.RULE_DYNAMIC - return _PRIORITY_CATEGORY.RULE_DEF + def _get_sampling_mechanism(self, matched_rule: Optional[SamplingRule], agent_service_based: bool) -> int: + if matched_rule and matched_rule.provenance == "customer": + return SamplingMechanism.REMOTE_USER_TRACE_SAMPLING_RULE + elif matched_rule and matched_rule.provenance == "dynamic": + return SamplingMechanism.REMOTE_DYNAMIC_TRACE_SAMPLING_RULE + elif matched_rule: + return SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE elif self._rate_limit_always_on: # backwards compaitbiility for ASM, when the rate limit is always on (ASM standalone mode) # we want spans to be set to a MANUAL priority to avoid agent based sampling - return _PRIORITY_CATEGORY.USER - return super(DatadogSampler, self)._choose_priority_category(sampler) + return SamplingMechanism.MANUAL + elif agent_service_based: + return 
SamplingMechanism.AGENT_RATE_BY_SERVICE + else: + return SamplingMechanism.DEFAULT diff --git a/ddtrace/_trace/tracer.py b/ddtrace/_trace/tracer.py index 6a9500f1ea2..999ad3a40d2 100644 --- a/ddtrace/_trace/tracer.py +++ b/ddtrace/_trace/tracer.py @@ -25,8 +25,6 @@ from ddtrace._trace.processor import TraceTagsProcessor from ddtrace._trace.provider import BaseContextProvider from ddtrace._trace.provider import DefaultContextProvider -from ddtrace._trace.sampler import BasePrioritySampler -from ddtrace._trace.sampler import BaseSampler from ddtrace._trace.sampler import DatadogSampler from ddtrace._trace.span import Span from ddtrace.appsec._constants import APPSEC @@ -107,7 +105,7 @@ def _default_span_processors_factory( compute_stats_enabled: bool, single_span_sampling_rules: List[SpanSamplingRule], agent_url: str, - trace_sampler: BaseSampler, + trace_sampler: DatadogSampler, profiling_span_processor: EndpointCallCounterProcessor, ) -> Tuple[List[SpanProcessor], Optional[Any], List[SpanProcessor]]: # FIXME: type should be AppsecSpanProcessor but we have a cyclic import here @@ -230,7 +228,7 @@ def __init__( self.enabled = config._tracing_enabled self.context_provider = context_provider or DefaultContextProvider() # _user_sampler is the backup in case we need to revert from remote config to local - self._user_sampler: BaseSampler = DatadogSampler() + self._user_sampler = DatadogSampler() self._dogstatsd_url = agent.get_stats_url() if dogstatsd_url is None else dogstatsd_url if asm_config._apm_opt_out: self.enabled = False @@ -239,9 +237,9 @@ def __init__( # If ASM is enabled but tracing is disabled, # we need to set the rate limiting to 1 trace per minute # for the backend to consider the service as alive. 
- self._sampler: BaseSampler = DatadogSampler(rate_limit=1, rate_limit_window=60e9, rate_limit_always_on=True) + self._sampler = DatadogSampler(rate_limit=1, rate_limit_window=60e9, rate_limit_always_on=True) else: - self._sampler: BaseSampler = DatadogSampler() + self._sampler = DatadogSampler() self._compute_stats = config._trace_compute_stats self._agent_url: str = agent.get_trace_url() if url is None else url verify_url(self._agent_url) @@ -437,7 +435,7 @@ def _configure( port: Optional[int] = None, uds_path: Optional[str] = None, https: Optional[bool] = None, - sampler: Optional[BaseSampler] = None, + sampler: Optional[DatadogSampler] = None, context_provider: Optional[BaseContextProvider] = None, wrap_executor: Optional[Callable] = None, priority_sampling: Optional[bool] = None, @@ -478,16 +476,11 @@ def _configure( # Disable compute stats (neither agent or tracer should compute them) config._trace_compute_stats = False # Update the rate limiter to 1 trace per minute when tracing is disabled - if isinstance(sampler, DatadogSampler): - sampler._rate_limit_always_on = True # type: ignore[has-type] - sampler.limiter.rate_limit = 1 # type: ignore[has-type] - sampler.limiter.time_window = 60e9 # type: ignore[has-type] + if sampler is not None: + sampler._rate_limit_always_on = True + sampler.limiter.rate_limit = 1 + sampler.limiter.time_window = 60e9 else: - if sampler is not None: - log.warning( - "Overriding sampler: %s, a DatadogSampler must be used in ASM Standalone mode", - sampler.__class__, - ) sampler = DatadogSampler(rate_limit=1, rate_limit_window=60e9, rate_limit_always_on=True) log.debug("ASM standalone mode is enabled, traces will be rate limited at 1 trace per minute") @@ -600,12 +593,11 @@ def _agent_response_callback(self, resp: AgentResponse) -> None: The agent can return updated sample rates for the priority sampler. 
""" try: - if isinstance(self._sampler, BasePrioritySampler): - self._sampler.update_rate_by_service_sample_rates( - resp.rate_by_service, - ) - except ValueError: - log.error("sample_rate is negative, cannot update the rate samplers") + self._sampler.update_rate_by_service_sample_rates( + resp.rate_by_service, + ) + except ValueError as e: + log.error("Failed to set agent service sample rates: %s", str(e)) def _generate_diagnostic_logs(self): if config._debug_mode or config._startup_logs_enabled: diff --git a/ddtrace/internal/constants.py b/ddtrace/internal/constants.py index c4255035c41..fb1f9b57180 100644 --- a/ddtrace/internal/constants.py +++ b/ddtrace/internal/constants.py @@ -82,24 +82,30 @@ DD_TRACE_BAGGAGE_MAX_BYTES = 8192 -class _PRIORITY_CATEGORY: - USER = "user" - RULE_DEF = "rule_default" - RULE_CUSTOMER = "rule_customer" - RULE_DYNAMIC = "rule_dynamic" - AUTO = "auto" - DEFAULT = "default" +class SamplingMechanism(object): + DEFAULT = 0 + AGENT_RATE_BY_SERVICE = 1 + REMOTE_RATE = 2 # not used, this mechanism is deprecated + LOCAL_USER_TRACE_SAMPLING_RULE = 3 + MANUAL = 4 + APPSEC = 5 + REMOTE_RATE_USER = 6 # not used, this mechanism is deprecated + REMOTE_RATE_DATADOG = 7 # not used, this mechanism is deprecated + SPAN_SAMPLING_RULE = 8 + OTLP_INGEST_PROBABILISTIC_SAMPLING = 9 # not used in ddtrace + DATA_JOBS_MONITORING = 10 # not used in ddtrace + REMOTE_USER_TRACE_SAMPLING_RULE = 11 + REMOTE_DYNAMIC_TRACE_SAMPLING_RULE = 12 -# intermediate mapping of priority categories to actual priority values -# used to simplify code that selects sampling priority based on many factors -_CATEGORY_TO_PRIORITIES = { - _PRIORITY_CATEGORY.USER: (USER_KEEP, USER_REJECT), - _PRIORITY_CATEGORY.RULE_DEF: (USER_KEEP, USER_REJECT), - _PRIORITY_CATEGORY.RULE_CUSTOMER: (USER_KEEP, USER_REJECT), - _PRIORITY_CATEGORY.RULE_DYNAMIC: (USER_KEEP, USER_REJECT), - _PRIORITY_CATEGORY.AUTO: (AUTO_KEEP, AUTO_REJECT), - _PRIORITY_CATEGORY.DEFAULT: (AUTO_KEEP, AUTO_REJECT), 
+SAMPLING_MECHANISM_TO_PRIORITIES = { + # TODO(munir): Update mapping to include single span sampling and appsec sampling mechaisms + SamplingMechanism.AGENT_RATE_BY_SERVICE: (AUTO_KEEP, AUTO_REJECT), + SamplingMechanism.DEFAULT: (AUTO_KEEP, AUTO_REJECT), + SamplingMechanism.MANUAL: (USER_KEEP, USER_REJECT), + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE: (USER_KEEP, USER_REJECT), + SamplingMechanism.REMOTE_USER_TRACE_SAMPLING_RULE: (USER_KEEP, USER_REJECT), + SamplingMechanism.REMOTE_DYNAMIC_TRACE_SAMPLING_RULE: (USER_KEEP, USER_REJECT), } _KEEP_PRIORITY_INDEX = 0 _REJECT_PRIORITY_INDEX = 1 diff --git a/ddtrace/internal/debug.py b/ddtrace/internal/debug.py index 2a285423e4c..c0bd3c1ecfc 100644 --- a/ddtrace/internal/debug.py +++ b/ddtrace/internal/debug.py @@ -10,7 +10,6 @@ from typing import Union # noqa:F401 import ddtrace -from ddtrace._trace.sampler import DatadogSampler from ddtrace.internal import agent from ddtrace.internal.packages import get_distributions from ddtrace.internal.utils.cache import callonce @@ -72,9 +71,7 @@ def collect(tracer): agent_url = "CUSTOM" agent_error = None - sampler_rules = None - if isinstance(tracer._sampler, DatadogSampler): - sampler_rules = [str(rule) for rule in tracer._sampler.rules] + sampler_rules = [str(rule) for rule in tracer._sampler.rules] is_venv = in_venv() diff --git a/ddtrace/internal/sampling.py b/ddtrace/internal/sampling.py index 40f82a388c2..e51cd8c9879 100644 --- a/ddtrace/internal/sampling.py +++ b/ddtrace/internal/sampling.py @@ -22,10 +22,11 @@ from ddtrace.constants import _SINGLE_SPAN_SAMPLING_MAX_PER_SEC_NO_LIMIT from ddtrace.constants import _SINGLE_SPAN_SAMPLING_MECHANISM from ddtrace.constants import _SINGLE_SPAN_SAMPLING_RATE -from ddtrace.internal.constants import _CATEGORY_TO_PRIORITIES from ddtrace.internal.constants import _KEEP_PRIORITY_INDEX from ddtrace.internal.constants import _REJECT_PRIORITY_INDEX from ddtrace.internal.constants import SAMPLING_DECISION_TRACE_TAG_KEY +from 
ddtrace.internal.constants import SAMPLING_MECHANISM_TO_PRIORITIES +from ddtrace.internal.constants import SamplingMechanism from ddtrace.internal.glob_matching import GlobMatcher from ddtrace.internal.logger import get_logger @@ -49,20 +50,6 @@ MAX_SPAN_ID = 2**64 -class SamplingMechanism(object): - DEFAULT = 0 - AGENT_RATE = 1 - REMOTE_RATE = 2 - TRACE_SAMPLING_RULE = 3 - MANUAL = 4 - APPSEC = 5 - REMOTE_RATE_USER = 6 - REMOTE_RATE_DATADOG = 7 - SPAN_SAMPLING_RULE = 8 - REMOTE_USER_RULE = 11 - REMOTE_DYNAMIC_RULE = 12 - - class PriorityCategory(object): DEFAULT = "default" AUTO = "auto" @@ -278,30 +265,23 @@ def is_single_span_sampled(span): return span.get_metric(_SINGLE_SPAN_SAMPLING_MECHANISM) == SamplingMechanism.SPAN_SAMPLING_RULE -def _set_sampling_tags(span, sampled, sample_rate, priority_category): - # type: (Span, bool, float, str) -> None - mechanism = SamplingMechanism.TRACE_SAMPLING_RULE - if priority_category == PriorityCategory.RULE_DEFAULT: - span.set_metric(_SAMPLING_RULE_DECISION, sample_rate) - if priority_category == PriorityCategory.RULE_CUSTOMER: - span.set_metric(_SAMPLING_RULE_DECISION, sample_rate) - mechanism = SamplingMechanism.REMOTE_USER_RULE - if priority_category == PriorityCategory.RULE_DYNAMIC: +def _set_sampling_tags(span, sampled, sample_rate, mechanism): + # type: (Span, bool, float, int) -> None + # Set the sampling mechanism + set_sampling_decision_maker(span.context, mechanism) + # Set the sampling psr rate + if mechanism in ( + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, + SamplingMechanism.REMOTE_USER_TRACE_SAMPLING_RULE, + SamplingMechanism.REMOTE_DYNAMIC_TRACE_SAMPLING_RULE, + ): span.set_metric(_SAMPLING_RULE_DECISION, sample_rate) - mechanism = SamplingMechanism.REMOTE_DYNAMIC_RULE - elif priority_category == PriorityCategory.DEFAULT: - mechanism = SamplingMechanism.DEFAULT - elif priority_category == PriorityCategory.AUTO: - mechanism = SamplingMechanism.AGENT_RATE + elif mechanism == 
SamplingMechanism.AGENT_RATE_BY_SERVICE: span.set_metric(_SAMPLING_AGENT_DECISION, sample_rate) - priorities = _CATEGORY_TO_PRIORITIES[priority_category] - _set_priority(span, priorities[_KEEP_PRIORITY_INDEX] if sampled else priorities[_REJECT_PRIORITY_INDEX]) - set_sampling_decision_maker(span.context, mechanism) - - -def _set_priority(span, priority): - # type: (Span, int) -> None - span.context.sampling_priority = priority + # Set the sampling priority + priorities = SAMPLING_MECHANISM_TO_PRIORITIES[mechanism] + priority_index = _KEEP_PRIORITY_INDEX if sampled else _REJECT_PRIORITY_INDEX + span.context.sampling_priority = priorities[priority_index] def _get_highest_precedence_rule_matching(span, rules): diff --git a/ddtrace/propagation/http.py b/ddtrace/propagation/http.py index a6efdb70f3e..c74e7f30f07 100644 --- a/ddtrace/propagation/http.py +++ b/ddtrace/propagation/http.py @@ -347,7 +347,7 @@ def _extract(headers): meta = {} if not meta.get(SAMPLING_DECISION_TRACE_TAG_KEY): - meta[SAMPLING_DECISION_TRACE_TAG_KEY] = f"-{SamplingMechanism.TRACE_SAMPLING_RULE}" + meta[SAMPLING_DECISION_TRACE_TAG_KEY] = f"-{SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE}" # Try to parse values into their expected types try: diff --git a/tests/integration/test_debug.py b/tests/integration/test_debug.py index b2f973b1a48..51386c1433d 100644 --- a/tests/integration/test_debug.py +++ b/tests/integration/test_debug.py @@ -317,18 +317,6 @@ def flush_queue(self) -> None: assert info.get("agent_url") == "CUSTOM" -@pytest.mark.subprocess() -def test_different_samplers(): - import ddtrace - from ddtrace.internal import debug - from ddtrace.trace import tracer - - tracer._configure(sampler=ddtrace._trace.sampler.RateSampler()) - info = debug.collect(tracer) - - assert info.get("sampler_type") == "RateSampler" - - @pytest.mark.subprocess() def test_startup_logs_sampling_rules(): import ddtrace diff --git a/tests/integration/test_sampling.py b/tests/integration/test_sampling.py index 
053064c8cd8..14f4f35d0b7 100644 --- a/tests/integration/test_sampling.py +++ b/tests/integration/test_sampling.py @@ -1,7 +1,6 @@ import pytest from ddtrace._trace.sampler import DatadogSampler -from ddtrace._trace.sampler import RateSampler from ddtrace._trace.sampler import SamplingRule from ddtrace.constants import MANUAL_DROP_KEY from ddtrace.constants import MANUAL_KEEP_KEY @@ -101,14 +100,6 @@ def test_sampling_with_default_sample_rate_1_and_manual_keep(writer, tracer): span.set_tag(MANUAL_KEEP_KEY) -@snapshot_parametrized_with_writers -def test_sampling_with_rate_sampler_with_tiny_rate(writer, tracer): - sampler = RateSampler(0.0000000001) - tracer._configure(sampler=sampler, writer=writer) - with tracer.trace("trace8"): - tracer.trace("child").finish() - - @snapshot_parametrized_with_writers def test_sampling_with_sample_rate_1_and_rate_limit_0(writer, tracer): sampler = DatadogSampler(default_sample_rate=1, rate_limit=0) diff --git a/tests/tracer/test_sampler.py b/tests/tracer/test_sampler.py index f8391c5065c..1772c32d56c 100644 --- a/tests/tracer/test_sampler.py +++ b/tests/tracer/test_sampler.py @@ -6,7 +6,6 @@ import pytest from ddtrace._trace.sampler import DatadogSampler -from ddtrace._trace.sampler import RateByServiceSampler from ddtrace._trace.sampler import RateSampler from ddtrace._trace.sampling_rule import SamplingRule from ddtrace.constants import _SAMPLING_AGENT_DECISION @@ -17,6 +16,7 @@ from ddtrace.constants import AUTO_REJECT from ddtrace.constants import USER_KEEP from ddtrace.constants import USER_REJECT +from ddtrace.internal.constants import DEFAULT_SAMPLING_RATE_LIMIT from ddtrace.internal.rate_limiter import RateLimiter from ddtrace.internal.sampling import SAMPLING_DECISION_TRACE_TAG_KEY from ddtrace.internal.sampling import SamplingMechanism @@ -24,7 +24,6 @@ from ddtrace.trace import Context from ddtrace.trace import Span -from ..subprocesstest import run_in_subprocess from ..utils import DummyTracer from ..utils import 
override_global_config @@ -88,36 +87,6 @@ def test_set_sample_rate(self): sampler.set_sample_rate(str(rate)) assert sampler.sample_rate == float(rate), "The rate can be set as a string" - def test_sample_rate_deviation(self): - for sample_rate in [0.1, 0.25, 0.5, 1]: - tracer = DummyTracer() - - # Since RateSampler does not set the sampling priority on a span, we will use a DatadogSampler - # with rate limiting disabled. - tracer._sampler = DatadogSampler(default_sample_rate=sample_rate, rate_limit=-1) - - iterations = int(1e4 / sample_rate) - - for i in range(iterations): - span = tracer.trace(str(i)) - span.finish() - - samples = tracer.pop() - # non sampled spans do not have sample rate applied - sampled_spans = [s for s in samples if s.context.sampling_priority > 0] - if sample_rate != 1: - assert len(sampled_spans) != len(samples) - else: - assert len(sampled_spans) == len(samples) - - deviation = abs(len(sampled_spans) - (iterations * sample_rate)) / (iterations * sample_rate) - assert ( - deviation < 0.05 - ), "Actual sample rate should be within 5 percent of set sample " "rate (actual: %f, set: %f)" % ( - deviation, - sample_rate, - ) - def test_deterministic_behavior(self): """Test that for a given trace ID, the result is always the same""" tracer = DummyTracer() @@ -155,71 +124,27 @@ def test_sample_rate_0_does_not_reset_to_1(self): # RateByServiceSamplerTest Cases def test_default_key(): - assert ( - "service:,env:" == RateByServiceSampler._default_key - ), "default key should correspond to no service and no env" + assert "service:,env:" == DatadogSampler._default_key, "default key should correspond to no service and no env" def test_key(): - assert ( - RateByServiceSampler._default_key == RateByServiceSampler._key() - ), "_key() with no arguments returns the default key" - assert "service:mcnulty,env:" == RateByServiceSampler._key( - service="mcnulty" + assert DatadogSampler._default_key == DatadogSampler._key( + None, None + ), "_key() with None 
arguments returns the default key" + assert "service:mcnulty,env:" == DatadogSampler._key( + service="mcnulty", env=None ), "_key call with service name returns expected result" - assert "service:,env:test" == RateByServiceSampler._key( - env="test" + assert "service:,env:test" == DatadogSampler._key( + None, env="test" ), "_key call with env name returns expected result" - assert "service:mcnulty,env:test" == RateByServiceSampler._key( + assert "service:mcnulty,env:test" == DatadogSampler._key( service="mcnulty", env="test" ), "_key call with service and env name returns expected result" - assert "service:mcnulty,env:test" == RateByServiceSampler._key( + assert "service:mcnulty,env:test" == DatadogSampler._key( "mcnulty", "test" ), "_key call with service and env name as positional args returns expected result" -@run_in_subprocess(env=dict(DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED="true")) -def test_sample_rate_deviation_128bit_trace_id(): - _test_sample_rate_deviation() - - -@run_in_subprocess(env=dict(DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED="false", DD_SERVICE="my-svc")) -def test_sample_rate_deviation_64bit_trace_id(): - _test_sample_rate_deviation() - - -def _test_sample_rate_deviation(): - for sample_rate in [0.1, 0.25, 0.5, 1]: - tracer = DummyTracer() - tracer._configure(sampler=RateByServiceSampler()) - tracer._sampler.set_sample_rate(sample_rate) - - iterations = int(1e4 / sample_rate) - - for i in range(iterations): - span = tracer.trace(str(i)) - span.finish() - - samples = tracer.pop() - samples_with_high_priority = 0 - for sample in samples: - sample_priority = sample.context.sampling_priority - samples_with_high_priority += int(bool(sample_priority > 0)) - assert_sampling_decision_tags( - sample, - agent=sample_rate, - trace_tag="-{}".format(SamplingMechanism.AGENT_RATE), - ) - - deviation = abs(samples_with_high_priority - (iterations * sample_rate)) / (iterations * sample_rate) - assert ( - deviation < 0.05 - ), "Actual sample rate should be 
within 5 percent of set sample " "rate (actual: %f, set: %f)" % ( - deviation, - sample_rate, - ) - - @pytest.mark.parametrize( "sample_rate,expectation", [ @@ -605,14 +530,14 @@ def test_datadog_sampler_init(): sampler.limiter, RateLimiter ), "DatadogSampler initialized with no arguments should hold a RateLimiter" assert ( - sampler.limiter.rate_limit == DatadogSampler.DEFAULT_RATE_LIMIT + sampler.limiter.rate_limit == DEFAULT_SAMPLING_RATE_LIMIT ), "DatadogSampler initialized with no arguments should hold a RateLimiter with the default limit" rule = SamplingRule(sample_rate=1) sampler = DatadogSampler(rules=[rule]) assert sampler.rules == [rule], "DatadogSampler initialized with a rule should hold that rule" assert ( - sampler.limiter.rate_limit == DatadogSampler.DEFAULT_RATE_LIMIT + sampler.limiter.rate_limit == DEFAULT_SAMPLING_RATE_LIMIT ), "DatadogSampler initialized with a rule should hold the default rate limit" sampler = DatadogSampler(rate_limit=10) @@ -620,7 +545,7 @@ def test_datadog_sampler_init(): sampler = DatadogSampler(default_sample_rate=0.5) assert ( - sampler.limiter.rate_limit == DatadogSampler.DEFAULT_RATE_LIMIT + sampler.limiter.rate_limit == DEFAULT_SAMPLING_RATE_LIMIT ), "DatadogSampler initialized with default_sample_rate should hold the default rate limit" assert sampler.rules == [ SamplingRule(sample_rate=0.5) @@ -721,7 +646,7 @@ def sample(self, span): ], ), USER_KEEP, - SamplingMechanism.TRACE_SAMPLING_RULE, + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 1.0, None, ), @@ -735,7 +660,7 @@ def sample(self, span): ], ), USER_KEEP, - SamplingMechanism.TRACE_SAMPLING_RULE, + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 0.5, None, ), @@ -749,7 +674,7 @@ def sample(self, span): ], ), USER_KEEP, - SamplingMechanism.TRACE_SAMPLING_RULE, + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 0.5, None, ), @@ -763,7 +688,7 @@ def sample(self, span): ], ), USER_REJECT, - SamplingMechanism.TRACE_SAMPLING_RULE, + 
SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 0.5, None, ), @@ -772,7 +697,7 @@ def sample(self, span): default_sample_rate=0, ), USER_REJECT, - SamplingMechanism.TRACE_SAMPLING_RULE, + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 0, None, ), @@ -782,7 +707,7 @@ def sample(self, span): rate_limit=0, ), USER_REJECT, - SamplingMechanism.TRACE_SAMPLING_RULE, + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 1.0, 0.0, ), @@ -793,7 +718,7 @@ def sample(self, span): ], ), USER_KEEP, - SamplingMechanism.TRACE_SAMPLING_RULE, + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 1, None, ), @@ -805,7 +730,7 @@ def sample(self, span): rate_limit=0, ), USER_REJECT, - SamplingMechanism.TRACE_SAMPLING_RULE, + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 1, None, ), @@ -814,7 +739,7 @@ def sample(self, span): rules=[SamplingRule(sample_rate=0, name="span")], ), USER_REJECT, - SamplingMechanism.TRACE_SAMPLING_RULE, + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 0, None, ), @@ -850,14 +775,14 @@ def test_datadog_sampler_tracer_child(dummy_tracer): rule=1.0, limit=None, sampling_priority=USER_KEEP, - trace_tag="-{}".format(SamplingMechanism.TRACE_SAMPLING_RULE), + trace_tag="-{}".format(SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE), ) assert_sampling_decision_tags( spans[1], agent=None, rule=None, limit=None, - trace_tag="-{}".format(SamplingMechanism.TRACE_SAMPLING_RULE), + trace_tag="-{}".format(SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE), ) @@ -873,11 +798,11 @@ def test_datadog_sampler_tracer_start_span(dummy_tracer): rule=1.0, limit=None, sampling_priority=USER_KEEP, - trace_tag="-{}".format(SamplingMechanism.TRACE_SAMPLING_RULE), + trace_tag="-{}".format(SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE), ) -@pytest.mark.parametrize("priority_sampler", [DatadogSampler(), RateByServiceSampler()]) +@pytest.mark.parametrize("priority_sampler", [DatadogSampler()]) def test_update_rate_by_service_sample_rates(priority_sampler): cases = [ { @@ -921,8 
+846,8 @@ def context(): @pytest.mark.parametrize( "sampling_mechanism,expected", [ - (SamplingMechanism.AGENT_RATE, "-1"), - (SamplingMechanism.TRACE_SAMPLING_RULE, "-3"), + (SamplingMechanism.AGENT_RATE_BY_SERVICE, "-1"), + (SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, "-3"), (SamplingMechanism.DEFAULT, "-0"), (SamplingMechanism.MANUAL, "-4"), (SamplingMechanism.DEFAULT, "-0"),