Skip to content

Commit

Permalink
auth-server: rate-limiter: Rate limit quote endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
joeykraut committed Jan 21, 2025
1 parent bd8bf0e commit fab6c57
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 35 deletions.
3 changes: 3 additions & 0 deletions auth/auth-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ pub struct Cli {
/// The bundle rate limit in bundles per minute
#[arg(long, env = "BUNDLE_RATE_LIMIT", default_value = "4")]
pub bundle_rate_limit: u64,
/// The quote rate limit in quotes per minute
#[arg(long, env = "QUOTE_RATE_LIMIT", default_value = "100")]
pub quote_rate_limit: u64,
/// The path to the file containing token remaps for the given chain
///
/// See https://github.com/renegade-fi/token-mappings for more information on the format of this file
Expand Down
7 changes: 4 additions & 3 deletions auth/auth-server/src/server/handle_external_match.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl Server {
) -> Result<impl Reply, Rejection> {
// Authorize the request
let key_desc = self.authorize_request(path.as_str(), &headers, &body).await?;
self.check_quote_rate_limit(key_desc.clone()).await?;

// Send the request to the relayer
let resp =
Expand All @@ -66,7 +67,7 @@ impl Server {
) -> Result<impl Reply, Rejection> {
// Authorize the request
let key_desc = self.authorize_request(path.as_str(), &headers, &body).await?;
self.check_rate_limit(key_desc.clone()).await?;
self.check_bundle_rate_limit(key_desc.clone()).await?;

// Send the request to the relayer
let resp =
Expand Down Expand Up @@ -96,7 +97,7 @@ impl Server {
) -> Result<impl Reply, Rejection> {
// Authorize the request
let key_description = self.authorize_request(path.as_str(), &headers, &body).await?;
self.check_rate_limit(key_description.clone()).await?;
self.check_bundle_rate_limit(key_description.clone()).await?;

// Send the request to the relayer
let resp =
Expand Down Expand Up @@ -187,7 +188,7 @@ impl Server {
// If the bundle settles, increase the API user's a rate limit token balance
let did_settle = await_settlement(&match_resp.match_bundle, &self.arbitrum_client).await?;
if did_settle {
self.add_rate_limit_token(key.clone()).await;
self.add_bundle_rate_limit_token(key.clone()).await;
}

// Record metrics
Expand Down
26 changes: 18 additions & 8 deletions auth/auth-server/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use http::{HeaderMap, Method, Response};
use native_tls::TlsConnector;
use postgres_native_tls::MakeTlsConnector;
use rand::Rng;
use rate_limiter::BundleRateLimiter;
use rate_limiter::AuthServerRateLimiter;
use renegade_api::auth::add_expiring_auth_to_headers;
use renegade_arbitrum_client::client::ArbitrumClient;
use renegade_common::types::wallet::keychain::HmacKey;
Expand Down Expand Up @@ -72,7 +72,7 @@ pub struct Server {
/// The Arbitrum client
pub arbitrum_client: ArbitrumClient,
/// The rate limiter
pub rate_limiter: BundleRateLimiter,
pub rate_limiter: AuthServerRateLimiter,
/// The quote metrics recorder
pub quote_metrics: Option<Arc<QuoteComparisonHandler>>,
/// Rate at which to sample metrics (0.0 to 1.0)
Expand All @@ -95,7 +95,8 @@ impl Server {
let relayer_admin_key =
HmacKey::from_base64_string(&args.relayer_admin_key).map_err(AuthServerError::setup)?;

let rate_limiter = BundleRateLimiter::new(args.bundle_rate_limit);
let rate_limiter =
AuthServerRateLimiter::new(args.quote_rate_limit, args.bundle_rate_limit);

// Setup the quote metrics recorder and sources if enabled
let quote_metrics = if args.enable_quote_comparison {
Expand Down Expand Up @@ -182,18 +183,27 @@ impl Server {

// --- Rate Limiting --- //

/// Check the rate limiter
pub async fn check_rate_limit(&self, key_description: String) -> Result<(), ApiError> {
if !self.rate_limiter.check(key_description.clone()).await {
/// Check the quote rate limiter
pub async fn check_quote_rate_limit(&self, key_description: String) -> Result<(), ApiError> {
if !self.rate_limiter.check_quote_token(key_description.clone()).await {
warn!("Rate limit exceeded for key: {key_description}");
return Err(ApiError::TooManyRequests);
}
Ok(())
}

/// Check the bundle rate limiter
pub async fn check_bundle_rate_limit(&self, key_description: String) -> Result<(), ApiError> {
if !self.rate_limiter.check_bundle_token(key_description.clone()).await {
warn!("Rate limit exceeded for key: {key_description}");
return Err(ApiError::TooManyRequests);
}
Ok(())
}

/// Increment the token balance for a given API user
pub async fn add_rate_limit_token(&self, key_description: String) {
self.rate_limiter.add_token(key_description).await;
pub async fn add_bundle_rate_limit_token(&self, key_description: String) {
self.rate_limiter.add_bundle_token(key_description).await;
}

// --- Caching --- //
Expand Down
91 changes: 67 additions & 24 deletions auth/auth-server/src/server/rate_limiter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
//! A rate limiter for the server
//!
//! We rate limit on two different token schedules:
//! - Quote tokens: These are used for quote requests and typically have a high
//! max tokens value. A quote is purely informational, and therefore does not
//! require active liquidity.
//! - Bundle tokens: These are used for bundle requests and typically have a low
//! max tokens value. A bundle indicates an intent to trade, and therefore
//! requires active liquidity.
//!
//! The unit which we rate limit is number of inflight bundles. Therefore, there
//! are two ways for the token bucket to refill:
//! - Wait for the next refill
Expand All @@ -14,54 +22,89 @@ use tokio::sync::Mutex;

/// A type alias for a per-user rate limiter
type BucketMap = HashMap<String, Ratelimiter>;
/// A type alias for a shared bucket map
type SharedBucketMap = Arc<Mutex<BucketMap>>;

/// One minute duration
const ONE_MINUTE: Duration = Duration::from_secs(60);

/// The bundle rate limiter
#[derive(Clone)]
pub struct BundleRateLimiter {
/// The number of bundles allowed per minute
rate_limit: u64,
/// A per-user rate limiter
bucket_map: Arc<Mutex<BucketMap>>,
pub struct AuthServerRateLimiter {
/// The quote rate limiter
quote_rate_limiter: UserRateLimiter,
/// The bundle rate limiter
bundle_rate_limiter: UserRateLimiter,
}

impl BundleRateLimiter {
impl AuthServerRateLimiter {
/// Create a new bundle rate limiter
pub fn new(quote_rate_limit: u64, bundle_rate_limit: u64) -> Self {
Self {
quote_rate_limiter: UserRateLimiter::new(quote_rate_limit),
bundle_rate_limiter: UserRateLimiter::new(bundle_rate_limit),
}
}

/// Consume a quote token from bucket if available
///
/// If no token is available (rate limit reached), this method returns
/// false, otherwise true
pub async fn check_quote_token(&self, user_id: String) -> bool {
self.quote_rate_limiter.check(user_id).await
}

/// Consume a bundle token from bucket if available
///
/// If no token is available (rate limit reached), this method returns
/// false, otherwise true
pub async fn check_bundle_token(&self, user_id: String) -> bool {
self.bundle_rate_limiter.check(user_id).await
}

/// Increment the number of tokens available to a given user
#[allow(unused_must_use)]
pub async fn add_bundle_token(&self, user_id: String) {
self.bundle_rate_limiter.add_token(user_id).await;
}
}

/// A per user token bucket rate limiter
#[derive(Clone)]
pub struct UserRateLimiter {
/// The number of tokens allowed per minute
rate_limit: u64,
/// The token buckets in a per-user map
buckets: SharedBucketMap,
}

impl UserRateLimiter {
/// Create a new user rate limiter
pub fn new(rate_limit: u64) -> Self {
Self { rate_limit, bucket_map: Arc::new(Mutex::new(HashMap::new())) }
Self { rate_limit, buckets: Arc::new(Mutex::new(HashMap::new())) }
}

/// Create a new rate limiter
fn new_rate_limiter(&self) -> Ratelimiter {
Ratelimiter::builder(self.rate_limit, ONE_MINUTE)
.initial_available(self.rate_limit)
.max_tokens(self.rate_limit)
fn new_rate_limiter(&self, rate_limit: u64) -> Ratelimiter {
Ratelimiter::builder(rate_limit, ONE_MINUTE)
.initial_available(rate_limit)
.max_tokens(rate_limit)
.build()
.expect("invalid rate limit configuration")
}

/// Consume a token from bucket if available
///
/// If no token is available (rate limit reached), this method returns
/// false, otherwise true
/// Check the rate limiter
pub async fn check(&self, user_id: String) -> bool {
let mut map = self.bucket_map.lock().await;
let entry = map.entry(user_id).or_insert_with(|| self.new_rate_limiter());

let mut map = self.buckets.lock().await;
let entry = map.entry(user_id).or_insert_with(|| self.new_rate_limiter(self.rate_limit));
entry.try_wait().is_ok()
}

/// Increment the number of tokens available to a given user
#[allow(unused_must_use)]
pub async fn add_token(&self, user_id: String) {
let mut map = self.bucket_map.lock().await;
let entry = map.entry(user_id).or_insert_with(|| self.new_rate_limiter());

// Set the available tokens
// The underlying rate limiter will error if this exceeds the configured
// maximum, we ignore this error
let mut map = self.buckets.lock().await;
let entry = map.entry(user_id).or_insert_with(|| self.new_rate_limiter(self.rate_limit));
let available = entry.available();
entry.set_available(available + 1);
}
Expand Down

0 comments on commit fab6c57

Please sign in to comment.