Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(txpool|p2p): use seqlock instead of small copy-able RwLocks #2524

Merged
merged 18 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2472](https://github.com/FuelLabs/fuel-core/pull/2472): Added the `amountU128` field to the `Balance` GraphQL schema, providing the total balance as a `U128`. The existing `amount` field clamps any balance exceeding `U64` to `u64::MAX`.
- [2526](https://github.com/FuelLabs/fuel-core/pull/2526): Add possibility to not have any cache set for RocksDB. Add an option to either load the RocksDB columns families on creation of the database or when the column is used.
- [2532](https://github.com/FuelLabs/fuel-core/pull/2532): Getters for inner rocksdb database handles.
- [2524](https://github.com/FuelLabs/fuel-core/pull/2524): Adds a new lock type which is optimized for certain workloads to the txpool and p2p services.

### Fixed
- [2365](https://github.com/FuelLabs/fuel-core/pull/2365): Fixed the error during dry run in the case of race condition.
Expand Down
32 changes: 11 additions & 21 deletions crates/services/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ use crate::{
};
use fuel_core_types::blockchain::consensus::Genesis;

use self::{
connection_tracker::ConnectionTracker,
fuel_authenticated::FuelAuthenticated,
fuel_upgrade::Checksum,
};
use fuel_core_services::seqlock::SeqLockReader;
use libp2p::{
gossipsub,
identity::{
Expand All @@ -22,18 +28,8 @@ use std::{
IpAddr,
Ipv4Addr,
},
sync::{
Arc,
RwLock,
},
time::Duration,
};

use self::{
connection_tracker::ConnectionTracker,
fuel_authenticated::FuelAuthenticated,
fuel_upgrade::Checksum,
};
mod connection_tracker;
mod fuel_authenticated;
pub(crate) mod fuel_upgrade;
Expand Down Expand Up @@ -258,20 +254,16 @@ impl Config<Initialized> {
/// mplex or yamux for multiplexing
pub(crate) fn build_transport_function(
p2p_config: &Config,
) -> (
impl FnOnce(&Keypair) -> Result<FuelAuthenticated<ConnectionTracker>, ()> + '_,
Arc<RwLock<ConnectionState>>,
) {
let connection_state = ConnectionState::new();
let kept_connection_state = connection_state.clone();
let transport_function = move |keypair: &Keypair| {
connection_state_reader: SeqLockReader<ConnectionState>,
) -> impl FnOnce(&Keypair) -> Result<FuelAuthenticated<ConnectionTracker>, ()> + '_ {
move |keypair: &Keypair| {
let noise_authenticated =
noise::Config::new(keypair).expect("Noise key generation failed");

let connection_state = if p2p_config.reserved_nodes_only_mode {
None
} else {
Some(connection_state)
Some(connection_state_reader.clone())
};

let connection_tracker =
Expand All @@ -282,9 +274,7 @@ pub(crate) fn build_transport_function(
connection_tracker,
p2p_config.checksum,
))
};

(transport_function, kept_connection_state)
}
}

fn peer_ids_set_from(multiaddr: &[Multiaddr]) -> HashSet<PeerId> {
Expand Down
17 changes: 5 additions & 12 deletions crates/services/p2p/src/config/connection_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,25 @@ use super::{
peer_ids_set_from,
};
use crate::peer_manager::ConnectionState;
use fuel_core_services::seqlock::SeqLockReader;
use libp2p::{
Multiaddr,
PeerId,
};
use std::{
collections::HashSet,
sync::{
Arc,
RwLock,
},
};
use std::collections::HashSet;

/// A `ConnectionTracker` allows either Reserved Peers or other peers if there is an available slot.
/// It is synced with `PeerManager` which keeps track of the `ConnectionState`.
#[derive(Debug, Clone)]
pub(crate) struct ConnectionTracker {
reserved_nodes: HashSet<PeerId>,
connection_state: Option<Arc<RwLock<ConnectionState>>>,
connection_state: Option<SeqLockReader<ConnectionState>>,
}

impl ConnectionTracker {
pub(crate) fn new(
reserved_nodes: &[Multiaddr],
connection_state: Option<Arc<RwLock<ConnectionState>>>,
connection_state: Option<SeqLockReader<ConnectionState>>,
) -> Self {
Self {
reserved_nodes: peer_ids_set_from(reserved_nodes),
Expand All @@ -42,9 +37,7 @@ impl Approver for ConnectionTracker {
}

if let Some(connection_state) = &self.connection_state {
if let Ok(connection_state) = connection_state.read() {
return connection_state.available_slot()
}
return connection_state.read().available_slot();
}

false
Expand Down
7 changes: 5 additions & 2 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
},
heartbeat,
peer_manager::{
ConnectionState,
PeerManager,
Punisher,
},
Expand Down Expand Up @@ -225,7 +226,9 @@ impl FuelP2PService {
config.reserved_nodes = parse_multiaddrs(config.reserved_nodes).await?;

// configure and build P2P Service
let (transport_function, connection_state) = build_transport_function(&config);
let (connection_state_writer, connection_state_reader) = ConnectionState::new();
let transport_function =
build_transport_function(&config, connection_state_reader);
let tcp_config = tcp::Config::new().port_reuse(true);
let behaviour = FuelBehaviour::new(&config, codec.clone())?;

Expand Down Expand Up @@ -296,7 +299,7 @@ impl FuelP2PService {
peer_manager: PeerManager::new(
reserved_peers_updates,
reserved_peers,
connection_state,
connection_state_writer,
config.max_peers_connected as usize,
),
})
Expand Down
57 changes: 29 additions & 28 deletions crates/services/p2p/src/peer_manager.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
use crate::{
gossipsub_config::GRAYLIST_THRESHOLD,
peer_manager::heartbeat_data::HeartbeatData,
};
use fuel_core_services::seqlock::{
SeqLock,
SeqLockReader,
SeqLockWriter,
};
use fuel_core_types::{
fuel_types::BlockHeight,
services::p2p::peer_reputation::{
Expand All @@ -13,26 +22,15 @@ use libp2p::{
PeerId,
};
use rand::seq::IteratorRandom;
use std::{
collections::{
HashMap,
HashSet,
},
sync::{
Arc,
RwLock,
},
use std::collections::{
HashMap,
HashSet,
};
use tracing::{
debug,
info,
};

use crate::{
gossipsub_config::GRAYLIST_THRESHOLD,
peer_manager::heartbeat_data::HeartbeatData,
};

pub mod heartbeat_data;

/// At this point we better just ban the peer
Expand Down Expand Up @@ -65,7 +63,7 @@ pub struct PeerManager {
non_reserved_connected_peers: HashMap<PeerId, PeerInfo>,
reserved_connected_peers: HashMap<PeerId, PeerInfo>,
reserved_peers: HashSet<PeerId>,
connection_state: Arc<RwLock<ConnectionState>>,
connection_state_writer: SeqLockWriter<ConnectionState>,
max_non_reserved_peers: usize,
reserved_peers_updates: tokio::sync::broadcast::Sender<usize>,
}
Expand All @@ -74,15 +72,15 @@ impl PeerManager {
pub fn new(
reserved_peers_updates: tokio::sync::broadcast::Sender<usize>,
reserved_peers: HashSet<PeerId>,
connection_state: Arc<RwLock<ConnectionState>>,
connection_state_writer: SeqLockWriter<ConnectionState>,
max_non_reserved_peers: usize,
) -> Self {
Self {
score_config: ScoreConfig::default(),
non_reserved_connected_peers: HashMap::with_capacity(max_non_reserved_peers),
reserved_connected_peers: HashMap::with_capacity(reserved_peers.len()),
reserved_peers,
connection_state,
connection_state_writer,
max_non_reserved_peers,
reserved_peers_updates,
}
Expand Down Expand Up @@ -210,9 +208,9 @@ impl PeerManager {
{
// since all the slots were full prior to this disconnect
// let's allow new peer non-reserved peers connections
if let Ok(mut connection_state) = self.connection_state.write() {
connection_state.allow_new_peers();
}
self.connection_state_writer.write(|data| {
data.allow_new_peers();
});
}

false
Expand Down Expand Up @@ -258,9 +256,9 @@ impl PeerManager {
== self.max_non_reserved_peers
{
// this is the last non-reserved peer allowed
if let Ok(mut connection_state) = self.connection_state.write() {
connection_state.deny_new_peers();
}
self.connection_state_writer.write(|data| {
data.deny_new_peers();
});
}

self.non_reserved_connected_peers
Expand Down Expand Up @@ -313,10 +311,13 @@ pub struct ConnectionState {
}

impl ConnectionState {
pub fn new() -> Arc<RwLock<Self>> {
Arc::new(RwLock::new(Self {
pub fn new() -> (
SeqLockWriter<ConnectionState>,
SeqLockReader<ConnectionState>,
) {
SeqLock::new(Self {
peers_allowed: true,
}))
})
}

pub fn available_slot(&self) -> bool {
Expand Down Expand Up @@ -399,14 +400,14 @@ mod tests {
reserved_peers: Vec<PeerId>,
max_non_reserved_peers: usize,
) -> PeerManager {
let connection_state = ConnectionState::new();
let (connection_state_writer, _) = ConnectionState::new();
let (sender, _) =
tokio::sync::broadcast::channel(reserved_peers.len().saturating_add(1));

PeerManager::new(
sender,
reserved_peers.into_iter().collect(),
connection_state,
connection_state_writer,
max_non_reserved_peers,
)
}
Expand Down
6 changes: 6 additions & 0 deletions crates/services/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#![deny(warnings)]

mod async_processor;
pub mod seqlock;
mod service;
mod state;
mod sync;
Expand Down Expand Up @@ -74,6 +75,11 @@ where
}

pub use async_processor::AsyncProcessor;
pub use seqlock::{
SeqLock,
SeqLockReader,
SeqLockWriter,
};
pub use service::{
EmptyShared,
RunnableService,
Expand Down
Loading
Loading