support spark3.0 (pingcap#1606)
marsishandsome authored Sep 24, 2020
1 parent 2dad508 commit de4cee5
Showing 42 changed files with 827 additions and 1,661 deletions.
4 changes: 2 additions & 2 deletions .ci/build.groovy
@@ -30,14 +30,14 @@ def call(ghprbActualCommit, ghprbPullId, ghprbPullTitle, ghprbPullLink, ghprbPul
export LANGUAGE=en_US.UTF-8
cp -R /home/jenkins/agent/git/tispark/. ./
git checkout -f ${ghprbActualCommit}
mvn mvn-scalafmt_2.11:format -Dscalafmt.skip=false
mvn mvn-scalafmt_2.12:format -Dscalafmt.skip=false
mvn com.coveo:fmt-maven-plugin:format
git diff --quiet
formatted="\$?"
if [[ "\${formatted}" -eq 1 ]]
then
echo "code format error, please run the following commands:"
echo " mvn mvn-scalafmt_2.11:format -Dscalafmt.skip=false"
echo " mvn mvn-scalafmt_2.12:format -Dscalafmt.skip=false"
echo " mvn com.coveo:fmt-maven-plugin:format"
exit 1
fi
14 changes: 14 additions & 0 deletions .ci/integration_test.groovy
@@ -12,6 +12,7 @@ def call(ghprbActualCommit, ghprbCommentBody, ghprbPullId, ghprbPullTitle, ghprb
def PARALLEL_NUMBER = 18
def TEST_REGION_SIZE = "normal"
def TEST_TIFLASH = "false"
def TEST_SPARK_CATALOG = "false"

// parse tidb branch
def m1 = ghprbCommentBody =~ /tidb\s*=\s*([^\s\\]+)(\s|\\|$)/
@@ -65,6 +66,12 @@ def call(ghprbActualCommit, ghprbCommentBody, ghprbPullId, ghprbPullTitle, ghprb
TEST_TIFLASH = "${m8[0][1]}"
}

// parse test spark catalog
def m100 = ghprbCommentBody =~ /test-spark-catalog\s*=\s*([^\s\\]+)(\s|\\|$)/
if (m100) {
TEST_SPARK_CATALOG = "${m100[0][1]}"
}
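
The flag-parsing blocks above all follow the same shape: a regex pulls the value after `name=` out of the PR trigger comment, falling back to a default when the flag is absent. A minimal standalone sketch of that idea (class name is hypothetical; Java rather than Groovy, purely for illustration):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CommentFlagParser {
    // Same pattern shape as the pipeline above: the value after
    // "test-spark-catalog=", terminated by whitespace, a backslash,
    // or end of input.
    private static final Pattern FLAG =
            Pattern.compile("test-spark-catalog\\s*=\\s*([^\\s\\\\]+)(\\s|\\\\|$)");

    public static String parse(String commentBody, String defaultValue) {
        Matcher m = FLAG.matcher(commentBody);
        return m.find() ? m.group(1) : defaultValue;
    }
}
```

With this shape, `TEST_SPARK_CATALOG` keeps its default of `"false"` unless the comment explicitly sets the flag.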

groovy.lang.Closure readfile = { filename ->
def file = readFile filename
return file.split("\n") as List
@@ -164,6 +171,13 @@ def call(ghprbActualCommit, ghprbCommentBody, ghprbPullId, ghprbPullTitle, ghprb
sh "cp .ci/tidb_config-for-tiflash-test.properties core/src/test/resources/tidb_config.properties"
}

if (TEST_SPARK_CATALOG != "false") {
sh """
touch core/src/test/resources/tidb_config.properties
echo "spark.sql.catalog.tidb_catalog=org.apache.spark.sql.catalyst.catalog.TiCatalog" >> core/src/test/resources/tidb_config.properties
"""
}

sh """
sed -i 's/core\\/src\\/test\\/scala\\///g' test
sed -i 's/\\//\\./g' test
28 changes: 17 additions & 11 deletions README.md
@@ -47,7 +47,7 @@ For other build tools, visit <https://search.maven.org/> and search with GroupId

## How to build from sources

TiSpark now supports Spark 2.3.0+ or 2.4.0+. The earlier TiSpark versions for Spark 2.1.0+ only contain bug-fixes. After these versions, you can still get support for Spark 2.1 until TiSpark 1.2.1.
TiSpark now supports Spark 2.3.0+, 2.4.0+, and 3.0.0+. The earlier TiSpark versions for Spark 2.1.0+ contain only bug fixes; Spark 2.1 is supported up to TiSpark 1.2.1.

Currently `java8` is the only supported JDK for building TiSpark; run `mvn -version` to check.

@@ -67,6 +67,7 @@ To skip the tests that you do not need to run, add `-Dmaven.test.skip=true`.

| Spark Version | Recommended TiSpark Version |
| ------------- | ---------------------------- |
| Spark-3.0.x | TiSpark-2.4.0-SNAPSHOT |
| Spark-2.4.x | TiSpark-2.3.4, TiSpark-2.1.9 |
| Spark-2.3.x | TiSpark-2.3.4, TiSpark-2.1.9 |
| Spark-2.2.x | TiSpark-1.2.1 |
@@ -80,26 +81,24 @@ To skip the tests that you do not need to run, add `-Dmaven.test.skip=true`.
| 1.2.x | v2.1.x |
| 2.1.x | v3.0.2 |
| 2.3.x | v4.0.x |
| 2.4.x | v4.0.x |

## Spark versions supported by TiSpark

Although TiSpark provides backward compatibility to TiDB, it only guarantees **restricted** support for earlier Spark versions, in order to follow the latest DataSource API changes.

| TiSpark Version | Spark Version |
| --------------- | ---------------------------- |
| 1.x | Spark v2.1.0+ |
| 2.0 | Spark v2.3.0+ |
| 2.1.x | Spark v2.3.0+, Spark v2.4.0+ |
| 2.3.x | Spark v2.3.0+, Spark v2.4.0+ |
| TiSpark Version | Spark Version | Scala Version |
| --------------- | ---------------------------- | ------------- |
| 1.x | Spark v2.1.0+ | 2.11 |
| 2.0 | Spark v2.3.0+ | 2.11 |
| 2.1.x | Spark v2.3.0+, Spark v2.4.0+ | 2.11 |
| 2.3.x | Spark v2.3.0+, Spark v2.4.0+ | 2.11 |
| 2.4.x | Spark v3.0.0+ | 2.12 |

## How to upgrade from Spark 2.1 to Spark 2.3/2.4

For the users of Spark 2.1 who wish to upgrade to the latest TiSpark version on Spark 2.3/2.4, download or install Spark 2.3+/2.4+ by following the instructions on the [Apache Spark site](http://spark.apache.org/downloads.html) and overwrite the old Spark version in `$SPARK_HOME`.

## Scala Version

TiSpark currently only supports `scala-2.11`.

## TiSpark Architecture

The figure below shows the architecture of TiSpark.
@@ -207,6 +206,13 @@ The configurations in the table below can be put together with `spark-defaults.c
| `spark.tispark.coprocessor.chunk_batch_size` | `1024` | How many rows fetched from Coprocessor |
| `spark.tispark.isolation_read_engines` | `tikv,tiflash` | List of readable engines of TiSpark, comma separated, storage engines not listed will not be read |
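
The options in this table go into `spark-defaults.conf` (or equivalent `--conf` flags). A short illustrative fragment combining the two keys shown above, using their documented defaults:

```
spark.tispark.coprocessor.chunk_batch_size 1024
spark.tispark.isolation_read_engines tikv,tiflash
```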

## Spark-3.0 Catalog
Use the following configuration to enable the `Catalog` provided by Spark 3.0.

```
spark.sql.catalog.tidb_catalog org.apache.spark.sql.catalyst.catalog.TiCatalog
```
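
Once the catalog is registered, tables can be addressed through it in SQL. A sketch of typical `spark-sql` usage (the database and table names here are placeholders, not from this commit):

```
USE tidb_catalog;
SHOW DATABASES;
SELECT COUNT(*) FROM some_db.some_table;
```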

## `Log4j` Configuration

When you start `spark-shell` or `spark-sql` and run query, you might see the following warnings:
21 changes: 0 additions & 21 deletions assembly/src/main/assembly/assembly.xml
@@ -17,25 +17,4 @@
<unpack>true</unpack>
</dependencySet>
</dependencySets>

<fileSets>
<fileSet>
<directory>
${project.parent.basedir}/spark-wrapper/spark-2.3/target/classes/
</directory>
<outputDirectory>resources/spark-wrapper-spark-2.3</outputDirectory>
<includes>
<include>**/*</include>
</includes>
</fileSet>
<fileSet>
<directory>
${project.parent.basedir}/spark-wrapper/spark-2.4/target/classes/
</directory>
<outputDirectory>resources/spark-wrapper-spark-2.4</outputDirectory>
<includes>
<include>**/*</include>
</includes>
</fileSet>
</fileSets>
</assembly>
@@ -168,7 +168,7 @@ public byte[] getBinary(int rowId) {

/** @return child [[ColumnVector]] at the given ordinal. */
@Override
protected ColumnVector getChild(int ordinal) {
public ColumnVector getChild(int ordinal) {
throw new UnsupportedOperationException("TiColumnVectorAdapter does not support this method");
}
}
9 changes: 4 additions & 5 deletions core/src/main/scala/com/pingcap/tispark/TiDBRelation.scala
@@ -18,10 +18,9 @@ package com.pingcap.tispark
import com.pingcap.tikv.TiSession
import com.pingcap.tikv.exception.{TiBatchWriteException, TiClientInternalException}
import com.pingcap.tikv.meta.{TiDAGRequest, TiTableInfo, TiTimestamp}
import com.pingcap.tispark.utils.ReflectionUtil.newAttributeReference
import com.pingcap.tispark.utils.TiUtil
import com.pingcap.tispark.write.{TiDBOptions, TiDBWriter}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
import org.apache.spark.sql.tispark.{TiHandleRDD, TiRowRDD}
@@ -88,12 +87,12 @@ case class TiDBRelation(
val ids = dagRequest.getPrunedPhysicalIds.asScala
var tiHandleRDDs = new ListBuffer[TiHandleRDD]()
lazy val attributeRef = Seq(
newAttributeReference("RegionId", LongType, nullable = false, Metadata.empty),
newAttributeReference(
AttributeReference("RegionId", LongType, nullable = false, Metadata.empty)(),
AttributeReference(
"Handles",
ArrayType(LongType, containsNull = false),
nullable = false,
Metadata.empty))
Metadata.empty)())

val tiConf = session.getConf
tiConf.setPartitionPerSplit(TiUtil.getPartitionPerSplit(sqlContext))
2 changes: 1 addition & 1 deletion core/src/main/scala/com/pingcap/tispark/TiSparkInfo.scala
@@ -21,7 +21,7 @@ import org.slf4j.LoggerFactory
object TiSparkInfo {
private final val logger = LoggerFactory.getLogger(getClass.getName)

val SUPPORTED_SPARK_VERSION: List[String] = "2.3" :: "2.4" :: Nil
val SUPPORTED_SPARK_VERSION: List[String] = "3.0" :: Nil

val SPARK_VERSION: String = org.apache.spark.SPARK_VERSION

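
The `SUPPORTED_SPARK_VERSION` list above is presumably matched against the running Spark version by `major.minor` prefix (the actual check lives elsewhere in `TiSparkInfo`; the class and method names below are hypothetical, and Java is used purely for illustration):

```java
import java.util.Arrays;
import java.util.List;

public class SparkVersionCheck {
    // Mirrors SUPPORTED_SPARK_VERSION after this commit: Spark 3.0 only.
    static final List<String> SUPPORTED = Arrays.asList("3.0");

    // True if a full version string such as "3.0.1" starts with a
    // supported "major.minor" prefix.
    static boolean isSupported(String sparkVersion) {
        return SUPPORTED.stream().anyMatch(sparkVersion::startsWith);
    }
}
```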
30 changes: 0 additions & 30 deletions core/src/main/scala/com/pingcap/tispark/examples/OneJarTest.scala

This file was deleted.

