Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calvin proactive proxy2 #341

Open
wants to merge 13 commits into
base: raftstore-proxy
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge proxy master
  • Loading branch information
CalvinNeo committed Jul 6, 2023
commit e04d1a81bde78d5cc1cf98b8b3970bfe35db0987
56 changes: 34 additions & 22 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -222,6 +222,8 @@ cmake = { git = "https://github.com/rust-lang/cmake-rs" }
# remove this when https://github.com/rust-lang/backtrace-rs/pull/503 is merged.
backtrace = { git = 'https://github.com/hehechen/backtrace-rs', branch = "v0.3.61" }

sysinfo ={ git = "https://github.com/tikv/sysinfo", branch = "0.26-fix-cpu" }

[target.'cfg(target_os = "linux")'.dependencies]
procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "6599eb9dca74229b2c1fcc44118bef7eff127128" }
# When you modify TiKV cooperatively with kvproto, this will be useful to submit the PR to TiKV and the PR to
@@ -424,7 +426,7 @@ opt-level = 1

[profile.dev]
opt-level = 0
debug = 0
debug = false
codegen-units = 4
lto = false
incremental = true
6 changes: 3 additions & 3 deletions components/raftstore/src/coprocessor/mod.rs
Original file line number Diff line number Diff line change
@@ -354,9 +354,9 @@ pub trait RegionChangeObserver: Coprocessor {
/// Returns the maximum index the underlying engine can compact.
fn get_compact_index_and_term(
&self,
region_id: u64,
compact_index: u64,
compact_term: u64,
_: u64,
_: u64,
_: u64,
) -> Option<(u64, u64)> {
None
}
4 changes: 2 additions & 2 deletions components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
@@ -3927,8 +3927,8 @@ where
} => {
write!(
f,
"[region {}] check compact, voter_replicated_index: {}, voter_replicated_term: {}",
region_id, voter_replicated_index, voter_replicated_term
"[region {}] check compact, voter_replicated_index: {}, voter_replicated_term: {}, applied_index: {}",
region_id, voter_replicated_index, voter_replicated_term, applied_index
)
}
}
81 changes: 81 additions & 0 deletions proxy_components/engine_store_ffi/src/core/forward_raft/command.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,86 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.
use encryption::DataKeyManager;
use proxy_ffi::snapshot_reader_impls::{sst_file_reader::SSTFileReader, LockCFFileReader};

use crate::core::{common::*, ProxyForwarder};

pub fn get_first_key(
path: &str,
cf: ColumnFamilyType,
key_manager: Option<Arc<DataKeyManager>>,
) -> Vec<u8> {
unsafe {
if cf == ColumnFamilyType::Lock {
let mut reader = LockCFFileReader::ffi_get_cf_file_reader(path, key_manager.as_ref());
reader.as_mut_sst_lock().ffi_key().to_slice().to_vec()
} else {
let mut reader = SSTFileReader::ffi_get_cf_file_reader(path, key_manager);
reader.as_mut_sst_other().ffi_key().to_slice().to_vec()
}
}
}

pub fn sort_sst_by_start_key(
ssts: Vec<(PathBuf, ColumnFamilyType)>,
key_manager: Option<Arc<DataKeyManager>>,
) -> Vec<(PathBuf, ColumnFamilyType)> {
let mut sw: Vec<(PathBuf, ColumnFamilyType)> = vec![];
let mut sd: Vec<(PathBuf, ColumnFamilyType)> = vec![];
let mut sl: Vec<(PathBuf, ColumnFamilyType)> = vec![];

for (p, c) in ssts.into_iter() {
match c {
ColumnFamilyType::Default => sd.push((p, c)),
ColumnFamilyType::Write => sw.push((p, c)),
ColumnFamilyType::Lock => sl.push((p, c)),
};
}

sw.sort_by(|a, b| {
let fk1 = get_first_key(
a.0.to_str().unwrap(),
ColumnFamilyType::Write,
key_manager.clone(),
);
let fk2 = get_first_key(
b.0.to_str().unwrap(),
ColumnFamilyType::Write,
key_manager.clone(),
);
fk1.cmp(&fk2)
});
sd.sort_by(|a, b| {
let fk1 = get_first_key(
a.0.to_str().unwrap(),
ColumnFamilyType::Default,
key_manager.clone(),
);
let fk2 = get_first_key(
b.0.to_str().unwrap(),
ColumnFamilyType::Default,
key_manager.clone(),
);
fk1.cmp(&fk2)
});
sl.sort_by(|a, b| {
let fk1 = get_first_key(
a.0.to_str().unwrap(),
ColumnFamilyType::Lock,
key_manager.clone(),
);
let fk2 = get_first_key(
b.0.to_str().unwrap(),
ColumnFamilyType::Lock,
key_manager.clone(),
);
fk1.cmp(&fk2)
});

sw.append(&mut sd);
sw.append(&mut sl);
sw
}

impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
fn handle_ingest_sst_for_engine_store(
&self,
@@ -40,6 +120,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
));
}

let ssts_wrap = sort_sst_by_start_key(ssts_wrap, self.key_manager.clone());
for (path, cf) in &ssts_wrap {
sst_views.push((path.to_str().unwrap().as_bytes(), *cf));
}
Original file line number Diff line number Diff line change
@@ -124,8 +124,8 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
pub fn get_compact_index_and_term(
&self,
region_id: u64,
compact_index: u64,
compact_term: u64,
_: u64,
_: u64,
) -> Option<(u64, u64)> {
let res = self
.engine_store_server_helper
6 changes: 6 additions & 0 deletions proxy_components/engine_store_ffi/src/core/forwarder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use encryption::DataKeyManager;

use crate::core::common::*;

pub struct PtrWrapper(pub RawCppPtr);
@@ -53,6 +55,7 @@ pub struct ProxyForwarder<T: Transport, ER: RaftEngine> {
pub snap_mgr: Arc<SnapManager>,
pub packed_envs: Arc<PackedEnvs>,
pub debug_struct: Arc<DebugStruct>,
pub key_manager: Option<Arc<DataKeyManager>>,
}

impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
@@ -84,6 +87,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
snap_mgr: SnapManager,
packed_envs: PackedEnvs,
debug_struct: DebugStruct,
key_manager: Option<Arc<DataKeyManager>>,
) -> Self {
let engine_store_server_helper =
gen_engine_store_server_helper(engine.proxy_ext.engine_store_server_helper);
@@ -106,6 +110,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
snap_mgr: Arc::new(snap_mgr),
packed_envs: Arc::new(packed_envs),
debug_struct: Arc::new(debug_struct),
key_manager,
}
}

@@ -130,6 +135,7 @@ impl<T: Transport + 'static, ER: RaftEngine> Clone for ProxyForwarder<T, ER> {
snap_mgr: self.snap_mgr.clone(),
packed_envs: self.packed_envs.clone(),
debug_struct: self.debug_struct.clone(),
key_manager: self.key_manager.clone(),
}
}
}
3 changes: 3 additions & 0 deletions proxy_components/engine_store_ffi/src/observer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.
use std::sync::Arc;

use encryption::DataKeyManager;
use engine_traits::RaftEngine;
use kvproto::{
raft_cmdpb::{AdminRequest, RaftCmdRequest},
@@ -42,6 +43,7 @@ impl<T: Transport + 'static, ER: RaftEngine> TiFlashObserver<T, ER> {
snap_mgr: SnapManager,
packed_envs: PackedEnvs,
debug_struct: DebugStruct,
key_manager: Option<Arc<DataKeyManager>>,
) -> Self {
TiFlashObserver {
forwarder: ProxyForwarder::new(
@@ -53,6 +55,7 @@ impl<T: Transport + 'static, ER: RaftEngine> TiFlashObserver<T, ER> {
snap_mgr,
packed_envs,
debug_struct,
key_manager,
),
}
}
Loading