Skip to content

Commit

Permalink
add restart
Browse files Browse the repository at this point in the history
Signed-off-by: CalvinNeo <[email protected]>
  • Loading branch information
CalvinNeo committed Aug 29, 2022
1 parent 7392cf2 commit 1e32c1d
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 18 deletions.
81 changes: 72 additions & 9 deletions new-mock-engine-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ pub use engine_store_ffi::{
interfaces::root::DB as ffi_interfaces, EngineStoreServerHelper, RaftStoreProxyFFIHelper,
UnwrapExternCFunc,
};
use engine_traits::{Engines, Iterable, SyncMutable, CF_DEFAULT, CF_LOCK, CF_WRITE};
use engine_traits::{
Engines, Iterable, Peekable, SyncMutable, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE,
};
use kvproto::{
raft_cmdpb::{AdminCmdType, AdminRequest},
raft_serverpb::{
Expand All @@ -38,7 +40,7 @@ type RegionId = u64;
pub struct Region {
pub region: kvproto::metapb::Region,
// Which peer is me?
peer: kvproto::metapb::Peer,
pub peer: kvproto::metapb::Peer,
// in-memory data
pub data: [BTreeMap<Vec<u8>, Vec<u8>>; 3],
// If we a key is deleted, it will immediately be removed from data,
Expand Down Expand Up @@ -104,6 +106,34 @@ impl EngineStoreServer {
None => None,
}
}

pub fn stop(&mut self) {
for (_, region) in self.kvstore.iter_mut() {
for cf in region.pending_write.iter_mut() {
cf.clear();
}
for cf in region.pending_delete.iter_mut() {
cf.clear();
}
for cf in region.data.iter_mut() {
cf.clear();
}
region.apply_state = Default::default();
// We don't clear applied_term.
}
}

pub fn restore(&mut self) {
// TODO We should actually read from engine store's persistence.
// However, since mock engine store don't persist itself,
// we read from proxy instead.
unsafe {
let region_ids = self.kvstore.keys().cloned().collect::<Vec<_>>();
for region_id in region_ids.into_iter() {
load_from_db(self, region_id);
}
}
}
}

pub struct EngineStoreServerWrap {
Expand Down Expand Up @@ -168,21 +198,54 @@ fn delete_kv_in_mem(region: &mut Box<Region>, cf_index: usize, k: &[u8]) {
data.remove(k);
}

unsafe fn load_from_db(store: &mut EngineStoreServer, region: &mut Box<Region>) {
let kv = &mut store.engines.as_mut().unwrap().kv;
unsafe fn load_from_db(store: &mut EngineStoreServer, region_id: u64) {
let store_id = store.id;
let engine = &mut store.engines.as_mut().unwrap().kv;
let apply_state: RaftApplyState = engine
.get_msg_cf(CF_RAFT, &keys::apply_state_key(region_id))
.unwrap()
.unwrap();
let region_state: RegionLocalState = engine
.get_msg_cf(CF_RAFT, &keys::region_state_key(region_id))
.unwrap()
.unwrap();

let region = store.kvstore.get_mut(&region_id).unwrap();
region.apply_state = apply_state;
region.region = region_state.get_region().clone();
set_new_region_peer(region, store.id);

for cf in 0..3 {
let cf_name = cf_to_name(cf.into());
region.data[cf].clear();
region.pending_delete[cf].clear();
region.pending_write[cf].clear();
let start = region.region.get_start_key().to_owned();
let end = region.region.get_end_key().to_owned();
kv.scan_cf(cf_name, &start, &end, false, |k, v| {
region.data[cf].insert(k.to_vec(), v.to_vec());
Ok(true)
})
.unwrap();
engine
.scan_cf(cf_name, &start, &end, false, |k, v| {
let origin_key = if keys::validate_data_key(k) {
keys::origin_key(k).to_vec()
} else {
k.to_vec()
};
region.data[cf].insert(origin_key, v.to_vec());
debug!("restored data";
"store" => store_id,
"region_id" => region_id,
"cf" => cf,
"k" => ?k,
"v" => ?v,
);
Ok(true)
})
.unwrap();
}
debug!("after restore";
"store" => store_id,
"region_id" => region_id,
"default size" => region.data[2].len(),
);
}

unsafe fn write_to_db_data(
Expand Down
3 changes: 2 additions & 1 deletion new-mock-engine-store/src/mock_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ pub struct EngineHelperSet {
}

pub struct Cluster<T: Simulator<TiFlashEngine>> {
pub ffi_helper_lst: Vec<FFIHelperSet>,
// Helper to set ffi_helper_set.
ffi_helper_lst: Vec<FFIHelperSet>,
pub ffi_helper_set: Arc<Mutex<HashMap<u64, FFIHelperSet>>>,

pub cfg: Config,
Expand Down
91 changes: 89 additions & 2 deletions tests/proxy/normal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1172,17 +1172,104 @@ mod restart {
fail::cfg("try_flush_data", "return(0)").unwrap();
let _ = cluster.run();

cluster.must_put(b"k0", b"v0");
let region = cluster.get_region(b"k0");
cluster.must_put(b"k", b"v");
let region = cluster.get_region(b"k");
let region_id = region.get_id();
for i in 0..10 {
let k = format!("k{}", i);
let v = format!("v{}", i);
cluster.must_put(k.as_bytes(), v.as_bytes());
}
let prev_state = collect_all_states(&cluster, region_id);
let (compact_index, compact_term) = get_valid_compact_index(&prev_state);
let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term);
let req =
test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log);
fail::cfg("try_flush_data", "return(1)").unwrap();
let res = cluster
.call_command_on_leader(req, Duration::from_secs(3))
.unwrap();

let eng_ids = cluster
.engines
.iter()
.map(|e| e.0.to_owned())
.collect::<Vec<_>>();

for i in 0..10 {
let k = format!("k{}", i);
let v = format!("v{}", i);
// Whatever already persisted or not, we won't loss data.
check_key(
&cluster,
k.as_bytes(),
v.as_bytes(),
Some(true),
None,
Some(vec![eng_ids[0]]),
);
}

for i in 10..20 {
let k = format!("k{}", i);
let v = format!("v{}", i);
cluster.must_put(k.as_bytes(), v.as_bytes());
}

for i in 10..20 {
let k = format!("k{}", i);
let v = format!("v{}", i);
// Whatever already persisted or not, we won't loss data.
check_key(
&cluster,
k.as_bytes(),
v.as_bytes(),
Some(true),
None,
Some(vec![eng_ids[0]]),
);
}

info!("stop node {}", eng_ids[0]);
cluster.stop_node(eng_ids[0]);
{
let lock = cluster.ffi_helper_set.lock();
lock.unwrap()
.deref_mut()
.get_mut(&eng_ids[0])
.unwrap()
.engine_store_server
.stop();
}

info!("resume node {}", eng_ids[0]);
{
let lock = cluster.ffi_helper_set.lock();
lock.unwrap()
.deref_mut()
.get_mut(&eng_ids[0])
.unwrap()
.engine_store_server
.restore();
}
info!("restored node {}", eng_ids[0]);
cluster.run_node(eng_ids[0]).unwrap();

std::thread::sleep(std::time::Duration::from_millis(2000));

for i in 0..20 {
let k = format!("k{}", i);
let v = format!("v{}", i);
// Whatever already persisted or not, we won't loss data.
check_key(
&cluster,
k.as_bytes(),
v.as_bytes(),
Some(true),
None,
Some(vec![eng_ids[0]]),
);
}

fail::remove("try_flush_data");
cluster.shutdown();
Expand Down
10 changes: 4 additions & 6 deletions tests/proxy/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,9 @@ pub fn must_get_mem(
value: Option<&[u8]>,
) {
let mut last_res: Option<&Vec<u8>> = None;
let cf = new_mock_engine_store::ffi_interfaces::ColumnFamilyType::Default;
for _ in 1..300 {
let res = engine_store_server.get_mem(
region_id,
new_mock_engine_store::ffi_interfaces::ColumnFamilyType::Default,
&key.to_vec(),
);
let res = engine_store_server.get_mem(region_id, cf, &key.to_vec());

if let (Some(value), Some(last_res)) = (value, res) {
assert_eq!(value, &last_res[..]);
Expand All @@ -175,11 +172,12 @@ pub fn must_get_mem(
}
let s = std::str::from_utf8(key).unwrap_or("");
panic!(
"can't get mem value {:?} for key {}({}) in {}, actual {:?}",
"can't get mem value {:?} for key {}({}) in store {} cf {:?}, actual {:?}",
value.map(tikv_util::escape),
log_wrappers::hex_encode_upper(key),
s,
engine_store_server.id,
cf,
last_res,
)
}
Expand Down

0 comments on commit 1e32c1d

Please sign in to comment.