use optimized plan
turboFei committed Jan 25, 2024
1 parent ecefc2a commit 196845a
Showing 2 changed files with 12 additions and 12 deletions.
@@ -25,8 +25,9 @@ import org.apache.spark.network.util.{ByteUnit, JavaUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.execution.{CollectLimitExec, HiveResult, LocalTableScanExec, SparkPlan, SQLExecution, TakeOrderedAndProjectExec}
+import org.apache.spark.sql.execution.{CollectLimitExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -294,18 +295,17 @@ object SparkDatasetHelper extends Logging {
     SQLMetrics.postDriverMetricUpdates(sc, executionId, metrics.values.toSeq)
   }
 
-  private[kyuubi] def planLimit(plan: SparkPlan): Option[Int] = plan match {
-    case tp: TakeOrderedAndProjectExec => Option(tp.limit)
-    case c: CollectLimitExec => Option(c.limit)
-    case ap: AdaptiveSparkPlanExec => planLimit(ap.inputPlan)
-    case _ => None
-  }
+  private[kyuubi] def optimizedPlanLimit(queryExecution: QueryExecution): Option[Long] =
+    queryExecution.optimizedPlan match {
+      case globalLimit: GlobalLimit => globalLimit.maxRows
+      case _ => None
+    }
 
   def shouldSaveResultToFs(resultMaxRows: Int, minSize: Long, result: DataFrame): Boolean = {
     if (isCommandExec(result.queryExecution.executedPlan.nodeName)) {
       return false
     }
-    val finalLimit = planLimit(result.queryExecution.sparkPlan) match {
+    val finalLimit = optimizedPlanLimit(result.queryExecution) match {
       case Some(limit) if resultMaxRows > 0 => math.min(limit, resultMaxRows)
       case Some(limit) => limit
       case None => resultMaxRows
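The commit replaces the old physical-plan pattern match (TakeOrderedAndProjectExec / CollectLimitExec, recursing into AdaptiveSparkPlanExec) with a single check on the root GlobalLimit node of the optimized logical plan, and the tail of the hunk shows how that result feeds the row-count estimate. As a minimal sketch of the finalLimit selection, extracted here for illustration only (paste into a Scala REPL; the helper name is local to this sketch, and the surrounding size check from shouldSaveResultToFs is not shown in the hunk above):

// Illustration only: replays the finalLimit match from shouldSaveResultToFs
// with plain values; not the actual Kyuubi code path.
def finalLimit(planLimit: Option[Long], resultMaxRows: Int): Long =
  planLimit match {
    case Some(limit) if resultMaxRows > 0 => math.min(limit, resultMaxRows)
    case Some(limit) => limit
    case None => resultMaxRows
  }

assert(finalLimit(Some(10L), 1000) == 10L)     // query LIMIT wins when smaller
assert(finalLimit(Some(5000L), 1000) == 1000L) // resultMaxRows caps a larger LIMIT
assert(finalLimit(Some(10L), 0) == 10L)        // non-positive resultMaxRows: keep the plan limit
assert(finalLimit(None, 1000) == 1000L)        // no plan limit: fall back to resultMaxRows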
@@ -33,13 +33,13 @@ class SparkDatasetHelperSuite extends WithSparkSQLEngine {
         " SELECT * FROM VALUES(1),(2),(3),(4) AS t(id)")
 
       val topKStatement = s"SELECT * FROM(SELECT * FROM tv ORDER BY id LIMIT ${topKThreshold - 1})"
-      assert(SparkDatasetHelper.planLimit(
-        spark.sql(topKStatement).queryExecution.sparkPlan) === Option(topKThreshold - 1))
+      assert(SparkDatasetHelper.optimizedPlanLimit(
+        spark.sql(topKStatement).queryExecution) === Option(topKThreshold - 1))
 
       val collectLimitStatement =
         s"SELECT * FROM (SELECT * FROM tv ORDER BY id LIMIT $topKThreshold)"
-      assert(SparkDatasetHelper.planLimit(
-        spark.sql(collectLimitStatement).queryExecution.sparkPlan) === Option(topKThreshold))
+      assert(SparkDatasetHelper.optimizedPlanLimit(
+        spark.sql(collectLimitStatement).queryExecution) === Option(topKThreshold))
     }
   }
 }
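For readers who want to see what the updated assertions inspect, here is a hedged spark-shell sketch (not part of the commit; assumes Spark 3.x, where spark is predefined). Like the suite's queries, it uses an ORDER BY ... LIMIT so the limit survives optimization as a GlobalLimit node at the plan root:

// Paste into spark-shell. GlobalLimit.maxRows carries the limit as Option[Long],
// which is exactly what optimizedPlanLimit reads off the optimized logical plan.
import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit

val df = spark.sql("SELECT * FROM VALUES (1), (2), (3), (4) AS t(id) ORDER BY id LIMIT 2")

val limit = df.queryExecution.optimizedPlan match {
  case g: GlobalLimit => g.maxRows // the optimizer keeps GlobalLimit at the root
  case _ => None
}
// limit: Option[Long] = Some(2)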
