support Spark3.3 (pingcap#2492)
qidi1 authored Aug 22, 2022
1 parent 3b5458f commit 83afee3
Showing 28 changed files with 1,356 additions and 27 deletions.
9 changes: 9 additions & 0 deletions assembly/src/main/assembly/assembly.xml
@@ -45,5 +45,14 @@
<include>**/*</include>
</includes>
</fileSet>
<fileSet>
<directory>
${project.parent.basedir}/spark-wrapper/spark-3.3/target/classes/
</directory>
<outputDirectory>resources/spark-wrapper-spark-3_3</outputDirectory>
<includes>
<include>**/*</include>
</includes>
</fileSet>
</fileSets>
</assembly>
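
Note: the new fileSet mirrors those of the existing spark-wrapper modules: the compiled classes of spark-wrapper/spark-3.3 are packaged under resources/spark-wrapper-spark-3_3, which is presumably where ReflectionUtil's class loader picks up the version-specific wrapper at runtime (an inference from the existing layout, not something this commit states).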
20 changes: 18 additions & 2 deletions core-test/pom.xml
@@ -71,17 +71,27 @@
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.1</version>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.1</version>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version.test}</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -118,6 +128,12 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
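
Note: the Log4j 2 bump to 2.17.2 and the log4j-api/log4j-core and slf4j-log4j12 exclusions here (and in core/pom.xml below) line up with Spark 3.3's move to Log4j 2; presumably they keep a single Log4j 2 version on the test classpath and drop the old Log4j 1.x SLF4J binding that Spark no longer ships.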
16 changes: 14 additions & 2 deletions core/pom.xml
@@ -44,12 +44,12 @@
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.1</version>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.1</version>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -60,6 +60,14 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -107,6 +115,10 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
2 changes: 1 addition & 1 deletion core/src/main/scala/com/pingcap/tispark/TiSparkInfo.scala
@@ -22,7 +22,7 @@ import org.slf4j.LoggerFactory
object TiSparkInfo {
private final val logger = LoggerFactory.getLogger(getClass.getName)

val SUPPORTED_SPARK_VERSION: List[String] = "3.0" :: "3.1" :: "3.2" :: Nil
val SUPPORTED_SPARK_VERSION: List[String] = "3.0" :: "3.1" :: "3.2" :: "3.3" :: Nil

val SPARK_VERSION: String = org.apache.spark.SPARK_VERSION

24 changes: 24 additions & 0 deletions core/src/main/scala/com/pingcap/tispark/utils/ReflectionUtil.scala
@@ -18,14 +18,18 @@
package com.pingcap.tispark.utils

import com.pingcap.tispark.TiSparkInfo
import com.pingcap.tispark.auth.TiAuthorization
import com.pingcap.tispark.write.TiDBOptions
import org.apache.spark.sql.{SQLContext, SparkSession, Strategy, TiContext}
import org.apache.spark.sql.catalyst.expressions.BasicExpression.TiExpression
import org.apache.spark.sql.catalyst.expressions.{Alias, ExprId, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{SparkSession, Strategy, TiContext}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.slf4j.LoggerFactory

import java.io.File
import java.lang.reflect.InvocationTargetException
import java.net.{URL, URLClassLoader}

/**
@@ -60,6 +64,8 @@ object ReflectionUtil {
private val SPARK_WRAPPER_CLASS = "com.pingcap.tispark.SparkWrapper"
private val TI_BASIC_EXPRESSION_CLASS =
"org.apache.spark.sql.catalyst.expressions.TiBasicExpression"
private val TI_BASIC_LOGICAL_PLAN_CLASS =
"org.apache.spark.sql.catalyst.plans.logical.TiBasicLogicalPlan"
private val TI_STRATEGY_CLASS =
"org.apache.spark.sql.extensions.TiStrategy"
private val TIDB_WRITE_BUILDER_CLASS =
@@ -105,6 +111,24 @@
.asInstanceOf[Option[TiExpression]]
}

def callTiBasicLogicalPlanVerifyAuthorizationRule(
logicalPlan: LogicalPlan,
tiAuthorization: Option[TiAuthorization]): LogicalPlan = {
try {
classLoader
.loadClass(TI_BASIC_LOGICAL_PLAN_CLASS)
.getDeclaredMethod(
"verifyAuthorizationRule",
classOf[LogicalPlan],
classOf[Option[TiAuthorization]])
.invoke(null, logicalPlan, tiAuthorization)
.asInstanceOf[LogicalPlan]
} catch {
case ex: InvocationTargetException =>
throw ex.getTargetException
}
}

def newTiStrategy(
getOrCreateTiContext: SparkSession => TiContext,
sparkSession: SparkSession): Strategy = {
@@ -17,7 +17,9 @@
package org.apache.spark.sql.catalyst.analyzer

import com.pingcap.tispark.auth.TiAuthorization
import com.pingcap.tispark.v2.TiDBTable
import org.apache.spark.sql.catalyst.plans.logical.{
BasicLogicalPlan,
DeleteFromTable,
LogicalPlan,
SetCatalogAndNamespace,
@@ -26,15 +28,13 @@ import org.apache.spark.sql.catalyst.plans.logical.{
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.{SparkSession, TiContext}
import com.pingcap.tispark.v2.TiDBTable

/**
* Only works for table v2 (catalog plugin)
*/
case class TiAuthorizationRule(getOrCreateTiContext: SparkSession => TiContext)(
sparkSession: SparkSession)
extends Rule[LogicalPlan] {

protected val tiContext: TiContext = getOrCreateTiContext(sparkSession)
private lazy val tiAuthorization: Option[TiAuthorization] = tiContext.tiAuthorization

@@ -47,12 +47,8 @@ case class TiAuthorizationRule(getOrCreateTiContext: SparkSession => TiContext)(
tiAuthorization)
}
dt
case sd @ SetCatalogAndNamespace(catalogManager, catalogName, namespace) =>
if (catalogName.nonEmpty && catalogName.get.equals("tidb_catalog") && namespace.isDefined) {
namespace.get
.foreach(TiAuthorization.authorizeForSetDatabase(_, tiAuthorization))
}
sd
case s: SetCatalogAndNamespace =>
BasicLogicalPlan.verifyAuthorizationRule(s, tiAuthorization)
case dr @ DataSourceV2Relation(
TiDBTable(_, tableRef, _, _, _),
output,
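
Note: the SetCatalogAndNamespace branch removed above does not disappear; it presumably moves into the per-Spark-version TiBasicLogicalPlan wrapper that ReflectionUtil.callTiBasicLogicalPlanVerifyAuthorizationRule loads, since Spark 3.3 reworked the shape of SetCatalogAndNamespace. A minimal sketch of what a pre-3.3 wrapper could look like; the object and method names come from the reflection call and the body from the removed branch, while everything else is an assumption rather than the commit's actual wrapper code:

// Hypothetical sketch only; the real wrappers live under spark-wrapper/spark-3.x.
// Assumes the pre-3.3 SetCatalogAndNamespace(catalogManager, catalogName, namespace) shape;
// a Spark 3.3 wrapper would instead match the node shape introduced in 3.3.
package org.apache.spark.sql.catalyst.plans.logical

import com.pingcap.tispark.auth.TiAuthorization

object TiBasicLogicalPlan {
  def verifyAuthorizationRule(
      logicalPlan: LogicalPlan,
      tiAuthorization: Option[TiAuthorization]): LogicalPlan =
    logicalPlan match {
      case sd @ SetCatalogAndNamespace(_, catalogName, namespace) =>
        // Authorize a USE-database statement only when the target catalog is tidb_catalog.
        if (catalogName.nonEmpty && catalogName.get.equals("tidb_catalog") && namespace.isDefined) {
          namespace.get.foreach(TiAuthorization.authorizeForSetDatabase(_, tiAuthorization))
        }
        sd
      case other => other
    }
}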
@@ -167,13 +167,17 @@ class TiCatalog extends TableCatalog with SupportsNamespaces {

override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = ???

override def dropNamespace(namespace: Array[String]): Boolean = ???

override def createNamespace(
namespace: Array[String],
metadata: util.Map[String, String]): Unit =
???

// for Spark versions earlier than 3.3
def dropNamespace(strings: Array[String]): Boolean = ???

// for Spark 3.3 and later
def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean = ???

override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = ???

}
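
Note: a likely reason for the pair of dropNamespace methods above (an inference from the version comments, not spelled out in the commit) is that Spark 3.3 added a cascade parameter to SupportsNamespaces.dropNamespace, so only one of the two signatures matches the interface for any given Spark version. Dropping the override modifier lets the unused one stay as a plain method, which keeps the same source compiling from Spark 3.0 through 3.3.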
@@ -54,6 +54,9 @@ case class TiParser(
@scala.throws[ParseException]("Text cannot be parsed to a DataType")
def parseRawDataType(sqlText: String): DataType = ???

@scala.throws[ParseException]("Text cannot be parsed to a LogicalPlan")
def parseQuery(sqlText: String): LogicalPlan = ???

def getOrElseInitTiCatalog: TiCatalog = {
val catalogManager = sparkSession.sessionState.catalogManager
catalogManager.catalog("tidb_catalog").asInstanceOf[TiCatalog]
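
Note: parseQuery is presumably stubbed for a similar reason; Spark 3.3's ParserInterface added a parseQuery method, and TiParser only needs to satisfy the interface here, so the body is left as ??? like the other unused parser hooks.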
@@ -0,0 +1,29 @@
/*
*
* Copyright 2022 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.spark.sql.catalyst.plans.logical

import com.pingcap.tispark.auth.TiAuthorization
import com.pingcap.tispark.utils.ReflectionUtil

object BasicLogicalPlan {
def verifyAuthorizationRule(
logicalPlan: LogicalPlan,
tiAuthorization: Option[TiAuthorization]): LogicalPlan =
ReflectionUtil.callTiBasicLogicalPlanVerifyAuthorizationRule(logicalPlan, tiAuthorization)
}
@@ -31,10 +31,11 @@ class DeleteNotSupportSuite extends BaseBatchWriteTest("test_delete_not_support"

test("Delete without WHERE clause") {
jdbcUpdate(s"create table $dbtable(i int, s int,PRIMARY KEY (i))")

the[IllegalArgumentException] thrownBy {
spark.sql(s"delete from $dbtable")
} should have message "requirement failed: Delete without WHERE clause is not supported"
} should (have message "requirement failed: Delete without WHERE clause is not supported"
// When the WHERE clause is empty, Spark 3.3 sends AlwaysTrue() to TiSpark.
or have message "requirement failed: Delete with alwaysTrue WHERE clause is not supported")
}

test("Delete with alwaysTrue WHERE clause") {
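
For context, a rough sketch of the kind of guard this test now accepts either message from; only the two require messages are taken from the test above, while the object, method name, and filter-based check are assumptions for illustration:

// Hypothetical illustration, not TiSpark's actual delete path.
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}

object DeleteGuardSketch {
  def checkDeleteCondition(filters: Array[Filter]): Unit = {
    // Spark versions before 3.3 pass no condition for a bare DELETE,
    // while Spark 3.3 passes AlwaysTrue(), hence the second check.
    require(filters.nonEmpty, "Delete without WHERE clause is not supported")
    require(
      !filters.exists(_.isInstanceOf[AlwaysTrue]),
      "Delete with alwaysTrue WHERE clause is not supported")
  }
}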
@@ -262,7 +262,7 @@ class UnsignedOverflowSuite extends BaseBatchWriteTest("test_data_type_unsigned_
val jdbcErrorClass = classOf[java.lang.RuntimeException]
val tidbErrorClass = classOf[java.lang.RuntimeException]
val tidbErrorMsgStartWith =
"Error while encoding: java.lang.RuntimeException: java.lang.Integer is not a valid external type for schema of bigint\nif (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, c1), LongType) AS c1"
"Error while encoding: java.lang.RuntimeException: java.lang.Integer is not a valid external type for schema of bigint"

compareTiDBWriteFailureWithJDBC(
List(row),
@@ -16,6 +16,9 @@

package com.pingcap.tispark.telemetry

import com.pingcap.tikv.{TiConfiguration, TiSession}
import com.pingcap.tispark.auth.TiAuthorization.tiConf
import com.pingcap.tispark.listener.CacheInvalidateListener
import com.pingcap.tispark.utils.HttpClientUtil
import com.sun.net.httpserver.{
HttpExchange,
@@ -24,6 +27,7 @@
HttpsConfigurator,
HttpsServer
}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.test.SharedSQLContext
import org.scalatest.Matchers.{be, noException}

4 changes: 2 additions & 2 deletions docs/authorization_userguide.md
@@ -9,10 +9,10 @@ perform.
This feature allows you to execute SQL in TiSpark with authorization and authentication, the same behavior as in TiDB.

## Prerequisites

- The database's user account must have the `PROCESS` privilege.
- TiSpark version >= 2.5.0
- Spark version = 3.0.x or 3.1.x
- Spark version = 3.0.x or 3.1.x or 3.2.x or 3.3.x

## Setup

3 changes: 1 addition & 2 deletions docs/delete_userguide.md
@@ -11,7 +11,7 @@ spark.sql.catalog.tidb_catalog.pd.addresses ${your_pd_address}

## Requirement
- TiDB 4.x or 5.x
- Spark >= 3.0
- Spark = 3.0.x or 3.1.x or 3.2.x or 3.3.x

## Delete with SQL
```
@@ -38,4 +38,3 @@ spark.sql("delete from tidb_catalog.db.table where xxx")
- Delete from partition table is not supported.
- Delete with Pessimistic Transaction Mode is not supported.


16 changes: 14 additions & 2 deletions pom.xml
@@ -170,12 +170,13 @@
<module>spark-wrapper/spark-3.0</module>
<module>spark-wrapper/spark-3.1</module>
<module>spark-wrapper/spark-3.2</module>
<module>spark-wrapper/spark-3.3</module>
<module>assembly</module>
</modules>

<profiles>
<profile>
<id>spark-3.1.1</id>
<id>spark-3.1</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
@@ -186,7 +187,7 @@
</properties>
</profile>
<profile>
<id>spark-3.2.1</id>
<id>spark-3.2</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
@@ -196,6 +197,17 @@
<spark.version.release>3.2</spark.version.release>
</properties>
</profile>
<profile>
<id>spark-3.3</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<spark.version.compile>3.3.0</spark.version.compile>
<spark.version.test>3.3.0</spark.version.test>
<spark.version.release>3.3</spark.version.release>
</properties>
</profile>
<profile>
<id>jenkins</id>
<modules>
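
Note: with the new module and profile in place, a Spark 3.3 build can presumably be selected the same way as the existing ones, for example mvn clean package -Pspark-3.3 (the profile id comes from the block above; any further flags follow the project's usual build instructions).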
(Diffs for the remaining changed files were not loaded in this capture.)