Skip to content

Commit

Permalink
feat(txpool|p2p): use seqlock instead of small copy-able RwLocks (#2524)
Browse files Browse the repository at this point in the history
## Linked Issues/PRs
<!-- List of related issues/PRs -->
- none

## Description
<!-- List of detailed changes -->
We utilize [SeqLock](https://docs.kernel.org/locking/seqlock.html)
instead of an RwLock for data that satisfies the following conditions -

1. Occasional writes
2. Frequent reads

The `write` method no longer has a hot loop, and therefore writes aren't
blocked by reads. we don't use an internal locking mechanism, but rather
a sequence counter to check if data was changed before/during/after a
read. This may result in a few iterations while reading, but it is
minimal.

We should have a benchmark for txpool to see the performance difference.

It is important to note that this implementation *assumes* that there
will only be 1 writer thread.

## Checklist
- [x] Breaking changes are clearly marked as such in the PR description
and changelog
- [x] New behavior is reflected in tests
- [x] [The specification](https://github.com/FuelLabs/fuel-specs/)
matches the implemented behavior (link update PR if changes are needed)

### Before requesting review
- [x] I have reviewed the code myself
- [x] I have created follow-up issues caused by this PR and linked them
here

### After merging, notify other teams

[Add or remove entries as needed]

- [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/)
- [ ] [Sway compiler](https://github.com/FuelLabs/sway/)
- [ ] [Platform
documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+)
(for out-of-organization contributors, the person merging the PR will do
this)
- [ ] Someone else?

---------

Co-authored-by: AurelienFT <[email protected]>
  • Loading branch information
rymnc and AurelienFT authored Jan 13, 2025
1 parent 4783c2f commit b2c0b30
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 70 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2472](https://github.com/FuelLabs/fuel-core/pull/2472): Added the `amountU128` field to the `Balance` GraphQL schema, providing the total balance as a `U128`. The existing `amount` field clamps any balance exceeding `U64` to `u64::MAX`.
- [2526](https://github.com/FuelLabs/fuel-core/pull/2526): Add possibility to not have any cache set for RocksDB. Add an option to either load the RocksDB columns families on creation of the database or when the column is used.
- [2532](https://github.com/FuelLabs/fuel-core/pull/2532): Getters for inner rocksdb database handles.
- [2524](https://github.com/FuelLabs/fuel-core/pull/2524): Adds a new lock type which is optimized for certain workloads to the txpool and p2p services.

### Fixed
- [2365](https://github.com/FuelLabs/fuel-core/pull/2365): Fixed the error during dry run in the case of race condition.
Expand Down
32 changes: 11 additions & 21 deletions crates/services/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ use crate::{
};
use fuel_core_types::blockchain::consensus::Genesis;

use self::{
connection_tracker::ConnectionTracker,
fuel_authenticated::FuelAuthenticated,
fuel_upgrade::Checksum,
};
use fuel_core_services::seqlock::SeqLockReader;
use libp2p::{
gossipsub,
identity::{
Expand All @@ -22,18 +28,8 @@ use std::{
IpAddr,
Ipv4Addr,
},
sync::{
Arc,
RwLock,
},
time::Duration,
};

use self::{
connection_tracker::ConnectionTracker,
fuel_authenticated::FuelAuthenticated,
fuel_upgrade::Checksum,
};
mod connection_tracker;
mod fuel_authenticated;
pub(crate) mod fuel_upgrade;
Expand Down Expand Up @@ -258,20 +254,16 @@ impl Config<Initialized> {
/// mplex or yamux for multiplexing
pub(crate) fn build_transport_function(
p2p_config: &Config,
) -> (
impl FnOnce(&Keypair) -> Result<FuelAuthenticated<ConnectionTracker>, ()> + '_,
Arc<RwLock<ConnectionState>>,
) {
let connection_state = ConnectionState::new();
let kept_connection_state = connection_state.clone();
let transport_function = move |keypair: &Keypair| {
connection_state_reader: SeqLockReader<ConnectionState>,
) -> impl FnOnce(&Keypair) -> Result<FuelAuthenticated<ConnectionTracker>, ()> + '_ {
move |keypair: &Keypair| {
let noise_authenticated =
noise::Config::new(keypair).expect("Noise key generation failed");

let connection_state = if p2p_config.reserved_nodes_only_mode {
None
} else {
Some(connection_state)
Some(connection_state_reader.clone())
};

let connection_tracker =
Expand All @@ -282,9 +274,7 @@ pub(crate) fn build_transport_function(
connection_tracker,
p2p_config.checksum,
))
};

(transport_function, kept_connection_state)
}
}

fn peer_ids_set_from(multiaddr: &[Multiaddr]) -> HashSet<PeerId> {
Expand Down
17 changes: 5 additions & 12 deletions crates/services/p2p/src/config/connection_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,25 @@ use super::{
peer_ids_set_from,
};
use crate::peer_manager::ConnectionState;
use fuel_core_services::seqlock::SeqLockReader;
use libp2p::{
Multiaddr,
PeerId,
};
use std::{
collections::HashSet,
sync::{
Arc,
RwLock,
},
};
use std::collections::HashSet;

/// A `ConnectionTracker` allows either Reserved Peers or other peers if there is an available slot.
/// It is synced with `PeerManager` which keeps track of the `ConnectionState`.
#[derive(Debug, Clone)]
pub(crate) struct ConnectionTracker {
reserved_nodes: HashSet<PeerId>,
connection_state: Option<Arc<RwLock<ConnectionState>>>,
connection_state: Option<SeqLockReader<ConnectionState>>,
}

impl ConnectionTracker {
pub(crate) fn new(
reserved_nodes: &[Multiaddr],
connection_state: Option<Arc<RwLock<ConnectionState>>>,
connection_state: Option<SeqLockReader<ConnectionState>>,
) -> Self {
Self {
reserved_nodes: peer_ids_set_from(reserved_nodes),
Expand All @@ -42,9 +37,7 @@ impl Approver for ConnectionTracker {
}

if let Some(connection_state) = &self.connection_state {
if let Ok(connection_state) = connection_state.read() {
return connection_state.available_slot()
}
return connection_state.read().available_slot();
}

false
Expand Down
7 changes: 5 additions & 2 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
},
heartbeat,
peer_manager::{
ConnectionState,
PeerManager,
Punisher,
},
Expand Down Expand Up @@ -225,7 +226,9 @@ impl FuelP2PService {
config.reserved_nodes = parse_multiaddrs(config.reserved_nodes).await?;

// configure and build P2P Service
let (transport_function, connection_state) = build_transport_function(&config);
let (connection_state_writer, connection_state_reader) = ConnectionState::new();
let transport_function =
build_transport_function(&config, connection_state_reader);
let tcp_config = tcp::Config::new().port_reuse(true);
let behaviour = FuelBehaviour::new(&config, codec.clone())?;

Expand Down Expand Up @@ -296,7 +299,7 @@ impl FuelP2PService {
peer_manager: PeerManager::new(
reserved_peers_updates,
reserved_peers,
connection_state,
connection_state_writer,
config.max_peers_connected as usize,
),
})
Expand Down
62 changes: 33 additions & 29 deletions crates/services/p2p/src/peer_manager.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
use crate::{
gossipsub_config::GRAYLIST_THRESHOLD,
peer_manager::heartbeat_data::HeartbeatData,
};
use fuel_core_services::seqlock::{
SeqLock,
SeqLockReader,
SeqLockWriter,
};
use fuel_core_types::{
fuel_types::BlockHeight,
services::p2p::peer_reputation::{
Expand All @@ -13,26 +22,15 @@ use libp2p::{
PeerId,
};
use rand::seq::IteratorRandom;
use std::{
collections::{
HashMap,
HashSet,
},
sync::{
Arc,
RwLock,
},
use std::collections::{
HashMap,
HashSet,
};
use tracing::{
debug,
info,
};

use crate::{
gossipsub_config::GRAYLIST_THRESHOLD,
peer_manager::heartbeat_data::HeartbeatData,
};

pub mod heartbeat_data;

/// At this point we better just ban the peer
Expand Down Expand Up @@ -65,7 +63,7 @@ pub struct PeerManager {
non_reserved_connected_peers: HashMap<PeerId, PeerInfo>,
reserved_connected_peers: HashMap<PeerId, PeerInfo>,
reserved_peers: HashSet<PeerId>,
connection_state: Arc<RwLock<ConnectionState>>,
connection_state_writer: SeqLockWriter<ConnectionState>,
max_non_reserved_peers: usize,
reserved_peers_updates: tokio::sync::broadcast::Sender<usize>,
}
Expand All @@ -74,15 +72,15 @@ impl PeerManager {
pub fn new(
reserved_peers_updates: tokio::sync::broadcast::Sender<usize>,
reserved_peers: HashSet<PeerId>,
connection_state: Arc<RwLock<ConnectionState>>,
connection_state_writer: SeqLockWriter<ConnectionState>,
max_non_reserved_peers: usize,
) -> Self {
Self {
score_config: ScoreConfig::default(),
non_reserved_connected_peers: HashMap::with_capacity(max_non_reserved_peers),
reserved_connected_peers: HashMap::with_capacity(reserved_peers.len()),
reserved_peers,
connection_state,
connection_state_writer,
max_non_reserved_peers,
reserved_peers_updates,
}
Expand Down Expand Up @@ -210,9 +208,9 @@ impl PeerManager {
{
// since all the slots were full prior to this disconnect
// let's allow new peer non-reserved peers connections
if let Ok(mut connection_state) = self.connection_state.write() {
connection_state.allow_new_peers();
}
self.connection_state_writer.write(|data| {
data.allow_new_peers();
});
}

false
Expand Down Expand Up @@ -258,9 +256,9 @@ impl PeerManager {
== self.max_non_reserved_peers
{
// this is the last non-reserved peer allowed
if let Ok(mut connection_state) = self.connection_state.write() {
connection_state.deny_new_peers();
}
self.connection_state_writer.write(|data| {
data.deny_new_peers();
});
}

self.non_reserved_connected_peers
Expand Down Expand Up @@ -313,10 +311,16 @@ pub struct ConnectionState {
}

impl ConnectionState {
pub fn new() -> Arc<RwLock<Self>> {
Arc::new(RwLock::new(Self {
peers_allowed: true,
}))
pub fn new() -> (
SeqLockWriter<ConnectionState>,
SeqLockReader<ConnectionState>,
) {
// ConnectionState < 64 bytes, so it's safe to use SeqLock
unsafe {
SeqLock::new(Self {
peers_allowed: true,
})
}
}

pub fn available_slot(&self) -> bool {
Expand Down Expand Up @@ -399,14 +403,14 @@ mod tests {
reserved_peers: Vec<PeerId>,
max_non_reserved_peers: usize,
) -> PeerManager {
let connection_state = ConnectionState::new();
let (connection_state_writer, _) = ConnectionState::new();
let (sender, _) =
tokio::sync::broadcast::channel(reserved_peers.len().saturating_add(1));

PeerManager::new(
sender,
reserved_peers.into_iter().collect(),
connection_state,
connection_state_writer,
max_non_reserved_peers,
)
}
Expand Down
6 changes: 6 additions & 0 deletions crates/services/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#![deny(warnings)]

mod async_processor;
pub mod seqlock;
mod service;
mod state;
mod sync;
Expand Down Expand Up @@ -74,6 +75,11 @@ where
}

pub use async_processor::AsyncProcessor;
pub use seqlock::{
SeqLock,
SeqLockReader,
SeqLockWriter,
};
pub use service::{
EmptyShared,
RunnableService,
Expand Down
Loading

0 comments on commit b2c0b30

Please sign in to comment.