Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pre_persist to control persistence #171

Merged
merged 2 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci_check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ elif [[ $M == "testnew" ]]; then
cargo test --package tests --test proxy normal::ingest
cargo test --package tests --test proxy normal::snapshot
cargo test --package tests --test proxy normal::restart
cargo test --package tests --test proxy normal::persist
# tests based on new-mock-engine-store, for some tests not available for new proxy
cargo test --package tests --test proxy proxy
elif [[ $M == "debug" ]]; then
Expand Down
50 changes: 41 additions & 9 deletions components/engine_store_ffi/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,13 @@ use std::{

use collections::HashMap;
use engine_tiflash::FsStatsExt;
use engine_traits::{CfName, SstMetaInfo};
use engine_traits::SstMetaInfo;
use kvproto::{
import_sstpb::SstMeta,
metapb::Region,
raft_cmdpb::{
AdminCmdType, AdminRequest, AdminResponse, ChangePeerRequest, CmdType, CommitMergeRequest,
RaftCmdRequest, RaftCmdResponse, Request,
},
raft_cmdpb::{AdminCmdType, AdminRequest, AdminResponse, CmdType, RaftCmdRequest},
raft_serverpb::RaftApplyState,
};
use raft::{eraftpb, StateRole};
use raft::StateRole;
use raftstore::{
coprocessor,
coprocessor::{
Expand Down Expand Up @@ -131,7 +127,7 @@ impl TiFlashObserver {
) -> Self {
let engine_store_server_helper =
gen_engine_store_server_helper(engine.engine_store_server_helper);
// TODO(tiflash) start thread pool
// start thread pool for pre handle snapshot
let snap_pool = Builder::new(tikv_util::thd_name!("region-task"))
.max_thread_count(snap_handle_pool_size)
.build_future_pool();
Expand Down Expand Up @@ -242,7 +238,6 @@ impl TiFlashObserver {

impl Coprocessor for TiFlashObserver {
fn stop(&self) {
// TODO(tiflash) remove this when pre apply merged
info!("shutdown tiflash observer"; "peer_id" => self.peer_id);
self.apply_snap_pool.as_ref().unwrap().shutdown();
}
Expand Down Expand Up @@ -584,6 +579,43 @@ impl RegionChangeObserver for TiFlashObserver {
.handle_destroy(ob_ctx.region().get_id());
}
}
fn pre_persist(
&self,
ob_ctx: &mut ObserverContext<'_>,
is_finished: bool,
cmd: Option<&RaftCmdRequest>,
) -> bool {
let should_persist = if is_finished {
fail::fail_point!("on_pre_persist_with_finish", |_| { true });
false
} else {
let cmd = cmd.unwrap();
if cmd.has_admin_request() {
match cmd.get_admin_request().get_cmd_type() {
// Merge needs to get the latest apply index.
AdminCmdType::CommitMerge | AdminCmdType::RollbackMerge => true,
_ => false,
}
} else {
false
}
};
if should_persist {
info!(
"observe pre_persist, persist";
"region_id" => ob_ctx.region().get_id(),
"peer_id" => self.peer_id,
);
} else {
debug!(
"observe pre_persist";
"region_id" => ob_ctx.region().get_id(),
"peer_id" => self.peer_id,
"is_finished" => is_finished,
);
};
should_persist
}
}

impl PdTaskObserver for TiFlashObserver {
Expand Down
6 changes: 1 addition & 5 deletions components/proxy_server/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use std::{
collections::{hash_map::RandomState, HashSet},
iter::FromIterator,
path::{Path, PathBuf},
};
use std::{collections::HashSet, iter::FromIterator, path::Path};

use itertools::Itertools;
use online_config::OnlineConfig;
Expand Down
2 changes: 1 addition & 1 deletion components/proxy_server/src/hacked_lock_mgr.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use tikv::{
server::{lock_manager::waiter_manager::Callback, Error, Result},
server::lock_manager::waiter_manager::Callback,
storage::{
lock_manager::{DiagnosticContext, Lock, LockManager as LockManagerTrait, WaitTimeout},
ProcessResult, StorageCallback,
Expand Down
2 changes: 0 additions & 2 deletions components/proxy_server/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

#![feature(proc_macro_hygiene)]
use std::{
ffi::CStr,
os::raw::{c_char, c_int},
Expand All @@ -10,7 +9,6 @@ use std::{

use clap::{App, Arg, ArgMatches};
use tikv::config::TiKvConfig;
use tikv_util::config::ReadableDuration;

use crate::{config::make_tikv_config, fatal, setup::overwrite_config_with_cmd_args};

Expand Down
16 changes: 5 additions & 11 deletions components/proxy_server/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
path::{Path, PathBuf},
str::FromStr,
sync::{
atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicU8, Ordering},
atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
mpsc, Arc, Mutex,
},
thread,
Expand Down Expand Up @@ -43,15 +43,11 @@ use grpcio::{EnvBuilder, Environment};
use grpcio_health::HealthService;
use kvproto::{
debugpb::create_debug, diagnosticspb::create_diagnostics, import_sstpb::create_import_sst,
kvrpcpb::ApiVersion,
};
use pd_client::{PdClient, RpcClient};
use raft_log_engine::RaftLogEngine;
use raftstore::{
coprocessor::{
config::SplitCheckConfigManager, BoxConsistencyCheckObserver, ConsistencyCheckMethod,
CoprocessorHost, RawConsistencyCheckObserver, RegionInfoAccessor,
},
coprocessor::{config::SplitCheckConfigManager, CoprocessorHost, RegionInfoAccessor},
router::ServerRaftStoreRouter,
store::{
config::RaftstoreConfigManager,
Expand All @@ -75,7 +71,7 @@ use tikv::{
server::{
config::{Config as ServerConfig, ServerConfigManager},
create_raft_storage,
gc_worker::{AutoGcConfig, GcWorker},
gc_worker::GcWorker,
raftkv::ReplicaReadLockChecker,
resolve,
service::{DebugService, DiagnosticsService},
Expand All @@ -84,8 +80,7 @@ use tikv::{
GRPC_THREAD_PREFIX,
},
storage::{
self, config_manager::StorageConfigManger, mvcc::MvccConsistencyCheckObserver,
txn::flow_controller::FlowController, Engine,
self, config_manager::StorageConfigManger, txn::flow_controller::FlowController, Engine,
},
};
use tikv_util::{
Expand Down Expand Up @@ -217,7 +212,7 @@ pub fn run_impl<CER: ConfiguredRaftEngine, F: KvFormat>(
#[inline]
fn run_impl_only_for_decryption<CER: ConfiguredRaftEngine, F: KvFormat>(
config: TiKvConfig,
proxy_config: ProxyConfig,
_proxy_config: ProxyConfig,
engine_store_server_helper: &EngineStoreServerHelper,
) {
let encryption_key_manager =
Expand Down Expand Up @@ -927,7 +922,6 @@ impl<ER: RaftEngine> TiKvServer<ER> {
data_sink_reg_handle.clone(),
);
self.to_stop.push(single_target_worker);
let rsmeter_pubsub_service = resource_metering::PubSubService::new(data_sink_reg_handle);

let cfg_manager = resource_metering::ConfigManager::new(
self.config.resource_metering.clone(),
Expand Down
14 changes: 4 additions & 10 deletions components/proxy_server/src/setup.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use std::{
borrow::ToOwned,
io,
path::{Path, PathBuf},
sync::atomic::{AtomicBool, Ordering},
};

use chrono::Local;
use std::borrow::ToOwned;

use clap::ArgMatches;
use collections::HashMap;
pub use server::setup::initial_logger;
use tikv::config::{MetricConfig, TiKvConfig};
use tikv_util::{self, config, logger};
use tikv_util::{self, logger};

use crate::config::{validate_and_persist_config, ProxyConfig};
use crate::config::ProxyConfig;
pub use crate::fatal;

#[allow(dead_code)]
Expand Down
34 changes: 27 additions & 7 deletions components/raftstore/src/coprocessor/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,11 @@ impl<E: KvEngine> CoprocessorHost<E> {
}
}

/// `post_exec` should be called immediately after we executed one raft command.
/// It notifies observers side effects of this command before execution of the next command,
/// including req/resp, apply state, modified region state, etc.
/// Return true observers think a persistence is necessary.
/// `post_exec` should be called immediately after we executed one raft
/// command. It notifies observers side effects of this command before
/// execution of the next command, including req/resp, apply state,
/// modified region state, etc. Return true observers think a
/// persistence is necessary.
pub fn post_exec(
&self,
region: &Region,
Expand Down Expand Up @@ -633,6 +634,26 @@ impl<E: KvEngine> CoprocessorHost<E> {
);
}

/// `pre_persist` is called we we want to persist data or meta for a region.
/// For example, in `finish_for` and `commit`,
/// we will separately call `pre_persist` with is_finished = true/false.
/// By returning false, we reject this persistence.
pub fn pre_persist(
&self,
region: &Region,
is_finished: bool,
cmd: Option<&RaftCmdRequest>,
) -> bool {
let mut ctx = ObserverContext::new(region);
for observer in &self.registry.region_change_observers {
let observer = observer.observer.inner();
if !observer.pre_persist(&mut ctx, is_finished, cmd) {
return false;
}
}
true
}

pub fn on_flush_applied_cmd_batch(
&self,
max_level: ObserveLevel,
Expand Down Expand Up @@ -927,11 +948,10 @@ mod tests {
_: u64,
_: &crate::store::SnapKey,
_: Option<&Snapshot>,
) -> Result<()> {
) {
self.called
.fetch_add(ObserverIndex::PostApplySnapshot as usize, Ordering::SeqCst);
ctx.bypass = self.bypass.load(Ordering::SeqCst);
Ok(())
}

fn should_pre_apply_snapshot(&self) -> bool {
Expand Down Expand Up @@ -1100,7 +1120,7 @@ mod tests {
index += ObserverIndex::PreApplySnapshot as usize;
assert_all!([&ob.called], &[index]);

let _ = host.post_apply_snapshot(&region, 0, &key, None);
host.post_apply_snapshot(&region, 0, &key, None);
index += ObserverIndex::PostApplySnapshot as usize;
assert_all!([&ob.called], &[index]);

Expand Down
49 changes: 30 additions & 19 deletions components/raftstore/src/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use raft::{eraftpb, StateRole};
pub mod config;
mod consistency_check;
pub mod dispatcher;
pub mod error;
mod error;
mod metrics;
pub mod region_info_accessor;
mod split_check;
Expand Down Expand Up @@ -103,6 +103,18 @@ pub trait AdminObserver: Coprocessor {
/// For now, the `region` in `ObserverContext` is an empty region.
fn post_apply_admin(&self, _: &mut ObserverContext<'_>, _: &AdminResponse) {}

/// Hook before exec admin request, returns whether we should skip this
/// admin.
fn pre_exec_admin(
&self,
_: &mut ObserverContext<'_>,
_: &AdminRequest,
_: u64,
_: u64,
) -> bool {
false
}

/// Hook to call immediately after exec command
/// Will be a special persistence after this exec if a observer returns true.
fn post_exec_admin(
Expand All @@ -115,18 +127,6 @@ pub trait AdminObserver: Coprocessor {
) -> bool {
false
}

/// Hook before exec admin request, returns whether we should skip this
/// admin.
fn pre_exec_admin(
&self,
_: &mut ObserverContext<'_>,
_: &AdminRequest,
_: u64,
_: u64,
) -> bool {
false
}
}

pub trait QueryObserver: Coprocessor {
Expand All @@ -147,6 +147,12 @@ pub trait QueryObserver: Coprocessor {
/// For now, the `region` in `ObserverContext` is an empty region.
fn post_apply_query(&self, _: &mut ObserverContext<'_>, _: &Cmd) {}

/// Hook before exec write request, returns whether we should skip this
/// write.
fn pre_exec_query(&self, _: &mut ObserverContext<'_>, _: &[Request], _: u64, _: u64) -> bool {
false
}

/// Hook to call immediately after exec command.
/// Will be a special persistence after this exec if a observer returns true.
fn post_exec_query(
Expand All @@ -159,12 +165,6 @@ pub trait QueryObserver: Coprocessor {
) -> bool {
false
}

/// Hook before exec write request, returns whether we should skip this
/// write.
fn pre_exec_query(&self, _: &mut ObserverContext<'_>, _: &[Request], _: u64, _: u64) -> bool {
false
}
}

pub trait ApplySnapshotObserver: Coprocessor {
Expand Down Expand Up @@ -307,6 +307,17 @@ pub enum RegionChangeEvent {
pub trait RegionChangeObserver: Coprocessor {
/// Hook to call when a region changed on this TiKV
fn on_region_changed(&self, _: &mut ObserverContext<'_>, _: RegionChangeEvent, _: StateRole) {}

/// Should be called everytime before we write a WriteBatch into
/// KvEngine. Returns false if we can't commit at this time.
fn pre_persist(
&self,
_: &mut ObserverContext<'_>,
_is_finished: bool,
_cmd: Option<&RaftCmdRequest>,
) -> bool {
true
}
}

#[derive(Clone, Debug, Default)]
Expand Down
Loading