diff --git a/Cargo.lock b/Cargo.lock index a0934490dce..a42d38acf46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2067,11 +2067,10 @@ checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" [[package]] name = "form_urlencoded" -version = "1.0.1" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191" +checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652" dependencies = [ - "matches", "percent-encoding", ] @@ -2674,11 +2673,10 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" [[package]] name = "idna" -version = "0.2.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9" +checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" dependencies = [ - "matches", "unicode-bidi", "unicode-normalization", ] @@ -3961,9 +3959,9 @@ checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" [[package]] name = "percent-encoding" -version = "2.1.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" [[package]] name = "perfcnt" @@ -4443,8 +4441,10 @@ dependencies = [ "keys", "kvproto", "lazy_static", + "pd_client", "protobuf", "raftstore", + "reqwest", "rocksdb", "slog", "slog-global", @@ -4452,6 +4452,7 @@ dependencies = [ "tokio", "tokio-timer", "tracker", + "url", ] [[package]] @@ -4563,6 +4564,7 @@ dependencies = [ "futures 0.3.15", "grpcio", "grpcio-health", + "hex 0.4.2", "hyper", "keys", "kvproto", @@ -6123,9 +6125,8 @@ checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" [[package]] name = "sysinfo" -version = "0.26.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ade661fa5e048ada64ad7901713301c21d2dbc5b65ee7967de8826c111452960" +version = "0.26.9" +source = "git+https://github.com/tikv/sysinfo?branch=0.26-fix-cpu#5a1bcf08816979624ef2ad79cfb896de432a9501" dependencies = [ "cfg-if 1.0.0", "core-foundation-sys", @@ -7059,6 +7060,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tinyvec" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tipb" version = "0.0.1" @@ -7413,12 +7429,9 @@ checksum = "eeba86d422ce181a719445e51872fa30f1f7413b62becb52e95ec91aa262d85c" [[package]] name = "unicode-bidi" -version = "0.3.4" +version = "0.3.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5" -dependencies = [ - "matches", -] +checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" [[package]] name = "unicode-ident" @@ -7428,11 +7441,11 @@ checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3" [[package]] name = "unicode-normalization" -version = "0.1.12" +version = "0.1.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5479532badd04e128284890390c1e876ef7a993d0570b3597ae43dfa1d59afa4" +checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" dependencies = [ - "smallvec", + "tinyvec", ] [[package]] @@ -7455,13 +7468,12 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" [[package]] name = "url" -version = "2.2.2" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c" +checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb" dependencies = [ "form_urlencoded", "idna", - "matches", "percent-encoding", "serde", ] diff --git a/Cargo.toml b/Cargo.toml index 42725dae955..a0cbfe23084 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -222,6 +222,8 @@ cmake = { git = "https://github.com/rust-lang/cmake-rs" } # remove this when https://github.com/rust-lang/backtrace-rs/pull/503 is merged. backtrace = { git = 'https://github.com/hehechen/backtrace-rs', branch = "v0.3.61" } +sysinfo ={ git = "https://github.com/tikv/sysinfo", branch = "0.26-fix-cpu" } + [target.'cfg(target_os = "linux")'.dependencies] procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "6599eb9dca74229b2c1fcc44118bef7eff127128" } # When you modify TiKV cooperatively with kvproto, this will be useful to submit the PR to TiKV and the PR to @@ -424,7 +426,7 @@ opt-level = 1 [profile.dev] opt-level = 0 -debug = 0 +debug = false codegen-units = 4 lto = false incremental = true diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index 0e45ef1d09d..27fa7b508d4 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -554,13 +554,20 @@ impl CoprocessorHost { } // (index, term) is for the applying entry. - pub fn pre_exec(&self, region: &Region, cmd: &RaftCmdRequest, index: u64, term: u64) -> bool { + pub fn pre_exec( + &self, + region: &Region, + cmd: &RaftCmdRequest, + index: u64, + term: u64, + apply_state: &RaftApplyState, + ) -> bool { let mut ctx = ObserverContext::new(region); if !cmd.has_admin_request() { let query = cmd.get_requests(); for observer in &self.registry.query_observers { let observer = observer.observer.inner(); - if observer.pre_exec_query(&mut ctx, query, index, term) { + if observer.pre_exec_query(&mut ctx, query, index, term, apply_state) { return true; } } @@ -569,7 +576,7 @@ impl CoprocessorHost { let admin = cmd.get_admin_request(); for observer in &self.registry.admin_observers { let observer = observer.observer.inner(); - if observer.pre_exec_admin(&mut ctx, admin, index, term) { + if observer.pre_exec_admin(&mut ctx, admin, index, term, apply_state) { return true; } } @@ -846,6 +853,32 @@ impl CoprocessorHost { } } + pub fn post_compact_log_from_underlying_engine( + &self, + region_id: u64, + do_write: bool, + compact_index: u64, + compact_term: u64, + max_compact_index: u64, + max_compact_term: u64, + request_applied_index: u64, + raftstore_applied_index: u64, + ) { + for observer in &self.registry.region_change_observers { + let observer = observer.observer.inner(); + observer.post_compact_log_from_underlying_engine( + region_id, + do_write, + compact_index, + compact_term, + max_compact_index, + max_compact_term, + request_applied_index, + raftstore_applied_index, + ); + } + } + pub fn shutdown(&self) { for entry in &self.registry.admin_observers { entry.observer.inner().stop(); @@ -947,6 +980,7 @@ mod tests { _: &AdminRequest, _: u64, _: u64, + _: &RaftApplyState, ) -> bool { self.called .fetch_add(ObserverIndex::PreExecAdmin as usize, Ordering::SeqCst); @@ -1002,6 +1036,7 @@ mod tests { _: &[Request], _: u64, _: u64, + _: &RaftApplyState, ) -> bool { self.called .fetch_add(ObserverIndex::PreExecQuery as usize, Ordering::SeqCst); @@ -1262,14 +1297,15 @@ mod tests { assert_all!([&ob.called], &[index]); let mut query_req = RaftCmdRequest::default(); + let apply_state = RaftApplyState::default(); query_req.set_requests(vec![Request::default()].into()); - host.pre_exec(®ion, &query_req, 0, 0); + host.pre_exec(®ion, &query_req, 0, 0, &apply_state); index += ObserverIndex::PreExecQuery as usize; assert_all!([&ob.called], &[index]); let mut admin_req = RaftCmdRequest::default(); admin_req.set_admin_request(AdminRequest::default()); - host.pre_exec(®ion, &admin_req, 0, 0); + host.pre_exec(®ion, &admin_req, 0, 0, &apply_state); index += ObserverIndex::PreExecAdmin as usize; assert_all!([&ob.called], &[index]); diff --git a/components/raftstore/src/coprocessor/mod.rs b/components/raftstore/src/coprocessor/mod.rs index 7c84b09ce7e..c2fd88391c5 100644 --- a/components/raftstore/src/coprocessor/mod.rs +++ b/components/raftstore/src/coprocessor/mod.rs @@ -113,6 +113,7 @@ pub trait AdminObserver: Coprocessor { _: &AdminRequest, _: u64, _: u64, + _: &RaftApplyState, ) -> bool { false } @@ -153,7 +154,14 @@ pub trait QueryObserver: Coprocessor { /// Hook before exec write request, returns whether we should skip this /// write. - fn pre_exec_query(&self, _: &mut ObserverContext<'_>, _: &[Request], _: u64, _: u64) -> bool { + fn pre_exec_query( + &self, + _: &mut ObserverContext<'_>, + _: &[Request], + _: u64, + _: u64, + _: &RaftApplyState, + ) -> bool { false } @@ -337,6 +345,19 @@ pub trait RegionChangeObserver: Coprocessor { fn pre_write_apply_state(&self, _: &mut ObserverContext<'_>) -> bool { true } + + fn post_compact_log_from_underlying_engine( + &self, + _region_id: u64, + _do_write: bool, + _compact_index: u64, + _compact_term: u64, + _max_compact_index: u64, + _max_compact_term: u64, + _request_applied_index: u64, + _raftstore_applied_index: u64, + ) { + } } pub trait MessageObserver: Coprocessor { diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 1ef7bd843ee..c5b09484b6d 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -999,6 +999,13 @@ where /// The counter of pending request snapshots. See more in `Peer`. pending_request_snapshot_count: Arc, + /// Keep track of the max compact index from leader. + /// This index may not be exactly accurate, since a restart may have these + /// values set to 0. However, it can always be advanced, since + /// `CompactLog` will try to update it. + max_compact_index: u64, + max_compact_term: u64, + /// Indicates the peer is in merging, if that compact log won't be /// performed. is_merging: bool, @@ -1066,6 +1073,8 @@ where ready_source_region_id: 0, yield_state: None, wait_merge_state: None, + max_compact_index: 0, + max_compact_term: 0, is_merging: reg.is_merging, pending_cmds: PendingCmdQueue::new(), metrics: Default::default(), @@ -1189,6 +1198,24 @@ where }); } + fn write_apply_state_by_underlying_engine( + &self, + wb: &mut EK::WriteBatch, + apply_state: &RaftApplyState, + ) { + wb.put_msg_cf( + CF_RAFT, + &keys::apply_state_key(self.region.get_id()), + apply_state, + ) + .unwrap_or_else(|e| { + panic!( + "{} failed to save apply state to write batch, error: {:?}", + self.tag, e + ); + }); + } + fn maybe_write_apply_state(&self, apply_ctx: &mut ApplyContext) { let can_write = apply_ctx.host.pre_write_apply_state(&self.region); if can_write { @@ -1427,59 +1454,78 @@ where // E.g. `RaftApplyState` must not be changed. let mut origin_epoch = None; - let (resp, exec_result) = if ctx.host.pre_exec(&self.region, &req, index, term) { - // One of the observers want to filter execution of the command. - let mut resp = RaftCmdResponse::default(); - if !req.get_header().get_uuid().is_empty() { - let uuid = req.get_header().get_uuid().to_vec(); - resp.mut_header().set_uuid(uuid); - } - (resp, ApplyResult::None) - } else { - ctx.exec_log_index = index; - ctx.exec_log_term = term; - ctx.kv_wb_mut().set_save_point(); - let (resp, exec_result) = match self.exec_raft_cmd(ctx, &req) { - Ok(a) => { - ctx.kv_wb_mut().pop_save_point().unwrap(); - if req.has_admin_request() { - origin_epoch = Some(self.region.get_region_epoch().clone()); - } - a + let (resp, exec_result) = + if ctx + .host + .pre_exec(&self.region, &req, index, term, &self.apply_state) + { + // One of the observers want to filter execution of the command. + let mut resp = RaftCmdResponse::default(); + if !req.get_header().get_uuid().is_empty() { + let uuid = req.get_header().get_uuid().to_vec(); + resp.mut_header().set_uuid(uuid); } - Err(e) => { - // clear dirty values. - ctx.kv_wb_mut().rollback_to_save_point().unwrap(); - match e { - Error::EpochNotMatch(..) => debug!( - "epoch not match"; - "region_id" => self.region_id(), - "peer_id" => self.id(), - "err" => ?e - ), - Error::FlashbackInProgress(..) => debug!( - "flashback is in process"; - "region_id" => self.region_id(), - "peer_id" => self.id(), - "err" => ?e - ), - Error::FlashbackNotPrepared(..) => debug!( - "flashback is not prepared"; - "region_id" => self.region_id(), - "peer_id" => self.id(), - "err" => ?e - ), - _ => error!(?e; - "execute raft command"; - "region_id" => self.region_id(), - "peer_id" => self.id(), - ), + if req.has_admin_request() { + let request = req.get_admin_request(); + // Filtering a CompactLog cmd means the underlying engine has not persisted to + // `compact_index` yet. + // We will keep track of the index so later we can try to compact to a index not + // greater than `max_compact_index`. + if request.get_cmd_type() == AdminCmdType::CompactLog { + let compact_index = request.get_compact_log().get_compact_index(); + let compact_term = request.get_compact_log().get_compact_term(); + if compact_index > self.max_compact_index { + self.max_compact_index = compact_index; + self.max_compact_term = compact_term; + } } - (cmd_resp::new_error(e), ApplyResult::None) } + (resp, ApplyResult::None) + } else { + ctx.exec_log_index = index; + ctx.exec_log_term = term; + ctx.kv_wb_mut().set_save_point(); + let (resp, exec_result) = match self.exec_raft_cmd(ctx, &req) { + Ok(a) => { + ctx.kv_wb_mut().pop_save_point().unwrap(); + if req.has_admin_request() { + origin_epoch = Some(self.region.get_region_epoch().clone()); + } + a + } + Err(e) => { + // clear dirty values. + ctx.kv_wb_mut().rollback_to_save_point().unwrap(); + match e { + Error::EpochNotMatch(..) => debug!( + "epoch not match"; + "region_id" => self.region_id(), + "peer_id" => self.id(), + "err" => ?e + ), + Error::FlashbackInProgress(..) => debug!( + "flashback is in process"; + "region_id" => self.region_id(), + "peer_id" => self.id(), + "err" => ?e + ), + Error::FlashbackNotPrepared(..) => debug!( + "flashback is not prepared"; + "region_id" => self.region_id(), + "peer_id" => self.id(), + "err" => ?e + ), + _ => error!(?e; + "execute raft command"; + "region_id" => self.region_id(), + "peer_id" => self.id(), + ), + } + (cmd_resp::new_error(e), ApplyResult::None) + } + }; + (resp, exec_result) }; - (resp, exec_result) - }; let cmd = Cmd::new(index, term, req, resp); if let ApplyResult::WaitMergeSource(_) = exec_result { @@ -3029,6 +3075,7 @@ where &mut self, voter_replicated_index: u64, voter_replicated_term: u64, + from_underlying_engine: bool, ) -> Result<(bool, Option>)> { PEER_ADMIN_CMD_COUNTER.compact.all.inc(); let first_index = entry_storage::first_index(&self.apply_state); @@ -3043,6 +3090,43 @@ where return Ok((false, None)); } + if from_underlying_engine { + let mut compact_index = voter_replicated_index; + // `compact_index` is reported by underlying engine, it may be greater than the + // recorded `max_compact_index`. We will use the smaller one of this + // two. + let mut compact_term = voter_replicated_term; + if compact_index > self.max_compact_index { + compact_index = self.max_compact_index; + compact_term = self.max_compact_term; + } + if compact_index < first_index { + debug!( + "compact_index < first index, no need to compact"; + "region_id" => self.region_id(), + "peer_id" => self.id(), + "compact_index" => compact_index, + "first_index" => first_index, + ); + return Ok((false, Some(ExecResult::HasPendingCompactCmd(false)))); + } + compact_raft_log( + &self.tag, + &mut self.apply_state, + compact_index, + compact_term, + )?; + PEER_ADMIN_CMD_COUNTER.compact.success.inc(); + return Ok(( + true, + Some(ExecResult::CompactLog { + state: self.apply_state.get_truncated_state().clone(), + first_index, + has_pending: false, + }), + )); + } + // When the witness restarted, the pending compact cmd has been lost, so use // `voter_replicated_index` for gc to avoid log accumulation. if !self.pending_cmds.has_compact() { @@ -3175,6 +3259,25 @@ where } } } + + // Safety: compact index is monotonicly increased guarded by `compact_raft_log` + // and `entry_storage::first_index`. + if compact_index > self.max_compact_index { + self.max_compact_index = compact_index; + self.max_compact_term = compact_term; + } + + if compact_index < first_index { + debug!( + "compact index < first index, no need to compact"; + "region_id" => self.region_id(), + "peer_id" => self.id(), + "compact_index" => compact_index, + "first_index" => first_index, + "leader compact index" => req.get_compact_log().get_compact_index(), + ); + return Ok((resp, ApplyResult::None)); + } // compact failure is safe to be omitted, no need to assert. compact_raft_log( &self.tag, @@ -3745,6 +3848,8 @@ where region_id: u64, voter_replicated_index: u64, voter_replicated_term: u64, + applied_index: Option, + from_underlying_engine: bool, }, } @@ -3826,11 +3931,17 @@ where region_id, voter_replicated_index, voter_replicated_term, + applied_index, + from_underlying_engine, } => { write!( f, - "[region {}] check compact, voter_replicated_index: {}, voter_replicated_term: {}", - region_id, voter_replicated_index, voter_replicated_term + "[region {}] check compact, voter_replicated_index: {}, voter_replicated_term: {}, applied_index: {:?}, from_underlying_engine {}", + region_id, + voter_replicated_index, + voter_replicated_term, + applied_index, + from_underlying_engine ) } } @@ -4292,14 +4403,18 @@ where ctx: &mut ApplyContext, voter_replicated_index: u64, voter_replicated_term: u64, + maybe_applied_index: Option, + from_underlying_engine: bool, ) { if self.delegate.pending_remove || self.delegate.stopped { return; } - let res = self - .delegate - .try_compact_log(voter_replicated_index, voter_replicated_term); + let res = self.delegate.try_compact_log( + voter_replicated_index, + voter_replicated_term, + from_underlying_engine, + ); match res { Ok((should_write, res)) => { if let Some(res) = res { @@ -4310,8 +4425,25 @@ where let mut result = VecDeque::new(); // If modified `truncated_state` in `try_compact_log`, the apply state should be // persisted. + ctx.host.post_compact_log_from_underlying_engine( + self.delegate.region.get_id(), + should_write, + voter_replicated_index, + voter_replicated_term, + self.delegate.max_compact_index, + self.delegate.max_compact_term, + maybe_applied_index.unwrap_or(0), + self.delegate.apply_state.get_applied_index(), + ); if should_write { - self.delegate.write_apply_state(ctx.kv_wb_mut()); + if let Some(underlying_engine_applied) = maybe_applied_index.as_ref() { + let mut state = self.delegate.apply_state.clone(); + state.set_applied_index(*underlying_engine_applied); + self.delegate + .write_apply_state_by_underlying_engine(ctx.kv_wb_mut(), &state); + } else { + self.delegate.write_apply_state(ctx.kv_wb_mut()); + }; ctx.commit_opt(&mut self.delegate, true); } result.push_back(res); @@ -4404,12 +4536,16 @@ where Msg::CheckCompact { voter_replicated_index, voter_replicated_term, + applied_index, + from_underlying_engine, .. } => { self.check_pending_compact_log( apply_ctx, voter_replicated_index, voter_replicated_term, + applied_index, + from_underlying_engine, ); } } @@ -5819,6 +5955,7 @@ mod tests { req: &AdminRequest, _: u64, _: u64, + _: &RaftApplyState, ) -> bool { let cmd_type = req.get_cmd_type(); if cmd_type == AdminCmdType::CompactLog diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index 910a08c3a0b..43ab9ffdd0d 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -2736,6 +2736,8 @@ where region_id: self.region_id(), voter_replicated_index, voter_replicated_term, + applied_index: None, + from_underlying_engine: false, }, ) } @@ -3972,8 +3974,10 @@ where let total_cnt = self.fsm.peer.last_applying_idx - first_index; // the size of current CompactLog command can be ignored. let remain_cnt = self.fsm.peer.last_applying_idx - state.get_index() - 1; - self.fsm.peer.raft_log_size_hint = - self.fsm.peer.raft_log_size_hint * remain_cnt / total_cnt; + if total_cnt != 0 { + self.fsm.peer.raft_log_size_hint = + self.fsm.peer.raft_log_size_hint * remain_cnt / total_cnt; + } let compact_to = state.get_index() + 1; self.fsm.peer.schedule_raftlog_gc(self.ctx, compact_to); self.fsm.peer.last_compacted_idx = compact_to; diff --git a/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs b/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs index 985617289b1..aa414208637 100644 --- a/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs +++ b/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs @@ -1,6 +1,86 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. +use encryption::DataKeyManager; +use proxy_ffi::snapshot_reader_impls::{sst_file_reader::SSTFileReader, LockCFFileReader}; + use crate::core::{common::*, ProxyForwarder}; +pub fn get_first_key( + path: &str, + cf: ColumnFamilyType, + key_manager: Option>, +) -> Vec { + unsafe { + if cf == ColumnFamilyType::Lock { + let mut reader = LockCFFileReader::ffi_get_cf_file_reader(path, key_manager.as_ref()); + reader.as_mut_sst_lock().ffi_key().to_slice().to_vec() + } else { + let mut reader = SSTFileReader::ffi_get_cf_file_reader(path, key_manager); + reader.as_mut_sst_other().ffi_key().to_slice().to_vec() + } + } +} + +pub fn sort_sst_by_start_key( + ssts: Vec<(PathBuf, ColumnFamilyType)>, + key_manager: Option>, +) -> Vec<(PathBuf, ColumnFamilyType)> { + let mut sw: Vec<(PathBuf, ColumnFamilyType)> = vec![]; + let mut sd: Vec<(PathBuf, ColumnFamilyType)> = vec![]; + let mut sl: Vec<(PathBuf, ColumnFamilyType)> = vec![]; + + for (p, c) in ssts.into_iter() { + match c { + ColumnFamilyType::Default => sd.push((p, c)), + ColumnFamilyType::Write => sw.push((p, c)), + ColumnFamilyType::Lock => sl.push((p, c)), + }; + } + + sw.sort_by(|a, b| { + let fk1 = get_first_key( + a.0.to_str().unwrap(), + ColumnFamilyType::Write, + key_manager.clone(), + ); + let fk2 = get_first_key( + b.0.to_str().unwrap(), + ColumnFamilyType::Write, + key_manager.clone(), + ); + fk1.cmp(&fk2) + }); + sd.sort_by(|a, b| { + let fk1 = get_first_key( + a.0.to_str().unwrap(), + ColumnFamilyType::Default, + key_manager.clone(), + ); + let fk2 = get_first_key( + b.0.to_str().unwrap(), + ColumnFamilyType::Default, + key_manager.clone(), + ); + fk1.cmp(&fk2) + }); + sl.sort_by(|a, b| { + let fk1 = get_first_key( + a.0.to_str().unwrap(), + ColumnFamilyType::Lock, + key_manager.clone(), + ); + let fk2 = get_first_key( + b.0.to_str().unwrap(), + ColumnFamilyType::Lock, + key_manager.clone(), + ); + fk1.cmp(&fk2) + }); + + sw.append(&mut sd); + sw.append(&mut sl); + sw +} + impl ProxyForwarder { fn handle_ingest_sst_for_engine_store( &self, @@ -40,6 +120,7 @@ impl ProxyForwarder { )); } + let ssts_wrap = sort_sst_by_start_key(ssts_wrap, self.key_manager.clone()); for (path, cf) in &ssts_wrap { sst_views.push((path.to_str().unwrap().as_bytes(), *cf)); } @@ -75,6 +156,7 @@ impl ProxyForwarder { req: &AdminRequest, index: u64, term: u64, + apply_state: &RaftApplyState, ) -> bool { match req.get_cmd_type() { AdminCmdType::CompactLog => { @@ -84,6 +166,8 @@ impl ProxyForwarder { false, index, term, + apply_state.get_truncated_state().get_index(), + apply_state.get_truncated_state().get_term(), ) { info!("can't flush data, filter CompactLog"; "region_id" => ?ob_region.get_id(), diff --git a/proxy_components/engine_store_ffi/src/core/forward_raft/region.rs b/proxy_components/engine_store_ffi/src/core/forward_raft/region.rs index b402124d54b..67e2ef08959 100644 --- a/proxy_components/engine_store_ffi/src/core/forward_raft/region.rs +++ b/proxy_components/engine_store_ffi/src/core/forward_raft/region.rs @@ -120,4 +120,34 @@ impl ProxyForwarder { .access_cached_region_info_mut(region_id, f) .unwrap(); } + + pub fn post_compact_log_from_underlying_engine( + &self, + region_id: u64, + do_write: bool, + compact_index: u64, + compact_term: u64, + max_compact_index: u64, + max_compact_term: u64, + request_applied_index: u64, + raftstore_applied_index: u64, + ) { + debug!( + "post_compact_log_from_underlying_engine"; + "region_id" => region_id, + "do_write" => do_write, + "compact_index" => compact_index, + "compact_term" => compact_term, + "max_compact_index" => max_compact_index, + "max_compact_term" => max_compact_term, + "request_applied_index" => request_applied_index, + "raftstore_applied_index" => raftstore_applied_index, + ); + #[cfg(any(test, feature = "testexport"))] + self.engine + .proxy_ext + .debug_struct + .proactive_compact_log_count + .fetch_add(1, Ordering::SeqCst); + } } diff --git a/proxy_components/engine_store_ffi/src/core/forwarder.rs b/proxy_components/engine_store_ffi/src/core/forwarder.rs index a301751ec9a..9f5069e05dc 100644 --- a/proxy_components/engine_store_ffi/src/core/forwarder.rs +++ b/proxy_components/engine_store_ffi/src/core/forwarder.rs @@ -1,5 +1,7 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. +use encryption::DataKeyManager; + use crate::core::common::*; pub struct PtrWrapper(pub RawCppPtr); @@ -53,6 +55,7 @@ pub struct ProxyForwarder { pub snap_mgr: Arc, pub packed_envs: Arc, pub debug_struct: Arc, + pub key_manager: Option>, } impl ProxyForwarder { @@ -84,6 +87,7 @@ impl ProxyForwarder { snap_mgr: SnapManager, packed_envs: PackedEnvs, debug_struct: DebugStruct, + key_manager: Option>, ) -> Self { let engine_store_server_helper = gen_engine_store_server_helper(engine.proxy_ext.engine_store_server_helper); @@ -106,6 +110,7 @@ impl ProxyForwarder { snap_mgr: Arc::new(snap_mgr), packed_envs: Arc::new(packed_envs), debug_struct: Arc::new(debug_struct), + key_manager, } } @@ -130,6 +135,7 @@ impl Clone for ProxyForwarder { snap_mgr: self.snap_mgr.clone(), packed_envs: self.packed_envs.clone(), debug_struct: self.debug_struct.clone(), + key_manager: self.key_manager.clone(), } } } diff --git a/proxy_components/engine_store_ffi/src/ffi/mod.rs b/proxy_components/engine_store_ffi/src/ffi/mod.rs index 2c907629262..e1bfee333cc 100644 --- a/proxy_components/engine_store_ffi/src/ffi/mod.rs +++ b/proxy_components/engine_store_ffi/src/ffi/mod.rs @@ -4,6 +4,7 @@ pub mod raftstore_proxy_engine; pub use engine_tiflash::EngineStoreConfig; pub use proxy_ffi::{ + apply_router_helper, basic_ffi_impls::*, domain_impls::*, encryption_impls::*, diff --git a/proxy_components/engine_store_ffi/src/observer.rs b/proxy_components/engine_store_ffi/src/observer.rs index 3c0dde48b5e..b12b9d574f5 100644 --- a/proxy_components/engine_store_ffi/src/observer.rs +++ b/proxy_components/engine_store_ffi/src/observer.rs @@ -1,6 +1,7 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. use std::sync::Arc; +use encryption::DataKeyManager; use engine_traits::RaftEngine; use kvproto::{ raft_cmdpb::{AdminRequest, RaftCmdRequest}, @@ -42,6 +43,7 @@ impl TiFlashObserver { snap_mgr: SnapManager, packed_envs: PackedEnvs, debug_struct: DebugStruct, + key_manager: Option>, ) -> Self { TiFlashObserver { forwarder: ProxyForwarder::new( @@ -53,6 +55,7 @@ impl TiFlashObserver { snap_mgr, packed_envs, debug_struct, + key_manager, ), } } @@ -110,9 +113,10 @@ impl AdminObserver for TiFlashObserver bool { self.forwarder - .pre_exec_admin(ob_ctx.region(), req, index, term) + .pre_exec_admin(ob_ctx.region(), req, index, term, apply_state) } fn post_exec_admin( @@ -187,6 +191,29 @@ impl RegionChangeObserver for TiFlashObs fn pre_write_apply_state(&self, ob_ctx: &mut ObserverContext<'_>) -> bool { self.forwarder.pre_write_apply_state(ob_ctx.region()) } + + fn post_compact_log_from_underlying_engine( + &self, + region_id: u64, + do_write: bool, + compact_index: u64, + compact_term: u64, + max_compact_index: u64, + max_compact_term: u64, + request_applied_index: u64, + raftstore_applied_index: u64, + ) { + self.forwarder.post_compact_log_from_underlying_engine( + region_id, + do_write, + compact_index, + compact_term, + max_compact_index, + max_compact_term, + request_applied_index, + raftstore_applied_index, + ) + } } impl MessageObserver for TiFlashObserver { diff --git a/proxy_components/engine_tiflash/src/engine.rs b/proxy_components/engine_tiflash/src/engine.rs index 488ffab81bb..3cf2f84ebc6 100644 --- a/proxy_components/engine_tiflash/src/engine.rs +++ b/proxy_components/engine_tiflash/src/engine.rs @@ -66,6 +66,7 @@ impl RocksEngine { engine_store_hub, config_set, cached_region_info_manager: Some(Arc::new(crate::CachedRegionInfoManager::new())), + debug_struct: Arc::new(Default::default()), }; let ps_ext = PageStorageExt { engine_store_server_helper, diff --git a/proxy_components/engine_tiflash/src/proxy_utils/proxy_ext.rs b/proxy_components/engine_tiflash/src/proxy_utils/proxy_ext.rs index a03cfbd623b..6f85fc09ece 100644 --- a/proxy_components/engine_tiflash/src/proxy_utils/proxy_ext.rs +++ b/proxy_components/engine_tiflash/src/proxy_utils/proxy_ext.rs @@ -2,13 +2,23 @@ use std::{ fmt::Formatter, sync::{ - atomic::{AtomicIsize, Ordering}, + atomic::{AtomicIsize, AtomicU64, Ordering}, Arc, }, }; use crate::proxy_utils::EngineStoreHub; +#[cfg(not(feature = "testexport"))] +#[derive(Debug, Default)] +pub struct DebugStruct {} + +#[cfg(feature = "testexport")] +#[derive(Debug, Default)] +pub struct DebugStruct { + pub proactive_compact_log_count: AtomicU64, +} + // This struct should be safe to copy. #[derive(Clone)] pub struct ProxyEngineExt { @@ -18,6 +28,7 @@ pub struct ProxyEngineExt { pub engine_store_hub: Option>, pub config_set: Option>, pub cached_region_info_manager: Option>, + pub debug_struct: Arc, } impl std::fmt::Debug for ProxyEngineExt { @@ -45,6 +56,7 @@ impl Default for ProxyEngineExt { engine_store_hub: None, config_set: None, cached_region_info_manager: None, + debug_struct: Arc::new(Default::default()), } } } diff --git a/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs b/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs index 687e94a2721..8e176f84c89 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs @@ -41,6 +41,7 @@ pub struct TestData { pub expected_self_safe_ts: u64, } +#[allow(clippy::type_complexity)] #[derive(Default)] pub struct ClusterExt { // Helper to set ffi_helper_set. @@ -49,12 +50,26 @@ pub struct ClusterExt { pub(crate) ffi_helper_set: Arc>>, pub test_data: TestData, pub cluster_ptr: isize, + pub pre_run_node_callback: Option>, } impl ClusterExt { + pub fn pre_node_start(&mut self, cfg: &mut crate::mock_cluster::MixedClusterConfig) { + if let Some(c) = self.pre_run_node_callback.as_mut() { + c(cfg); + } + } + + pub fn post_cluster_start(&mut self) { + self.iter_ffi_helpers(None, &mut |_, ffi: &mut FFIHelperSet| { + ffi.proxy.refresh_cluster_raftstore_version(-1); + }); + } + pub fn get_cluster_size(&self) -> usize { self.ffi_helper_set.lock().expect("poisoned").len() } + pub fn make_ffi_helper_set_no_bind( id: u64, engines: Engines, @@ -64,6 +79,7 @@ impl ClusterExt { cluster_ptr: isize, cluster_ext_ptr: isize, mock_cfg: MockConfig, + pd_client: Option>, ) -> (FFIHelperSet, TikvConfig) { // We must allocate on heap to avoid move. let proxy = Box::new(engine_store_ffi::ffi::RaftStoreProxy::new( @@ -79,6 +95,8 @@ impl ClusterExt { None => None, }, None, + None, + pd_client, )); let proxy_ref = proxy.as_ref(); @@ -131,6 +149,7 @@ impl ClusterExt { engines: Engines, key_manager: &Option>, router: &Option>, + pd_client: Option>, ) { init_global_ffi_helper_set(); // We don't know `node_id` now. @@ -144,6 +163,7 @@ impl ClusterExt { cluster_ptr, cluster_ext_ptr, mock_cfg, + pd_client, ); // We can not use moved or cloned engines any more. diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs index c7ddcba340a..5f018c3375c 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs @@ -259,7 +259,6 @@ impl> Cluster { debug!("recover node"; "node_id" => node_id); // Like TiKVServer::init self.run_node(node_id)?; - self.post_node_start(node_id); } // Try start new nodes. @@ -271,6 +270,7 @@ impl> Cluster { } let (router, system) = create_raft_batch_system(&self.cfg.raft_store, &self.resource_manager); + let apply_router = system.apply_router(); self.create_engine(Some(router.clone())); let store_meta = Arc::new(Mutex::new(StoreMeta::new(PENDING_MSG_CAP))); @@ -281,8 +281,9 @@ impl> Cluster { let key_manager = self.key_managers.last().unwrap().clone(); let node_id = { let mut sim = self.sim.wl(); - let cfg = self.cfg.clone(); + let mut cfg = self.cfg.clone(); // Like TiKVServer::init + self.cluster_ext.pre_node_start(&mut cfg); sim.run_node( 0, cfg, @@ -299,10 +300,11 @@ impl> Cluster { self.store_metas.insert(node_id, store_meta); self.key_managers_map.insert(node_id, key_manager.clone()); self.register_ffi_helper_set(None, node_id); - self.post_node_start(node_id); + self.post_node_start(node_id, apply_router); } assert_eq!(self.count, self.engines.len()); assert_eq!(self.count, self.dbs.len()); + self.cluster_ext.post_cluster_start(); Ok(()) } @@ -337,6 +339,7 @@ impl> Cluster { if let Some(labels) = self.labels.get(&node_id) { cfg.server.labels = labels.to_owned(); } + self.cluster_ext.pre_node_start(&mut cfg); let store_meta = match self.store_metas.entry(node_id) { MapEntry::Occupied(o) => { let mut meta = o.get().lock().unwrap(); @@ -352,10 +355,13 @@ impl> Cluster { tikv_util::thread_group::set_properties(Some(props)); debug!("calling run node"; "node_id" => node_id); + let apply_router = system.apply_router(); // FIXME: rocksdb event listeners may not work, because we change the router. self.sim .wl() .run_node(node_id, cfg, engines, store_meta, key_mgr, router, system)?; + + self.post_node_start(node_id, apply_router); debug!("node {} started", node_id); Ok(()) } diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster_ext_v1.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster_ext_v1.rs index 8d8cef7491e..a869a643af3 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster_ext_v1.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster_ext_v1.rs @@ -6,6 +6,7 @@ use collections::HashMap; use engine_store_ffi::{ffi::RaftStoreProxyFFI, TiFlashEngine}; use engine_tiflash::DB; use engine_traits::{Engines, KvEngine}; +use raftstore::store::fsm::ApplyRouter; use tikv_util::{sys::SysQuota, HandyRwLock}; use super::{common::*, Cluster, Simulator}; @@ -15,7 +16,7 @@ impl> Cluster { self.cluster_ext.access_ffi_helpers(f) } - pub fn post_node_start(&mut self, node_id: u64) { + pub fn post_node_start(&mut self, node_id: u64, apply_router: ApplyRouter) { // Since we use None to create_ffi_helper_set, we must init again. let router = self.sim.rl().get_router(node_id).unwrap(); self.cluster_ext @@ -26,6 +27,7 @@ impl> Cluster { SysQuota::cpu_cores_quota() as usize * 2, ), ))); + ffi.proxy.setup_apply_router_helper(apply_router.clone()); }); } diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs index 79102eb44b7..a51115f48c0 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs @@ -293,6 +293,7 @@ impl Simulator for NodeCluster { None, ); + let key_mgr_cloned = key_manager.clone(); let (snap_mgr, snap_mgr_path) = if node_id == 0 || !self .trans @@ -348,6 +349,7 @@ impl Simulator for NodeCluster { snap_mgr.clone(), packed_envs, DebugStruct::default(), + key_mgr_cloned, ); tiflash_ob.register_to(&mut coprocessor_host); diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs index 287f57bd22c..b584c7bb105 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs @@ -288,6 +288,7 @@ impl ServerCluster { } } + let key_mgr_cloned = key_manager.clone(); let local_reader = LocalReader::new( engines.kv.clone(), StoreMetaDelegate::new(store_meta.clone(), engines.kv.clone()), @@ -556,6 +557,7 @@ impl ServerCluster { snap_mgr.clone(), packed_envs, DebugStruct::default(), + key_mgr_cloned, ); tiflash_ob.register_to(&mut coprocessor_host); diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/util.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/util.rs index 049eb58c438..0e7c39d0472 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/util.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/util.rs @@ -37,6 +37,7 @@ pub fn create_tiflash_test_engine_with_cluster_ctx>( engines, &key_manager, &router, + Some(cluster.pd_client.clone()), ); let ffi_helper_set = cluster.cluster_ext.ffi_helper_lst.last_mut().unwrap(); let engines = ffi_helper_set diff --git a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs index cfc2291af99..ba23f193ea7 100644 --- a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs +++ b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs @@ -338,9 +338,11 @@ extern "C" fn ffi_need_flush_data( unsafe extern "C" fn ffi_try_flush_data( arg1: *mut interfaces_ffi::EngineStoreServerWrap, region_id: u64, - _try_until_succeed: u8, + _flush_pattern: u8, index: u64, term: u64, + _truncated_index: u64, + _truncated_term: u64, ) -> u8 { let store = into_engine_store_server_wrap(arg1); let kvstore = &mut (*store.engine_store_server).kvstore; @@ -516,3 +518,14 @@ unsafe extern "C" fn ffi_handle_compute_store_stats( engine_keys_read: 0, } } + +unsafe extern "C" fn ffi_get_flushed_state( + _arg1: *mut interfaces_ffi::EngineStoreServerWrap, + _region_id: u64, + _acquire_lock: u8, +) -> interfaces_ffi::FlushedState { + interfaces_ffi::FlushedState { + applied_index: 0, + applied_term: 0, + } +} diff --git a/proxy_components/proxy_ffi/Cargo.toml b/proxy_components/proxy_ffi/Cargo.toml index b41d1ea6b50..9510c20ffbe 100644 --- a/proxy_components/proxy_ffi/Cargo.toml +++ b/proxy_components/proxy_ffi/Cargo.toml @@ -23,6 +23,7 @@ keys = { workspace = true, default-features = false } kvproto = { workspace = true } lazy_static = "1.3" protobuf = { version = "2.8", features = ["bytes"] } +pd_client = { workspace = true } raftstore = { workspace = true, default-features = false } slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] } slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "d592f88e4dbba5eb439998463054f1a44fbf17b9" } @@ -30,6 +31,8 @@ tikv_util = { workspace = true, default-features = false } tokio = { version = "1.5", features = ["sync", "rt-multi-thread"] } tokio-timer = { git = "https://github.com/tikv/tokio", branch = "tokio-timer-hotfix" } tracker = { workspace = true, default-features = false } +reqwest = { version = "0.11", features = ["blocking"] } +url = "2.4.0" [dependencies.rocksdb] git = "https://github.com/tikv/rust-rocksdb.git" diff --git a/proxy_components/proxy_ffi/src/apply_router_helper.rs b/proxy_components/proxy_ffi/src/apply_router_helper.rs new file mode 100644 index 00000000000..5dd20a81af8 --- /dev/null +++ b/proxy_components/proxy_ffi/src/apply_router_helper.rs @@ -0,0 +1,47 @@ +// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. + +use engine_traits::KvEngine; +use raftstore::store::fsm::{ApplyRouter, ApplyTask}; + +pub trait ApplyRouterHelper: Sync + Send { + fn schedule_compact_log_task( + &self, + region_id: u64, + compact_index: u64, + compact_term: u64, + applied_index: u64, + ); +} + +pub struct ProxyApplyRouterHelper { + pub apply_router: std::sync::Mutex>, +} + +impl ProxyApplyRouterHelper { + pub fn new(apply_router: ApplyRouter) -> Self { + Self { + apply_router: std::sync::Mutex::new(apply_router.clone()), + } + } +} + +impl ApplyRouterHelper for ProxyApplyRouterHelper { + fn schedule_compact_log_task( + &self, + region_id: u64, + compact_index: u64, + compact_term: u64, + applied_index: u64, + ) { + self.apply_router.lock().unwrap().schedule_task( + region_id, + ApplyTask::CheckCompact { + region_id, + voter_replicated_index: compact_index, + voter_replicated_term: compact_term, + applied_index: Some(applied_index), + from_underlying_engine: true, + }, + ) + } +} diff --git a/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs b/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs index a010e39ee30..6e3ee116b5d 100644 --- a/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs +++ b/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs @@ -156,6 +156,8 @@ impl EngineStoreServerHelper { try_until_succeed: bool, index: u64, term: u64, + truncated_index: u64, + truncated_term: u64, ) -> bool { debug_assert!(self.fn_try_flush_data.is_some()); unsafe { @@ -170,6 +172,8 @@ impl EngineStoreServerHelper { }, index, term, + truncated_index, + truncated_term, ) != 0 } } diff --git a/proxy_components/proxy_ffi/src/interfaces.rs b/proxy_components/proxy_ffi/src/interfaces.rs index 3f0e965e7c5..2289dfb885c 100644 --- a/proxy_components/proxy_ffi/src/interfaces.rs +++ b/proxy_components/proxy_ffi/src/interfaces.rs @@ -136,6 +136,13 @@ pub mod root { Stopping = 2, Terminated = 3, } + #[repr(u8)] + #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] + pub enum RaftstoreVer { + Uncertain = 0, + V1 = 1, + V2 = 2, + } pub type RawCppPtrType = u32; pub type RawRustPtrType = u32; #[repr(C)] @@ -312,6 +319,12 @@ pub mod root { } #[repr(C)] #[derive(Debug)] + pub struct FlushedState { + pub applied_index: u64, + pub applied_term: u64, + } + #[repr(C)] + #[derive(Debug)] pub struct RaftStoreProxyFFIHelper { pub proxy_ptr: root::DB::RaftStoreProxyPtr, pub fn_handle_get_proxy_status: ::std::option::Option< @@ -411,6 +424,22 @@ pub mod root { error_msg: *mut root::DB::RawCppStringPtr, ) -> root::DB::KVGetStatus, >, + pub fn_notify_compact_log: ::std::option::Option< + unsafe extern "C" fn( + arg1: root::DB::RaftStoreProxyPtr, + region_id: u64, + compact_index: u64, + compact_term: u64, + applied_index: u64, + ), + >, + pub fn_get_cluster_raftstore_version: ::std::option::Option< + unsafe extern "C" fn( + arg1: root::DB::RaftStoreProxyPtr, + refresh_strategy: u8, + timeout_ms: i64, + ) -> root::DB::RaftstoreVer, + >, } #[repr(C)] #[derive(Debug)] @@ -502,10 +531,12 @@ pub mod root { pub fn_try_flush_data: ::std::option::Option< unsafe extern "C" fn( arg1: *mut root::DB::EngineStoreServerWrap, - arg2: u64, - arg3: u8, - arg4: u64, - arg5: u64, + region_id: u64, + flush_pattern: u8, + index: u64, + term: u64, + truncated_index: u64, + truncated_term: u64, ) -> u8, >, pub fn_atomic_update_proxy: ::std::option::Option< @@ -613,7 +644,14 @@ pub mod root { ) -> root::DB::FastAddPeerRes, >, } - pub const RAFT_STORE_PROXY_VERSION: u64 = 3617226644007633432; + extern "C" { + pub fn ffi_get_server_info_from_proxy( + arg1: isize, + arg2: root::DB::BaseBuffView, + arg3: root::DB::RawVoidPtr, + ) -> u32; + } + pub const RAFT_STORE_PROXY_VERSION: u64 = 1785250247080530932; pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639; } } diff --git a/proxy_components/proxy_ffi/src/lib.rs b/proxy_components/proxy_ffi/src/lib.rs index 59ed9dbdc8e..72878805029 100644 --- a/proxy_components/proxy_ffi/src/lib.rs +++ b/proxy_components/proxy_ffi/src/lib.rs @@ -21,6 +21,7 @@ pub mod raftstore_proxy; pub mod raftstore_proxy_helper_impls; pub mod read_index_helper; // FFI releated with reading from SST/RocksDB files. +pub mod apply_router_helper; pub mod snapshot_reader_impls; pub mod utils; diff --git a/proxy_components/proxy_ffi/src/raftstore_proxy.rs b/proxy_components/proxy_ffi/src/raftstore_proxy.rs index 9550e10a042..699ede40cba 100644 --- a/proxy_components/proxy_ffi/src/raftstore_proxy.rs +++ b/proxy_components/proxy_ffi/src/raftstore_proxy.rs @@ -6,16 +6,22 @@ use std::sync::{ }; use encryption::DataKeyManager; +use engine_traits::KvEngine; +use pd_client::PdClient; +use raftstore::store::fsm::ApplyRouter; +use tikv_util::error; +use tokio::runtime::Runtime; use super::{ get_engine_store_server_helper, interfaces_ffi, interfaces_ffi::{ - ConstRawVoidPtr, KVGetStatus, RaftProxyStatus, RaftStoreProxyPtr, RawCppStringPtr, - RawVoidPtr, + ConstRawVoidPtr, KVGetStatus, RaftProxyStatus, RaftStoreProxyPtr, RaftstoreVer, + RawCppStringPtr, RawVoidPtr, }, raftstore_proxy_helper_impls::*, read_index_helper, }; +use crate::apply_router_helper; pub type Eng = Box; @@ -24,6 +30,9 @@ pub struct RaftStoreProxy { key_manager: Option>, read_index_client: Option>, raftstore_proxy_engine: RwLock>, + apply_router_client: Option>, + pd_client: Option>, + cluster_raftstore_ver: RwLock, } impl RaftStoreProxy { @@ -32,19 +41,187 @@ impl RaftStoreProxy { key_manager: Option>, read_index_client: Option>, raftstore_proxy_engine: Option, + apply_router_client: Option>, + pd_client: Option>, ) -> Self { RaftStoreProxy { status, key_manager, read_index_client, raftstore_proxy_engine: RwLock::new(raftstore_proxy_engine), + apply_router_client, + pd_client, + cluster_raftstore_ver: RwLock::new(RaftstoreVer::Uncertain), } } } +pub fn maybe_use_backup_addr(u: &str, backup: impl Fn() -> String) -> Option { + let mut res = None; + let mut need_backup_ip = false; + + if let Ok(mut stuff) = url::Url::parse(u) { + match stuff.host() { + None => { + need_backup_ip = true; + } + Some(url::Host::Domain(e)) => { + if e == "localhost" { + need_backup_ip = true; + } + } + Some(url::Host::Ipv4(e)) => { + let is_loopback_or_unspecified = e.is_unspecified() || e.is_loopback(); + if is_loopback_or_unspecified { + need_backup_ip = true; + } + } + Some(url::Host::Ipv6(e)) => { + let is_loopback_or_unspecified = e.is_unspecified() || e.is_loopback(); + if is_loopback_or_unspecified { + need_backup_ip = true; + } + } + }; + if need_backup_ip { + let mut s = backup(); + if !s.starts_with("http") { + s = format!("http://{}", s); + } + if let Ok(back) = url::Url::parse(&s) { + stuff.set_host(back.host_str()).unwrap(); + } + res = Some(stuff.to_string()) + } + } + res +} + impl RaftStoreProxy { - pub fn raftstore_version(&self) -> u64 { - 1 + pub fn cluster_raftstore_version(&self) -> RaftstoreVer { + *self.cluster_raftstore_ver.read().unwrap() + } + + /// Issue requests to all stores which is not marked as TiFlash. + /// Use the result of the first store which is not a Uncertain. + /// Or set the result to Uncertain if timeout. + pub fn refresh_cluster_raftstore_version(&mut self, timeout_ms: i64) -> bool { + let generate_request_with_timeout = |timeout_ms: i64| -> Option { + let headers = reqwest::header::HeaderMap::new(); + let mut builder = reqwest::Client::builder().default_headers(headers); + if timeout_ms >= 0 { + builder = builder.timeout(std::time::Duration::from_millis(timeout_ms as u64)); + } + match builder.build() { + Ok(o) => Some(o), + Err(e) => { + error!("generate_request_with_timeout error {:?}", e); + None + } + } + }; + + let parse_response = + |rt: &Runtime, resp: Result| -> RaftstoreVer { + match resp { + Ok(resp) => { + if resp.status() == 404 { + // If the port is not implemented. + return RaftstoreVer::V1; + } else if resp.status() != 200 { + return RaftstoreVer::Uncertain; + } + let resp = rt.block_on(async { resp.text().await }).unwrap(); + if resp.contains("partitioned") { + RaftstoreVer::V2 + } else { + RaftstoreVer::V1 + } + } + Err(e) => { + error!("get_engine_type respond error {:?}", e); + RaftstoreVer::Uncertain + } + } + }; + + // We don't use information stored in `GlobalReplicationState` to decouple. + *self.cluster_raftstore_ver.write().unwrap() = RaftstoreVer::Uncertain; + let stores = match self.pd_client.as_ref().unwrap().get_all_stores(false) { + Ok(stores) => stores, + Err(e) => { + tikv_util::info!("get_all_stores error {:?}", e); + return false; + } + }; + + let to_try_addrs = stores.iter().filter_map(|store| { + // There are some other labels such like tiflash_compute. + let shall_filter = store + .get_labels() + .iter() + .any(|label| label.get_key() == "engine" && label.get_value().contains("tiflash")); + if !shall_filter { + // TiKV's status server don't support https. + let mut u = format!("http://{}/{}", store.get_status_address(), "engine_type"); + if let Some(nu) = maybe_use_backup_addr(&u, || store.get_address().to_string()) { + tikv_util::info!("switch from {} to {}", u, nu); + u = nu; + } + // A invalid url may lead to 404, which will enforce a V1 inference, which is + // error. + if let Ok(stuff) = url::Url::parse(&u) { + if stuff.path() == "/engine_type" { + Some(u) + } else { + None + } + } else { + None + } + } else { + None + } + }); + + let rt = Runtime::new().unwrap(); + + let mut pending = vec![]; + for addr in to_try_addrs { + if let Some(c) = generate_request_with_timeout(timeout_ms) { + let _g = rt.enter(); + let f = c.get(&addr).send(); + pending.push(rt.spawn(f)); + } + } + + if pending.is_empty() { + tikv_util::error!("no valid tikv stores with status server"); + } + + loop { + if pending.is_empty() { + break; + } + let sel = futures::future::select_all(pending); + let (resp, _completed_idx, remaining) = rt.block_on(async { sel.await }); + + let res = parse_response(&rt, resp.unwrap()); + + if res != RaftstoreVer::Uncertain { + *self.cluster_raftstore_ver.write().unwrap() = res; + rt.shutdown_timeout(std::time::Duration::from_millis(1)); + return true; + } + + pending = remaining; + } + rt.shutdown_timeout(std::time::Duration::from_millis(1)); + false + } + + pub fn raftstore_version(&self) -> RaftstoreVer { + RaftstoreVer::V1 } pub fn set_kv_engine(&mut self, kv_engine: Option) { @@ -80,7 +257,7 @@ impl RaftStoreProxy { ) -> KVGetStatus { let region_state_key = keys::region_state_key(region_id); let mut res = KVGetStatus::NotFound; - if self.raftstore_version() == 1 { + if self.raftstore_version() == RaftstoreVer::V1 { self.get_value_cf(engine_traits::CF_RAFT, ®ion_state_key, &mut |value| { match value { Ok(v) => { @@ -111,7 +288,7 @@ impl RaftStoreProxy { } pub fn get_raft_apply_state(&self, _region_id: u64) -> interfaces_ffi::KVGetStatus { - if self.raftstore_version() == 1 { + if self.raftstore_version() == RaftstoreVer::V1 { panic!("wrong raftstore version"); } else { unreachable!() @@ -147,12 +324,29 @@ impl RaftStoreProxyFFI for RaftStoreProxy { fn set_status(&mut self, s: RaftProxyStatus) { self.status.store(s as u8, Ordering::SeqCst); } + + fn maybe_apply_router_helper( + &self, + ) -> &Option> { + &self.apply_router_client + } +} + +impl RaftStoreProxy { + pub fn setup_apply_router_helper(&mut self, ar: ApplyRouter) { + self.apply_router_client = Some(Box::new( + crate::apply_router_helper::ProxyApplyRouterHelper::new(ar), + )); + } } impl RaftStoreProxyPtr { pub unsafe fn as_ref(&self) -> &RaftStoreProxy { &*(self.inner as *const RaftStoreProxy) } + pub unsafe fn as_mut(&mut self) -> &mut RaftStoreProxy { + &mut *(self.inner as *mut RaftStoreProxy) + } pub fn is_null(&self) -> bool { self.inner.is_null() } diff --git a/proxy_components/proxy_ffi/src/raftstore_proxy_helper_impls.rs b/proxy_components/proxy_ffi/src/raftstore_proxy_helper_impls.rs index 4abefe6c19c..1d53771fc46 100644 --- a/proxy_components/proxy_ffi/src/raftstore_proxy_helper_impls.rs +++ b/proxy_components/proxy_ffi/src/raftstore_proxy_helper_impls.rs @@ -15,17 +15,19 @@ use kvproto::kvrpcpb; use protobuf::Message; use super::{ + apply_router_helper, basic_ffi_impls::*, domain_impls::*, encryption_impls::*, engine_store_helper_impls::*, interfaces_ffi::{ BaseBuffView, CppStrVecView, KVGetStatus, RaftProxyStatus, RaftStoreProxyFFIHelper, - RaftStoreProxyPtr, RawCppPtr, RawCppStringPtr, RawRustPtr, RawVoidPtr, SSTReaderInterfaces, + RaftStoreProxyPtr, RaftstoreVer, RawCppPtr, RawCppStringPtr, RawRustPtr, RawVoidPtr, + SSTReaderInterfaces, }, read_index_helper, snapshot_reader_impls::*, - utils, + utils, UnwrapExternCFunc, }; impl Clone for RaftStoreProxyPtr { @@ -51,6 +53,8 @@ pub trait RaftStoreProxyFFI: Sync { // F: FnOnce(Result, String>); // fn set_kv_engine(&mut self, kv_engine: Option); // fn kv_engine(&self) -> &RwLock>; + fn maybe_apply_router_helper(&self) + -> &Option>; } impl RaftStoreProxyFFIHelper { @@ -83,10 +87,35 @@ impl RaftStoreProxyFFIHelper { fn_make_timer_task: Some(ffi_make_timer_task), fn_poll_timer_task: Some(ffi_poll_timer_task), fn_get_region_local_state: Some(ffi_get_region_local_state), + fn_notify_compact_log: Some(ffi_notify_compact_log), + fn_get_cluster_raftstore_version: Some(ffi_get_cluster_raftstore_version), } } } +unsafe extern "C" fn ffi_get_cluster_raftstore_version( + proxy_ptr: RaftStoreProxyPtr, + refresh_strategy: u8, + timeout_ms: i64, +) -> RaftstoreVer { + if refresh_strategy == 1 { + // Force refresh + let mut proxy_ptr = proxy_ptr; + proxy_ptr + .as_mut() + .refresh_cluster_raftstore_version(timeout_ms); + } else if refresh_strategy == 2 { + // Refresh if uncertain + if proxy_ptr.as_ref().cluster_raftstore_version() == RaftstoreVer::Uncertain { + let mut proxy_ptr = proxy_ptr; + proxy_ptr + .as_mut() + .refresh_cluster_raftstore_version(timeout_ms); + } + } + proxy_ptr.as_ref().cluster_raftstore_version() +} + unsafe extern "C" fn ffi_get_region_local_state( proxy_ptr: RaftStoreProxyPtr, region_id: u64, @@ -259,3 +288,27 @@ pub unsafe extern "C" fn ffi_poll_timer_task(task_ptr: RawVoidPtr, waker: RawVoi 0 } } + +pub extern "C" fn ffi_notify_compact_log( + proxy_ptr: RaftStoreProxyPtr, + region_id: u64, + compact_index: u64, + compact_term: u64, + applied_index: u64, +) { + assert!(!proxy_ptr.is_null()); + unsafe { + if proxy_ptr.as_ref().maybe_apply_router_helper().is_none() { + tikv_util::info!("Apply router helper is none"); + return; + } + } + unsafe { + proxy_ptr + .as_ref() + .maybe_apply_router_helper() + .as_ref() + .unwrap() + .schedule_compact_log_task(region_id, compact_index, compact_term, applied_index) + } +} diff --git a/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs b/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs index 0fdc852ef65..88e6529c131 100644 --- a/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs +++ b/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs @@ -28,12 +28,12 @@ impl Clone for SSTReaderInterfaces { /// All impl of SST reader will be dispatched by this ptr. impl SSTReaderPtr { - unsafe fn as_mut_sst_lock(&mut self) -> &mut LockCFFileReader { + pub unsafe fn as_mut_sst_lock(&mut self) -> &mut LockCFFileReader { assert_eq!(self.kind, SSTFormatKind::KIND_SST); &mut *(self.inner as *mut LockCFFileReader) } - unsafe fn as_mut_sst_other(&mut self) -> &mut SSTFileReader { + pub unsafe fn as_mut_sst_other(&mut self) -> &mut SSTFileReader { assert_eq!(self.kind, SSTFormatKind::KIND_SST); &mut *(self.inner as *mut SSTFileReader) } diff --git a/proxy_components/proxy_server/src/run.rs b/proxy_components/proxy_server/src/run.rs index 46fd97e77fe..6a98934ff7c 100644 --- a/proxy_components/proxy_server/src/run.rs +++ b/proxy_components/proxy_server/src/run.rs @@ -24,6 +24,7 @@ use engine_store_ffi::{ self, core::DebugStruct, ffi::{ + apply_router_helper::ProxyApplyRouterHelper, interfaces_ffi::{ EngineStoreServerHelper, EngineStoreServerStatus, RaftProxyStatus, RaftStoreProxyFFIHelper, @@ -142,6 +143,17 @@ pub fn run_impl( SysQuota::cpu_cores_quota() as usize * 2, ))), None, + Some(Box::new(ProxyApplyRouterHelper::new( + tikv.system.as_ref().unwrap().apply_router(), + ))), + Some(tikv.pd_client.clone()), + ); + info!("start probing cluster's raftstore version"); + // We wait for a maximum of 10 seconds for every store. + proxy.refresh_cluster_raftstore_version(10 * 1000); + info!( + "cluster's raftstore version is {:?}", + proxy.cluster_raftstore_version() ); let proxy_ref = &proxy; @@ -254,6 +266,8 @@ fn run_impl_only_for_decryption( encryption_key_manager.clone(), Option::None, None, + None, + None, ); let proxy_ref = &proxy; @@ -1169,6 +1183,7 @@ impl TiKvServer { snap_mgr.clone(), packed_envs, DebugStruct::default(), + self.core.encryption_key_manager.clone(), ); tiflash_ob.register_to(self.coprocessor_host.as_mut().unwrap()); diff --git a/proxy_components/proxy_server/src/util.rs b/proxy_components/proxy_server/src/util.rs index c00d196696a..1a5158ae335 100644 --- a/proxy_components/proxy_server/src/util.rs +++ b/proxy_components/proxy_server/src/util.rs @@ -1,6 +1,9 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::time::{Duration, Instant}; +use std::{ + pin::Pin, + time::{Duration, Instant}, +}; use engine_store_ffi::ffi::interfaces_ffi::{BaseBuffView, RaftStoreProxyPtr, RawVoidPtr}; use futures::{compat::Future01CompatExt, executor::block_on}; @@ -74,3 +77,29 @@ pub extern "C" fn ffi_server_info( engine_store_ffi::ffi::set_server_info_resp(&resp, res); 0 } + +#[no_mangle] +pub extern "C" fn ffi_get_server_info_from_proxy( + server_helper_ptr: isize, + view: BaseBuffView, + res: RawVoidPtr, +) -> u32 { + assert_ne!(server_helper_ptr, 0); + let mut req = ServerInfoRequest::default(); + assert_ne!(view.data, std::ptr::null()); + assert_ne!(view.len, 0); + req.merge_from_bytes(view.to_slice()).unwrap(); + + let resp = server_info_for_ffi(req); + let buff = engine_store_ffi::ffi::ProtoMsgBaseBuff::new(&resp); + unsafe { + let server_helper = &(*(server_helper_ptr + as *const engine_store_ffi::ffi::interfaces_ffi::EngineStoreServerHelper)); + server_helper.set_pb_msg_by_bytes( + engine_store_ffi::ffi::interfaces_ffi::MsgPBType::ServerInfoResponse, + res, + Pin::new(&buff).into(), + ); + } + 0 +} diff --git a/proxy_scripts/ci_check.sh b/proxy_scripts/ci_check.sh index e170d016d2a..bf7391d9bb5 100755 --- a/proxy_scripts/ci_check.sh +++ b/proxy_scripts/ci_check.sh @@ -38,6 +38,7 @@ elif [[ $M == "testnew" ]]; then cargo check --package proxy_server --features="$ENABLE_FEATURES" # tests based on mock-engine-store, with compat for new proxy cargo test --package proxy_tests --test proxy shared::write + cargo test --package proxy_tests --test proxy shared::proactive_flush cargo test --package proxy_tests --test proxy shared::snapshot cargo test --package proxy_tests --test proxy shared::normal::store cargo test --package proxy_tests --test proxy shared::normal::config @@ -49,6 +50,7 @@ elif [[ $M == "testnew" ]]; then cargo test --package proxy_tests --test proxy shared::store cargo test --package proxy_tests --test proxy shared::region cargo test --package proxy_tests --test proxy shared::flashback + cargo test --package proxy_tests --test proxy v2_compat::cluster_raftstore_ver cargo test --package proxy_tests --test proxy v2_compat::tablet_snapshot cargo test --package proxy_tests --test proxy v2_compat::simple_write cargo test --package proxy_tests --test proxy v1_specific::region_ext diff --git a/proxy_scripts/clippy.sh b/proxy_scripts/clippy.sh old mode 100644 new mode 100755 diff --git a/proxy_tests/Cargo.toml b/proxy_tests/Cargo.toml index 6f7a270bd94..1c96815703f 100644 --- a/proxy_tests/Cargo.toml +++ b/proxy_tests/Cargo.toml @@ -112,6 +112,7 @@ engine_test = { workspace = true } engine_traits = { workspace = true } external_storage_export = { workspace = true } file_system = { workspace = true } +hex = "0.4" hyper = { version = "0.14", default-features = false, features = ["runtime"] } keys = { workspace = true } panic_hook = { workspace = true } diff --git a/proxy_tests/proxy/mod.rs b/proxy_tests/proxy/mod.rs index 2b319d5738a..bf278bed628 100644 --- a/proxy_tests/proxy/mod.rs +++ b/proxy_tests/proxy/mod.rs @@ -7,6 +7,7 @@ #![recursion_limit = "100"] #![feature(vec_into_raw_parts)] #![feature(slice_pattern)] +#![feature(async_closure)] #[macro_use] extern crate slog_global; diff --git a/proxy_tests/proxy/shared/ingest.rs b/proxy_tests/proxy/shared/ingest.rs index a259e656612..4b9781814a4 100644 --- a/proxy_tests/proxy/shared/ingest.rs +++ b/proxy_tests/proxy/shared/ingest.rs @@ -41,6 +41,55 @@ fn test_handle_ingest_sst() { cluster.shutdown(); } +#[test] +fn test_handle_multiple_ingest_sst() { + let (mut cluster, _pd_client) = new_mock_cluster(0, 1); + let _ = cluster.run(); + + let key = "k"; + cluster.must_put(key.as_bytes(), b"v"); + let region = cluster.get_region(key.as_bytes()); + + let v = make_ssts( + &cluster, + region.get_id(), + region.get_region_epoch().clone(), + (0..100).map(|i| format!("k{}", i)).collect::>(), + 5, + ); + + let mut ssts = vec![]; + for (save, m, _) in v.iter() { + ssts.push((save.clone(), proxy_ffi::name_to_cf(m.get_cf_name()))); + } + let sorted_ssts = engine_store_ffi::core::forward_raft::sort_sst_by_start_key(ssts, None); + for sl in sorted_ssts.windows(2) { + let a = &sl[0]; + let b = &sl[1]; + let fk1 = + engine_store_ffi::core::forward_raft::get_first_key(a.0.to_str().unwrap(), a.1, None); + let fk2 = + engine_store_ffi::core::forward_raft::get_first_key(b.0.to_str().unwrap(), b.1, None); + assert!(fk1 < fk2); + } + + let mut reqs = vec![]; + for (_, m, _) in v.iter() { + reqs.push(new_ingest_sst_cmd(m.clone())); + } + + let _ = cluster.request(key.as_bytes(), reqs, false, Duration::from_secs(5), true); + + check_key(&cluster, b"k66", b"2", Some(true), Some(true), None); + + for (file, _, sst_path) in v.into_iter() { + assert!(sst_path.as_path().is_file()); + assert!(!file.as_path().is_file()); + std::fs::remove_file(sst_path.as_path()).unwrap(); + } + cluster.shutdown(); +} + #[test] fn test_invalid_ingest_sst() { let (mut cluster, _pd_client) = new_mock_cluster(0, 1); diff --git a/proxy_tests/proxy/shared/mod.rs b/proxy_tests/proxy/shared/mod.rs index 12e24c39e70..9070b6989a2 100644 --- a/proxy_tests/proxy/shared/mod.rs +++ b/proxy_tests/proxy/shared/mod.rs @@ -7,6 +7,7 @@ mod ffi; mod ingest; mod mock; mod normal; +mod proactive_flush; mod region; mod replica_read; mod server_cluster_test; diff --git a/proxy_tests/proxy/shared/proactive_flush.rs b/proxy_tests/proxy/shared/proactive_flush.rs new file mode 100644 index 00000000000..0cfe9be78b1 --- /dev/null +++ b/proxy_tests/proxy/shared/proactive_flush.rs @@ -0,0 +1,81 @@ +// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. +use proxy_ffi::interfaces_ffi::{RaftStoreProxyFFIHelper, RaftStoreProxyPtr}; + +use crate::utils::v1::*; + +fn do_notify_compact_log( + helper: &RaftStoreProxyFFIHelper, + ptr: RaftStoreProxyPtr, + region_id: u64, + compact_index: u64, + compact_term: u64, + applied_index: u64, +) { + unsafe { + (helper.fn_notify_compact_log.as_ref().unwrap())( + ptr, + region_id, + compact_index, + compact_term, + applied_index, + ) + } +} + +#[test] +fn test_proactive_flush() { + let (mut cluster, _) = new_mock_cluster(0, 1); + disable_auto_gen_compact_log(&mut cluster); + fail::cfg("try_flush_data", "return(0)").unwrap(); + cluster.run(); + + let region = cluster.get_region(b"k1"); + let region_id = region.get_id(); + must_put_and_check_key(&mut cluster, 1, 10, Some(true), None, None); + + let prev_state = collect_all_states(&cluster.cluster_ext, region_id); + let (compact_index, compact_term) = get_valid_compact_index(&prev_state); + let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); + let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); + let _ = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); + + std::thread::sleep(std::time::Duration::from_secs(1)); + + let new_state = collect_all_states(&cluster.cluster_ext, region_id); + must_unaltered_disk_apply_state(&prev_state, &new_state); + let prev_state = new_state; + + let applied_index = compact_index; + iter_ffi_helpers(&cluster, Some(vec![1]), &mut |_, ffi: &mut FFIHelperSet| { + do_notify_compact_log( + ffi.proxy_helper.as_ref(), + ffi.proxy_helper.proxy_ptr.clone(), + region_id, + compact_index, + compact_term, + applied_index, + ) + }); + // Wait the async scheduled + std::thread::sleep(std::time::Duration::from_secs(2)); + let new_state = collect_all_states(&cluster.cluster_ext, region_id); + must_altered_disk_apply_state(&prev_state, &new_state); + iter_ffi_helpers(&cluster, Some(vec![1]), &mut |_, ffi: &mut FFIHelperSet| { + assert_eq!( + ffi.engine_store_server + .engines + .as_ref() + .unwrap() + .kv + .proxy_ext + .debug_struct + .proactive_compact_log_count + .load(Ordering::SeqCst), + 1 + ) + }); + fail::remove("try_flush_data"); + cluster.shutdown(); +} diff --git a/proxy_tests/proxy/shared/write.rs b/proxy_tests/proxy/shared/write.rs index a5432fae073..e29589460b2 100644 --- a/proxy_tests/proxy/shared/write.rs +++ b/proxy_tests/proxy/shared/write.rs @@ -365,6 +365,62 @@ fn test_unsupport_admin_cmd() { cluster.shutdown(); } +#[test] +fn test_failed_compact_log() { + let (mut cluster, _pd_client) = new_mock_cluster(0, 3); + disable_auto_gen_compact_log(&mut cluster); + cluster.run(); + + cluster.must_put(b"k", b"v"); + let region = cluster.get_region("k".as_bytes()); + let region_id = region.get_id(); + fail::cfg("try_flush_data", "return(1)").unwrap(); + for i in 0..5 { + let k = format!("k{}", i); + let v = format!("v{}", i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + } + for i in 0..5 { + let k = format!("k{}", i); + let v = format!("v{}", i); + check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); + } + std::thread::sleep(std::time::Duration::from_millis(500)); + let prev_state = collect_all_states(&cluster.cluster_ext, region_id); + let (compact_index, compact_term) = get_valid_compact_index(&prev_state); + let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); + let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); + let res = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); + // compact index should less than applied index + assert!(!res.get_header().has_error(), "{:?}", res); + + for i in 5..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + } + for i in 5..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); + } + std::thread::sleep(std::time::Duration::from_millis(500)); + let prev_state = collect_all_states(&cluster.cluster_ext, region_id); + let compact_log = test_raftstore::new_compact_log_request(1, compact_term); + let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); + let res = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); + // compact index should less than applied index + assert!(!res.get_header().has_error(), "{:?}", res); + + let new_state = collect_all_states(&cluster.cluster_ext, region_id); + must_unaltered_disk_truncated_state(&prev_state, &new_state); + cluster.shutdown(); +} + #[test] fn test_compact_log() { let (mut cluster, _pd_client) = new_mock_cluster(0, 3); diff --git a/proxy_tests/proxy/utils/mod.rs b/proxy_tests/proxy/utils/mod.rs index dbf75edfb82..0af26a60153 100644 --- a/proxy_tests/proxy/utils/mod.rs +++ b/proxy_tests/proxy/utils/mod.rs @@ -1,6 +1,7 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. pub mod common; +pub mod sst; pub mod v1; pub mod v1_server; pub mod v2; diff --git a/proxy_tests/proxy/utils/sst.rs b/proxy_tests/proxy/utils/sst.rs new file mode 100644 index 00000000000..33bf767ac77 --- /dev/null +++ b/proxy_tests/proxy/utils/sst.rs @@ -0,0 +1,184 @@ +// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. + +#![allow(unused)] + +use byteorder::{BigEndian, ByteOrder}; +use mock_engine_store::interfaces_ffi::BaseBuffView; +use more_asserts::assert_gt; +use proxy_ffi::{ + interfaces_ffi::{ColumnFamilyType, EngineIteratorSeekType, SSTFormatKind}, + snapshot_reader_impls::{sst_file_reader::SSTFileReader, tablet_reader::TabletReader, *}, +}; + +pub fn from_hex(key: &str) -> Result, hex::FromHexError> { + if key.starts_with("0x") || key.starts_with("0X") { + return hex::decode(&key[2..]); + } + hex::decode(key) +} + +pub unsafe fn must_get_in_tablet_snapshot(path: &str, cf: ColumnFamilyType, key: &str) { + // key is something like + // "7480000000000000FF795F720380000000FF0000026303800000FF0000017801000000FCF9DE534E2797FB83" + // . + let reader = TabletReader::ffi_get_cf_file_reader(path, cf, None); + assert_eq!(reader.kind, SSTFormatKind::KIND_TABLET); + let encoded = from_hex(key).unwrap(); + let bf = BaseBuffView { + data: encoded.as_ptr() as *const _, + len: encoded.len() as u64, + }; + + ffi_sst_reader_seek(reader.clone(), cf, EngineIteratorSeekType::Key, bf); + assert_eq!(ffi_sst_reader_remained(reader.clone(), cf), 1); + let actual = ffi_sst_reader_key(reader.clone(), cf); + assert_eq!(actual.to_slice(), encoded); +} + +pub unsafe fn must_not_get_in_tablet_snapshot(path: &str, cf: ColumnFamilyType, key: &str) { + // key is something like + // "7480000000000000FF795F720380000000FF0000026303800000FF0000017801000000FCF9DE534E2797FB83" + // . + let reader = TabletReader::ffi_get_cf_file_reader(path, cf, None); + assert_eq!(reader.kind, SSTFormatKind::KIND_TABLET); + let encoded = from_hex(key).unwrap(); + let bf = BaseBuffView { + data: encoded.as_ptr() as *const _, + len: encoded.len() as u64, + }; + + ffi_sst_reader_seek(reader.clone(), cf, EngineIteratorSeekType::Key, bf); + if ffi_sst_reader_remained(reader.clone(), cf) == 0 { + return; + } + let actual = ffi_sst_reader_key(reader.clone(), cf); + assert_ne!(actual.to_slice(), encoded); +} + +pub mod RecordFormat { + // These are functions from TiFlash. + #![allow(non_snake_case)] + + use super::*; + const ENC_GROUP_SIZE: usize = 8; + const ENC_MARKER: u8 = 0xff; + pub fn decodeInt64(p: &[u8]) -> i64 { + const SIGN_MASK: u64 = 1u64 << 63; + let y = (BigEndian::read_u64(p) ^ SIGN_MASK) as i64; + y + } + + pub fn getHandleID(p: &[u8]) -> i64 { + decodeInt64(p) + } + + pub fn getRawTiDBPK(decode: &[u8]) -> Vec { + let user_key = getUserKey(decode); + const RAW_KEY_NO_HANDLE_SIZE: usize = 1 + 8 + 2; + user_key[RAW_KEY_NO_HANDLE_SIZE..].to_vec() + } + + pub fn getUserKey(data: &[u8]) -> Vec { + removeKeyspaceID(data) + } + + pub fn removeKeyspaceID(data: &[u8]) -> Vec { + const KEYSPACE_PREFIX_LEN: usize = 4; + const TXN_MODE_PREFIX: u8 = 'x' as u8; + if data.len() < KEYSPACE_PREFIX_LEN || data[0] != TXN_MODE_PREFIX { + return data.to_vec(); + } + data[KEYSPACE_PREFIX_LEN..].to_vec() + } + + pub fn getTableId(data: &[u8]) -> i64 { + decodeInt64(&data[1..]) + } + + fn checkKeyPaddingValid(key: &[u8], ptr: usize, pad_size: u8) -> bool { + let p = unsafe { *(&key[ptr..] as *const [u8] as *const u64) } + >> ((ENC_GROUP_SIZE - pad_size as usize) * 8); + p == 0 + } + + pub fn decodeTiKVKeyFull(key: &[u8]) -> Vec { + let mut res: Vec = vec![]; + let chunk_len: usize = ENC_GROUP_SIZE + 1; + + for ptr in (0..).step_by(chunk_len) { + if ptr + chunk_len > key.len() { + panic!("Unexpexcted EOF"); + } + let marker = key[ptr + ENC_GROUP_SIZE]; + let pad_size: u8 = ENC_MARKER - marker; + if pad_size == 0 { + let mut v = key[ptr..ptr + ENC_GROUP_SIZE].to_vec(); + res.append(&mut v); + continue; + } + if pad_size as usize > ENC_GROUP_SIZE { + panic!("key padding"); + } + let a: usize = ptr + ENC_GROUP_SIZE - (pad_size as usize); + let mut v = key[ptr..a].to_vec(); + res.append(&mut v); + + if !checkKeyPaddingValid(key, ptr, pad_size) { + panic!("Key padding, wrong end") + } + return res; + } + unreachable!() + } +} + +pub fn parse_handle_id(tikv_key: &[u8]) -> i64 { + let decode = RecordFormat::decodeTiKVKeyFull(tikv_key); + let pk = RecordFormat::getRawTiDBPK(&decode); + let handle = RecordFormat::getHandleID(&pk); + handle +} + +pub fn parse_table_id(tikv_key: &[u8]) -> i64 { + let handle = RecordFormat::getTableId(&tikv_key); + handle +} + +pub unsafe fn read_sst_file(path: &str) { + let reader = SSTFileReader::ffi_get_cf_file_reader(path, None); + assert_eq!(reader.kind, SSTFormatKind::KIND_SST); + + let mut prev: Option> = None; + loop { + let r = ffi_sst_reader_remained(reader.clone(), ColumnFamilyType::Write); + if r != 1 { + let p: &[u8] = &prev.unwrap(); + tikv_util::info!( + "End table id {} key {}", + parse_table_id(p), + parse_handle_id(p) + ); + break; + } + let k = ffi_sst_reader_key(reader.clone(), ColumnFamilyType::Write); + let ks = k.to_slice(); + if let Some(p) = prev { + let ps: &[u8] = &p; + more_asserts::assert_ge!(ks, ps); + more_asserts::assert_ge!(ks[11..], ps[11..]); + let hprev = parse_handle_id(ps); + let h = parse_handle_id(ks); + assert_gt!(h, hprev); + } else { + tikv_util::info!( + "Start table id {} key {}", + parse_table_id(ks), + parse_handle_id(ks) + ); + } + prev = Some(ks.to_vec()); + // tikv_util::info!("AAA {:?} {:?} {} {}", ks, &ks[11..], ks.len(), + // BigEndian::read_u64(&ks[12..])); + ffi_sst_reader_next(reader.clone(), ColumnFamilyType::Write); + } +} diff --git a/proxy_tests/proxy/utils/v1.rs b/proxy_tests/proxy/utils/v1.rs index 91a5c5920a3..962b727bc11 100644 --- a/proxy_tests/proxy/utils/v1.rs +++ b/proxy_tests/proxy/utils/v1.rs @@ -9,6 +9,7 @@ pub use mock_engine_store::mock_cluster::v1::{ }, Cluster, Simulator, }; +use rand::seq::SliceRandom; use sst_importer::SstImporter; use test_sst_importer::gen_sst_file_with_kvs; @@ -99,6 +100,57 @@ pub fn create_tmp_importer(cfg: &MixedClusterConfig, kv_path: &str) -> (PathBuf, (dir, importer) } +pub fn make_ssts( + cluster: &Cluster, + region_id: u64, + region_epoch: RegionEpoch, + keys: Vec, + split_num: usize, +) -> Vec<(PathBuf, SstMeta, PathBuf)> { + let path = cluster.engines.iter().last().unwrap().1.kv.path(); + let (import_dir, importer) = create_tmp_importer(&cluster.cfg, path); + + // Prepare data + let mut kvs: Vec<(&[u8], &[u8])> = Vec::new(); + let mut keys = keys; + keys.sort(); + for i in 0..keys.len() { + kvs.push((keys[i].as_bytes(), b"2")); + } + + assert!(keys.len() > split_num); + let per_file_num = keys.len() / split_num; + + // Make files + let mut res = vec![]; + for i in 0..split_num { + let (import_dir, importer) = create_tmp_importer(cluster.get_config(), path); + let sst_path = import_dir.join(format!("test{}.sst", i)); + let (mut meta, data) = if i == split_num - 1 { + gen_sst_file_with_kvs(&sst_path, &kvs[i * per_file_num..]) + } else { + gen_sst_file_with_kvs(&sst_path, &kvs[i * per_file_num..(i + 1) * per_file_num]) + }; + assert!(Path::new(sst_path.to_str().unwrap()).exists()); + meta.set_region_id(region_id); + meta.set_region_epoch(region_epoch.clone()); + meta.set_cf_name("default".to_owned()); + let mut file = importer.create(&meta).unwrap(); + file.append(&data).unwrap(); + file.finish().unwrap(); + + // copy file to save dir. + let src = sst_path.clone(); + let dst = file.get_import_path().save.to_str().unwrap(); + let _ = std::fs::copy(src.clone(), dst); + res.push((file.get_import_path().save.clone(), meta, sst_path)); + } + + let mut rnd = rand::thread_rng(); + res.shuffle(&mut rnd); + res +} + pub fn make_sst( cluster: &Cluster, region_id: u64, diff --git a/proxy_tests/proxy/v2_compat/cluster_raftstore_ver.rs b/proxy_tests/proxy/v2_compat/cluster_raftstore_ver.rs new file mode 100644 index 00000000000..e6b8827c2ec --- /dev/null +++ b/proxy_tests/proxy/v2_compat/cluster_raftstore_ver.rs @@ -0,0 +1,300 @@ +// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. + +use std::{ + convert::Infallible, + net::SocketAddr, + sync::{atomic::AtomicUsize, Arc, RwLock}, +}; + +use hyper::{ + service::{make_service_fn, service_fn}, + Body, Response, Server, +}; +use proxy_ffi::interfaces_ffi::RaftstoreVer; +use tokio::{runtime::Runtime, sync::oneshot, task::JoinHandle}; + +use crate::utils::v1::*; + +type BoxError = Box; + +#[derive(Clone, Copy, Debug, PartialEq)] +enum ReturnState { + V1, + V2, + C404, + C403, + TimeoutV1, +} + +#[derive(Clone)] +pub struct SharedState { + inner: Arc>, +} + +impl SharedState { + fn new(t: ReturnState) -> Self { + Self { + inner: Arc::new(RwLock::new(t)), + } + } +} + +async fn handle_request(shared: SharedState) -> Result, BoxError> { + let x = shared.inner.read().unwrap().clone(); + match x { + ReturnState::C403 => Ok(Response::builder() + .status(403) + .body("raft-kv".into()) + .unwrap()), + ReturnState::C404 => Ok(Response::builder() + .status(404) + .body("raft-kv".into()) + .unwrap()), + ReturnState::V1 => Ok(Response::new("raft-kv".into())), + ReturnState::V2 => Ok(Response::new("partitioned-raft-kv".into())), + ReturnState::TimeoutV1 => { + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + Ok(Response::new("raft-kv".into())) + } + } +} + +pub async fn serve(rx: tokio::sync::oneshot::Receiver, port: u16, state: SharedState) { + let addr = SocketAddr::from(([127, 0, 0, 1], port)); + + let state = state.clone(); + let make_svc = make_service_fn(move |_| { + let state = state.clone(); + async move { + // This is the request handler. + Ok::<_, Infallible>(service_fn(move |_| { + let state = state.clone(); + handle_request(state) + })) + } + }); + + let server = Server::bind(&addr).serve(make_svc); + + let sig = async move || { + rx.await.unwrap(); + }; + let graceful = server.with_graceful_shutdown(sig()); + + if let Err(e) = graceful.await { + eprintln!("server error: {}", e); + } +} + +struct MockServer { + rt: Runtime, + tx: Vec>, + res: Vec, + handle: Vec>, + index: Arc, + ports: Vec, +} + +impl MockServer { + fn new(ports: Vec) -> Self { + let rt = Runtime::new().unwrap(); + let n = ports.len(); + let mut tx = vec![]; + let mut res = vec![]; + let mut handle = vec![]; + let index = Arc::new(AtomicUsize::new(0)); + for i in 0..n { + let (tx1, rx1) = oneshot::channel(); + let res1 = SharedState::new(ReturnState::V2); + let f1 = rt.spawn(serve(rx1, ports[i], res1.clone())); + tx.push(tx1); + res.push(res1); + handle.push(f1); + } + Self { + rt, + tx, + res, + handle, + index, + ports, + } + } +} + +impl Drop for MockServer { + fn drop(&mut self) { + let tx = std::mem::take(&mut self.tx); + let handle = std::mem::take(&mut self.handle); + for x in tx.into_iter() { + x.send(1).unwrap() + } + for x in handle.into_iter() { + self.rt.block_on(async { x }); + } + } +} + +#[test] +fn test_maybe_substitute_addr() { + let b = || "http://111.111.111.111:100".to_string(); + let b2 = || "111.111.111.111:100".to_string(); + let b3 = || "tikv2:100".to_string(); + fn test_with_b(b: impl Fn() -> String + Clone, ba: &str) { + let o = proxy_ffi::maybe_use_backup_addr( + "http://tc-tikv-9.tc-tikv-peer.mutiple-rocksdb-btdpb.svc:20160/engine_type", + b.clone(), + ); + assert_eq!(o, None); + let o = proxy_ffi::maybe_use_backup_addr("http://1.2.3.4:5/engine_type", b.clone()); + assert_eq!(o, None); + let o = proxy_ffi::maybe_use_backup_addr("http://127.0.0.1/engine_type", b.clone()); + assert_eq!(o.unwrap(), format!("http://{}/engine_type", ba)); + let o = proxy_ffi::maybe_use_backup_addr("http://localhost:222/a", b.clone()); + assert_eq!(o.unwrap(), format!("http://{}:222/a", ba)); + let o = proxy_ffi::maybe_use_backup_addr("http://0.0.0.0:333/engine_type", b.clone()); + assert_eq!(o.unwrap(), format!("http://{}:333/engine_type", ba)); + } + test_with_b(b, "111.111.111.111"); + test_with_b(b2, "111.111.111.111"); + test_with_b(b3, "tikv2"); +} + +#[test] +fn test_with_error_status_addr() { + let mock_server = MockServer::new(vec![1111, 1112]); + let (mut cluster_v1, _) = new_mock_cluster(1, 3); + let index = mock_server.index.clone(); + cluster_v1.cluster_ext.pre_run_node_callback = + Some(Box::new(move |cfg: &mut MixedClusterConfig| { + cfg.server.labels.clear(); + match index.load(Ordering::Relaxed) { + 0 => cfg.server.status_addr = "error$#_string".to_string(), + 1 => cfg.server.status_addr = "".to_string(), + _ => cfg.server.status_addr = "localhost:1119".to_string(), + } + index.fetch_add(1, Ordering::Relaxed); + })); + cluster_v1.run(); + cluster_v1 + .cluster_ext + .iter_ffi_helpers(None, &mut |_, ffi: &mut FFIHelperSet| { + ffi.proxy.refresh_cluster_raftstore_version(-1); + assert_eq!( + ffi.proxy.cluster_raftstore_version(), + RaftstoreVer::Uncertain + ); + }); + cluster_v1.shutdown(); +} + +#[test] +fn test_with_tiflash() { + let mock_server = MockServer::new(vec![1111, 1112]); + let (mut cluster_v1, _) = new_mock_cluster(1, 2); + let addrs = mock_server.ports.iter().map(|e| format!("127.0.0.1:{}", e)); + let status_addrs = Arc::new(addrs.collect::>()); + let index = mock_server.index.clone(); + cluster_v1.cluster_ext.pre_run_node_callback = + Some(Box::new(move |cfg: &mut MixedClusterConfig| { + if index.load(Ordering::Relaxed) == 0 { + cfg.server.labels.clear(); + } else { + cfg.server + .labels + .insert("engine".to_string(), "tiflash".to_string()); + } + cfg.server.status_addr = (*status_addrs)[index.load(Ordering::Relaxed)].to_string(); + index.fetch_add(1, Ordering::Relaxed); + })); + cluster_v1.run(); + + // TiFlash will always output as V1, however, we should neglect that. + + *mock_server.res[0].inner.write().unwrap() = ReturnState::C403; + // Node 1 is TiFlash node, its result will be neglected. + *mock_server.res[1].inner.write().unwrap() = ReturnState::V2; + cluster_v1 + .cluster_ext + .iter_ffi_helpers(None, &mut |_, ffi: &mut FFIHelperSet| { + ffi.proxy.refresh_cluster_raftstore_version(-1); + assert_eq!( + ffi.proxy.cluster_raftstore_version(), + RaftstoreVer::Uncertain + ); + }); + + cluster_v1.shutdown(); +} + +#[test] +fn test_normal() { + let mock_server = MockServer::new(vec![1111, 1112]); + let (mut cluster_v1, _) = new_mock_cluster(1, 2); + // Will switch from the ipv6 localhost address into store.get_addr(). + let addrs = mock_server.ports.iter().map(|e| format!("[::]:{}", e)); + let status_addrs = Arc::new(addrs.collect::>()); + let index = mock_server.index.clone(); + cluster_v1.cluster_ext.pre_run_node_callback = + Some(Box::new(move |cfg: &mut MixedClusterConfig| { + cfg.server.labels.clear(); + cfg.server.status_addr = (*status_addrs)[index.load(Ordering::Relaxed)].to_string(); + index.fetch_add(1, Ordering::Relaxed); + })); + cluster_v1.run(); + + cluster_v1 + .cluster_ext + .iter_ffi_helpers(None, &mut |_, ffi: &mut FFIHelperSet| { + assert_eq!(ffi.proxy.cluster_raftstore_version(), RaftstoreVer::V2); + }); + + *mock_server.res[0].inner.write().unwrap() = ReturnState::C403; + *mock_server.res[1].inner.write().unwrap() = ReturnState::V1; + cluster_v1 + .cluster_ext + .iter_ffi_helpers(None, &mut |_, ffi: &mut FFIHelperSet| { + ffi.proxy.refresh_cluster_raftstore_version(-1); + assert_eq!(ffi.proxy.cluster_raftstore_version(), RaftstoreVer::V1); + }); + + *mock_server.res[0].inner.write().unwrap() = ReturnState::TimeoutV1; + *mock_server.res[1].inner.write().unwrap() = ReturnState::V2; + assert_eq!( + *mock_server.res[0].inner.read().unwrap(), + ReturnState::TimeoutV1 + ); + assert_eq!(*mock_server.res[1].inner.read().unwrap(), ReturnState::V2); + cluster_v1 + .cluster_ext + .iter_ffi_helpers(None, &mut |_, ffi: &mut FFIHelperSet| { + ffi.proxy.refresh_cluster_raftstore_version(500); + assert_eq!(ffi.proxy.cluster_raftstore_version(), RaftstoreVer::V2); + }); + + // All timeout result in uncertain state. + *mock_server.res[0].inner.write().unwrap() = ReturnState::TimeoutV1; + *mock_server.res[1].inner.write().unwrap() = ReturnState::TimeoutV1; + cluster_v1 + .cluster_ext + .iter_ffi_helpers(None, &mut |_, ffi: &mut FFIHelperSet| { + ffi.proxy.refresh_cluster_raftstore_version(500); + assert_eq!( + ffi.proxy.cluster_raftstore_version(), + RaftstoreVer::Uncertain + ); + }); + + // If returns 404, means the server is an old v1 TiKV which doesn't have this + // service. + *mock_server.res[0].inner.write().unwrap() = ReturnState::C404; + *mock_server.res[1].inner.write().unwrap() = ReturnState::C404; + cluster_v1 + .cluster_ext + .iter_ffi_helpers(None, &mut |_, ffi: &mut FFIHelperSet| { + ffi.proxy.refresh_cluster_raftstore_version(-1); + assert_eq!(ffi.proxy.cluster_raftstore_version(), RaftstoreVer::V1); + }); + + cluster_v1.shutdown(); +} diff --git a/proxy_tests/proxy/v2_compat/mod.rs b/proxy_tests/proxy/v2_compat/mod.rs index f641166686f..9e5a468e48c 100644 --- a/proxy_tests/proxy/v2_compat/mod.rs +++ b/proxy_tests/proxy/v2_compat/mod.rs @@ -1,5 +1,6 @@ // Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. +mod cluster_raftstore_ver; mod simple_write; mod tablet_snapshot; pub(crate) mod utils; diff --git a/proxy_tests/proxy/v2_compat/tablet_snapshot.rs b/proxy_tests/proxy/v2_compat/tablet_snapshot.rs index ecc3f907c79..c58219911fe 100644 --- a/proxy_tests/proxy/v2_compat/tablet_snapshot.rs +++ b/proxy_tests/proxy/v2_compat/tablet_snapshot.rs @@ -21,7 +21,7 @@ use tikv::server::tablet_snap::send_snap as send_snap_v2; use tikv_util::time::Limiter; use super::utils::*; -use crate::utils::v1::*; +use crate::utils::{sst::*, v1::*}; fn random_long_vec(length: usize) -> Vec { let mut rng = rand::thread_rng(); diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version index 98d5effe612..f7a1258050d 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version @@ -1,3 +1,3 @@ #pragma once #include -namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 3617226644007633432ull; } \ No newline at end of file +namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 1785250247080530932ull; } \ No newline at end of file diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h index 22558455e47..1eb0dbd9de5 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h @@ -85,6 +85,12 @@ enum class EngineStoreServerStatus : uint8_t { // shutdown. }; +enum class RaftstoreVer : uint8_t { + Uncertain = 0, + V1, + V2, +}; + using RawCppPtrType = uint32_t; using RawRustPtrType = uint32_t; @@ -207,6 +213,10 @@ struct FastAddPeerRes { CppStrWithView region; }; +struct FlushedState { + uint64_t applied_index; + uint64_t applied_term; +}; struct RaftStoreProxyFFIHelper { RaftStoreProxyPtr proxy_ptr; RaftProxyStatus (*fn_handle_get_proxy_status)(RaftStoreProxyPtr); @@ -236,6 +246,12 @@ struct RaftStoreProxyFFIHelper { KVGetStatus (*fn_get_region_local_state)(RaftStoreProxyPtr, uint64_t region_id, RawVoidPtr data, RawCppStringPtr *error_msg); + void (*fn_notify_compact_log)(RaftStoreProxyPtr, uint64_t region_id, + uint64_t compact_index, uint64_t compact_term, + uint64_t applied_index); + RaftstoreVer (*fn_get_cluster_raftstore_version)(RaftStoreProxyPtr, + uint8_t refresh_strategy, + int64_t timeout_ms); }; struct PageStorageInterfaces { @@ -272,8 +288,10 @@ struct EngineStoreServerHelper { BaseBuffView, BaseBuffView, RaftCmdHeader); uint8_t (*fn_need_flush_data)(EngineStoreServerWrap *, uint64_t); - uint8_t (*fn_try_flush_data)(EngineStoreServerWrap *, uint64_t, uint8_t, - uint64_t, uint64_t); + uint8_t (*fn_try_flush_data)(EngineStoreServerWrap *, uint64_t region_id, + uint8_t flush_pattern, uint64_t index, + uint64_t term, uint64_t truncated_index, + uint64_t truncated_term); void (*fn_atomic_update_proxy)(EngineStoreServerWrap *, RaftStoreProxyFFIHelper *); void (*fn_handle_destroy)(EngineStoreServerWrap *, uint64_t); @@ -304,4 +322,15 @@ struct EngineStoreServerHelper { FastAddPeerRes (*fn_fast_add_peer)(EngineStoreServerWrap *, uint64_t region_id, uint64_t new_peer_id); }; + +#ifdef __cplusplus +extern "C" { +#endif +// Basically same as ffi_server_info, but no need to setup ProxyHelper, only +// need to setup ServerHelper. Used when proxy not start. +uint32_t ffi_get_server_info_from_proxy(intptr_t, BaseBuffView, RawVoidPtr); +#ifdef __cplusplus +} +#endif + } // namespace DB