Skip to content

Commit

Permalink
Avoid set addr/engine-addr and HackedLockManager in src/node.rs (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinNeo authored Aug 15, 2022
1 parent 56761b7 commit 70de535
Show file tree
Hide file tree
Showing 20 changed files with 245 additions and 308 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pr-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ jobs:
export ENGINE_LABEL_VALUE=tiflash
export RUST_BACKTRACE=full
cargo check
cargo test --features compat_new_proxy --package tests --test proxy normal
cargo test --package tests --test proxy proxy
cargo test --package tests --test failpoints cases::test_normal
cargo test --package tests --test failpoints cases::test_bootstrap
cargo test --package tests --test failpoints cases::test_compact_log
Expand All @@ -72,5 +74,3 @@ jobs:
cargo test --package tests --test failpoints cases::test_merge
cargo test --package tests --test failpoints cases::test_import_service
cargo test --package tests --test failpoints cases::test_proxy_replica_read
cargo test --features compat_new_proxy --package tests --test proxy normal
cargo test --package tests --test proxy proxy
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions components/proxy_server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,24 @@ use crate::fatal;
#[serde(rename_all = "kebab-case")]
pub struct ProxyConfig {
pub snap_handle_pool_size: usize,
pub engine_addr: String,
pub engine_store_version: String,
pub engine_store_git_hash: String,
}

pub const DEFAULT_ENGINE_ADDR: &str = if cfg!(feature = "failpoints") {
"127.0.0.1:20206"
} else {
""
};

impl Default for ProxyConfig {
fn default() -> Self {
ProxyConfig {
snap_handle_pool_size: 2,
engine_addr: DEFAULT_ENGINE_ADDR.to_string(),
engine_store_version: String::default(),
engine_store_git_hash: String::default(),
}
}
}
Expand Down
53 changes: 53 additions & 0 deletions components/proxy_server/src/hacked_lock_mgr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use tikv::{
server::{lock_manager::waiter_manager::Callback, Error, Result},
storage::{
lock_manager::{DiagnosticContext, Lock, LockManager as LockManagerTrait, WaitTimeout},
ProcessResult, StorageCallback,
},
};
use txn_types::TimeStamp;

#[derive(Copy, Clone)]
pub struct HackedLockManager {}

#[allow(dead_code)]
#[allow(unused_variables)]
impl LockManagerTrait for HackedLockManager {
fn wait_for(
&self,
start_ts: TimeStamp,
cb: StorageCallback,
pr: ProcessResult,
lock: Lock,
is_first_lock: bool,
timeout: Option<WaitTimeout>,
diag_ctx: DiagnosticContext,
) {
unimplemented!()
}

fn wake_up(
&self,
lock_ts: TimeStamp,
hashes: Vec<u64>,
commit_ts: TimeStamp,
is_pessimistic_txn: bool,
) {
unimplemented!()
}

fn has_waiter(&self) -> bool {
todo!()
}

fn dump_wait_for_entries(&self, cb: Callback) {
todo!()
}
}

impl HackedLockManager {
pub fn new() -> Self {
Self {}
}
pub fn stop(&mut self) {}
}
1 change: 1 addition & 0 deletions components/proxy_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ extern crate tikv_util;

#[macro_use]
pub mod config;
pub mod hacked_lock_mgr;
pub mod proxy;
pub mod run;
pub mod setup;
Expand Down
75 changes: 49 additions & 26 deletions components/proxy_server/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
process,
};

use clap::{App, Arg};
use clap::{App, Arg, ArgMatches};
use tikv::config::TiKvConfig;

use crate::{
Expand All @@ -18,6 +18,49 @@ use crate::{
},
};

// Not the same as TiKV
pub const TIFLASH_DEFAULT_LISTENING_ADDR: &str = "127.0.0.1:20170";
pub const TIFLASH_DEFAULT_STATUS_ADDR: &str = "127.0.0.1:20292";

fn make_tikv_config() -> TiKvConfig {
let mut default = TiKvConfig::default();
setup_default_tikv_config(&mut default);
default
}

pub fn setup_default_tikv_config(default: &mut TiKvConfig) {
default.server.addr = TIFLASH_DEFAULT_LISTENING_ADDR.to_string();
default.server.status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string();
default.server.advertise_status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string();
}

pub fn gen_tikv_config(
matches: &ArgMatches,
is_config_check: bool,
unrecognized_keys: &mut Vec<String>,
) -> TiKvConfig {
matches
.value_of_os("config")
.map_or_else(make_tikv_config, |path| {
let path = Path::new(path);
TiKvConfig::from_file(
path,
if is_config_check {
Some(unrecognized_keys)
} else {
None
},
)
.unwrap_or_else(|e| {
panic!(
"invalid auto generated configuration file {}, err {}",
path.display(),
e
);
})
})
}

pub unsafe fn run_proxy(
argc: c_int,
argv: *const *const c_char,
Expand Down Expand Up @@ -223,34 +266,11 @@ pub unsafe fn run_proxy(
let mut unrecognized_keys = Vec::new();
let is_config_check = matches.is_present("config-check");

let mut config = matches
.value_of_os("config")
.map_or_else(TiKvConfig::default, |path| {
let path = Path::new(path);
TiKvConfig::from_file(
path,
if is_config_check {
Some(&mut unrecognized_keys)
} else {
None
},
)
.unwrap_or_else(|e| {
panic!(
"invalid auto generated configuration file {}, err {}",
path.display(),
e
);
})
});

check_engine_label(&matches);
overwrite_config_with_cmd_args(&mut config, &matches);
config.logger_compatible_adjust();
let mut config = gen_tikv_config(&matches, is_config_check, &mut unrecognized_keys);

let mut proxy_unrecognized_keys = Vec::new();
// Double read the same file for proxy-specific arguments.
let proxy_config =
let mut proxy_config =
matches
.value_of_os("config")
.map_or_else(crate::config::ProxyConfig::default, |path| {
Expand All @@ -271,6 +291,9 @@ pub unsafe fn run_proxy(
);
})
});
check_engine_label(&matches);
overwrite_config_with_cmd_args(&mut config, &mut proxy_config, &matches);
config.logger_compatible_adjust();

// TODO(tiflash) We should later use ProxyConfig for proxy's own settings like `snap_handle_pool_size`
if is_config_check {
Expand Down
30 changes: 27 additions & 3 deletions components/proxy_server/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ use tikv::{
config::{Config as ServerConfig, ServerConfigManager},
create_raft_storage,
gc_worker::{AutoGcConfig, GcWorker},
lock_manager::HackedLockManager as LockManager,
raftkv::ReplicaReadLockChecker,
resolve,
service::{DebugService, DiagnosticsService},
Expand All @@ -105,7 +104,10 @@ use tikv_util::{
};
use tokio::runtime::Builder;

use crate::{config::ProxyConfig, fatal, setup::*, util::ffi_server_info};
use crate::{
config::ProxyConfig, fatal, hacked_lock_mgr::HackedLockManager as LockManager, setup::*,
util::ffi_server_info,
};

#[inline]
pub fn run_impl<CER: ConfiguredRaftEngine, F: KvFormat>(
Expand Down Expand Up @@ -948,7 +950,7 @@ impl<ER: RaftEngine> TiKvServer<ER> {
in_memory_pessimistic_lock: Arc::new(AtomicBool::new(true)),
};

let storage = create_raft_storage::<_, _, _, F>(
let storage = create_raft_storage::<_, _, _, F, _>(
engines.engine.clone(),
&self.config.storage,
storage_read_pool_handle,
Expand Down Expand Up @@ -1077,6 +1079,27 @@ impl<ER: RaftEngine> TiKvServer<ER> {
.unwrap_or_else(|e| fatal!("failed to validate raftstore config {}", e));
let raft_store = Arc::new(VersionTrack::new(self.config.raft_store.clone()));
let health_service = HealthService::default();
let mut default_store = kvproto::metapb::Store::default();

if !self.proxy_config.engine_store_version.is_empty() {
default_store.set_version(self.proxy_config.engine_store_version.clone());
}
if !self.proxy_config.engine_store_git_hash.is_empty() {
default_store.set_git_hash(self.proxy_config.engine_store_git_hash.clone());
}
// addr -> store.peer_address
if self.config.server.advertise_addr.is_empty() {
default_store.set_peer_address(self.config.server.addr.clone());
} else {
default_store.set_peer_address(self.config.server.advertise_addr.clone())
}
// engine_addr -> store.addr
if !self.proxy_config.engine_addr.is_empty() {
default_store.set_address(self.proxy_config.engine_addr.clone());
} else {
panic!("engine address is empty");
}

let mut node = Node::new(
self.system.take().unwrap(),
&server_config.value().clone(),
Expand All @@ -1086,6 +1109,7 @@ impl<ER: RaftEngine> TiKvServer<ER> {
self.state.clone(),
self.background_worker.clone(),
Some(health_service.clone()),
Some(default_store),
);
node.try_bootstrap_store(engines.engines.clone())
.unwrap_or_else(|e| fatal!("failed to bootstrap node id: {}", e));
Expand Down
17 changes: 11 additions & 6 deletions components/proxy_server/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ pub use server::setup::{
use tikv::config::{check_critical_config, persist_config, MetricConfig, TiKvConfig};
use tikv_util::{self, config, logger};

use crate::config::ProxyConfig;
pub use crate::fatal;

#[allow(dead_code)]
pub fn overwrite_config_with_cmd_args(config: &mut TiKvConfig, matches: &ArgMatches<'_>) {
pub fn overwrite_config_with_cmd_args(
config: &mut TiKvConfig,
proxy_config: &mut ProxyConfig,
matches: &ArgMatches<'_>,
) {
if let Some(level) = matches.value_of("log-level") {
config.log.level = logger::get_level_by_string(level).unwrap();
config.log_level = slog::Level::Info;
Expand Down Expand Up @@ -47,21 +52,21 @@ pub fn overwrite_config_with_cmd_args(config: &mut TiKvConfig, matches: &ArgMatc
}

if let Some(engine_store_version) = matches.value_of("engine-version") {
config.server.engine_store_version = engine_store_version.to_owned();
proxy_config.engine_store_version = engine_store_version.to_owned();
}

if let Some(engine_store_git_hash) = matches.value_of("engine-git-hash") {
config.server.engine_store_git_hash = engine_store_git_hash.to_owned();
proxy_config.engine_store_git_hash = engine_store_git_hash.to_owned();
}

if config.server.engine_addr.is_empty() {
if proxy_config.engine_addr.is_empty() {
if let Some(engine_addr) = matches.value_of("engine-addr") {
config.server.engine_addr = engine_addr.to_owned();
proxy_config.engine_addr = engine_addr.to_owned();
}
}

if let Some(engine_addr) = matches.value_of("advertise-engine-addr") {
config.server.engine_addr = engine_addr.to_owned();
proxy_config.engine_addr = engine_addr.to_owned();
}

if let Some(data_dir) = matches.value_of("data-dir") {
Expand Down
20 changes: 11 additions & 9 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ use tikv::{
config::{Config as ServerConfig, ServerConfigManager},
create_raft_storage,
gc_worker::{AutoGcConfig, GcWorker},
lock_manager::HackedLockManager as LockManager,
lock_manager::LockManager,
raftkv::ReplicaReadLockChecker,
resolve,
service::{DebugService, DiagnosticsService},
Expand Down Expand Up @@ -519,13 +519,14 @@ impl<ER: RaftEngine> TiKvServer<ER> {
.engine
.set_txn_extra_scheduler(Arc::new(txn_extra_scheduler));

// let lock_mgr = LockManager::new(&self.config.pessimistic_txn);
let lock_mgr = LockManager::new();
// cfg_controller.register(
// tikv::config::Module::PessimisticTxn,
// Box::new(lock_mgr.config_manager()),
// );
// lock_mgr.register_detector_role_change_observer(self.coprocessor_host.as_mut().unwrap());
// Recover TiKV's lock manager, since we don't use this crate now.
let lock_mgr = LockManager::new(&self.config.pessimistic_txn);
// let lock_mgr = LockManager::new();
cfg_controller.register(
tikv::config::Module::PessimisticTxn,
Box::new(lock_mgr.config_manager()),
);
lock_mgr.register_detector_role_change_observer(self.coprocessor_host.as_mut().unwrap());

let engines = self.engines.as_ref().unwrap();

Expand Down Expand Up @@ -613,7 +614,7 @@ impl<ER: RaftEngine> TiKvServer<ER> {
in_memory_pessimistic_lock: Arc::new(AtomicBool::new(true)),
};

let storage = create_raft_storage::<_, _, _, F>(
let storage = create_raft_storage::<_, _, _, F, _>(
engines.engine.clone(),
&self.config.storage,
storage_read_pool_handle,
Expand Down Expand Up @@ -751,6 +752,7 @@ impl<ER: RaftEngine> TiKvServer<ER> {
self.state.clone(),
self.background_worker.clone(),
Some(health_service.clone()),
None,
);
node.try_bootstrap_store(engines.engines.clone())
.unwrap_or_else(|e| fatal!("failed to bootstrap node id: {}", e));
Expand Down
18 changes: 0 additions & 18 deletions components/server/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,24 +260,6 @@ pub fn overwrite_config_with_cmd_args(config: &mut TiKvConfig, matches: &ArgMatc
config.server.advertise_status_addr = advertise_status_addr.to_owned();
}

if let Some(engine_store_version) = matches.value_of("engine-version") {
config.server.engine_store_version = engine_store_version.to_owned();
}

if let Some(engine_store_git_hash) = matches.value_of("engine-git-hash") {
config.server.engine_store_git_hash = engine_store_git_hash.to_owned();
}

if config.server.engine_addr.is_empty() {
if let Some(engine_addr) = matches.value_of("engine-addr") {
config.server.engine_addr = engine_addr.to_owned();
}
}

if let Some(engine_addr) = matches.value_of("advertise-engine-addr") {
config.server.engine_addr = engine_addr.to_owned();
}

if let Some(data_dir) = matches.value_of("data-dir") {
config.storage.data_dir = data_dir.to_owned();
}
Expand Down
Loading

0 comments on commit 70de535

Please sign in to comment.