Skip to content

Commit

Permalink
fix(internal): revert rust rate limiter [backport 2.10] (#10229)
Browse files Browse the repository at this point in the history
Rather than keep debugging the ``Already mutably borrowed`` error in Rust, just revert back to the original pure-Python rate limiter.

The impact here is fairly low: the Rust rate limiter is much faster, but
this code isn't the slow part of a hot path, so it is better to be safe
than fast.

Fixes #10002

- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

- [x] Reviewer has checked that all the criteria below are met
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
  • Loading branch information
brettlangdon authored Aug 14, 2024
1 parent 5b2d1f8 commit 29d24e8
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 236 deletions.
2 changes: 1 addition & 1 deletion ddtrace/internal/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def _on_jsonify_context_started_flask(ctx):
The names of these events follow the pattern ``context.[started|ended].<context_name>``.
"""

from contextlib import contextmanager
import logging
import sys
Expand All @@ -115,7 +116,6 @@ def _on_jsonify_context_started_flask(ctx):

from ..utils.deprecations import DDTraceDeprecationWarning
from . import event_hub # noqa:F401
from ._core import RateLimiter # noqa:F401
from .event_hub import EventResultDict # noqa:F401
from .event_hub import dispatch
from .event_hub import dispatch_with_results # noqa:F401
Expand Down
49 changes: 0 additions & 49 deletions ddtrace/internal/core/_core.pyi
Original file line number Diff line number Diff line change
@@ -1,49 +0,0 @@
import typing

class RateLimiter:
"""
A token bucket rate limiter implementation
"""

rate_limit: int
time_window: float
effective_rate: float
current_window_rate: float
prev_window_rate: typing.Optional[float]
tokens: float
max_tokens: float
tokens_allowed: int
tokens_total: int
last_update_ns: float
current_window_ns: float

def __init__(self, rate_limit: int, time_window: float = 1e9):
"""
Constructor for RateLimiter
:param rate_limit: The rate limit to apply for number of requests per second.
rate limit > 0 max number of requests to allow per second,
rate limit == 0 to disallow all requests,
rate limit < 0 to allow all requests
:type rate_limit: :obj:`int`
:param time_window: The time window where the rate limit applies in nanoseconds. default value is 1 second.
:type time_window: :obj:`float`
"""
def is_allowed(self, timestamp_ns: typing.Optional[int] = None) -> bool:
"""
Check whether the current request is allowed or not
This method will also reduce the number of available tokens by 1
:param int timestamp_ns: timestamp in nanoseconds for the current request. [deprecated]
:returns: Whether the current request is allowed or not
:rtype: :obj:`bool`
"""
def _is_allowed(self, timestamp_ns: int) -> bool:
"""
Internal method to check whether the current request is allowed or not
:param int timestamp_ns: timestamp in nanoseconds for the current request.
:returns: Whether the current request is allowed or not
:rtype: :obj:`bool`
"""
156 changes: 153 additions & 3 deletions ddtrace/internal/rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,174 @@

from ..internal import compat
from ..internal.constants import DEFAULT_SAMPLING_RATE_LIMIT
from .core import RateLimiter as _RateLimiter


class RateLimiter(_RateLimiter):
class RateLimiter(object):
"""
A token bucket rate limiter implementation
"""

__slots__ = (
"_lock",
"current_window_ns",
"time_window",
"last_update_ns",
"max_tokens",
"prev_window_rate",
"rate_limit",
"tokens",
"tokens_allowed",
"tokens_total",
)

def __init__(self, rate_limit: int, time_window: float = 1e9):
"""
Constructor for RateLimiter
:param rate_limit: The rate limit to apply for number of requests per second.
rate limit > 0 max number of requests to allow per second,
rate limit == 0 to disallow all requests,
rate limit < 0 to allow all requests
:type rate_limit: :obj:`int`
:param time_window: The time window where the rate limit applies in nanoseconds. default value is 1 second.
:type time_window: :obj:`float`
"""
self.rate_limit = rate_limit
self.time_window = time_window
self.tokens = rate_limit # type: float
self.max_tokens = rate_limit

self.last_update_ns = compat.monotonic_ns()

self.current_window_ns = 0 # type: float
self.tokens_allowed = 0
self.tokens_total = 0
self.prev_window_rate = None # type: Optional[float]

self._lock = threading.Lock()

@property
def _has_been_configured(self):
return self.rate_limit != DEFAULT_SAMPLING_RATE_LIMIT

def is_allowed(self, timestamp_ns: Optional[int] = None) -> bool:
"""
Check whether the current request is allowed or not
This method will also reduce the number of available tokens by 1
:param int timestamp_ns: timestamp in nanoseconds for the current request.
:returns: Whether the current request is allowed or not
:rtype: :obj:`bool`
"""
if timestamp_ns is not None:
deprecate(
"The `timestamp_ns` parameter is deprecated and will be removed in a future version."
"Ratelimiter will use the current time.",
category=DDTraceDeprecationWarning,
)

# rate limits are tested and mocked in pytest so we need to compute the timestamp here
# (or move the unit tests to rust)
return self._is_allowed(compat.monotonic_ns())
timestamp_ns = timestamp_ns or compat.monotonic_ns()
allowed = self._is_allowed(timestamp_ns)
# Update counts used to determine effective rate
self._update_rate_counts(allowed, timestamp_ns)
return allowed

def _update_rate_counts(self, allowed: bool, timestamp_ns: int) -> None:
# No tokens have been seen yet, start a new window
if not self.current_window_ns:
self.current_window_ns = timestamp_ns

# If more time than the configured time window
# has passed since the current window started, reset
# DEV: We are comparing nanoseconds, so 1e9 is 1 second
elif timestamp_ns - self.current_window_ns >= self.time_window:
# Store previous window's rate to average with current for `.effective_rate`
self.prev_window_rate = self._current_window_rate()
self.tokens_allowed = 0
self.tokens_total = 0
self.current_window_ns = timestamp_ns

# Keep track of total tokens seen vs allowed
if allowed:
self.tokens_allowed += 1
self.tokens_total += 1

def _is_allowed(self, timestamp_ns: int) -> bool:
# Rate limit of 0 blocks everything
if self.rate_limit == 0:
return False

# Negative rate limit disables rate limiting
elif self.rate_limit < 0:
return True

# Lock, we need this to be thread safe, it should be shared by all threads
with self._lock:
self._replenish(timestamp_ns)

if self.tokens >= 1:
self.tokens -= 1
return True

return False

def _replenish(self, timestamp_ns: int) -> None:
try:
# If we are at the max, we do not need to add any more
if self.tokens == self.max_tokens:
return

# Add more available tokens based on how much time has passed
# DEV: Timestamps are in nanoseconds; dividing by `time_window` gives the
# elapsed time in units of the time window (seconds with the default 1e9)
elapsed = (timestamp_ns - self.last_update_ns) / self.time_window
finally:
# always update the timestamp
# we can't update at the beginning of the function, since if we did, our calculation for
# elapsed would be incorrect
self.last_update_ns = timestamp_ns

# Update the number of available tokens, but ensure we do not exceed the max
self.tokens = min(
self.max_tokens,
self.tokens + (elapsed * self.rate_limit),
)

def _current_window_rate(self) -> float:
# No tokens have been seen, effectively 100% sample rate
# DEV: This is to avoid division by zero error
if not self.tokens_total:
return 1.0

# Get rate of tokens allowed
return self.tokens_allowed / self.tokens_total

@property
def effective_rate(self) -> float:
"""
Return the effective sample rate of this rate limiter
:returns: Effective sample rate value 0.0 <= rate <= 1.0
:rtype: :obj:`float`
"""
# If we have not had a previous window yet, return current rate
if self.prev_window_rate is None:
return self._current_window_rate()

return (self._current_window_rate() + self.prev_window_rate) / 2.0

def __repr__(self):
return "{}(rate_limit={!r}, tokens={!r}, last_update_ns={!r}, effective_rate={!r})".format(
self.__class__.__name__,
self.rate_limit,
self.tokens,
self.last_update_ns,
self.effective_rate,
)

__str__ = __repr__


class RateLimitExceeded(Exception):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
internal: Fix ``Already mutably borrowed`` error by reverting back to pure-python rate limiter.
5 changes: 1 addition & 4 deletions src/core/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
mod rate_limiter;

use pyo3::prelude::*;

#[pymodule]
fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<rate_limiter::RateLimiterPy>()?;
fn _core(_: &Bound<'_, PyModule>) -> PyResult<()> {
Ok(())
}
Loading

0 comments on commit 29d24e8

Please sign in to comment.