From bb63cbd497a54e71a38fd67815b983950b978c76 Mon Sep 17 00:00:00 2001 From: yaohm Date: Sun, 4 Feb 2024 12:04:03 +0800 Subject: [PATCH] Send messages as Vec --- sdk/src/binary/binary_client.rs | 6 +++ sdk/src/binary/messages.rs | 2 +- sdk/src/bytes_serializable.rs | 4 ++ sdk/src/lib.rs | 1 + sdk/src/messages/send_messages.rs | 20 ++++++++++ sdk/src/quic/client.rs | 8 ++++ sdk/src/tcp/client.rs | 64 ++++++++++++++++++++++++++++++- 7 files changed, 102 insertions(+), 3 deletions(-) diff --git a/sdk/src/binary/binary_client.rs b/sdk/src/binary/binary_client.rs index 8204ce679..ceec5337b 100644 --- a/sdk/src/binary/binary_client.rs +++ b/sdk/src/binary/binary_client.rs @@ -23,4 +23,10 @@ pub trait BinaryClient: Client { async fn set_state(&self, state: ClientState); /// Sends a command and returns the response. async fn send_with_response(&self, command: u32, payload: Bytes) -> Result; + /// Sends a command serialized as vector and returns the response. + async fn send_vec_with_response( + &self, + command: u32, + payload: Vec, + ) -> Result; } diff --git a/sdk/src/binary/messages.rs b/sdk/src/binary/messages.rs index 890835e53..22cd25a1c 100644 --- a/sdk/src/binary/messages.rs +++ b/sdk/src/binary/messages.rs @@ -20,7 +20,7 @@ impl MessageClient for B { async fn send_messages(&self, command: &mut SendMessages) -> Result<(), IggyError> { fail_if_not_authenticated(self).await?; - self.send_with_response(SEND_MESSAGES_CODE, command.as_bytes()) + self.send_vec_with_response(SEND_MESSAGES_CODE, command.as_bytes_vec()) .await?; Ok(()) } diff --git a/sdk/src/bytes_serializable.rs b/sdk/src/bytes_serializable.rs index 2b10a6df8..165760c19 100644 --- a/sdk/src/bytes_serializable.rs +++ b/sdk/src/bytes_serializable.rs @@ -7,6 +7,10 @@ pub trait BytesSerializable { /// Serializes the struct to bytes. fn as_bytes(&self) -> Bytes; + fn as_bytes_vec(&self) -> Vec { + vec![self.as_bytes()] + } + /// Deserializes the struct from bytes. fn from_bytes(bytes: Bytes) -> Result where diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index 2347f76dd..6f62ac3a5 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -1,3 +1,4 @@ +#![feature(io_slice_advance)] pub mod args; pub mod binary; pub mod bytes_serializable; diff --git a/sdk/src/messages/send_messages.rs b/sdk/src/messages/send_messages.rs index e5de46d7b..e70ba06bf 100644 --- a/sdk/src/messages/send_messages.rs +++ b/sdk/src/messages/send_messages.rs @@ -421,6 +421,26 @@ impl BytesSerializable for SendMessages { bytes.freeze() } + fn as_bytes_vec(&self) -> Vec { + const HEADER_LEN: usize = 3; + let mut bytes = Vec::with_capacity(HEADER_LEN + self.messages.len()); + + // Push the stream ID, topic ID and partitioning as Header. + let key_bytes = self.partitioning.as_bytes(); + let stream_id_bytes = self.stream_id.as_bytes(); + let topic_id_bytes = self.topic_id.as_bytes(); + bytes.push(stream_id_bytes); + bytes.push(topic_id_bytes); + bytes.push(key_bytes); + + // Push the messages. + for message in &self.messages { + bytes.push(message.as_bytes()); + } + + bytes + } + fn from_bytes(bytes: Bytes) -> Result { if bytes.len() < 11 { return Err(IggyError::InvalidCommand); diff --git a/sdk/src/quic/client.rs b/sdk/src/quic/client.rs index 02c044ba8..878c5fd0c 100644 --- a/sdk/src/quic/client.rs +++ b/sdk/src/quic/client.rs @@ -135,6 +135,14 @@ impl BinaryClient for QuicClient { error!("Cannot send data. Client is not connected."); Err(IggyError::NotConnected) } + + async fn send_vec_with_response( + &self, + _command: u32, + _payload: Vec, + ) -> Result { + unimplemented!() + } } impl QuicClient { diff --git a/sdk/src/tcp/client.rs b/sdk/src/tcp/client.rs index a82f2b123..0936cc728 100644 --- a/sdk/src/tcp/client.rs +++ b/sdk/src/tcp/client.rs @@ -5,6 +5,7 @@ use crate::tcp::config::TcpClientConfig; use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; use std::fmt::Debug; +use std::io::{self, ErrorKind, IoSlice}; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -39,6 +40,9 @@ unsafe impl Sync for TcpClient {} pub(crate) trait ConnectionStream: Debug + Sync + Send { async fn read(&mut self, buf: &mut [u8]) -> Result; async fn write(&mut self, buf: &[u8]) -> Result<(), IggyError>; + async fn write_all_vectored(&mut self, _bufs: Vec) -> Result<(), IggyError> { + unimplemented!("write_all_vectored not implemented") + } async fn flush(&mut self) -> Result<(), IggyError>; } @@ -84,6 +88,26 @@ impl ConnectionStream for TcpConnectionStream { Ok(self.writer.write_all(buf).await?) } + async fn write_all_vectored(&mut self, bufs: Vec) -> Result<(), IggyError> { + let mut bufs: Vec = bufs.iter().map(|b| IoSlice::new(b)).collect(); + let mut bufs: &mut [IoSlice] = bufs.as_mut(); + + while !bufs.is_empty() { + match self.writer.write_vectored(&bufs).await { + Ok(0) => { + return Err(IggyError::IoError(io::Error::new( + ErrorKind::WriteZero, + "failed to write whole buffer", + ))); + } + Ok(n) => IoSlice::advance_slices(&mut bufs, n), + Err(ref e) if e.kind() == ErrorKind::Interrupted => {} + Err(e) => return Err(e.into()), + } + } + Ok(()) + } + async fn flush(&mut self) -> Result<(), IggyError> { Ok(self.writer.flush().await?) } @@ -101,7 +125,7 @@ impl ConnectionStream for TcpTlsConnectionStream { } async fn write(&mut self, buf: &[u8]) -> Result<(), IggyError> { - let result = self.stream.write_all(buf).await; + let result = self.stream.write(buf).await; if let Err(error) = result { return Err(IggyError::from(error)); } @@ -110,7 +134,7 @@ impl ConnectionStream for TcpTlsConnectionStream { } async fn flush(&mut self) -> Result<(), IggyError> { - Ok(()) + Ok(self.stream.flush().await?) } } @@ -244,6 +268,42 @@ impl BinaryClient for TcpClient { error!("Cannot send data. Client is not connected."); Err(IggyError::NotConnected) } + + async fn send_vec_with_response( + &self, + command: u32, + payload: Vec, + ) -> Result { + if self.get_state().await == ClientState::Disconnected { + return Err(IggyError::NotConnected); + } + + let mut stream = self.stream.lock().await; + if let Some(stream) = stream.as_mut() { + let payload_length = + payload.iter().map(|p| p.len()).sum::() + REQUEST_INITIAL_BYTES_LENGTH; + trace!("Sending a TCP request..."); + stream.write(&(payload_length as u32).to_le_bytes()).await?; + stream.write(&command.to_le_bytes()).await?; + stream.write_all_vectored(payload).await?; + stream.flush().await?; + trace!("Sent a TCP request, waiting for a response..."); + + let mut response_buffer = [0u8; RESPONSE_INITIAL_BYTES_LENGTH]; + let read_bytes = stream.read(&mut response_buffer).await?; + if read_bytes != RESPONSE_INITIAL_BYTES_LENGTH { + error!("Received an invalid or empty response."); + return Err(IggyError::EmptyResponse); + } + + let status = u32::from_le_bytes(response_buffer[..4].try_into().unwrap()); + let length = u32::from_le_bytes(response_buffer[4..].try_into().unwrap()); + return self.handle_response(status, length, stream.as_mut()).await; + } + + error!("Cannot send data. Client is not connected."); + Err(IggyError::NotConnected) + } } impl TcpClient {