Skip to content

Commit

Permalink
Immediately acknowledge GRPC subscriptions (#2712)
Browse files Browse the repository at this point in the history
  • Loading branch information
ma2bd authored Oct 25, 2024
1 parent 8f2ad90 commit 14aada1
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 14 deletions.
22 changes: 18 additions & 4 deletions linera-core/src/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,27 @@ impl<N> Default for ChannelNotifier<N> {
}

impl<N> ChannelNotifier<N> {
/// Creates a subscription given a collection of ChainIds and a sender to the client.
pub fn subscribe(&self, chain_ids: Vec<ChainId>) -> UnboundedReceiver<N> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
fn add_sender(&self, chain_ids: Vec<ChainId>, sender: &UnboundedSender<N>) {
for id in chain_ids {
let mut senders = self.inner.entry(id).or_default();
senders.push(tx.clone());
senders.push(sender.clone());
}
}

/// Creates a subscription given a collection of ChainIds and a sender to the client.
pub fn subscribe(&self, chain_ids: Vec<ChainId>) -> UnboundedReceiver<N> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
self.add_sender(chain_ids, &tx);
rx
}

/// Creates a subscription given a collection of ChainIds and a sender to the client.
/// Immediately posts a first notification as a ACK.
pub fn subscribe_with_ack(&self, chain_ids: Vec<ChainId>, ack: N) -> UnboundedReceiver<N> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
self.add_sender(chain_ids, &tx);
tx.send(ack)
.expect("pushing to a new channel should succeed");
rx
}
}
Expand Down
15 changes: 12 additions & 3 deletions linera-rpc/src/grpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use linera_core::{
};
use linera_version::VersionInfo;
use tonic::{Code, IntoRequest, Request, Status};
use tracing::{debug, error, info, instrument};
use tracing::{debug, error, info, instrument, warn};
#[cfg(not(web))]
use {
super::GrpcProtoConversionError,
Expand Down Expand Up @@ -259,7 +259,7 @@ impl ValidatorNode for GrpcClient {
// terminates after unexpected or fatal errors.
let notification_stream = endlessly_retrying_notification_stream
.map(|result| {
Notification::try_from(result?).map_err(|err| {
Option::<Notification>::try_from(result?).map_err(|err| {
let message = format!("Could not deserialize notification: {}", err);
tonic::Status::new(Code::Internal, message)
})
Expand All @@ -279,7 +279,16 @@ impl ValidatorNode for GrpcClient {
true
})
})
.filter_map(|result| future::ready(result.ok()));
.filter_map(|result| {
future::ready(match result {
Ok(notification @ Some(_)) => notification,
Ok(None) => None,
Err(err) => {
warn!("{}", err);
None
}
})
});

Ok(Box::pin(notification_stream))
}
Expand Down
23 changes: 17 additions & 6 deletions linera-rpc/src/grpc/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,18 @@ impl TryFrom<Notification> for api::Notification {
}
}

impl TryFrom<api::Notification> for Notification {
impl TryFrom<api::Notification> for Option<Notification> {
type Error = GrpcProtoConversionError;

fn try_from(notification: api::Notification) -> Result<Self, Self::Error> {
Ok(Self {
chain_id: try_proto_convert(notification.chain_id)?,
reason: bincode::deserialize(&notification.reason)?,
})
if notification.chain_id.is_none() && notification.reason.is_empty() {
Ok(None)
} else {
Ok(Some(Notification {
chain_id: try_proto_convert(notification.chain_id)?,
reason: bincode::deserialize(&notification.reason)?,
}))
}
}
}

Expand Down Expand Up @@ -903,6 +907,13 @@ pub mod tests {
hash: CryptoHash::new(&Foo("".into())),
},
};
round_trip_check::<_, api::Notification>(notification);
let message = api::Notification::try_from(notification.clone()).unwrap();
assert_eq!(
Some(notification),
Option::<Notification>::try_from(message).unwrap()
);

let ack = api::Notification::default();
assert_eq!(None, Option::<Notification>::try_from(ack).unwrap());
}
}
7 changes: 6 additions & 1 deletion linera-service/src/proxy/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,12 @@ where
.into_iter()
.map(ChainId::try_from)
.collect::<Result<Vec<ChainId>, _>>()?;
let rx = self.0.notifier.subscribe(chain_ids);
// The empty notification seems to be needed in some cases to force
// completion of HTTP2 headers.
let rx = self
.0
.notifier
.subscribe_with_ack(chain_ids, Ok(Notification::default()));
Ok(Response::new(UnboundedReceiverStream::new(rx)))
}

Expand Down

0 comments on commit 14aada1

Please sign in to comment.