From 6ac48fca0d13c55d107e2bb09e972b8d56d2bc10 Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Thu, 30 Jan 2025 17:25:20 -0500 Subject: [PATCH 1/3] fix: metered bytes log --- src/main.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main.rs b/src/main.rs index d24e544..1e24977 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1031,6 +1031,8 @@ async fn run() -> Result<(), S2CliError> { .bold() ); + total_data_len = 0; + writer.flush().await.expect("writer flush"); } } From 7de90ff56b8e6f1a6274ceb6ea95703d5bfc71c3 Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Wed, 5 Feb 2025 10:17:38 -0500 Subject: [PATCH 2/3] cr --- src/main.rs | 49 ++----------------------------------------------- 1 file changed, 2 insertions(+), 47 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1e24977..f9533e6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,7 +23,7 @@ use s2::{ client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient}, types::{ AppendRecord, AppendRecordBatch, BasinInfo, CommandRecord, ConvertError, FencingToken, - MeteredBytes as _, ReadOutput, StreamInfo, + ReadOutput, StreamInfo, }, }; use stream::{RecordStream, StreamService}; @@ -31,7 +31,6 @@ use tokio::{ fs::{File, OpenOptions}, io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufWriter}, select, - time::Instant, }; use tokio::{signal, sync::mpsc}; use tokio_stream::{ @@ -906,23 +905,14 @@ async fn run() -> Result<(), S2CliError> { .await?; let mut writer = output.into_writer().await.unwrap(); - let mut start = None; - let mut total_data_len = 0; - loop { select! { maybe_read_result = read_output_stream.next() => { match maybe_read_result { Some(read_result) => { - if start.is_none() { - start = Some(Instant::now()); - } match read_result { Ok(ReadOutput::Batch(sequenced_record_batch)) => { - let num_records = sequenced_record_batch.records.len(); - let mut batch_len = 0; - - let seq_range = match ( + match ( sequenced_record_batch.records.first(), sequenced_record_batch.records.last(), ) { @@ -930,8 +920,6 @@ async fn run() -> Result<(), S2CliError> { _ => panic!("empty batch"), }; for sequenced_record in sequenced_record_batch.records { - batch_len += sequenced_record.metered_bytes(); - if let Some(command_record) = sequenced_record.as_command_record() { let (cmd, description) = match command_record { CommandRecord::Fence { fencing_token } => ( @@ -975,22 +963,6 @@ async fn run() -> Result<(), S2CliError> { .map_err(|e| S2CliError::RecordWrite(e.to_string()))?; } } - total_data_len += batch_len; - - let throughput_mibps = (total_data_len as f64 - / start.unwrap().elapsed().as_secs_f64()) - / 1024.0 - / 1024.0; - - eprintln!( - "{}", - format!( - "⦿ {throughput_mibps:.2} MiB/s \ - ({num_records} records in range {seq_range:?})", - ) - .blue() - .bold() - ); } Ok(ReadOutput::FirstSeqNum(seq_num)) => { @@ -1015,23 +987,6 @@ async fn run() -> Result<(), S2CliError> { break; } } - let total_elapsed_time = start.unwrap().elapsed().as_secs_f64(); - - let total_throughput_mibps = - (total_data_len as f64 / total_elapsed_time) / 1024.0 / 1024.0; - - eprintln!( - "{}", - format!( - "{total_data_len} metered bytes in \ - {total_elapsed_time} seconds \ - at {total_throughput_mibps:.2} MiB/s" - ) - .yellow() - .bold() - ); - - total_data_len = 0; writer.flush().await.expect("writer flush"); } From 3770c299dc4a9f7d13a9cb56522cc21c7a050efd Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Wed, 5 Feb 2025 11:01:16 -0500 Subject: [PATCH 3/3] .. --- src/main.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index f9533e6..4079830 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,7 +23,7 @@ use s2::{ client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient}, types::{ AppendRecord, AppendRecordBatch, BasinInfo, CommandRecord, ConvertError, FencingToken, - ReadOutput, StreamInfo, + MeteredBytes as _, ReadOutput, StreamInfo, }, }; use stream::{RecordStream, StreamService}; @@ -912,7 +912,10 @@ async fn run() -> Result<(), S2CliError> { Some(read_result) => { match read_result { Ok(ReadOutput::Batch(sequenced_record_batch)) => { - match ( + let num_records = sequenced_record_batch.records.len(); + let mut batch_len = 0; + + let seq_range = match ( sequenced_record_batch.records.first(), sequenced_record_batch.records.last(), ) { @@ -920,6 +923,8 @@ async fn run() -> Result<(), S2CliError> { _ => panic!("empty batch"), }; for sequenced_record in sequenced_record_batch.records { + batch_len += sequenced_record.metered_bytes(); + if let Some(command_record) = sequenced_record.as_command_record() { let (cmd, description) = match command_record { CommandRecord::Fence { fencing_token } => ( @@ -963,6 +968,16 @@ async fn run() -> Result<(), S2CliError> { .map_err(|e| S2CliError::RecordWrite(e.to_string()))?; } } + + eprintln!( + "{}", + format!( + "⦿ {batch_len} bytes \ + ({num_records} records in range {seq_range:?})", + ) + .blue() + .bold() + ); } Ok(ReadOutput::FirstSeqNum(seq_num)) => {