Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2.0: fix: ensure vote packets can be retried (backport of #2605) #2612

Merged
merged 1 commit into from
Aug 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 81 additions & 33 deletions core/src/banking_stage/latest_unprocessed_votes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use {
},
solana_vote_program::vote_instruction::VoteInstruction,
std::{
cmp,
collections::HashMap,
ops::DerefMut,
sync::{
Expand Down Expand Up @@ -166,12 +167,13 @@ impl LatestUnprocessedVotes {
pub(crate) fn insert_batch(
&self,
votes: impl Iterator<Item = LatestValidatorVotePacket>,
should_replenish_taken_votes: bool,
) -> VoteBatchInsertionMetrics {
let mut num_dropped_gossip = 0;
let mut num_dropped_tpu = 0;

for vote in votes {
if let Some(vote) = self.update_latest_vote(vote) {
if let Some(vote) = self.update_latest_vote(vote, should_replenish_taken_votes) {
match vote.vote_source {
VoteSource::Gossip => num_dropped_gossip += 1,
VoteSource::Tpu => num_dropped_tpu += 1,
Expand Down Expand Up @@ -199,26 +201,41 @@ impl LatestUnprocessedVotes {
pub fn update_latest_vote(
&self,
vote: LatestValidatorVotePacket,
should_replenish_taken_votes: bool,
) -> Option<LatestValidatorVotePacket> {
let pubkey = vote.pubkey();
let slot = vote.slot();
let timestamp = vote.timestamp();

// Allow votes for later slots or the same slot with later timestamp (refreshed votes)
// We directly compare as options to prioritize votes for same slot with timestamp as
// Some > None
let allow_update = |latest_vote: &LatestValidatorVotePacket| -> bool {
match slot.cmp(&latest_vote.slot()) {
cmp::Ordering::Less => return false,
cmp::Ordering::Greater => return true,
cmp::Ordering::Equal => {}
};

// Slots are equal, now check timestamp
match timestamp.cmp(&latest_vote.timestamp()) {
cmp::Ordering::Less => return false,
cmp::Ordering::Greater => return true,
cmp::Ordering::Equal => {}
};

// Timestamps are equal, lastly check if vote was taken previously
// and should be replenished
should_replenish_taken_votes && latest_vote.is_vote_taken()
};

let with_latest_vote = |latest_vote: &RwLock<LatestValidatorVotePacket>,
vote: LatestValidatorVotePacket|
-> Option<LatestValidatorVotePacket> {
let (latest_slot, latest_timestamp) = latest_vote
.read()
.map(|vote| (vote.slot(), vote.timestamp()))
.unwrap();
// Allow votes for later slots or the same slot with later timestamp (refreshed votes)
// We directly compare as options to prioritize votes for same slot with timestamp as
// Some > None
if slot > latest_slot || ((slot == latest_slot) && (timestamp > latest_timestamp)) {
let should_try_update = allow_update(&latest_vote.read().unwrap());
if should_try_update {
let mut latest_vote = latest_vote.write().unwrap();
let latest_slot = latest_vote.slot();
let latest_timestamp = latest_vote.timestamp();
if slot > latest_slot || ((slot == latest_slot) && (timestamp > latest_timestamp)) {
if allow_update(&latest_vote) {
let old_vote = std::mem::replace(latest_vote.deref_mut(), vote);
if old_vote.is_vote_taken() {
self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -536,10 +553,10 @@ mod tests {
);

assert!(latest_unprocessed_votes
.update_latest_vote(vote_a)
.update_latest_vote(vote_a, false /* should replenish */)
.is_none());
assert!(latest_unprocessed_votes
.update_latest_vote(vote_b)
.update_latest_vote(vote_b, false /* should replenish */)
.is_none());
assert_eq!(2, latest_unprocessed_votes.len());

Expand Down Expand Up @@ -569,15 +586,15 @@ mod tests {
assert_eq!(
1,
latest_unprocessed_votes
.update_latest_vote(vote_a)
.update_latest_vote(vote_a, false /* should replenish */)
.unwrap()
.slot
);
// Drop current vote
assert_eq!(
6,
latest_unprocessed_votes
.update_latest_vote(vote_b)
.update_latest_vote(vote_b, false /* should replenish */)
.unwrap()
.slot
);
Expand All @@ -597,8 +614,8 @@ mod tests {
&keypair_b,
None,
);
latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);

assert_eq!(2, latest_unprocessed_votes.len());
assert_eq!(
Expand Down Expand Up @@ -627,8 +644,8 @@ mod tests {
&keypair_b,
Some(2),
);
latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);

assert_eq!(2, latest_unprocessed_votes.len());
assert_eq!(
Expand All @@ -653,8 +670,8 @@ mod tests {
&keypair_b,
Some(6),
);
latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);

assert_eq!(2, latest_unprocessed_votes.len());
assert_eq!(
Expand All @@ -679,8 +696,10 @@ mod tests {
&keypair_b,
Some(3),
);
latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes
.update_latest_vote(vote_a.clone(), false /* should replenish */);
latest_unprocessed_votes
.update_latest_vote(vote_b.clone(), false /* should replenish */);

assert_eq!(2, latest_unprocessed_votes.len());
assert_eq!(
Expand All @@ -691,6 +710,33 @@ mod tests {
Some(6),
latest_unprocessed_votes.get_latest_timestamp(keypair_b.node_keypair.pubkey())
);

// Drain all latest votes
for packet in latest_unprocessed_votes
.latest_votes_per_pubkey
.read()
.unwrap()
.values()
{
packet.write().unwrap().take_vote().inspect(|_vote| {
latest_unprocessed_votes
.num_unprocessed_votes
.fetch_sub(1, Ordering::Relaxed);
});
}
assert_eq!(0, latest_unprocessed_votes.len());

// Same votes with same timestamps should not replenish without flag
latest_unprocessed_votes
.update_latest_vote(vote_a.clone(), false /* should replenish */);
latest_unprocessed_votes
.update_latest_vote(vote_b.clone(), false /* should replenish */);
assert_eq!(0, latest_unprocessed_votes.len());

// Same votes with same timestamps should replenish with the flag
latest_unprocessed_votes.update_latest_vote(vote_a, true /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, true /* should replenish */);
assert_eq!(0, latest_unprocessed_votes.len());
}

#[test]
Expand All @@ -711,7 +757,7 @@ mod tests {
keypairs: &Arc<Vec<ValidatorVoteKeypairs>>,
i: usize| {
let vote = from_slots(vec![(i as u64, 1)], VoteSource::Gossip, &keypairs[i], None);
latest_unprocessed_votes.update_latest_vote(vote);
latest_unprocessed_votes.update_latest_vote(vote, false /* should replenish */);
};

let hdl = Builder::new()
Expand Down Expand Up @@ -756,7 +802,8 @@ mod tests {
&keypairs[rng.gen_range(0..10)],
None,
);
latest_unprocessed_votes.update_latest_vote(vote);
latest_unprocessed_votes
.update_latest_vote(vote, false /* should replenish */);
}
})
.unwrap();
Expand All @@ -771,7 +818,8 @@ mod tests {
&keypairs_tpu[rng.gen_range(0..10)],
None,
);
latest_unprocessed_votes_tpu.update_latest_vote(vote);
latest_unprocessed_votes_tpu
.update_latest_vote(vote, false /* should replenish */);
if i % 214 == 0 {
// Simulate draining and processing packets
let latest_votes_per_pubkey = latest_unprocessed_votes_tpu
Expand Down Expand Up @@ -807,8 +855,8 @@ mod tests {

let vote_a = from_slots(vec![(1, 1)], VoteSource::Gossip, &keypair_a, None);
let vote_b = from_slots(vec![(2, 1)], VoteSource::Tpu, &keypair_b, None);
latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);

// Don't forward 0 stake accounts
let forwarded = latest_unprocessed_votes
Expand Down Expand Up @@ -902,10 +950,10 @@ mod tests {
let vote_c = from_slots(vec![(3, 1)], VoteSource::Tpu, &keypair_c, None);
let vote_d = from_slots(vec![(4, 1)], VoteSource::Gossip, &keypair_d, None);

latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes.update_latest_vote(vote_c);
latest_unprocessed_votes.update_latest_vote(vote_d);
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_c, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_d, false /* should replenish */);
assert_eq!(4, latest_unprocessed_votes.len());

latest_unprocessed_votes.clear_forwarded_packets();
Expand Down
86 changes: 71 additions & 15 deletions core/src/banking_stage/unprocessed_transaction_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,18 +442,18 @@ impl VoteStorage {
&mut self,
deserialized_packets: Vec<ImmutableDeserializedPacket>,
) -> VoteBatchInsertionMetrics {
self.latest_unprocessed_votes
.insert_batch(
deserialized_packets
.into_iter()
.filter_map(|deserialized_packet| {
LatestValidatorVotePacket::new_from_immutable(
Arc::new(deserialized_packet),
self.vote_source,
)
.ok()
}),
)
self.latest_unprocessed_votes.insert_batch(
deserialized_packets
.into_iter()
.filter_map(|deserialized_packet| {
LatestValidatorVotePacket::new_from_immutable(
Arc::new(deserialized_packet),
self.vote_source,
)
.ok()
}),
false, // should_replenish_taken_votes
)
}

fn filter_forwardable_packets_and_add_batches(
Expand Down Expand Up @@ -524,12 +524,15 @@ impl VoteStorage {
)
.ok()
}),
true, // should_replenish_taken_votes
);
} else {
self.latest_unprocessed_votes
.insert_batch(vote_packets.into_iter().filter_map(|packet| {
self.latest_unprocessed_votes.insert_batch(
vote_packets.into_iter().filter_map(|packet| {
LatestValidatorVotePacket::new_from_immutable(packet, self.vote_source).ok()
}));
}),
true, // should_replenish_taken_votes
);
}
}

Expand Down Expand Up @@ -998,6 +1001,7 @@ mod tests {
super::*,
solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
solana_perf::packet::{Packet, PacketFlags},
solana_runtime::genesis_utils,
solana_sdk::{
hash::Hash,
signature::{Keypair, Signer},
Expand Down Expand Up @@ -1266,6 +1270,58 @@ mod tests {
Ok(())
}

#[test]
fn test_process_packets_retryable_indexes_reinserted() -> Result<(), Box<dyn Error>> {
let node_keypair = Keypair::new();
let genesis_config =
genesis_utils::create_genesis_config_with_leader(100, &node_keypair.pubkey(), 200)
.genesis_config;
let (bank, _bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config);
let vote_keypair = Keypair::new();
let mut vote = Packet::from_data(
None,
new_tower_sync_transaction(
TowerSync::default(),
Hash::new_unique(),
&node_keypair,
&vote_keypair,
&vote_keypair,
None,
),
)?;
vote.meta_mut().flags.set(PacketFlags::SIMPLE_VOTE_TX, true);

let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage(
Arc::new(LatestUnprocessedVotes::new()),
VoteSource::Tpu,
);

transaction_storage.insert_batch(vec![ImmutableDeserializedPacket::new(vote.clone())?]);
assert_eq!(1, transaction_storage.len());

// When processing packets, return all packets as retryable so that they
// are reinserted into storage
let _ = transaction_storage.process_packets(
bank.clone(),
&BankingStageStats::default(),
&mut LeaderSlotMetricsTracker::new(0),
|packets, _payload| {
// Return all packets indexes as retryable
Some(
packets
.iter()
.enumerate()
.map(|(index, _packet)| index)
.collect_vec(),
)
},
);

// All packets should remain in the transaction storage
assert_eq!(1, transaction_storage.len());
Ok(())
}

#[test]
fn test_prepare_packets_to_forward() {
solana_logger::setup();
Expand Down