Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calvin proactive proxy2 #341

Open
wants to merge 13 commits into
base: raftstore-proxy
Choose a base branch
from
Prev Previous commit
Next Next commit
add tests
Signed-off-by: test <test@test.com>
test committed Jul 17, 2023
commit 8183ecc8eae955d785678bbd135e67b9d9fa9717
2 changes: 1 addition & 1 deletion components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
@@ -3163,7 +3163,7 @@ where

fn exec_compact_log(
&mut self,
ctx: &mut ApplyContext<EK>,
ctx: &ApplyContext<EK>,
req: &AdminRequest,
) -> Result<(AdminResponse, ApplyResult<EK::Snapshot>)> {
PEER_ADMIN_CMD_COUNTER.compact.all.inc();
Original file line number Diff line number Diff line change
@@ -259,7 +259,6 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
debug!("recover node"; "node_id" => node_id);
// Like TiKVServer::init
self.run_node(node_id)?;
self.post_node_start(node_id);
}

// Try start new nodes.
@@ -271,6 +270,7 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
}
let (router, system) =
create_raft_batch_system(&self.cfg.raft_store, &self.resource_manager);
let apply_router = system.apply_router();
self.create_engine(Some(router.clone()));

let store_meta = Arc::new(Mutex::new(StoreMeta::new(PENDING_MSG_CAP)));
@@ -300,7 +300,7 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
self.store_metas.insert(node_id, store_meta);
self.key_managers_map.insert(node_id, key_manager.clone());
self.register_ffi_helper_set(None, node_id);
self.post_node_start(node_id);
self.post_node_start(node_id, apply_router);
}
assert_eq!(self.count, self.engines.len());
assert_eq!(self.count, self.dbs.len());
@@ -355,10 +355,13 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
tikv_util::thread_group::set_properties(Some(props));
debug!("calling run node"; "node_id" => node_id);

let apply_router = system.apply_router();
// FIXME: rocksdb event listeners may not work, because we change the router.
self.sim
.wl()
.run_node(node_id, cfg, engines, store_meta, key_mgr, router, system)?;

self.post_node_start(node_id, apply_router);
debug!("node {} started", node_id);
Ok(())
}
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ use collections::HashMap;
use engine_store_ffi::{ffi::RaftStoreProxyFFI, TiFlashEngine};
use engine_tiflash::DB;
use engine_traits::{Engines, KvEngine};
use raftstore::store::fsm::ApplyRouter;
use tikv_util::{sys::SysQuota, HandyRwLock};

use super::{common::*, Cluster, Simulator};
@@ -15,7 +16,7 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
self.cluster_ext.access_ffi_helpers(f)
}

pub fn post_node_start(&mut self, node_id: u64) {
pub fn post_node_start<EK: KvEngine>(&mut self, node_id: u64, apply_router: ApplyRouter<EK>) {
// Since we use None to create_ffi_helper_set, we must init again.
let router = self.sim.rl().get_router(node_id).unwrap();
self.cluster_ext
@@ -26,6 +27,7 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
SysQuota::cpu_cores_quota() as usize * 2,
),
)));
ffi.proxy.setup_apply_router_helper(apply_router.clone());
});
}

10 changes: 10 additions & 0 deletions proxy_components/proxy_ffi/src/raftstore_proxy.rs
Original file line number Diff line number Diff line change
@@ -6,7 +6,9 @@ use std::sync::{
};

use encryption::DataKeyManager;
use engine_traits::KvEngine;
use pd_client::PdClient;
use raftstore::store::fsm::ApplyRouter;
use tikv_util::error;
use tokio::runtime::Runtime;

@@ -330,6 +332,14 @@ impl RaftStoreProxyFFI for RaftStoreProxy {
}
}

impl RaftStoreProxy {
pub fn setup_apply_router_helper<EK: KvEngine>(&mut self, ar: ApplyRouter<EK>) {
self.apply_router_client = Some(Box::new(
crate::apply_router_helper::ProxyApplyRouterHelper::new(ar),
));
}
}

impl RaftStoreProxyPtr {
pub unsafe fn as_ref(&self) -> &RaftStoreProxy {
&*(self.inner as *const RaftStoreProxy)
Original file line number Diff line number Diff line change
@@ -299,6 +299,7 @@ pub extern "C" fn ffi_notify_compact_log(
assert!(!proxy_ptr.is_null());
unsafe {
if proxy_ptr.as_ref().maybe_apply_router_helper().is_none() {
tikv_util::info!("Apply router helper is none");
return;
}
}
1 change: 1 addition & 0 deletions proxy_scripts/ci_check.sh
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@ elif [[ $M == "testnew" ]]; then
cargo check --package proxy_server --features="$ENABLE_FEATURES"
# tests based on mock-engine-store, with compat for new proxy
cargo test --package proxy_tests --test proxy shared::write
cargo test --package proxy_tests --test proxy shared::proactive_flush
cargo test --package proxy_tests --test proxy shared::snapshot
cargo test --package proxy_tests --test proxy shared::normal::store
cargo test --package proxy_tests --test proxy shared::normal::config
1 change: 1 addition & 0 deletions proxy_tests/proxy/shared/mod.rs
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ mod ffi;
mod ingest;
mod mock;
mod normal;
mod proactive_flush;
mod region;
mod replica_read;
mod server_cluster_test;