Skip to content

Commit

Permalink
Minor comments clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
citizen-stig committed May 31, 2024
1 parent 1c3553f commit 93daa26
Showing 1 changed file with 60 additions and 52 deletions.
112 changes: 60 additions & 52 deletions src/cache/delta_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ pub struct DeltaReader {
}

impl DeltaReader {
/// Creates new [`DeltaReader`] with given [`DB`] and
/// vector of uncommited snapshots of [`SchemaBatch`].
/// `snapshots` should be in chronological order.
/// Creates new [`DeltaReader`] with given [`DB`] and vector with uncommited snapshots of [`SchemaBatch`].
/// Snapshots should be in chronological order.
pub fn new(db: DB, snapshots: Vec<Arc<SchemaBatch>>) -> Self {
Self { snapshots, db }
}
Expand All @@ -44,20 +43,21 @@ impl DeltaReader {
}
}

// 2. Check in DB
self.db.get(key)
})
}

/// Get a value of the largest key written value for given [`Schema`].
pub async fn get_largest<S: Schema>(&self) -> anyhow::Result<Option<(S::Key, S::Value)>> {
let mut iterator = self.iter_rev::<S>()?;
if let Some((key, value)) = iterator.next() {
let key = S::Key::decode_key(&key)?;
let value = S::Value::decode_value(&value)?;
return Ok(Some((key, value)));
}
Ok(None)
tokio::task::block_in_place(|| {
let mut iterator = self.iter_rev::<S>()?;
if let Some((key, value)) = iterator.next() {
let key = S::Key::decode_key(&key)?;
let value = S::Value::decode_value(&value)?;
return Ok(Some((key, value)));
}
Ok(None)
})
}

/// Get the largest value in [`Schema`] that is smaller than give `seek_key`.
Expand All @@ -68,13 +68,15 @@ impl DeltaReader {
let seek_key = seek_key.encode_seek_key()?;
let range = ..=seek_key;

let mut iterator = self.iter_rev_range::<S>(range)?;
if let Some((key, value)) = iterator.next() {
let key = S::Key::decode_key(&key)?;
let value = S::Value::decode_value(&value)?;
return Ok(Some((key, value)));
}
Ok(None)
tokio::task::block_in_place(|| {
let mut iterator = self.iter_rev_range::<S>(range)?;
if let Some((key, value)) = iterator.next() {
let key = S::Key::decode_key(&key)?;
let value = S::Value::decode_value(&value)?;
return Ok(Some((key, value)));
}
Ok(None)
})
}

/// Get `n` keys >= `seek_key`
Expand All @@ -86,26 +88,27 @@ impl DeltaReader {
let seek_key = seek_key.encode_seek_key()?;
let range = seek_key..;

let mut iterator = self.iter_range::<S>(range)?;
tokio::task::block_in_place(|| {
let mut iterator = self.iter_range::<S>(range)?;
let results: Vec<(S::Key, S::Value)> = iterator
.by_ref()
.filter_map(|(key_bytes, value_bytes)| {
let key = S::Key::decode_key(&key_bytes).ok()?;
let value = S::Value::decode_value(&value_bytes).ok()?;
Some((key, value))
})
.take(n)
.collect();

let next_start_key = match iterator.next().map(|(key_bytes, _)| key_bytes) {
None => None,
Some(key_bytes) => Some(S::Key::decode_key(&key_bytes)?),
};

let results: Vec<(S::Key, S::Value)> = iterator
.by_ref()
.filter_map(|(key_bytes, value_bytes)| {
let key = S::Key::decode_key(&key_bytes).ok()?;
let value = S::Value::decode_value(&value_bytes).ok()?;
Some((key, value))
Ok(PaginatedResponse {
key_value: results,
next: next_start_key,
})
.take(n)
.collect();

let next_start_key = match iterator.next().map(|(key_bytes, _)| key_bytes) {
None => None,
Some(key_bytes) => Some(S::Key::decode_key(&key_bytes)?),
};

Ok(PaginatedResponse {
key_value: results,
next: next_start_key,
})
}

Expand All @@ -118,15 +121,17 @@ impl DeltaReader {
let upper_bound = range.end.encode_seek_key()?;
let range = lower_bound..upper_bound;

let result = self
.iter_range::<S>(range)?
.map(|(key, value)| {
let key = S::Key::decode_key(&key).unwrap();
let value = S::Value::decode_value(&value).unwrap();
(key, value)
})
.collect();
Ok(result)
tokio::task::block_in_place(|| {
let result = self
.iter_range::<S>(range)?
.map(|(key, value)| {
let key = S::Key::decode_key(&key).unwrap();
let value = S::Value::decode_value(&value).unwrap();
(key, value)
})
.collect();
Ok(result)
})
}

fn iter<S: Schema>(&self) -> anyhow::Result<DeltaReaderIter<ChangeSetIter>> {
Expand Down Expand Up @@ -303,9 +308,9 @@ where
}
}

// All next values are observed at this point
// All next values are observed at this point.
// Handling actual change of the iterator state.
// the rightmost value in the next locations is the most recent, so it is taken.
// The rightmost value in the next locations is the most recent, so it is taken.
if let Some(latest_next_location) = next_value_locations.pop() {
// First, move all iterators to the next position
for location in &next_value_locations {
Expand Down Expand Up @@ -452,7 +457,7 @@ mod tests {
}
// End of test utils

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_empty() {
let tmpdir = tempfile::tempdir().unwrap();
let db = open_db(tmpdir.path());
Expand All @@ -469,6 +474,9 @@ mod tests {
let key = TestCompositeField::MAX;
let prev = delta_reader.get_prev::<S>(&key).await.unwrap();
assert!(prev.is_none());

let value_1 = delta_reader.get::<S>(&FIELD_1).await.unwrap();
assert!(value_1.is_none());
}

#[test]
Expand Down Expand Up @@ -499,7 +507,7 @@ mod tests {
);
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn get_largest_simple() {
let tmpdir = tempfile::tempdir().unwrap();
let delta_reader = build_simple_delta_reader(tmpdir.path());
Expand All @@ -512,7 +520,7 @@ mod tests {
assert_eq!(TestField(4), largest_value);
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn get_largest_elaborate() {
let tmpdir = tempfile::tempdir().unwrap();
let delta_reader = build_elaborate_delta_reader(tmpdir.path());
Expand All @@ -525,7 +533,7 @@ mod tests {
assert_eq!(TestField(6000), largest_value);
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn get_prev_simple() {
let tmpdir = tempfile::tempdir().unwrap();
let delta_reader = build_simple_delta_reader(tmpdir.path());
Expand Down Expand Up @@ -558,7 +566,7 @@ mod tests {
assert_eq!(FIELD_3, prev_key);
assert_eq!(TestField(3), prev_value);

// MAX Should return largest value
// MAX Should return the largest value
let (prev_key, prev_value) = delta_reader
.get_prev::<S>(&TestCompositeField::MAX)
.await
Expand Down

0 comments on commit 93daa26

Please sign in to comment.