Skip to content

Commit

Permalink
Implement the flashback query
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Jan 7, 2022
1 parent 07ddc34 commit dafc675
Show file tree
Hide file tree
Showing 16 changed files with 97 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion components/tidb_query_common/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub trait Storage: Send {
is_key_only: bool,
need_mvcc: bool,
range: IntervalRange,
flashback_tss: &[(u64, u64)],
) -> Result<()>;

fn scan_next(&mut self) -> Result<Option<OwnedKvPair>>;
Expand All @@ -46,8 +47,15 @@ impl<T: Storage + ?Sized> Storage for Box<T> {
is_key_only: bool,
need_mvcc: bool,
range: IntervalRange,
flashback_tss: &[(u64, u64)],
) -> Result<()> {
(**self).begin_scan(is_backward_scan, is_key_only, need_mvcc, range)
(**self).begin_scan(
is_backward_scan,
is_key_only,
need_mvcc,
range,
flashback_tss,
)
}

fn scan_next(&mut self) -> Result<Option<OwnedKvPair>> {
Expand Down
5 changes: 5 additions & 0 deletions components/tidb_query_common/src/storage/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct RangesScanner<T> {
is_key_only: bool,

need_mvcc: bool,
flashback_tss: Vec<(u64, u64)>,

scanned_rows_per_range: Vec<usize>,

Expand All @@ -36,6 +37,7 @@ pub struct RangesScannerOptions<T> {
pub is_key_only: bool, // TODO: This can be const generics
pub is_scanned_range_aware: bool, // TODO: This can be const generics
pub need_mvcc: bool,
pub flashback_tss: Vec<(u64, u64)>,
}

impl<T: Storage> RangesScanner<T> {
Expand All @@ -47,6 +49,7 @@ impl<T: Storage> RangesScanner<T> {
is_key_only,
is_scanned_range_aware,
need_mvcc,
flashback_tss,
}: RangesScannerOptions<T>,
) -> RangesScanner<T> {
let ranges_len = ranges.len();
Expand All @@ -57,6 +60,7 @@ impl<T: Storage> RangesScanner<T> {
scan_backward_in_range,
is_key_only,
need_mvcc,
flashback_tss,
scanned_rows_per_range: Vec::with_capacity(ranges_len),
is_scanned_range_aware,
current_range: IntervalRange {
Expand Down Expand Up @@ -93,6 +97,7 @@ impl<T: Storage> RangesScanner<T> {
self.is_key_only,
self.need_mvcc,
r,
&self.flashback_tss,
)?;
self.storage.scan_next()?
}
Expand Down
1 change: 1 addition & 0 deletions components/tidb_query_common/src/storage/test_fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl super::Storage for FixtureStorage {
is_key_only: bool,
_: bool,
range: IntervalRange,
_: &[(u64, u64)],
) -> Result<()> {
let data_view = self
.data
Expand Down
2 changes: 2 additions & 0 deletions components/tidb_query_executors/src/index_scan_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ impl<S: Storage> BatchIndexScanExecutor<S> {
is_key_only: false,
accept_point_range: unique,
is_scanned_range_aware,
// TODO: support index scan for Flashback & MVCC Query
need_mvcc: false,
flashback_tss: vec![],
})?;
Ok(Self(wrapper))
}
Expand Down
7 changes: 7 additions & 0 deletions components/tidb_query_executors/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ pub fn build_executors<S: Storage + 'static>(
let primary_column_ids = descriptor.take_primary_column_ids();
let primary_prefix_column_ids = descriptor.take_primary_prefix_column_ids();

let flashback_tss: Vec<(u64, u64)> = descriptor
.get_flashback_timestamps()
.iter()
.map(|ts| (ts.get_start(), ts.get_end()))
.collect();

executor = Box::new(
BatchTableScanExecutor::new(
storage,
Expand All @@ -188,6 +194,7 @@ pub fn build_executors<S: Storage + 'static>(
is_scanned_range_aware,
primary_prefix_column_ids,
descriptor.get_need_mvcc(),
flashback_tss,
)?
.collect_summary(summary_slot_index),
);
Expand Down
2 changes: 2 additions & 0 deletions components/tidb_query_executors/src/table_scan_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ impl<S: Storage> BatchTableScanExecutor<S> {
is_scanned_range_aware: bool,
primary_prefix_column_ids: Vec<i64>,
need_mvcc: bool,
flashback_tss: Vec<(u64, u64)>,
) -> Result<Self> {
let is_column_filled = vec![false; columns_info.len()];
let mut is_key_only = true;
Expand Down Expand Up @@ -99,6 +100,7 @@ impl<S: Storage> BatchTableScanExecutor<S> {
accept_point_range: no_common_handle,
is_scanned_range_aware,
need_mvcc,
flashback_tss,
})?;
Ok(Self(wrapper))
}
Expand Down
3 changes: 3 additions & 0 deletions components/tidb_query_executors/src/util/scan_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub struct ScanExecutorOptions<S, I> {
pub accept_point_range: bool,
pub is_scanned_range_aware: bool,
pub need_mvcc: bool,
pub flashback_tss: Vec<(u64, u64)>,
}

impl<S: Storage, I: ScanExecutorImpl> ScanExecutor<S, I> {
Expand All @@ -70,6 +71,7 @@ impl<S: Storage, I: ScanExecutorImpl> ScanExecutor<S, I> {
accept_point_range,
is_scanned_range_aware,
need_mvcc,
flashback_tss,
}: ScanExecutorOptions<S, I>,
) -> Result<Self> {
tidb_query_datatype::codec::table::check_table_ranges(&key_ranges)?;
Expand All @@ -88,6 +90,7 @@ impl<S: Storage, I: ScanExecutorImpl> ScanExecutor<S, I> {
is_key_only,
is_scanned_range_aware,
need_mvcc,
flashback_tss,
}),
is_ended: false,
})
Expand Down
1 change: 1 addition & 0 deletions src/coprocessor/checksum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl<S: Snapshot> ChecksumContext<S> {
is_key_only: false,
is_scanned_range_aware: false,
need_mvcc: false,
flashback_tss: vec![],
});
Ok(Self { req, scanner })
}
Expand Down
2 changes: 2 additions & 0 deletions src/coprocessor/dag/storage_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl<S: Store> Storage for TiKVStorage<S> {
is_key_only: bool,
need_mvcc: bool,
range: IntervalRange,
flashback_tss: &[(u64, u64)],
) -> QEResult<()> {
if let Some(scanner) = &mut self.scanner {
self.cf_stats_backlog.add(&scanner.take_statistics());
Expand All @@ -61,6 +62,7 @@ impl<S: Store> Storage for TiKVStorage<S> {
self.met_newer_ts_data_backlog == NewerTsCheckState::NotMetYet,
lower,
upper,
flashback_tss,
)
.map_err(Error::from)?,
// There is no transform from storage error to QE's StorageError,
Expand Down
3 changes: 3 additions & 0 deletions src/coprocessor/statistics/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ impl<S: Snapshot> RequestHandler for AnalyzeContext<S> {
is_key_only: true,
is_scanned_range_aware: false,
need_mvcc: false,
flashback_tss: vec![],
});
let res = AnalyzeContext::handle_index(
req,
Expand Down Expand Up @@ -328,6 +329,7 @@ impl<S: Snapshot> RowSampleBuilder<S> {
false, // Streaming mode is not supported in Analyze request, always false here
req.take_primary_prefix_column_ids(),
false,
vec![],
)?;
Ok(Self {
data: table_scanner,
Expand Down Expand Up @@ -797,6 +799,7 @@ impl<S: Snapshot> SampleBuilder<S> {
false, // Streaming mode is not supported in Analyze request, always false here
req.take_primary_prefix_column_ids(),
false,
vec![],
)?;
Ok(Self {
data: table_scanner,
Expand Down
8 changes: 7 additions & 1 deletion src/server/gc_worker/gc_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,13 @@ pub fn sync_gc(
save_points: Vec<TimeStamp>,
) -> Result<()> {
wait_op!(|callback| schedule_gc(
scheduler, region_id, start_key, end_key, safe_point, save_points, callback
scheduler,
region_id,
start_key,
end_key,
safe_point,
save_points,
callback
))
.unwrap_or_else(|| {
error!("failed to receive result of gc");
Expand Down
1 change: 1 addition & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,7 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
false,
start_key,
end_key,
&[],
)?;
let res = scanner.scan(limit, sample_step);

Expand Down
41 changes: 38 additions & 3 deletions src/storage/mvcc/reader/scanner/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,12 @@ impl<S: Snapshot, P: ScanPolicy<S>> ForwardScanner<S, P> {
// Meet another key.
return Ok(false);
}
if Key::decode_ts_from(current_key)? <= self.cfg.ts {
let key_ts = Key::decode_ts_from(current_key)?;
if self.is_flashbacked(key_ts) {
// Skip the key that is in the flashback range.
continue;
}
if key_ts <= self.cfg.ts {
// Founded, don't need to seek again.
needs_seek = false;
break;
Expand All @@ -339,12 +344,13 @@ impl<S: Snapshot, P: ScanPolicy<S>> ForwardScanner<S, P> {
}
}
}
let mut cur_ts = self.cfg.ts;
// If we have not found `${user_key}_${ts}` in a few `next()`, directly `seek()`.
if needs_seek {
while needs_seek {
// `user_key` must have reserved space here, so its clone has reserved space too. So no
// reallocation happens in `append_ts`.
self.cursors.write.seek(
&user_key.clone().append_ts(self.cfg.ts),
&user_key.clone().append_ts(cur_ts),
&mut self.statistics.write,
)?;
if !self.cursors.write.valid()? {
Expand All @@ -356,9 +362,38 @@ impl<S: Snapshot, P: ScanPolicy<S>> ForwardScanner<S, P> {
// Meet another key.
return Ok(false);
}
let key_ts = Key::decode_ts_from(current_key)?;
if !self.is_flashbacked(key_ts) {
break;
}
cur_ts = self.find_smallest_flashback_ts(key_ts);
}
Ok(true)
}

fn is_flashbacked(&self, key_ts: TimeStamp) -> bool {
for &(flashback_ts_start, flashback_ts_end) in &self.cfg.flashback_tss {
let start_timestamp = TimeStamp::new(flashback_ts_start);
let end_timestamp = TimeStamp::new(flashback_ts_end);
if start_timestamp <= key_ts && key_ts <= end_timestamp {
return true;
}
}
false
}

fn find_smallest_flashback_ts(&self, key_ts: TimeStamp) -> TimeStamp {
let mut smallest_ts = key_ts;
for &(flashback_ts_start, flashback_ts_end) in &self.cfg.flashback_tss {
let start_timestamp = TimeStamp::new(flashback_ts_start);
let end_timestamp = TimeStamp::new(flashback_ts_end);
if start_timestamp <= key_ts && key_ts <= end_timestamp && start_timestamp < smallest_ts
{
smallest_ts = start_timestamp
}
}
smallest_ts
}
}

/// `ForwardScanner` with this policy outputs the latest key value pairs.
Expand Down
11 changes: 11 additions & 0 deletions src/storage/mvcc/reader/scanner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ impl<S: Snapshot> ScannerBuilder<S> {
self
}

/// Set the flashback_tss
///
/// Default is 'false'.
#[inline]
pub fn flashback_tss(mut self, tss: &[(u64, u64)]) -> Self {
self.0.flashback_tss = tss.to_vec();
self
}

/// Limit the range to `[lower_bound, upper_bound)` in which the `ForwardKvScanner` should scan.
/// `None` means unbounded.
///
Expand Down Expand Up @@ -273,6 +282,7 @@ pub struct ScannerConfig<S: Snapshot> {

// if need all mvcc info
need_mvcc: bool,
flashback_tss: Vec<(u64, u64)>,
}

impl<S: Snapshot> ScannerConfig<S> {
Expand All @@ -292,6 +302,7 @@ impl<S: Snapshot> ScannerConfig<S> {
access_locks: Default::default(),
check_has_newer_ts_data: false,
need_mvcc: false,
flashback_tss: vec![],
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/storage/txn/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub trait Store: Send {
check_has_newer_ts_data: bool,
lower_bound: Option<Key>,
upper_bound: Option<Key>,
flashback_tss: &[(u64, u64)],
) -> Result<Self::Scanner>;
}

Expand Down Expand Up @@ -359,12 +360,14 @@ impl<S: Snapshot> Store for SnapshotStore<S> {
check_has_newer_ts_data: bool,
lower_bound: Option<Key>,
upper_bound: Option<Key>,
flashback_tss: &[(u64, u64)],
) -> Result<MvccScanner<S>> {
// Check request bounds with physical bound
self.verify_range(&lower_bound, &upper_bound)?;
let scanner = ScannerBuilder::new(self.snapshot.clone(), self.start_ts)
.desc(desc)
.need_mvcc(need_mvcc)
.flashback_tss(flashback_tss)
.range(lower_bound, upper_bound)
.omit_value(key_only)
.fill_cache(self.fill_cache)
Expand Down Expand Up @@ -556,6 +559,7 @@ impl Store for FixtureStore {
_: bool,
lower_bound: Option<Key>,
upper_bound: Option<Key>,
_: &[(u64, u64)],
) -> Result<FixtureStoreScanner> {
use std::ops::Bound;

Expand Down

0 comments on commit dafc675

Please sign in to comment.