Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Add Gossipsub Message wrapper #436

Merged
merged 8 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions homestar-runtime/src/event_handler/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl Event {
{
match self {
Event::CapturedReceipt(captured) => {
let _ = captured.store_and_notify(event_handler);
let _ = captured.publish_and_notify(event_handler);
}
Event::Shutdown(tx) => {
info!("event_handler server shutting down");
Expand Down Expand Up @@ -243,7 +243,7 @@ impl Captured {
}

#[allow(dead_code)]
fn store_and_notify<DB>(
fn publish_and_notify<DB>(
mut self,
event_handler: &mut EventHandler<DB>,
) -> Result<(Cid, InvocationReceipt<Ipld>)>
Expand All @@ -267,7 +267,7 @@ impl Captured {
if event_handler.pubsub_enabled {
match event_handler.swarm.behaviour_mut().gossip_publish(
pubsub::RECEIPTS_TOPIC,
TopicMessage::CapturedReceipt(receipt.clone()),
TopicMessage::CapturedReceipt(pubsub::Message::new(receipt.clone())),
) {
Ok(msg_id) => {
info!(
Expand Down Expand Up @@ -397,7 +397,7 @@ impl Replay {
.behaviour_mut()
.gossip_publish(
pubsub::RECEIPTS_TOPIC,
TopicMessage::CapturedReceipt(receipt.clone()),
TopicMessage::CapturedReceipt(pubsub::Message::new(receipt.clone())),
)
.map(|msg_id| {
info!(cid=receipt_cid,
Expand Down Expand Up @@ -543,7 +543,7 @@ where
async fn handle_event(self, event_handler: &mut EventHandler<DB>, ipfs: IpfsCli) {
match self {
Event::CapturedReceipt(captured) => {
if let Ok((cid, receipt)) = captured.store_and_notify(event_handler) {
if let Ok((cid, receipt)) = captured.publish_and_notify(event_handler) {
#[cfg(not(feature = "test-utils"))]
{
// Spawn client call in the background, without awaiting.
Expand Down
70 changes: 38 additions & 32 deletions homestar-runtime/src/event_handler/swarm_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ use crate::{
Event, Handler, RequestResponseError,
},
libp2p::multiaddr::MultiaddrExt,
network::swarm::{
CapsuleTag, ComposedEvent, PeerDiscoveryInfo, RequestResponseKey, HOMESTAR_PROTOCOL_VER,
network::{
pubsub,
swarm::{
CapsuleTag, ComposedEvent, PeerDiscoveryInfo, RequestResponseKey, HOMESTAR_PROTOCOL_VER,
},
},
receipt::{RECEIPT_TAG, VERSION_KEY},
workflow,
Expand Down Expand Up @@ -403,38 +406,41 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
message,
propagation_source,
message_id,
} => match Receipt::try_from(message.data) {
// TODO: dont fail blindly if we get a non receipt message
Ok(receipt) => {
info!(
peer_id = propagation_source.to_string(),
message_id = message_id.to_string(),
"message received on receipts topic: {}",
receipt.cid()
);
} => {
let bytes: Vec<u8> = message.data;
match pubsub::Message::<Receipt>::try_from(bytes) {
// TODO: dont fail blindly if we get a non receipt message
Ok(msg) => {
let receipt = msg.payload;
info!(
peer_id = propagation_source.to_string(),
message_id = message_id.to_string(),
"message received on receipts topic: {receipt}"
);

// Store gossiped receipt.
let _ = event_handler
.db
.conn()
.as_mut()
.map(|conn| Db::store_receipt(receipt.clone(), conn));

#[cfg(feature = "websocket-notify")]
notification::emit_event(
event_handler.ws_evt_sender(),
EventNotificationTyp::SwarmNotification(
SwarmNotification::ReceivedReceiptPubsub,
),
btreemap! {
"peerId" => propagation_source.to_string(),
"cid" => receipt.cid().to_string(),
"ran" => receipt.ran().to_string()
},
);
// Store gossiped receipt.
let _ = event_handler
.db
.conn()
.as_mut()
.map(|conn| Db::store_receipt(receipt.clone(), conn));

#[cfg(feature = "websocket-notify")]
notification::emit_event(
event_handler.ws_evt_sender(),
EventNotificationTyp::SwarmNotification(
SwarmNotification::ReceivedReceiptPubsub,
),
btreemap! {
"peerId" => propagation_source.to_string(),
"cid" => receipt.cid().to_string(),
"ran" => receipt.ran().to_string()
},
);
}
Err(err) => info!(err=?err, "cannot handle incoming gossipsub message"),
}
Err(err) => info!(err=?err, "cannot handle incoming gossipsub message"),
},
}
gossipsub::Event::Subscribed { peer_id, topic } => {
debug!(
peer_id = peer_id.to_string(),
Expand Down
7 changes: 5 additions & 2 deletions homestar-runtime/src/network/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
use crate::settings;
use anyhow::Result;
use libp2p::{
gossipsub::{self, ConfigBuilder, Message, MessageAuthenticity, MessageId, ValidationMode},
gossipsub::{self, ConfigBuilder, MessageAuthenticity, MessageId, ValidationMode},
identity::Keypair,
};
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
};

pub(crate) mod message;
pub(crate) use message::Message;

/// [Receipt]-related topic for pub(gossip)sub.
///
/// [Receipt]: homestar_core::workflow::receipt
Expand All @@ -23,7 +26,7 @@ pub(crate) const RECEIPTS_TOPIC: &str = "receipts";
/// [gossipsub]: libp2p::gossipsub
pub(crate) fn new(keypair: Keypair, settings: &settings::Node) -> Result<gossipsub::Behaviour> {
// To content-address message, we can take the hash of message and use it as an ID.
let message_id_fn = |message: &Message| {
let message_id_fn = |message: &gossipsub::Message| {
let mut s = DefaultHasher::new();
message.data.hash(&mut s);
MessageId::from(s.finish().to_string())
Expand Down
153 changes: 153 additions & 0 deletions homestar-runtime/src/network/pubsub/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use anyhow::{anyhow, Result};
use homestar_core::workflow::Nonce;
use libipld::{self, cbor::DagCborCodec, prelude::Codec, serde::from_ipld, Ipld};
use std::collections::BTreeMap;

const HEADER_KEY: &str = "header";
const PAYLOAD_KEY: &str = "payload";
const NONCE_KEY: &str = "nonce";

#[derive(Debug)]
pub(crate) struct Message<T> {
pub(crate) header: Header,
pub(crate) payload: T,
}

impl<T> Message<T> {
pub(crate) fn new(payload: T) -> Self {
let header = Header {
nonce: Nonce::generate(),
};

Self { header, payload }
}
}

impl<T> TryFrom<Message<T>> for Vec<u8>
where
Ipld: From<Message<T>> + From<T>,
{
type Error = anyhow::Error;

fn try_from(message: Message<T>) -> Result<Self, Self::Error> {
let message_ipld = Ipld::from(message);
DagCborCodec.encode(&message_ipld)
}
}

impl<T> TryFrom<Vec<u8>> for Message<T>
where
T: TryFrom<Ipld, Error = anyhow::Error>,
{
type Error = anyhow::Error;

fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
let ipld: Ipld = DagCborCodec.decode(&bytes)?;
ipld.try_into()
.map_err(|_| anyhow!("Could not convert IPLD to pubsub message."))
}
}

impl<T> From<Message<T>> for Ipld
where
Ipld: From<T>,
{
fn from(message: Message<T>) -> Self {
Ipld::Map(BTreeMap::from([
(HEADER_KEY.into(), message.header.into()),
(PAYLOAD_KEY.into(), message.payload.into()),
]))
}
}

impl<T> TryFrom<Ipld> for Message<T>
where
T: TryFrom<Ipld, Error = anyhow::Error>,
{
type Error = anyhow::Error;

fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
let map = from_ipld::<BTreeMap<String, Ipld>>(ipld)?;

let header = map
.get(HEADER_KEY)
.ok_or_else(|| anyhow!("missing {HEADER_KEY}"))?
.to_owned()
.try_into()?;

let payload = map
.get(PAYLOAD_KEY)
.ok_or_else(|| anyhow!("missing {PAYLOAD_KEY}"))?
.to_owned()
.try_into()?;

Ok(Message { header, payload })
}
}

#[derive(Clone, Debug)]
pub(crate) struct Header {
nonce: Nonce,
}

impl From<Header> for Ipld {
fn from(header: Header) -> Self {
Ipld::Map(BTreeMap::from([(
NONCE_KEY.into(),
header.nonce.to_owned().into(),
)]))
}
}

impl TryFrom<Ipld> for Header {
type Error = anyhow::Error;

fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
let map = from_ipld::<BTreeMap<String, Ipld>>(ipld)?;

let nonce = map
.get(NONCE_KEY)
.ok_or_else(|| anyhow!("Missing {NONCE_KEY}"))?
.try_into()?;

Ok(Header { nonce })
}
}

impl TryFrom<Header> for Vec<u8> {
type Error = anyhow::Error;

fn try_from(header: Header) -> Result<Self, Self::Error> {
let header_ipld = Ipld::from(header);
DagCborCodec.encode(&header_ipld)
}
}

impl TryFrom<Vec<u8>> for Header {
type Error = anyhow::Error;

fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
let ipld: Ipld = DagCborCodec.decode(&bytes)?;
ipld.try_into()
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::{test_utils, Receipt};

#[test]
fn pubsub_message_rountrip() {
let (_, receipt) = test_utils::receipt::receipts();
let message = Message::new(receipt.clone());
let bytes: Vec<u8> = message
.try_into()
.expect("Could not serialize message into bytes");

let parsed =
Message::<Receipt>::try_from(bytes).expect("Could not deserialize message from bytes");

assert_eq!(receipt, parsed.payload);
}
}
7 changes: 4 additions & 3 deletions homestar-runtime/src/network/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ pub(crate) enum ComposedEvent {
#[derive(Debug)]
pub(crate) enum TopicMessage {
/// Receipt topic, wrapping [Receipt].
CapturedReceipt(Receipt),
CapturedReceipt(pubsub::Message<Receipt>),
}

/// Custom behaviours for [Swarm].
Expand Down Expand Up @@ -316,8 +316,9 @@ impl ComposedBehaviour {
if let Some(gossipsub) = self.gossipsub.as_mut() {
let id_topic = gossipsub::IdentTopic::new(topic);
// Make this a match once we have other topics.
let TopicMessage::CapturedReceipt(receipt) = msg;
let msg_bytes: Vec<u8> = receipt.try_into()?;
let TopicMessage::CapturedReceipt(message) = msg;
let msg_bytes: Vec<u8> = message.try_into()?;

if gossipsub
.mesh_peers(&TopicHash::from_raw(topic))
.peekable()
Expand Down