Commit

Merge branch 'main' into velox-merge-agg
yikf authored Nov 18, 2024
2 parents dcc0b4b + 17d9cd8 commit 935d751
Showing 128 changed files with 7,481 additions and 2,739 deletions.
20 changes: 10 additions & 10 deletions .github/workflows/util/install_spark_resources.sh
@@ -63,26 +63,26 @@ case "$1" in
3.5)
# Spark-3.5
cd ${INSTALL_DIR} && \
- wget -nv https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz && \
- tar --strip-components=1 -xf spark-3.5.3-bin-hadoop3.tgz spark-3.5.3-bin-hadoop3/jars/ && \
- rm -rf spark-3.5.3-bin-hadoop3.tgz && \
+ wget -nv https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz && \
+ tar --strip-components=1 -xf spark-3.5.2-bin-hadoop3.tgz spark-3.5.2-bin-hadoop3/jars/ && \
+ rm -rf spark-3.5.2-bin-hadoop3.tgz && \
mkdir -p ${INSTALL_DIR}/shims/spark35/spark_home/assembly/target/scala-2.12 && \
mv jars ${INSTALL_DIR}/shims/spark35/spark_home/assembly/target/scala-2.12 && \
- wget -nv https://github.com/apache/spark/archive/refs/tags/v3.5.3.tar.gz && \
- tar --strip-components=1 -xf v3.5.3.tar.gz spark-3.5.3/sql/core/src/test/resources/ && \
+ wget -nv https://github.com/apache/spark/archive/refs/tags/v3.5.2.tar.gz && \
+ tar --strip-components=1 -xf v3.5.2.tar.gz spark-3.5.2/sql/core/src/test/resources/ && \
mkdir -p shims/spark35/spark_home/ && \
mv sql shims/spark35/spark_home/
;;
3.5-scala2.13)
# Spark-3.5, scala 2.13
cd ${INSTALL_DIR} && \
- wget -nv https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz && \
- tar --strip-components=1 -xf spark-3.5.3-bin-hadoop3.tgz spark-3.5.3-bin-hadoop3/jars/ && \
- rm -rf spark-3.5.3-bin-hadoop3.tgz && \
+ wget -nv https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz && \
+ tar --strip-components=1 -xf spark-3.5.2-bin-hadoop3.tgz spark-3.5.2-bin-hadoop3/jars/ && \
+ rm -rf spark-3.5.2-bin-hadoop3.tgz && \
mkdir -p ${INSTALL_DIR}/shims/spark35/spark_home/assembly/target/scala-2.13 && \
mv jars ${INSTALL_DIR}/shims/spark35/spark_home/assembly/target/scala-2.13 && \
- wget -nv https://github.com/apache/spark/archive/refs/tags/v3.5.3.tar.gz && \
- tar --strip-components=1 -xf v3.5.3.tar.gz spark-3.5.3/sql/core/src/test/resources/ && \
+ wget -nv https://github.com/apache/spark/archive/refs/tags/v3.5.2.tar.gz && \
+ tar --strip-components=1 -xf v3.5.2.tar.gz spark-3.5.2/sql/core/src/test/resources/ && \
mkdir -p shims/spark35/spark_home/ && \
mv sql shims/spark35/spark_home/
;;
56 changes: 41 additions & 15 deletions .github/workflows/velox_backend.yml
@@ -71,6 +71,7 @@ jobs:
df -a
cd $GITHUB_WORKSPACE/
bash dev/ci-velox-buildstatic-centos-7.sh
+ ccache -s
- name: "Save ccache"
uses: actions/cache/save@v3
id: ccache
@@ -564,6 +565,12 @@ jobs:
with:
name: test-report-spark32
path: '**/surefire-reports/TEST-*.xml'
+ - name: Upload golden files
+ if: failure()
+ uses: actions/upload-artifact@v4
+ with:
+ name: golden-files-spark32
+ path: /tmp/tpch-approved-plan/**

run-spark-test-spark32-slow:
needs: build-native-lib-centos-7
@@ -633,6 +640,12 @@ jobs:
with:
name: test-report-spark33
path: '**/surefire-reports/TEST-*.xml'
+ - name: Upload golden files
+ if: failure()
+ uses: actions/upload-artifact@v4
+ with:
+ name: golden-files-spark33
+ path: /tmp/tpch-approved-plan/**


run-spark-test-spark33-slow:
@@ -704,6 +717,12 @@ jobs:
with:
name: test-report-spark34
path: '**/surefire-reports/TEST-*.xml'
+ - name: Upload golden files
+ if: failure()
+ uses: actions/upload-artifact@v4
+ with:
+ name: golden-files-spark34
+ path: /tmp/tpch-approved-plan/**


run-spark-test-spark34-slow:
@@ -754,19 +773,19 @@ jobs:
with:
name: arrow-jars-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- - name: Prepare spark.test.home for Spark 3.5.3 (other tests)
+ - name: Prepare spark.test.home for Spark 3.5.2 (other tests)
run: |
bash .github/workflows/util/install_spark_resources.sh 3.5
dnf module -y install python39 && \
alternatives --set python3 /usr/bin/python3.9 && \
pip3 install setuptools && \
- pip3 install pyspark==3.5.3 cython && \
+ pip3 install pyspark==3.5.2 cython && \
pip3 install pandas pyarrow
- - name: Build and Run unit test for Spark 3.5.3 (other tests)
+ - name: Build and Run unit test for Spark 3.5.2 (other tests)
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
- $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+ $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
- name: Upload test report
@@ -775,6 +794,12 @@ jobs:
with:
name: test-report-spark35
path: '**/surefire-reports/TEST-*.xml'
+ - name: Upload golden files
+ if: failure()
+ uses: actions/upload-artifact@v4
+ with:
+ name: golden-files-spark35
+ path: /tmp/tpch-approved-plan/**

run-spark-test-spark35-scala213:
needs: build-native-lib-centos-7
@@ -792,15 +817,15 @@
with:
name: arrow-jars-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- - name: Prepare spark.test.home for Spark 3.5.3 (other tests)
+ - name: Prepare spark.test.home for Spark 3.5.2 (other tests)
run: |
bash .github/workflows/util/install_spark_resources.sh 3.5-scala2.13
dnf module -y install python39 && \
alternatives --set python3 /usr/bin/python3.9 && \
pip3 install setuptools && \
- pip3 install pyspark==3.5.3 cython && \
+ pip3 install pyspark==3.5.2 cython && \
pip3 install pandas pyarrow
- - name: Build and Run unit test for Spark 3.5.3 with scala-2.13 (other tests)
+ - name: Build and Run unit test for Spark 3.5.2 with scala-2.13 (other tests)
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.13
@@ -830,13 +855,13 @@ jobs:
with:
name: arrow-jars-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- - name: Prepare spark.test.home for Spark 3.5.3 (slow tests)
+ - name: Prepare spark.test.home for Spark 3.5.2 (slow tests)
run: |
bash .github/workflows/util/install_spark_resources.sh 3.5
- - name: Build and Run unit test for Spark 3.5.3 (slow tests)
+ - name: Build and Run unit test for Spark 3.5.2 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
- $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+ $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
- name: Upload test report
@@ -862,15 +887,15 @@
with:
name: arrow-jars-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- - name: Prepare spark.test.home for Spark 3.5.3 (other tests)
+ - name: Prepare spark.test.home for Spark 3.5.2 (other tests)
run: |
bash .github/workflows/util/install_spark_resources.sh 3.5
dnf module -y install python39 && \
alternatives --set python3 /usr/bin/python3.9 && \
pip3 install setuptools && \
- pip3 install pyspark==3.5.3 cython && \
+ pip3 install pyspark==3.5.2 cython && \
pip3 install pandas pyarrow
- - name: Build and Run unit test for Spark 3.5.3 (other tests)
+ - name: Build and Run unit test for Spark 3.5.2 (other tests)
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
@@ -899,10 +924,10 @@ jobs:
with:
name: arrow-jars-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- - name: Prepare spark.test.home for Spark 3.5.3 (slow tests)
+ - name: Prepare spark.test.home for Spark 3.5.2 (slow tests)
run: |
bash .github/workflows/util/install_spark_resources.sh 3.5
- - name: Build and Run unit test for Spark 3.5.3 (slow tests)
+ - name: Build and Run unit test for Spark 3.5.2 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
$MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
@@ -930,6 +955,7 @@ jobs:
run: |
df -a
bash dev/ci-velox-buildshared-centos-8.sh
+ ccache -s
- name: "Save ccache"
uses: actions/cache/save@v3
id: ccache
@@ -396,4 +396,8 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}

override def supportWindowGroupLimitExec(rankLikeFunction: Expression): Boolean = true

+ override def supportHiveTableScanNestedColumnPruning: Boolean =
+ GlutenConfig.getConf.enableColumnarHiveTableScanNestedColumnPruning

}
@@ -43,7 +43,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
- import org.apache.spark.util.SerializableConfiguration

import java.lang.{Long => JLong}
import java.net.URI
@@ -133,8 +132,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
- properties: Map[String, String],
- serializableHadoopConf: SerializableConfiguration): SplitInfo = {
+ properties: Map[String, String]): SplitInfo = {
partition match {
case p: GlutenMergeTreePartition =>
ExtensionTableBuilder
@@ -19,10 +19,10 @@ package org.apache.gluten.backendsapi.clickhouse
import org.apache.gluten.backendsapi.RuleApi
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar._
- import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast, TransformPreOverrides}
- import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
+ import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast}
+ import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
- import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector}
+ import org.apache.gluten.extension.injector.{Injector, SparkInjector}
import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector}
import org.apache.gluten.parser.{GlutenCacheFilesSqlParser, GlutenClickhouseSqlParser}
import org.apache.gluten.sql.shims.SparkShimLoader
@@ -31,14 +31,14 @@ import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRew
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.DeltaLogFileIndex
import org.apache.spark.sql.delta.rules.CHOptimizeMetadataOnlyDeltaQuery
- import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, CommandResultExec, FileSourceScanExec, GlutenFallbackReporter, RDDScanExec, SparkPlan}
+ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
import org.apache.spark.util.SparkPlanRules

class CHRuleApi extends RuleApi {
import CHRuleApi._
- override def injectRules(injector: RuleInjector): Unit = {
+ override def injectRules(injector: Injector): Unit = {
injectSpark(injector.spark)
injectLegacy(injector.gluten.legacy)
injectRas(injector.gluten.ras)
@@ -57,25 +57,27 @@ private object CHRuleApi {
injector.injectResolutionRule(spark => new RewriteToDateExpresstionRule(spark))
injector.injectResolutionRule(spark => new RewriteDateTimestampComparisonRule(spark))
injector.injectOptimizerRule(spark => new CommonSubexpressionEliminateRule(spark))
+ injector.injectOptimizerRule(spark => new ExtendedGeneratorNestedColumnAliasing(spark))
injector.injectOptimizerRule(spark => CHAggregateFunctionRewriteRule(spark))
injector.injectOptimizerRule(_ => CountDistinctWithoutExpand)
injector.injectOptimizerRule(_ => EqualToRewrite)
injector.injectPreCBORule(spark => new CHOptimizeMetadataOnlyDeltaQuery(spark))
}

def injectLegacy(injector: LegacyInjector): Unit = {

- // Gluten columnar: Transform rules.
+ // Legacy: Pre-transform rules.
injector.injectTransform(_ => RemoveTransitions)
injector.injectTransform(_ => PushDownInputFileExpression.PreOffload)
injector.injectTransform(c => FallbackOnANSIMode.apply(c.session))
injector.injectTransform(c => FallbackMultiCodegens.apply(c.session))
injector.injectTransform(_ => RewriteSubqueryBroadcast())
injector.injectTransform(c => FallbackBroadcastHashJoin.apply(c.session))
injector.injectTransform(c => MergeTwoPhasesHashBaseAggregate.apply(c.session))
- injector.injectTransform(_ => intercept(RewriteSparkPlanRulesManager()))
- injector.injectTransform(_ => intercept(AddFallbackTagRule()))
- injector.injectTransform(_ => intercept(TransformPreOverrides()))
+
+ // Legacy: The Legacy transform rule.
+ injector.injectTransform(_ => intercept(HeuristicTransform()))
+
+ // Legacy: Post-transform rules.
injector.injectTransform(_ => RemoveNativeWriteFilesSortAndProject())
injector.injectTransform(c => intercept(RewriteTransformer.apply(c.session)))
injector.injectTransform(_ => PushDownFilterToScan)
@@ -23,8 +23,7 @@ import org.apache.gluten.execution._
import org.apache.gluten.expression._
import org.apache.gluten.expression.ExpressionNames.MONOTONICALLY_INCREASING_ID
import org.apache.gluten.extension.ExpressionExtensionTrait
- import org.apache.gluten.extension.columnar.AddFallbackTagRule
- import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
+ import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode}
import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
@@ -224,9 +223,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
}
// FIXME: The operation happens inside ReplaceSingleNode().
// Caller may not know it adds project on top of the shuffle.
- val project = TransformPreOverrides().apply(
- AddFallbackTagRule().apply(
- ProjectExec(plan.child.output ++ projectExpressions, plan.child)))
+ // FIXME: HeuristicTransform is costly. Re-applying it may cause performance issues.
+ val project =
+ HeuristicTransform()(ProjectExec(plan.child.output ++ projectExpressions, plan.child))
var newExprs = Seq[Expression]()
for (i <- exprs.indices) {
val pos = newExpressionsPosition(i)
@@ -249,9 +248,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
}
// FIXME: The operation happens inside ReplaceSingleNode().
// Caller may not know it adds project on top of the shuffle.
- val project = TransformPreOverrides().apply(
- AddFallbackTagRule().apply(
- ProjectExec(plan.child.output ++ projectExpressions, plan.child)))
+ // FIXME: HeuristicTransform is costly. Re-applying it may cause performance issues.
+ val project =
+ HeuristicTransform()(ProjectExec(plan.child.output ++ projectExpressions, plan.child))
var newOrderings = Seq[SortOrder]()
for (i <- orderings.indices) {
val oldOrdering = orderings(i)
