Skip to content

Commit

Permalink
[auto-merge] branch-25.02 to branch-25.04 [skip ci] [bot] (#12134)
Browse files Browse the repository at this point in the history
auto-merge triggered by github actions on `branch-25.02` to create a PR
keeping `branch-25.04` up-to-date. If this PR is unable to be merged due
to conflicts, it will remain open until manually fixed.
  • Loading branch information
nvauto authored Feb 13, 2025
2 parents af3c69a + c696b7e commit 234c9b9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
19 changes: 19 additions & 0 deletions integration_tests/src/main/python/hybrid_parquet_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,22 @@ def udf_fallback(s):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.parquet(data_path).filter("ascii(a) >= 50 and udf_fallback(a) = 'udf_100'"),
conf=filter_split_conf)

@pytest.mark.skipif(is_databricks_runtime(), reason="Hybrid feature does not support Databricks currently")
@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests")
@hybrid_test
@allow_non_gpu(*non_utc_allow)
def test_hybrid_parquet_filter_pushdown_timestamp(spark_tmp_path):
    """Check that a filter on a timestamp column is not pushed down to the Hybrid scan.

    A Parquet dataset with a single timestamp column ``a`` is written in a CPU
    session. The test then inspects the executed plan of a ``a > <literal>``
    filter in a GPU session, expecting no pushed expressions, and finally
    compares the GPU and CPU results of the same query.
    """
    data_path = spark_tmp_path + '/PARQUET_DATA'
    earliest = datetime(1900, 1, 1, tzinfo=timezone.utc)
    cutoff = datetime(2024, 1, 1, tzinfo=timezone.utc)

    def write_data(spark):
        # Single timestamp column 'a', generated from `earliest` onwards.
        return gen_df(spark, [('a', TimestampGen(start=earliest))]).write.parquet(data_path)

    def read_filtered(spark):
        # The query under test; shared by the plan check and the result comparison.
        return spark.read.parquet(data_path).filter(f.col("a") > f.lit(cutoff))

    def executed_plan(spark):
        return read_filtered(spark)._jdf.queryExecution().executedPlan()

    with_cpu_session(write_data, conf=rebase_write_corrected_conf)

    # Timestamp is not fully supported in Hybrid Filter, so it should remain on the GPU
    plan = with_gpu_session(executed_plan, conf=filter_split_conf)
    check_filter_pushdown(plan, pushed_exprs=[], not_pushed_exprs=['isnotnull', '>'])

    assert_gpu_and_cpu_are_equal_collect(read_filtered, conf=filter_split_conf)
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,13 @@ object HybridExecutionUtils extends PredicateHelper {
}
}

/**
 * Tells whether the given expression reads at least one timestamp-typed attribute.
 *
 * @param expr the expression whose referenced attributes are inspected
 * @return true if any attribute in `expr.references` has data type `TimestampType`,
 *         false otherwise (including when the expression references no attributes)
 */
def isTimestampCondition(expr: Expression): Boolean =
  expr.references.exists(_.dataType == TimestampType)

def isExprSupportedByHybridScan(condition: Expression, whitelistExprsName: String): Boolean = {
condition match {
case filter if isTimestampCondition(filter) => false // Timestamp is not fully supported in Hybrid Filter
case filter if HybridExecutionUtils.supportedByHybridFilters(whitelistExprsName)
.exists(_.isInstance(filter)) =>
val childrenSupported = filter.children.forall(
Expand Down

0 comments on commit 234c9b9

Please sign in to comment.