From 1a8b0f98c6bd5a6fc3a0104722719244245fd524 Mon Sep 17 00:00:00 2001
From: Raza Jafri
Date: Mon, 3 Feb 2025 22:29:30 +0000
Subject: [PATCH] added fallback tests

---
 .../src/main/python/delta_lake_merge_test.py  | 18 ++++++++++++++++++
 .../src/main/python/delta_lake_update_test.py | 17 +++++++++++++++++
 2 files changed, 35 insertions(+)

diff --git a/integration_tests/src/main/python/delta_lake_merge_test.py b/integration_tests/src/main/python/delta_lake_merge_test.py
index 0cac4957614..f5881f57e05 100644
--- a/integration_tests/src/main/python/delta_lake_merge_test.py
+++ b/integration_tests/src/main/python/delta_lake_merge_test.py
@@ -52,6 +52,24 @@ def checker(data_path, do_merge):
                          merge_sql=merge_sql,
                          check_func=checker)
 
+@allow_non_gpu('ColumnarToRowExec', 'BroadcastExchangeExec', 'BroadcastHashJoinExec', delta_write_fallback_allow, *delta_meta_allow)
+@delta_lake
+@ignore_order
+@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
+def test_delta_merge_fallback_with_deletion_vectors(spark_tmp_path, spark_tmp_table_factory):
+    def checker(data_path, do_merge):
+        assert_gpu_fallback_write(do_merge, read_delta_path, data_path,
+                                  delta_write_fallback_check,
+                                  conf=copy_and_update(delta_merge_enabled_conf, {"spark.rapids.sql.format.delta.write.enabled": "false"}))
+    merge_sql = "MERGE INTO {dest_table} USING {src_table} ON {dest_table}.a == {src_table}.a" \
+                " WHEN NOT MATCHED THEN INSERT *"
+    delta_sql_merge_test(spark_tmp_path, spark_tmp_table_factory,
+                         use_cdf=False, enable_deletion_vectors=True,
+                         src_table_func=lambda spark: unary_op_df(spark, SetValuesGen(IntegerType(), range(100))),
+                         dest_table_func=lambda spark: unary_op_df(spark, int_gen),
+                         merge_sql=merge_sql,
+                         check_func=checker)
+
 @allow_non_gpu("ExecutedCommandExec,BroadcastHashJoinExec,ColumnarToRowExec,BroadcastExchangeExec,DataWritingCommandExec", delta_write_fallback_allow, *delta_meta_allow)
 @delta_lake
 @ignore_order
diff --git a/integration_tests/src/main/python/delta_lake_update_test.py b/integration_tests/src/main/python/delta_lake_update_test.py
index 3012a156548..0a75f8a11ea 100644
--- a/integration_tests/src/main/python/delta_lake_update_test.py
+++ b/integration_tests/src/main/python/delta_lake_update_test.py
@@ -62,6 +62,23 @@ def checker(data_path, do_update):
     delta_sql_update_test(spark_tmp_path, use_cdf, dest_table_func, update_sql, checker,
                           partition_columns, enable_deletion_vectors)
 
+@allow_non_gpu('ColumnarToRowExec', 'RapidsDeltaWriteExec', delta_write_fallback_allow, *delta_meta_allow)
+@delta_lake
+@ignore_order
+@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
+def test_delta_update_fallback_with_deletion_vectors(spark_tmp_path):
+    data_path = spark_tmp_path + "/DELTA_DATA"
+    def setup_tables(spark):
+        setup_delta_dest_tables(spark, data_path,
+                                dest_table_func=lambda spark: unary_op_df(spark, int_gen),
+                                use_cdf=False, enable_deletion_vectors=True)
+    def write_func(spark, path):
+        update_sql = "UPDATE delta.`{}` SET a = 0".format(path)
+        spark.sql(update_sql)
+    with_cpu_session(setup_tables)
+    assert_gpu_fallback_write(write_func, read_delta_path, data_path,
+                              "RapidsDeltaWriteExec", delta_update_enabled_conf)
+
 @allow_non_gpu(delta_write_fallback_allow, *delta_meta_allow)
 @delta_lake
 @ignore_order