From 12c105db7b696325387e29ab4eba03a750ae188e Mon Sep 17 00:00:00 2001
From: CalvinNeo <calvinneo1995@gmail.com>
Date: Mon, 29 Aug 2022 22:55:09 +0800
Subject: [PATCH] add test_snap_restart

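Change ApplySnapshotObserver::post_apply_snapshot to return Result<()> so
observers can report failures, and make the region worker fail the snapshot
task when an observer returns an error.

Add test_snap_restart, which makes a peer skip pre-handling and post-applying
a snapshot, restarts its node, and checks that the data becomes available
afterwards. Rename test_restart to test_kv_restart and use the
disable_auto_gen_compact_log helper in the affected tests.
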
Signed-off-by: CalvinNeo <calvinneo1995@gmail.com>
---
 .../raftstore/src/coprocessor/dispatcher.rs   |  24 ++--
 components/raftstore/src/coprocessor/mod.rs   |   5 +-
 .../src/engine_store_ffi/observer.rs          |  10 +-
 .../raftstore/src/store/worker/region.rs      |  14 ++-
 new-mock-engine-store/src/lib.rs              |   6 -
 tests/proxy/normal.rs                         | 104 ++++++++++++++++--
 6 files changed, 127 insertions(+), 36 deletions(-)

diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs
index bd65c37b9b7..f8b48031816 100644
--- a/components/raftstore/src/coprocessor/dispatcher.rs
+++ b/components/raftstore/src/coprocessor/dispatcher.rs
@@ -537,15 +537,16 @@ impl<E: KvEngine> CoprocessorHost<E> {
         peer_id: u64,
         snap_key: &crate::store::SnapKey,
         snap: Option<&crate::store::Snapshot>,
-    ) {
-        loop_ob!(
-            region,
-            &self.registry.apply_snapshot_observers,
-            post_apply_snapshot,
-            peer_id,
-            snap_key,
-            snap,
-        );
+    ) -> Result<()> {
+        let mut ctx = ObserverContext::new(region);
+        for observer in &self.registry.apply_snapshot_observers {
+            let observer = observer.observer.inner();
+            let res = observer.post_apply_snapshot(&mut ctx, peer_id, snap_key, snap);
+            if res.is_err() {
+                return res;
+            }
+        }
+        Ok(())
     }
 
     pub fn new_split_checker_host<'a>(
@@ -909,10 +910,11 @@ mod tests {
             _: u64,
             _: &crate::store::SnapKey,
             _: Option<&Snapshot>,
-        ) {
+        ) -> Result<()> {
             self.called
                 .fetch_add(ObserverIndex::PostApplySnapshot as usize, Ordering::SeqCst);
             ctx.bypass = self.bypass.load(Ordering::SeqCst);
+            Ok(())
         }
 
         fn should_pre_apply_snapshot(&self) -> bool {
@@ -1072,7 +1074,7 @@ mod tests {
         index += ObserverIndex::PreApplySnapshot as usize;
         assert_all!([&ob.called], &[index]);
 
-        host.post_apply_snapshot(&region, 0, &key, None);
+        let _ = host.post_apply_snapshot(&region, 0, &key, None);
         index += ObserverIndex::PostApplySnapshot as usize;
         assert_all!([&ob.called], &[index]);
 
diff --git a/components/raftstore/src/coprocessor/mod.rs b/components/raftstore/src/coprocessor/mod.rs
index 0093e2e473f..9f657d9a3a7 100644
--- a/components/raftstore/src/coprocessor/mod.rs
+++ b/components/raftstore/src/coprocessor/mod.rs
@@ -21,7 +21,7 @@ use raft::{eraftpb, StateRole};
 pub mod config;
 mod consistency_check;
 pub mod dispatcher;
-mod error;
+pub mod error;
 mod metrics;
 pub mod region_info_accessor;
 mod split_check;
@@ -198,7 +198,8 @@ pub trait ApplySnapshotObserver: Coprocessor {
         _: u64,
         _: &crate::store::SnapKey,
         _snapshot: Option<&crate::store::Snapshot>,
-    ) {
+    ) -> Result<()> {
+        Ok(())
     }
 
     /// We call pre_apply_snapshot only when one of the observer returns true.
diff --git a/components/raftstore/src/engine_store_ffi/observer.rs b/components/raftstore/src/engine_store_ffi/observer.rs
index 31621b1bf94..b686499b340 100644
--- a/components/raftstore/src/engine_store_ffi/observer.rs
+++ b/components/raftstore/src/engine_store_ffi/observer.rs
@@ -28,6 +28,7 @@ use yatp::{
 };
 
 use crate::{
+    coprocessor,
     coprocessor::{
         AdminObserver, ApplyCtxInfo, ApplySnapshotObserver, BoxAdminObserver,
         BoxApplySnapshotObserver, BoxPdTaskObserver, BoxQueryObserver, BoxRegionChangeObserver,
@@ -698,15 +699,17 @@ impl ApplySnapshotObserver for TiFlashObserver {
         peer_id: u64,
         snap_key: &crate::store::SnapKey,
         snap: Option<&crate::store::Snapshot>,
-    ) {
-        fail::fail_point!("on_ob_post_apply_snapshot", |_| {});
+    ) -> std::result::Result<(), coprocessor::error::Error> {
+        fail::fail_point!("on_ob_post_apply_snapshot", |_| {
+            return Err(box_err!("on_ob_post_apply_snapshot"));
+        });
         info!("post apply snapshot";
             "peer_id" => ?peer_id,
             "snap_key" => ?snap_key,
             "region" => ?ob_ctx.region(),
         );
         let snap = match snap {
-            None => return,
+            None => return Ok(()),
             Some(s) => s,
         };
         let maybe_snapshot = {
@@ -767,6 +770,7 @@ impl ApplySnapshotObserver for TiFlashObserver {
             self.engine_store_server_helper
                 .apply_pre_handled_snapshot(ptr.0);
         }
+        Ok(())
     }
 
     fn should_pre_apply_snapshot(&self) -> bool {
diff --git a/components/raftstore/src/store/worker/region.rs b/components/raftstore/src/store/worker/region.rs
index 753176b7deb..7d14778c63e 100644
--- a/components/raftstore/src/store/worker/region.rs
+++ b/components/raftstore/src/store/worker/region.rs
@@ -420,8 +420,15 @@ where
             coprocessor_host: self.coprocessor_host.clone(),
         };
         s.apply(options)?;
-        self.coprocessor_host
-            .post_apply_snapshot(&region, peer_id, &snap_key, Some(&s));
+        match self
+            .coprocessor_host
+            .post_apply_snapshot(&region, peer_id, &snap_key, Some(&s))
+        {
+            Ok(_) => (),
+            Err(e) => {
+                return Err(box_err!("post apply snapshot error {:?}", e));
+            }
+        };
 
         let mut wb = self.engine.write_batch();
         region_state.set_state(PeerState::Normal);
@@ -1412,7 +1419,7 @@ mod tests {
             peer_id: u64,
             key: &crate::store::SnapKey,
             snapshot: Option<&crate::store::Snapshot>,
-        ) {
+        ) -> std::result::Result<(), crate::coprocessor::error::Error> {
             let code = snapshot.unwrap().total_size().unwrap()
                 + key.term
                 + key.region_id
@@ -1421,6 +1428,7 @@ mod tests {
             self.post_apply_count.fetch_add(1, Ordering::SeqCst);
             self.post_apply_hash
                 .fetch_add(code as usize, Ordering::SeqCst);
+            Ok(())
         }
 
         fn should_pre_apply_snapshot(&self) -> bool {
diff --git a/new-mock-engine-store/src/lib.rs b/new-mock-engine-store/src/lib.rs
index 3397e7ad87a..e1f074ec835 100644
--- a/new-mock-engine-store/src/lib.rs
+++ b/new-mock-engine-store/src/lib.rs
@@ -235,17 +235,11 @@ unsafe fn load_from_db(store: &mut EngineStoreServer, region_id: u64) {
                     "region_id" => region_id,
                     "cf" => cf,
                     "k" => ?k,
-                    "v" => ?v,
                 );
                 Ok(true)
             })
             .unwrap();
     }
-    debug!("after restore";
-        "store" => store_id,
-        "region_id" => region_id,
-        "default size" => region.data[2].len(),
-    );
 }
 
 unsafe fn write_to_db_data(
diff --git a/tests/proxy/normal.rs b/tests/proxy/normal.rs
index 8d9e64f65e4..aa494664485 100644
--- a/tests/proxy/normal.rs
+++ b/tests/proxy/normal.rs
@@ -1157,16 +1157,100 @@ mod ingest {
 mod restart {
     use super::*;
     #[test]
-    fn test_restart() {
+    fn test_snap_restart() {
+        let (mut cluster, pd_client) = new_mock_cluster(0, 3);
+
+        fail::cfg("on_can_apply_snapshot", "return(true)").unwrap();
+        disable_auto_gen_compact_log(&mut cluster);
+        cluster.cfg.raft_store.max_snapshot_file_raw_size = ReadableSize(u64::MAX);
+
+        // Disable default max peer count check.
+        pd_client.disable_default_operator();
+        let r1 = cluster.run_conf_change();
+
+        let first_value = vec![0; 10240];
+        for i in 0..10 {
+            let key = format!("{:03}", i);
+            cluster.must_put(key.as_bytes(), &first_value);
+        }
+        let first_key: &[u8] = b"000";
+
+        let eng_ids = cluster
+            .engines
+            .iter()
+            .map(|e| e.0.to_owned())
+            .collect::<Vec<_>>();
+
+        tikv_util::info!("engine_2 is {}", eng_ids[1]);
+        // Engine 2 will neither pre-handle nor post-apply the snapshot.
+        fail::cfg("on_ob_pre_handle_snapshot", "return").unwrap();
+        fail::cfg("on_ob_post_apply_snapshot", "return").unwrap();
+
+        let engine_2 = cluster.get_engine(eng_ids[1]);
+        must_get_none(&engine_2, first_key);
+        // Add peer (eng_ids[1], eng_ids[1]) to region 1.
+        pd_client.must_add_peer(r1, new_peer(eng_ids[1], eng_ids[1]));
+
+        check_key(&cluster, first_key, &first_value, Some(false), None, None);
+
+        info!("stop node {}", eng_ids[1]);
+        cluster.stop_node(eng_ids[1]);
+        {
+            let lock = cluster.ffi_helper_set.lock();
+            lock.unwrap()
+                .deref_mut()
+                .get_mut(&eng_ids[1])
+                .unwrap()
+                .engine_store_server
+                .stop();
+        }
+
+        fail::remove("on_ob_pre_handle_snapshot");
+        fail::remove("on_ob_post_apply_snapshot");
+        info!("resume node {}", eng_ids[1]);
+        {
+            let lock = cluster.ffi_helper_set.lock();
+            lock.unwrap()
+                .deref_mut()
+                .get_mut(&eng_ids[1])
+                .unwrap()
+                .engine_store_server
+                .restore();
+        }
+        info!("restored node {}", eng_ids[1]);
+        cluster.run_node(eng_ids[1]).unwrap();
+
+        let (key, value) = (b"k2", b"v2");
+        cluster.must_put(key, value);
+        // The value is readable in memory since the snapshot is pre-handled, though not yet persisted.
+        check_key(
+            &cluster,
+            key,
+            value,
+            Some(true),
+            None,
+            Some(vec![eng_ids[1]]),
+        );
+        // By now the snapshot must have been applied on peer eng_ids[1].
+        check_key(
+            &cluster,
+            first_key,
+            first_value.as_slice(),
+            Some(true),
+            None,
+            Some(vec![eng_ids[1]]),
+        );
+
+        cluster.shutdown();
+    }
+
+    #[test]
+    fn test_kv_restart() {
         // Test if an empty command can be observed when leadership changes.
         let (mut cluster, pd_client) = new_mock_cluster(0, 3);
 
         // Disable AUTO generated compact log.
-        // This will not totally disable, so we use some failpoints later.
-        cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000);
-        cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10000);
-        cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(50000);
-        cluster.cfg.raft_store.raft_log_gc_threshold = 1000;
+        disable_auto_gen_compact_log(&mut cluster);
 
         // We don't handle CompactLog at all.
         fail::cfg("try_flush_data", "return(0)").unwrap();
@@ -1205,7 +1289,7 @@ mod restart {
                 k.as_bytes(),
                 v.as_bytes(),
                 Some(true),
-                None,
+                Some(true),
                 Some(vec![eng_ids[0]]),
             );
         }
@@ -1225,7 +1309,7 @@ mod restart {
                 k.as_bytes(),
                 v.as_bytes(),
                 Some(true),
-                None,
+                Some(false),
                 Some(vec![eng_ids[0]]),
             );
         }
@@ -1284,9 +1368,7 @@ mod snapshot {
         let (mut cluster, pd_client) = new_mock_cluster(0, 3);
 
         fail::cfg("on_can_apply_snapshot", "return(true)").unwrap();
-        cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000);
-        cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10);
-        cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(500);
+        disable_auto_gen_compact_log(&mut cluster);
         cluster.cfg.raft_store.max_snapshot_file_raw_size = ReadableSize(u64::MAX);
 
         // Disable default max peer count check.