Use tokio::sync::broadcast::Sender::new instead of channel
Using `channel` creates an unneeded receiver.
hrxi committed Oct 25, 2024
1 parent 5c7c54c commit f452013
Showing 12 changed files with 50 additions and 72 deletions.
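For context, here is a minimal sketch (not part of the commit) contrasting the two constructors. `broadcast::channel` hands back a `(Sender, Receiver)` pair, so call sites that only want the sender must bind and immediately drop a receiver, while `broadcast::Sender::new` (available in recent Tokio releases) builds just the sender; receivers are created later on demand via `subscribe()`:

```rust
use tokio::sync::broadcast;

const BROADCAST_MAX_CAPACITY: usize = 256;

fn main() {
    // Before: `channel` returns a (Sender, Receiver) pair; the receiver is
    // unused here and gets dropped right away.
    let (tx_old, _rx) = broadcast::channel::<u32>(BROADCAST_MAX_CAPACITY);

    // After: `Sender::new` constructs only the sender.
    let tx_new = broadcast::Sender::<u32>::new(BROADCAST_MAX_CAPACITY);

    // Either way, receivers are obtained on demand.
    let _sub_old = tx_old.subscribe();
    let _sub_new = tx_new.subscribe();
}
```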
28 changes: 10 additions & 18 deletions blockchain/src/blockchain/blockchain.rs
@@ -13,7 +13,7 @@ use nimiq_primitives::{
coin::Coin, networks::NetworkId, policy::Policy, slots_allocation::Validators, trie::TrieItem,
};
use nimiq_utils::time::OffsetTime;
- use tokio::sync::broadcast::{channel as broadcast, Sender as BroadcastSender};
+ use tokio::sync::broadcast;

#[cfg(feature = "metrics")]
use crate::chain_metrics::BlockchainMetrics;
@@ -37,11 +37,11 @@ pub struct Blockchain {
/// The OffsetTime struct. It allows us to query the current time.
pub time: Arc<OffsetTime>, // shared with network
/// The notifier processes events relative to the blockchain.
- pub notifier: BroadcastSender<BlockchainEvent>,
+ pub notifier: broadcast::Sender<BlockchainEvent>,
/// The fork notifier processes fork events.
- pub fork_notifier: BroadcastSender<ForkEvent>,
+ pub fork_notifier: broadcast::Sender<ForkEvent>,
/// The log notifier processes all events regarding accounts changes.
- pub log_notifier: BroadcastSender<BlockLog>,
+ pub log_notifier: broadcast::Sender<BlockLog>,
/// The chain store is a database containing all of the chain infos, blocks and receipts.
pub chain_store: ChainStore,
/// The history store is a database containing all of the history trees and transactions.
@@ -277,18 +277,14 @@ impl Blockchain {
}
};

- let (tx, _rx) = broadcast(BROADCAST_MAX_CAPACITY);
- let (tx_fork, _rx_fork) = broadcast(BROADCAST_MAX_CAPACITY);
- let (tx_log, _rx_log) = broadcast(BROADCAST_MAX_CAPACITY);

Ok(Blockchain {
db: env,
config,
network_id,
time,
- notifier: tx,
- fork_notifier: tx_fork,
- log_notifier: tx_log,
+ notifier: broadcast::Sender::new(BROADCAST_MAX_CAPACITY),
+ fork_notifier: broadcast::Sender::new(BROADCAST_MAX_CAPACITY),
+ log_notifier: broadcast::Sender::new(BROADCAST_MAX_CAPACITY),
chain_store,
history_store,
state: BlockchainState {
@@ -344,18 +340,14 @@ impl Blockchain {
chain_store.set_head(&mut txn, &head_hash);
txn.commit();

- let (tx, _rx) = broadcast(BROADCAST_MAX_CAPACITY);
- let (tx_fork, _rx_fork) = broadcast(BROADCAST_MAX_CAPACITY);
- let (tx_log, _rx_log) = broadcast(BROADCAST_MAX_CAPACITY);

Ok(Blockchain {
db: env,
config,
network_id,
time,
- notifier: tx,
- fork_notifier: tx_fork,
- log_notifier: tx_log,
+ notifier: broadcast::Sender::new(BROADCAST_MAX_CAPACITY),
+ fork_notifier: broadcast::Sender::new(BROADCAST_MAX_CAPACITY),
+ log_notifier: broadcast::Sender::new(BROADCAST_MAX_CAPACITY),
chain_store,
history_store,
state: BlockchainState {
4 changes: 2 additions & 2 deletions blockchain/src/blockchain/push.rs
@@ -21,12 +21,12 @@ use nimiq_primitives::{
};
use nimiq_trie::WriteTransactionProxy as TrieMdbxWriteTransaction;
use parking_lot::{RwLockUpgradableReadGuard, RwLockWriteGuard};
- use tokio::sync::broadcast::Sender as BroadcastSender;
+ use tokio::sync::broadcast;

use super::PostValidationHook;
use crate::{interface::HistoryInterface, Blockchain};

- fn send_vec(log_notifier: &BroadcastSender<BlockLog>, logs: Vec<BlockLog>) {
+ fn send_vec(log_notifier: &broadcast::Sender<BlockLog>, logs: Vec<BlockLog>) {
for log in logs {
// The log notifier is for informational purposes only, thus may have no listeners.
// Therefore, no error logs should be produced in this case.
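As an aside on the comment in `send_vec` above: a tokio broadcast `send` returns `Err(SendError(value))` whenever no receiver is currently subscribed, so a purely informational notifier can discard that result instead of logging it. A minimal sketch of that pattern, with a hypothetical stand-in for `BlockLog` (the crate's actual function body is not shown in this diff):

```rust
use tokio::sync::broadcast;

// Hypothetical stand-in for the crate's BlockLog type.
#[derive(Clone, Debug)]
struct BlockLog(String);

fn send_vec(log_notifier: &broadcast::Sender<BlockLog>, logs: Vec<BlockLog>) {
    for log in logs {
        // `send` only fails when there are no active receivers; for an
        // informational notifier that is expected, so the error is ignored.
        let _ = log_notifier.send(log);
    }
}

fn main() {
    let notifier = broadcast::Sender::new(16);
    // No subscribers yet: the sends fail silently, which is fine here.
    send_vec(&notifier, vec![BlockLog("accounts changed".into())]);
}
```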
11 changes: 4 additions & 7 deletions consensus/src/consensus/consensus_proxy.rs
@@ -23,10 +23,7 @@ use nimiq_transaction::{
historic_transaction::HistoricTransaction, ControlTransaction, ControlTransactionTopic,
Transaction, TransactionTopic,
};
- use tokio::sync::{
- broadcast::Sender as BroadcastSender, mpsc::Sender as MpscSender,
- oneshot::channel as oneshot_channel,
- };
+ use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::wrappers::BroadcastStream;

use super::{ConsensusRequest, ResolveBlockError, ResolveBlockRequest};
@@ -45,8 +42,8 @@ pub struct ConsensusProxy<N: Network> {
pub network: Arc<N>,
pub(crate) established_flag: Arc<AtomicBool>,
pub(crate) synced_validity_window_flag: Arc<AtomicBool>,
- pub(crate) events: BroadcastSender<ConsensusEvent>,
- pub(crate) request: MpscSender<ConsensusRequest<N>>,
+ pub(crate) events: broadcast::Sender<ConsensusEvent>,
+ pub(crate) request: mpsc::Sender<ConsensusRequest<N>>,
}

impl<N: Network> Clone for ConsensusProxy<N> {
@@ -606,7 +603,7 @@ impl<N: Network> ConsensusProxy<N> {
) -> Result<Block, ResolveBlockError<N>> {
// Create the oneshot sender whose receiver this fn will await and whose
// sender will be given to the consensus proper to resolve the call.
- let (response_sender, receiver) = oneshot_channel();
+ let (response_sender, receiver) = oneshot::channel();

// Create the request structure.
let request = ResolveBlockRequest {
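The comment above ("Create the oneshot sender whose receiver this fn will await...") describes a standard oneshot request/response handshake. A minimal, self-contained sketch of that shape follows; the `Request` type and the doubling worker are hypothetical illustrations, not the crate's actual `ConsensusRequest` plumbing, and it assumes tokio with the `rt-multi-thread`, `macros`, and `sync` features:

```rust
use tokio::sync::{mpsc, oneshot};

// Hypothetical request carrying the oneshot sender used to answer it.
struct Request {
    payload: u32,
    response_sender: oneshot::Sender<u32>,
}

#[tokio::main]
async fn main() {
    let (request_tx, mut request_rx) = mpsc::channel::<Request>(10);

    // Worker side (the "consensus proper"): resolve each request through its oneshot sender.
    tokio::spawn(async move {
        while let Some(req) = request_rx.recv().await {
            // Ignore the error if the caller stopped waiting.
            let _ = req.response_sender.send(req.payload * 2);
        }
    });

    // Caller side (the proxy): create the pair, hand over the sender, await the receiver.
    let (response_sender, receiver) = oneshot::channel();
    request_tx
        .send(Request { payload: 21, response_sender })
        .await
        .expect("request channel closed");

    let answer = receiver.await.expect("request dropped without a response");
    assert_eq!(answer, 42);
}
```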
22 changes: 9 additions & 13 deletions consensus/src/consensus/mod.rs
@@ -22,11 +22,9 @@ use nimiq_time::{interval, Interval};
use nimiq_utils::{spawn, WakerExt};
use nimiq_zkp_component::zkp_component::ZKPComponentProxy;
use tokio::sync::{
- broadcast::{channel as broadcast, Sender as BroadcastSender},
- mpsc::{
- channel as mpsc_channel, error::SendError, Receiver as MpscReceiver, Sender as MpscSender,
- },
- oneshot::{error::RecvError, Sender as OneshotSender},
+ broadcast,
+ mpsc::{self, error::SendError},
+ oneshot::{self, error::RecvError},
};
use tokio_stream::wrappers::BroadcastStream;

@@ -116,7 +114,7 @@ pub struct ResolveBlockRequest<N: Network> {
pub(crate) first_peer_id: N::PeerId,

/// Sender to a oneshot channel where the response to the request is being awaited.
- pub(crate) response_sender: OneshotSender<Result<Block, ResolveBlockError<N>>>,
+ pub(crate) response_sender: oneshot::Sender<Result<Block, ResolveBlockError<N>>>,
}

/// Enumeration of all ConsensusRequests available.
Expand All @@ -130,7 +128,7 @@ pub struct Consensus<N: Network> {

pub sync: SyncerProxy<N>,

- events: BroadcastSender<ConsensusEvent>,
+ events: broadcast::Sender<ConsensusEvent>,
established_flag: Arc<AtomicBool>,
#[cfg(feature = "full")]
last_batch_number: u32,
@@ -153,8 +151,8 @@ pub struct Consensus<N: Network> {
/// somewhere deeper down the call stack would be adequate, as other requests may require different
/// structures. Putting it here seemed to be the most flexible.
requests: (
- MpscSender<ConsensusRequest<N>>,
- MpscReceiver<ConsensusRequest<N>>,
+ mpsc::Sender<ConsensusRequest<N>>,
+ mpsc::Receiver<ConsensusRequest<N>>,
),

zkp_proxy: ZKPComponentProxy<N>,
@@ -195,8 +193,6 @@ impl<N: Network> Consensus<N> {
min_peers: usize,
zkp_proxy: ZKPComponentProxy<N>,
) -> Self {
- let (tx, _rx) = broadcast(256);

Self::init_network_request_receivers(&network, &blockchain);

#[cfg(feature = "full")]
Expand All @@ -216,7 +212,7 @@ impl<N: Network> Consensus<N> {
blockchain,
network,
sync: syncer,
- events: tx,
+ events: broadcast::Sender::new(256),
established_flag,
#[cfg(feature = "full")]
last_batch_number: 0,
@@ -226,7 +222,7 @@
head_requests_interval: interval(Self::HEAD_REQUESTS_TIMEOUT),
min_peers,
// Choose a small buffer as having a lot of items buffered here indicates a bigger problem.
- requests: mpsc_channel(10),
+ requests: mpsc::channel(10),
zkp_proxy,
waker: None,
}
4 changes: 2 additions & 2 deletions consensus/src/sync/live/block_queue/mod.rs
@@ -2,7 +2,7 @@ use futures::stream::BoxStream
use nimiq_block::Block;
use nimiq_network_interface::network::{MsgAcceptance, Network, PubsubId};
pub use proxy::BlockQueueProxy as BlockQueue;
- use tokio::sync::oneshot::Sender as OneshotSender;
+ use tokio::sync::oneshot;

use crate::{
consensus::ResolveBlockError,
@@ -20,7 +20,7 @@ pub type GossipSubBlockStream<N> = BoxStream<'static, (Block, <N as Network>::Pu

pub type BlockAndSource<N> = (Block, BlockSource<N>);

- pub type ResolveBlockSender<N> = OneshotSender<Result<Block, ResolveBlockError<N>>>;
+ pub type ResolveBlockSender<N> = oneshot::Sender<Result<Block, ResolveBlockError<N>>>;

pub enum QueuedBlock<N: Network> {
Head(BlockAndSource<N>),
8 changes: 4 additions & 4 deletions consensus/src/sync/live/block_queue/queue.rs
@@ -18,7 +18,7 @@ use nimiq_network_interface::network::Network;
use nimiq_primitives::{policy::Policy, slots_allocation::Validators};
use nimiq_utils::WakerExt;
use parking_lot::RwLock;
- use tokio::sync::oneshot::Sender as OneshotSender;
+ use tokio::sync::oneshot;

use crate::{
consensus::{ResolveBlockError, ResolveBlockRequest},
@@ -70,15 +70,15 @@ pub struct BlockQueue<N: Network> {

/// A list of all pending missing block requests which have someplace waiting for it to resolve.
///
- /// `block_height` -> `block_hash` -> `OneshotSender` to resolve them.
+ /// `block_height` -> `block_hash` -> `oneshot::Sender` to resolve them.
///
/// Generally this would be empty as most missing block requests do not have another party waiting
/// for them to resolve. Currently the only other part waiting for a resolution of such a request is the
/// ProposalBuffer in the validator crate. It uses it to request predecessors of proposals if they
/// are unknown.
///
pending_requests:
- BTreeMap<u32, HashMap<Blake2bHash, OneshotSender<Result<Block, ResolveBlockError<N>>>>>,
+ BTreeMap<u32, HashMap<Blake2bHash, oneshot::Sender<Result<Block, ResolveBlockError<N>>>>>,

/// Waker used for the poll function
pub(crate) waker: Option<Waker>,
@@ -707,7 +707,7 @@ impl<N: Network> BlockQueue<N> {
if let Err(error) = response_sender.send(Err(ResolveBlockError::Duplicate)) {
log::warn!(
?error,
- "Failed to send on Oneshot, receiver already dropped"
+ "Failed to send on oneshot, receiver already dropped"
);
}
// Do not return as even though the request might not be awaited it should still be executed to
6 changes: 3 additions & 3 deletions consensus/src/sync/live/mod.rs
@@ -11,7 +11,7 @@ use nimiq_blockchain_proxy::BlockchainProxy;
use nimiq_bls::cache::PublicKeyCache;
use nimiq_network_interface::network::Network;
use parking_lot::Mutex;
- use tokio::sync::mpsc::{channel as mpsc, Sender as MpscSender};
+ use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[cfg(feature = "full")]
@@ -53,7 +53,7 @@ pub struct LiveSyncer<N: Network, Q: LiveSyncQueue<N>> {
/// Channel used to communicate additional blocks to the queue.
/// We use this to wake up the queue and pass in new, unknown blocks
/// received in the consensus as part of the head requests.
- block_tx: MpscSender<BlockAndSource<N>>,
+ block_tx: mpsc::Sender<BlockAndSource<N>>,
}

impl<N: Network, Q: LiveSyncQueue<N>> LiveSyncer<N, Q> {
@@ -63,7 +63,7 @@ impl<N: Network, Q: LiveSyncQueue<N>> LiveSyncer<N, Q> {
mut queue: Q,
bls_cache: Arc<Mutex<PublicKeyCache>>,
) -> Self {
- let (tx, rx) = mpsc(MAX_BLOCK_STREAM_BUFFER);
+ let (tx, rx) = mpsc::channel(MAX_BLOCK_STREAM_BUFFER);
queue.add_block_stream(ReceiverStream::new(rx));
Self {
blockchain,
13 changes: 5 additions & 8 deletions light-blockchain/src/blockchain.rs
@@ -12,7 +12,7 @@ use nimiq_primitives::{
};
use nimiq_utils::time::OffsetTime;
use nimiq_vrf::VrfEntropy;
- use tokio::sync::broadcast::{channel as broadcast, Sender as BroadcastSender};
+ use tokio::sync::broadcast;

use crate::chain_store::ChainStore;

@@ -38,9 +38,9 @@ pub struct LightBlockchain {
/// The chain store is a database containing all of the chain infos in the current batch.
pub chain_store: ChainStore,
/// The notifier processes events relative to the blockchain.
- pub notifier: BroadcastSender<BlockchainEvent>,
+ pub notifier: broadcast::Sender<BlockchainEvent>,
/// The fork notifier processes fork events.
- pub fork_notifier: BroadcastSender<ForkEvent>,
+ pub fork_notifier: broadcast::Sender<ForkEvent>,
}

/// Implements methods to start a Blockchain.
@@ -62,9 +62,6 @@ impl LightBlockchain {

chain_store.put_chain_info(chain_info);

- let (tx, _rx) = broadcast(BROADCAST_MAX_CAPACITY);
- let (tx_fork, _rx_fork) = broadcast(BROADCAST_MAX_CAPACITY);

LightBlockchain {
network_id,
time,
@@ -74,8 +71,8 @@ impl LightBlockchain {
current_validators: genesis_block.validators(),
genesis_block,
chain_store,
- notifier: tx,
- fork_notifier: tx_fork,
+ notifier: broadcast::Sender::new(BROADCAST_MAX_CAPACITY),
+ fork_notifier: broadcast::Sender::new(BROADCAST_MAX_CAPACITY),
}
}

2 changes: 1 addition & 1 deletion network-libp2p/src/network.rs
@@ -105,7 +105,7 @@ impl Network {
let local_peer_id = *Swarm::local_peer_id(&swarm);
let connected_peers = Arc::new(RwLock::new(HashMap::new()));

- let (events_tx, _) = broadcast::channel(64);
+ let events_tx = broadcast::Sender::new(64);
let (action_tx, action_rx) = mpsc::channel(64);
let (validate_tx, validate_rx) = mpsc::unbounded_channel();

2 changes: 1 addition & 1 deletion network-mock/src/hub.rs
@@ -89,7 +89,7 @@ impl MockHubInner {
.entry(topic_name)
.or_insert_with(|| MockTopic {
peers: HashSet::new(),
- sender: broadcast::channel(16).0,
+ sender: broadcast::Sender::new(16),
});

// Add the peer address to the subscribed peer list.
6 changes: 4 additions & 2 deletions network-mock/src/observable_hash_map.rs
@@ -33,8 +33,10 @@ impl<K: Clone + Eq + Hash, V: Clone> Default for ObservableHashMap<K, V> {

impl<K: Clone + Eq + Hash, V: Clone> From<HashMap<K, V>> for ObservableHashMap<K, V> {
fn from(inner: HashMap<K, V>) -> ObservableHashMap<K, V> {
- let (tx, _) = broadcast::channel(64);
- ObservableHashMap { inner, tx }
+ ObservableHashMap {
+ inner,
+ tx: broadcast::Sender::new(64),
+ }
}
}

16 changes: 5 additions & 11 deletions zkp-component/src/zkp_component.rs
@@ -17,10 +17,7 @@ use nimiq_network_interface::{
};
use nimiq_utils::spawn;
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
- use tokio::sync::{
- broadcast::{channel as broadcast, Sender as BroadcastSender},
- oneshot::error::RecvError,
- };
+ use tokio::sync::{broadcast, oneshot::error::RecvError};
use tokio_stream::wrappers::BroadcastStream;

#[cfg(feature = "zkp-prover")]
@@ -29,13 +26,13 @@ use crate::{proof_store::ProofStore, proof_utils::*, types::*, zkp_requests::ZKP

pub type ZKProofsStream<N> = BoxStream<'static, (ZKProof, <N as Network>::PubsubId)>;

- pub(crate) const BROADCAST_MAX_CAPACITY: usize = 256;
+ const BROADCAST_MAX_CAPACITY: usize = 256;

pub struct ZKPComponentProxy<N: Network> {
network: Arc<N>,
zkp_state: Arc<RwLock<ZKPState>>,
zkp_requests: Arc<Mutex<ZKPRequests<N>>>,
- pub(crate) zkp_events_notifier: BroadcastSender<ZKPEvent<N>>,
+ pub(crate) zkp_events_notifier: broadcast::Sender<ZKPEvent<N>>,
}

impl<N: Network> Clone for ZKPComponentProxy<N> {
@@ -116,7 +113,7 @@ pub struct ZKPComponent<N: Network> {
zk_proofs_stream: ZKProofsStream<N>,
proof_storage: Option<Box<dyn ProofStore>>,
zkp_requests: Arc<Mutex<ZKPRequests<N>>>,
- zkp_events_notifier: BroadcastSender<ZKPEvent<N>>,
+ zkp_events_notifier: broadcast::Sender<ZKPEvent<N>>,
}

impl<N: Network> ZKPComponent<N> {
@@ -132,9 +129,6 @@ impl<N: Network> ZKPComponent<N> {
ZKPState::with_genesis(&genesis_block).expect("Invalid genesis block"),
));

- // Creates the zk proofs events notifier.
- let (zkp_events_notifier, _rx) = broadcast(BROADCAST_MAX_CAPACITY);

// Gets the stream zkps gossiped by peers.
let zk_proofs_stream = network.subscribe::<ZKProofTopic>().await.unwrap().boxed();

@@ -147,7 +141,7 @@ impl<N: Network> ZKPComponent<N> {
zk_proofs_stream,
proof_storage,
zkp_requests: Arc::new(Mutex::new(ZKPRequests::new(network))),
- zkp_events_notifier,
+ zkp_events_notifier: broadcast::Sender::new(BROADCAST_MAX_CAPACITY),
};

// Loads the proof from the db if any.
