diff --git a/ddtrace/_trace/processor/__init__.py b/ddtrace/_trace/processor/__init__.py index 0437b65b364..b6f056f4166 100644 --- a/ddtrace/_trace/processor/__init__.py +++ b/ddtrace/_trace/processor/__init__.py @@ -9,7 +9,7 @@ from typing import Union from ddtrace import config -from ddtrace._trace.sampler import BaseSampler +from ddtrace._trace.sampler import DatadogSampler from ddtrace._trace.span import Span from ddtrace._trace.span import _get_64_highest_order_bits_as_hex from ddtrace._trace.span import _is_top_level @@ -120,7 +120,7 @@ class TraceSamplingProcessor(TraceProcessor): def __init__( self, compute_stats_enabled: bool, - sampler: BaseSampler, + sampler: DatadogSampler, single_span_rules: List[SpanSamplingRule], apm_opt_out: bool, ): diff --git a/ddtrace/_trace/sampler.py b/ddtrace/_trace/sampler.py index 62ae06d1c7b..4a131e0e1e5 100644 --- a/ddtrace/_trace/sampler.py +++ b/ddtrace/_trace/sampler.py @@ -2,22 +2,19 @@ Any `sampled = False` trace won't be written, and can be ignored by the instrumentation. 
""" - -import abc import json -from typing import TYPE_CHECKING # noqa:F401 -from typing import Dict # noqa:F401 -from typing import List # noqa:F401 -from typing import Optional # noqa:F401 -from typing import Tuple # noqa:F401 +from json.decoder import JSONDecodeError +from typing import Dict +from typing import List +from typing import Optional from ddtrace import config +from ddtrace._trace.span import Span from ddtrace.constants import _SAMPLING_LIMIT_DECISION from ..constants import ENV_KEY -from ..internal.constants import _PRIORITY_CATEGORY -from ..internal.constants import DEFAULT_SAMPLING_RATE_LIMIT -from ..internal.constants import MAX_UINT_64BITS as _MAX_UINT_64BITS +from ..internal.constants import MAX_UINT_64BITS +from ..internal.constants import SamplingMechanism from ..internal.logger import get_logger from ..internal.rate_limiter import RateLimiter from ..internal.sampling import _get_highest_precedence_rule_matching @@ -27,56 +24,14 @@ PROVENANCE_ORDER = ["customer", "dynamic", "default"] -try: - from json.decoder import JSONDecodeError -except ImportError: - # handling python 2.X import error - JSONDecodeError = ValueError # type: ignore - -if TYPE_CHECKING: # pragma: no cover - from ddtrace._trace.span import Span # noqa:F401 - log = get_logger(__name__) -# All references to MAX_TRACE_ID were replaced with _MAX_UINT_64BITS. -# Now that ddtrace supports generating 128bit trace_ids, -# the max trace id should be 2**128 - 1 (not 2**64 -1) -# MAX_TRACE_ID is no longer used and should be removed. 
-MAX_TRACE_ID = _MAX_UINT_64BITS # Has to be the same factor and key as the Agent to allow chained sampling KNUTH_FACTOR = 1111111111111111111 -class SamplingError(Exception): - pass - - -class BaseSampler(metaclass=abc.ABCMeta): - __slots__ = () - - @abc.abstractmethod - def sample(self, span): - # type: (Span) -> bool - pass - - -class BasePrioritySampler(BaseSampler): - __slots__ = () - - @abc.abstractmethod - def update_rate_by_service_sample_rates(self, sample_rates): - pass - - -class AllSampler(BaseSampler): - """Sampler sampling all the traces""" - - def sample(self, span): - return True - - -class RateSampler(BaseSampler): +class RateSampler: """Sampler based on a rate Keep (100 * `sample_rate`)% of the traces. @@ -91,165 +46,64 @@ def __init__(self, sample_rate: float = 1.0) -> None: def set_sample_rate(self, sample_rate: float) -> None: self.sample_rate = float(sample_rate) - self.sampling_id_threshold = self.sample_rate * _MAX_UINT_64BITS - - def sample(self, span): - sampled = ((span._trace_id_64bits * KNUTH_FACTOR) % _MAX_UINT_64BITS) <= self.sampling_id_threshold - return sampled - - -class _AgentRateSampler(RateSampler): - pass - - -class RateByServiceSampler(BasePrioritySampler): - """Sampler based on a rate, by service - - Keep (100 * `sample_rate`)% of the traces. - The sample rate is kept independently for each service/env tuple. - """ - - __slots__ = ("sample_rate", "_by_service_samplers", "_default_sampler") - - _default_key = "service:,env:" - - @staticmethod - def _key( - service=None, # type: Optional[str] - env=None, # type: Optional[str] - ): - # type: (...) 
-> str - """Compute a key with the same format used by the Datadog agent API.""" - service = service or "" - env = env or "" - return "service:" + service + ",env:" + env - - def __init__(self, sample_rate=1.0): - # type: (float) -> None - self.sample_rate = sample_rate - self._default_sampler = RateSampler(self.sample_rate) - self._by_service_samplers = {} # type: Dict[str, RateSampler] - - def set_sample_rate( - self, - sample_rate, # type: float - service=None, # type: Optional[str] - env=None, # type: Optional[str] - ): - # type: (...) -> None - - # if we have a blank service, we need to match it to the config.service - if service is None: - service = config.service - if env is None: - env = config.env - - self._by_service_samplers[self._key(service, env)] = _AgentRateSampler(sample_rate) + self.sampling_id_threshold = self.sample_rate * MAX_UINT_64BITS - def sample(self, span): - sampled, sampler = self._make_sampling_decision(span) - _set_sampling_tags( - span, - sampled, - sampler.sample_rate, - self._choose_priority_category(sampler), - ) + def sample(self, span: Span) -> bool: + sampled = ((span._trace_id_64bits * KNUTH_FACTOR) % MAX_UINT_64BITS) <= self.sampling_id_threshold return sampled - def _choose_priority_category(self, sampler): - # type: (BaseSampler) -> str - if sampler is self._default_sampler: - return _PRIORITY_CATEGORY.DEFAULT - elif isinstance(sampler, _AgentRateSampler): - return _PRIORITY_CATEGORY.AUTO - else: - return _PRIORITY_CATEGORY.RULE_DEF - - def _make_sampling_decision(self, span): - # type: (Span) -> Tuple[bool, BaseSampler] - env = span.get_tag(ENV_KEY) - key = self._key(span.service, env) - sampler = self._by_service_samplers.get(key) or self._default_sampler - sampled = sampler.sample(span) - return sampled, sampler - - def update_rate_by_service_sample_rates(self, rate_by_service): - # type: (Dict[str, float]) -> None - samplers = {} # type: Dict[str, RateSampler] - for key, sample_rate in rate_by_service.items(): - 
samplers[key] = _AgentRateSampler(sample_rate) - - self._by_service_samplers = samplers - -class DatadogSampler(RateByServiceSampler): +class DatadogSampler: """ - By default, this sampler relies on dynamic sample rates provided by the trace agent - to determine which traces are kept or dropped. + The DatadogSampler samples traces based on the following (in order of precedence): + - A list of sampling rules, applied in the order they are provided. The first matching rule is used. + - A default sample rate, stored as the final sampling rule (lowest precedence sampling rule). + - A global rate limit, applied only if a rule is matched or if `rate_limit_always_on` is set to `True`. + - Sample rates provided by the agent (priority sampling, maps sample rates to service and env tags). + - By default, spans are sampled at a rate of 1.0 and assigned an `AUTO_KEEP` priority, allowing + the agent to determine the final sample rate and sampling decision. - You can also configure a static sample rate via ``default_sample_rate`` to use for sampling. - When a ``default_sample_rate`` is configured, that is the only sample rate used, and the agent - provided rates are ignored. - - You may also supply a list of ``SamplingRule`` instances to set sample rates for specific - services. - - Example rules:: + Example sampling rules:: DatadogSampler(rules=[ SamplingRule(sample_rate=1.0, service="my-svc"), SamplingRule(sample_rate=0.0, service="less-important"), + SamplingRule(sample_rate=0.5), # sample all remaining services at 50% ]) - - Rules are evaluated in the order they are provided, and the first rule that matches is used. - If no rule matches, then the agent sample rates are used. - - This sampler can be configured with a rate limit. This will ensure the max number of - sampled traces per second does not exceed the supplied limit. The default is 100 traces kept - per second. 
""" - __slots__ = ("limiter", "rules", "default_sample_rate", "_rate_limit_always_on") - - NO_RATE_LIMIT = -1 - # deprecate and remove the DEFAULT_RATE_LIMIT field from DatadogSampler - DEFAULT_RATE_LIMIT = DEFAULT_SAMPLING_RATE_LIMIT + __slots__ = ( + "limiter", + "rules", + "default_sample_rate", + "_rate_limit_always_on", + "_by_service_samplers", + ) + _default_key = "service:,env:" def __init__( self, - rules=None, # type: Optional[List[SamplingRule]] - default_sample_rate=None, # type: Optional[float] - rate_limit=None, # type: Optional[int] - rate_limit_window=1e9, # type: float - rate_limit_always_on=False, # type: bool + rules: Optional[List[SamplingRule]] = None, + default_sample_rate: Optional[float] = None, + rate_limit: Optional[int] = None, + rate_limit_window: float = 1e9, + rate_limit_always_on: bool = False, ): - # type: (...) -> None """ Constructor for DatadogSampler sampler :param rules: List of :class:`SamplingRule` rules to apply to the root span of every trace, default no rules - :param default_sample_rate: The default sample rate to apply if no rules matched (default: ``None`` / - Use :class:`RateByServiceSampler` only) + :param default_sample_rate: The default sample rate to apply if no rules matched :param rate_limit: Global rate limit (traces per second) to apply to all traces regardless of the rules applied to them, (default: ``100``) """ - # Use default sample rate of 1.0 - super(DatadogSampler, self).__init__() - - if rate_limit is None: - rate_limit = int(config._trace_rate_limit) - - self._rate_limit_always_on = rate_limit_always_on - + # Set sampling rules + self.rules: List[SamplingRule] = [] if rules is None: env_sampling_rules = config._trace_sampling_rules if env_sampling_rules: - rules = self._parse_rules_from_str(env_sampling_rules) - else: - rules = [] - self.rules = rules + self.rules = self._parse_rules_from_str(env_sampling_rules) else: - self.rules = [] # Validate that rules is a list of SampleRules for rule in rules: if 
isinstance(rule, SamplingRule): @@ -258,28 +112,46 @@ def __init__( raise TypeError( "Rule {!r} must be a sub-class of type ddtrace._trace.sampler.SamplingRules".format(rule) ) - - # DEV: sampling rule must come last if default_sample_rate is not None: + # DEV: sampling rule must come last self.rules.append(SamplingRule(sample_rate=default_sample_rate)) - - # Configure rate limiter - self.limiter = RateLimiter(rate_limit, rate_limit_window) + # Set Agent based samplers + self._by_service_samplers: Dict[str, RateSampler] = {} + # Set rate limiter + self._rate_limit_always_on: bool = rate_limit_always_on + if rate_limit is None: + rate_limit = int(config._trace_rate_limit) + self.limiter: RateLimiter = RateLimiter(rate_limit, rate_limit_window) log.debug("initialized %r", self) + @staticmethod + def _key(service: Optional[str], env: Optional[str]): + """Compute a key with the same format used by the Datadog agent API.""" + return f"service:{service or ''},env:{env or ''}" + + def update_rate_by_service_sample_rates(self, rate_by_service: Dict[str, float]) -> None: + samplers: Dict[str, RateSampler] = {} + for key, sample_rate in rate_by_service.items(): + samplers[key] = RateSampler(sample_rate) + self._by_service_samplers = samplers + def __str__(self): rates = {key: sampler.sample_rate for key, sampler in self._by_service_samplers.items()} - return "{}(agent_rates={!r}, limiter={!r}, rules={!r})".format( - self.__class__.__name__, rates, self.limiter, self.rules + return "{}(agent_rates={!r}, limiter={!r}, rules={!r}), rate_limit_always_on={!r}".format( + self.__class__.__name__, + rates, + self.limiter, + self.rules, + self._rate_limit_always_on, ) __repr__ = __str__ @staticmethod - def _parse_rules_from_str(rules): - # type: (str) -> List[SamplingRule] + def _parse_rules_from_str(rules: str) -> List[SamplingRule]: sampling_rules = [] + json_rules = [] try: json_rules = json.loads(rules) except JSONDecodeError: @@ -290,47 +162,42 @@ def 
_parse_rules_from_str(rules): if config._raise: raise KeyError("No sample_rate provided for sampling rule: {}".format(json.dumps(rule))) continue - sample_rate = float(rule["sample_rate"]) - service = rule.get("service", SamplingRule.NO_RULE) - name = rule.get("name", SamplingRule.NO_RULE) - resource = rule.get("resource", SamplingRule.NO_RULE) - tags = rule.get("tags", SamplingRule.NO_RULE) - provenance = rule.get("provenance", "default") try: sampling_rule = SamplingRule( - sample_rate=sample_rate, - service=service, - name=name, - resource=resource, - tags=tags, - provenance=provenance, + sample_rate=float(rule["sample_rate"]), + service=rule.get("service", SamplingRule.NO_RULE), + name=rule.get("name", SamplingRule.NO_RULE), + resource=rule.get("resource", SamplingRule.NO_RULE), + tags=rule.get("tags", SamplingRule.NO_RULE), + provenance=rule.get("provenance", "default"), ) + sampling_rules.append(sampling_rule) except ValueError as e: if config._raise: raise ValueError("Error creating sampling rule {}: {}".format(json.dumps(rule), e)) - continue - sampling_rules.append(sampling_rule) # Sort the sampling_rules list using a lambda function as the key sampling_rules = sorted(sampling_rules, key=lambda rule: PROVENANCE_ORDER.index(rule.provenance)) return sampling_rules - def sample(self, span): + def sample(self, span: Span) -> bool: span.context._update_tags(span) - matched_rule = _get_highest_precedence_rule_matching(span, self.rules) - - sampler = self._default_sampler # type: BaseSampler - sample_rate = self.sample_rate + # Default sampling + agent_service_based = False + sampled = True + sample_rate = 1.0 if matched_rule: - # Client based sampling + # Rules based sampling (set via env_var or remote config) sampled = matched_rule.sample(span) sample_rate = matched_rule.sample_rate else: - # Agent based sampling - sampled, sampler = super(DatadogSampler, self)._make_sampling_decision(span) - if isinstance(sampler, RateSampler): - sample_rate = 
sampler.sample_rate + key = self._key(span.service, span.get_tag(ENV_KEY)) + if key in self._by_service_samplers: + # Agent service based sampling + agent_service_based = True + sampled = self._by_service_samplers[key].sample(span) + sample_rate = self._by_service_samplers[key].sample_rate if matched_rule or self._rate_limit_always_on: # Avoid rate limiting when trace sample rules and/or sample rates are NOT provided @@ -339,26 +206,28 @@ def sample(self, span): if sampled: sampled = self.limiter.is_allowed() span.set_metric(_SAMPLING_LIMIT_DECISION, self.limiter.effective_rate) + + sampling_mechanism = self._get_sampling_mechanism(matched_rule, agent_service_based) _set_sampling_tags( span, sampled, sample_rate, - self._choose_priority_category_with_rule(matched_rule, sampler), + sampling_mechanism, ) - return sampled - def _choose_priority_category_with_rule(self, rule, sampler): - # type: (Optional[SamplingRule], BaseSampler) -> str - if rule: - provenance = rule.provenance - if provenance == "customer": - return _PRIORITY_CATEGORY.RULE_CUSTOMER - if provenance == "dynamic": - return _PRIORITY_CATEGORY.RULE_DYNAMIC - return _PRIORITY_CATEGORY.RULE_DEF + def _get_sampling_mechanism(self, matched_rule: Optional[SamplingRule], agent_service_based: bool) -> int: + if matched_rule and matched_rule.provenance == "customer": + return SamplingMechanism.REMOTE_USER_TRACE_SAMPLING_RULE + elif matched_rule and matched_rule.provenance == "dynamic": + return SamplingMechanism.REMOTE_DYNAMIC_TRACE_SAMPLING_RULE + elif matched_rule: + return SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE elif self._rate_limit_always_on: # backwards compaitbiility for ASM, when the rate limit is always on (ASM standalone mode) # we want spans to be set to a MANUAL priority to avoid agent based sampling - return _PRIORITY_CATEGORY.USER - return super(DatadogSampler, self)._choose_priority_category(sampler) + return SamplingMechanism.MANUAL + elif agent_service_based: + return 
SamplingMechanism.AGENT_RATE_BY_SERVICE + else: + return SamplingMechanism.DEFAULT diff --git a/ddtrace/_trace/tracer.py b/ddtrace/_trace/tracer.py index 6a9500f1ea2..999ad3a40d2 100644 --- a/ddtrace/_trace/tracer.py +++ b/ddtrace/_trace/tracer.py @@ -25,8 +25,6 @@ from ddtrace._trace.processor import TraceTagsProcessor from ddtrace._trace.provider import BaseContextProvider from ddtrace._trace.provider import DefaultContextProvider -from ddtrace._trace.sampler import BasePrioritySampler -from ddtrace._trace.sampler import BaseSampler from ddtrace._trace.sampler import DatadogSampler from ddtrace._trace.span import Span from ddtrace.appsec._constants import APPSEC @@ -107,7 +105,7 @@ def _default_span_processors_factory( compute_stats_enabled: bool, single_span_sampling_rules: List[SpanSamplingRule], agent_url: str, - trace_sampler: BaseSampler, + trace_sampler: DatadogSampler, profiling_span_processor: EndpointCallCounterProcessor, ) -> Tuple[List[SpanProcessor], Optional[Any], List[SpanProcessor]]: # FIXME: type should be AppsecSpanProcessor but we have a cyclic import here @@ -230,7 +228,7 @@ def __init__( self.enabled = config._tracing_enabled self.context_provider = context_provider or DefaultContextProvider() # _user_sampler is the backup in case we need to revert from remote config to local - self._user_sampler: BaseSampler = DatadogSampler() + self._user_sampler = DatadogSampler() self._dogstatsd_url = agent.get_stats_url() if dogstatsd_url is None else dogstatsd_url if asm_config._apm_opt_out: self.enabled = False @@ -239,9 +237,9 @@ def __init__( # If ASM is enabled but tracing is disabled, # we need to set the rate limiting to 1 trace per minute # for the backend to consider the service as alive. 
- self._sampler: BaseSampler = DatadogSampler(rate_limit=1, rate_limit_window=60e9, rate_limit_always_on=True) + self._sampler = DatadogSampler(rate_limit=1, rate_limit_window=60e9, rate_limit_always_on=True) else: - self._sampler: BaseSampler = DatadogSampler() + self._sampler = DatadogSampler() self._compute_stats = config._trace_compute_stats self._agent_url: str = agent.get_trace_url() if url is None else url verify_url(self._agent_url) @@ -437,7 +435,7 @@ def _configure( port: Optional[int] = None, uds_path: Optional[str] = None, https: Optional[bool] = None, - sampler: Optional[BaseSampler] = None, + sampler: Optional[DatadogSampler] = None, context_provider: Optional[BaseContextProvider] = None, wrap_executor: Optional[Callable] = None, priority_sampling: Optional[bool] = None, @@ -478,16 +476,11 @@ def _configure( # Disable compute stats (neither agent or tracer should compute them) config._trace_compute_stats = False # Update the rate limiter to 1 trace per minute when tracing is disabled - if isinstance(sampler, DatadogSampler): - sampler._rate_limit_always_on = True # type: ignore[has-type] - sampler.limiter.rate_limit = 1 # type: ignore[has-type] - sampler.limiter.time_window = 60e9 # type: ignore[has-type] + if sampler is not None: + sampler._rate_limit_always_on = True + sampler.limiter.rate_limit = 1 + sampler.limiter.time_window = 60e9 else: - if sampler is not None: - log.warning( - "Overriding sampler: %s, a DatadogSampler must be used in ASM Standalone mode", - sampler.__class__, - ) sampler = DatadogSampler(rate_limit=1, rate_limit_window=60e9, rate_limit_always_on=True) log.debug("ASM standalone mode is enabled, traces will be rate limited at 1 trace per minute") @@ -600,12 +593,11 @@ def _agent_response_callback(self, resp: AgentResponse) -> None: The agent can return updated sample rates for the priority sampler. 
""" try: - if isinstance(self._sampler, BasePrioritySampler): - self._sampler.update_rate_by_service_sample_rates( - resp.rate_by_service, - ) - except ValueError: - log.error("sample_rate is negative, cannot update the rate samplers") + self._sampler.update_rate_by_service_sample_rates( + resp.rate_by_service, + ) + except ValueError as e: + log.error("Failed to set agent service sample rates: %s", str(e)) def _generate_diagnostic_logs(self): if config._debug_mode or config._startup_logs_enabled: diff --git a/ddtrace/internal/constants.py b/ddtrace/internal/constants.py index c4255035c41..fb1f9b57180 100644 --- a/ddtrace/internal/constants.py +++ b/ddtrace/internal/constants.py @@ -82,24 +82,30 @@ DD_TRACE_BAGGAGE_MAX_BYTES = 8192 -class _PRIORITY_CATEGORY: - USER = "user" - RULE_DEF = "rule_default" - RULE_CUSTOMER = "rule_customer" - RULE_DYNAMIC = "rule_dynamic" - AUTO = "auto" - DEFAULT = "default" +class SamplingMechanism(object): + DEFAULT = 0 + AGENT_RATE_BY_SERVICE = 1 + REMOTE_RATE = 2 # not used, this mechanism is deprecated + LOCAL_USER_TRACE_SAMPLING_RULE = 3 + MANUAL = 4 + APPSEC = 5 + REMOTE_RATE_USER = 6 # not used, this mechanism is deprecated + REMOTE_RATE_DATADOG = 7 # not used, this mechanism is deprecated + SPAN_SAMPLING_RULE = 8 + OTLP_INGEST_PROBABILISTIC_SAMPLING = 9 # not used in ddtrace + DATA_JOBS_MONITORING = 10 # not used in ddtrace + REMOTE_USER_TRACE_SAMPLING_RULE = 11 + REMOTE_DYNAMIC_TRACE_SAMPLING_RULE = 12 -# intermediate mapping of priority categories to actual priority values -# used to simplify code that selects sampling priority based on many factors -_CATEGORY_TO_PRIORITIES = { - _PRIORITY_CATEGORY.USER: (USER_KEEP, USER_REJECT), - _PRIORITY_CATEGORY.RULE_DEF: (USER_KEEP, USER_REJECT), - _PRIORITY_CATEGORY.RULE_CUSTOMER: (USER_KEEP, USER_REJECT), - _PRIORITY_CATEGORY.RULE_DYNAMIC: (USER_KEEP, USER_REJECT), - _PRIORITY_CATEGORY.AUTO: (AUTO_KEEP, AUTO_REJECT), - _PRIORITY_CATEGORY.DEFAULT: (AUTO_KEEP, AUTO_REJECT), 
+SAMPLING_MECHANISM_TO_PRIORITIES = { +    # TODO(munir): Update mapping to include single span sampling and appsec sampling mechanisms +    SamplingMechanism.AGENT_RATE_BY_SERVICE: (AUTO_KEEP, AUTO_REJECT), +    SamplingMechanism.DEFAULT: (AUTO_KEEP, AUTO_REJECT), +    SamplingMechanism.MANUAL: (USER_KEEP, USER_REJECT), +    SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE: (USER_KEEP, USER_REJECT), +    SamplingMechanism.REMOTE_USER_TRACE_SAMPLING_RULE: (USER_KEEP, USER_REJECT), +    SamplingMechanism.REMOTE_DYNAMIC_TRACE_SAMPLING_RULE: (USER_KEEP, USER_REJECT), } _KEEP_PRIORITY_INDEX = 0 _REJECT_PRIORITY_INDEX = 1 diff --git a/ddtrace/internal/debug.py b/ddtrace/internal/debug.py index 2a285423e4c..c0bd3c1ecfc 100644 --- a/ddtrace/internal/debug.py +++ b/ddtrace/internal/debug.py @@ -10,7 +10,6 @@ from typing import Union  # noqa:F401  import ddtrace -from ddtrace._trace.sampler import DatadogSampler from ddtrace.internal import agent from ddtrace.internal.packages import get_distributions from ddtrace.internal.utils.cache import callonce @@ -72,9 +71,7 @@ def collect(tracer): agent_url = "CUSTOM" agent_error = None -    sampler_rules = None -    if isinstance(tracer._sampler, DatadogSampler): -        sampler_rules = [str(rule) for rule in tracer._sampler.rules] +    sampler_rules = [str(rule) for rule in tracer._sampler.rules]  is_venv = in_venv() diff --git a/ddtrace/internal/sampling.py b/ddtrace/internal/sampling.py index 40f82a388c2..e51cd8c9879 100644 --- a/ddtrace/internal/sampling.py +++ b/ddtrace/internal/sampling.py @@ -22,10 +22,11 @@ from ddtrace.constants import _SINGLE_SPAN_SAMPLING_MAX_PER_SEC_NO_LIMIT from ddtrace.constants import _SINGLE_SPAN_SAMPLING_MECHANISM from ddtrace.constants import _SINGLE_SPAN_SAMPLING_RATE -from ddtrace.internal.constants import _CATEGORY_TO_PRIORITIES from ddtrace.internal.constants import _KEEP_PRIORITY_INDEX from ddtrace.internal.constants import _REJECT_PRIORITY_INDEX from ddtrace.internal.constants import SAMPLING_DECISION_TRACE_TAG_KEY +from
ddtrace.internal.constants import SAMPLING_MECHANISM_TO_PRIORITIES +from ddtrace.internal.constants import SamplingMechanism from ddtrace.internal.glob_matching import GlobMatcher from ddtrace.internal.logger import get_logger @@ -49,20 +50,6 @@ MAX_SPAN_ID = 2**64 -class SamplingMechanism(object): - DEFAULT = 0 - AGENT_RATE = 1 - REMOTE_RATE = 2 - TRACE_SAMPLING_RULE = 3 - MANUAL = 4 - APPSEC = 5 - REMOTE_RATE_USER = 6 - REMOTE_RATE_DATADOG = 7 - SPAN_SAMPLING_RULE = 8 - REMOTE_USER_RULE = 11 - REMOTE_DYNAMIC_RULE = 12 - - class PriorityCategory(object): DEFAULT = "default" AUTO = "auto" @@ -278,30 +265,23 @@ def is_single_span_sampled(span): return span.get_metric(_SINGLE_SPAN_SAMPLING_MECHANISM) == SamplingMechanism.SPAN_SAMPLING_RULE -def _set_sampling_tags(span, sampled, sample_rate, priority_category): - # type: (Span, bool, float, str) -> None - mechanism = SamplingMechanism.TRACE_SAMPLING_RULE - if priority_category == PriorityCategory.RULE_DEFAULT: - span.set_metric(_SAMPLING_RULE_DECISION, sample_rate) - if priority_category == PriorityCategory.RULE_CUSTOMER: - span.set_metric(_SAMPLING_RULE_DECISION, sample_rate) - mechanism = SamplingMechanism.REMOTE_USER_RULE - if priority_category == PriorityCategory.RULE_DYNAMIC: +def _set_sampling_tags(span, sampled, sample_rate, mechanism): + # type: (Span, bool, float, int) -> None + # Set the sampling mechanism + set_sampling_decision_maker(span.context, mechanism) + # Set the sampling psr rate + if mechanism in ( + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, + SamplingMechanism.REMOTE_USER_TRACE_SAMPLING_RULE, + SamplingMechanism.REMOTE_DYNAMIC_TRACE_SAMPLING_RULE, + ): span.set_metric(_SAMPLING_RULE_DECISION, sample_rate) - mechanism = SamplingMechanism.REMOTE_DYNAMIC_RULE - elif priority_category == PriorityCategory.DEFAULT: - mechanism = SamplingMechanism.DEFAULT - elif priority_category == PriorityCategory.AUTO: - mechanism = SamplingMechanism.AGENT_RATE + elif mechanism == 
SamplingMechanism.AGENT_RATE_BY_SERVICE: span.set_metric(_SAMPLING_AGENT_DECISION, sample_rate) - priorities = _CATEGORY_TO_PRIORITIES[priority_category] - _set_priority(span, priorities[_KEEP_PRIORITY_INDEX] if sampled else priorities[_REJECT_PRIORITY_INDEX]) - set_sampling_decision_maker(span.context, mechanism) - - -def _set_priority(span, priority): - # type: (Span, int) -> None - span.context.sampling_priority = priority + # Set the sampling priority + priorities = SAMPLING_MECHANISM_TO_PRIORITIES[mechanism] + priority_index = _KEEP_PRIORITY_INDEX if sampled else _REJECT_PRIORITY_INDEX + span.context.sampling_priority = priorities[priority_index] def _get_highest_precedence_rule_matching(span, rules): diff --git a/ddtrace/propagation/http.py b/ddtrace/propagation/http.py index a6efdb70f3e..c74e7f30f07 100644 --- a/ddtrace/propagation/http.py +++ b/ddtrace/propagation/http.py @@ -347,7 +347,7 @@ def _extract(headers): meta = {} if not meta.get(SAMPLING_DECISION_TRACE_TAG_KEY): - meta[SAMPLING_DECISION_TRACE_TAG_KEY] = f"-{SamplingMechanism.TRACE_SAMPLING_RULE}" + meta[SAMPLING_DECISION_TRACE_TAG_KEY] = f"-{SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE}" # Try to parse values into their expected types try: diff --git a/tests/integration/test_debug.py b/tests/integration/test_debug.py index b2f973b1a48..51386c1433d 100644 --- a/tests/integration/test_debug.py +++ b/tests/integration/test_debug.py @@ -317,18 +317,6 @@ def flush_queue(self) -> None: assert info.get("agent_url") == "CUSTOM" -@pytest.mark.subprocess() -def test_different_samplers(): - import ddtrace - from ddtrace.internal import debug - from ddtrace.trace import tracer - - tracer._configure(sampler=ddtrace._trace.sampler.RateSampler()) - info = debug.collect(tracer) - - assert info.get("sampler_type") == "RateSampler" - - @pytest.mark.subprocess() def test_startup_logs_sampling_rules(): import ddtrace diff --git a/tests/integration/test_sampling.py b/tests/integration/test_sampling.py index 
053064c8cd8..14f4f35d0b7 100644 --- a/tests/integration/test_sampling.py +++ b/tests/integration/test_sampling.py @@ -1,7 +1,6 @@ import pytest from ddtrace._trace.sampler import DatadogSampler -from ddtrace._trace.sampler import RateSampler from ddtrace._trace.sampler import SamplingRule from ddtrace.constants import MANUAL_DROP_KEY from ddtrace.constants import MANUAL_KEEP_KEY @@ -101,14 +100,6 @@ def test_sampling_with_default_sample_rate_1_and_manual_keep(writer, tracer): span.set_tag(MANUAL_KEEP_KEY) -@snapshot_parametrized_with_writers -def test_sampling_with_rate_sampler_with_tiny_rate(writer, tracer): - sampler = RateSampler(0.0000000001) - tracer._configure(sampler=sampler, writer=writer) - with tracer.trace("trace8"): - tracer.trace("child").finish() - - @snapshot_parametrized_with_writers def test_sampling_with_sample_rate_1_and_rate_limit_0(writer, tracer): sampler = DatadogSampler(default_sample_rate=1, rate_limit=0) diff --git a/tests/tracer/test_sampler.py b/tests/tracer/test_sampler.py index f8391c5065c..1772c32d56c 100644 --- a/tests/tracer/test_sampler.py +++ b/tests/tracer/test_sampler.py @@ -6,7 +6,6 @@ import pytest from ddtrace._trace.sampler import DatadogSampler -from ddtrace._trace.sampler import RateByServiceSampler from ddtrace._trace.sampler import RateSampler from ddtrace._trace.sampling_rule import SamplingRule from ddtrace.constants import _SAMPLING_AGENT_DECISION @@ -17,6 +16,7 @@ from ddtrace.constants import AUTO_REJECT from ddtrace.constants import USER_KEEP from ddtrace.constants import USER_REJECT +from ddtrace.internal.constants import DEFAULT_SAMPLING_RATE_LIMIT from ddtrace.internal.rate_limiter import RateLimiter from ddtrace.internal.sampling import SAMPLING_DECISION_TRACE_TAG_KEY from ddtrace.internal.sampling import SamplingMechanism @@ -24,7 +24,6 @@ from ddtrace.trace import Context from ddtrace.trace import Span -from ..subprocesstest import run_in_subprocess from ..utils import DummyTracer from ..utils import 
override_global_config @@ -88,36 +87,6 @@ def test_set_sample_rate(self): sampler.set_sample_rate(str(rate)) assert sampler.sample_rate == float(rate), "The rate can be set as a string" - def test_sample_rate_deviation(self): - for sample_rate in [0.1, 0.25, 0.5, 1]: - tracer = DummyTracer() - - # Since RateSampler does not set the sampling priority on a span, we will use a DatadogSampler - # with rate limiting disabled. - tracer._sampler = DatadogSampler(default_sample_rate=sample_rate, rate_limit=-1) - - iterations = int(1e4 / sample_rate) - - for i in range(iterations): - span = tracer.trace(str(i)) - span.finish() - - samples = tracer.pop() - # non sampled spans do not have sample rate applied - sampled_spans = [s for s in samples if s.context.sampling_priority > 0] - if sample_rate != 1: - assert len(sampled_spans) != len(samples) - else: - assert len(sampled_spans) == len(samples) - - deviation = abs(len(sampled_spans) - (iterations * sample_rate)) / (iterations * sample_rate) - assert ( - deviation < 0.05 - ), "Actual sample rate should be within 5 percent of set sample " "rate (actual: %f, set: %f)" % ( - deviation, - sample_rate, - ) - def test_deterministic_behavior(self): """Test that for a given trace ID, the result is always the same""" tracer = DummyTracer() @@ -155,71 +124,27 @@ def test_sample_rate_0_does_not_reset_to_1(self): # RateByServiceSamplerTest Cases def test_default_key(): - assert ( - "service:,env:" == RateByServiceSampler._default_key - ), "default key should correspond to no service and no env" + assert "service:,env:" == DatadogSampler._default_key, "default key should correspond to no service and no env" def test_key(): - assert ( - RateByServiceSampler._default_key == RateByServiceSampler._key() - ), "_key() with no arguments returns the default key" - assert "service:mcnulty,env:" == RateByServiceSampler._key( - service="mcnulty" + assert DatadogSampler._default_key == DatadogSampler._key( + None, None + ), "_key() with None 
arguments returns the default key" + assert "service:mcnulty,env:" == DatadogSampler._key( + service="mcnulty", env=None ), "_key call with service name returns expected result" - assert "service:,env:test" == RateByServiceSampler._key( - env="test" + assert "service:,env:test" == DatadogSampler._key( + None, env="test" ), "_key call with env name returns expected result" - assert "service:mcnulty,env:test" == RateByServiceSampler._key( + assert "service:mcnulty,env:test" == DatadogSampler._key( service="mcnulty", env="test" ), "_key call with service and env name returns expected result" - assert "service:mcnulty,env:test" == RateByServiceSampler._key( + assert "service:mcnulty,env:test" == DatadogSampler._key( "mcnulty", "test" ), "_key call with service and env name as positional args returns expected result" -@run_in_subprocess(env=dict(DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED="true")) -def test_sample_rate_deviation_128bit_trace_id(): - _test_sample_rate_deviation() - - -@run_in_subprocess(env=dict(DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED="false", DD_SERVICE="my-svc")) -def test_sample_rate_deviation_64bit_trace_id(): - _test_sample_rate_deviation() - - -def _test_sample_rate_deviation(): - for sample_rate in [0.1, 0.25, 0.5, 1]: - tracer = DummyTracer() - tracer._configure(sampler=RateByServiceSampler()) - tracer._sampler.set_sample_rate(sample_rate) - - iterations = int(1e4 / sample_rate) - - for i in range(iterations): - span = tracer.trace(str(i)) - span.finish() - - samples = tracer.pop() - samples_with_high_priority = 0 - for sample in samples: - sample_priority = sample.context.sampling_priority - samples_with_high_priority += int(bool(sample_priority > 0)) - assert_sampling_decision_tags( - sample, - agent=sample_rate, - trace_tag="-{}".format(SamplingMechanism.AGENT_RATE), - ) - - deviation = abs(samples_with_high_priority - (iterations * sample_rate)) / (iterations * sample_rate) - assert ( - deviation < 0.05 - ), "Actual sample rate should be 
within 5 percent of set sample " "rate (actual: %f, set: %f)" % ( - deviation, - sample_rate, - ) - - @pytest.mark.parametrize( "sample_rate,expectation", [ @@ -605,14 +530,14 @@ def test_datadog_sampler_init(): sampler.limiter, RateLimiter ), "DatadogSampler initialized with no arguments should hold a RateLimiter" assert ( - sampler.limiter.rate_limit == DatadogSampler.DEFAULT_RATE_LIMIT + sampler.limiter.rate_limit == DEFAULT_SAMPLING_RATE_LIMIT ), "DatadogSampler initialized with no arguments should hold a RateLimiter with the default limit" rule = SamplingRule(sample_rate=1) sampler = DatadogSampler(rules=[rule]) assert sampler.rules == [rule], "DatadogSampler initialized with a rule should hold that rule" assert ( - sampler.limiter.rate_limit == DatadogSampler.DEFAULT_RATE_LIMIT + sampler.limiter.rate_limit == DEFAULT_SAMPLING_RATE_LIMIT ), "DatadogSampler initialized with a rule should hold the default rate limit" sampler = DatadogSampler(rate_limit=10) @@ -620,7 +545,7 @@ def test_datadog_sampler_init(): sampler = DatadogSampler(default_sample_rate=0.5) assert ( - sampler.limiter.rate_limit == DatadogSampler.DEFAULT_RATE_LIMIT + sampler.limiter.rate_limit == DEFAULT_SAMPLING_RATE_LIMIT ), "DatadogSampler initialized with default_sample_rate should hold the default rate limit" assert sampler.rules == [ SamplingRule(sample_rate=0.5) @@ -721,7 +646,7 @@ def sample(self, span): ], ), USER_KEEP, - SamplingMechanism.TRACE_SAMPLING_RULE, + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 1.0, None, ), @@ -735,7 +660,7 @@ def sample(self, span): ], ), USER_KEEP, - SamplingMechanism.TRACE_SAMPLING_RULE, + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 0.5, None, ), @@ -749,7 +674,7 @@ def sample(self, span): ], ), USER_KEEP, - SamplingMechanism.TRACE_SAMPLING_RULE, + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 0.5, None, ), @@ -763,7 +688,7 @@ def sample(self, span): ], ), USER_REJECT, - SamplingMechanism.TRACE_SAMPLING_RULE, + 
SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 0.5, None, ), @@ -772,7 +697,7 @@ def sample(self, span): default_sample_rate=0, ), USER_REJECT, - SamplingMechanism.TRACE_SAMPLING_RULE, + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 0, None, ), @@ -782,7 +707,7 @@ def sample(self, span): rate_limit=0, ), USER_REJECT, - SamplingMechanism.TRACE_SAMPLING_RULE, + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 1.0, 0.0, ), @@ -793,7 +718,7 @@ def sample(self, span): ], ), USER_KEEP, - SamplingMechanism.TRACE_SAMPLING_RULE, + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 1, None, ), @@ -805,7 +730,7 @@ def sample(self, span): rate_limit=0, ), USER_REJECT, - SamplingMechanism.TRACE_SAMPLING_RULE, + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 1, None, ), @@ -814,7 +739,7 @@ def sample(self, span): rules=[SamplingRule(sample_rate=0, name="span")], ), USER_REJECT, - SamplingMechanism.TRACE_SAMPLING_RULE, + SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, 0, None, ), @@ -850,14 +775,14 @@ def test_datadog_sampler_tracer_child(dummy_tracer): rule=1.0, limit=None, sampling_priority=USER_KEEP, - trace_tag="-{}".format(SamplingMechanism.TRACE_SAMPLING_RULE), + trace_tag="-{}".format(SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE), ) assert_sampling_decision_tags( spans[1], agent=None, rule=None, limit=None, - trace_tag="-{}".format(SamplingMechanism.TRACE_SAMPLING_RULE), + trace_tag="-{}".format(SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE), ) @@ -873,11 +798,11 @@ def test_datadog_sampler_tracer_start_span(dummy_tracer): rule=1.0, limit=None, sampling_priority=USER_KEEP, - trace_tag="-{}".format(SamplingMechanism.TRACE_SAMPLING_RULE), + trace_tag="-{}".format(SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE), ) -@pytest.mark.parametrize("priority_sampler", [DatadogSampler(), RateByServiceSampler()]) +@pytest.mark.parametrize("priority_sampler", [DatadogSampler()]) def test_update_rate_by_service_sample_rates(priority_sampler): cases = [ { @@ -921,8 
+846,8 @@ def context(): @pytest.mark.parametrize( "sampling_mechanism,expected", [ - (SamplingMechanism.AGENT_RATE, "-1"), - (SamplingMechanism.TRACE_SAMPLING_RULE, "-3"), + (SamplingMechanism.AGENT_RATE_BY_SERVICE, "-1"), + (SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE, "-3"), (SamplingMechanism.DEFAULT, "-0"), (SamplingMechanism.MANUAL, "-4"), (SamplingMechanism.DEFAULT, "-0"),