Spark 3.4: Restore files under spark-3.4
pan3793 committed Dec 7, 2023
1 parent a7859ae commit d646647
Showing 58 changed files with 6,005 additions and 0 deletions.
93 changes: 93 additions & 0 deletions spark-3.4/build.gradle
@@ -0,0 +1,93 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

project.ext {
    spark_version = "3.4.2"
    spark_binary_version = "3.4"
}

project(":clickhouse-spark-${spark_binary_version}_$scala_binary_version") {
dependencies {
api project(":clickhouse-core")

compileOnly "org.apache.spark:spark-sql_$scala_binary_version:$spark_version"

testImplementation "org.apache.spark:spark-sql_$scala_binary_version:$spark_version"
testImplementation "org.scalatest:scalatest_$scala_binary_version:$scalatest_version"
testRuntimeOnly "com.vladsch.flexmark:flexmark-all:$flexmark_version"
}
}

project(":clickhouse-spark-runtime-${spark_binary_version}_$scala_binary_version") {
apply plugin: "com.github.johnrengelman.shadow"

tasks.jar.dependsOn tasks.shadowJar

dependencies {
compileOnly "org.scala-lang:scala-library:$scala_version"

implementation(project(":clickhouse-spark-${spark_binary_version}_$scala_binary_version")) {
exclude group: "org.antlr", module: "antlr4-runtime"
exclude group: "org.scala-lang", module: "scala-library"
exclude group: "org.slf4j", module: "slf4j-api"
exclude group: "org.apache.commons", module: "commons-lang3"
exclude group: "com.clickhouse", module: "clickhouse-jdbc"
exclude group: "com.fasterxml.jackson.core"
exclude group: "com.fasterxml.jackson.datatype"
exclude group: "com.fasterxml.jackson.module"
}
}

shadowJar {
zip64=true
archiveClassifier=null

mergeServiceFiles()
}

jar {
archiveClassifier="empty"
}
}

project(":clickhouse-spark-it-${spark_binary_version}_$scala_binary_version") {
dependencies {
implementation "org.scala-lang:scala-library:$scala_version" // for scala plugin detect scala binary version

testImplementation project(path: ":clickhouse-spark-runtime-${spark_binary_version}_$scala_binary_version", configuration: "shadow")
testImplementation(testFixtures(project(":clickhouse-core"))) {
exclude module: "clickhouse-core"
}

testImplementation "org.apache.spark:spark-sql_$scala_binary_version:$spark_version"

testImplementation "org.apache.spark:spark-core_$scala_binary_version:$spark_version:tests"
testImplementation "org.apache.spark:spark-catalyst_$scala_binary_version:$spark_version:tests"
testImplementation "org.apache.spark:spark-sql_$scala_binary_version:$spark_version:tests"

testImplementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jackson_version"

testImplementation("com.clickhouse:clickhouse-jdbc:$clickhouse_jdbc_version:all") { transitive = false }

testImplementation "org.apache.kyuubi:kyuubi-spark-connector-tpcds_${scala_binary_version}:$kyuubi_version"
}

test {
classpath += files("${project(':clickhouse-core').projectDir}/src/testFixtures/conf")
}

slowTest {
classpath += files("${project(':clickhouse-core').projectDir}/src/testFixtures/conf")
}
}
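
The runtime module above produces the shaded jar that end users drop onto a Spark classpath. As a rough sketch of what that enables (the catalog class name and option keys below are taken from the project's documentation rather than this diff, so treat them as assumptions):

import org.apache.spark.sql.SparkSession

// Sketch only: catalog class and option keys are assumed from the project docs.
val spark = SparkSession.builder()
  .appName("clickhouse-demo")
  .config("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog")
  .config("spark.sql.catalog.clickhouse.host", "localhost") // placeholder host
  .config("spark.sql.catalog.clickhouse.protocol", "http")
  .config("spark.sql.catalog.clickhouse.http_port", "8123")
  .getOrCreate()

// List databases exposed through the ClickHouse catalog.
spark.sql("SHOW DATABASES FROM clickhouse").show()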
35 changes: 35 additions & 0 deletions spark-3.4/clickhouse-spark-it/src/test/resources/log4j2.xml
@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<Configuration status="INFO">
<Appenders>
<Console name="stdout" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} %p %c: %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="INFO">
<AppenderRef ref="stdout"/>
</Root>
<Logger name="org.apache.hadoop.util.Shell" level="ERROR" additivity="false">
<AppenderRef ref="stdout"/>
</Logger>
<Logger name="org.apache.hadoop.util.NativeCodeLoader" level="ERROR" additivity="false">
<AppenderRef ref="stdout"/>
</Logger>
<Logger name="xenon.clickhouse" level="DEBUG" additivity="false">
<AppenderRef ref="stdout"/>
</Logger>
</Loggers>
</Configuration>
@@ -0,0 +1,91 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.clickhouse

import org.apache.spark.SparkConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.{DataFrame, QueryTest}
import xenon.clickhouse.ClickHouseCommandRunner

import java.sql.{Date, Timestamp}
import java.time.Instant

trait SparkTest extends QueryTest with SharedSparkSession {

  def cmdRunnerOptions: Map[String, String]

  /**
   * @param text format yyyy-[m]m-[d]d
   * @return A SQL Date
   */
  def date(text: String): Date = Date.valueOf(text)

  /**
   * @param text format 2007-12-03T10:15:30.00Z
   * @return A SQL Timestamp
   */
  def timestamp(text: String): Timestamp = Timestamp.from(Instant.parse(text))

  override protected def sparkConf: SparkConf = super.sparkConf
    .setMaster("local[2]")
    .setAppName("spark-ut")
    .set("spark.ui.enabled", "false")
    .set("spark.driver.host", "localhost")
    .set("spark.driver.memory", "500M")
    .set("spark.sql.catalogImplementation", "in-memory")
    .set("spark.sql.codegen.wholeStage", "false")
    .set("spark.sql.shuffle.partitions", "2")

  def runClickHouseSQL(sql: String, options: Map[String, String] = cmdRunnerOptions): DataFrame =
    spark.executeCommand(classOf[ClickHouseCommandRunner].getName, sql, options)

  def autoCleanupTable(
      database: String,
      table: String,
      cleanup: Boolean = true
  )(block: (String, String) => Unit): Unit =
    try {
      spark.sql(s"CREATE DATABASE IF NOT EXISTS `$database`")
      block(database, table)
    } finally if (cleanup) {
      spark.sql(s"DROP TABLE IF EXISTS `$database`.`$table`")
      spark.sql(s"DROP DATABASE IF EXISTS `$database` CASCADE")
    }

  def withClickHouseSingleIdTable(
      database: String,
      table: String,
      cleanup: Boolean = true
  )(block: (String, String) => Unit): Unit = autoCleanupTable(database, table, cleanup) { (database, table) =>
    spark.sql(
      s"""CREATE TABLE IF NOT EXISTS `$database`.`$table` (
         |  id Long NOT NULL
         |) USING ClickHouse
         |TBLPROPERTIES (
         |  engine = 'MergeTree()',
         |  order_by = 'id',
         |  settings.index_granularity = 8192
         |)
         |""".stripMargin
    )
    block(database, table)
  }

  // for debugging the web UI
  protected def infiniteLoop(): Unit = while (true) {
    Thread.sleep(1000)
    spark.catalog.listTables()
  }
}
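
A minimal sketch of a concrete suite built on this trait; the suite name and the connection options are illustrative placeholders, not part of this commit:

class ExampleSuite extends SparkTest {

  // Placeholder options; a real suite points these at a test ClickHouse instance.
  override def cmdRunnerOptions: Map[String, String] = Map(
    "host" -> "localhost",
    "http_port" -> "8123"
  )

  test("insert and count") {
    withClickHouseSingleIdTable("db_demo", "tbl_demo") { (db, tbl) =>
      spark.sql(s"INSERT INTO `$db`.`$tbl` VALUES (1), (2)")
      assert(spark.table(s"$db.$tbl").count() == 2)
    }
  }
}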
@@ -0,0 +1,44 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.clickhouse

object TPCDSTestUtils {
  val tablePrimaryKeys: Map[String, Seq[String]] = Map(
    "call_center" -> Array("cc_call_center_sk"),
    "catalog_page" -> Array("cp_catalog_page_sk"),
    "catalog_returns" -> Array("cr_item_sk", "cr_order_number"),
    "catalog_sales" -> Array("cs_item_sk", "cs_order_number"),
    "customer" -> Array("c_customer_sk"),
    "customer_address" -> Array("ca_address_sk"),
    "customer_demographics" -> Array("cd_demo_sk"),
    "date_dim" -> Array("d_date_sk"),
    "household_demographics" -> Array("hd_demo_sk"),
    "income_band" -> Array("ib_income_band_sk"),
    "inventory" -> Array("inv_date_sk", "inv_item_sk", "inv_warehouse_sk"),
    "item" -> Array("i_item_sk"),
    "promotion" -> Array("p_promo_sk"),
    "reason" -> Array("r_reason_sk"),
    "ship_mode" -> Array("sm_ship_mode_sk"),
    "store" -> Array("s_store_sk"),
    "store_returns" -> Array("sr_item_sk", "sr_ticket_number"),
    "store_sales" -> Array("ss_item_sk", "ss_ticket_number"),
    "time_dim" -> Array("t_time_sk"),
    "warehouse" -> Array("w_warehouse_sk"),
    "web_page" -> Array("wp_web_page_sk"),
    "web_returns" -> Array("wr_item_sk", "wr_order_number"),
    "web_sales" -> Array("ws_item_sk", "ws_order_number"),
    "web_site" -> Array("web_site_sk")
  )
}
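
The mapping is presumably consumed when materializing TPC-DS data into MergeTree tables, where the primary keys become the ordering key. A hedged sketch of that usage (the catalog and schema names are illustrative, not from this commit):

// Sketch: derive an order_by table property from the mapping above.
val orderBy = TPCDSTestUtils.tablePrimaryKeys("store_sales").mkString(",")
spark.sql(
  s"""CREATE TABLE clickhouse.tpcds.store_sales
     |USING ClickHouse
     |TBLPROPERTIES (engine = 'MergeTree()', order_by = '$orderBy')
     |AS SELECT * FROM tpcds.sf1.store_sales
     |""".stripMargin
)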
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.clickhouse

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.ClassTagExtensions

object TestUtils {

  @transient lazy val om: ObjectMapper with ClassTagExtensions = {
    val _om = new ObjectMapper() with ClassTagExtensions
    _om.findAndRegisterModules()
    _om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    _om
  }

  def toJson(value: Any): String = om.writeValueAsString(value)
}
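
For reference, a small sketch of round-tripping a value through this mapper; the Node case class is purely illustrative:

// Illustrative type, not defined anywhere in this commit.
case class Node(id: Long, value: Option[String])

val json = TestUtils.toJson(Node(1L, Some("a"))) // {"id":1,"value":"a"}
val parsed = TestUtils.om.readValue[Node](json)  // back to Node(1, Some("a"))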
@@ -0,0 +1,80 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.clickhouse.cluster

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

abstract class BaseClusterWriteSuite extends SparkClickHouseClusterTest {

  test("clickhouse write cluster") {
    withSimpleDistTable("single_replica", "db_w", "t_dist", true) { (_, db, tbl_dist, tbl_local) =>
      val tblSchema = spark.table(s"$db.$tbl_dist").schema
      assert(tblSchema == StructType(
        StructField("create_time", DataTypes.TimestampType, nullable = false) ::
          StructField("y", DataTypes.IntegerType, nullable = false) ::
          StructField("m", DataTypes.IntegerType, nullable = false) ::
          StructField("id", DataTypes.LongType, nullable = false) ::
          StructField("value", DataTypes.StringType, nullable = true) :: Nil
      ))

      checkAnswer(
        spark
          .table(s"$db.$tbl_dist")
          .select("create_time", "y", "m", "id", "value"),
        Seq(
          Row(timestamp("2021-01-01T10:10:10Z"), 2021, 1, 1L, "1"),
          Row(timestamp("2022-02-02T10:10:10Z"), 2022, 2, 2L, "2"),
          Row(timestamp("2023-03-03T10:10:10Z"), 2023, 3, 3L, "3"),
          Row(timestamp("2024-04-04T10:10:10Z"), 2024, 4, 4L, "4")
        )
      )

      checkAnswer(
        spark.table(s"clickhouse_s1r1.$db.$tbl_local"),
        Row(timestamp("2024-04-04T10:10:10Z"), 2024, 4, 4L, "4") :: Nil
      )
      checkAnswer(
        spark.table(s"clickhouse_s1r2.$db.$tbl_local"),
        Row(timestamp("2021-01-01T10:10:10Z"), 2021, 1, 1L, "1") :: Nil
      )
      checkAnswer(
        spark.table(s"clickhouse_s2r1.$db.$tbl_local"),
        Row(timestamp("2022-02-02T10:10:10Z"), 2022, 2, 2L, "2") :: Nil
      )
      checkAnswer(
        spark.table(s"clickhouse_s2r2.$db.$tbl_local"),
        Row(timestamp("2023-03-03T10:10:10Z"), 2023, 3, 3L, "3") :: Nil
      )
    }
  }
}

class ClusterNodesWriteSuite extends BaseClusterWriteSuite {

  override protected def sparkConf: SparkConf = super.sparkConf
    .set("spark.clickhouse.write.repartitionNum", "0")
    .set("spark.clickhouse.write.distributed.useClusterNodes", "true")
    .set("spark.clickhouse.write.distributed.convertLocal", "false")
}

class ConvertDistToLocalWriteSuite extends BaseClusterWriteSuite {

  override protected def sparkConf: SparkConf = super.sparkConf
    .set("spark.clickhouse.write.repartitionNum", "0")
    .set("spark.clickhouse.write.distributed.useClusterNodes", "true")
    .set("spark.clickhouse.write.distributed.convertLocal", "true")
}
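
The two subclasses differ only in write-path configuration: ClusterNodesWriteSuite writes through the Distributed table across cluster nodes, while ConvertDistToLocalWriteSuite rewrites the plan to target each shard's local table. A sketch of applying the same flags to a plain session outside the test harness (conf keys as set above):

import org.apache.spark.sql.SparkSession

// Sketch: mirror ConvertDistToLocalWriteSuite's settings on a regular session.
val spark = SparkSession.builder()
  .config("spark.clickhouse.write.repartitionNum", "0")
  .config("spark.clickhouse.write.distributed.useClusterNodes", "true")
  .config("spark.clickhouse.write.distributed.convertLocal", "true")
  .getOrCreate()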