Skip to content

Commit

Permalink
auth-server: rate-limiter: Rate limit quote endpoint (#95)
Browse files Browse the repository at this point in the history
* auth-server: fix clippy lints

* auth-server: rate-limiter: Rate limit quote endpoint
  • Loading branch information
joeykraut authored Jan 21, 2025
1 parent f4feb0e commit 2f05f59
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 24 deletions.
3 changes: 3 additions & 0 deletions auth/auth-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ pub struct Cli {
/// The bundle rate limit in bundles per minute
#[arg(long, env = "BUNDLE_RATE_LIMIT", default_value = "4")]
pub bundle_rate_limit: u64,
/// The quote rate limit in quotes per minute
#[arg(long, env = "QUOTE_RATE_LIMIT", default_value = "100")]
pub quote_rate_limit: u64,
/// The path to the file containing token remaps for the given chain
///
/// See https://github.com/renegade-fi/token-mappings for more information on the format of this file
Expand Down
7 changes: 4 additions & 3 deletions auth/auth-server/src/server/handle_external_match.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl Server {
) -> Result<impl Reply, Rejection> {
// Authorize the request
let key_desc = self.authorize_request(path.as_str(), &headers, &body).await?;
self.check_quote_rate_limit(key_desc.clone()).await?;

// Send the request to the relayer
let resp =
Expand All @@ -66,7 +67,7 @@ impl Server {
) -> Result<impl Reply, Rejection> {
// Authorize the request
let key_desc = self.authorize_request(path.as_str(), &headers, &body).await?;
self.check_rate_limit(key_desc.clone()).await?;
self.check_bundle_rate_limit(key_desc.clone()).await?;

// Send the request to the relayer
let resp =
Expand Down Expand Up @@ -96,7 +97,7 @@ impl Server {
) -> Result<impl Reply, Rejection> {
// Authorize the request
let key_description = self.authorize_request(path.as_str(), &headers, &body).await?;
self.check_rate_limit(key_description.clone()).await?;
self.check_bundle_rate_limit(key_description.clone()).await?;

// Send the request to the relayer
let resp =
Expand Down Expand Up @@ -187,7 +188,7 @@ impl Server {
// If the bundle settles, increase the API user's a rate limit token balance
let did_settle = await_settlement(&match_resp.match_bundle, &self.arbitrum_client).await?;
if did_settle {
self.add_rate_limit_token(key.clone()).await;
self.add_bundle_rate_limit_token(key.clone()).await;
}

// Record metrics
Expand Down
28 changes: 19 additions & 9 deletions auth/auth-server/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use http::{HeaderMap, Method, Response};
use native_tls::TlsConnector;
use postgres_native_tls::MakeTlsConnector;
use rand::Rng;
use rate_limiter::BundleRateLimiter;
use rate_limiter::AuthServerRateLimiter;
use renegade_api::auth::add_expiring_auth_to_headers;
use renegade_arbitrum_client::client::ArbitrumClient;
use renegade_common::types::wallet::keychain::HmacKey;
Expand Down Expand Up @@ -72,7 +72,7 @@ pub struct Server {
/// The Arbitrum client
pub arbitrum_client: ArbitrumClient,
/// The rate limiter
pub rate_limiter: BundleRateLimiter,
pub rate_limiter: AuthServerRateLimiter,
/// The quote metrics recorder
pub quote_metrics: Option<Arc<QuoteComparisonHandler>>,
/// Rate at which to sample metrics (0.0 to 1.0)
Expand All @@ -95,7 +95,8 @@ impl Server {
let relayer_admin_key =
HmacKey::from_base64_string(&args.relayer_admin_key).map_err(AuthServerError::setup)?;

let rate_limiter = BundleRateLimiter::new(args.bundle_rate_limit);
let rate_limiter =
AuthServerRateLimiter::new(args.quote_rate_limit, args.bundle_rate_limit);

// Setup the quote metrics recorder and sources if enabled
let quote_metrics = if args.enable_quote_comparison {
Expand Down Expand Up @@ -182,18 +183,27 @@ impl Server {

// --- Rate Limiting --- //

/// Check the rate limiter
pub async fn check_rate_limit(&self, key_description: String) -> Result<(), ApiError> {
if !self.rate_limiter.check(key_description.clone()).await {
warn!("Rate limit exceeded for key: {key_description}");
/// Check the quote rate limiter
pub async fn check_quote_rate_limit(&self, key_description: String) -> Result<(), ApiError> {
if !self.rate_limiter.check_quote_token(key_description.clone()).await {
warn!("Quote rate limit exceeded for key: {key_description}");
return Err(ApiError::TooManyRequests);
}
Ok(())
}

/// Check the bundle rate limiter
pub async fn check_bundle_rate_limit(&self, key_description: String) -> Result<(), ApiError> {
if !self.rate_limiter.check_bundle_token(key_description.clone()).await {
warn!("Bundle rate limit exceeded for key: {key_description}");
return Err(ApiError::TooManyRequests);
}
Ok(())
}

/// Increment the token balance for a given API user
pub async fn add_rate_limit_token(&self, key_description: String) {
self.rate_limiter.add_token(key_description).await;
pub async fn add_bundle_rate_limit_token(&self, key_description: String) {
self.rate_limiter.add_bundle_token(key_description).await;
}

// --- Caching --- //
Expand Down
72 changes: 61 additions & 11 deletions auth/auth-server/src/server/rate_limiter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
//! A rate limiter for the server
//!
//! We rate limit on two different token schedules:
//! - Quote tokens: These are used for quote requests and typically have a high
//! max tokens value. A quote is purely informational, and therefore does not
//! require active liquidity.
//! - Bundle tokens: These are used for bundle requests and typically have a low
//! max tokens value. A bundle indicates an intent to trade, and therefore
//! requires active liquidity.
//!
//! The unit which we rate limit is number of inflight bundles. Therefore, there
//! are two ways for the token bucket to refill:
//! - Wait for the next refill
Expand All @@ -14,23 +22,66 @@ use tokio::sync::Mutex;

/// A type alias for a per-user rate limiter
type BucketMap = HashMap<String, Ratelimiter>;
/// A type alias for a shared bucket map
type SharedBucketMap = Arc<Mutex<BucketMap>>;

/// One minute duration
const ONE_MINUTE: Duration = Duration::from_secs(60);

/// The bundle rate limiter
#[derive(Clone)]
pub struct BundleRateLimiter {
/// The number of bundles allowed per minute
rate_limit: u64,
/// A per-user rate limiter
bucket_map: Arc<Mutex<BucketMap>>,
pub struct AuthServerRateLimiter {
/// The quote rate limiter
quote_rate_limiter: UserRateLimiter,
/// The bundle rate limiter
bundle_rate_limiter: UserRateLimiter,
}

impl BundleRateLimiter {
impl AuthServerRateLimiter {
/// Create a new bundle rate limiter
pub fn new(quote_rate_limit: u64, bundle_rate_limit: u64) -> Self {
Self {
quote_rate_limiter: UserRateLimiter::new(quote_rate_limit),
bundle_rate_limiter: UserRateLimiter::new(bundle_rate_limit),
}
}

/// Consume a quote token from bucket if available
///
/// If no token is available (rate limit reached), this method returns
/// false, otherwise true
pub async fn check_quote_token(&self, user_id: String) -> bool {
self.quote_rate_limiter.check(user_id).await
}

/// Consume a bundle token from bucket if available
///
/// If no token is available (rate limit reached), this method returns
/// false, otherwise true
pub async fn check_bundle_token(&self, user_id: String) -> bool {
self.bundle_rate_limiter.check(user_id).await
}

/// Increment the number of tokens available to a given user
#[allow(unused_must_use)]
pub async fn add_bundle_token(&self, user_id: String) {
self.bundle_rate_limiter.add_token(user_id).await;
}
}

/// A per user token bucket rate limiter
#[derive(Clone)]
pub struct UserRateLimiter {
/// The number of tokens allowed per minute
rate_limit: u64,
/// The token buckets in a per-user map
buckets: SharedBucketMap,
}

impl UserRateLimiter {
/// Create a new user rate limiter
pub fn new(rate_limit: u64) -> Self {
Self { rate_limit, bucket_map: Arc::new(Mutex::new(HashMap::new())) }
Self { rate_limit, buckets: Arc::new(Mutex::new(HashMap::new())) }
}

/// Create a new rate limiter
Expand All @@ -42,21 +93,20 @@ impl BundleRateLimiter {
.expect("invalid rate limit configuration")
}

/// Consume a token from bucket if available
/// Consume a token from the bucket if available
///
/// If no token is available (rate limit reached), this method returns
/// false, otherwise true
pub async fn check(&self, user_id: String) -> bool {
let mut map = self.bucket_map.lock().await;
let mut map = self.buckets.lock().await;
let entry = map.entry(user_id).or_insert_with(|| self.new_rate_limiter());

entry.try_wait().is_ok()
}

/// Increment the number of tokens available to a given user
#[allow(unused_must_use)]
pub async fn add_token(&self, user_id: String) {
let mut map = self.bucket_map.lock().await;
let mut map = self.buckets.lock().await;
let entry = map.entry(user_id).or_insert_with(|| self.new_rate_limiter());

// Set the available tokens
Expand Down
5 changes: 5 additions & 0 deletions auth/auth-server/src/telemetry/quote_comparison/handler.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Defines the quote comparison handler
use ethers::utils::format_units;
use ethers::{providers::Middleware, types::U256};
use futures_util::future::join_all;
Expand All @@ -20,8 +22,11 @@ use super::{price_reporter_client::PriceReporterClient, QuoteComparison};

/// Records metrics comparing quotes from different sources
pub struct QuoteComparisonHandler {
/// The sources to compare quotes from
sources: Vec<QuoteSource>,
/// The arbitrum client
arbitrum_client: ArbitrumClient,
/// The price reporter client
price_reporter_client: PriceReporterClient,
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! A client for the price reporter
use renegade_common::types::{exchange::Exchange, token::Token};

use crate::telemetry::sources::http_utils::{send_get_request, HttpError};
Expand All @@ -7,16 +9,19 @@ pub const PRICE_ROUTE: &str = "/price";
/// Default timeout for requests to the price reporter
const DEFAULT_TIMEOUT_SECS: u64 = 5;

/// A client for the price reporter
pub struct PriceReporterClient {
/// The base URL of the price reporter
base_url: String,
}

impl PriceReporterClient {
/// Create a new PriceReporterClient
pub fn new(base_url: &str) -> Self {
Self { base_url: base_url.to_string() }
}

/// Get the price of a token from the price reporter
pub async fn get_binance_price(&self, mint: &str) -> Result<Option<f64>, HttpError> {
let exchange = Exchange::Binance;
let quote_mint = Token::from_ticker("USDT").get_addr();
Expand Down
6 changes: 6 additions & 0 deletions auth/auth-server/src/telemetry/sources/http_utils.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
//! Utilities for HTTP requests
use reqwest::{Client, Response};
use serde::Serialize;
use std::time::Duration;
use thiserror::Error;

/// An error with the HTTP client
#[derive(Debug, Error)]
pub enum HttpError {
/// A network error
#[error("Network error: {0}")]
Network(String, #[source] reqwest::Error),

/// An API error
#[error("API error: {0}")]
Api(String),
}

/// Sends a basic GET request
pub async fn send_get_request(url: &str, timeout_secs: u64) -> Result<Response, HttpError> {
let client = Client::builder()
.timeout(Duration::from_secs(timeout_secs))
Expand Down
2 changes: 2 additions & 0 deletions auth/auth-server/src/telemetry/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,12 @@ impl QuoteSource {
}

impl QuoteSource {
/// Creates a new quote source for the Odos API
pub fn odos(config: odos::OdosConfig) -> Self {
QuoteSource::Odos(odos::OdosQuoteSource::new(config))
}

/// Creates a new quote source for the Odos API with default configuration
pub fn odos_default() -> Self {
Self::odos(odos::OdosConfig::default())
}
Expand Down
8 changes: 7 additions & 1 deletion auth/auth-server/src/telemetry/sources/odos/client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! A client for the Odos API
use super::types::{OdosQuoteRequest, OdosQuoteResponse};
use crate::telemetry::sources::http_utils::{send_post_request, HttpError};

Expand All @@ -11,9 +13,13 @@ const BASE_URL: &str = "https://api.odos.xyz";
const QUOTE_ROUTE: &str = "/sor/quote/v2";

// Default configuration values
const DEFAULT_CHAIN_ID: u64 = 42161; // Arbitrum
/// Default chain ID for the target blockchain (Arbitrum)
const DEFAULT_CHAIN_ID: u64 = 42161;
/// Default value for disabling RFQs
const DEFAULT_DISABLE_RFQS: bool = false;
/// Default slippage limit as a percentage
const DEFAULT_SLIPPAGE_LIMIT_PERCENT: f64 = 0.3;
/// Default request timeout in seconds
const DEFAULT_TIMEOUT_SECS: u64 = 5;

/// Configuration options for the Odos client
Expand Down
4 changes: 4 additions & 0 deletions auth/auth-server/src/telemetry/sources/odos/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
//! Error types for the Odos client
use thiserror::Error;

/// An error with the Odos client
#[derive(Debug, Error)]
pub enum OdosError {
/// An error with the input
#[error("Invalid input: {0}")]
Input(String),
}
2 changes: 2 additions & 0 deletions auth/auth-server/src/telemetry/sources/odos/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! A client for the Odos API
mod client;
mod error;
mod types;
Expand Down
6 changes: 6 additions & 0 deletions auth/auth-server/src/telemetry/sources/odos/types.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
//! Types for the Odos API
use super::{client::OdosConfig, error::OdosError};
use serde::{Deserialize, Serialize};

/// Request structure for the Odos API quote endpoint.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
#[allow(clippy::missing_docs_in_private_items)]
pub(crate) struct OdosQuoteRequest {
pub chain_id: u64,
pub input_tokens: Vec<InputToken>,
Expand All @@ -15,6 +18,7 @@ pub(crate) struct OdosQuoteRequest {
/// Response structure from the Odos API quote endpoint.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
#[allow(clippy::missing_docs_in_private_items)]
pub(crate) struct OdosQuoteResponse {
pub in_amounts: Vec<String>,
pub in_tokens: Vec<String>,
Expand All @@ -25,13 +29,15 @@ pub(crate) struct OdosQuoteResponse {

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
#[allow(clippy::missing_docs_in_private_items)]
pub(crate) struct InputToken {
pub token_address: String,
pub amount: String,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
#[allow(clippy::missing_docs_in_private_items)]
pub(crate) struct OutputToken {
pub token_address: String,
pub proportion: f32,
Expand Down

0 comments on commit 2f05f59

Please sign in to comment.