Skip to content

Commit

Permalink
Implement the rest without testing
Browse files Browse the repository at this point in the history
  • Loading branch information
citizen-stig committed May 30, 2024
1 parent 620d167 commit 094d95c
Showing 1 changed file with 95 additions and 27 deletions.
122 changes: 95 additions & 27 deletions src/cache/delta_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,88 @@ impl DeltaReader {
/// Get `n` keys >= `seek_key`
pub async fn get_n_from_first_match<S: Schema>(
&self,
_seek_key: &impl SeekKeyEncoder<S>,
_n: usize,
seek_key: &impl SeekKeyEncoder<S>,
n: usize,
) -> anyhow::Result<PaginatedResponse<S>> {
todo!()
let seek_key = seek_key.encode_seek_key()?;
let range = seek_key..;

let mut iterator = self.iter_range::<S>(range)?;

let results: Vec<(S::Key, S::Value)> = iterator
.by_ref()
.filter_map(|(key_bytes, value_bytes)| {
let key = S::Key::decode_key(&key_bytes).ok()?;
let value = S::Value::decode_value(&value_bytes).ok()?;
Some((key, value))
})
.take(n)
.collect();

let next_start_key = match iterator.next().map(|(key_bytes, _)| key_bytes) {
None => None,
Some(key_bytes) => Some(S::Key::decode_key(&key_bytes)?),
};

Ok(PaginatedResponse {
key_value: results,
next: next_start_key,
})
}

/// Collects all key-value pairs in given range, from smallest to largest.
pub async fn collect_in_range<S: Schema, Sk: SeekKeyEncoder<S>>(
&self,
_range: std::ops::Range<Sk>,
range: std::ops::Range<Sk>,
) -> anyhow::Result<Vec<(S::Key, S::Value)>> {
todo!()
let lower_bound = range.start.encode_seek_key()?;
let upper_bound = range.end.encode_seek_key()?;
let range = lower_bound..upper_bound;

let result = self
.iter_range::<S>(range)?
.map(|(key, value)| {
let key = S::Key::decode_key(&key).unwrap();
let value = S::Value::decode_value(&value).unwrap();
(key, value)
})
.collect();
Ok(result)
}

fn iter<S: Schema>(&self) -> anyhow::Result<DeltaReaderIter<ChangeSetIter>> {
let snapshot_iterators = self
.snapshots
.iter()
.map(|snapshot| snapshot.iter::<S>())
.collect::<Vec<_>>();

let db_iter = self.db.raw_iter::<S>(ScanDirection::Forward)?;

Ok(DeltaReaderIter::new(
snapshot_iterators,
db_iter,
ScanDirection::Forward,
))
}

fn iter_range<S: Schema>(
&self,
range: impl std::ops::RangeBounds<SchemaKey> + Clone,
) -> anyhow::Result<DeltaReaderIter<ChangeSetRange>> {
let snapshot_iterators = self
.snapshots
.iter()
.map(|snapshot| snapshot.iter_range::<S>(range.clone()))
.collect::<Vec<_>>();

let db_iter = self.db.raw_iter_range::<S>(range, ScanDirection::Forward)?;

Ok(DeltaReaderIter::new(
snapshot_iterators,
db_iter,
ScanDirection::Backward,
))
}

//
Expand All @@ -105,7 +175,6 @@ impl DeltaReader {
.map(|snapshot| snapshot.iter::<S>().rev())
.collect::<Vec<_>>();

// Database iterator
let db_iter = self.db.raw_iter::<S>(ScanDirection::Backward)?;

Ok(DeltaReaderIter::new(
Expand Down Expand Up @@ -327,27 +396,6 @@ mod tests {
assert_eq!(expected_value, actual_value);
}

// End of test utils

#[tokio::test]
async fn test_empty() {
let tmpdir = tempfile::tempdir().unwrap();
let db = open_db(tmpdir.path());

let delta_db = DeltaReader::new(db, vec![]);

let values: Vec<_> = delta_db.iter_rev::<S>().unwrap().collect();
assert!(values.is_empty());

let largest = delta_db.get_largest::<S>().await.unwrap();

assert!(largest.is_none());

let key = TestCompositeField::MAX;
let prev = delta_db.get_prev::<S>(&key).await.unwrap();
assert!(prev.is_none());
}

// DeltaReader with a simple set of known values. Useful for minimal checks.
// Contains only writes.
fn build_simple_delta_reader(path: impl AsRef<Path>) -> DeltaReader {
Expand Down Expand Up @@ -402,6 +450,26 @@ mod tests {
],
)
}
// End of test utils

#[tokio::test]
async fn test_empty() {
let tmpdir = tempfile::tempdir().unwrap();
let db = open_db(tmpdir.path());

let delta_db = DeltaReader::new(db, vec![]);

let values: Vec<_> = delta_db.iter_rev::<S>().unwrap().collect();
assert!(values.is_empty());

let largest = delta_db.get_largest::<S>().await.unwrap();

assert!(largest.is_none());

let key = TestCompositeField::MAX;
let prev = delta_db.get_prev::<S>(&key).await.unwrap();
assert!(prev.is_none());
}

#[test]
fn iterator_simple() {
Expand Down

0 comments on commit 094d95c

Please sign in to comment.