diff --git a/graph-gateway/src/budgets.rs b/graph-gateway/src/budgets.rs index 93798dd3..d8c39a3b 100644 --- a/graph-gateway/src/budgets.rs +++ b/graph-gateway/src/budgets.rs @@ -105,15 +105,15 @@ impl Actor { fn revise_budget(&mut self) { let target = self.controller.target_query_fees; - let surplus = self.controller.surplus(); - tracing::debug!(budget_surplus = %surplus); + let control_variable = self.controller.control_variable(); + tracing::debug!(budget_control_variable = %control_variable); let now = Instant::now(); let budgets = self .volume_estimators .iter() .map(|(deployment, volume_estimator)| { let volume = volume_estimator.monthly_volume_estimate(now) as u64; - let mut budget = volume_discount(volume, target) + surplus; + let mut budget = volume_discount(volume, target) * control_variable; // limit budget to 100x target budget = budget.min(target * USD::try_from(100_u64).unwrap()); (*deployment, budget) @@ -128,15 +128,11 @@ fn volume_discount(monthly_volume: u64, target: USD) -> USD { // Discount the budget, based on a generalized logistic function. We apply little to no discount // between 0 and ~10e3 queries per month. And we limit the discount to a minimum budget of // 10E-6 USD. - // https://www.desmos.com/calculator/awtbdpoehu + // https://www.desmos.com/calculator/5ue96zyvjw let b_min = MIN_BUDGET_USD; - // TODO: this 250 magic number is to help temporarily mimic the outomes of the old volume - // discounting (https://www.desmos.com/calculator/afjpgynlsp) - let b_max = target.as_f64() * 250.0; - let m: f64 = 1e3; - // dips below old volume discounting outcomes between (10^4, 10^8), to compensate for higher - // query count per request - let z: f64 = 0.69; + let b_max = target.as_f64(); + let m: f64 = 1e6; + let z: f64 = 1.0; let v = monthly_volume as f64; let budget = b_min + ((b_max - b_min) * m.powf(z)) / (v + m).powf(z); // 52fcdb5f-8557-4ebb-968d-46e7756aa63f @@ -148,7 +144,7 @@ struct Controller { target_query_fees: USD, recent_fees: USD, recent_query_count: u64, - error_history: [USD; 10], + error_history: FastDecayBuffer, } impl Controller { @@ -157,7 +153,7 @@ impl Controller { target_query_fees, recent_fees: USD::zero(), recent_query_count: 0, - error_history: <[USD; 10]>::default(), + error_history: FastDecayBuffer::default(), } } @@ -166,18 +162,21 @@ impl Controller { self.recent_query_count += query_count; } - fn surplus(&mut self) -> USD { - // Consider this a control system where `target_query_fees` is the setpoint, and - // `recent_query_fees` is the process variable. - let recent_query_fees = - self.recent_fees / USD::try_from(self.recent_query_count.max(1)).unwrap(); - METRICS.avg_query_fees.set(recent_query_fees.as_f64()); + fn control_variable(&mut self) -> USD { + // See the following link if you're unfamiliar with PID controllers: + // https://en.wikipedia.org/wiki/Proportional%E2%80%93integral%E2%80%93derivative_controller + let process_variable = self.recent_fees.as_f64() / self.recent_query_count.max(1) as f64; + METRICS.avg_query_fees.set(process_variable); + self.recent_fees = USD::zero(); self.recent_query_count = 0; - // This is effectively a PID controller with just an integral term. - self.error_history.rotate_left(1); - self.error_history[0] = self.target_query_fees.saturating_sub(recent_query_fees); - self.error_history.iter().copied().sum() + self.error_history.decay(); + let error = self.target_query_fees.as_f64() - process_variable; + *self.error_history.current_mut() = error; + + let i: f64 = self.error_history.frames().iter().sum(); + let k_i = 3e4; + USD::try_from(1.0).unwrap() + USD::try_from(i * k_i).unwrap_or(USD::zero()) } } @@ -236,6 +235,8 @@ impl VolumeEstimator { #[cfg(test)] mod tests { + use indexer_selection::test_utils::assert_within; + use super::*; #[track_caller] @@ -322,4 +323,43 @@ mod tests { assert!(estimation > queries); assert!(estimation < (queries * 1.03)); } + + #[test] + fn controller() { + fn test_controller( + controller: &mut Controller, + process_variable_multiplier: f64, + tolerance: f64, + ) { + let setpoint = controller.target_query_fees.as_f64(); + let mut process_variable = USD::zero(); + for i in 0..30 { + let control_variable = controller.control_variable(); + process_variable = controller.target_query_fees + * USD::try_from(process_variable_multiplier).unwrap() + * control_variable; + println!( + "{i:02} SP={setpoint:.6}, PV={:.8}, CV={:.8}", + process_variable.as_f64(), + control_variable.as_f64(), + ); + controller.add_queries(process_variable, 1); + } + assert_within(process_variable.as_f64(), setpoint, tolerance); + } + + for setpoint in [20e-6, 40e-6] { + let mut controller = Controller::new(USD::try_from(setpoint).unwrap()); + test_controller(&mut controller, 0.2, 1e-6); + let mut controller = Controller::new(USD::try_from(setpoint).unwrap()); + test_controller(&mut controller, 0.6, 1e-6); + let mut controller = Controller::new(USD::try_from(setpoint).unwrap()); + test_controller(&mut controller, 0.8, 1e-6); + + let mut controller = Controller::new(USD::try_from(setpoint).unwrap()); + test_controller(&mut controller, 0.2, 1e-6); + test_controller(&mut controller, 0.6, 1e-6); + test_controller(&mut controller, 0.7, 1e-6); + } + } } diff --git a/indexer-selection/src/fee.rs b/indexer-selection/src/fee.rs index a69513cd..45ef4c7b 100644 --- a/indexer-selection/src/fee.rs +++ b/indexer-selection/src/fee.rs @@ -88,7 +88,7 @@ pub fn fee_utility(fee: &GRT, budget: &GRT) -> UtilityFactor { utility = utility.max(1e-18); UtilityFactor { utility, - weight: 0.5, + weight: WEIGHT, } }