Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHERRYPICK TIKV]: PR 11782 TO RELEASE 5.2 #129

Open
wants to merge 1 commit into
base: raftstore-proxy-5.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion components/raftstore/src/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,10 @@ where
}
}
} else if key.term <= compacted_term
&& (key.idx < compacted_idx || key.idx == compacted_idx && !is_applying_snap)
&& (key.idx < compacted_idx
|| key.idx == compacted_idx
&& !is_applying_snap
&& !self.fsm.peer.pending_remove)
{
info!(
"deleting applied snap file";
Expand Down
138 changes: 137 additions & 1 deletion tests/failpoints/cases/test_snap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,150 @@ fn test_destroy_peer_on_pending_snapshot() {
sleep_ms(100);

fail::remove(apply_snapshot_fp);

fail::remove(before_handle_normal_3_fp);

cluster.must_put(b"k120", b"v1");
// After peer 4 has applied snapshot, data should be got.
must_get_equal(&cluster.get_engine(3), b"k120", b"v1");
}

// Scenario: peer 3 in store 3 is isolated for a while and then recovered.
// While it is applying the catch-up snapshot, the peer is destroyed, so the
// apply is canceled. Before the destroy can finish, the node restarts.
// After the restart, the snapshot should be applied successfully,
// and new data should be written to store 3 successfully.
#[test]
fn test_destroy_peer_on_pending_snapshot_and_restart() {
    let mut cluster = new_server_cluster(0, 3);
    configure_for_snapshot(&mut cluster);
    let pd_client = Arc::clone(&cluster.pd_client);
    pd_client.disable_default_operator();

    let r1 = cluster.run_conf_change();
    pd_client.must_add_peer(r1, new_peer(2, 2));
    pd_client.must_add_peer(r1, new_peer(3, 3));

    cluster.must_put(b"k1", b"v1");
    // Ensure peer 3 is initialized before isolating it.
    must_get_equal(&cluster.get_engine(3), b"k1", b"v1");

    cluster.must_transfer_leader(1, new_peer(1, 1));
    // Keep the destroy of peer 3 pending so the restart races with it.
    let destroy_peer_fp = "destroy_peer_after_pending_move";
    fail::cfg(destroy_peer_fp, "return(true)").unwrap();

    // Isolate store 3 so it falls behind and must catch up via snapshot.
    cluster.add_send_filter(IsolationFilterFactory::new(3));

    for i in 0..20 {
        cluster.must_put(format!("k1{}", i).as_bytes(), b"v1");
    }

    // Skip applying snapshot into RocksDB to keep peer status as Applying.
    let apply_snapshot_fp = "apply_pending_snapshot";
    fail::cfg(apply_snapshot_fp, "return()").unwrap();

    cluster.clear_send_filters();
    // Wait for leader to send the snapshot.
    sleep_ms(100);

    // Don't send check stale msg to PD.
    let peer_check_stale_state_fp = "peer_check_stale_state";
    fail::cfg(peer_check_stale_state_fp, "return()").unwrap();

    pd_client.must_remove_peer(r1, new_peer(3, 3));
    // Without it, pd_client.must_remove_peer does not trigger destroy_peer!
    pd_client.must_add_peer(r1, new_peer(3, 4));

    let before_handle_normal_3_fp = "before_handle_normal_3";
    // Pause ApplyTaskRes::Destroy so that peer gc could finish.
    fail::cfg(before_handle_normal_3_fp, "pause").unwrap();
    // Wait for leader to send msg to peer 3, then destroy peer 3.
    sleep_ms(100);

    fail::remove(before_handle_normal_3_fp); // allow destroy to run

    // Restart node 3 with all failpoints cleared so recovery proceeds normally.
    cluster.stop_node(3);
    fail::remove(apply_snapshot_fp);
    fail::remove(peer_check_stale_state_fp);
    fail::remove(destroy_peer_fp);
    cluster.run_node(3).unwrap();
    must_get_equal(&cluster.get_engine(3), b"k1", b"v1");
    // After peer 3 has applied the snapshot, data should be got.
    must_get_equal(&cluster.get_engine(3), b"k119", b"v1");
    // In the end the snapshot file should be gc-ed anyway, either by the new
    // peer or by the store.
    let now = Instant::now();
    loop {
        let mut snap_files = vec![];
        let snap_dir = cluster.get_snap_dir(3);
        // Snap files should be gc-ed; poll until the dir is empty.
        snap_files.extend(fs::read_dir(snap_dir).unwrap().map(|p| p.unwrap().path()));
        if snap_files.is_empty() {
            break;
        }
        if now.saturating_elapsed() > Duration::from_secs(5) {
            panic!("snap files are not gc-ed");
        }
        sleep_ms(20);
    }

    cluster.must_put(b"k120", b"v1");
    // New data should be replicated to peer 4 in store 3.
    must_get_equal(&cluster.get_engine(3), b"k120", b"v1");
}

// Regression test for issue #11618.
// An aborted snapshot followed by an election used to make raft panic.
// Steps: partition peer 3 from the rest, heal the partition so the leader
// sends it a snapshot, abort that snapshot apply, then re-partition and let
// an election happen — without the fix, raft panics during that window.
#[test]
fn test_abort_snapshot_and_wait_election() {
    let mut cluster = new_server_cluster(0, 3);
    cluster.cfg.raft_store.raft_base_tick_interval = ReadableDuration::millis(10);
    cluster.cfg.raft_store.raft_election_timeout_ticks = 25; // > lease 240ms
    cluster.cfg.raft_store.hibernate_regions = false;
    configure_for_snapshot(&mut cluster);
    let pd_client = Arc::clone(&cluster.pd_client);
    pd_client.disable_default_operator();

    let region_id = cluster.run_conf_change();
    pd_client.must_add_peer(region_id, new_peer(2, 2));
    pd_client.must_add_peer(region_id, new_peer(3, 1003));

    cluster.must_put(b"k1", b"v1");
    let region = cluster.get_region(b"k1");
    // Make sure peer 3 is fully initialized before cutting it off.
    must_get_equal(&cluster.get_engine(3), b"k1", b"v1");

    cluster.must_transfer_leader(1, new_peer(1, 1));

    // Force the snapshot apply on store 3 to be aborted.
    let abort_snap_fp = "region_apply_snap_abort";
    fail::cfg(abort_snap_fp, "return()").unwrap();

    // Partition peer 3, then write enough data that it can only catch up
    // through a snapshot.
    cluster.add_send_filter(IsolationFilterFactory::new(3));
    for i in 0..20 {
        cluster.must_put(format!("k1{}", i).as_bytes(), b"v1");
    }
    // Get notified as soon as peer 3 receives the snapshot message.
    let (notify_tx, notify_rx) = mpsc::sync_channel::<bool>(10);
    let mut snap_notifier = RegionPacketFilter::new(region.get_id(), 3);
    snap_notifier = snap_notifier.direction(Direction::Recv);
    snap_notifier = snap_notifier.msg_type(MessageType::MsgSnapshot);
    snap_notifier = snap_notifier.allow(1);
    snap_notifier = snap_notifier.set_msg_callback(Arc::new(move |_| {
        notify_tx.send(true).unwrap();
    }));
    cluster.add_recv_filter(CloneFilterFactory(snap_notifier));

    cluster.clear_send_filters(); // allow the snapshot over to peer 3
    notify_rx.recv().unwrap(); // snapshot message observed at peer 3
    cluster.add_send_filter(IsolationFilterFactory::new(3)); // re-partition peer 3
    sleep_ms(500); // give an election time to happen; raft panics here without the fix
    cluster.clear_send_filters();
    cluster.clear_recv_filters();
}

#[test]
fn test_shutdown_when_snap_gc() {
let mut cluster = new_node_cluster(0, 2);
Expand Down