
Commit

Merge branch 'main' into lihbdfs3-support
zhouyuan authored Nov 5, 2024
2 parents 3f513a0 + 3108d91 commit 34c95e2
Showing 73 changed files with 756 additions and 644 deletions.
103 changes: 15 additions & 88 deletions .github/workflows/build_bundle_package.yml
@@ -17,6 +17,7 @@ name: Build bundle package

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
CCACHE_DIR: "${{ github.workspace }}/.ccache"

concurrency:
group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
@@ -25,10 +26,6 @@ concurrency:
on:
workflow_dispatch:
inputs:
os:
description: 'OS version: ubuntu:20.04, ubuntu:22.04, centos:7 or centos:8'
required: true
default: 'ubuntu:20.04'
spark:
description: 'Spark version: spark-3.2, spark-3.3, spark-3.4 or spark-3.5'
required: true
@@ -41,19 +38,22 @@ on:
jobs:
build-native-lib:
runs-on: ubuntu-20.04
container: apache/gluten:gluten-vcpkg-builder_2024_05_29
container: apache/gluten:vcpkg-centos-7
steps:
- uses: actions/checkout@v2
- name: Get Ccache
uses: actions/cache/restore@v3
with:
path: '${{ env.CCACHE_DIR }}'
key: ccache-centos7-release-default-${{github.sha}}
restore-keys: |
ccache-centos7-release-default
- name: Build Gluten velox third party
run: |
yum install sudo patch java-1.8.0-openjdk-devel -y && \
cd $GITHUB_WORKSPACE/ep/build-velox/src && \
./get_velox.sh && \
source /opt/rh/devtoolset-11/enable && \
cd $GITHUB_WORKSPACE/ && \
export NUM_THREADS=4
./dev/builddeps-veloxbe.sh --enable_vcpkg=ON --build_tests=OFF --build_benchmarks=OFF --enable_s3=OFF \
--enable_gcs=OFF --enable_hdfs=ON --enable_abfs=OFF
df -a
yum install ccache -y
cd $GITHUB_WORKSPACE/
bash dev/ci-velox-buildstatic-centos-7.sh
- name: Upload native libs
uses: actions/upload-artifact@v2
with:
@@ -66,44 +66,10 @@ jobs:
path: /root/.m2/repository/org/apache/arrow/
name: velox-arrow-jar-centos-7-${{github.sha}}

build-bundle-package-ubuntu:
if: startsWith(github.event.inputs.os, 'ubuntu')
needs: build-native-lib
runs-on: ubuntu-20.04
container: ${{ github.event.inputs.os }}
steps:
- uses: actions/checkout@v2
- name: Download All Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-${{github.sha}}
path: ./cpp/build/releases
- name: Download All Arrow Jar Artifacts
uses: actions/download-artifact@v2
with:
name: velox-arrow-jar-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Setup java and maven
run: |
apt-get update && \
apt-get install -y openjdk-8-jdk maven && \
apt remove openjdk-11* -y
- name: Build for Spark ${{ github.event.inputs.spark }}
run: |
cd $GITHUB_WORKSPACE/ && \
mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Pceleborn -DskipTests -Dmaven.source.skip
- name: Upload bundle package
uses: actions/upload-artifact@v2
with:
name: gluten-velox-bundle-package
path: package/target/gluten-velox-bundle-*.jar
retention-days: 7

build-bundle-package-centos7:
if: ${{ github.event.inputs.os == 'centos:7' }}
needs: build-native-lib
runs-on: ubuntu-20.04
container: ${{ github.event.inputs.os }}
container: centos:7
steps:
- uses: actions/checkout@v2
- name: Download All Artifacts
@@ -127,50 +93,11 @@ jobs:
cd $GITHUB_WORKSPACE/ && \
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Pceleborn -DskipTests -Dmaven.source.skip
mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Pceleborn -Puniffle -DskipTests -Dmaven.source.skip
- name: Upload bundle package
uses: actions/upload-artifact@v2
with:
name: gluten-velox-bundle-package
path: package/target/gluten-velox-bundle-*.jar
retention-days: 7

build-bundle-package-centos8:
if: ${{ github.event.inputs.os == 'centos:8' }}
needs: build-native-lib
runs-on: ubuntu-20.04
container: ${{ github.event.inputs.os }}
steps:
- uses: actions/checkout@v2
- name: Download All Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-${{github.sha}}
path: ./cpp/build/releases
- name: Download All Arrow Jar Artifacts
uses: actions/download-artifact@v2
with:
name: velox-arrow-jar-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Update mirror list
run: |
sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true && \
sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-* || true
- name: Setup java and maven
run: |
yum update -y && yum install -y java-1.8.0-openjdk-devel wget && \
wget https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz && \
tar -xvf apache-maven-3.8.8-bin.tar.gz && \
mv apache-maven-3.8.8 /usr/lib/maven
- name: Build for Spark ${{ github.event.inputs.spark }}
run: |
cd $GITHUB_WORKSPACE/ && \
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Pceleborn -DskipTests -Dmaven.source.skip
- name: Upload bundle package
uses: actions/upload-artifact@v2
with:
name: gluten-velox-bundle-package
path: package/target/gluten-velox-bundle-*.jar
retention-days: 7
12 changes: 4 additions & 8 deletions .github/workflows/velox_backend.yml
@@ -993,16 +993,14 @@ jobs:
pip3 install setuptools && \
pip3 install pyspark==3.5.3 cython && \
pip3 install pandas pyarrow
- name: (To be fixed) Build and Run unit test for Spark 3.5.3 (other tests)
continue-on-error: true
- name: Build and Run unit test for Spark 3.5.3 (other tests)
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
$MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/ -Dspark.gluten.ras.enabled=true" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
- name: (To be enabled) Upload test report
if: false
- name: Upload test report
uses: actions/upload-artifact@v4
with:
name: test-report-spark35-ras
@@ -1035,15 +1033,13 @@ jobs:
- name: Prepare spark.test.home for Spark 3.5.3 (slow tests)
run: |
bash .github/workflows/util/install_spark_resources.sh 3.5
- name: (To be fixed) Build and Run unit test for Spark 3.5.3 (slow tests)
continue-on-error: true
- name: Build and Run unit test for Spark 3.5.3 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
$MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/ -Dspark.gluten.ras.enabled=true" \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
- name: (To be enabled) Upload test report
if: false
- name: Upload test report
uses: actions/upload-artifact@v4
with:
name: test-report-spark35-slow-ras
@@ -19,11 +19,12 @@
import java.util.Set;

public class CHNativeCacheManager {
public static String cacheParts(String table, Set<String> columns) {
return nativeCacheParts(table, String.join(",", columns));
public static String cacheParts(String table, Set<String> columns, boolean onlyMetaCache) {
return nativeCacheParts(table, String.join(",", columns), onlyMetaCache);
}

private static native String nativeCacheParts(String table, String columns);
private static native String nativeCacheParts(
String table, String columns, boolean onlyMetaCache);

public static CacheResult getCacheStatus(String jobId) {
return nativeGetCacheStatus(jobId);
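Note on the cacheParts change above: the method now takes a third onlyMetaCache flag so callers can warm only MergeTree part metadata instead of the column data itself. A minimal caller sketch in Scala follows; the import path, the tableMeta string, and the column names are illustrative assumptions, not taken from this commit.

    // Minimal sketch of calling the widened cacheParts API (assumptions: the
    // CHNativeCacheManager package path, and that `tableMeta` is the serialized
    // MergeTree table description the native side expects).
    import java.util.{HashSet => JHashSet}

    import org.apache.gluten.execution.CHNativeCacheManager // package assumed

    object CachePartsSketch {
      def warmMetadataOnly(tableMeta: String): Unit = {
        val columns = new JHashSet[String]()
        columns.add("l_orderkey") // hypothetical column names
        columns.add("l_partkey")

        // New third argument: cache only part metadata, skip the column data.
        val jobId = CHNativeCacheManager.cacheParts(tableMeta, columns, true)

        // The returned job id can be polled via getCacheStatus on the same class.
        val status = CHNativeCacheManager.getCacheStatus(jobId)
        println(s"cache job $jobId -> $status")
      }
    }
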
@@ -142,10 +142,11 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
.toLowerCase(Locale.getDefault)
}

override def validateScan(
override def validateScanExec(
format: ReadFileFormat,
fields: Array[StructField],
rootPaths: Seq[String]): ValidationResult = {
rootPaths: Seq[String],
properties: Map[String, String]): ValidationResult = {

// Validate if all types are supported.
def hasComplexType: Boolean = {
@@ -86,7 +86,8 @@ private object CHRuleApi {
injector.injectTransform(
c =>
intercept(
SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarTransformRules)(c.session)))
SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarTransformRules)(
c.session)))
injector.injectTransform(c => InsertTransitions(c.outputsColumnar))

// Gluten columnar: Fallback policies.
@@ -98,14 +99,15 @@
SparkShimLoader.getSparkShims
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPost(c => intercept(each(c.session))))
injector.injectPost(c => ColumnarCollapseTransformStages(c.conf))
injector.injectPost(c => ColumnarCollapseTransformStages(c.glutenConf))
injector.injectTransform(
c =>
intercept(SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarPostRules)(c.session)))
intercept(
SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarPostRules)(c.session)))

// Gluten columnar: Final rules.
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
injector.injectFinal(c => GlutenFallbackReporter(c.conf, c.session))
injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
injector.injectFinal(_ => RemoveFallbackTagRule())
}

@@ -28,7 +28,7 @@ import org.apache.gluten.vectorized.CHNativeExpressionEvaluator

import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.utils.RangePartitionerBoundsGenerator
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -71,6 +71,7 @@ class CHValidatorApi extends ValidatorApi with AdaptiveSparkPlanHelper with Logg
}

override def doColumnarShuffleExchangeExecValidate(
outputAttributes: Seq[Attribute],
outputPartitioning: Partitioning,
child: SparkPlan): Option[String] = {
val outputAttributes = child.output
@@ -38,7 +38,7 @@ import scala.util.control.Breaks.{break, breakable}
// queryStagePrepRules.
case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
val columnarConf: GlutenConfig = GlutenConfig.getConf
val glutenConf: GlutenConfig = GlutenConfig.getConf
plan.foreach {
case bhj: BroadcastHashJoinExec =>
val buildSidePlan = bhj.buildSide match {
@@ -53,8 +53,8 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend
case Some(exchange @ BroadcastExchangeExec(mode, child)) =>
val isTransformable =
if (
!columnarConf.enableColumnarBroadcastExchange ||
!columnarConf.enableColumnarBroadcastJoin
!glutenConf.enableColumnarBroadcastExchange ||
!glutenConf.enableColumnarBroadcastJoin
) {
ValidationResult.failed(
"columnar broadcast exchange is disabled or " +
@@ -38,9 +38,9 @@ case class MergeTwoPhasesHashBaseAggregate(session: SparkSession)
extends Rule[SparkPlan]
with Logging {

val columnarConf: GlutenConfig = GlutenConfig.getConf
val scanOnly: Boolean = columnarConf.enableScanOnly
val enableColumnarHashAgg: Boolean = !scanOnly && columnarConf.enableColumnarHashAgg
val glutenConf: GlutenConfig = GlutenConfig.getConf
val scanOnly: Boolean = glutenConf.enableScanOnly
val enableColumnarHashAgg: Boolean = !scanOnly && glutenConf.enableColumnarHashAgg
val replaceSortAggWithHashAgg: Boolean = GlutenConfig.getConf.forceToUseHashAgg

private def isPartialAgg(partialAgg: BaseAggregateExec, finalAgg: BaseAggregateExec): Boolean = {
@@ -70,9 +70,9 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case GlutenMergeTreeCacheLoad(mergeTreeTable, columns) =>
case GlutenMergeTreeCacheLoad(mergeTreeTable, columns, onlyMetaCache) =>
try {
val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns)
val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns, onlyMetaCache)
context.reply(CacheJobInfo(status = true, jobId))
} catch {
case e: Exception =>
@@ -36,7 +36,10 @@ object GlutenRpcMessages {
extends GlutenRpcMessage

// for mergetree cache
case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns: util.Set[String])
case class GlutenMergeTreeCacheLoad(
mergeTreeTable: String,
columns: util.Set[String],
onlyMetaCache: Boolean)
extends GlutenRpcMessage

case class GlutenCacheLoadStatus(jobId: String)
@@ -201,7 +201,7 @@ case class GlutenCHCacheDataCommand(
(
executorId,
executor.executorEndpointRef.ask[CacheJobInfo](
GlutenMergeTreeCacheLoad(tableMessage, selectedColumns.toSet.asJava)
GlutenMergeTreeCacheLoad(tableMessage, selectedColumns.toSet.asJava, onlyMetaCache)
)))
})
} else {
@@ -213,7 +213,7 @@
(
value._1,
executorData.executorEndpointRef.ask[CacheJobInfo](
GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava)
GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava, onlyMetaCache)
)))
})
}
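Taken together with the GlutenRpcMessages and GlutenExecutorEndpoint hunks above, the new onlyMetaCache flag travels from the driver command through the RPC message to CHNativeCacheManager.cacheParts on each executor. Below is a driver-side sketch of that ask pattern; executorRef, tableMessage, the column names, and the import paths are assumed placeholders rather than code from this commit.

    // Driver-side sketch (illustrative only): ask one executor to cache only
    // the part metadata of a MergeTree table.
    import scala.collection.JavaConverters._

    import org.apache.spark.rpc.RpcEndpointRef // Spark-internal API; real callers live in org.apache.spark packages
    import org.apache.spark.rpc.GlutenRpcMessages.GlutenMergeTreeCacheLoad // package assumed
    import org.apache.gluten.execution.CacheJobInfo // package assumed

    def requestMetaCache(executorRef: RpcEndpointRef, tableMessage: String) = {
      val selectedColumns = Seq("c1", "c2") // hypothetical column names
      // Returns a Future[CacheJobInfo]; GlutenCHCacheDataCommand collects one per executor.
      executorRef.ask[CacheJobInfo](
        GlutenMergeTreeCacheLoad(tableMessage, selectedColumns.toSet.asJava, onlyMetaCache = true))
    }
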
@@ -1461,4 +1461,26 @@ class GlutenClickHouseFileFormatSuite
spark.createDataFrame(data, schema).toDF().write.parquet(fileName)
fileName
}

/** TODO: fix the issue and test in spark 3.5 */
testSparkVersionLE33("write into hdfs") {

/**
 * There is a bug in the pipeline write to HDFS: when a pipeline returns a column batch, it does
 * not close the HDFS file, so the file is not flushed. The HDFS file is closed only when the
 * LocalExecutor is destroyed, but before that the file is already moved by the Spark committer.
 */
val tableName = "write_into_hdfs"
val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/$tableName/"
val format = "parquet"
val sql =
s"""
| select *
| from $format.`$tablePath`
| where long_field > 30
|""".stripMargin
withSQLConf(("spark.gluten.sql.native.writer.enabled", "true")) {
testFileFormatBase(tablePath, format, sql, df => {})
}
}
}
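For readers outside the suite, a stand-alone sketch of the write path the "write into hdfs" test above exercises; the SparkSession, the HDFS endpoint, and the two path constants are illustrative stand-ins for the suite's HDFS_URL_ENDPOINT / SPARK_DIR_NAME fixtures, not values from this commit.

    // Stand-alone sketch of the native-writer HDFS round trip (assumptions: a
    // SparkSession `spark` with the Gluten ClickHouse backend enabled and a
    // reachable HDFS endpoint; both constants below are hypothetical).
    val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020"
    val SPARK_DIR_NAME = "spark_test"
    val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/write_into_hdfs/"

    spark.conf.set("spark.gluten.sql.native.writer.enabled", "true")
    spark.range(100).selectExpr("id as long_field").write.mode("overwrite").parquet(tablePath)
    spark.read.parquet(tablePath).where("long_field > 30").show()
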
@@ -563,5 +563,12 @@ class GlutenClickHouseTPCHSuite extends GlutenClickHouseTPCHAbstractSuite {
compareResultsAgainstVanillaSpark(sql, true, { _ => })
spark.sql("drop table t1")
}

test("GLUTEN-7780 fix split diff") {
val sql = "select split(concat('a|b|c', cast(id as string)), '\\|')" +
", split(concat('a|b|c', cast(id as string)), '\\\\|')" +
", split(concat('a|b|c', cast(id as string)), '|') from range(10)"
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}
}
// scalastyle:off line.size.limit
