Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2.2: remove duplicates from fanout leaders (backport of #5109) #5138

Open
wants to merge 1 commit into
base: v2.2
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 25 additions & 24 deletions tpu-client-next/src/connection_workers_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,29 +191,28 @@ impl ConnectionWorkersScheduler {
}
};

let updated_leaders = leader_updater.next_leaders(leaders_fanout.connect);
let connect_leaders = leader_updater.next_leaders(leaders_fanout.connect);
let send_leaders = extract_send_leaders(&connect_leaders, leaders_fanout.send);

let (fanout_leaders, connect_leaders) =
split_leaders(&updated_leaders, &leaders_fanout);
// add future leaders to the cache to hide the latency of opening
// the connection.
for peer in connect_leaders {
if !workers.contains(peer) {
if !workers.contains(&peer) {
let stats = send_stats_per_addr.entry(peer.ip()).or_default();
let worker = Self::spawn_worker(
&endpoint,
peer,
&peer,
worker_channel_size,
skip_check_transaction_age,
max_reconnect_attempts,
stats.clone(),
);
maybe_shutdown_worker(workers.push(*peer, worker));
maybe_shutdown_worker(workers.push(peer, worker));
}
}

if let Err(error) =
Broadcaster::send_to_workers(&mut workers, fanout_leaders, transaction_batch).await
Broadcaster::send_to_workers(&mut workers, &send_leaders, transaction_batch).await
{
last_error = Some(error);
break;
Expand Down Expand Up @@ -309,21 +308,23 @@ impl WorkersBroadcaster for NonblockingBroadcaster {
}
}

/// Splits `leaders` into two slices based on the `fanout` configuration:
/// * the first slice contains the leaders to which transactions will be sent,
/// * the second vector contains the leaders, used to warm up connections. This
/// slice includes the first set.
fn split_leaders<'leaders>(
leaders: &'leaders [SocketAddr],
fanout: &Fanout,
) -> (&'leaders [SocketAddr], &'leaders [SocketAddr]) {
let Fanout { send, connect } = fanout;
assert!(send <= connect);
let send_count = (*send).min(leaders.len());
let connect_count = (*connect).min(leaders.len());

let send_slice = &leaders[..send_count];
let connect_slice = &leaders[..connect_count];

(send_slice, connect_slice)
/// Extracts a list of unique leader addresses to which transactions will be sent.
///
/// This function selects up to `send_fanout` addresses from the `leaders` list, ensuring that
/// only unique addresses are included while maintaining their original order.
fn extract_send_leaders(leaders: &[SocketAddr], send_fanout: usize) -> Vec<SocketAddr> {
let send_count = send_fanout.min(leaders.len());
remove_duplicates(&leaders[..send_count])
}

/// Removes duplicate `SocketAddr` elements from the given slice while
/// preserving their original order.
fn remove_duplicates(input: &[SocketAddr]) -> Vec<SocketAddr> {
let mut res = Vec::with_capacity(input.len());
for address in input {
if !res.contains(address) {
res.push(*address);
}
}
res
}
Loading