Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(txpool|p2p): use seqlock instead of small copy-able RwLocks #2524

Merged
merged 18 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2472](https://github.com/FuelLabs/fuel-core/pull/2472): Added the `amountU128` field to the `Balance` GraphQL schema, providing the total balance as a `U128`. The existing `amount` field clamps any balance exceeding `U64` to `u64::MAX`.
- [2526](https://github.com/FuelLabs/fuel-core/pull/2526): Add possibility to not have any cache set for RocksDB. Add an option to either load the RocksDB columns families on creation of the database or when the column is used.
- [2532](https://github.com/FuelLabs/fuel-core/pull/2532): Getters for inner rocksdb database handles.
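- [2524](https://github.com/FuelLabs/fuel-core/pull/2524): Added a new lock type, `SeqLock`, optimized for small `Copy` data that is read very frequently and written occasionally by a single writer. The txpool and p2p services now use it in place of `RwLock` for such values.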

### Fixed
- [2365](https://github.com/FuelLabs/fuel-core/pull/2365): Fixed the error during dry run in the case of race condition.
Expand Down
19 changes: 8 additions & 11 deletions crates/services/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ use crate::{
};
use fuel_core_types::blockchain::consensus::Genesis;

use self::{
connection_tracker::ConnectionTracker,
fuel_authenticated::FuelAuthenticated,
fuel_upgrade::Checksum,
};
use fuel_core_services::seqlock::SeqLock;
use libp2p::{
gossipsub,
identity::{
Expand All @@ -22,18 +28,9 @@ use std::{
IpAddr,
Ipv4Addr,
},
sync::{
Arc,
RwLock,
},
sync::Arc,
time::Duration,
};

use self::{
connection_tracker::ConnectionTracker,
fuel_authenticated::FuelAuthenticated,
fuel_upgrade::Checksum,
};
mod connection_tracker;
mod fuel_authenticated;
pub(crate) mod fuel_upgrade;
Expand Down Expand Up @@ -260,7 +257,7 @@ pub(crate) fn build_transport_function(
p2p_config: &Config,
) -> (
impl FnOnce(&Keypair) -> Result<FuelAuthenticated<ConnectionTracker>, ()> + '_,
Arc<RwLock<ConnectionState>>,
Arc<SeqLock<ConnectionState>>,
) {
let connection_state = ConnectionState::new();
let kept_connection_state = connection_state.clone();
Expand Down
14 changes: 5 additions & 9 deletions crates/services/p2p/src/config/connection_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,28 @@ use super::{
peer_ids_set_from,
};
use crate::peer_manager::ConnectionState;
use fuel_core_services::seqlock::SeqLock;
use libp2p::{
Multiaddr,
PeerId,
};
use std::{
collections::HashSet,
sync::{
Arc,
RwLock,
},
sync::Arc,
};

/// A `ConnectionTracker` allows either Reserved Peers or other peers if there is an available slot.
/// It is synced with `PeerManager` which keeps track of the `ConnectionState`.
#[derive(Debug, Clone)]
pub(crate) struct ConnectionTracker {
reserved_nodes: HashSet<PeerId>,
connection_state: Option<Arc<RwLock<ConnectionState>>>,
connection_state: Option<Arc<SeqLock<ConnectionState>>>,
}

impl ConnectionTracker {
pub(crate) fn new(
reserved_nodes: &[Multiaddr],
connection_state: Option<Arc<RwLock<ConnectionState>>>,
connection_state: Option<Arc<SeqLock<ConnectionState>>>,
) -> Self {
Self {
reserved_nodes: peer_ids_set_from(reserved_nodes),
Expand All @@ -42,9 +40,7 @@ impl Approver for ConnectionTracker {
}

if let Some(connection_state) = &self.connection_state {
if let Ok(connection_state) = connection_state.read() {
return connection_state.available_slot()
}
return connection_state.read().available_slot();
}

false
Expand Down
35 changes: 16 additions & 19 deletions crates/services/p2p/src/peer_manager.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
use crate::{
gossipsub_config::GRAYLIST_THRESHOLD,
peer_manager::heartbeat_data::HeartbeatData,
};
use fuel_core_services::seqlock::SeqLock;
use fuel_core_types::{
fuel_types::BlockHeight,
services::p2p::peer_reputation::{
Expand All @@ -18,21 +23,13 @@ use std::{
HashMap,
HashSet,
},
sync::{
Arc,
RwLock,
},
sync::Arc,
};
use tracing::{
debug,
info,
};

use crate::{
gossipsub_config::GRAYLIST_THRESHOLD,
peer_manager::heartbeat_data::HeartbeatData,
};

pub mod heartbeat_data;

/// At this point we better just ban the peer
Expand Down Expand Up @@ -65,7 +62,7 @@ pub struct PeerManager {
non_reserved_connected_peers: HashMap<PeerId, PeerInfo>,
reserved_connected_peers: HashMap<PeerId, PeerInfo>,
reserved_peers: HashSet<PeerId>,
connection_state: Arc<RwLock<ConnectionState>>,
connection_state: Arc<SeqLock<ConnectionState>>,
max_non_reserved_peers: usize,
reserved_peers_updates: tokio::sync::broadcast::Sender<usize>,
}
Expand All @@ -74,7 +71,7 @@ impl PeerManager {
pub fn new(
reserved_peers_updates: tokio::sync::broadcast::Sender<usize>,
reserved_peers: HashSet<PeerId>,
connection_state: Arc<RwLock<ConnectionState>>,
connection_state: Arc<SeqLock<ConnectionState>>,
max_non_reserved_peers: usize,
) -> Self {
Self {
Expand Down Expand Up @@ -210,9 +207,9 @@ impl PeerManager {
{
// since all the slots were full prior to this disconnect
// let's allow new peer non-reserved peers connections
if let Ok(mut connection_state) = self.connection_state.write() {
connection_state.allow_new_peers();
}
self.connection_state.write(|data| {
data.allow_new_peers();
});
}

false
Expand Down Expand Up @@ -258,9 +255,9 @@ impl PeerManager {
== self.max_non_reserved_peers
{
// this is the last non-reserved peer allowed
if let Ok(mut connection_state) = self.connection_state.write() {
connection_state.deny_new_peers();
}
self.connection_state.write(|data| {
data.deny_new_peers();
});
}

self.non_reserved_connected_peers
Expand Down Expand Up @@ -313,8 +310,8 @@ pub struct ConnectionState {
}

impl ConnectionState {
pub fn new() -> Arc<RwLock<Self>> {
Arc::new(RwLock::new(Self {
pub fn new() -> Arc<SeqLock<Self>> {
Arc::new(SeqLock::new(Self {
peers_allowed: true,
}))
}
Expand Down
1 change: 1 addition & 0 deletions crates/services/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#![deny(warnings)]

mod async_processor;
pub mod seqlock;
mod service;
mod state;
mod sync;
Expand Down
132 changes: 132 additions & 0 deletions crates/services/src/seqlock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
//! A simple implementation of a sequential lock.
//! More details: <https://docs.kernel.org/locking/seqlock.html>
//! Optimized for occasional writes and frequent reads
//! !!WARNING!!
//! ONLY USE IF ALL THE BELOW ARE MET
//! 1. Internal data < 64 bytes
//! 2. Data is Copy
//! 3. ONLY 1 writer
//! 4. VERY frequent reads

use std::{
cell::UnsafeCell,
sync::atomic::{
AtomicU64,
Ordering,
},
};

/// A simple implementation of a sequential lock.
/// Uses `unsafe` internally; `T` must be `Copy` for the value to be read.
#[derive(Debug)]
pub struct SeqLock<T> {
rymnc marked this conversation as resolved.
Show resolved Hide resolved
sequence: AtomicU64,
data: UnsafeCell<T>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could create a trait for this T to guarantee that it's below 64 bytes and provide some default impls for the types we expect to see.

/// Type is equal to or smaller than 64 bytes
pub trait Small;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(chiming in) perhaps Small should imply Copy in this case: pub trait Small: Copy;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't quite like the idea of creating default impls here ~ we won't be able to implement the trait for types outside the crate either due to the orphan rule.

I can enforce the type size at runtime if you'd like.

}

// SAFETY (review note): this impl lets `&SeqLock<T>` be shared between threads.
// `read` only copies the value out and validates the copy against the sequence
// counter, but `write` takes `&self` and creates a `&mut T` with no mutual
// exclusion between writers. The impl is therefore only sound while callers
// uphold the "ONLY 1 writer" rule from the module docs; nothing in the type
// system enforces it.
unsafe impl<T: Send> Sync for SeqLock<T> {}

impl<T> SeqLock<T> {
/// Wraps `data` in a fresh `SeqLock`.
///
/// The sequence counter starts at zero, i.e. even, which `read` treats as
/// "no write in progress".
pub fn new(data: T) -> Self {
    let sequence = AtomicU64::new(0);
    let data = UnsafeCell::new(data);

    Self { sequence, data }
}

/// Mutates the protected value in place by running `f` on it.
///
/// The sequence counter is incremented once before `f` runs (making it odd)
/// and once after it returns (making it even again); `read` uses the counter
/// to detect a write that is in progress or that overlapped its copy.
///
/// There is no mutual exclusion between writers: the caller must guarantee
/// that at most one thread calls `write` at any given time.
///
/// NOTE(review): if `f` panics, the counter is left odd and every later
/// `read` spins forever.
pub fn write<F>(&self, f: F)
where
    F: FnOnce(&mut T),
{
    // Starting guard: mark a write as in progress (counter becomes odd).
    self.sequence.fetch_add(1, Ordering::Release);

    // `Release` on the increment above only orders *earlier* operations
    // before it; on its own it does not keep the data writes below from
    // becoming visible before the counter turns odd. The release fence
    // provides that ordering, so a reader that observes any part of the new
    // data also observes the odd (or later) counter when it re-checks.
    std::sync::atomic::fence(Ordering::Release);

    // Modify the data.
    // SAFETY: `UnsafeCell::get` returns a valid, aligned pointer to the inner
    // value. The `&mut T` is unique as long as the single-writer contract is
    // upheld; readers never form a reference to the data, they only copy the
    // value out and discard the copy if the counter changed.
    unsafe {
        f(&mut *self.data.get());
    }

    // Ending guard: publish the data writes and make the counter even again.
    self.sequence.fetch_add(1, Ordering::Release);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if two writers try to update the value concurrently?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well they shouldn't
this is designed for single writer only
there's a lot of foot guns with this implementation, and makes it highly specific for one use case -

  1. single writer
  2. small data structure
  3. heavy reads

I can add as a warning comment if you'd like

I want to avoid write locks because this assumes only a single writer exists

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lmk if this comment works: 118c37a

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, the comment looks good :)

Copy link
Member

@MitchTurner MitchTurner Jan 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we guarantee that only a single writer exists? Like, on construction we just generate a SeqLockWriter and from that we can generate any number of SeqLockReaders, but the SeqLockWriter itself isn't cloneable?

Essentially like a single producer, multiple consumer channel.

Internally, it's the same, but we expose different mut apis for the SeqLockWriter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in 720d486

}

/// Returns a copy of the protected value.
///
/// The call never blocks a writer. It loops until it obtains a copy that was
/// taken while the sequence counter was even and unchanged, so it spins for
/// as long as a write is in progress.
pub fn read(&self) -> T
where
    T: Copy,
{
    loop {
        // Check the starting guard.
        let start = self.sequence.load(Ordering::Acquire);

        // An odd counter means a write is in progress: retry.
        if start % 2 != 0 {
            // Tell the CPU this is a busy-wait so it can yield pipeline
            // resources to the sibling hyper-thread / save power.
            std::hint::spin_loop();
            continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in 51d079c

}

// Copy the value out. This copy may race with a concurrent `write`, so the
// result is only trusted once the counter re-check below succeeds.
// SAFETY: `UnsafeCell::get` returns a valid, aligned pointer to an
// initialised `T`, and `T: Copy`, so a bitwise copy is a valid way to read
// it. The volatile read keeps the compiler from merging, eliding or
// re-issuing the load.
// NOTE(review): a torn copy is discarded below, but it still briefly exists
// as a `T`; confirm every `T` used here tolerates arbitrary bit patterns or
// is small enough to be copied in a single load.
let data = unsafe { std::ptr::read_volatile(self.data.get()) };

// An `Acquire` load only keeps *later* operations from moving before it, so
// by itself it would allow the data copy above to be reordered after the
// counter re-check. The acquire fence pins the copy before the re-check.
std::sync::atomic::fence(Ordering::Acquire);

// Check the ending guard; the fence above already supplies the ordering.
let end = self.sequence.load(Ordering::Relaxed);

// `start` is known to be even at this point (odd values `continue` above),
// so an unchanged counter proves no write overlapped the copy.
if start == end {
    return data;
}
acerone85 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

// Test names follow the `subject__expectation` convention, hence the double
// underscore and the lint exemption.
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;
    use std::{
        sync::Arc,
        thread,
    };

    /// One writer stores 0, 1, 2, ... while one reader polls concurrently.
    /// Every value the reader observes must be at least as large as the
    /// previous one it observed.
    #[test]
    fn test_seqlock__provides_correct_values_in_order() {
        // Start at 0 so the full sequence of stored values (0, 0, 1, ..., 99)
        // is non-decreasing; any other start would make the check invalid.
        let lock = Arc::new(SeqLock::new(0));
        let iterations = 100;

        let writer = {
            let lock = lock.clone();
            thread::spawn(move || {
                for i in 0..iterations {
                    lock.write(|data| *data = i);
                }
            })
        };

        let reader = {
            let lock = lock.clone();
            thread::spawn(move || {
                // Largest value observed so far; must be updated after every
                // read, otherwise the assertion below checks nothing.
                let mut seen = 0;

                for _ in 0..iterations {
                    let value = lock.read();
                    assert!(
                        value >= seen,
                        "read went backwards: saw {value} after {seen}"
                    );
                    seen = value;
                }
            })
        };

        writer.join().unwrap();
        reader.join().unwrap();
    }

    /// A value written on one thread is returned by a later read on the same
    /// thread.
    #[test]
    fn test_seqlock__single_threaded() {
        let lock = SeqLock::new(42);

        lock.write(|data| {
            *data = 100;
        });

        let value = lock.read();
        assert_eq!(value, 100);
    }
}
12 changes: 7 additions & 5 deletions crates/services/txpool_v2/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use fuel_core_services::TaskNextAction;

use fuel_core_metrics::txpool_metrics::txpool_metrics;
use fuel_core_services::{
seqlock::SeqLock,
AsyncProcessor,
RunnableService,
RunnableTask,
Expand Down Expand Up @@ -188,7 +189,7 @@ pub struct Task<View> {
p2p_sync_process: AsyncProcessor,
pruner: TransactionPruner,
pool: Shared<TxPool>,
current_height: Shared<BlockHeight>,
current_height: Arc<SeqLock<BlockHeight>>,
tx_sync_history: Shared<HashSet<PeerId>>,
shared_state: SharedState,
metrics: bool,
Expand Down Expand Up @@ -320,8 +321,9 @@ where
}

{
let mut block_height = self.current_height.write();
*block_height = new_height;
self.current_height.write(|data| {
*data = new_height;
});
}
}

Expand Down Expand Up @@ -396,7 +398,7 @@ where
let utxo_validation = self.utxo_validation;

let insert_transaction_thread_pool_op = move || {
let current_height = *current_height.read();
let current_height = current_height.read();

// TODO: This should be removed if the checked transactions
// can work with Arc in it
Expand Down Expand Up @@ -802,7 +804,7 @@ where
p2p_sync_process,
pruner,
p2p: Arc::new(p2p),
current_height: Arc::new(RwLock::new(current_height)),
current_height: Arc::new(SeqLock::new(current_height)),
pool: Arc::new(RwLock::new(txpool)),
shared_state,
metrics,
Expand Down
Loading