Skip to content

Commit

Permalink
Support Runtime Filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouyifan279 committed Oct 20, 2023
1 parent 05adedc commit 2dbfbd0
Showing 1 changed file with 31 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package org.apache.spark.sql.clickhouse.cluster

import org.apache.spark.sql.clickhouse.ClickHouseSQLConf.READ_DISTRIBUTED_CONVERT_LOCAL
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

class ClickHouseClusterReadSuite extends SparkClickHouseClusterTest {

Expand Down Expand Up @@ -83,4 +85,33 @@ class ClickHouseClusterReadSuite extends SparkClickHouseClusterTest {
)
}
}

// Runs the same IN-subquery against a distributed table twice: first with the connector's
// runtime filter switch off, then with it on. Both runs must return Row(1); the second run
// must additionally show runtime filters on the BatchScanExec that scans the table.
test("runtime filter - distributed table") {
  withSimpleDistTable("single_replica", "runtime_db", "runtime_tbl", true) { (_, db, tbl_dist, _) =>
    val runtimeFilterKey = "spark.clickhouse.read.runtimeFilter.enabled"
    // One query text shared by both runs so the two results are directly comparable.
    val query =
      s"SELECT id FROM $db.$tbl_dist " +
        s"WHERE id IN (" +
        s" SELECT id FROM $db.$tbl_dist " +
        s" WHERE DATE_FORMAT(create_time, 'yyyy-MM-dd') between '2021-01-01' and '2022-01-01'" +
        s")"

    // Remember the session's current setting so this test does not leak its configuration
    // into other tests sharing the same SparkSession, even when an assertion fails.
    val previousValue = spark.conf.getOption(runtimeFilterKey)
    try {
      spark.sql(s"set $runtimeFilterKey=false")
      checkAnswer(spark.sql(query), Row(1))

      spark.sql(s"set $runtimeFilterKey=true")
      val df = spark.sql(query)
      checkAnswer(df, Row(1))

      val expectedTableName = TableIdentifier(tbl_dist, Some(db)).quotedString
      // Typed pattern + named fields instead of a positional extractor, so the check does not
      // depend on the number or order of BatchScanExec constructor parameters.
      val runtimeFilterExists = df.queryExecution.sparkPlan.exists {
        case batchScan: BatchScanExec =>
          batchScan.table.name() == expectedTableName && batchScan.runtimeFilters.nonEmpty
        case _ => false
      }
      assert(
        runtimeFilterExists,
        s"expected a BatchScanExec on $expectedTableName with non-empty runtime filters"
      )
    } finally {
      previousValue match {
        case Some(value) => spark.conf.set(runtimeFilterKey, value)
        case None => spark.conf.unset(runtimeFilterKey)
      }
    }
  }
}
}

0 comments on commit 2dbfbd0

Please sign in to comment.