Skip to content

Commit

Permalink
raftstore: Implement coprocessor observer pre_persist (tikv#12957)
Browse files Browse the repository at this point in the history
ref tikv#12849

Support coprocessor observer pre_commit

Signed-off-by: CalvinNeo <[email protected]>
  • Loading branch information
CalvinNeo committed Sep 15, 2022
1 parent 30f5313 commit 255b5df
Show file tree
Hide file tree
Showing 11 changed files with 284 additions and 124 deletions.
1 change: 1 addition & 0 deletions ci_check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ elif [[ $M == "testnew" ]]; then
cargo test --package tests --test proxy normal::ingest
cargo test --package tests --test proxy normal::snapshot
cargo test --package tests --test proxy normal::restart
cargo test --package tests --test proxy normal::persist
# tests based on new-mock-engine-store, for some tests not available for new proxy
cargo test --package tests --test proxy proxy
elif [[ $M == "debug" ]]; then
Expand Down
47 changes: 40 additions & 7 deletions components/engine_store_ffi/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,13 @@ use std::{

use collections::HashMap;
use engine_tiflash::FsStatsExt;
use engine_traits::{CfName, SstMetaInfo};
use engine_traits::SstMetaInfo;
use kvproto::{
import_sstpb::SstMeta,
metapb::Region,
raft_cmdpb::{
AdminCmdType, AdminRequest, AdminResponse, ChangePeerRequest, CmdType, CommitMergeRequest,
RaftCmdRequest, RaftCmdResponse, Request,
},
raft_cmdpb::{AdminCmdType, AdminRequest, AdminResponse, CmdType, RaftCmdRequest},
raft_serverpb::RaftApplyState,
};
use raft::{eraftpb, StateRole};
use raft::StateRole;
use raftstore::{
coprocessor,
coprocessor::{
Expand Down Expand Up @@ -583,6 +579,43 @@ impl RegionChangeObserver for TiFlashObserver {
.handle_destroy(ob_ctx.region().get_id());
}
}
fn pre_persist(
&self,
ob_ctx: &mut ObserverContext<'_>,
is_finished: bool,
cmd: Option<&RaftCmdRequest>,
) -> bool {
let should_persist = if is_finished {
fail::fail_point!("on_pre_persist_with_finish", |_| { true });
false
} else {
let cmd = cmd.unwrap();
if cmd.has_admin_request() {
match cmd.get_admin_request().get_cmd_type() {
// Merge needs to get the latest apply index.
AdminCmdType::CommitMerge | AdminCmdType::RollbackMerge => true,
_ => false,
}
} else {
false
}
};
if should_persist {
info!(
"observe pre_persist, persist";
"region_id" => ob_ctx.region().get_id(),
"peer_id" => self.peer_id,
);
} else {
debug!(
"observe pre_persist";
"region_id" => ob_ctx.region().get_id(),
"peer_id" => self.peer_id,
"is_finished" => is_finished,
);
};
should_persist
}
}

impl PdTaskObserver for TiFlashObserver {
Expand Down
34 changes: 27 additions & 7 deletions components/raftstore/src/coprocessor/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,11 @@ impl<E: KvEngine> CoprocessorHost<E> {
}
}

/// `post_exec` should be called immediately after we executed one raft command.
/// It notifies observers side effects of this command before execution of the next command,
/// including req/resp, apply state, modified region state, etc.
/// Return true observers think a persistence is necessary.
/// `post_exec` should be called immediately after we executed one raft
/// command. It notifies observers side effects of this command before
/// execution of the next command, including req/resp, apply state,
/// modified region state, etc. Return true observers think a
/// persistence is necessary.
pub fn post_exec(
&self,
region: &Region,
Expand Down Expand Up @@ -633,6 +634,26 @@ impl<E: KvEngine> CoprocessorHost<E> {
);
}

/// `pre_persist` is called we we want to persist data or meta for a region.
/// For example, in `finish_for` and `commit`,
/// we will separately call `pre_persist` with is_finished = true/false.
/// By returning false, we reject this persistence.
pub fn pre_persist(
&self,
region: &Region,
is_finished: bool,
cmd: Option<&RaftCmdRequest>,
) -> bool {
let mut ctx = ObserverContext::new(region);
for observer in &self.registry.region_change_observers {
let observer = observer.observer.inner();
if !observer.pre_persist(&mut ctx, is_finished, cmd) {
return false;
}
}
true
}

pub fn on_flush_applied_cmd_batch(
&self,
max_level: ObserveLevel,
Expand Down Expand Up @@ -927,11 +948,10 @@ mod tests {
_: u64,
_: &crate::store::SnapKey,
_: Option<&Snapshot>,
) -> Result<()> {
) {
self.called
.fetch_add(ObserverIndex::PostApplySnapshot as usize, Ordering::SeqCst);
ctx.bypass = self.bypass.load(Ordering::SeqCst);
Ok(())
}

fn should_pre_apply_snapshot(&self) -> bool {
Expand Down Expand Up @@ -1100,7 +1120,7 @@ mod tests {
index += ObserverIndex::PreApplySnapshot as usize;
assert_all!([&ob.called], &[index]);

let _ = host.post_apply_snapshot(&region, 0, &key, None);
host.post_apply_snapshot(&region, 0, &key, None);
index += ObserverIndex::PostApplySnapshot as usize;
assert_all!([&ob.called], &[index]);

Expand Down
49 changes: 30 additions & 19 deletions components/raftstore/src/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use raft::{eraftpb, StateRole};
pub mod config;
mod consistency_check;
pub mod dispatcher;
pub mod error;
mod error;
mod metrics;
pub mod region_info_accessor;
mod split_check;
Expand Down Expand Up @@ -103,6 +103,18 @@ pub trait AdminObserver: Coprocessor {
/// For now, the `region` in `ObserverContext` is an empty region.
fn post_apply_admin(&self, _: &mut ObserverContext<'_>, _: &AdminResponse) {}

/// Hook before exec admin request, returns whether we should skip this
/// admin.
fn pre_exec_admin(
&self,
_: &mut ObserverContext<'_>,
_: &AdminRequest,
_: u64,
_: u64,
) -> bool {
false
}

/// Hook to call immediately after exec command
/// Will be a special persistence after this exec if a observer returns true.
fn post_exec_admin(
Expand All @@ -115,18 +127,6 @@ pub trait AdminObserver: Coprocessor {
) -> bool {
false
}

/// Hook before exec admin request, returns whether we should skip this
/// admin.
fn pre_exec_admin(
&self,
_: &mut ObserverContext<'_>,
_: &AdminRequest,
_: u64,
_: u64,
) -> bool {
false
}
}

pub trait QueryObserver: Coprocessor {
Expand All @@ -147,6 +147,12 @@ pub trait QueryObserver: Coprocessor {
/// For now, the `region` in `ObserverContext` is an empty region.
fn post_apply_query(&self, _: &mut ObserverContext<'_>, _: &Cmd) {}

/// Hook before exec write request, returns whether we should skip this
/// write.
fn pre_exec_query(&self, _: &mut ObserverContext<'_>, _: &[Request], _: u64, _: u64) -> bool {
false
}

/// Hook to call immediately after exec command.
/// Will be a special persistence after this exec if a observer returns true.
fn post_exec_query(
Expand All @@ -159,12 +165,6 @@ pub trait QueryObserver: Coprocessor {
) -> bool {
false
}

/// Hook before exec write request, returns whether we should skip this
/// write.
fn pre_exec_query(&self, _: &mut ObserverContext<'_>, _: &[Request], _: u64, _: u64) -> bool {
false
}
}

pub trait ApplySnapshotObserver: Coprocessor {
Expand Down Expand Up @@ -307,6 +307,17 @@ pub enum RegionChangeEvent {
pub trait RegionChangeObserver: Coprocessor {
/// Hook to call when a region changed on this TiKV
fn on_region_changed(&self, _: &mut ObserverContext<'_>, _: RegionChangeEvent, _: StateRole) {}

/// Should be called everytime before we write a WriteBatch into
/// KvEngine. Returns false if we can't commit at this time.
fn pre_persist(
&self,
_: &mut ObserverContext<'_>,
_is_finished: bool,
_cmd: Option<&RaftCmdRequest>,
) -> bool {
true
}
}

#[derive(Clone, Debug, Default)]
Expand Down
79 changes: 58 additions & 21 deletions components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,15 +594,18 @@ where
delegate: &mut ApplyDelegate<EK>,
results: VecDeque<ExecResult<EK::Snapshot>>,
) {
#[cfg(any(test, feature = "testexport"))]
{
if cfg!(feature = "compat_old_proxy") {
if !delegate.pending_remove {
delegate.write_apply_state(self.kv_wb_mut());
}
if self.host.pre_persist(&delegate.region, true, None) {
if !delegate.pending_remove {
delegate.write_apply_state(self.kv_wb_mut());
}
self.commit_opt(delegate, false);
} else {
debug!("do not persist when finish_for";
"region" => ?delegate.region,
"tag" => &delegate.tag,
"apply_state" => ?delegate.apply_state,
);
}
self.commit_opt(delegate, false);
self.apply_res.push(ApplyRes {
region_id: delegate.region_id(),
apply_state: delegate.apply_state.clone(),
Expand Down Expand Up @@ -717,17 +720,6 @@ pub fn notify_stale_req_with_msg(term: u64, msg: String, cb: Callback<impl Snaps
cb.invoke_with_response(resp);
}

fn should_flush_to_engine(cmd: &RaftCmdRequest) -> bool {
if cmd.has_admin_request() {
match cmd.get_admin_request().get_cmd_type() {
// Merge needs to get the latest apply index.
AdminCmdType::CommitMerge | AdminCmdType::RollbackMerge => return true,
_ => {}
}
}
return false;
}

/// Checks if a write is needed to be issued before handling the command.
fn should_write_to_engine(cmd: &RaftCmdRequest) -> bool {
if cmd.has_admin_request() {
Expand Down Expand Up @@ -1096,8 +1088,15 @@ where
return ApplyResult::Yield;
}
}
if should_flush_to_engine(&cmd) {
apply_ctx.commit_opt(self, true);
let mut has_unflushed_data =
self.last_flush_applied_index != self.apply_state.get_applied_index();
if (has_unflushed_data && should_write_to_engine(&cmd)
|| apply_ctx.kv_wb().should_write_to_engine())
&& apply_ctx.host.pre_persist(&self.region, false, Some(&cmd))
{
// TODO(tiflash) may write apply state twice here.
// Originally use only `commit_opt`.
apply_ctx.commit(self);
if let Some(start) = self.handle_start.as_ref() {
if start.saturating_elapsed() >= apply_ctx.yield_duration {
return ApplyResult::Yield;
Expand Down Expand Up @@ -4983,6 +4982,7 @@ mod tests {
cmd_sink: Option<Arc<Mutex<Sender<CmdBatch>>>>,
filter_compact_log: Arc<AtomicBool>,
filter_consistency_check: Arc<AtomicBool>,
skip_persist_when_pre_commit: Arc<AtomicBool>,
delay_remove_ssts: Arc<AtomicBool>,
last_delete_sst_count: Arc<AtomicU64>,
last_pending_delete_sst_count: Arc<AtomicU64>,
Expand Down Expand Up @@ -5106,6 +5106,17 @@ mod tests {
fn on_applied_current_term(&self, _: raft::StateRole, _: &Region) {}
}

impl RegionChangeObserver for ApplyObserver {
fn pre_persist(
&self,
_: &mut ObserverContext<'_>,
_is_finished: bool,
_cmd: Option<&RaftCmdRequest>,
) -> bool {
!self.skip_persist_when_pre_commit.load(Ordering::SeqCst)
}
}

#[test]
fn test_handle_raft_committed_entries() {
let (_path, engine) = create_tmp_engine("test-delegate");
Expand Down Expand Up @@ -5725,6 +5736,8 @@ mod tests {
let obs = ApplyObserver::default();
host.registry
.register_admin_observer(1, BoxAdminObserver::new(obs.clone()));
host.registry
.register_region_change_observer(1, BoxRegionChangeObserver::new(obs.clone()));
host.registry
.register_query_observer(1, BoxQueryObserver::new(obs.clone()));

Expand Down Expand Up @@ -5760,6 +5773,8 @@ mod tests {
reg.region.mut_region_epoch().set_version(3);
router.schedule_task(1, Msg::Registration(reg));

obs.skip_persist_when_pre_commit
.store(true, Ordering::SeqCst);
let mut index_id = 1;
let put_entry = EntryBuilder::new(index_id, 1)
.put(b"k1", b"v1")
Expand All @@ -5768,7 +5783,19 @@ mod tests {
.epoch(1, 3)
.build();
router.schedule_task(1, Msg::apply(apply(peer_id, 1, 1, vec![put_entry], vec![])));
fetch_apply_res(&rx);
let apply_res = fetch_apply_res(&rx);

// We don't persist at `finish_for`, since we disabled `pre_persist`.
let state: RaftApplyState = engine
.get_msg_cf(CF_RAFT, &keys::apply_state_key(1))
.unwrap()
.unwrap_or_default();
assert_eq!(
apply_res.apply_state.get_applied_index(),
state.get_applied_index() + 1
);
obs.skip_persist_when_pre_commit
.store(false, Ordering::SeqCst);

// Phase 1: we test if pre_exec will filter execution of commands correctly.
index_id += 1;
Expand All @@ -5790,6 +5817,16 @@ mod tests {
assert_eq!(apply_res.exec_res.len(), 0);
assert_eq!(apply_res.apply_state.get_truncated_state().get_index(), 0);

// We persist at `finish_for`, since we enabled `pre_persist`.
let state: RaftApplyState = engine
.get_msg_cf(CF_RAFT, &keys::apply_state_key(1))
.unwrap()
.unwrap_or_default();
assert_eq!(
apply_res.apply_state.get_applied_index(),
state.get_applied_index()
);

index_id += 1;
// Don't filter CompactLog
obs.filter_compact_log.store(false, Ordering::SeqCst);
Expand Down
1 change: 1 addition & 0 deletions components/test_raftstore/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ impl Simulator for NodeCluster {
.max_total_size(cfg.server.snap_max_total_size.0)
.encryption_key_manager(key_manager)
.max_per_file_size(cfg.raft_store.max_snapshot_file_raw_size.0)
.enable_multi_snapshot_files(true)
.build(tmp.path().to_str().unwrap());
(snap_mgr, Some(tmp))
} else {
Expand Down
1 change: 1 addition & 0 deletions components/test_raftstore/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ impl ServerCluster {
.max_total_size(cfg.server.snap_max_total_size.0)
.encryption_key_manager(key_manager)
.max_per_file_size(cfg.raft_store.max_snapshot_file_raw_size.0)
.enable_multi_snapshot_files(true)
.build(tmp_str);
self.snap_mgrs.insert(node_id, snap_mgr.clone());
let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone()));
Expand Down
Loading

0 comments on commit 255b5df

Please sign in to comment.