diff --git a/src/cache/delta_reader.rs b/src/cache/delta_reader.rs index 6eddacb..20ba0ed 100644 --- a/src/cache/delta_reader.rs +++ b/src/cache/delta_reader.rs @@ -23,9 +23,8 @@ pub struct DeltaReader { } impl DeltaReader { - /// Creates new [`DeltaReader`] with given [`DB`] and - /// vector of uncommited snapshots of [`SchemaBatch`]. - /// `snapshots` should be in chronological order. + /// Creates new [`DeltaReader`] with given [`DB`] and vector with uncommited snapshots of [`SchemaBatch`]. + /// Snapshots should be in chronological order. pub fn new(db: DB, snapshots: Vec>) -> Self { Self { snapshots, db } } @@ -44,20 +43,21 @@ impl DeltaReader { } } - // 2. Check in DB self.db.get(key) }) } /// Get a value of the largest key written value for given [`Schema`]. pub async fn get_largest(&self) -> anyhow::Result> { - let mut iterator = self.iter_rev::()?; - if let Some((key, value)) = iterator.next() { - let key = S::Key::decode_key(&key)?; - let value = S::Value::decode_value(&value)?; - return Ok(Some((key, value))); - } - Ok(None) + tokio::task::block_in_place(|| { + let mut iterator = self.iter_rev::()?; + if let Some((key, value)) = iterator.next() { + let key = S::Key::decode_key(&key)?; + let value = S::Value::decode_value(&value)?; + return Ok(Some((key, value))); + } + Ok(None) + }) } /// Get the largest value in [`Schema`] that is smaller than give `seek_key`. @@ -68,13 +68,15 @@ impl DeltaReader { let seek_key = seek_key.encode_seek_key()?; let range = ..=seek_key; - let mut iterator = self.iter_rev_range::(range)?; - if let Some((key, value)) = iterator.next() { - let key = S::Key::decode_key(&key)?; - let value = S::Value::decode_value(&value)?; - return Ok(Some((key, value))); - } - Ok(None) + tokio::task::block_in_place(|| { + let mut iterator = self.iter_rev_range::(range)?; + if let Some((key, value)) = iterator.next() { + let key = S::Key::decode_key(&key)?; + let value = S::Value::decode_value(&value)?; + return Ok(Some((key, value))); + } + Ok(None) + }) } /// Get `n` keys >= `seek_key` @@ -86,26 +88,27 @@ impl DeltaReader { let seek_key = seek_key.encode_seek_key()?; let range = seek_key..; - let mut iterator = self.iter_range::(range)?; + tokio::task::block_in_place(|| { + let mut iterator = self.iter_range::(range)?; + let results: Vec<(S::Key, S::Value)> = iterator + .by_ref() + .filter_map(|(key_bytes, value_bytes)| { + let key = S::Key::decode_key(&key_bytes).ok()?; + let value = S::Value::decode_value(&value_bytes).ok()?; + Some((key, value)) + }) + .take(n) + .collect(); + + let next_start_key = match iterator.next().map(|(key_bytes, _)| key_bytes) { + None => None, + Some(key_bytes) => Some(S::Key::decode_key(&key_bytes)?), + }; - let results: Vec<(S::Key, S::Value)> = iterator - .by_ref() - .filter_map(|(key_bytes, value_bytes)| { - let key = S::Key::decode_key(&key_bytes).ok()?; - let value = S::Value::decode_value(&value_bytes).ok()?; - Some((key, value)) + Ok(PaginatedResponse { + key_value: results, + next: next_start_key, }) - .take(n) - .collect(); - - let next_start_key = match iterator.next().map(|(key_bytes, _)| key_bytes) { - None => None, - Some(key_bytes) => Some(S::Key::decode_key(&key_bytes)?), - }; - - Ok(PaginatedResponse { - key_value: results, - next: next_start_key, }) } @@ -118,15 +121,17 @@ impl DeltaReader { let upper_bound = range.end.encode_seek_key()?; let range = lower_bound..upper_bound; - let result = self - .iter_range::(range)? - .map(|(key, value)| { - let key = S::Key::decode_key(&key).unwrap(); - let value = S::Value::decode_value(&value).unwrap(); - (key, value) - }) - .collect(); - Ok(result) + tokio::task::block_in_place(|| { + let result = self + .iter_range::(range)? + .map(|(key, value)| { + let key = S::Key::decode_key(&key).unwrap(); + let value = S::Value::decode_value(&value).unwrap(); + (key, value) + }) + .collect(); + Ok(result) + }) } fn iter(&self) -> anyhow::Result> { @@ -303,9 +308,9 @@ where } } - // All next values are observed at this point + // All next values are observed at this point. // Handling actual change of the iterator state. - // the rightmost value in the next locations is the most recent, so it is taken. + // The rightmost value in the next locations is the most recent, so it is taken. if let Some(latest_next_location) = next_value_locations.pop() { // First, move all iterators to the next position for location in &next_value_locations { @@ -452,7 +457,7 @@ mod tests { } // End of test utils - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_empty() { let tmpdir = tempfile::tempdir().unwrap(); let db = open_db(tmpdir.path()); @@ -469,6 +474,9 @@ mod tests { let key = TestCompositeField::MAX; let prev = delta_reader.get_prev::(&key).await.unwrap(); assert!(prev.is_none()); + + let value_1 = delta_reader.get::(&FIELD_1).await.unwrap(); + assert!(value_1.is_none()); } #[test] @@ -499,7 +507,7 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn get_largest_simple() { let tmpdir = tempfile::tempdir().unwrap(); let delta_reader = build_simple_delta_reader(tmpdir.path()); @@ -512,7 +520,7 @@ mod tests { assert_eq!(TestField(4), largest_value); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn get_largest_elaborate() { let tmpdir = tempfile::tempdir().unwrap(); let delta_reader = build_elaborate_delta_reader(tmpdir.path()); @@ -525,7 +533,7 @@ mod tests { assert_eq!(TestField(6000), largest_value); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn get_prev_simple() { let tmpdir = tempfile::tempdir().unwrap(); let delta_reader = build_simple_delta_reader(tmpdir.path()); @@ -558,7 +566,7 @@ mod tests { assert_eq!(FIELD_3, prev_key); assert_eq!(TestField(3), prev_value); - // MAX Should return largest value + // MAX Should return the largest value let (prev_key, prev_value) = delta_reader .get_prev::(&TestCompositeField::MAX) .await