Skip to content

Commit

Permalink
chore: rename SpuSocket to StreamSocket (#3958)
Browse files Browse the repository at this point in the history
* feat: create stream connection via fluvio admin

* chore: rename SpuSocket to StreamSocket
  • Loading branch information
fraidev authored Apr 24, 2024
1 parent 34fb872 commit 29d1a11
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 67 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ k8-diff = { version = "0.1.2" }
trybuild = { branch = "check_option", git = "https://github.com/infinyon/trybuild" }

# Internal fluvio dependencies
fluvio = { version = "0.21.0", path = "crates/fluvio" }
fluvio = { version = "0.22.0", path = "crates/fluvio" }
fluvio-auth = { path = "crates/fluvio-auth" }
fluvio-channel = { path = "crates/fluvio-channel" }
fluvio-cli-common = { path = "crates/fluvio-cli-common"}
Expand Down
9 changes: 5 additions & 4 deletions crates/fluvio-spu/src/core/leader_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use async_lock::Mutex;
use async_trait::async_trait;

use fluvio::metrics::ClientMetrics;
use fluvio::stream_socket::StreamSocket;
use fluvio::{FluvioError, PartitionConsumer};
use fluvio::spu::{SpuDirectory, SpuSocket};
use fluvio::spu::SpuDirectory;
use fluvio_controlplane_metadata::partition::ReplicaKey;
use fluvio_socket::{MultiplexerSocket, ClientConfig, VersionedSerialSocket};
use fluvio_types::{SpuId, PartitionId};
Expand All @@ -20,7 +21,7 @@ use super::spus::SharedSpuLocalStore;
pub struct LeaderConnections {
spus: SharedSpuLocalStore,
replicas: SharedReplicaLocalStore,
leaders: Arc<Mutex<HashMap<SpuId, SpuSocket>>>,
leaders: Arc<Mutex<HashMap<SpuId, StreamSocket>>>,
metrics: Arc<ClientMetrics>,
}

Expand All @@ -39,13 +40,13 @@ impl LeaderConnections {

/// create a connection to leader, it can't find it, return
#[instrument(skip(self))]
async fn connect_to_leader(&self, leader: SpuId) -> Result<SpuSocket, FluvioError> {
async fn connect_to_leader(&self, leader: SpuId) -> Result<StreamSocket, FluvioError> {
if let Some(spu) = self.spus.spec(&leader) {
debug!("connecting to spu : {:#?}", spu);
let client_config = ClientConfig::with_addr(spu.public_endpoint.addr());
let versioned_socket = client_config.connect().await?;
let (socket, config, versions) = versioned_socket.split();
Ok(SpuSocket::new(
Ok(StreamSocket::new(
config,
MultiplexerSocket::shared(socket),
versions,
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio"
version = "0.21.9"
version = "0.22.0"
edition = "2021"
license = "Apache-2.0"
authors = ["Fluvio Contributors <[email protected]>"]
Expand Down
1 change: 1 addition & 0 deletions crates/fluvio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod config;
pub mod consumer;
pub mod metrics;
pub mod spu;
pub mod stream_socket;

pub use error::FluvioError;
pub use config::FluvioConfig;
Expand Down
67 changes: 7 additions & 60 deletions crates/fluvio/src/spu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,14 @@ use async_trait::async_trait;

use fluvio_protocol::record::ReplicaKey;
use fluvio_protocol::api::Request;
use fluvio_protocol::api::RequestMessage;
use fluvio_types::SpuId;
use fluvio_socket::{
Versions, VersionedSerialSocket, ClientConfig, MultiplexerSocket, SharedMultiplexerSocket,
SocketError, AsyncResponse,
VersionedSerialSocket, ClientConfig, MultiplexerSocket, SocketError, AsyncResponse,
};
use crate::stream_socket::StreamSocket;
use crate::FluvioError;
use crate::sync::MetadataStores;

const DEFAULT_STREAM_QUEUE_SIZE: usize = 10;

/// used for connecting to spu
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
Expand All @@ -43,61 +40,11 @@ pub trait SpuDirectory {
R: Sync + Send;
}

/// Stream Socket to SPU
#[derive(Debug)]
pub struct SpuSocket {
config: Arc<ClientConfig>,
socket: SharedMultiplexerSocket,
versions: Versions,
}

impl SpuSocket {
pub fn new(
config: Arc<ClientConfig>,
socket: SharedMultiplexerSocket,
versions: Versions,
) -> Self {
Self {
config,
socket,
versions,
}
}

pub async fn create_serial_socket(&mut self) -> VersionedSerialSocket {
VersionedSerialSocket::new(
self.socket.clone(),
self.config.clone(),
self.versions.clone(),
)
}

pub fn is_stale(&self) -> bool {
self.socket.is_stale()
}

pub async fn create_stream_with_version<R: Request>(
&mut self,
request: R,
version: i16,
) -> Result<AsyncResponse<R>, FluvioError> {
let mut req_msg = RequestMessage::new_request(request);
req_msg.header.set_api_version(version);
req_msg
.header
.set_client_id(self.config.client_id().to_owned());
self.socket
.create_stream(req_msg, DEFAULT_STREAM_QUEUE_SIZE)
.await
.map_err(|err| err.into())
}
}

/// connection pool to spu
pub struct SpuPool {
config: Arc<ClientConfig>,
pub(crate) metadata: MetadataStores,
spu_clients: Arc<Mutex<HashMap<SpuId, SpuSocket>>>,
spu_clients: Arc<Mutex<HashMap<SpuId, StreamSocket>>>,
}

impl Drop for SpuPool {
Expand All @@ -123,7 +70,7 @@ impl SpuPool {

/// create new spu socket
#[instrument(skip(self))]
async fn connect_to_leader(&self, leader: SpuId) -> Result<SpuSocket, FluvioError> {
async fn connect_to_leader(&self, leader: SpuId) -> Result<StreamSocket, FluvioError> {
let spu = self.metadata.spus().look_up_by_id(leader).await?;

let mut client_config = self.config.with_prefix_sni_domain(spu.key());
Expand All @@ -141,11 +88,11 @@ impl SpuPool {
client_config.set_addr(spu_addr);
let versioned_socket = client_config.connect().await?;
let (socket, config, versions) = versioned_socket.split();
Ok(SpuSocket {
socket: MultiplexerSocket::shared(socket),
Ok(StreamSocket::new(
config,
MultiplexerSocket::shared(socket),
versions,
})
))
}

#[instrument(skip(self))]
Expand Down
59 changes: 59 additions & 0 deletions crates/fluvio/src/stream_socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use std::sync::Arc;

use fluvio_protocol::api::{Request, RequestMessage};
use fluvio_socket::{
AsyncResponse, ClientConfig, SharedMultiplexerSocket, VersionedSerialSocket, Versions,
};

use crate::FluvioError;
const DEFAULT_STREAM_QUEUE_SIZE: usize = 10;

/// Stream Socket
#[derive(Debug)]
pub struct StreamSocket {
config: Arc<ClientConfig>,
socket: SharedMultiplexerSocket,
versions: Versions,
}

impl StreamSocket {
pub fn new(
config: Arc<ClientConfig>,
socket: SharedMultiplexerSocket,
versions: Versions,
) -> Self {
Self {
config,
socket,
versions,
}
}

pub async fn create_serial_socket(&mut self) -> VersionedSerialSocket {
VersionedSerialSocket::new(
self.socket.clone(),
self.config.clone(),
self.versions.clone(),
)
}

pub fn is_stale(&self) -> bool {
self.socket.is_stale()
}

pub async fn create_stream_with_version<R: Request>(
&mut self,
request: R,
version: i16,
) -> Result<AsyncResponse<R>, FluvioError> {
let mut req_msg = RequestMessage::new_request(request);
req_msg.header.set_api_version(version);
req_msg
.header
.set_client_id(self.config.client_id().to_owned());
self.socket
.create_stream(req_msg, DEFAULT_STREAM_QUEUE_SIZE)
.await
.map_err(|err| err.into())
}
}

0 comments on commit 29d1a11

Please sign in to comment.