Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: metered bytes log #121

Merged
merged 3 commits into from
Feb 5, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 2 additions & 30 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use tokio::{
fs::{File, OpenOptions},
io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufWriter},
select,
time::Instant,
};
use tokio::{signal, sync::mpsc};
use tokio_stream::{
Expand Down Expand Up @@ -906,17 +905,11 @@ async fn run() -> Result<(), S2CliError> {
.await?;
let mut writer = output.into_writer().await.unwrap();

let mut start = None;
let mut total_data_len = 0;

loop {
select! {
maybe_read_result = read_output_stream.next() => {
match maybe_read_result {
Some(read_result) => {
if start.is_none() {
start = Some(Instant::now());
}
match read_result {
Ok(ReadOutput::Batch(sequenced_record_batch)) => {
let num_records = sequenced_record_batch.records.len();
Expand Down Expand Up @@ -975,18 +968,12 @@ async fn run() -> Result<(), S2CliError> {
.map_err(|e| S2CliError::RecordWrite(e.to_string()))?;
}
}
total_data_len += batch_len;

let throughput_mibps = (total_data_len as f64
/ start.unwrap().elapsed().as_secs_f64())
/ 1024.0
/ 1024.0;

eprintln!(
"{}",
format!(
"⦿ {throughput_mibps:.2} MiB/s \
({num_records} records in range {seq_range:?})",
"⦿ {batch_len} bytes \
({num_records} records in range {seq_range:?})",
)
.blue()
.bold()
Expand Down Expand Up @@ -1015,21 +1002,6 @@ async fn run() -> Result<(), S2CliError> {
break;
}
}
let total_elapsed_time = start.unwrap().elapsed().as_secs_f64();

let total_throughput_mibps =
(total_data_len as f64 / total_elapsed_time) / 1024.0 / 1024.0;

eprintln!(
"{}",
format!(
"{total_data_len} metered bytes in \
{total_elapsed_time} seconds \
at {total_throughput_mibps:.2} MiB/s"
)
.yellow()
.bold()
);

writer.flush().await.expect("writer flush");
}
Expand Down