Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement Filecoin.EthSubscribe and Filecoin.EthUnsubscribe #4591

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion src/message_pool/msgpool/msg_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ pub struct MessagePool<T> {
/// Acts as a signal to republish messages from the republished set of
/// messages
pub repub_trigger: flume::Sender<()>,
local_msgs: Arc<SyncRwLock<HashSet<SignedMessage>>>,
pub local_msgs: Arc<SyncRwLock<HashSet<SignedMessage>>>,
/// Configurable parameters of the message pool
pub config: MpoolConfig,
/// Chain configuration
Expand Down
5 changes: 4 additions & 1 deletion src/rpc/auth_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::auth::{verify_token, JWT_IDENTIFIER};
use crate::key_management::KeyStore;
use crate::rpc::{chain, Permission, RpcMethod as _, CANCEL_METHOD_NAME};
use crate::rpc::{chain, eth, Permission, RpcMethod as _, CANCEL_METHOD_NAME};
use ahash::{HashMap, HashMapExt as _};
use futures::future::BoxFuture;
use futures::FutureExt;
Expand Down Expand Up @@ -31,6 +31,9 @@ static METHOD_NAME2REQUIRED_PERMISSION: Lazy<HashMap<&str, Permission>> = Lazy::
super::for_each_method!(insert);

access.insert(chain::CHAIN_NOTIFY, Permission::Read);
access.insert(eth::ETH_SUBSCRIBE, Permission::Read);
access.insert(eth::ETH_SUBSCRIPTION, Permission::Read);
access.insert(eth::ETH_UNSUBSCRIBE, Permission::Read);
access.insert(CANCEL_METHOD_NAME, Permission::Read);

access
Expand Down
49 changes: 49 additions & 0 deletions src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::ipld::DfsIter;
use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json};
use crate::lotus_json::{lotus_json_with_self, HasLotusJson, LotusJson};
use crate::message::{ChainMessage, SignedMessage};
use crate::message_pool::Provider;
use crate::rpc::types::ApiTipsetKey;
use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError};
use crate::shim::clock::ChainEpoch;
Expand Down Expand Up @@ -669,6 +670,51 @@ pub(crate) fn chain_notify<DB: Blockstore>(
receiver
}

pub(crate) fn new_heads<DB: Blockstore>(data: &crate::rpc::RPCState<DB>) -> Subscriber<ApiHeaders> {
let (sender, receiver) = broadcast::channel(100);

let mut subscriber = data.chain_store().publisher().subscribe();

tokio::spawn(async move {
while let Ok(v) = subscriber.recv().await {
let headers = match v {
HeadChange::Apply(ts) => ApiHeaders(ts.block_headers().clone().into()),
};
if sender.send(headers).is_err() {
break;
}
}
});

receiver
}

pub(crate) fn pending_txn<DB: Blockstore + Send + Sync + 'static>(
data: Arc<crate::rpc::RPCState<DB>>,
) -> Subscriber<Vec<SignedMessage>> {
let (sender, receiver) = broadcast::channel(100);

let mut subscriber = data.mpool.api.subscribe_head_changes();

tokio::spawn(async move {
while let Ok(v) = subscriber.recv().await {
let messages = match v {
HeadChange::Apply(_ts) => {
let local_msgs = data.mpool.local_msgs.write();
let pending = local_msgs.iter().cloned().collect::<Vec<SignedMessage>>();
pending
}
};

if sender.send(messages).is_err() {
break;
}
}
});

receiver
}

fn load_api_messages_from_tipset(
store: &impl Blockstore,
tipset: &Tipset,
Expand Down Expand Up @@ -773,6 +819,9 @@ pub struct ApiHeadChange {
}
lotus_json_with_self!(ApiHeadChange);

#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)]
pub struct ApiHeaders(#[serde(with = "crate::lotus_json")] pub Vec<CachingBlockHeader>);

#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(tag = "Type", content = "Val", rename_all = "snake_case")]
pub enum PathChange<T = Arc<Tipset>> {
Expand Down
4 changes: 4 additions & 0 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::{ops::Add, sync::Arc};

pub const ETH_SUBSCRIBE: &str = "Filecoin.EthSubscribe";
pub const ETH_SUBSCRIPTION: &str = "Filecoin.EthSubscription";
pub const ETH_UNSUBSCRIBE: &str = "Filecoin.EthUnsubscribe";

const MASKED_ID_PREFIX: [u8; 12] = [0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];

/// Ethereum Bloom filter size in bits.
Expand Down
122 changes: 122 additions & 0 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod request;
pub use client::Client;
pub use error::ServerError;
use futures::FutureExt as _;
use methods::chain::{new_heads, pending_txn};
use reflect::Ctx;
pub use reflect::{ApiPath, ApiPaths, RpcMethod, RpcMethodExt};
pub use request::Request;
Expand All @@ -17,6 +18,7 @@ mod reflect;
pub mod types;
pub use methods::*;
use reflect::Permission;
use tokio::sync::broadcast::Receiver;

/// Protocol or transport-specific error
pub use jsonrpsee::core::ClientError;
Expand Down Expand Up @@ -300,16 +302,21 @@ use crate::rpc::channel::RpcModule as FilRpcModule;
pub use crate::rpc::channel::CANCEL_METHOD_NAME;

use crate::blocks::Tipset;
use ethereum_types::H256;
use fvm_ipld_blockstore::Blockstore;
use jsonrpsee::{
core::traits::IdProvider,
server::{stop_channel, RpcModule, RpcServiceBuilder, Server, StopHandle, TowerServiceBuilder},
types::SubscriptionId,
Methods,
};
use once_cell::sync::Lazy;
use rand::Rng;
use std::env;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::{mpsc, RwLock};
use tower::Service;

Expand Down Expand Up @@ -377,6 +384,64 @@ struct PerConnection<RpcMiddleware, HttpMiddleware> {
keystore: Arc<RwLock<KeyStore>>,
}

#[derive(Debug, Copy, Clone)]
pub struct RandomHexStringIdProvider {}

impl RandomHexStringIdProvider {
pub fn new() -> Self {
Self {}
}
}

impl IdProvider for RandomHexStringIdProvider {
fn next_id(&self) -> SubscriptionId<'static> {
let mut bytes = [0u8; 32];
let mut rng = rand::thread_rng();
rng.fill(&mut bytes);

SubscriptionId::Str(format!("{:#x}", H256::from(bytes)).into())
}
}

enum ReceiverType {
Heads(Receiver<chain::ApiHeaders>),
Txn(Receiver<Vec<crate::message::SignedMessage>>),
}

async fn handle_subscription<T>(mut rx: Receiver<T>, sink: jsonrpsee::SubscriptionSink)
where
T: serde::Serialize + Clone,
{
loop {
let action = rx.recv().await;

tokio::select! {
action = async { action } => {
match action {
Ok(v) => {
match jsonrpsee::SubscriptionMessage::from_json(&v) {
Ok(msg) => {
// This fails only if the connection is closed
if sink.send(msg).await.is_err() {
break;
}
}
Err(e) => {
tracing::error!("Failed to serialize message: {:?}", e);
break;
}
}
}
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(_)) => {},
}
}
_ = sink.closed() => break,
}
}
tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id());
}

pub async fn start_rpc<DB>(state: RPCState<DB>, rpc_endpoint: SocketAddr) -> anyhow::Result<()>
where
DB: Blockstore + Send + Sync + 'static,
Expand All @@ -394,6 +459,61 @@ where
})?;
module.merge(pubsub_module)?;

module
.register_subscription(
eth::ETH_SUBSCRIBE,
eth::ETH_SUBSCRIPTION,
eth::ETH_UNSUBSCRIBE,
|params, pending, ctx, _| async move {
let event_types = match params.parse::<Vec<String>>() {
Ok(v) => v,
Err(e) => {
pending
.reject(jsonrpsee::types::ErrorObjectOwned::from(e))
.await;
// If the subscription has not been "accepted" then
// the return value will be "ignored" as it's not
// allowed to send out any further notifications on
// on the subscription.
return Ok(());
}
};
// `event_types` is one OR more of:
// - "newHeads": notify when new blocks arrive
// - "newPendingTransactions": notify when new messages arrive in the message pool
// - "logs": notify new event logs that match a criteria

tracing::trace!("Subscribing to events: {:?}", event_types);

let receiver = event_types
.iter()
.find_map(|event| match event.as_str() {
"newHeads" => Some(ReceiverType::Heads(new_heads(&ctx))),
"newPendingTransactions" => {
Some(ReceiverType::Txn(pending_txn(ctx.clone())))
}
_ => None,
})
.expect("No valid event type found");

tokio::spawn(async move {
// Mark the subscription is accepted after the params has been parsed successful.
// This is actually responds the underlying RPC method call and may fail if the
// connection is closed.
let sink = pending.accept().await.unwrap();
tracing::trace!("Subscription started (id: {:?})", sink.subscription_id());

match receiver {
ReceiverType::Heads(rx) => handle_subscription(rx, sink).await,
ReceiverType::Txn(rx) => handle_subscription(rx, sink).await,
}
});

Ok(())
},
)
.unwrap();

let (stop_handle, _server_handle) = stop_channel();

let per_conn = PerConnection {
Expand All @@ -403,6 +523,7 @@ where
// Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors`
.max_request_body_size(MAX_REQUEST_BODY_SIZE)
.max_response_body_size(MAX_RESPONSE_BODY_SIZE)
.set_id_provider(RandomHexStringIdProvider::new())
.to_service_builder(),
keystore,
};
Expand Down Expand Up @@ -504,6 +625,7 @@ where
};
}
for_each_method!(register);

module
}

Expand Down
Loading