From 030133f8295bbc18bafa6183c846bd9a95de5025 Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Tue, 17 Sep 2024 15:14:37 -0500 Subject: [PATCH] gazette: refactor Router to make clients cheap to clone Don't configure Router with a default service address. Instead, journal and shard Client instances are configured with a default service address and metadata which is _used_ by Router when picking a route. This makes it possible to cheaply clone Client instances and give each a different service address and authorization header, while still using the same underlying pool of gRPC connections. --- crates/agent/src/controlplane.rs | 10 +++---- crates/agent/src/publications/builds.rs | 7 ++--- crates/dekaf/src/topology.rs | 10 +++---- crates/gazette/src/journal/list.rs | 2 +- crates/gazette/src/journal/mod.rs | 30 +++++++++++++++++--- crates/gazette/src/journal/read.rs | 5 +++- crates/gazette/src/router.rs | 33 +++++++++++----------- crates/gazette/src/shard/mod.rs | 37 +++++++++++++++++++++---- 8 files changed, 91 insertions(+), 43 deletions(-) diff --git a/crates/agent/src/controlplane.rs b/crates/agent/src/controlplane.rs index 5252eabad1..263fc186b4 100644 --- a/crates/agent/src/controlplane.rs +++ b/crates/agent/src/controlplane.rs @@ -212,14 +212,14 @@ impl PGControlPlane { .context("failed to sign claims for data-plane")?; // Create the journal and shard clients that are used for interacting with the data plane - let journal_router = gazette::Router::new(&data_plane.broker_address, "local")?; + let router = gazette::Router::new("local"); let journal_client = gazette::journal::Client::new( - reqwest::Client::default(), - journal_router, + data_plane.broker_address, metadata.clone(), + router.clone(), ); - let shard_router = gazette::Router::new(&data_plane.reactor_address, "local")?; - let shard_client = gazette::shard::Client::new(shard_router, metadata); + let shard_client = + gazette::shard::Client::new(data_plane.reactor_address, metadata, router); Ok(( 
shard_client, diff --git a/crates/agent/src/publications/builds.rs b/crates/agent/src/publications/builds.rs index fa8cad2c9c..eeb69d36f0 100644 --- a/crates/agent/src/publications/builds.rs +++ b/crates/agent/src/publications/builds.rs @@ -156,11 +156,10 @@ pub async fn test_catalog( // Activate all derivations. let metadata = gazette::Metadata::default(); - let journal_router = gazette::Router::new(&broker_sock, "local")?; + let router = gazette::Router::new("local"); let journal_client = - gazette::journal::Client::new(Default::default(), journal_router, metadata.clone()); - let shard_router = gazette::Router::new(&consumer_sock, "local")?; - let shard_client = gazette::shard::Client::new(shard_router, metadata); + gazette::journal::Client::new(broker_sock.clone(), metadata.clone(), router.clone()); + let shard_client = gazette::shard::Client::new(consumer_sock.clone(), metadata, router); for built in catalog .built diff --git a/crates/dekaf/src/topology.rs b/crates/dekaf/src/topology.rs index 286f435a1c..66911314f4 100644 --- a/crates/dekaf/src/topology.rs +++ b/crates/dekaf/src/topology.rs @@ -248,7 +248,7 @@ impl Collection { gateway_url: String, } - let auth: [Auth; 1] = client + let [auth]: [Auth; 1] = client .rpc("gateway_auth_token", body) .build() .send() @@ -260,15 +260,15 @@ impl Collection { tracing::debug!( collection, - gateway = auth[0].gateway_url, + gateway = auth.gateway_url, "fetched data-plane token" ); let mut metadata = gazette::Metadata::default(); - metadata.bearer_token(&auth[0].token)?; + metadata.bearer_token(&auth.token)?; - let router = gazette::Router::new(&auth[0].gateway_url, "dekaf")?; - let client = journal::Client::new(Default::default(), router, metadata); + let router = gazette::Router::new("dekaf"); + let client = journal::Client::new(auth.gateway_url, metadata, router); Ok(client) } diff --git a/crates/gazette/src/journal/list.rs b/crates/gazette/src/journal/list.rs index c51c335fb6..142bcce412 100644 --- 
a/crates/gazette/src/journal/list.rs +++ b/crates/gazette/src/journal/list.rs @@ -51,7 +51,7 @@ impl Client { router: &crate::Router, req: &broker::ListRequest, ) -> crate::Result> { - let mut client = self.into_sub(router.route(None, false).await?); + let mut client = self.into_sub(router.route(None, false, &self.default).await?); Ok(client.list(req.clone()).await?.into_inner()) } } diff --git a/crates/gazette/src/journal/mod.rs b/crates/gazette/src/journal/mod.rs index af13df0203..db47ad7c1d 100644 --- a/crates/gazette/src/journal/mod.rs +++ b/crates/gazette/src/journal/mod.rs @@ -14,22 +14,43 @@ type SubClient = proto_grpc::broker::journal_client::JournalClient< #[derive(Clone)] pub struct Client { + default: broker::process_spec::Id, http: reqwest::Client, metadata: crate::Metadata, router: crate::Router, } impl Client { - pub fn new(http: reqwest::Client, router: crate::Router, metadata: crate::Metadata) -> Self { + /// Build a Client which dispatches requests to the given default endpoint with the given Metadata. + /// The provided Router enables re-use of connections to brokers. + pub fn new(endpoint: String, metadata: crate::Metadata, router: crate::Router) -> Self { Self { + default: broker::process_spec::Id { + zone: String::new(), + suffix: endpoint, + }, metadata, - http, + http: reqwest::Client::default(), router, } } + /// Build a new Client which uses a different endpoint and metadata but re-uses underlying connections. + pub fn with_endpoint_and_metadata(&self, endpoint: String, metadata: crate::Metadata) -> Self { + Self { + default: broker::process_spec::Id { + zone: String::new(), + suffix: endpoint, + }, + http: self.http.clone(), + metadata, + router: self.router.clone(), + } + } + + /// Invoke the Gazette journal Apply API. 
pub async fn apply(&self, req: broker::ApplyRequest) -> crate::Result { - let mut client = self.into_sub(self.router.route(None, false).await?); + let mut client = self.into_sub(self.router.route(None, false, &self.default).await?); let resp = client .apply(req) @@ -40,11 +61,12 @@ impl Client { check_ok(resp.status(), resp) } + /// Invoke the Gazette journal ListFragments API. pub async fn list_fragments( &self, req: broker::FragmentsRequest, ) -> crate::Result { - let mut client = self.into_sub(self.router.route(None, false).await?); + let mut client = self.into_sub(self.router.route(None, false, &self.default).await?); let resp = client .list_fragments(req) diff --git a/crates/gazette/src/journal/read.rs b/crates/gazette/src/journal/read.rs index 443b39a146..ef50199fff 100644 --- a/crates/gazette/src/journal/read.rs +++ b/crates/gazette/src/journal/read.rs @@ -4,6 +4,9 @@ use futures::TryStreamExt; use proto_gazette::broker; impl Client { + /// Invoke the Gazette journal Read API. + /// This routine directly fetches journal fragments from cloud storage where possible, + /// rather than reading through the broker. pub fn read( self, mut req: broker::ReadRequest, @@ -46,7 +49,7 @@ impl Client { write_head: &mut i64, ) -> crate::Result<()> { let route = req.header.as_ref().and_then(|hdr| hdr.route.as_ref()); - let mut client = self.into_sub(self.router.route(route, false).await?); + let mut client = self.into_sub(self.router.route(route, false, &self.default).await?); // Fetch metadata first before we start the actual read. 
req.metadata_only = true; diff --git a/crates/gazette/src/router.rs b/crates/gazette/src/router.rs index 4ef323957b..5564f4d3df 100644 --- a/crates/gazette/src/router.rs +++ b/crates/gazette/src/router.rs @@ -21,26 +21,21 @@ pub struct Router { } struct Inner { states: std::sync::Mutex>, - default_endpoint: String, zone: String, } impl Router { /// Create a new Router with the given default service endpoint, /// which prefers to route to members in `zone` where possible. - pub fn new(default_endpoint: &str, zone: &str) -> Result { - let (default_endpoint, zone) = (default_endpoint.to_string(), zone.to_string()); + pub fn new(zone: &str) -> Self { + let zone = zone.to_string(); - let _endpoint = tonic::transport::Endpoint::from_shared(default_endpoint.clone()) - .map_err(|_err| Error::InvalidEndpoint(default_endpoint.clone()))?; - - Ok(Self { + Self { inner: Arc::new(Inner { states: Default::default(), - default_endpoint, zone, }), - }) + } } /// Map an optional broker::Route and indication of whether the "primary" @@ -52,10 +47,11 @@ impl Router { &self, route: Option<&broker::Route>, primary: bool, + default: &MemberId, ) -> Result { - let (index, state) = self.pick(route, primary); + let (index, state) = self.pick(route, primary, &default); - // Acquire `id`-specific, async-aware lock. + // Acquire MemberId-specific, async-aware lock. let mut state = state.lock().await; // Fast path: client is dialed and ready. @@ -67,7 +63,7 @@ impl Router { // Slow path: start dialing the endpoint. 
let channel = super::dial_channel(match index { Some(index) => &route.unwrap().endpoints[index], - None => &self.inner.default_endpoint, + None => &default.suffix, }) .await?; @@ -76,16 +72,19 @@ impl Router { Ok(channel) } - fn pick(&self, route: Option<&broker::Route>, primary: bool) -> (Option, DialState) { + fn pick( + &self, + route: Option<&broker::Route>, + primary: bool, + default: &MemberId, + ) -> (Option, DialState) { // Acquire non-async lock which *cannot* be held across an await point. let mut states = self.inner.states.lock().unwrap(); let index = pick(route, primary, &self.inner.zone, &states); - let default_id = MemberId::default(); - let id = match index { Some(index) => &route.unwrap().members[index], - None => &default_id, + None => default, }; let state = match states.get(id) { @@ -96,7 +95,7 @@ impl Router { (index, state) } - // Identify Channels which have not been used since the preceeding sweep, and close them. + // Identify Channels which have not been used since the preceding sweep, and close them. // As members come and go, Channels may no longer needed. // Call sweep() periodically to clear them out. pub fn sweep(&self) { diff --git a/crates/gazette/src/shard/mod.rs b/crates/gazette/src/shard/mod.rs index 099ec84980..aa54058f77 100644 --- a/crates/gazette/src/shard/mod.rs +++ b/crates/gazette/src/shard/mod.rs @@ -1,4 +1,4 @@ -use proto_gazette::consumer; +use proto_gazette::{broker, consumer}; use tonic::transport::Channel; // SubClient is the routed sub-client of Client. @@ -8,20 +8,43 @@ type SubClient = proto_grpc::consumer::shard_client::ShardClient< #[derive(Clone)] pub struct Client { + default: broker::process_spec::Id, metadata: crate::Metadata, router: crate::Router, } impl Client { - pub fn new(router: crate::Router, metadata: crate::Metadata) -> Self { - Self { metadata, router } + /// Build a Client which dispatches requests to the given default endpoint with the given Metadata. 
+ /// The provided Router enables re-use of connections to consumers. + pub fn new(endpoint: String, metadata: crate::Metadata, router: crate::Router) -> Self { + Self { + default: broker::process_spec::Id { + zone: String::new(), + suffix: endpoint, + }, + metadata, + router, + } } + /// Build a new Client which uses a different endpoint and metadata but re-uses underlying connections. + pub fn with_endpoint_and_metadata(&self, endpoint: String, metadata: crate::Metadata) -> Self { + Self { + default: broker::process_spec::Id { + zone: String::new(), + suffix: endpoint, + }, + metadata, + router: self.router.clone(), + } + } + + /// Invoke the Gazette shard List RPC. pub async fn list( &self, req: consumer::ListRequest, ) -> Result { - let mut client = self.into_sub(self.router.route(None, false).await?); + let mut client = self.into_sub(self.router.route(None, false, &self.default).await?); let resp = client .list(req) @@ -32,11 +55,12 @@ impl Client { check_ok(resp.status(), resp) } + /// Invoke the Gazette shard Apply RPC. pub async fn apply( &self, req: consumer::ApplyRequest, ) -> Result { - let mut client = self.into_sub(self.router.route(None, false).await?); + let mut client = self.into_sub(self.router.route(None, false, &self.default).await?); let resp = client .apply(req) @@ -47,11 +71,12 @@ impl Client { check_ok(resp.status(), resp) } + /// Invoke the Gazette shard Unassign RPC. pub async fn unassign( &self, req: consumer::UnassignRequest, ) -> Result { - let mut client = self.into_sub(self.router.route(None, false).await?); + let mut client = self.into_sub(self.router.route(None, false, &self.default).await?); let resp = client .unassign(req)