Skip to content

Commit

Permalink
finish incoming tx handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Boog900 committed Oct 13, 2024
1 parent d2c7e49 commit b6d94cf
Show file tree
Hide file tree
Showing 14 changed files with 316 additions and 227 deletions.
2 changes: 1 addition & 1 deletion binaries/cuprated/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ cuprate-levin = { path = "../../net/levin" }
cuprate-wire = { path = "../../net/wire" }
cuprate-p2p = { path = "../../p2p/p2p" }
cuprate-p2p-core = { path = "../../p2p/p2p-core" }
cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower" }
cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower", features = ["txpool"] }
cuprate-async-buffer = { path = "../../p2p/async-buffer" }
cuprate-address-book = { path = "../../p2p/address-book" }
cuprate-blockchain = { path = "../../storage/blockchain", features = ["service"] }
Expand Down
1 change: 0 additions & 1 deletion binaries/cuprated/src/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@

mod dandelion;
mod incoming_tx;
mod manager;
mod txs_being_handled;
31 changes: 14 additions & 17 deletions binaries/cuprated/src/txpool/dandelion.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::time::Duration;

use bytes::Bytes;
use cuprate_dandelion_tower::pool::DandelionPoolService;
use cuprate_dandelion_tower::{DandelionConfig, DandelionRouter};
use cuprate_dandelion_tower::{DandelionConfig, DandelionRouter, Graph};
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
Expand All @@ -11,10 +13,17 @@ mod stem_service;
mod tx_store;

#[derive(Clone)]
struct DandelionTx(Bytes);
pub struct DandelionTx(Bytes);

type TxId = [u8; 32];

const DANDELION_CONFIG: DandelionConfig = DandelionConfig {
time_between_hop: Duration::from_millis(175),
epoch_duration: Duration::from_secs(10 * 60),
fluff_probability: 0.12,
graph: Graph::FourRegular,
};

type ConcreteDandelionRouter = DandelionRouter<
stem_service::OutboundPeerStream,
diffuse_service::DiffuseService,
Expand All @@ -35,12 +44,7 @@ pub fn start_dandelion_pool_manager(
txpool_read_handle,
txpool_write_handle,
},
DandelionConfig {
time_between_hop: Default::default(),
epoch_duration: Default::default(),
fluff_probability: 0.0,
graph: Default::default(),
},
DANDELION_CONFIG,
)
}

Expand All @@ -49,14 +53,7 @@ pub fn dandelion_router(clear_net: NetworkInterface<ClearNet>) -> ConcreteDandel
diffuse_service::DiffuseService {
clear_net_broadcast_service: clear_net.broadcast_svc(),
},
stem_service::OutboundPeerStream {
clear_net: clear_net.clone(),
},
DandelionConfig {
time_between_hop: Default::default(),
epoch_duration: Default::default(),
fluff_probability: 0.0,
graph: Default::default(),
},
stem_service::OutboundPeerStream { clear_net },
DANDELION_CONFIG,
)
}
21 changes: 17 additions & 4 deletions binaries/cuprated/src/txpool/dandelion/diffuse_service.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
use std::task::{Context, Poll};
use std::{
future::{ready, Ready},
task::{Context, Poll},
};

use futures::FutureExt;
use tower::Service;

use crate::txpool::dandelion::DandelionTx;
use cuprate_dandelion_tower::traits::DiffuseRequest;
use cuprate_p2p::{BroadcastRequest, BroadcastSvc, NetworkInterface};
use cuprate_p2p_core::ClearNet;

use super::DandelionTx;

/// The dandelion diffusion service.
pub struct DiffuseService {
pub clear_net_broadcast_service: BroadcastSvc<ClearNet>,
}

impl Service<DiffuseRequest<DandelionTx>> for DiffuseService {
type Response = BroadcastSvc::Response;
type Response = ();
type Error = tower::BoxError;
type Future = BroadcastSvc::Future;
type Future = Ready<Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.clear_net_broadcast_service
Expand All @@ -22,11 +29,17 @@ impl Service<DiffuseRequest<DandelionTx>> for DiffuseService {
}

fn call(&mut self, req: DiffuseRequest<DandelionTx>) -> Self::Future {
// TODO: Call `into_inner` when 1.82.0 stabilizes
self.clear_net_broadcast_service
.call(BroadcastRequest::Transaction {
tx_bytes: req.0 .0,
direction: None,
received_from: None,
})
.now_or_never()
.unwrap()
.expect("Broadcast service is Infallible");

ready(Ok(()))
}
}
41 changes: 27 additions & 14 deletions binaries/cuprated/src/txpool/dandelion/stem_service.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
use super::DandelionTx;
use bytes::Bytes;
use cuprate_dandelion_tower::traits::StemRequest;
use cuprate_dandelion_tower::OutboundPeer;
use std::{
pin::Pin,
task::{Context, Poll},
};

use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::client::Client;
use cuprate_p2p_core::{ClearNet, NetworkZone, PeerRequest, ProtocolRequest};
use cuprate_wire::protocol::NewTransactions;
use cuprate_wire::NetworkAddress;
use cuprate_p2p_core::{
client::{Client, InternalPeerID},
ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
};
use cuprate_wire::{protocol::NewTransactions, NetworkAddress};

use bytes::Bytes;
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::Service;

use super::DandelionTx;

/// The dandelion outbound peer stream.
pub struct OutboundPeerStream {
pub clear_net: NetworkInterface<ClearNet>,
}
Expand All @@ -20,22 +26,29 @@ impl Stream for OutboundPeerStream {
type Item = Result<OutboundPeer<NetworkAddress, StemPeerService<ClearNet>>, tower::BoxError>;

fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// TODO: make the outbound peer choice random.
Poll::Ready(Some(Ok(self
.clear_net
.client_pool()
.outbound_client()
.map_or(OutboundPeer::Exhausted, |client| {
OutboundPeer::Peer(client.info.id.into(), StemPeerService(client))
let addr = match client.info.id {
InternalPeerID::KnownAddr(addr) => addr,
InternalPeerID::Unknown(_) => panic!("Outbound peer had an unknown address"),
};

OutboundPeer::Peer(addr.into(), StemPeerService(client))
}))))
}
}

pub struct StemPeerService<N>(Client<N>);
/// The stem service, used to send stem txs.
pub struct StemPeerService<N: NetworkZone>(Client<N>);

impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> {
type Response = ();
type Response = <Client<N> as Service<PeerRequest>>::Response;
type Error = tower::BoxError;
type Future = Client::Future;
type Future = <Client<N> as Service<PeerRequest>>::Future;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
Expand Down
59 changes: 44 additions & 15 deletions binaries/cuprated/src/txpool/dandelion/tx_store.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
use crate::txpool::dandelion::{DandelionTx, TxId};
use std::{
f32::consts::E,
task::{Context, Poll},
};

use bytes::Bytes;
use cuprate_dandelion_tower::traits::{TxStoreRequest, TxStoreResponse};
use futures::{future::BoxFuture, FutureExt, StreamExt, TryFutureExt};
use tower::{util::Oneshot, Service, ServiceExt};

use cuprate_dandelion_tower::{
traits::{TxStoreRequest, TxStoreResponse},
State,
};
use cuprate_database::RuntimeError;
use cuprate_txpool::service::interface::{TxpoolReadRequest, TxpoolReadResponse};
use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt};
use std::task::{Context, Poll};
use tower::util::Oneshot;
use tower::{Service, ServiceExt};
use cuprate_txpool::service::{
interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse},
TxpoolReadHandle, TxpoolWriteHandle,
};

use super::{DandelionTx, TxId};

/// The dandelion tx-store service.
///
/// This is just mapping the interface [`cuprate_dandelion_tower`] wants to what [`cuprate_txpool`] provides.
pub struct TxStoreService {
pub txpool_read_handle: TxpoolReadHandle,
pub txpool_write_handle: TxpoolWriteHandle,
Expand All @@ -31,17 +43,34 @@ impl Service<TxStoreRequest<TxId>> for TxStoreService {
.clone()
.oneshot(TxpoolReadRequest::TxBlob(tx_id))
.map(|res| match res {
Ok(TxpoolReadResponse::TxBlob(blob)) => Ok(TxStoreResponse::Transaction(Some(
(DandelionTx(Bytes::from(blob)), todo!()),
))),
Ok(TxpoolReadResponse::TxBlob {
tx_blob,
state_stem,
}) => {
let state = if state_stem {
State::Stem
} else {
State::Fluff
};

Ok(TxStoreResponse::Transaction(Some((
DandelionTx(Bytes::from(tx_blob)),
state,
))))
}
Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Transaction(None)),
Err(e) => Err(e.into()),
Ok(_) => unreachable!(),
})
.boxed(),
TxStoreRequest::Promote(tx_id) => {
todo!()
}
TxStoreRequest::Promote(tx_id) => self
.txpool_write_handle
.oneshot(TxpoolWriteRequest::Promote(tx_id))
.map(|res| match res {
Ok(_) | Err(RuntimeError::KeyNotFound) => TxStoreResponse::Ok,
Err(e) => Err(e.into()),
})
.boxed(),
}
}
}
Loading

0 comments on commit b6d94cf

Please sign in to comment.