From 70de5355e12daca1663df33db21dee6ac5bc45ae Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Mon, 15 Aug 2022 14:00:44 +0800 Subject: [PATCH] Avoid set addr/engine-addr and HackedLockManager in src/node.rs (#139) --- .github/workflows/pr-ci.yml | 4 +- Cargo.lock | 1 + components/proxy_server/src/config.rs | 12 ++ .../proxy_server/src/hacked_lock_mgr.rs | 53 ++++++ components/proxy_server/src/lib.rs | 1 + components/proxy_server/src/proxy.rs | 75 ++++++--- components/proxy_server/src/run.rs | 30 +++- components/proxy_server/src/setup.rs | 17 +- components/server/src/server.rs | 20 ++- components/server/src/setup.rs | 18 -- components/test_raftstore/src/node.rs | 1 + components/test_raftstore/src/server.rs | 7 +- new-mock-engine-store/src/node.rs | 1 + readme.md | 155 ------------------ src/server/config.rs | 16 +- src/server/lock_manager/mod.rs | 47 +----- src/server/node.rs | 58 ++++--- tests/Cargo.toml | 1 + .../integrations/raftstore/test_bootstrap.rs | 1 + tests/proxy/normal.rs | 35 ++++ 20 files changed, 245 insertions(+), 308 deletions(-) create mode 100644 components/proxy_server/src/hacked_lock_mgr.rs delete mode 100644 readme.md diff --git a/.github/workflows/pr-ci.yml b/.github/workflows/pr-ci.yml index f209ed99eeb..6994c74f7f1 100644 --- a/.github/workflows/pr-ci.yml +++ b/.github/workflows/pr-ci.yml @@ -58,6 +58,8 @@ jobs: export ENGINE_LABEL_VALUE=tiflash export RUST_BACKTRACE=full cargo check + cargo test --features compat_new_proxy --package tests --test proxy normal + cargo test --package tests --test proxy proxy cargo test --package tests --test failpoints cases::test_normal cargo test --package tests --test failpoints cases::test_bootstrap cargo test --package tests --test failpoints cases::test_compact_log @@ -72,5 +74,3 @@ jobs: cargo test --package tests --test failpoints cases::test_merge cargo test --package tests --test failpoints cases::test_import_service cargo test --package tests --test failpoints cases::test_proxy_replica_read - cargo 
test --features compat_new_proxy --package tests --test proxy normal - cargo test --package tests --test proxy proxy diff --git a/Cargo.lock b/Cargo.lock index 5c11a6e9431..133beaaa9f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5782,6 +5782,7 @@ dependencies = [ "byteorder", "causal_ts", "cdc", + "clap", "collections", "concurrency_manager", "crc64fast", diff --git a/components/proxy_server/src/config.rs b/components/proxy_server/src/config.rs index da180a98095..e7f5461dd28 100644 --- a/components/proxy_server/src/config.rs +++ b/components/proxy_server/src/config.rs @@ -19,12 +19,24 @@ use crate::fatal; #[serde(rename_all = "kebab-case")] pub struct ProxyConfig { pub snap_handle_pool_size: usize, + pub engine_addr: String, + pub engine_store_version: String, + pub engine_store_git_hash: String, } +pub const DEFAULT_ENGINE_ADDR: &str = if cfg!(feature = "failpoints") { + "127.0.0.1:20206" +} else { + "" +}; + impl Default for ProxyConfig { fn default() -> Self { ProxyConfig { snap_handle_pool_size: 2, + engine_addr: DEFAULT_ENGINE_ADDR.to_string(), + engine_store_version: String::default(), + engine_store_git_hash: String::default(), } } } diff --git a/components/proxy_server/src/hacked_lock_mgr.rs b/components/proxy_server/src/hacked_lock_mgr.rs new file mode 100644 index 00000000000..43c99ec5e78 --- /dev/null +++ b/components/proxy_server/src/hacked_lock_mgr.rs @@ -0,0 +1,53 @@ +use tikv::{ + server::{lock_manager::waiter_manager::Callback, Error, Result}, + storage::{ + lock_manager::{DiagnosticContext, Lock, LockManager as LockManagerTrait, WaitTimeout}, + ProcessResult, StorageCallback, + }, +}; +use txn_types::TimeStamp; + +#[derive(Copy, Clone)] +pub struct HackedLockManager {} + +#[allow(dead_code)] +#[allow(unused_variables)] +impl LockManagerTrait for HackedLockManager { + fn wait_for( + &self, + start_ts: TimeStamp, + cb: StorageCallback, + pr: ProcessResult, + lock: Lock, + is_first_lock: bool, + timeout: Option, + diag_ctx: DiagnosticContext, + ) { + 
unimplemented!() + } + + fn wake_up( + &self, + lock_ts: TimeStamp, + hashes: Vec, + commit_ts: TimeStamp, + is_pessimistic_txn: bool, + ) { + unimplemented!() + } + + fn has_waiter(&self) -> bool { + todo!() + } + + fn dump_wait_for_entries(&self, cb: Callback) { + todo!() + } +} + +impl HackedLockManager { + pub fn new() -> Self { + Self {} + } + pub fn stop(&mut self) {} +} diff --git a/components/proxy_server/src/lib.rs b/components/proxy_server/src/lib.rs index 1119e6ade56..d113356f818 100644 --- a/components/proxy_server/src/lib.rs +++ b/components/proxy_server/src/lib.rs @@ -7,6 +7,7 @@ extern crate tikv_util; #[macro_use] pub mod config; +pub mod hacked_lock_mgr; pub mod proxy; pub mod run; pub mod setup; diff --git a/components/proxy_server/src/proxy.rs b/components/proxy_server/src/proxy.rs index fbed7b05425..1503386900a 100644 --- a/components/proxy_server/src/proxy.rs +++ b/components/proxy_server/src/proxy.rs @@ -8,7 +8,7 @@ use std::{ process, }; -use clap::{App, Arg}; +use clap::{App, Arg, ArgMatches}; use tikv::config::TiKvConfig; use crate::{ @@ -18,6 +18,49 @@ use crate::{ }, }; +// Not the same as TiKV +pub const TIFLASH_DEFAULT_LISTENING_ADDR: &str = "127.0.0.1:20170"; +pub const TIFLASH_DEFAULT_STATUS_ADDR: &str = "127.0.0.1:20292"; + +fn make_tikv_config() -> TiKvConfig { + let mut default = TiKvConfig::default(); + setup_default_tikv_config(&mut default); + default +} + +pub fn setup_default_tikv_config(default: &mut TiKvConfig) { + default.server.addr = TIFLASH_DEFAULT_LISTENING_ADDR.to_string(); + default.server.status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string(); + default.server.advertise_status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string(); +} + +pub fn gen_tikv_config( + matches: &ArgMatches, + is_config_check: bool, + unrecognized_keys: &mut Vec, +) -> TiKvConfig { + matches + .value_of_os("config") + .map_or_else(make_tikv_config, |path| { + let path = Path::new(path); + TiKvConfig::from_file( + path, + if is_config_check { + 
Some(unrecognized_keys) + } else { + None + }, + ) + .unwrap_or_else(|e| { + panic!( + "invalid auto generated configuration file {}, err {}", + path.display(), + e + ); + }) + }) +} + pub unsafe fn run_proxy( argc: c_int, argv: *const *const c_char, @@ -223,34 +266,11 @@ pub unsafe fn run_proxy( let mut unrecognized_keys = Vec::new(); let is_config_check = matches.is_present("config-check"); - let mut config = matches - .value_of_os("config") - .map_or_else(TiKvConfig::default, |path| { - let path = Path::new(path); - TiKvConfig::from_file( - path, - if is_config_check { - Some(&mut unrecognized_keys) - } else { - None - }, - ) - .unwrap_or_else(|e| { - panic!( - "invalid auto generated configuration file {}, err {}", - path.display(), - e - ); - }) - }); - - check_engine_label(&matches); - overwrite_config_with_cmd_args(&mut config, &matches); - config.logger_compatible_adjust(); + let mut config = gen_tikv_config(&matches, is_config_check, &mut unrecognized_keys); let mut proxy_unrecognized_keys = Vec::new(); // Double read the same file for proxy-specific arguments. 
- let proxy_config = + let mut proxy_config = matches .value_of_os("config") .map_or_else(crate::config::ProxyConfig::default, |path| { @@ -271,6 +291,9 @@ pub unsafe fn run_proxy( ); }) }); + check_engine_label(&matches); + overwrite_config_with_cmd_args(&mut config, &mut proxy_config, &matches); + config.logger_compatible_adjust(); // TODO(tiflash) We should later use ProxyConfig for proxy's own settings like `snap_handle_pool_size` if is_config_check { diff --git a/components/proxy_server/src/run.rs b/components/proxy_server/src/run.rs index f3e53fe1c72..8540141d6eb 100644 --- a/components/proxy_server/src/run.rs +++ b/components/proxy_server/src/run.rs @@ -79,7 +79,6 @@ use tikv::{ config::{Config as ServerConfig, ServerConfigManager}, create_raft_storage, gc_worker::{AutoGcConfig, GcWorker}, - lock_manager::HackedLockManager as LockManager, raftkv::ReplicaReadLockChecker, resolve, service::{DebugService, DiagnosticsService}, @@ -105,7 +104,10 @@ use tikv_util::{ }; use tokio::runtime::Builder; -use crate::{config::ProxyConfig, fatal, setup::*, util::ffi_server_info}; +use crate::{ + config::ProxyConfig, fatal, hacked_lock_mgr::HackedLockManager as LockManager, setup::*, + util::ffi_server_info, +}; #[inline] pub fn run_impl( @@ -948,7 +950,7 @@ impl TiKvServer { in_memory_pessimistic_lock: Arc::new(AtomicBool::new(true)), }; - let storage = create_raft_storage::<_, _, _, F>( + let storage = create_raft_storage::<_, _, _, F, _>( engines.engine.clone(), &self.config.storage, storage_read_pool_handle, @@ -1077,6 +1079,27 @@ impl TiKvServer { .unwrap_or_else(|e| fatal!("failed to validate raftstore config {}", e)); let raft_store = Arc::new(VersionTrack::new(self.config.raft_store.clone())); let health_service = HealthService::default(); + let mut default_store = kvproto::metapb::Store::default(); + + if !self.proxy_config.engine_store_version.is_empty() { + default_store.set_version(self.proxy_config.engine_store_version.clone()); + } + if 
!self.proxy_config.engine_store_git_hash.is_empty() { + default_store.set_git_hash(self.proxy_config.engine_store_git_hash.clone()); + } + // addr -> store.peer_address + if self.config.server.advertise_addr.is_empty() { + default_store.set_peer_address(self.config.server.addr.clone()); + } else { + default_store.set_peer_address(self.config.server.advertise_addr.clone()) + } + // engine_addr -> store.addr + if !self.proxy_config.engine_addr.is_empty() { + default_store.set_address(self.proxy_config.engine_addr.clone()); + } else { + panic!("engine address is empty"); + } + let mut node = Node::new( self.system.take().unwrap(), &server_config.value().clone(), @@ -1086,6 +1109,7 @@ impl TiKvServer { self.state.clone(), self.background_worker.clone(), Some(health_service.clone()), + Some(default_store), ); node.try_bootstrap_store(engines.engines.clone()) .unwrap_or_else(|e| fatal!("failed to bootstrap node id: {}", e)); diff --git a/components/proxy_server/src/setup.rs b/components/proxy_server/src/setup.rs index 322a91cd789..d6455d53df6 100644 --- a/components/proxy_server/src/setup.rs +++ b/components/proxy_server/src/setup.rs @@ -16,10 +16,15 @@ pub use server::setup::{ use tikv::config::{check_critical_config, persist_config, MetricConfig, TiKvConfig}; use tikv_util::{self, config, logger}; +use crate::config::ProxyConfig; pub use crate::fatal; #[allow(dead_code)] -pub fn overwrite_config_with_cmd_args(config: &mut TiKvConfig, matches: &ArgMatches<'_>) { +pub fn overwrite_config_with_cmd_args( + config: &mut TiKvConfig, + proxy_config: &mut ProxyConfig, + matches: &ArgMatches<'_>, +) { if let Some(level) = matches.value_of("log-level") { config.log.level = logger::get_level_by_string(level).unwrap(); config.log_level = slog::Level::Info; @@ -47,21 +52,21 @@ pub fn overwrite_config_with_cmd_args(config: &mut TiKvConfig, matches: &ArgMatc } if let Some(engine_store_version) = matches.value_of("engine-version") { - config.server.engine_store_version = 
engine_store_version.to_owned(); + proxy_config.engine_store_version = engine_store_version.to_owned(); } if let Some(engine_store_git_hash) = matches.value_of("engine-git-hash") { - config.server.engine_store_git_hash = engine_store_git_hash.to_owned(); + proxy_config.engine_store_git_hash = engine_store_git_hash.to_owned(); } - if config.server.engine_addr.is_empty() { + if proxy_config.engine_addr.is_empty() { if let Some(engine_addr) = matches.value_of("engine-addr") { - config.server.engine_addr = engine_addr.to_owned(); + proxy_config.engine_addr = engine_addr.to_owned(); } } if let Some(engine_addr) = matches.value_of("advertise-engine-addr") { - config.server.engine_addr = engine_addr.to_owned(); + proxy_config.engine_addr = engine_addr.to_owned(); } if let Some(data_dir) = matches.value_of("data-dir") { diff --git a/components/server/src/server.rs b/components/server/src/server.rs index 4ee557b08d9..23c7fdc6d07 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -88,7 +88,7 @@ use tikv::{ config::{Config as ServerConfig, ServerConfigManager}, create_raft_storage, gc_worker::{AutoGcConfig, GcWorker}, - lock_manager::HackedLockManager as LockManager, + lock_manager::LockManager, raftkv::ReplicaReadLockChecker, resolve, service::{DebugService, DiagnosticsService}, @@ -519,13 +519,14 @@ impl TiKvServer { .engine .set_txn_extra_scheduler(Arc::new(txn_extra_scheduler)); - // let lock_mgr = LockManager::new(&self.config.pessimistic_txn); - let lock_mgr = LockManager::new(); - // cfg_controller.register( - // tikv::config::Module::PessimisticTxn, - // Box::new(lock_mgr.config_manager()), - // ); - // lock_mgr.register_detector_role_change_observer(self.coprocessor_host.as_mut().unwrap()); + // Recover TiKV's lock manager, since we don't use this crate now. 
+ let lock_mgr = LockManager::new(&self.config.pessimistic_txn); + // let lock_mgr = LockManager::new(); + cfg_controller.register( + tikv::config::Module::PessimisticTxn, + Box::new(lock_mgr.config_manager()), + ); + lock_mgr.register_detector_role_change_observer(self.coprocessor_host.as_mut().unwrap()); let engines = self.engines.as_ref().unwrap(); @@ -613,7 +614,7 @@ impl TiKvServer { in_memory_pessimistic_lock: Arc::new(AtomicBool::new(true)), }; - let storage = create_raft_storage::<_, _, _, F>( + let storage = create_raft_storage::<_, _, _, F, _>( engines.engine.clone(), &self.config.storage, storage_read_pool_handle, @@ -751,6 +752,7 @@ impl TiKvServer { self.state.clone(), self.background_worker.clone(), Some(health_service.clone()), + None, ); node.try_bootstrap_store(engines.engines.clone()) .unwrap_or_else(|e| fatal!("failed to bootstrap node id: {}", e)); diff --git a/components/server/src/setup.rs b/components/server/src/setup.rs index 37f6bb92f66..e2adc47fbe0 100644 --- a/components/server/src/setup.rs +++ b/components/server/src/setup.rs @@ -260,24 +260,6 @@ pub fn overwrite_config_with_cmd_args(config: &mut TiKvConfig, matches: &ArgMatc config.server.advertise_status_addr = advertise_status_addr.to_owned(); } - if let Some(engine_store_version) = matches.value_of("engine-version") { - config.server.engine_store_version = engine_store_version.to_owned(); - } - - if let Some(engine_store_git_hash) = matches.value_of("engine-git-hash") { - config.server.engine_store_git_hash = engine_store_git_hash.to_owned(); - } - - if config.server.engine_addr.is_empty() { - if let Some(engine_addr) = matches.value_of("engine-addr") { - config.server.engine_addr = engine_addr.to_owned(); - } - } - - if let Some(engine_addr) = matches.value_of("advertise-engine-addr") { - config.server.engine_addr = engine_addr.to_owned(); - } - if let Some(data_dir) = matches.value_of("data-dir") { config.storage.data_dir = data_dir.to_owned(); } diff --git 
a/components/test_raftstore/src/node.rs b/components/test_raftstore/src/node.rs index b1839791471..db87f850a09 100644 --- a/components/test_raftstore/src/node.rs +++ b/components/test_raftstore/src/node.rs @@ -244,6 +244,7 @@ impl Simulator for NodeCluster { Arc::default(), bg_worker.clone(), None, + None, ); let (snap_mgr, snap_mgr_path) = if node_id == 0 diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs index 629ce7506a4..b603ca85a79 100644 --- a/components/test_raftstore/src/server.rs +++ b/components/test_raftstore/src/server.rs @@ -57,7 +57,7 @@ use tikv::{ create_raft_storage, gc_worker::GcWorker, load_statistics::ThreadLoadPool, - lock_manager::HackedLockManager as LockManager, + lock_manager::LockManager, raftkv::ReplicaReadLockChecker, resolve::{self, StoreAddrResolver}, service::DebugService, @@ -373,7 +373,7 @@ impl ServerCluster { let check_leader_runner = CheckLeaderRunner::new(store_meta.clone()); let check_leader_scheduler = bg_worker.start("check-leader", check_leader_runner); - let mut lock_mgr = LockManager::new(); + let mut lock_mgr = LockManager::new(&cfg.pessimistic_txn); let quota_limiter = Arc::new(QuotaLimiter::new( cfg.quota.foreground_cpu_time, cfg.quota.foreground_write_bandwidth, @@ -386,7 +386,7 @@ impl ServerCluster { pipelined_pessimistic_lock: Arc::new(AtomicBool::new(true)), in_memory_pessimistic_lock: Arc::new(AtomicBool::new(true)), }; - let store = create_raft_storage::<_, _, _, F>( + let store = create_raft_storage::<_, _, _, F, _>( engine, &cfg.storage, storage_read_pool.handle(), @@ -481,6 +481,7 @@ impl ServerCluster { state, bg_worker.clone(), Some(health_service.clone()), + None, ); node.try_bootstrap_store(engines.clone())?; let node_id = node.id(); diff --git a/new-mock-engine-store/src/node.rs b/new-mock-engine-store/src/node.rs index 9bdc6de34d5..384dbb7ea9d 100644 --- a/new-mock-engine-store/src/node.rs +++ b/new-mock-engine-store/src/node.rs @@ -256,6 +256,7 @@ impl 
Simulator for NodeCluster { Arc::default(), bg_worker.clone(), None, + None, ); let (snap_mgr, snap_mgr_path) = if node_id == 0 diff --git a/readme.md b/readme.md deleted file mode 100644 index e00ab27c341..00000000000 --- a/readme.md +++ /dev/null @@ -1,155 +0,0 @@ -# TiDB Engine Extensions Library - -## Abstract - -This repository is to introduce a [TiKV](https://github.com/tikv/tikv) based `c dynamic library` for extending storage system in `TiDB` cluster. -It aims to export current multi-raft framework to other engines and make them be able to provide services(read/write) as `raftstore` directly. - -## Background - -Initially, such framework was designed for `Realtime HTAP` scenarios. -There is already a distributed OLTP storage product `TiKV`, and we could extend other kind of realtime analytics system based on current multi-raft mechanism to handle more complicated scenarios. -For example, assume a strong schema-aware storage node could be accessed as a raftstore with special identification labels. -Third-party components can use [Placement Rules](https://docs.pingcap.com/tidb/stable/configure-placement-rules), provided by `PD`, to schedule learner/voter replicas into it. -If such storage system has supported `Multi-raft RSM`, `Percolator Transaction Model` and `Transaction Read Protocol`, just like `TiFlash`(a distributed column-based storage) does, it will be appropriate for `HTAP` cases. - -If transaction is not required, like most `OLAP` cases which only guarantee `Eventual Consistency`, and what matters more is throughput rather than latency. -Then, data(formed by table schema or other pattern) could be R/W from this kind of raftstore directly. - -## Design - -### Overview - -Generally speaking, there are two storage components in TiKV for maintaining multi-raft RSM: `RaftEngine` and `KvEngine`. -KvEngine is mainly used for applying raft command and providing key-value services. 
-RaftEngine will parse its own committed raft log into corresponding normal/admin raft commands, which will be handled by the apply process. -Multiple modifications about region data/meta/apply-state will be encapsulated into one `Write Batch` and written into KvEngine atomically. -It is an option to replace KvEngine with `Engine Traits`. -But it's not easy to guarantee atomicity while writing/reading dynamic key-value pair(such as meta/apply-state) and patterned data(strong schema) together for other storage systems. -Besides, a few modules and components(like importer or lighting) reply on the SST format of KvEngine in TiKV. -It may cost a lot to achieve such a replacement. - -It's suggested to let the apply process work as usual but only persist meta and state information to bring a few intrusive modifications against the original logic of TiKV. -i.e., we must replace everywhere that may write normal region data with related interfaces. -Unlike KvEngine, the storage system(called `engine-store`) under such a framework should be aware of the transition about multi-raft RSM from these interfaces. -The `engine-store` must have the ability to deal with raft commands to handle queries with region epoch. - -The `region snapshot` presents the complete region information(data/meta/apply-state) at a specific apply-state. - -Anyway, because there are at least two asynchronous runtimes in one program, the best practice of such raft store is to guarantee `External Consistency` by `region snapshot`. -The raft logs persisted in RaftEngine are the `WAL(Write-ahead Log)` of the apply process. -Index of raft entry within the same region peer is monotonic increasing. -If the process is interrupted at the middle step, it should replay from the last persisted apply-state after the restart. -Until a safe point is reached, related modifications are not visible to others. 
- -`Idempotency` is an essential property for `External Consistency`, which means such a system could handle outdated raft commands. A practical way is like: - -- Fsync snapshot in `engine-store` atomically -- Fsync region snapshot in `raftstore-proxy` atomically -- Make RaftEngine only GC raft log whose index is smaller than persisted apply-state -- `engine-store` should screen out raft commands with outdated apply-state during apply process -- `engine-store` should recover from the middle step by overwriting and must NOT provide services until caught up with the latest state - -Such architecture inherited several important features from TiKV, such as distributed fault tolerance/recovery, automatic re-balancing, etc. -It's also convenient for PD to maintain this kind of storage system by the existing way as long as it works as `raft store`. - -#### Interfaces - -Since the program language `Rust`, which TiKV uses, has zero-cost abstractions, it's straightforward to let different threads interact with each other by `FFI`(Foreign Function Interface). -Such mode brings almost no overhead. -However, any caller must be pretty clear about the exact safe/unsafe operations boundary. -The structure used by different runtimes through interfaces must have the same memory layout. - -It's feasible to refactor TiKV source code and extract parts of the necessary process into interfaces. The main categories are like: - -- applying normal-write raft command -- applying admin raft command -- peer detection: destroy peer -- region snapshot: pre-handle/apply region snapshot -- SST file reader -- applying `IngestSst` command -- replica read: batch read-index -- encryption: get file; new file; delete file; link file; rename file; -- status services: metrics; CPU profile; config; thread stats; self-defined API; -- store stats: key/bytes R/W stats; disk stats; `engine-store` stats; -- tools/utils - -TiKV can split or merge regions to make the partitions more flexible. 
-When the size of a region exceeds the limit, it will split into two or more regions, and its range would change from `[a, c)` to `[a, b)` and `[b, c)`. -When the sizes of two consecutive regions are small enough, TiKV will merge them into one, and their range would change from `[a, b)` and `[b, c)` to `[a, c)`. - -We must persist the region snapshot when executing admin raft commands about `split`, `merge` or `change peer` because such commands will change the core properties(`version`, `conf version`, `start/end key`) of multi-raft RSM. -Ignorable admin command `CompactLog` may trigger raft log GC in `RaftEngine`. -Thus, to execute such commands, it's required to persist region snapshot. -But while executing normal-write command, which won't change region meta, the decision of persisting can be pushed down to `engine-store`. - -When the region in the current store is illegal or pending removal, it will execute a `destroy-peer` task to clean useless data. - -According to the basic transaction log replication, a leader peer must commit or apply each writing action before returning success ACK to the client. -When any peer tries to respond to queries, it should get the latest committed index from the leader and wait until the apply-state caught up to ensure it has enough context. -For learners/followers or even leaders, the `Read Index` is a practical choice to check the latest `Lease` because it's easy to make any peer of region group provide read service under the same logic as the overhead of read-index itself is insignificant. - -When the leader peer has reclaimed related raft log or other peers can not proceed with RSM in the current context, other peers can request a region snapshot from the leader. -However, the region snapshot data, whose format is TiKV's `SST` file, is not usually used by other storage systems directly. 
-The standard process has been divided into several parts to accelerate the speed of applying region snapshot data: - -- `SST File Reader` to read key-value one by one from SST files -- Multi-thread pool to pre-handle SST files into the self-defined structure of `engine-store` -- Delete old data within [start-key, end-key) of the new region strictly. -- Apply self-defined structure by original sequence - -Interfaces about `IngestSst` are the core to be compatible with `TiDB Lighting` and `BR` for the `HTAP` scenario. -It can substantially speed up data loading/restoring. -`SST File Reader` is also useful when applying the `IngestSst` raft command. - -Encryption is essential for `DBaaS`(database as a service). -To be compatible with TiKV, a data key manager with the same logic is indispensable, especially for rotating data encryption keys or using the KMS service. - -Status services like metrics, CPU/Memory profile(flame graph), or other self-defined stats can effectively support the diagnosis. -It's suggested to encapsulate those into one status server and let other external components visit through the status address. -We could also reuse most of the original metrics of TiKV, and an optional way is to add a specific prefix for each name. - -When maintaining DWAL, it's practical to batch raft msg before fsync as long as latency is tolerable to reduce IOPS(mainly in RaftEngine) and make it system-friendly with poor performance. - -## Usage - -There are two exposed extern "C" functions in [raftstore-proxy](raftstore-proxy/src/lib.rs): - -- `print_raftstore_proxy_version`: print necessary version information(just like TiKV does) into standard output. -- `run_raftstore_proxy_ffi`: - - the main entry accepts established function pointer interfaces and command arguments. - - it's suggested to run main entry function in another independent thread because it will block current context. 
- -To use this library, please follow the steps below: -- Install `grpc`, `protobuf`, `c++`, `rust`. -- Include this project as submodule. -- Modify [FFI Source Code](raftstore-proxy/ffi/src/RaftStoreProxyFFI) under namspace `DB` if necessary and run `make gen_proxy_ffi`. -- Run `ENGINE_LABEL_VALUE=xxx make release` - - label `engine:${ENGINE_LABEL_VALUE}` will be added to store info automatically - - prefix `${ENGINE_LABEL_VALUE}_proxy_` will be added to each metrics name; -- Include FFI header files and implement related interfaces (mainly `struct EngineStoreServerHelper` and `struct RaftStoreProxyFFIHelper`) by `c++`. -- Compile and link target library `target/release/lib${ENGINE_LABEL_VALUE}_proxy.dylib|so`. - -## Interfaces Description - -TBD. - -## TODO - -- support R/W as `Leader` -- resources control -- async future framework -- direct writing - -## Contact - -[Zhigao Tong](http://github.com/solotzg) ([tongzhigao@pingcap.com](mailto:tongzhigao@pingcap.com)) - -## License - -Apache 2.0 license. See the [LICENSE](./LICENSE) file for details. - -## Acknowledgments - -- Thanks [tikv](https://github.com/tikv/tikv) for providing source code. -- Thanks [pd](https://github.com/tikv/pd) for providing `placement rules`. 
diff --git a/src/server/config.rs b/src/server/config.rs index d5975753bf8..050f7f0bfff 100644 --- a/src/server/config.rs +++ b/src/server/config.rs @@ -18,15 +18,10 @@ use super::{snap::Task as SnapTask, Result}; pub use crate::storage::config::Config as StorageConfig; pub const DEFAULT_CLUSTER_ID: u64 = 0; -pub const DEFAULT_LISTENING_ADDR: &str = "127.0.0.1:20170"; -pub const DEFAULT_ENGINE_ADDR: &str = if cfg!(feature = "failpoints") { - "127.0.0.1:20206" -} else { - "" -}; +pub const DEFAULT_LISTENING_ADDR: &str = ""; const DEFAULT_ADVERTISE_LISTENING_ADDR: &str = ""; -const DEFAULT_STATUS_ADDR: &str = "127.0.0.1:20292"; +const DEFAULT_STATUS_ADDR: &str = ""; const DEFAULT_GRPC_CONCURRENCY: usize = 5; const DEFAULT_GRPC_CONCURRENT_STREAM: i32 = 1024; const DEFAULT_GRPC_RAFT_CONN_NUM: usize = 1; @@ -77,10 +72,6 @@ pub struct Config { #[online_config(skip)] pub advertise_addr: String, - pub engine_addr: String, - pub engine_store_version: String, - pub engine_store_git_hash: String, - // These are related to TiKV status. #[online_config(skip)] pub status_addr: String, @@ -211,9 +202,6 @@ impl Default for Config { addr: DEFAULT_LISTENING_ADDR.to_owned(), labels: HashMap::default(), advertise_addr: DEFAULT_ADVERTISE_LISTENING_ADDR.to_owned(), - engine_addr: DEFAULT_ENGINE_ADDR.to_string(), - engine_store_version: "".to_string(), - engine_store_git_hash: "".to_string(), status_addr: DEFAULT_STATUS_ADDR.to_owned(), advertise_status_addr: DEFAULT_ADVERTISE_LISTENING_ADDR.to_owned(), status_thread_pool_size: 1, diff --git a/src/server/lock_manager/mod.rs b/src/server/lock_manager/mod.rs index 7d0ccc240c6..7527f07b5da 100644 --- a/src/server/lock_manager/mod.rs +++ b/src/server/lock_manager/mod.rs @@ -56,7 +56,7 @@ fn detected_slot_idx(txn_ts: TimeStamp) -> usize { /// * One is the `WaiterManager` which manages transactions waiting for locks. /// * The other one is the `Detector` which detects deadlocks between transactions. 
#[allow(dead_code)] -struct LockManager { +pub struct LockManager { waiter_mgr_worker: Option>, detector_worker: Option>, @@ -73,51 +73,6 @@ struct LockManager { in_memory: Arc, } -#[derive(Copy, Clone)] -pub struct HackedLockManager {} - -#[allow(dead_code)] -#[allow(unused_variables)] -impl LockManagerTrait for HackedLockManager { - fn wait_for( - &self, - start_ts: TimeStamp, - cb: StorageCallback, - pr: ProcessResult, - lock: Lock, - is_first_lock: bool, - timeout: Option, - diag_ctx: DiagnosticContext, - ) { - unimplemented!() - } - - fn wake_up( - &self, - lock_ts: TimeStamp, - hashes: Vec, - commit_ts: TimeStamp, - is_pessimistic_txn: bool, - ) { - unimplemented!() - } - - fn has_waiter(&self) -> bool { - todo!() - } - - fn dump_wait_for_entries(&self, cb: Callback) { - todo!() - } -} - -impl HackedLockManager { - pub fn new() -> Self { - Self {} - } - pub fn stop(&mut self) {} -} - impl Clone for LockManager { fn clone(&self) -> Self { Self { diff --git a/src/server/node.rs b/src/server/node.rs index c5cef4663d1..f49da416c34 100644 --- a/src/server/node.rs +++ b/src/server/node.rs @@ -35,10 +35,11 @@ use super::{RaftKv, Result}; use crate::{ import::SstImporter, read_pool::ReadPoolHandle, - server::{lock_manager::HackedLockManager as LockManager, Config as ServerConfig}, + server::Config as ServerConfig, storage::{ config::Config as StorageConfig, kv::FlowStatsReporter, - txn::flow_controller::FlowController, DynamicConfigs as StorageDynamicConfigs, Storage, + lock_manager::LockManager as LockManagerTrait, txn::flow_controller::FlowController, + DynamicConfigs as StorageDynamicConfigs, Storage, }, }; @@ -47,11 +48,11 @@ const CHECK_CLUSTER_BOOTSTRAPPED_RETRY_SECONDS: u64 = 3; /// Creates a new storage engine which is backed by the Raft consensus /// protocol. 
-pub fn create_raft_storage( +pub fn create_raft_storage( engine: RaftKv, cfg: &StorageConfig, read_pool: ReadPoolHandle, - lock_mgr: LockManager, + lock_mgr: LM, concurrency_manager: ConcurrencyManager, dynamic_configs: StorageDynamicConfigs, flow_controller: Arc, @@ -59,7 +60,7 @@ pub fn create_raft_storage( resource_tag_factory: ResourceTagFactory, quota_limiter: Arc, feature_gate: FeatureGate, -) -> Result, LockManager, F>> +) -> Result, LM, F>> where S: RaftStoreRouter + LocalReadRouter + 'static, EK: KvEngine, @@ -112,32 +113,30 @@ where state: Arc>, bg_worker: Worker, health_service: Option, + default_store: Option, ) -> Node { - let mut store = metapb::Store::default(); + let mut store = match default_store { + None => metapb::Store::default(), + Some(s) => s, + }; store.set_id(INVALID_ID); - if cfg.advertise_addr.is_empty() { - store.set_peer_address(cfg.addr.clone()); - } else { - store.set_peer_address(cfg.advertise_addr.clone()) - } - - if !cfg.engine_addr.is_empty() { - store.set_address(cfg.engine_addr.clone()); - } else { - panic!("engine address is empty"); - } - - if !cfg.engine_store_version.is_empty() { - store.set_version(cfg.engine_store_version.clone()); + if store.get_address() == "" { + if cfg.advertise_addr.is_empty() { + store.set_address(cfg.addr.clone()); + } else { + store.set_address(cfg.advertise_addr.clone()) + } } - if !cfg.engine_store_git_hash.is_empty() { - store.set_git_hash(cfg.engine_store_git_hash.clone()); + if store.get_status_address() == "" { + if cfg.advertise_status_addr.is_empty() { + store.set_status_address(cfg.status_addr.clone()); + } else { + store.set_status_address(cfg.advertise_status_addr.clone()) + } } - if cfg.advertise_status_addr.is_empty() { - store.set_status_address(cfg.status_addr.clone()); - } else { - store.set_status_address(cfg.advertise_status_addr.clone()) + if store.get_version() == "" { + store.set_version(env!("CARGO_PKG_VERSION").to_string()); } if let Ok(path) = std::env::current_exe() { @@ 
-147,6 +146,13 @@ where }; store.set_start_timestamp(chrono::Local::now().timestamp()); + if store.get_git_hash() == "" { + store.set_git_hash( + option_env!("TIKV_BUILD_GIT_HASH") + .unwrap_or("Unknown git hash") + .to_string(), + ); + } let mut labels = Vec::new(); for (k, v) in &cfg.labels { diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 6eeb0645362..e585353d6f0 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -107,6 +107,7 @@ byteorder = "1.2" # See https://bheisler.github.io/criterion.rs/book/user_guide/known_limitations.html for the usage # of `real_blackbox` feature. causal_ts = { path = "../components/causal_ts" } +clap = "2.32" concurrency_manager = { path = "../components/concurrency_manager", default-features = false } criterion = "0.3" criterion-cpu-time = "0.1" diff --git a/tests/integrations/raftstore/test_bootstrap.rs b/tests/integrations/raftstore/test_bootstrap.rs index 058728cb0a3..64ad6aeded2 100644 --- a/tests/integrations/raftstore/test_bootstrap.rs +++ b/tests/integrations/raftstore/test_bootstrap.rs @@ -68,6 +68,7 @@ fn test_node_bootstrap_with_prepared_data() { Arc::default(), bg_worker, None, + None, ); let snap_mgr = SnapManager::new(tmp_mgr.path().to_str().unwrap()); let pd_worker = LazyWorker::new("test-pd-worker"); diff --git a/tests/proxy/normal.rs b/tests/proxy/normal.rs index 9a5c38e4a77..fb7d3f521ba 100644 --- a/tests/proxy/normal.rs +++ b/tests/proxy/normal.rs @@ -11,6 +11,7 @@ use std::{ }, }; +use clap::{App, Arg, ArgMatches}; use engine_traits::{ Error, ExternalSstFileInfo, Iterable, Iterator, MiscExt, Mutable, Peekable, Result, SeekKey, SstExt, SstReader, SstWriter, SstWriterBuilder, WriteBatch, WriteBatchExt, CF_DEFAULT, CF_LOCK, @@ -31,6 +32,10 @@ use new_mock_engine_store::{ use pd_client::PdClient; use proxy_server::{ config::{address_proxy_config, ensure_no_common_unrecognized_keys}, + proxy::{ + gen_tikv_config, setup_default_tikv_config, TIFLASH_DEFAULT_LISTENING_ADDR, + TIFLASH_DEFAULT_STATUS_ADDR, + }, 
run::run_tikv_proxy, }; use raft::eraftpb::MessageType; @@ -62,8 +67,11 @@ fn test_config() { let mut unrecognized_keys = Vec::new(); let mut config = TiKvConfig::from_file(path, Some(&mut unrecognized_keys)).unwrap(); + // Otherwise we have no default addr for TiKV. + setup_default_tikv_config(&mut config); assert_eq!(config.memory_usage_high_water, 0.65); assert_eq!(config.rocksdb.max_open_files, 111); + assert_eq!(config.server.addr, TIFLASH_DEFAULT_LISTENING_ADDR); assert_eq!(unrecognized_keys.len(), 3); let mut proxy_unrecognized_keys = Vec::new(); @@ -93,6 +101,33 @@ fn test_config() { } #[test] +fn test_config_addr() { + let mut file = tempfile::NamedTempFile::new().unwrap(); + let text = "memory-usage-high-water=0.65\nsnap-handle-pool-size=4\n[nosense]\nfoo=2\n[rocksdb]\nmax-open-files = 111\nz=1"; + write!(file, "{}", text).unwrap(); + let path = file.path(); + let mut args: Vec<&str> = vec![]; + let matches = App::new("RaftStore Proxy") + .arg( + Arg::with_name("config") + .short("C") + .long("config") + .value_name("FILE") + .help("Set the configuration file") + .takes_value(true), + ) + .get_matches_from(args); + let c = format!("--config {}", path.to_str().unwrap()); + let mut v = vec![c]; + let config = gen_tikv_config(&matches, false, &mut v); + assert_eq!(config.server.addr, TIFLASH_DEFAULT_LISTENING_ADDR); + assert_eq!(config.server.status_addr, TIFLASH_DEFAULT_STATUS_ADDR); + assert_eq!( + config.server.advertise_status_addr, + TIFLASH_DEFAULT_STATUS_ADDR + ); +} + fn test_store_stats() { let (mut cluster, pd_client) = new_mock_cluster(0, 1);