Skip to content

Commit

Permalink
gazette: refactor Router to make clients cheap to clone
Browse files Browse the repository at this point in the history
Don't configure Router with a default service address.

Instead, journal and shard Client instances are configured with a
default service address and metadata which is _used_ by Router when
picking a route.

This makes it possible to cheaply clone Client instances and give each a
different service address and authorization header, while still using
the same underlying pool of gRPC connections.
  • Loading branch information
jgraettinger committed Sep 20, 2024
1 parent 244345a commit 030133f
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 43 deletions.
10 changes: 5 additions & 5 deletions crates/agent/src/controlplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,14 @@ impl PGControlPlane {
.context("failed to sign claims for data-plane")?;

// Create the journal and shard clients that are used for interacting with the data plane
let journal_router = gazette::Router::new(&data_plane.broker_address, "local")?;
let router = gazette::Router::new("local");
let journal_client = gazette::journal::Client::new(
reqwest::Client::default(),
journal_router,
data_plane.broker_address,
metadata.clone(),
router.clone(),
);
let shard_router = gazette::Router::new(&data_plane.reactor_address, "local")?;
let shard_client = gazette::shard::Client::new(shard_router, metadata);
let shard_client =
gazette::shard::Client::new(data_plane.reactor_address, metadata, router);

Ok((
shard_client,
Expand Down
7 changes: 3 additions & 4 deletions crates/agent/src/publications/builds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,10 @@ pub async fn test_catalog(

// Activate all derivations.
let metadata = gazette::Metadata::default();
let journal_router = gazette::Router::new(&broker_sock, "local")?;
let router = gazette::Router::new("local");
let journal_client =
gazette::journal::Client::new(Default::default(), journal_router, metadata.clone());
let shard_router = gazette::Router::new(&consumer_sock, "local")?;
let shard_client = gazette::shard::Client::new(shard_router, metadata);
gazette::journal::Client::new(broker_sock.clone(), metadata.clone(), router.clone());
let shard_client = gazette::shard::Client::new(consumer_sock.clone(), metadata, router);

for built in catalog
.built
Expand Down
10 changes: 5 additions & 5 deletions crates/dekaf/src/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl Collection {
gateway_url: String,
}

let auth: [Auth; 1] = client
let [auth]: [Auth; 1] = client
.rpc("gateway_auth_token", body)
.build()
.send()
Expand All @@ -260,15 +260,15 @@ impl Collection {

tracing::debug!(
collection,
gateway = auth[0].gateway_url,
gateway = auth.gateway_url,
"fetched data-plane token"
);

let mut metadata = gazette::Metadata::default();
metadata.bearer_token(&auth[0].token)?;
metadata.bearer_token(&auth.token)?;

let router = gazette::Router::new(&auth[0].gateway_url, "dekaf")?;
let client = journal::Client::new(Default::default(), router, metadata);
let router = gazette::Router::new("dekaf");
let client = journal::Client::new(auth.gateway_url, metadata, router);

Ok(client)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/gazette/src/journal/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl Client {
router: &crate::Router,
req: &broker::ListRequest,
) -> crate::Result<tonic::Streaming<broker::ListResponse>> {
let mut client = self.into_sub(router.route(None, false).await?);
let mut client = self.into_sub(router.route(None, false, &self.default).await?);
Ok(client.list(req.clone()).await?.into_inner())
}
}
Expand Down
30 changes: 26 additions & 4 deletions crates/gazette/src/journal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,43 @@ type SubClient = proto_grpc::broker::journal_client::JournalClient<

#[derive(Clone)]
pub struct Client {
default: broker::process_spec::Id,
http: reqwest::Client,
metadata: crate::Metadata,
router: crate::Router,
}

impl Client {
pub fn new(http: reqwest::Client, router: crate::Router, metadata: crate::Metadata) -> Self {
/// Build a Client which dispatches request to the given default endpoint with the given Metadata.
/// The provider Router enables re-use of connections to brokers.
pub fn new(endpoint: String, metadata: crate::Metadata, router: crate::Router) -> Self {
Self {
default: broker::process_spec::Id {
zone: String::new(),
suffix: endpoint,
},
metadata,
http,
http: reqwest::Client::default(),
router,
}
}

/// Build a new Client which uses a different endpoint and metadata but re-uses underlying connections.
pub fn with_endpoint_and_metadata(&self, endpoint: String, metadata: crate::Metadata) -> Self {
Self {
default: broker::process_spec::Id {
zone: String::new(),
suffix: endpoint,
},
http: self.http.clone(),
metadata,
router: self.router.clone(),
}
}

/// Invoke the Gazette journal Apply API.
pub async fn apply(&self, req: broker::ApplyRequest) -> crate::Result<broker::ApplyResponse> {
let mut client = self.into_sub(self.router.route(None, false).await?);
let mut client = self.into_sub(self.router.route(None, false, &self.default).await?);

let resp = client
.apply(req)
Expand All @@ -40,11 +61,12 @@ impl Client {
check_ok(resp.status(), resp)
}

/// Invoke the Gazette journal ListFragments API.
pub async fn list_fragments(
&self,
req: broker::FragmentsRequest,
) -> crate::Result<broker::FragmentsResponse> {
let mut client = self.into_sub(self.router.route(None, false).await?);
let mut client = self.into_sub(self.router.route(None, false, &self.default).await?);

let resp = client
.list_fragments(req)
Expand Down
5 changes: 4 additions & 1 deletion crates/gazette/src/journal/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use futures::TryStreamExt;
use proto_gazette::broker;

impl Client {
/// Invoke the Gazette journal Read API.
/// This routine directly fetches journal fragments from cloud storage where possible,
/// rather than reading through the broker.
pub fn read(
self,
mut req: broker::ReadRequest,
Expand Down Expand Up @@ -46,7 +49,7 @@ impl Client {
write_head: &mut i64,
) -> crate::Result<()> {
let route = req.header.as_ref().and_then(|hdr| hdr.route.as_ref());
let mut client = self.into_sub(self.router.route(route, false).await?);
let mut client = self.into_sub(self.router.route(route, false, &self.default).await?);

// Fetch metadata first before we start the actual read.
req.metadata_only = true;
Expand Down
33 changes: 16 additions & 17 deletions crates/gazette/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,21 @@ pub struct Router {
}
struct Inner {
states: std::sync::Mutex<HashMap<MemberId, DialState>>,
default_endpoint: String,
zone: String,
}

impl Router {
/// Create a new Router with the given default service endpoint,
/// which prefers to route to members in `zone` where possible.
pub fn new(default_endpoint: &str, zone: &str) -> Result<Self, Error> {
let (default_endpoint, zone) = (default_endpoint.to_string(), zone.to_string());
pub fn new(zone: &str) -> Self {
let zone = zone.to_string();

let _endpoint = tonic::transport::Endpoint::from_shared(default_endpoint.clone())
.map_err(|_err| Error::InvalidEndpoint(default_endpoint.clone()))?;

Ok(Self {
Self {
inner: Arc::new(Inner {
states: Default::default(),
default_endpoint,
zone,
}),
})
}
}

/// Map an optional broker::Route and indication of whether the "primary"
Expand All @@ -52,10 +47,11 @@ impl Router {
&self,
route: Option<&broker::Route>,
primary: bool,
default: &MemberId,
) -> Result<Channel, Error> {
let (index, state) = self.pick(route, primary);
let (index, state) = self.pick(route, primary, &default);

// Acquire `id`-specific, async-aware lock.
// Acquire MemberId-specific, async-aware lock.
let mut state = state.lock().await;

// Fast path: client is dialed and ready.
Expand All @@ -67,7 +63,7 @@ impl Router {
// Slow path: start dialing the endpoint.
let channel = super::dial_channel(match index {
Some(index) => &route.unwrap().endpoints[index],
None => &self.inner.default_endpoint,
None => &default.suffix,
})
.await?;

Expand All @@ -76,16 +72,19 @@ impl Router {
Ok(channel)
}

fn pick(&self, route: Option<&broker::Route>, primary: bool) -> (Option<usize>, DialState) {
fn pick(
&self,
route: Option<&broker::Route>,
primary: bool,
default: &MemberId,
) -> (Option<usize>, DialState) {
// Acquire non-async lock which *cannot* be held across an await point.
let mut states = self.inner.states.lock().unwrap();
let index = pick(route, primary, &self.inner.zone, &states);

let default_id = MemberId::default();

let id = match index {
Some(index) => &route.unwrap().members[index],
None => &default_id,
None => default,
};

let state = match states.get(id) {
Expand All @@ -96,7 +95,7 @@ impl Router {
(index, state)
}

// Identify Channels which have not been used since the preceeding sweep, and close them.
// Identify Channels which have not been used since the preceding sweep, and close them.
// As members come and go, Channels may no longer needed.
// Call sweep() periodically to clear them out.
pub fn sweep(&self) {
Expand Down
37 changes: 31 additions & 6 deletions crates/gazette/src/shard/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use proto_gazette::consumer;
use proto_gazette::{broker, consumer};
use tonic::transport::Channel;

// SubClient is the routed sub-client of Client.
Expand All @@ -8,20 +8,43 @@ type SubClient = proto_grpc::consumer::shard_client::ShardClient<

#[derive(Clone)]
pub struct Client {
default: broker::process_spec::Id,
metadata: crate::Metadata,
router: crate::Router,
}

impl Client {
pub fn new(router: crate::Router, metadata: crate::Metadata) -> Self {
Self { metadata, router }
/// Build a Client which dispatches request to the given default endpoint with the given Metadata.
/// The provider Router enables re-use of connections to consumers.
pub fn new(endpoint: String, metadata: crate::Metadata, router: crate::Router) -> Self {
Self {
default: broker::process_spec::Id {
zone: String::new(),
suffix: endpoint,
},
metadata,
router,
}
}

/// Build a new Client which uses a different endpoint and metadata but re-uses underlying connections.
pub fn with_endpoint_and_metadata(&self, endpoint: String, metadata: crate::Metadata) -> Self {
Self {
default: broker::process_spec::Id {
zone: String::new(),
suffix: endpoint,
},
metadata,
router: self.router.clone(),
}
}

/// Invoke the Gazette shard List RPC.
pub async fn list(
&self,
req: consumer::ListRequest,
) -> Result<consumer::ListResponse, crate::Error> {
let mut client = self.into_sub(self.router.route(None, false).await?);
let mut client = self.into_sub(self.router.route(None, false, &self.default).await?);

let resp = client
.list(req)
Expand All @@ -32,11 +55,12 @@ impl Client {
check_ok(resp.status(), resp)
}

/// Invoke the Gazette shard Apply RPC.
pub async fn apply(
&self,
req: consumer::ApplyRequest,
) -> Result<consumer::ApplyResponse, crate::Error> {
let mut client = self.into_sub(self.router.route(None, false).await?);
let mut client = self.into_sub(self.router.route(None, false, &self.default).await?);

let resp = client
.apply(req)
Expand All @@ -47,11 +71,12 @@ impl Client {
check_ok(resp.status(), resp)
}

/// Invoke the Gazette shard Unassign RPC.
pub async fn unassign(
&self,
req: consumer::UnassignRequest,
) -> Result<consumer::UnassignResponse, crate::Error> {
let mut client = self.into_sub(self.router.route(None, false).await?);
let mut client = self.into_sub(self.router.route(None, false, &self.default).await?);

let resp = client
.unassign(req)
Expand Down

0 comments on commit 030133f

Please sign in to comment.