Commit

add savepoints 2 (#3)
Signed-off-by: disksing <[email protected]>
disksing authored Jan 7, 2022
1 parent ff005bb commit 199d901
Showing 4 changed files with 35 additions and 8 deletions.
21 changes: 16 additions & 5 deletions src/server/gc_worker/gc_manager.rs
@@ -4,7 +4,7 @@ use engine_traits::KvEngine;
 use pd_client::FeatureGate;
 use std::cmp::Ordering;
 use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
-use std::sync::{mpsc, Arc};
+use std::sync::{mpsc, Arc, Mutex};
 use std::thread::{self, Builder as ThreadBuilder, JoinHandle};
 use std::time::Duration;
 use tikv_util::time::Instant;
@@ -225,6 +225,8 @@ pub(super) struct GcManager<S: GcSafePointProvider, R: RegionInfoProvider, E: KvEngine>
     /// updated, `GCManager` will start to do GC on all regions.
     safe_point: Arc<AtomicU64>,
 
+    save_points: Arc<Mutex<Vec<TimeStamp>>>,
+
     safe_point_last_check_time: Instant,
 
     /// Used to schedule `GcTask`s.
@@ -245,9 +247,11 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcManager<S, R, E>
         cfg_tracker: GcWorkerConfigManager,
         feature_gate: FeatureGate,
     ) -> GcManager<S, R, E> {
+        let save_points = Arc::new(Mutex::new(vec![]));
         GcManager {
             cfg,
             safe_point,
+            save_points,
             safe_point_last_check_time: Instant::now(),
             worker_scheduler,
             gc_manager_ctx: GcManagerContext::new(),
@@ -261,9 +265,15 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcManager<S, R, E>
         TimeStamp::new(ts)
     }
 
-    fn save_safe_point(&self, ts: TimeStamp) {
+    fn curr_save_points(&self) -> Vec<TimeStamp> {
+        self.save_points.lock().unwrap().clone()
+    }
+
+    fn save_safe_point(&self, ts: TimeStamp, save_points: Vec<TimeStamp>) {
         self.safe_point
             .store(ts.into_inner(), AtomicOrdering::Relaxed);
+        let mut s = self.save_points.lock().unwrap();
+        *s = save_points;
     }
 
     /// Starts working in another thread. This function moves the `GcManager` and returns a handler
@@ -328,7 +338,7 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcManager<S, R, E>
     /// updated to a greater value than initial value.
     fn initialize(&mut self) {
         debug!("gc-manager is initializing");
-        self.save_safe_point(TimeStamp::zero());
+        self.save_safe_point(TimeStamp::zero(), vec![]);
         self.try_update_safe_point();
         debug!("gc-manager started"; "safe_point" => self.curr_safe_point());
     }
@@ -350,7 +360,7 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcManager<S, R, E>
     fn try_update_safe_point(&mut self) -> bool {
         self.safe_point_last_check_time = Instant::now();
 
-        let (safe_point, _) = match self.cfg.safe_point_provider.get_safe_point() {
+        let (safe_point, save_points) = match self.cfg.safe_point_provider.get_safe_point() {
             Ok(res) => res,
             // Return false directly so we will check it a while later.
             Err(e) => {
@@ -371,7 +381,7 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcManager<S, R, E>
             Ordering::Equal => false,
             Ordering::Greater => {
                 debug!("gc_worker: update safe point"; "safe_point" => safe_point);
-                self.save_safe_point(safe_point);
+                self.save_safe_point(safe_point, save_points);
                 AUTO_GC_SAFE_POINT_GAUGE.set(safe_point.into_inner() as i64);
                 true
             }
@@ -545,6 +555,7 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcManager<S, R, E>
                 start,
                 end,
                 self.curr_safe_point(),
+                self.curr_save_points(),
             ) {
                 // Ignore the error and continue, since it's useless to retry this.
                 // TODO: Find a better way to handle errors. Maybe we should retry.
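Taken together, the gc_manager.rs changes store the save points next to the safe point: the safe point stays in an atomic for cheap reads on the GC hot path, while the save points live behind a mutex that is held only long enough to clone or swap the vector. Below is a minimal, self-contained sketch of that pattern; TimeStamp here is a hypothetical stand-in for tikv's txn_types::TimeStamp, assumed to wrap a u64.

use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for txn_types::TimeStamp.
#[derive(Clone, Copy, Debug, PartialEq)]
struct TimeStamp(u64);

struct SafePointState {
    // Atomic: read lock-free on every GC round.
    safe_point: Arc<AtomicU64>,
    // Mutex-guarded: save points change rarely and are read as snapshots.
    save_points: Arc<Mutex<Vec<TimeStamp>>>,
}

impl SafePointState {
    // Mirrors curr_save_points above: the lock is held only for the clone.
    fn curr_save_points(&self) -> Vec<TimeStamp> {
        self.save_points.lock().unwrap().clone()
    }

    // Mirrors save_safe_point above: both values are refreshed together.
    fn save_safe_point(&self, ts: TimeStamp, save_points: Vec<TimeStamp>) {
        self.safe_point.store(ts.0, AtomicOrdering::Relaxed);
        *self.save_points.lock().unwrap() = save_points;
    }
}

fn main() {
    let state = SafePointState {
        safe_point: Arc::new(AtomicU64::new(0)),
        save_points: Arc::new(Mutex::new(vec![])),
    };
    state.save_safe_point(TimeStamp(200), vec![TimeStamp(150), TimeStamp(180)]);
    assert_eq!(state.curr_save_points(), vec![TimeStamp(150), TimeStamp(180)]);
}

Cloning on read means a GC round works against a stable snapshot instead of holding the lock while it scans regions.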
8 changes: 7 additions & 1 deletion src/server/gc_worker/gc_worker.rs
@@ -82,6 +82,7 @@ where
         start_key: Vec<u8>,
         end_key: Vec<u8>,
         safe_point: TimeStamp,
+        save_points: Vec<TimeStamp>,
         callback: Callback<()>,
     },
     GcKeys {
@@ -683,6 +684,7 @@ fn schedule_gc(
     start_key: Vec<u8>,
     end_key: Vec<u8>,
     safe_point: TimeStamp,
+    save_points: Vec<TimeStamp>,
     callback: Callback<()>,
 ) -> Result<()> {
     scheduler
@@ -691,6 +693,7 @@
             start_key,
             end_key,
             safe_point,
+            save_points,
             callback,
         })
         .or_else(handle_gc_task_schedule_error)
@@ -703,9 +706,10 @@ pub fn sync_gc(
     start_key: Vec<u8>,
     end_key: Vec<u8>,
     safe_point: TimeStamp,
+    save_points: Vec<TimeStamp>,
 ) -> Result<()> {
     wait_op!(|callback| schedule_gc(
-        scheduler, region_id, start_key, end_key, safe_point, callback
+        scheduler, region_id, start_key, end_key, safe_point, save_points, callback
     ))
     .unwrap_or_else(|| {
         error!("failed to receive result of gc");
@@ -914,12 +918,14 @@ where
         self.check_is_busy(callback).map_or(Ok(()), |callback| {
             let start_key = vec![];
             let end_key = vec![];
+            let save_points = vec![];
             self.worker_scheduler
                 .schedule(GcTask::Gc {
                     region_id: 0,
                     start_key,
                     end_key,
                     safe_point,
+                    save_points,
                     callback,
                 })
                 .or_else(handle_gc_task_schedule_error)
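In gc_worker.rs the new save_points parameter is threaded through the whole chain: GcTask::Gc carries it, schedule_gc forwards it, and sync_gc (which blocks via the wait_op! macro until the task's callback fires) exposes it to synchronous callers. A rough sketch of that sync-over-async shape, with an mpsc channel standing in for wait_op! and all names illustrative rather than tikv's real API:

use std::sync::mpsc;

type Callback<T> = Box<dyn FnOnce(Result<T, String>) + Send>;

// Stand-in for enqueueing a GcTask::Gc on the worker; here it
// "completes" immediately and reports through the callback.
fn schedule_gc(safe_point: u64, save_points: Vec<u64>, callback: Callback<()>) {
    let _ = (safe_point, save_points);
    callback(Ok(()));
}

// Blocks until the callback fires, mirroring what sync_gc does via wait_op!.
fn sync_gc(safe_point: u64, save_points: Vec<u64>) -> Result<(), String> {
    let (tx, rx) = mpsc::channel();
    schedule_gc(
        safe_point,
        save_points,
        Box::new(move |res| {
            // The worker hands the outcome back over the channel.
            let _ = tx.send(res);
        }),
    );
    rx.recv()
        .map_err(|_| "failed to receive result of gc".to_string())?
}

fn main() {
    // Callers with no save points pass an empty vec, as the diffs above do.
    sync_gc(200, vec![]).unwrap();
}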
12 changes: 11 additions & 1 deletion tests/integrations/server/gc_worker.rs
@@ -299,7 +299,17 @@ fn test_gc_bypass_raft() {
     }
 
     let gc_sched = cluster.sim.rl().get_gc_worker(1).scheduler();
-    assert!(sync_gc(&gc_sched, 0, b"k1".to_vec(), b"k2".to_vec(), 200.into()).is_ok());
+    assert!(
+        sync_gc(
+            &gc_sched,
+            0,
+            b"k1".to_vec(),
+            b"k2".to_vec(),
+            200.into(),
+            vec![]
+        )
+        .is_ok()
+    );
 
     for &start_ts in &[10, 20, 30] {
         let commit_ts = start_ts + 5;
2 changes: 1 addition & 1 deletion tests/integrations/server/kv_service.rs
@@ -541,7 +541,7 @@ fn test_mvcc_resolve_lock_gc_and_delete() {
     ts += 1;
     let gc_safe_ponit = TimeStamp::from(ts);
     let gc_scheduler = cluster.sim.rl().get_gc_worker(1).scheduler();
-    sync_gc(&gc_scheduler, 0, vec![], vec![], gc_safe_ponit).unwrap();
+    sync_gc(&gc_scheduler, 0, vec![], vec![], gc_safe_ponit, vec![]).unwrap();
 
     // the `k` at the old ts should be none.
     let get_version2 = commit_version + 1;
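The test updates are mechanical: every existing caller passes vec![] for the save points, so GC behavior is unchanged until a safe-point provider actually returns some. The tests also lean on the u64-to-TimeStamp conversions (200.into(), TimeStamp::from(ts)); a tiny sketch of why both spellings work, again with a hypothetical TimeStamp wrapping a u64:

// Hypothetical stand-in; tikv's txn_types::TimeStamp provides the real impl.
#[derive(Clone, Copy, Debug, PartialEq)]
struct TimeStamp(u64);

impl From<u64> for TimeStamp {
    fn from(ts: u64) -> Self {
        TimeStamp(ts)
    }
}

fn main() {
    // A single From<u64> impl gives both forms used in the tests above.
    let a: TimeStamp = 200.into();
    let b = TimeStamp::from(200u64);
    assert_eq!(a, b);
}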
