Skip to content

Commit

Permalink
refactor: Add Gossipsub Message wrapper (#436)
Browse files Browse the repository at this point in the history
# Description

This PR implements the following changes:

- [x] Add gosssipsub message wrapper
- [x] Rename `store_and_notify` to `publish_and_notify`

The message wrapper includes a header with a nonce to force the gossip
of duplicate receipts. We will likely expand on the header in future
work and make the nonce optional.

## Link to issue

Implements #421.

## Type of change

- [x] Refactor (non-breaking change that updates existing functionality)

## Test plan (required)

We've added a unit test to roundtrip a gossiped message to bytes and
back again. We also have a gossip notifications integration test to
confirm messages are still sent.

---------

Co-authored-by: Zeeshan Lakhani <[email protected]>
  • Loading branch information
bgins and zeeshanlakhani committed Nov 29, 2023
1 parent 0cd381d commit 59383e5
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 42 deletions.
10 changes: 5 additions & 5 deletions homestar-runtime/src/event_handler/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl Event {
{
match self {
Event::CapturedReceipt(captured) => {
let _ = captured.store_and_notify(event_handler);
let _ = captured.publish_and_notify(event_handler);
}
Event::Shutdown(tx) => {
info!("event_handler server shutting down");
Expand Down Expand Up @@ -243,7 +243,7 @@ impl Captured {
}

#[allow(dead_code)]
fn store_and_notify<DB>(
fn publish_and_notify<DB>(
mut self,
event_handler: &mut EventHandler<DB>,
) -> Result<(Cid, InvocationReceipt<Ipld>)>
Expand All @@ -267,7 +267,7 @@ impl Captured {
if event_handler.pubsub_enabled {
match event_handler.swarm.behaviour_mut().gossip_publish(
pubsub::RECEIPTS_TOPIC,
TopicMessage::CapturedReceipt(receipt.clone()),
TopicMessage::CapturedReceipt(pubsub::Message::new(receipt.clone())),
) {
Ok(msg_id) => {
info!(
Expand Down Expand Up @@ -397,7 +397,7 @@ impl Replay {
.behaviour_mut()
.gossip_publish(
pubsub::RECEIPTS_TOPIC,
TopicMessage::CapturedReceipt(receipt.clone()),
TopicMessage::CapturedReceipt(pubsub::Message::new(receipt.clone())),
)
.map(|msg_id| {
info!(cid=receipt_cid,
Expand Down Expand Up @@ -543,7 +543,7 @@ where
async fn handle_event(self, event_handler: &mut EventHandler<DB>, ipfs: IpfsCli) {
match self {
Event::CapturedReceipt(captured) => {
if let Ok((cid, receipt)) = captured.store_and_notify(event_handler) {
if let Ok((cid, receipt)) = captured.publish_and_notify(event_handler) {
#[cfg(not(feature = "test-utils"))]
{
// Spawn client call in the background, without awaiting.
Expand Down
70 changes: 38 additions & 32 deletions homestar-runtime/src/event_handler/swarm_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ use crate::{
Event, Handler, RequestResponseError,
},
libp2p::multiaddr::MultiaddrExt,
network::swarm::{
CapsuleTag, ComposedEvent, PeerDiscoveryInfo, RequestResponseKey, HOMESTAR_PROTOCOL_VER,
network::{
pubsub,
swarm::{
CapsuleTag, ComposedEvent, PeerDiscoveryInfo, RequestResponseKey, HOMESTAR_PROTOCOL_VER,
},
},
receipt::{RECEIPT_TAG, VERSION_KEY},
workflow,
Expand Down Expand Up @@ -403,38 +406,41 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
message,
propagation_source,
message_id,
} => match Receipt::try_from(message.data) {
// TODO: dont fail blindly if we get a non receipt message
Ok(receipt) => {
info!(
peer_id = propagation_source.to_string(),
message_id = message_id.to_string(),
"message received on receipts topic: {}",
receipt.cid()
);
} => {
let bytes: Vec<u8> = message.data;
match pubsub::Message::<Receipt>::try_from(bytes) {
// TODO: dont fail blindly if we get a non receipt message
Ok(msg) => {
let receipt = msg.payload;
info!(
peer_id = propagation_source.to_string(),
message_id = message_id.to_string(),
"message received on receipts topic: {receipt}"
);

// Store gossiped receipt.
let _ = event_handler
.db
.conn()
.as_mut()
.map(|conn| Db::store_receipt(receipt.clone(), conn));

#[cfg(feature = "websocket-notify")]
notification::emit_event(
event_handler.ws_evt_sender(),
EventNotificationTyp::SwarmNotification(
SwarmNotification::ReceivedReceiptPubsub,
),
btreemap! {
"peerId" => propagation_source.to_string(),
"cid" => receipt.cid().to_string(),
"ran" => receipt.ran().to_string()
},
);
// Store gossiped receipt.
let _ = event_handler
.db
.conn()
.as_mut()
.map(|conn| Db::store_receipt(receipt.clone(), conn));

#[cfg(feature = "websocket-notify")]
notification::emit_event(
event_handler.ws_evt_sender(),
EventNotificationTyp::SwarmNotification(
SwarmNotification::ReceivedReceiptPubsub,
),
btreemap! {
"peerId" => propagation_source.to_string(),
"cid" => receipt.cid().to_string(),
"ran" => receipt.ran().to_string()
},
);
}
Err(err) => info!(err=?err, "cannot handle incoming gossipsub message"),
}
Err(err) => info!(err=?err, "cannot handle incoming gossipsub message"),
},
}
gossipsub::Event::Subscribed { peer_id, topic } => {
debug!(
peer_id = peer_id.to_string(),
Expand Down
7 changes: 5 additions & 2 deletions homestar-runtime/src/network/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
use crate::settings;
use anyhow::Result;
use libp2p::{
gossipsub::{self, ConfigBuilder, Message, MessageAuthenticity, MessageId, ValidationMode},
gossipsub::{self, ConfigBuilder, MessageAuthenticity, MessageId, ValidationMode},
identity::Keypair,
};
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
};

pub(crate) mod message;
pub(crate) use message::Message;

/// [Receipt]-related topic for pub(gossip)sub.
///
/// [Receipt]: homestar_core::workflow::receipt
Expand All @@ -23,7 +26,7 @@ pub(crate) const RECEIPTS_TOPIC: &str = "receipts";
/// [gossipsub]: libp2p::gossipsub
pub(crate) fn new(keypair: Keypair, settings: &settings::Node) -> Result<gossipsub::Behaviour> {
// To content-address message, we can take the hash of message and use it as an ID.
let message_id_fn = |message: &Message| {
let message_id_fn = |message: &gossipsub::Message| {
let mut s = DefaultHasher::new();
message.data.hash(&mut s);
MessageId::from(s.finish().to_string())
Expand Down
153 changes: 153 additions & 0 deletions homestar-runtime/src/network/pubsub/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use anyhow::{anyhow, Result};
use homestar_core::workflow::Nonce;
use libipld::{self, cbor::DagCborCodec, prelude::Codec, serde::from_ipld, Ipld};
use std::collections::BTreeMap;

const HEADER_KEY: &str = "header";
const PAYLOAD_KEY: &str = "payload";
const NONCE_KEY: &str = "nonce";

#[derive(Debug)]
pub(crate) struct Message<T> {
pub(crate) header: Header,
pub(crate) payload: T,
}

impl<T> Message<T> {
pub(crate) fn new(payload: T) -> Self {
let header = Header {
nonce: Nonce::generate(),
};

Self { header, payload }
}
}

impl<T> TryFrom<Message<T>> for Vec<u8>
where
Ipld: From<Message<T>> + From<T>,
{
type Error = anyhow::Error;

fn try_from(message: Message<T>) -> Result<Self, Self::Error> {
let message_ipld = Ipld::from(message);
DagCborCodec.encode(&message_ipld)
}
}

impl<T> TryFrom<Vec<u8>> for Message<T>
where
T: TryFrom<Ipld, Error = anyhow::Error>,
{
type Error = anyhow::Error;

fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
let ipld: Ipld = DagCborCodec.decode(&bytes)?;
ipld.try_into()
.map_err(|_| anyhow!("Could not convert IPLD to pubsub message."))
}
}

impl<T> From<Message<T>> for Ipld
where
Ipld: From<T>,
{
fn from(message: Message<T>) -> Self {
Ipld::Map(BTreeMap::from([
(HEADER_KEY.into(), message.header.into()),
(PAYLOAD_KEY.into(), message.payload.into()),
]))
}
}

impl<T> TryFrom<Ipld> for Message<T>
where
T: TryFrom<Ipld, Error = anyhow::Error>,
{
type Error = anyhow::Error;

fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
let map = from_ipld::<BTreeMap<String, Ipld>>(ipld)?;

let header = map
.get(HEADER_KEY)
.ok_or_else(|| anyhow!("missing {HEADER_KEY}"))?
.to_owned()
.try_into()?;

let payload = map
.get(PAYLOAD_KEY)
.ok_or_else(|| anyhow!("missing {PAYLOAD_KEY}"))?
.to_owned()
.try_into()?;

Ok(Message { header, payload })
}
}

#[derive(Clone, Debug)]
pub(crate) struct Header {
nonce: Nonce,
}

impl From<Header> for Ipld {
fn from(header: Header) -> Self {
Ipld::Map(BTreeMap::from([(
NONCE_KEY.into(),
header.nonce.to_owned().into(),
)]))
}
}

impl TryFrom<Ipld> for Header {
type Error = anyhow::Error;

fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
let map = from_ipld::<BTreeMap<String, Ipld>>(ipld)?;

let nonce = map
.get(NONCE_KEY)
.ok_or_else(|| anyhow!("Missing {NONCE_KEY}"))?
.try_into()?;

Ok(Header { nonce })
}
}

impl TryFrom<Header> for Vec<u8> {
type Error = anyhow::Error;

fn try_from(header: Header) -> Result<Self, Self::Error> {
let header_ipld = Ipld::from(header);
DagCborCodec.encode(&header_ipld)
}
}

impl TryFrom<Vec<u8>> for Header {
type Error = anyhow::Error;

fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
let ipld: Ipld = DagCborCodec.decode(&bytes)?;
ipld.try_into()
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::{test_utils, Receipt};

#[test]
fn pubsub_message_rountrip() {
let (_, receipt) = test_utils::receipt::receipts();
let message = Message::new(receipt.clone());
let bytes: Vec<u8> = message
.try_into()
.expect("Could not serialize message into bytes");

let parsed =
Message::<Receipt>::try_from(bytes).expect("Could not deserialize message from bytes");

assert_eq!(receipt, parsed.payload);
}
}
7 changes: 4 additions & 3 deletions homestar-runtime/src/network/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ pub(crate) enum ComposedEvent {
#[derive(Debug)]
pub(crate) enum TopicMessage {
/// Receipt topic, wrapping [Receipt].
CapturedReceipt(Receipt),
CapturedReceipt(pubsub::Message<Receipt>),
}

/// Custom behaviours for [Swarm].
Expand Down Expand Up @@ -316,8 +316,9 @@ impl ComposedBehaviour {
if let Some(gossipsub) = self.gossipsub.as_mut() {
let id_topic = gossipsub::IdentTopic::new(topic);
// Make this a match once we have other topics.
let TopicMessage::CapturedReceipt(receipt) = msg;
let msg_bytes: Vec<u8> = receipt.try_into()?;
let TopicMessage::CapturedReceipt(message) = msg;
let msg_bytes: Vec<u8> = message.try_into()?;

if gossipsub
.mesh_peers(&TopicHash::from_raw(topic))
.peekable()
Expand Down

0 comments on commit 59383e5

Please sign in to comment.