Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

auth-server: rate-limiter: Rate limit quote endpoint #95

Merged
merged 2 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions auth/auth-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ pub struct Cli {
/// The bundle rate limit in bundles per minute
#[arg(long, env = "BUNDLE_RATE_LIMIT", default_value = "4")]
pub bundle_rate_limit: u64,
/// The quote rate limit in quotes per minute
#[arg(long, env = "QUOTE_RATE_LIMIT", default_value = "100")]
pub quote_rate_limit: u64,
/// The path to the file containing token remaps for the given chain
///
/// See https://github.com/renegade-fi/token-mappings for more information on the format of this file
Expand Down
7 changes: 4 additions & 3 deletions auth/auth-server/src/server/handle_external_match.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl Server {
) -> Result<impl Reply, Rejection> {
// Authorize the request
let key_desc = self.authorize_request(path.as_str(), &headers, &body).await?;
self.check_quote_rate_limit(key_desc.clone()).await?;

// Send the request to the relayer
let resp =
Expand All @@ -66,7 +67,7 @@ impl Server {
) -> Result<impl Reply, Rejection> {
// Authorize the request
let key_desc = self.authorize_request(path.as_str(), &headers, &body).await?;
self.check_rate_limit(key_desc.clone()).await?;
self.check_bundle_rate_limit(key_desc.clone()).await?;

// Send the request to the relayer
let resp =
Expand Down Expand Up @@ -96,7 +97,7 @@ impl Server {
) -> Result<impl Reply, Rejection> {
// Authorize the request
let key_description = self.authorize_request(path.as_str(), &headers, &body).await?;
self.check_rate_limit(key_description.clone()).await?;
self.check_bundle_rate_limit(key_description.clone()).await?;

// Send the request to the relayer
let resp =
Expand Down Expand Up @@ -187,7 +188,7 @@ impl Server {
// If the bundle settles, increase the API user's a rate limit token balance
let did_settle = await_settlement(&match_resp.match_bundle, &self.arbitrum_client).await?;
if did_settle {
self.add_rate_limit_token(key.clone()).await;
self.add_bundle_rate_limit_token(key.clone()).await;
}

// Record metrics
Expand Down
28 changes: 19 additions & 9 deletions auth/auth-server/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use http::{HeaderMap, Method, Response};
use native_tls::TlsConnector;
use postgres_native_tls::MakeTlsConnector;
use rand::Rng;
use rate_limiter::BundleRateLimiter;
use rate_limiter::AuthServerRateLimiter;
use renegade_api::auth::add_expiring_auth_to_headers;
use renegade_arbitrum_client::client::ArbitrumClient;
use renegade_common::types::wallet::keychain::HmacKey;
Expand Down Expand Up @@ -72,7 +72,7 @@ pub struct Server {
/// The Arbitrum client
pub arbitrum_client: ArbitrumClient,
/// The rate limiter
pub rate_limiter: BundleRateLimiter,
pub rate_limiter: AuthServerRateLimiter,
/// The quote metrics recorder
pub quote_metrics: Option<Arc<QuoteComparisonHandler>>,
/// Rate at which to sample metrics (0.0 to 1.0)
Expand All @@ -95,7 +95,8 @@ impl Server {
let relayer_admin_key =
HmacKey::from_base64_string(&args.relayer_admin_key).map_err(AuthServerError::setup)?;

let rate_limiter = BundleRateLimiter::new(args.bundle_rate_limit);
let rate_limiter =
AuthServerRateLimiter::new(args.quote_rate_limit, args.bundle_rate_limit);

// Setup the quote metrics recorder and sources if enabled
let quote_metrics = if args.enable_quote_comparison {
Expand Down Expand Up @@ -182,18 +183,27 @@ impl Server {

// --- Rate Limiting --- //

/// Check the rate limiter
pub async fn check_rate_limit(&self, key_description: String) -> Result<(), ApiError> {
if !self.rate_limiter.check(key_description.clone()).await {
warn!("Rate limit exceeded for key: {key_description}");
/// Check the quote rate limiter
pub async fn check_quote_rate_limit(&self, key_description: String) -> Result<(), ApiError> {
if !self.rate_limiter.check_quote_token(key_description.clone()).await {
warn!("Quote rate limit exceeded for key: {key_description}");
return Err(ApiError::TooManyRequests);
}
Ok(())
}

/// Check the bundle rate limiter
pub async fn check_bundle_rate_limit(&self, key_description: String) -> Result<(), ApiError> {
if !self.rate_limiter.check_bundle_token(key_description.clone()).await {
warn!("Bundle rate limit exceeded for key: {key_description}");
return Err(ApiError::TooManyRequests);
}
Ok(())
}

/// Increment the token balance for a given API user
pub async fn add_rate_limit_token(&self, key_description: String) {
self.rate_limiter.add_token(key_description).await;
pub async fn add_bundle_rate_limit_token(&self, key_description: String) {
self.rate_limiter.add_bundle_token(key_description).await;
}

// --- Caching --- //
Expand Down
72 changes: 61 additions & 11 deletions auth/auth-server/src/server/rate_limiter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
//! A rate limiter for the server
//!
//! We rate limit on two different token schedules:
//! - Quote tokens: These are used for quote requests and typically have a high
//! max tokens value. A quote is purely informational, and therefore does not
//! require active liquidity.
//! - Bundle tokens: These are used for bundle requests and typically have a low
//! max tokens value. A bundle indicates an intent to trade, and therefore
//! requires active liquidity.
//!
//! The unit which we rate limit is number of inflight bundles. Therefore, there
//! are two ways for the token bucket to refill:
//! - Wait for the next refill
Expand All @@ -14,23 +22,66 @@ use tokio::sync::Mutex;

/// A type alias for a per-user rate limiter
type BucketMap = HashMap<String, Ratelimiter>;
/// A type alias for a shared bucket map
type SharedBucketMap = Arc<Mutex<BucketMap>>;

/// One minute duration
const ONE_MINUTE: Duration = Duration::from_secs(60);

/// The bundle rate limiter
#[derive(Clone)]
pub struct BundleRateLimiter {
/// The number of bundles allowed per minute
rate_limit: u64,
/// A per-user rate limiter
bucket_map: Arc<Mutex<BucketMap>>,
pub struct AuthServerRateLimiter {
/// The quote rate limiter
quote_rate_limiter: UserRateLimiter,
/// The bundle rate limiter
bundle_rate_limiter: UserRateLimiter,
}

impl BundleRateLimiter {
impl AuthServerRateLimiter {
/// Create a new bundle rate limiter
pub fn new(quote_rate_limit: u64, bundle_rate_limit: u64) -> Self {
Self {
quote_rate_limiter: UserRateLimiter::new(quote_rate_limit),
bundle_rate_limiter: UserRateLimiter::new(bundle_rate_limit),
}
}

/// Consume a quote token from bucket if available
///
/// If no token is available (rate limit reached), this method returns
/// false, otherwise true
pub async fn check_quote_token(&self, user_id: String) -> bool {
self.quote_rate_limiter.check(user_id).await
}

/// Consume a bundle token from bucket if available
///
/// If no token is available (rate limit reached), this method returns
/// false, otherwise true
pub async fn check_bundle_token(&self, user_id: String) -> bool {
self.bundle_rate_limiter.check(user_id).await
}

/// Increment the number of tokens available to a given user
#[allow(unused_must_use)]
pub async fn add_bundle_token(&self, user_id: String) {
self.bundle_rate_limiter.add_token(user_id).await;
}
}

/// A per user token bucket rate limiter
#[derive(Clone)]
pub struct UserRateLimiter {
/// The number of tokens allowed per minute
rate_limit: u64,
/// The token buckets in a per-user map
buckets: SharedBucketMap,
}

impl UserRateLimiter {
/// Create a new user rate limiter
pub fn new(rate_limit: u64) -> Self {
Self { rate_limit, bucket_map: Arc::new(Mutex::new(HashMap::new())) }
Self { rate_limit, buckets: Arc::new(Mutex::new(HashMap::new())) }
}

/// Create a new rate limiter
Expand All @@ -42,21 +93,20 @@ impl BundleRateLimiter {
.expect("invalid rate limit configuration")
}

/// Consume a token from bucket if available
/// Consume a token from the bucket if available
///
/// If no token is available (rate limit reached), this method returns
/// false, otherwise true
pub async fn check(&self, user_id: String) -> bool {
let mut map = self.bucket_map.lock().await;
let mut map = self.buckets.lock().await;
let entry = map.entry(user_id).or_insert_with(|| self.new_rate_limiter());

entry.try_wait().is_ok()
}

/// Increment the number of tokens available to a given user
#[allow(unused_must_use)]
pub async fn add_token(&self, user_id: String) {
let mut map = self.bucket_map.lock().await;
let mut map = self.buckets.lock().await;
let entry = map.entry(user_id).or_insert_with(|| self.new_rate_limiter());

// Set the available tokens
Expand Down
5 changes: 5 additions & 0 deletions auth/auth-server/src/telemetry/quote_comparison/handler.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Defines the quote comparison handler

use ethers::utils::format_units;
use ethers::{providers::Middleware, types::U256};
use futures_util::future::join_all;
Expand All @@ -20,8 +22,11 @@ use super::{price_reporter_client::PriceReporterClient, QuoteComparison};

/// Records metrics comparing quotes from different sources
pub struct QuoteComparisonHandler {
/// The sources to compare quotes from
sources: Vec<QuoteSource>,
/// The arbitrum client
arbitrum_client: ArbitrumClient,
/// The price reporter client
price_reporter_client: PriceReporterClient,
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! A client for the price reporter

use renegade_common::types::{exchange::Exchange, token::Token};

use crate::telemetry::sources::http_utils::{send_get_request, HttpError};
Expand All @@ -7,16 +9,19 @@ pub const PRICE_ROUTE: &str = "/price";
/// Default timeout for requests to the price reporter
const DEFAULT_TIMEOUT_SECS: u64 = 5;

/// A client for the price reporter
pub struct PriceReporterClient {
/// The base URL of the price reporter
base_url: String,
}

impl PriceReporterClient {
/// Create a new PriceReporterClient
pub fn new(base_url: &str) -> Self {
Self { base_url: base_url.to_string() }
}

/// Get the price of a token from the price reporter
pub async fn get_binance_price(&self, mint: &str) -> Result<Option<f64>, HttpError> {
let exchange = Exchange::Binance;
let quote_mint = Token::from_ticker("USDT").get_addr();
Expand Down
6 changes: 6 additions & 0 deletions auth/auth-server/src/telemetry/sources/http_utils.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
//! Utilities for HTTP requests

use reqwest::{Client, Response};
use serde::Serialize;
use std::time::Duration;
use thiserror::Error;

/// An error with the HTTP client
#[derive(Debug, Error)]
pub enum HttpError {
/// A network error
#[error("Network error: {0}")]
Network(String, #[source] reqwest::Error),

/// An API error
#[error("API error: {0}")]
Api(String),
}

/// Sends a basic GET request
pub async fn send_get_request(url: &str, timeout_secs: u64) -> Result<Response, HttpError> {
let client = Client::builder()
.timeout(Duration::from_secs(timeout_secs))
Expand Down
2 changes: 2 additions & 0 deletions auth/auth-server/src/telemetry/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,12 @@ impl QuoteSource {
}

impl QuoteSource {
/// Creates a new quote source for the Odos API
pub fn odos(config: odos::OdosConfig) -> Self {
QuoteSource::Odos(odos::OdosQuoteSource::new(config))
}

/// Creates a new quote source for the Odos API with default configuration
pub fn odos_default() -> Self {
Self::odos(odos::OdosConfig::default())
}
Expand Down
8 changes: 7 additions & 1 deletion auth/auth-server/src/telemetry/sources/odos/client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! A client for the Odos API

use super::types::{OdosQuoteRequest, OdosQuoteResponse};
use crate::telemetry::sources::http_utils::{send_post_request, HttpError};

Expand All @@ -11,9 +13,13 @@ const BASE_URL: &str = "https://api.odos.xyz";
const QUOTE_ROUTE: &str = "/sor/quote/v2";

// Default configuration values
const DEFAULT_CHAIN_ID: u64 = 42161; // Arbitrum
/// Default chain ID for the target blockchain (Arbitrum)
const DEFAULT_CHAIN_ID: u64 = 42161;
/// Default value for disabling RFQs
const DEFAULT_DISABLE_RFQS: bool = false;
/// Default slippage limit as a percentage
const DEFAULT_SLIPPAGE_LIMIT_PERCENT: f64 = 0.3;
/// Default request timeout in seconds
const DEFAULT_TIMEOUT_SECS: u64 = 5;

/// Configuration options for the Odos client
Expand Down
4 changes: 4 additions & 0 deletions auth/auth-server/src/telemetry/sources/odos/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
//! Error types for the Odos client

use thiserror::Error;

/// An error with the Odos client
#[derive(Debug, Error)]
pub enum OdosError {
/// An error with the input
#[error("Invalid input: {0}")]
Input(String),
}
2 changes: 2 additions & 0 deletions auth/auth-server/src/telemetry/sources/odos/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! A client for the Odos API

mod client;
mod error;
mod types;
Expand Down
6 changes: 6 additions & 0 deletions auth/auth-server/src/telemetry/sources/odos/types.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
//! Types for the Odos API

use super::{client::OdosConfig, error::OdosError};
use serde::{Deserialize, Serialize};

/// Request structure for the Odos API quote endpoint.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
#[allow(clippy::missing_docs_in_private_items)]
pub(crate) struct OdosQuoteRequest {
pub chain_id: u64,
pub input_tokens: Vec<InputToken>,
Expand All @@ -15,6 +18,7 @@ pub(crate) struct OdosQuoteRequest {
/// Response structure from the Odos API quote endpoint.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
#[allow(clippy::missing_docs_in_private_items)]
pub(crate) struct OdosQuoteResponse {
pub in_amounts: Vec<String>,
pub in_tokens: Vec<String>,
Expand All @@ -25,13 +29,15 @@ pub(crate) struct OdosQuoteResponse {

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
#[allow(clippy::missing_docs_in_private_items)]
pub(crate) struct InputToken {
pub token_address: String,
pub amount: String,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
#[allow(clippy::missing_docs_in_private_items)]
pub(crate) struct OutputToken {
pub token_address: String,
pub proportion: f32,
Expand Down
Loading