diff --git a/src/cache/delta_reader.rs b/src/cache/delta_reader.rs index bbc68f9..6508bcb 100644 --- a/src/cache/delta_reader.rs +++ b/src/cache/delta_reader.rs @@ -80,18 +80,88 @@ impl DeltaReader { /// Get `n` keys >= `seek_key` pub async fn get_n_from_first_match( &self, - _seek_key: &impl SeekKeyEncoder, - _n: usize, + seek_key: &impl SeekKeyEncoder, + n: usize, ) -> anyhow::Result> { - todo!() + let seek_key = seek_key.encode_seek_key()?; + let range = seek_key..; + + let mut iterator = self.iter_range::(range)?; + + let results: Vec<(S::Key, S::Value)> = iterator + .by_ref() + .filter_map(|(key_bytes, value_bytes)| { + let key = S::Key::decode_key(&key_bytes).ok()?; + let value = S::Value::decode_value(&value_bytes).ok()?; + Some((key, value)) + }) + .take(n) + .collect(); + + let next_start_key = match iterator.next().map(|(key_bytes, _)| key_bytes) { + None => None, + Some(key_bytes) => Some(S::Key::decode_key(&key_bytes)?), + }; + + Ok(PaginatedResponse { + key_value: results, + next: next_start_key, + }) } /// Collects all key-value pairs in given range, from smallest to largest. pub async fn collect_in_range>( &self, - _range: std::ops::Range, + range: std::ops::Range, ) -> anyhow::Result> { - todo!() + let lower_bound = range.start.encode_seek_key()?; + let upper_bound = range.end.encode_seek_key()?; + let range = lower_bound..upper_bound; + + let result = self + .iter_range::(range)? + .map(|(key, value)| { + let key = S::Key::decode_key(&key).unwrap(); + let value = S::Value::decode_value(&value).unwrap(); + (key, value) + }) + .collect(); + Ok(result) + } + + fn iter(&self) -> anyhow::Result> { + let snapshot_iterators = self + .snapshots + .iter() + .map(|snapshot| snapshot.iter::()) + .collect::>(); + + let db_iter = self.db.raw_iter::(ScanDirection::Forward)?; + + Ok(DeltaReaderIter::new( + snapshot_iterators, + db_iter, + ScanDirection::Forward, + )) + } + + fn iter_range( + &self, + range: impl std::ops::RangeBounds + Clone, + ) -> anyhow::Result> { + let snapshot_iterators = self + .snapshots + .iter() + .map(|snapshot| snapshot.iter_range::(range.clone())) + .collect::>(); + + let db_iter = self.db.raw_iter_range::(range, ScanDirection::Forward)?; + + Ok(DeltaReaderIter::new( + snapshot_iterators, + db_iter, + ScanDirection::Backward, + )) } // @@ -105,7 +175,6 @@ impl DeltaReader { .map(|snapshot| snapshot.iter::().rev()) .collect::>(); - // Database iterator let db_iter = self.db.raw_iter::(ScanDirection::Backward)?; Ok(DeltaReaderIter::new( @@ -327,27 +396,6 @@ mod tests { assert_eq!(expected_value, actual_value); } - // End of test utils - - #[tokio::test] - async fn test_empty() { - let tmpdir = tempfile::tempdir().unwrap(); - let db = open_db(tmpdir.path()); - - let delta_db = DeltaReader::new(db, vec![]); - - let values: Vec<_> = delta_db.iter_rev::().unwrap().collect(); - assert!(values.is_empty()); - - let largest = delta_db.get_largest::().await.unwrap(); - - assert!(largest.is_none()); - - let key = TestCompositeField::MAX; - let prev = delta_db.get_prev::(&key).await.unwrap(); - assert!(prev.is_none()); - } - // DeltaReader with a simple set of known values. Useful for minimal checks. // Contains only writes. fn build_simple_delta_reader(path: impl AsRef) -> DeltaReader { @@ -402,6 +450,26 @@ mod tests { ], ) } + // End of test utils + + #[tokio::test] + async fn test_empty() { + let tmpdir = tempfile::tempdir().unwrap(); + let db = open_db(tmpdir.path()); + + let delta_db = DeltaReader::new(db, vec![]); + + let values: Vec<_> = delta_db.iter_rev::().unwrap().collect(); + assert!(values.is_empty()); + + let largest = delta_db.get_largest::().await.unwrap(); + + assert!(largest.is_none()); + + let key = TestCompositeField::MAX; + let prev = delta_db.get_prev::(&key).await.unwrap(); + assert!(prev.is_none()); + } #[test] fn iterator_simple() {