Skip to content

Commit

Permalink
Add more metrics + dashboards for the integration tests
Browse files Browse the repository at this point in the history
Slowly populating Balter code with more emitted metrics. Additionally,
adding dashboards for all the tests to make it easier to debug
controller instability and logic.
  • Loading branch information
byronwasti committed Apr 1, 2024
1 parent 70cbc1f commit 9fc6a4b
Show file tree
Hide file tree
Showing 5 changed files with 741 additions and 7 deletions.
2 changes: 1 addition & 1 deletion balter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,4 @@ Consensus is done by sending all information between two nodes and each taking t

The Balter repository is set up to be easy to get started with development. It uses Nix to facilitate the environment setup via `shell.nix` (if you haven't yet drunk the Nixaide, open up that file and it will give you an idea of the programs you'll want).

To run the integration tests, use `cargo test --release --features integration`.
To run the integration tests, use `cargo test --release --features integration`. These tests often rely on the controller logic operating correctly, so graphs make them much easier to debug. Grafana dashboards for each test are in `dashboards/`; to use them, run Prometheus with the `prometheus.yml` at the repository root, then run Grafana and import the dashboards.
3 changes: 1 addition & 2 deletions balter/src/scenario.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,12 @@ where
let start = Instant::now();

let mut controllers = CompositeController::new(&config);
let mut sampler = ConcurrentSampler::new(scenario, controllers.initial_tps());
let mut sampler = ConcurrentSampler::new(&config.name, scenario, controllers.initial_tps());

// NOTE: This loop is time-sensitive. Any long awaits or blocking will throw off measurements
loop {
if let Some(samples) = sampler.get_samples().await {
let new_goal_tps = controllers.limit(&samples);
info!("New Goal TPS: {new_goal_tps}");
sampler.set_goal_tps(new_goal_tps);
}

Expand Down
44 changes: 41 additions & 3 deletions balter/src/scenario/tps_sampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const SAMPLE_WINDOW_SIZE: usize = 10;
const SKIP_SIZE: usize = 3;

pub(crate) struct ConcurrentSampler<T> {
base_label: String,
tps_sampler: TpsSampler<T>,
cc: ConcurrencyController,
samples: SampleSet,
Expand All @@ -33,14 +34,21 @@ where
T: Fn() -> F + Send + Sync + 'static + Clone,
F: Future<Output = ()> + Send,
{
pub(crate) fn new(scenario: T, goal_tps: NonZeroU32) -> Self {
Self {
pub(crate) fn new(name: &str, scenario: T, goal_tps: NonZeroU32) -> Self {
let new = Self {
base_label: format!("balter_{name}"),
tps_sampler: TpsSampler::new(scenario, goal_tps),
cc: ConcurrencyController::new(goal_tps),
samples: SampleSet::new(SAMPLE_WINDOW_SIZE).skip_first_n(SKIP_SIZE),
needs_clear: false,
tps_limited: false,
};

if cfg!(feature = "metrics") {
new.goal_tps_metric(goal_tps);
}

new
}

pub(crate) async fn get_samples(&mut self) -> Option<&SampleSet> {
Expand All @@ -58,14 +66,31 @@ where

if self.samples.full() {
match self.cc.analyze(&self.samples) {
CCOutcome::Stable => {}
CCOutcome::Stable => {
if cfg!(feature = "metrics") {
// TODO: Given these metric recordings aren't on the hot-path it is likely
// okay that we allocate for them. But if there is a simple way to avoid it
// that would be preferable.
metrics::gauge!(format!("{}_cc_state", &self.base_label)).set(1);
}
}
CCOutcome::TpsLimited(max_tps, concurrency) => {
// TODO: There is currently no way to get _out_ of being tps_limited. This may
// or may not be a problem, but it would be good to evaluate other options.
self.tps_limited = true;
self.set_concurrency(concurrency);
self.set_goal_tps_unchecked(max_tps);

if cfg!(feature = "metrics") {
metrics::gauge!(format!("{}_cc_state", &self.base_label)).set(2);
}
}
CCOutcome::AlterConcurrency(concurrency) => {
self.set_concurrency(concurrency);

if cfg!(feature = "metrics") {
metrics::gauge!(format!("{}_cc_state", &self.base_label)).set(0);
}
}
}

Expand All @@ -91,15 +116,28 @@ where
self.needs_clear = true;
info!("Setting concurrency to: {concurrency}");
self.tps_sampler.set_concurrency(concurrency);

if cfg!(feature = "metrics") {
metrics::gauge!(format!("{}_concurrency", &self.base_label)).set(concurrency as f64);
}
}

/// Applies a new goal TPS to both the concurrency controller and the
/// underlying TPS sampler.
///
/// A no-op when `goal_tps` already equals the sampler's current
/// `tps_limit`. Otherwise `needs_clear` is raised, the new goal is pushed
/// to `cc` and `tps_sampler`, and the goal-TPS gauge is refreshed when
/// the `metrics` feature is enabled.
///
/// NOTE(review): "unchecked" presumably means this path ignores the
/// `tps_limited` flag -- confirm against the checked setter.
fn set_goal_tps_unchecked(&mut self, goal_tps: NonZeroU32) {
    // Guard: an unchanged goal must not touch any state.
    if goal_tps == self.tps_sampler.tps_limit {
        return;
    }

    self.needs_clear = true;
    self.cc.set_goal_tps(goal_tps);
    self.tps_sampler.set_tps_limit(goal_tps);

    if cfg!(feature = "metrics") {
        self.goal_tps_metric(goal_tps);
    }
}

#[cfg(feature="metrics")]
fn goal_tps_metric(&self, goal_tps: NonZeroU32) {
metrics::gauge!(format!("{}_goal_tps", &self.base_label)).set(goal_tps.get());
}
}

pub(crate) struct TpsSampler<T> {
Expand Down
Loading

0 comments on commit 9fc6a4b

Please sign in to comment.