Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse engine_rocks in engine_tiflash #143

Open
wants to merge 11 commits into
base: raftstore-proxy-6.2
Choose a base branch
from
Open
4 changes: 2 additions & 2 deletions .github/workflows/pr-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ jobs:
export ENGINE_LABEL_VALUE=tiflash
export RUST_BACKTRACE=full
cargo check
cargo test --features compat_new_proxy --package tests --test proxy normal
cargo test --package tests --test proxy proxy
cargo test --package tests --test failpoints cases::test_normal
cargo test --package tests --test failpoints cases::test_bootstrap
cargo test --package tests --test failpoints cases::test_compact_log
Expand All @@ -72,5 +74,3 @@ jobs:
cargo test --package tests --test failpoints cases::test_merge
cargo test --package tests --test failpoints cases::test_import_service
cargo test --package tests --test failpoints cases::test_proxy_replica_read
cargo test --features compat_new_proxy --package tests --test proxy normal
cargo test --package tests --test proxy proxy
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions components/proxy_server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,24 @@ use crate::fatal;
#[serde(rename_all = "kebab-case")]
pub struct ProxyConfig {
pub snap_handle_pool_size: usize,
pub engine_addr: String,
pub engine_store_version: String,
pub engine_store_git_hash: String,
}

pub const DEFAULT_ENGINE_ADDR: &str = if cfg!(feature = "failpoints") {
"127.0.0.1:20206"
} else {
""
};

impl Default for ProxyConfig {
fn default() -> Self {
ProxyConfig {
snap_handle_pool_size: 2,
engine_addr: DEFAULT_ENGINE_ADDR.to_string(),
engine_store_version: String::default(),
engine_store_git_hash: String::default(),
}
}
}
Expand Down
53 changes: 53 additions & 0 deletions components/proxy_server/src/hacked_lock_mgr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use tikv::{
server::{lock_manager::waiter_manager::Callback, Error, Result},
storage::{
lock_manager::{DiagnosticContext, Lock, LockManager as LockManagerTrait, WaitTimeout},
ProcessResult, StorageCallback,
},
};
use txn_types::TimeStamp;

#[derive(Copy, Clone)]
pub struct HackedLockManager {}

#[allow(dead_code)]
#[allow(unused_variables)]
impl LockManagerTrait for HackedLockManager {
fn wait_for(
&self,
start_ts: TimeStamp,
cb: StorageCallback,
pr: ProcessResult,
lock: Lock,
is_first_lock: bool,
timeout: Option<WaitTimeout>,
diag_ctx: DiagnosticContext,
) {
unimplemented!()
}

fn wake_up(
&self,
lock_ts: TimeStamp,
hashes: Vec<u64>,
commit_ts: TimeStamp,
is_pessimistic_txn: bool,
) {
unimplemented!()
}

fn has_waiter(&self) -> bool {
todo!()
}

fn dump_wait_for_entries(&self, cb: Callback) {
todo!()
}
}

impl HackedLockManager {
pub fn new() -> Self {
Self {}
}
pub fn stop(&mut self) {}
}
1 change: 1 addition & 0 deletions components/proxy_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ extern crate tikv_util;

#[macro_use]
pub mod config;
pub mod hacked_lock_mgr;
pub mod proxy;
pub mod run;
pub mod setup;
Expand Down
75 changes: 49 additions & 26 deletions components/proxy_server/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
process,
};

use clap::{App, Arg};
use clap::{App, Arg, ArgMatches};
use tikv::config::TiKvConfig;

use crate::{
Expand All @@ -18,6 +18,49 @@ use crate::{
},
};

// Not the same as TiKV
pub const TIFLASH_DEFAULT_LISTENING_ADDR: &str = "127.0.0.1:20170";
pub const TIFLASH_DEFAULT_STATUS_ADDR: &str = "127.0.0.1:20292";

fn make_tikv_config() -> TiKvConfig {
let mut default = TiKvConfig::default();
setup_default_tikv_config(&mut default);
default
}

pub fn setup_default_tikv_config(default: &mut TiKvConfig) {
default.server.addr = TIFLASH_DEFAULT_LISTENING_ADDR.to_string();
default.server.status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string();
default.server.advertise_status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string();
}

pub fn gen_tikv_config(
matches: &ArgMatches,
is_config_check: bool,
unrecognized_keys: &mut Vec<String>,
) -> TiKvConfig {
matches
.value_of_os("config")
.map_or_else(make_tikv_config, |path| {
let path = Path::new(path);
TiKvConfig::from_file(
path,
if is_config_check {
Some(unrecognized_keys)
} else {
None
},
)
.unwrap_or_else(|e| {
panic!(
"invalid auto generated configuration file {}, err {}",
path.display(),
e
);
})
})
}

pub unsafe fn run_proxy(
argc: c_int,
argv: *const *const c_char,
Expand Down Expand Up @@ -223,34 +266,11 @@ pub unsafe fn run_proxy(
let mut unrecognized_keys = Vec::new();
let is_config_check = matches.is_present("config-check");

let mut config = matches
.value_of_os("config")
.map_or_else(TiKvConfig::default, |path| {
let path = Path::new(path);
TiKvConfig::from_file(
path,
if is_config_check {
Some(&mut unrecognized_keys)
} else {
None
},
)
.unwrap_or_else(|e| {
panic!(
"invalid auto generated configuration file {}, err {}",
path.display(),
e
);
})
});

check_engine_label(&matches);
overwrite_config_with_cmd_args(&mut config, &matches);
config.logger_compatible_adjust();
let mut config = gen_tikv_config(&matches, is_config_check, &mut unrecognized_keys);

let mut proxy_unrecognized_keys = Vec::new();
// Double read the same file for proxy-specific arguments.
let proxy_config =
let mut proxy_config =
matches
.value_of_os("config")
.map_or_else(crate::config::ProxyConfig::default, |path| {
Expand All @@ -271,6 +291,9 @@ pub unsafe fn run_proxy(
);
})
});
check_engine_label(&matches);
overwrite_config_with_cmd_args(&mut config, &mut proxy_config, &matches);
config.logger_compatible_adjust();

// TODO(tiflash) We should later use ProxyConfig for proxy's own settings like `snap_handle_pool_size`
if is_config_check {
Expand Down
63 changes: 43 additions & 20 deletions components/proxy_server/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ use tikv::{
config::{Config as ServerConfig, ServerConfigManager},
create_raft_storage,
gc_worker::{AutoGcConfig, GcWorker},
lock_manager::HackedLockManager as LockManager,
raftkv::ReplicaReadLockChecker,
resolve,
service::{DebugService, DiagnosticsService},
Expand All @@ -105,7 +104,10 @@ use tikv_util::{
};
use tokio::runtime::Builder;

use crate::{config::ProxyConfig, fatal, setup::*, util::ffi_server_info};
use crate::{
config::ProxyConfig, fatal, hacked_lock_mgr::HackedLockManager as LockManager, setup::*,
util::ffi_server_info,
};

#[inline]
pub fn run_impl<CER: ConfiguredRaftEngine, F: KvFormat>(
Expand Down Expand Up @@ -948,7 +950,7 @@ impl<ER: RaftEngine> TiKvServer<ER> {
in_memory_pessimistic_lock: Arc::new(AtomicBool::new(true)),
};

let storage = create_raft_storage::<_, _, _, F>(
let storage = create_raft_storage::<_, _, _, F, _>(
engines.engine.clone(),
&self.config.storage,
storage_read_pool_handle,
Expand Down Expand Up @@ -1077,6 +1079,27 @@ impl<ER: RaftEngine> TiKvServer<ER> {
.unwrap_or_else(|e| fatal!("failed to validate raftstore config {}", e));
let raft_store = Arc::new(VersionTrack::new(self.config.raft_store.clone()));
let health_service = HealthService::default();
let mut default_store = kvproto::metapb::Store::default();

if !self.proxy_config.engine_store_version.is_empty() {
default_store.set_version(self.proxy_config.engine_store_version.clone());
}
if !self.proxy_config.engine_store_git_hash.is_empty() {
default_store.set_git_hash(self.proxy_config.engine_store_git_hash.clone());
}
// addr -> store.peer_address
if self.config.server.advertise_addr.is_empty() {
default_store.set_peer_address(self.config.server.addr.clone());
} else {
default_store.set_peer_address(self.config.server.advertise_addr.clone())
}
// engine_addr -> store.addr
if !self.proxy_config.engine_addr.is_empty() {
default_store.set_address(self.proxy_config.engine_addr.clone());
} else {
panic!("engine address is empty");
}

let mut node = Node::new(
self.system.take().unwrap(),
&server_config.value().clone(),
Expand All @@ -1086,6 +1109,7 @@ impl<ER: RaftEngine> TiKvServer<ER> {
self.state.clone(),
self.background_worker.clone(),
Some(health_service.clone()),
Some(default_store),
);
node.try_bootstrap_store(engines.engines.clone())
.unwrap_or_else(|e| fatal!("failed to bootstrap node id: {}", e));
Expand Down Expand Up @@ -1282,23 +1306,22 @@ impl<ER: RaftEngine> TiKvServer<ER> {
}

// Debug service.
// TODO(tiflash) make this usable when tikv merged.
// let debug_service = DebugService::new(
// Engines {
// kv: engines.engines.kv.rocks.clone(),
// raft: engines.engines.raft.clone(),
// },
// servers.server.get_debug_thread_pool().clone(),
// self.router.clone(),
// self.cfg_controller.as_ref().unwrap().clone(),
// );
// if servers
// .server
// .register_service(create_debug(debug_service))
// .is_some()
// {
// fatal!("failed to register debug service");
// }
let debug_service = DebugService::new(
Engines {
kv: engines.engines.kv.rocks.clone(),
raft: engines.engines.raft.clone(),
},
servers.server.get_debug_thread_pool().clone(),
self.router.clone(),
self.cfg_controller.as_ref().unwrap().clone(),
);
if servers
.server
.register_service(create_debug(debug_service))
.is_some()
{
fatal!("failed to register debug service");
}

// Create Diagnostics service
let diag_service = DiagnosticsService::new(
Expand Down
17 changes: 11 additions & 6 deletions components/proxy_server/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ pub use server::setup::{
use tikv::config::{check_critical_config, persist_config, MetricConfig, TiKvConfig};
use tikv_util::{self, config, logger};

use crate::config::ProxyConfig;
pub use crate::fatal;

#[allow(dead_code)]
pub fn overwrite_config_with_cmd_args(config: &mut TiKvConfig, matches: &ArgMatches<'_>) {
pub fn overwrite_config_with_cmd_args(
config: &mut TiKvConfig,
proxy_config: &mut ProxyConfig,
matches: &ArgMatches<'_>,
) {
if let Some(level) = matches.value_of("log-level") {
config.log.level = logger::get_level_by_string(level).unwrap();
config.log_level = slog::Level::Info;
Expand Down Expand Up @@ -47,21 +52,21 @@ pub fn overwrite_config_with_cmd_args(config: &mut TiKvConfig, matches: &ArgMatc
}

if let Some(engine_store_version) = matches.value_of("engine-version") {
config.server.engine_store_version = engine_store_version.to_owned();
proxy_config.engine_store_version = engine_store_version.to_owned();
}

if let Some(engine_store_git_hash) = matches.value_of("engine-git-hash") {
config.server.engine_store_git_hash = engine_store_git_hash.to_owned();
proxy_config.engine_store_git_hash = engine_store_git_hash.to_owned();
}

if config.server.engine_addr.is_empty() {
if proxy_config.engine_addr.is_empty() {
if let Some(engine_addr) = matches.value_of("engine-addr") {
config.server.engine_addr = engine_addr.to_owned();
proxy_config.engine_addr = engine_addr.to_owned();
}
}

if let Some(engine_addr) = matches.value_of("advertise-engine-addr") {
config.server.engine_addr = engine_addr.to_owned();
proxy_config.engine_addr = engine_addr.to_owned();
}

if let Some(data_dir) = matches.value_of("data-dir") {
Expand Down
Loading