Skip to content

Commit

Permalink
Adding proptest for more iterator related things
Browse files Browse the repository at this point in the history
  • Loading branch information
citizen-stig committed May 31, 2024
1 parent f16c414 commit 43265c7
Showing 1 changed file with 92 additions and 57 deletions.
149 changes: 92 additions & 57 deletions src/cache/delta_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl DeltaReader {
}
}

/// Core logic of [`DeltaReader`]. Handles matching key and progresses underlying iterators.
/// Core logic of [`DeltaReader`]. Handles matching keys and progresses underlying iterators.
struct DeltaReaderIter<'a, SnapshotIter>
where
SnapshotIter: Iterator<Item = (&'a SchemaKey, &'a Operation)>,
Expand Down Expand Up @@ -403,32 +403,7 @@ mod tests {
<<S as Schema>::Value as ValueCodec<S>>::decode_value(value).unwrap()
}

async fn check_value(delta_reader: &DeltaReader, key: u32, expected_value: Option<u32>) {
let actual_value = delta_reader
.get::<S>(&TestCompositeField(key, 0, 0))
.await
.unwrap()
.map(|v| v.0);
assert_eq!(expected_value, actual_value);
}

// DeltaReader with a simple set of known values. Useful for minimal checks.
// Contains only writes.
fn build_simple_delta_reader(path: impl AsRef<Path>) -> DeltaReader {
let db = open_db(path.as_ref());
let mut db_data = SchemaBatch::new();
db_data.put::<S>(&FIELD_1, &TestField(1)).unwrap();
db_data.put::<S>(&FIELD_4, &TestField(4)).unwrap();
db.write_schemas(&db_data).unwrap();

let mut snapshot_1 = SchemaBatch::new();
snapshot_1.put::<S>(&FIELD_2, &TestField(2)).unwrap();
snapshot_1.put::<S>(&FIELD_3, &TestField(3)).unwrap();

DeltaReader::new(db, vec![Arc::new(snapshot_1)])
}

// DeltaReader with a known set of sample values, includes deletion.
// A DeltaReader with a known set of sample values, includes deletion.
// DB contains fields 1, 4, 5, and 7.
// Have 3 snapshots, value is equal to field_id * 10^snapshot_level:
// 1. Written: field 6.
Expand Down Expand Up @@ -709,31 +684,7 @@ mod tests {
}

#[tokio::test(flavor = "multi_thread")]
async fn get_n_simple() {
let tmpdir = tempfile::tempdir().unwrap();
let delta_reader = build_simple_delta_reader(tmpdir.path());

let paginated_response = delta_reader
.get_n_from_first_match::<S>(&TestCompositeField::MIN, 2)
.await
.unwrap();
assert_eq!(2, paginated_response.key_value.len());
let expected_first_page = vec![(FIELD_1, TestField(1)), (FIELD_2, TestField(2))];
assert_eq!(expected_first_page, paginated_response.key_value);
assert_eq!(Some(FIELD_3), paginated_response.next);

let paginated_response = delta_reader
.get_n_from_first_match::<S>(&FIELD_3, 2)
.await
.unwrap();
assert_eq!(2, paginated_response.key_value.len());
let expected_first_page = vec![(FIELD_3, TestField(3)), (FIELD_4, TestField(4))];
assert_eq!(expected_first_page, paginated_response.key_value);
assert!(paginated_response.next.is_none());
}

#[tokio::test(flavor = "multi_thread")]
async fn get_n_elaborate() {
async fn get_n_sample() {
let tmpdir = tempfile::tempdir().unwrap();
let delta_reader = build_sample_delta_reader(tmpdir.path());

Expand Down Expand Up @@ -887,7 +838,6 @@ mod tests {
}
}


let rt = Runtime::new().unwrap();
let _ = rt.block_on(async {

Expand Down Expand Up @@ -943,12 +893,97 @@ mod tests {
values_rev.windows(2).all(|w| w[0].0 >= w[1].0),
"iter_rev should be sorted in reversed order"
);

// Building a reference for all K/V for validation if basic check is passed.
let mut all_kv: BTreeMap<TestCompositeField, TestField> = BTreeMap::new();
for (key, value) in db_entries {
all_kv.insert(key, value);
}
for snapshot in snapshots {
for (key, value) in snapshot {
all_kv.insert(key.clone(), value);
}
}

let rt = Runtime::new().unwrap();
let _ = rt.block_on(async {
// get_n_from
for n in 1..4 {
let mut next_key = Some(TestCompositeField::MIN);
while let Some(actual_next_key) = next_key {
let paginated_response = delta_reader
.get_n_from_first_match::<S>(&actual_next_key, n)
.await;
prop_assert!(paginated_response.is_ok());
let paginated_response = paginated_response.unwrap();
if paginated_response.key_value.is_empty() {
break;
}
let first_on_page = paginated_response.key_value[0].clone();
prop_assert!(first_on_page.0 >= actual_next_key);
next_key = paginated_response.next;
}
}
// Early return on empty values.
if values.is_empty() {
return Ok(())
}

// collect_range, checking full ranges
let all_values: Vec<_> = values
.into_iter()
.map(|(k, v)| (decode_key(&k), decode_value(&v)))
.collect();

let (min_key, _) = all_kv.iter().min().unwrap();
let (max_key, _) = all_kv.iter().max().unwrap();

let full_ranges = vec![
TestCompositeField::MIN..TestCompositeField::MAX,
min_key.clone()..TestCompositeField::MAX,
];
// last key is definitely excluded
let def_chopped_ranges = vec![
TestCompositeField::MIN..max_key.clone(),
min_key.clone()..max_key.clone(),
];

for range in def_chopped_ranges {
let range_values = delta_reader
.collect_in_range::<S, TestCompositeField>(range)
.await;
prop_assert!(range_values.is_ok());
let range_values = range_values.unwrap();
prop_assert_eq!(
&all_values[..all_values.len()],
&range_values[..]
);
}

for range in full_ranges {
let range_values = delta_reader
.collect_in_range::<S, TestCompositeField>(range)
.await;
prop_assert!(range_values.is_ok());
let range_values = range_values.unwrap();

let last_key = all_values.iter().last().map(|(k, _v)| k);
let expected_values = if last_key == Some(&TestCompositeField::MAX) {
&all_values[..all_values.len()]
} else {
&all_values[..]
};
prop_assert_eq!(expected_values, &range_values[..]);
}

Ok(())
});
}

#[test]
fn proptest_rev_iterator_everything_was_deleted((db_entries, snapshots) in generate_db_entries_and_snapshots()) {
let values = execute_with_all_deleted_in_last_snapshot(db_entries, snapshots);
prop_assert!(values.is_empty());
#[test]
fn proptest_rev_iterator_everything_was_deleted((db_entries, snapshots) in generate_db_entries_and_snapshots()) {
let values = execute_with_all_deleted_in_last_snapshot(db_entries, snapshots);
prop_assert!(values.is_empty());
}
}
}

0 comments on commit 43265c7

Please sign in to comment.