Skip to content

Commit

Permalink
evaluation test
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-spacetime committed Feb 21, 2025
1 parent b45696f commit f503417
Showing 1 changed file with 255 additions and 0 deletions.
255 changes: 255 additions & 0 deletions crates/core/src/subscription/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,261 @@ mod tests {
TestDB::durable().map(|db| test_fn(&db.with_row_count(Arc::new(|_, _| 5))))?
}

#[test]
/// TODO: This test is a slight modifaction of [test_eval_incr_for_index_join].
/// Essentially the WHERE condition is on different tables.
/// Should refactor to reduce duplicate logic between the two tests.
fn test_eval_incr_for_left_semijoin() -> ResultTest<()> {
fn compile_query(db: &RelationalDB) -> ResultTest<SubscriptionPlan> {
db.with_read_only(Workload::ForTests, |tx| {
let auth = AuthCtx::for_testing();
let tx = SchemaViewer::new(tx, &auth);
// Should be answered using an index semijion
let sql = "select lhs.* from lhs join rhs on lhs.id = rhs.id where lhs.x >= 5 and lhs.x <= 7";
Ok(SubscriptionPlan::compile(sql, &tx).unwrap())
})
}

// Case 1:
// Delete a row inside the region of lhs,
// Insert a row inside the region of lhs.
fn index_join_case_1(db: &RelationalDB) -> ResultTest<()> {
let _ = create_lhs_table_for_eval_incr(db)?;
let rhs_id = create_rhs_table_for_eval_incr(db)?;
let query = compile_query(db)?;

let r1 = product!(10, 0, 2);
let r2 = product!(10, 0, 3);

let mut metrics = ExecutionMetrics::default();

let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;

// No updates to report
assert!(result.is_empty());
Ok(())
}

// Case 2:
// Delete a row outside the region of lhs,
// Insert a row outside the region of lhs.
fn index_join_case_2(db: &RelationalDB) -> ResultTest<()> {
let _ = create_lhs_table_for_eval_incr(db)?;
let rhs_id = create_rhs_table_for_eval_incr(db)?;
let query = compile_query(db)?;

let r1 = product!(13, 3, 5);
let r2 = product!(13, 4, 6);

let mut metrics = ExecutionMetrics::default();

let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;

// No updates to report
assert!(result.is_empty());
Ok(())
}

// Case 3:
// Delete a row inside the region of lhs,
// Insert a row outside the region of lhs.
fn index_join_case_3(db: &RelationalDB) -> ResultTest<()> {
let lhs_id = create_lhs_table_for_eval_incr(db)?;
let rhs_id = create_rhs_table_for_eval_incr(db)?;
let query = compile_query(db)?;

let r1 = product!(10, 0, 2);
let r2 = product!(10, 3, 5);

let mut metrics = ExecutionMetrics::default();

let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;

// A single delete from lhs
assert_eq!(result.tables.len(), 1);
assert_eq!(result.tables[0], delete_op(lhs_id, "lhs", product!(0, 5)));
Ok(())
}

// Case 4:
// Delete a row outside the region of lhs,
// Insert a row inside the region of lhs.
fn index_join_case_4(db: &RelationalDB) -> ResultTest<()> {
let lhs_id = create_lhs_table_for_eval_incr(db)?;
let rhs_id = create_rhs_table_for_eval_incr(db)?;
let query = compile_query(db)?;

let r1 = product!(13, 3, 5);
let r2 = product!(13, 2, 4);

let mut metrics = ExecutionMetrics::default();

let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;

// A single insert into lhs
assert_eq!(result.tables.len(), 1);
assert_eq!(result.tables[0], insert_op(lhs_id, "lhs", product!(2, 7)));
Ok(())
}

// Case 5:
// Insert row into rhs,
// Insert matching row inside the region of lhs.
fn index_join_case_5(db: &RelationalDB) -> ResultTest<()> {
let lhs_id = create_lhs_table_for_eval_incr(db)?;
let rhs_id = create_rhs_table_for_eval_incr(db)?;
let query = compile_query(db)?;

let lhs_row = product!(5, 6);
let rhs_row = product!(20, 5, 3);

let mut metrics = ExecutionMetrics::default();

let result = eval_incr(
db,
&mut metrics,
&query,
vec![(lhs_id, lhs_row, true), (rhs_id, rhs_row, true)],
)?;

// A single insert into lhs
assert_eq!(result.tables.len(), 1);
assert_eq!(result.tables[0], insert_op(lhs_id, "lhs", product!(5, 6)));
Ok(())
}

// Case 6:
// Insert row into rhs,
// Insert matching row outside the region of lhs.
fn index_join_case_6(db: &RelationalDB) -> ResultTest<()> {
let lhs_id = create_lhs_table_for_eval_incr(db)?;
let rhs_id = create_rhs_table_for_eval_incr(db)?;
let query = compile_query(db)?;

let lhs_row = product!(5, 10);
let rhs_row = product!(20, 5, 5);

let mut metrics = ExecutionMetrics::default();

let result = eval_incr(
db,
&mut metrics,
&query,
vec![(lhs_id, lhs_row, true), (rhs_id, rhs_row, true)],
)?;

// No updates to report
assert_eq!(result.tables.len(), 0);
Ok(())
}

// Case 7:
// Delete row from rhs,
// Delete matching row inside the region of lhs.
fn index_join_case_7(db: &RelationalDB) -> ResultTest<()> {
let lhs_id = create_lhs_table_for_eval_incr(db)?;
let rhs_id = create_rhs_table_for_eval_incr(db)?;
let query = compile_query(db)?;

let lhs_row = product!(0, 5);
let rhs_row = product!(10, 0, 2);

let mut metrics = ExecutionMetrics::default();

let result = eval_incr(
db,
&mut metrics,
&query,
vec![(lhs_id, lhs_row, false), (rhs_id, rhs_row, false)],
)?;

// A single delete from lhs
assert_eq!(result.tables.len(), 1);
assert_eq!(result.tables[0], delete_op(lhs_id, "lhs", product!(0, 5)));
Ok(())
}

// Case 8:
// Delete row from rhs,
// Delete matching row outside the region of lhs.
fn index_join_case_8(db: &RelationalDB) -> ResultTest<()> {
let lhs_id = create_lhs_table_for_eval_incr(db)?;
let rhs_id = create_rhs_table_for_eval_incr(db)?;
let query = compile_query(db)?;

let lhs_row = product!(3, 8);
let rhs_row = product!(13, 3, 5);

let mut metrics = ExecutionMetrics::default();

let result = eval_incr(
db,
&mut metrics,
&query,
vec![(lhs_id, lhs_row, false), (rhs_id, rhs_row, false)],
)?;

// No updates to report
assert_eq!(result.tables.len(), 0);
Ok(())
}

// Case 9:
// Update row from rhs,
// Update matching row inside the region of lhs.
fn index_join_case_9(db: &RelationalDB) -> ResultTest<()> {
let lhs_id = create_lhs_table_for_eval_incr(db)?;
let rhs_id = create_rhs_table_for_eval_incr(db)?;
let query = compile_query(db)?;

let lhs_old = product!(1, 6);
let lhs_new = product!(1, 7);
let rhs_old = product!(11, 1, 3);
let rhs_new = product!(11, 1, 4);

let mut metrics = ExecutionMetrics::default();

let result = eval_incr(
db,
&mut metrics,
&query,
vec![
(lhs_id, lhs_old, false),
(rhs_id, rhs_old, false),
(lhs_id, lhs_new, true),
(rhs_id, rhs_new, true),
],
)?;

let lhs_old = product!(1, 6);
let lhs_new = product!(1, 7);

// A delete and an insert into lhs
assert_eq!(result.tables.len(), 1);
assert_eq!(
result.tables[0],
DatabaseTableUpdate {
table_id: lhs_id,
table_name: "lhs".into(),
deletes: [lhs_old].into(),
inserts: [lhs_new].into(),
},
);
Ok(())
}

run_eval_incr_test(index_join_case_1)?;
run_eval_incr_test(index_join_case_2)?;
run_eval_incr_test(index_join_case_3)?;
run_eval_incr_test(index_join_case_4)?;
run_eval_incr_test(index_join_case_5)?;
run_eval_incr_test(index_join_case_6)?;
run_eval_incr_test(index_join_case_7)?;
run_eval_incr_test(index_join_case_8)?;
run_eval_incr_test(index_join_case_9)?;
Ok(())
}

#[test]
fn test_eval_incr_for_index_join() -> ResultTest<()> {
// Case 1:
Expand Down

0 comments on commit f503417

Please sign in to comment.