Spark: Implement SupportsPushDownTopN #196

@@ -90,15 +90,28 @@ object ExprUtils extends SQLConfHelper {
case unsupported => throw CHClientException(s"Unsupported ClickHouse expression: $unsupported")
}

def toClickHouse(transform: Transform): Expr = transform match {
def toClickHouseOpt(v2Expr: V2Expression): Option[Expr] = Try(toClickHouse(v2Expr)).toOption

def toClickHouse(v2Expr: V2Expression): Expr = v2Expr match {
// sort order
case sortOrder: SortOrder =>
val asc = sortOrder.direction == SortDirection.ASCENDING
val nullFirst = sortOrder.nullOrdering == NullOrdering.NULLS_FIRST
OrderExpr(toClickHouse(sortOrder.expression), asc, nullFirst)
// transform
case YearsTransform(FieldReference(Seq(col))) => FuncExpr("toYear", List(FieldRef(col)))
case MonthsTransform(FieldReference(Seq(col))) => FuncExpr("toYYYYMM", List(FieldRef(col)))
case DaysTransform(FieldReference(Seq(col))) => FuncExpr("toYYYYMMDD", List(FieldRef(col)))
case HoursTransform(FieldReference(Seq(col))) => FuncExpr("toHour", List(FieldRef(col)))
case IdentityTransform(fieldRefs) => FieldRef(fieldRefs.describe)
case ApplyTransform(name, args) => FuncExpr(name, args.map(arg => SQLExpr(arg.describe())).toList)
case bucket: BucketTransform => throw CHClientException(s"Bucket transform not supported yet: $bucket")
case other: Transform => throw CHClientException(s"Unsupported transform: $other")
// others
case l: Literal[_] => SQLExpr(l.toString)
case FieldReference(Seq(col)) => FieldRef(col)
case gse: GeneralScalarExpression => SQLExpr(gse.toString) // TODO: reject unsupported scalar expressions instead of passing them through
// unsupported
case unsupported: V2Expression => throw CHClientException(s"Unsupported expression: $unsupported")
}

def inferTransformSchema(
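For context on the new SortOrder branch: it is what lets the TopN push-down in the scan builder below reuse the existing translator. A minimal sketch of the translation, assuming Spark's Expressions factory methods from the DataSource V2 API (3.2+); the commented result is illustrative:

import org.apache.spark.sql.clickhouse.ExprUtils
import org.apache.spark.sql.connector.expressions.{Expressions, NullOrdering, SortDirection}

// Build `id` DESC NULLS LAST through Spark's public factory methods
val order = Expressions.sort(
  Expressions.column("id"), SortDirection.DESCENDING, NullOrdering.NULLS_LAST)

ExprUtils.toClickHouseOpt(order)
// => Some(OrderExpr(FieldRef(id), asc = false, nullFirst = false))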
@@ -16,8 +16,9 @@ package xenon.clickhouse.read

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.clickhouse.ClickHouseSQLConf._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.clickhouse.ExprUtils
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
import org.apache.spark.sql.connector.expressions.{SortOrder, Transform}
import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.connector.read.partitioning.{Partitioning, UnknownPartitioning}
@@ -38,6 +39,7 @@ class ClickHouseScanBuilder(
metadataSchema: StructType,
partitionTransforms: Array[Transform]
) extends ScanBuilder
with SupportsPushDownTopN
with SupportsPushDownLimit
with SupportsPushDownFilters
with SupportsPushDownAggregates
@@ -56,13 +58,25 @@
physicalSchema.fields ++ reservedMetadataSchema.fields
)

private var _orders: Option[String] = None

private var _limit: Option[Int] = None

override def pushLimit(limit: Int): Boolean = {
this._limit = Some(limit)
true
}

override def pushTopN(orders: Array[SortOrder], limit: Int): Boolean = {
val translated = orders.map(ExprUtils.toClickHouseOpt)
if (translated.exists(_.isEmpty)) {
return false
}
this._orders = Some(translated.flatten.mkString(", ")) // ORDER BY keys are comma-separated
this._limit = Some(limit)
true
}

private var _pushedFilters = Array.empty[Filter]

override def pushedFilters: Array[Filter] = this._pushedFilters
@@ -121,6 +135,7 @@ class ClickHouseScanBuilder(
readSchema = _readSchema,
filtersExpr = compileFilters(AlwaysTrue :: pushedFilters.toList),
groupByClause = _groupByClause,
orderByClause = _orders.map(o => s"ORDER BY $o"),
limit = _limit
))
}
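Note that pushTopN above is all-or-nothing: if any sort key fails to translate, it returns false and Spark keeps both the sort and the limit on its side. A hedged sketch of that contract, where `builder` stands in for a fully constructed ClickHouseScanBuilder and the bucket transform is one example of a key the translator rejects:

import org.apache.spark.sql.connector.expressions.{Expressions, SortDirection}

val byId = Expressions.sort(Expressions.column("id"), SortDirection.ASCENDING)
val byBucket = Expressions.sort(Expressions.bucket(4, "id"), SortDirection.ASCENDING)

builder.pushTopN(Array(byId), 10) // => true; the scan renders "ORDER BY ... LIMIT 10"
builder.pushTopN(Array(byId, byBucket), 10) // => false; nothing is pushed down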
@@ -58,6 +58,7 @@ abstract class ClickHouseReader[Record](
|FROM `$database`.`$table`
|WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr})
|${scanJob.groupByClause.getOrElse("")}
|${scanJob.orderByClause.getOrElse("")}
|${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")}
|""".stripMargin
}
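To make the query assembly above concrete, here is a self-contained sketch with the connector types replaced by plain strings (the helper and its sample values are illustrative; only the interpolation mirrors the PR):

def renderQuery(
    database: String,
    table: String,
    partFilterExpr: String,
    filtersExpr: String,
    groupByClause: Option[String] = None,
    orderByClause: Option[String] = None,
    limit: Option[Int] = None
): String =
  s"""SELECT *
     |FROM `$database`.`$table`
     |WHERE ($partFilterExpr) AND ($filtersExpr)
     |${groupByClause.getOrElse("")}
     |${orderByClause.getOrElse("")}
     |${limit.map(n => s"LIMIT $n").getOrElse("")}
     |""".stripMargin

renderQuery("default", "events", "1=1", "`status` = 'ok'",
  orderByClause = Some("ORDER BY `created_at` DESC"), limit = Some(100))
// SELECT *
// FROM `default`.`events`
// WHERE (1=1) AND (`status` = 'ok')
//                                  <- blank line where a GROUP BY would go
// ORDER BY `created_at` DESC
// LIMIT 100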
@@ -35,6 +35,7 @@ case class ScanJobDescription(
// into Scan tasks because the check happens in the planning phase on the driver side.
filtersExpr: String = "1=1",
groupByClause: Option[String] = None,
orderByClause: Option[String] = None,
limit: Option[Int] = None
) {
