Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactoring fetcher #32

Draft
wants to merge 4 commits into
base: p/b3work
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6949,7 +6949,8 @@ dependencies = [
"lightning-interfaces",
"lightning-metrics",
"lightning-notifier",
"lightning-origin-demuxer",
"lightning-origin-b3fs",
"lightning-origin-http",
"lightning-origin-ipfs",
"lightning-pool",
"lightning-rep-collector",
Expand Down Expand Up @@ -6984,7 +6985,6 @@ dependencies = [
"lightning-interfaces",
"lightning-keystore",
"lightning-notifier",
"lightning-origin-demuxer",
"lightning-pinger",
"lightning-pool",
"lightning-rep-collector",
Expand Down Expand Up @@ -7265,27 +7265,37 @@ dependencies = [
]

[[package]]
name = "lightning-origin-demuxer"
name = "lightning-origin-b3fs"
version = "0.0.0"
dependencies = [
"affair",
"anyhow",
"b3fs",
"bytes",
"cid",
"derive_more",
"fleek-crypto",
"fleek-ipld",
"futures",
"humantime-serde",
"hyper 0.14.30",
"hyper-rustls 0.24.2",
"lightning-application",
"lightning-blockstore",
"lightning-indexer",
"lightning-interfaces",
"lightning-origin-http",
"lightning-origin-ipfs",
"lightning-signer",
"lightning-test-utils",
"lightning-workspace-hack",
"multihash 0.18.1",
"rustls 0.21.12",
"serde",
"tempfile",
"thiserror",
"tokio",
"tokio-stream",
"tokio-util 0.7.11",
"tracing",
"unsigned-varint 0.8.0",
]

[[package]]
Expand Down Expand Up @@ -7513,7 +7523,6 @@ dependencies = [
"lightning-notifier",
"lightning-openrpc",
"lightning-openrpc-macros",
"lightning-origin-demuxer",
"lightning-pool",
"lightning-rep-collector",
"lightning-signer",
Expand Down
4 changes: 3 additions & 1 deletion core/fetcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ edition = "2021"
lightning-interfaces = { path = "../interfaces" }
blake3-tree = { path = "../../lib/blake3-tree" }
lightning-metrics = { path = "../metrics" }
lightning-origin-ipfs = { path = "../origin-ipfs" }
lightning-origin-http = { path = "../origin-http" }
lightning-origin-b3fs = { path = "../origin-b3fs" }
futures.workspace = true
serde.workspace = true
anyhow.workspace = true
Expand All @@ -23,7 +26,6 @@ lightning-workspace-hack.workspace = true
[dev-dependencies]
lightning-blockstore = { path = "../blockstore" }
lightning-indexer = { path = "../indexer" }
lightning-origin-demuxer = { path = "../origin-demuxer" }
lightning-origin-ipfs = { path = "../origin-ipfs" }
lightning-resolver = { path = "../resolver" }
lightning-blockstore-server = { path = "../blockstore-server" }
Expand Down
8 changes: 7 additions & 1 deletion core/fetcher/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct Config {
// Maximum number of concurrent origin requests we send out.
pub max_conc_origin_req: usize,
pub http: lightning_origin_http::Config,
pub ipfs: lightning_origin_ipfs::Config,
pub b3fs: lightning_origin_b3fs::Config,
}

impl Default for Config {
fn default() -> Self {
Self {
max_conc_origin_req: 5,
http: lightning_origin_http::Config::default(),
ipfs: lightning_origin_ipfs::Config::default(),
b3fs: lightning_origin_b3fs::Config::default(),
}
}
}
12 changes: 5 additions & 7 deletions core/fetcher/src/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use types::{NodeIndex, PeerRequestError};

use crate::config::Config;
use crate::origin::{OriginError, OriginFetcher, OriginRequest};
use crate::router::Router;

pub(crate) type Uri = Vec<u8>;

Expand All @@ -31,21 +32,18 @@ impl<C: Collection> Fetcher<C> {
pub fn new(
config: &C::ConfigProviderInterface,
blockstore_server: &C::BlockstoreServerInterface,
origin: &C::OriginProviderInterface,
app: &C::ApplicationInterface,
fdi::Cloned(blockstore): fdi::Cloned<C::BlockstoreInterface>,
fdi::Cloned(resolver): fdi::Cloned<C::ResolverInterface>,
fdi::Cloned(shutdown): fdi::Cloned<ShutdownWaiter>,
) -> anyhow::Result<Self> {
let config = config.get::<Self>();

let router = Router::new(config.clone(), blockstore.clone())?;

let (origin_tx, rx) = mpsc::channel(128);
let origin_fetcher = OriginFetcher::<C>::new(
config.max_conc_origin_req,
origin.get_socket(),
rx,
resolver.clone(),
);
let origin_fetcher =
OriginFetcher::<C>::new(config.max_conc_origin_req, router, rx, resolver.clone());

let waiter = shutdown.clone();
let app_query = app.sync_query();
Expand Down
1 change: 1 addition & 0 deletions core/fetcher/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod config;
pub mod fetcher;
mod origin;
mod router;
#[cfg(test)]
mod tests;
20 changes: 8 additions & 12 deletions core/fetcher/src/origin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,36 @@ use std::collections::{HashMap, VecDeque};

use lightning_interfaces::prelude::*;
use lightning_interfaces::types::{Blake3Hash, ImmutablePointer};
use lightning_interfaces::OriginProviderSocket;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::task::JoinSet;
use tracing::error;

use crate::fetcher::Uri;
use crate::router::Router;

pub struct OriginFetcher<C: Collection> {
tasks: JoinSet<Result<SuccessResponse, ErrorResponse>>,
queue: VecDeque<ImmutablePointer>,
rx: mpsc::Receiver<OriginRequest>,
origin_socket: OriginProviderSocket,
resolver: C::ResolverInterface,
capacity: usize,
router: Router<C>,
}

impl<C: Collection> OriginFetcher<C> {
pub fn new(
capacity: usize,
origin_socket: OriginProviderSocket,
router: Router<C>,
rx: mpsc::Receiver<OriginRequest>,
resolver: C::ResolverInterface,
) -> Self {
Self {
tasks: JoinSet::new(),
queue: VecDeque::new(),
rx,
origin_socket,
resolver,
capacity,
router,
}
}

Expand Down Expand Up @@ -74,7 +74,6 @@ impl<C: Collection> OriginFetcher<C> {
}
Ok(Err(e)) => {
match e {
ErrorResponse::OriginSocketError => error!("Failed to get response from socket"),
ErrorResponse::OriginFetchError(uri) => {
if let Some(tx) = pending_requests.remove(&uri) {
tx.send(Err(OriginError)).expect("Failed to send response");
Expand All @@ -99,12 +98,11 @@ impl<C: Collection> OriginFetcher<C> {
}

async fn spawn(&mut self, pointer: ImmutablePointer) {
let origin_socket = self.origin_socket.clone();
let router = self.router.clone();
self.tasks.spawn(async move {
match origin_socket.run(pointer.clone()).await {
Ok(Ok(hash)) => Ok(SuccessResponse { pointer, hash }),
Ok(Err(_)) => Err(ErrorResponse::OriginFetchError(pointer.uri)),
Err(_) => Err(ErrorResponse::OriginSocketError),
match router.route(&pointer).await {
Ok(hash) => Ok(SuccessResponse { pointer, hash }),
Err(_) => Err(ErrorResponse::OriginFetchError(pointer.uri)),
}
});
}
Expand All @@ -122,8 +120,6 @@ struct SuccessResponse {

#[derive(Debug, thiserror::Error)]
enum ErrorResponse {
#[error("Failed to get message from origin socket")]
OriginSocketError,
#[error("Failed to fetch data from origin: {0:?}")]
OriginFetchError(Uri),
}
Expand Down
31 changes: 15 additions & 16 deletions core/origin-demuxer/src/demuxer.rs → core/fetcher/src/router.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,33 @@
use affair::AsyncWorkerUnordered;
use lightning_interfaces::types::{Blake3Hash, ImmutablePointer, OriginProvider};
use lightning_interfaces::Collection;
use lightning_origin_b3fs::B3FSOrigin;
use lightning_origin_http::HttpOrigin;
use lightning_origin_ipfs::IPFSOrigin;

use crate::Config;
use crate::config::Config;

pub struct Demuxer<C: Collection> {
#[derive(Clone)]
pub(crate) struct Router<C: Collection> {
http: HttpOrigin<C>,
ipfs: IPFSOrigin<C>,
b3fs: B3FSOrigin<C>,
}

impl<C: Collection> AsyncWorkerUnordered for Demuxer<C> {
type Request = ImmutablePointer;
type Response = anyhow::Result<Blake3Hash>;
impl<C: Collection> Router<C> {
pub fn new(config: Config, blockstore: C::BlockstoreInterface) -> anyhow::Result<Self> {
Ok(Self {
http: HttpOrigin::<C>::new(config.http, blockstore.clone())?,
ipfs: IPFSOrigin::<C>::new(config.ipfs, blockstore)?,
b3fs: B3FSOrigin::<C>::new(config.b3fs)?,
})
}

async fn handle(&self, req: Self::Request) -> Self::Response {
pub async fn route(&self, req: &ImmutablePointer) -> anyhow::Result<Blake3Hash> {
match &req.origin {
OriginProvider::HTTP => self.http.fetch(&req.uri).await,
OriginProvider::IPFS => self.ipfs.fetch(&req.uri).await,
OriginProvider::B3FS => self.b3fs.fetch(&req.uri).await,
_ => Err(anyhow::anyhow!("unknown origin type")),
}
}
}

impl<C: Collection> Demuxer<C> {
pub fn new(config: Config, blockstore: C::BlockstoreInterface) -> anyhow::Result<Self> {
Ok(Self {
http: HttpOrigin::<C>::new(config.http, blockstore.clone())?,
ipfs: IPFSOrigin::<C>::new(config.ipfs, blockstore)?,
})
}
}
2 changes: 0 additions & 2 deletions core/fetcher/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use lightning_interfaces::types::{
OriginProvider,
};
use lightning_notifier::Notifier;
use lightning_origin_demuxer::{Config as DemuxerOriginConfig, OriginDemuxer};
use lightning_origin_ipfs::config::{Gateway, Protocol, RequestFormat};
use lightning_origin_ipfs::Config as IPFSOriginConfig;
use lightning_pool::{Config as PoolConfig, PoolProvider};
Expand Down Expand Up @@ -49,7 +48,6 @@ partial!(TestBinding {
ConfigProviderInterface = JsonConfigProvider;
FetcherInterface = Fetcher<Self>;
ForwarderInterface = MockForwarder<Self>;
OriginProviderInterface = OriginDemuxer<Self>;
BroadcastInterface = Broadcast<Self>;
BlockstoreInterface = Blockstore<Self>;
BlockstoreServerInterface = BlockstoreServer<Self>;
Expand Down
1 change: 0 additions & 1 deletion core/final-bindings/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ lightning-handshake = { path = "../handshake" }
lightning-indexer = { path = "../indexer" }
lightning-interfaces = { path = "../interfaces" }
lightning-notifier = { path = "../notifier" }
lightning-origin-demuxer = { path = "../origin-demuxer" }
lightning-pool = { path = "../pool" }
lightning-rep-collector = { path = "../rep-collector" }
lightning-resolver = { path = "../resolver" }
Expand Down
3 changes: 0 additions & 3 deletions core/final-bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use lightning_indexer::Indexer;
use lightning_interfaces::partial;
use lightning_keystore::Keystore;
use lightning_notifier::Notifier;
use lightning_origin_demuxer::OriginDemuxer;
use lightning_pinger::Pinger;
use lightning_pool::PoolProvider;
use lightning_rep_collector::ReputationAggregator;
Expand All @@ -38,7 +37,6 @@ partial!(FinalTypes require full {
ArchiveInterface = Archive<Self>;
HandshakeInterface = Handshake<Self>;
NotifierInterface = Notifier<Self>;
OriginProviderInterface = OriginDemuxer<Self>;
ReputationAggregatorInterface = ReputationAggregator<Self>;
ResolverInterface = Resolver<Self>;
RpcInterface = Rpc<Self>;
Expand Down Expand Up @@ -66,7 +64,6 @@ partial!(UseMockConsensus require full {
ArchiveInterface = Archive<Self>;
HandshakeInterface = Handshake<Self>;
NotifierInterface = Notifier<Self>;
OriginProviderInterface = OriginDemuxer<Self>;
ReputationAggregatorInterface = ReputationAggregator<Self>;
ResolverInterface = Resolver<Self>;
RpcInterface = Rpc<Self>;
Expand Down
1 change: 0 additions & 1 deletion core/interfaces/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ collection!([
ConsensusInterface,
HandshakeInterface,
NotifierInterface,
OriginProviderInterface,
DeliveryAcknowledgmentAggregatorInterface,
ReputationAggregatorInterface,
ResolverInterface,
Expand Down
2 changes: 0 additions & 2 deletions core/interfaces/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ mod indexer;
mod keystore;
mod macros;
mod notifier;
mod origin;
mod pinger;
mod pool;
mod reputation;
Expand Down Expand Up @@ -42,7 +41,6 @@ pub use handshake::*;
pub use indexer::*;
pub use keystore::*;
pub use notifier::*;
pub use origin::*;
pub use pinger::*;
pub use pool::*;
pub use reputation::*;
Expand Down
1 change: 0 additions & 1 deletion core/interfaces/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ macro_rules! partial {
ConsensusInterface,
HandshakeInterface,
NotifierInterface,
OriginProviderInterface,
DeliveryAcknowledgmentAggregatorInterface,
ReputationAggregatorInterface,
ResolverInterface,
Expand Down
34 changes: 0 additions & 34 deletions core/interfaces/src/origin.rs

This file was deleted.

Loading