diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index b84fb758..12eb338e 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -58,9 +58,12 @@ jobs: strategy: fail-fast: false matrix: - clickhouse: [ 22.3, 22.8, 23.3, 23.8, 24.3, 24.6 ] + clickhouse: [ 22.3, 22.8, 23.3, 23.8, 24.3, 24.6, 24.4, 24.5, cloud ] env: CLICKHOUSE_IMAGE: clickhouse/clickhouse-server:${{ matrix.clickhouse }} + CLICKHOUSE_VERSION: ${{ matrix.clickhouse }} + CLICKHOUSE_CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }} + CLICKHOUSE_CLOUD_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }} steps: - uses: actions/checkout@v4 - uses: actions/setup-java@v4 diff --git a/clickhouse-core/src/main/scala/com/clickhouse/spark/client/NodeClient.scala b/clickhouse-core/src/main/scala/com/clickhouse/spark/client/NodeClient.scala index 6f27670f..bcfaf2b2 100644 --- a/clickhouse-core/src/main/scala/com/clickhouse/spark/client/NodeClient.scala +++ b/clickhouse-core/src/main/scala/com/clickhouse/spark/client/NodeClient.scala @@ -39,7 +39,6 @@ object NodeClient { } class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging { - private val node: ClickHouseNode = ClickHouseNode.builder() .options(nodeSpec.options) .host(nodeSpec.host) diff --git a/clickhouse-core/src/main/scala/com/clickhouse/spark/parse/AstVisitor.scala b/clickhouse-core/src/main/scala/com/clickhouse/spark/parse/AstVisitor.scala index f4f4f8fc..0c5d6292 100644 --- a/clickhouse-core/src/main/scala/com/clickhouse/spark/parse/AstVisitor.scala +++ b/clickhouse-core/src/main/scala/com/clickhouse/spark/parse/AstVisitor.scala @@ -122,6 +122,18 @@ class AstVisitor extends ClickHouseSQLBaseVisitor[AnyRef] with Logging { _ttl = ttlOpt, _settings = settings ) + case eg: String if "SharedMergeTree" equalsIgnoreCase eg => + SharedMergeTreeEngineSpec( + engine_clause = engineExpr, + zk_path = 
engineArgs.head.asInstanceOf[StringLiteral].value, + replica_name = engineArgs(1).asInstanceOf[StringLiteral].value, + _sorting_key = tupleIfNeeded(orderByOpt.toList), + _primary_key = tupleIfNeeded(pkOpt.toList), + _partition_key = tupleIfNeeded(partOpt.toList), + _sampling_key = tupleIfNeeded(sampleByOpt.toList), + _ttl = ttlOpt, + _settings = settings + ) case eg: String if "ReplicatedReplacingMergeTree" equalsIgnoreCase eg => ReplicatedReplacingMergeTreeEngineSpec( engine_clause = engineExpr, diff --git a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineSpec.scala b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineSpec.scala index 13534d76..8a83c1cd 100644 --- a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineSpec.scala +++ b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineSpec.scala @@ -137,3 +137,22 @@ case class DistributedEngineSpec( override def settings: Map[String, String] = _settings override def is_distributed: Boolean = true } +case class SharedMergeTreeEngineSpec( + engine_clause: String, + zk_path: String, + replica_name: String, + var _sorting_key: TupleExpr = TupleExpr(List.empty), + var _primary_key: TupleExpr = TupleExpr(List.empty), + var _partition_key: TupleExpr = TupleExpr(List.empty), + var _sampling_key: TupleExpr = TupleExpr(List.empty), + var _ttl: Option[String] = None, + var _settings: Map[String, String] = Map.empty +) extends MergeTreeFamilyEngineSpec with ReplicatedEngineSpec { + def engine: String = "SharedMergeTreeEngineSpec" + override def sorting_key: TupleExpr = _sorting_key + override def primary_key: TupleExpr = _primary_key + override def partition_key: TupleExpr = _partition_key + override def sampling_key: TupleExpr = _sampling_key + override def ttl: Option[String] = _ttl + override def settings: Map[String, String] = _settings +} diff --git a/clickhouse-core/src/test/scala/com/clickhouse/spark/parse/SQLParserSuite.scala 
b/clickhouse-core/src/test/scala/com/clickhouse/spark/parse/SQLParserSuite.scala index 181e7f88..f1dc26d9 100644 --- a/clickhouse-core/src/test/scala/com/clickhouse/spark/parse/SQLParserSuite.scala +++ b/clickhouse-core/src/test/scala/com/clickhouse/spark/parse/SQLParserSuite.scala @@ -205,4 +205,20 @@ class SQLParserSuite extends AnyFunSuite { ) assert(actual === expected) } + + test("parse SharedMergeTree - 1") { + val ddl = "SharedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}') " + + "PARTITION BY toYYYYMM(created) ORDER BY id SETTINGS index_granularity = 8192" + val actual = parser.parseEngineClause(ddl) + val expected = SharedMergeTreeEngineSpec( + engine_clause = "SharedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}')", + zk_path = "/clickhouse/tables/{uuid}/{shard}", + replica_name = "{replica}", + _sorting_key = TupleExpr(FieldRef("id") :: Nil), + _partition_key = TupleExpr(List(FuncExpr("toYYYYMM", List(FieldRef("created"))))), + _settings = Map("index_granularity" -> "8192") + ) + assert(actual === expected) + } + } diff --git a/clickhouse-core/src/testFixtures/scala/com/clickhouse/spark/base/ClickHouseClusterMixIn.scala b/clickhouse-core/src/testFixtures/scala/com/clickhouse/spark/base/ClickHouseClusterMixIn.scala index 22641a67..1ca32b13 100644 --- a/clickhouse-core/src/testFixtures/scala/com/clickhouse/spark/base/ClickHouseClusterMixIn.scala +++ b/clickhouse-core/src/testFixtures/scala/com/clickhouse/spark/base/ClickHouseClusterMixIn.scala @@ -24,15 +24,16 @@ import java.io.File trait ClickHouseClusterMixIn extends AnyFunSuite with ForAllTestContainer { + val CLICKHOUSE_VERSION: String = Utils.load("CLICKHOUSE_VERSION", "23.8") + val isCloud: Boolean = if (CLICKHOUSE_VERSION.equalsIgnoreCase("cloud")) true else false + val isOnPrem: Boolean = !isCloud + val CLICKHOUSE_IMAGE: String = if (isCloud) "clickhouse/clickhouse-server:23.8" + else Utils.load("CLICKHOUSE_IMAGE", "clickhouse/clickhouse-server:23.8") + protected val 
ZOOKEEPER_CLIENT_PORT = 2181 protected val CLICKHOUSE_HTTP_PORT = 8123 protected val CLICKHOUSE_TCP_PORT = 9000 - protected val CLICKHOUSE_IMAGE: String = Utils.load( - "CLICKHOUSE_IMAGE", - "clickhouse/clickhouse-server:23.8" - ) - protected val clickhouseVersion: ClickHouseVersion = ClickHouseVersion.of(CLICKHOUSE_IMAGE.split(":").last) test("clickhouse cluster up") { diff --git a/clickhouse-core/src/testFixtures/scala/com/clickhouse/spark/base/ClickHouseSingleMixIn.scala b/clickhouse-core/src/testFixtures/scala/com/clickhouse/spark/base/ClickHouseSingleMixIn.scala index d8325895..01c1a3aa 100644 --- a/clickhouse-core/src/testFixtures/scala/com/clickhouse/spark/base/ClickHouseSingleMixIn.scala +++ b/clickhouse-core/src/testFixtures/scala/com/clickhouse/spark/base/ClickHouseSingleMixIn.scala @@ -29,13 +29,17 @@ import java.nio.file.{Path, Paths} trait ClickHouseSingleMixIn extends AnyFunSuite with ForAllTestContainer { // format: off - val CLICKHOUSE_IMAGE: String = Utils.load("CLICKHOUSE_IMAGE", "clickhouse/clickhouse-server:23.8") + val CLICKHOUSE_VERSION: String = Utils.load("CLICKHOUSE_VERSION", "23.8") + val isCloud: Boolean = if (CLICKHOUSE_VERSION.equalsIgnoreCase("cloud")) true else false + val CLICKHOUSE_IMAGE: String = if (isCloud) "clickhouse/clickhouse-server:23.8" else Utils.load("CLICKHOUSE_IMAGE", "clickhouse/clickhouse-server:23.8") val CLICKHOUSE_USER: String = Utils.load("CLICKHOUSE_USER", "default") val CLICKHOUSE_PASSWORD: String = Utils.load("CLICKHOUSE_PASSWORD", "") val CLICKHOUSE_DB: String = Utils.load("CLICKHOUSE_DB", "") private val CLICKHOUSE_HTTP_PORT = 8123 private val CLICKHOUSE_TPC_PORT = 9000 + private val CLICKHOUSE_CLOUD_HTTP_PORT = 8443 + private val CLICKHOUSE_CLOUD_TCP_PORT = 9000 // format: on protected val clickhouseVersion: ClickHouseVersion = ClickHouseVersion.of(CLICKHOUSE_IMAGE.split(":").last) @@ -73,14 +77,20 @@ trait ClickHouseSingleMixIn extends AnyFunSuite with ForAllTestContainer { 
.asInstanceOf[ClickHouseContainer] } // format: off - def clickhouseHost: String = container.host - def clickhouseHttpPort: Int = container.mappedPort(CLICKHOUSE_HTTP_PORT) - def clickhouseTcpPort: Int = container.mappedPort(CLICKHOUSE_TPC_PORT) - // format: on + def clickhouseHost: String = if (isCloud) sys.env.get("CLICKHOUSE_CLOUD_HOST").get else container.host + def clickhouseHttpPort: Int = if (isCloud) CLICKHOUSE_CLOUD_HTTP_PORT else container.mappedPort(CLICKHOUSE_HTTP_PORT) + def clickhouseTcpPort: Int = if (isCloud) CLICKHOUSE_CLOUD_TCP_PORT else container.mappedPort(CLICKHOUSE_TPC_PORT) + def clickhousePassword: String = if (isCloud) sys.env.get("CLICKHOUSE_CLOUD_PASSWORD").get else CLICKHOUSE_PASSWORD + // format: on def withNodeClient(protocol: ClickHouseProtocol = HTTP)(block: NodeClient => Unit): Unit = Utils.tryWithResource { - NodeClient(NodeSpec(clickhouseHost, Some(clickhouseHttpPort), Some(clickhouseTcpPort), protocol)) + NodeClient(NodeSpec( + container.host, + Some(container.mappedPort(CLICKHOUSE_HTTP_PORT)), + Some(container.mappedPort(CLICKHOUSE_TPC_PORT)), + protocol + )) } { client => block(client) } diff --git a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala index 51c2ecb8..84aafb37 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala @@ -119,8 +119,7 @@ class ClickHouseCatalog extends TableCatalog case None => throw new NoSuchTableException(ident) case Some((db, tbl)) => nodeClient.syncQueryOutputJSONEachRow(s"SELECT * FROM `$db`.`$tbl` WHERE 1=0") match { - case Left(exception) if exception.code == UNKNOWN_TABLE.code => - throw new NoSuchTableException(ident) + case Left(exception) if exception.code == UNKNOWN_TABLE.code => throw new NoSuchTableException(ident) // not sure 
whether the UNKNOWN_TABLE check is still required; TODO(review): verify against server behavior before removing case Left(exception) if exception.code == UNKNOWN_DATABASE.code => throw new NoSuchTableException(s"Database $db does not exist") diff --git a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala index c8de2044..68feb956 100644 --- a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala +++ b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala @@ -22,7 +22,6 @@ import org.apache.spark.sql.functions.{month, year} trait SparkClickHouseClusterTest extends SparkTest with ClickHouseClusterMixIn { import testImplicits._ - override protected def sparkConf: SparkConf = super.sparkConf .setMaster("local[4]") .setAppName("spark-clickhouse-cluster-ut") diff --git a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/ClickHouseTableDDLSuite.scala b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/ClickHouseTableDDLSuite.scala index 7409a590..befdb11e 100644 --- a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/ClickHouseTableDDLSuite.scala +++ b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/ClickHouseTableDDLSuite.scala @@ -22,7 +22,7 @@ class ClickHouseTableDDLSuite extends SparkClickHouseSingleTest { test("clickhouse command runner") { withTable("default.abc") { - runClickHouseSQL("CREATE TABLE default.abc(a UInt8) ENGINE=Log()") + runClickHouseSQL("CREATE TABLE default.abc(a UInt8) ENGINE=Memory()") checkAnswer( spark.sql("""DESC default.abc""").select($"col_name", $"data_type").limit(1), Row("a", "smallint") :: Nil diff --git 
a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/SparkClickHouseSingleTest.scala b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/SparkClickHouseSingleTest.scala index ca80c49f..49fbfe62 100644 --- a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/SparkClickHouseSingleTest.scala +++ b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/SparkClickHouseSingleTest.scala @@ -35,7 +35,7 @@ trait SparkClickHouseSingleTest extends SparkTest with ClickHouseSingleMixIn { .set("spark.sql.catalog.clickhouse.http_port", clickhouseHttpPort.toString) .set("spark.sql.catalog.clickhouse.protocol", "http") .set("spark.sql.catalog.clickhouse.user", CLICKHOUSE_USER) - .set("spark.sql.catalog.clickhouse.password", CLICKHOUSE_PASSWORD) + .set("spark.sql.catalog.clickhouse.password", clickhousePassword) .set("spark.sql.catalog.clickhouse.database", CLICKHOUSE_DB) .set("spark.sql.catalog.clickhouse.option.custom_http_params", "async_insert=1,wait_for_async_insert=1") // extended configurations @@ -46,14 +46,16 @@ trait SparkClickHouseSingleTest extends SparkTest with ClickHouseSingleMixIn { .set("spark.clickhouse.write.write.repartitionNum", "0") .set("spark.clickhouse.read.format", "json") .set("spark.clickhouse.write.format", "json") + .set("spark.sql.catalog.clickhouse.option.ssl", isCloud.toString) override def cmdRunnerOptions: Map[String, String] = Map( "host" -> clickhouseHost, "http_port" -> clickhouseHttpPort.toString, "protocol" -> "http", "user" -> CLICKHOUSE_USER, - "password" -> CLICKHOUSE_PASSWORD, - "database" -> CLICKHOUSE_DB + "password" -> clickhousePassword, + "database" -> CLICKHOUSE_DB, + "option.ssl" -> isCloud.toString ) def withTable( @@ -66,7 +68,6 @@ trait SparkClickHouseSingleTest extends SparkTest with ClickHouseSingleMixIn { )(f: => Unit): Unit = try { runClickHouseSQL(s"CREATE DATABASE IF NOT EXISTS $db") - spark.sql( 
s"""CREATE TABLE $db.$tbl ( | ${schema.fields.map(_.toDDL).mkString(",\n ")} @@ -78,7 +79,6 @@ trait SparkClickHouseSingleTest extends SparkTest with ClickHouseSingleMixIn { |) |""".stripMargin ) - f } finally { runClickHouseSQL(s"DROP TABLE IF EXISTS $db.$tbl")