Commit
added fallback tests
razajafri committed Feb 4, 2025
1 parent 6f47d0e commit 1a8b0f9
Showing 2 changed files with 35 additions and 0 deletions.
18 changes: 18 additions & 0 deletions integration_tests/src/main/python/delta_lake_merge_test.py
@@ -52,6 +52,24 @@ def checker(data_path, do_merge):
                         merge_sql=merge_sql,
                         check_func=checker)

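# Verifies that the write portion of a MERGE falls back to the CPU when deletion vectors
# are enabled and the GPU Delta Lake write is disabled via configuration.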
@allow_non_gpu('ColumnarToRowExec', 'BroadcastExchangeExec', 'BroadcastHashJoinExec', delta_write_fallback_allow, *delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
def test_delta_merge_fallback_with_deletion_vectors(spark_tmp_path, spark_tmp_table_factory):
    def checker(data_path, do_merge):
        # With the GPU Delta write disabled, the MERGE is expected to fall back to the CPU write.
        assert_gpu_fallback_write(do_merge, read_delta_path, data_path,
                                  delta_write_fallback_check,
                                  conf=copy_and_update(delta_merge_enabled_conf,
                                                       {"spark.rapids.sql.format.delta.write.enabled": "false"}))
    merge_sql = "MERGE INTO {dest_table} USING {src_table} ON {dest_table}.a == {src_table}.a" \
                " WHEN NOT MATCHED THEN INSERT *"
    delta_sql_merge_test(spark_tmp_path, spark_tmp_table_factory,
                         use_cdf=False, enable_deletion_vectors=True,
                         src_table_func=lambda spark: unary_op_df(spark, SetValuesGen(IntegerType(), range(100))),
                         dest_table_func=lambda spark: unary_op_df(spark, int_gen),
                         merge_sql=merge_sql,
                         check_func=checker)

@allow_non_gpu("ExecutedCommandExec,BroadcastHashJoinExec,ColumnarToRowExec,BroadcastExchangeExec,DataWritingCommandExec", delta_write_fallback_allow, *delta_meta_allow)
@delta_lake
@ignore_order
17 changes: 17 additions & 0 deletions integration_tests/src/main/python/delta_lake_update_test.py
@@ -62,6 +62,23 @@ def checker(data_path, do_update):
    delta_sql_update_test(spark_tmp_path, use_cdf, dest_table_func, update_sql, checker,
                          partition_columns, enable_deletion_vectors)

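# Verifies the expected CPU fallback of the write when running an UPDATE against a Delta
# table that was created with deletion vectors enabled.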
@allow_non_gpu('ColumnarToRowExec', 'RapidsDeltaWriteExec', delta_write_fallback_allow, *delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
def test_delta_update_fallback_with_deletion_vectors(spark_tmp_path):
    data_path = spark_tmp_path + "/DELTA_DATA"
    def setup_tables(spark):
        setup_delta_dest_tables(spark, data_path,
                                dest_table_func=lambda spark: unary_op_df(spark, int_gen),
                                use_cdf=False, enable_deletion_vectors=True)
    def write_func(spark, path):
        update_sql = "UPDATE delta.`{}` SET a = 0".format(path)
        spark.sql(update_sql)
    with_cpu_session(setup_tables)
    assert_gpu_fallback_write(write_func, read_delta_path, data_path,
                              "RapidsDeltaWriteExec", delta_update_enabled_conf)

@allow_non_gpu(delta_write_fallback_allow, *delta_meta_allow)
@delta_lake
@ignore_order
