From 406335cffd45737dbbac1c52f47663d84b82506d Mon Sep 17 00:00:00 2001 From: Alexandr Beitsiuk Date: Tue, 13 Aug 2024 15:53:41 +0200 Subject: [PATCH 1/2] Fixed ReplacingMergeTree EngineSpec parsing: is_deleted column presence caused error --- .../com/clickhouse/spark/parse/AstVisitor.scala | 2 +- .../clickhouse/spark/parse/SQLParserSuite.scala | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/clickhouse-core/src/main/scala/com/clickhouse/spark/parse/AstVisitor.scala b/clickhouse-core/src/main/scala/com/clickhouse/spark/parse/AstVisitor.scala index f4f4f8fc..d4a71496 100644 --- a/clickhouse-core/src/main/scala/com/clickhouse/spark/parse/AstVisitor.scala +++ b/clickhouse-core/src/main/scala/com/clickhouse/spark/parse/AstVisitor.scala @@ -102,7 +102,7 @@ class AstVisitor extends ClickHouseSQLBaseVisitor[AnyRef] with Logging { case eg: String if "ReplacingMergeTree" equalsIgnoreCase eg => ReplacingMergeTreeEngineSpec( engine_clause = engineExpr, - version_column = seqToOption(engineArgs).map(_.asInstanceOf[FieldRef]), + version_column = engineArgs.lift(0).map(_.asInstanceOf[FieldRef]), _sorting_key = tupleIfNeeded(orderByOpt.toList), _primary_key = tupleIfNeeded(pkOpt.toList), _partition_key = tupleIfNeeded(partOpt.toList), diff --git a/clickhouse-core/src/test/scala/com/clickhouse/spark/parse/SQLParserSuite.scala b/clickhouse-core/src/test/scala/com/clickhouse/spark/parse/SQLParserSuite.scala index 181e7f88..8ade7055 100644 --- a/clickhouse-core/src/test/scala/com/clickhouse/spark/parse/SQLParserSuite.scala +++ b/clickhouse-core/src/test/scala/com/clickhouse/spark/parse/SQLParserSuite.scala @@ -83,6 +83,20 @@ class SQLParserSuite extends AnyFunSuite { assert(actual === expected) } + test("parse ReplacingMergeTree - 3") { + val ddl = "ReplacingMergeTree(ts, is_deleted) " + + "PARTITION BY toYYYYMM(created) ORDER BY id SETTINGS index_granularity = 8192" + val actual = parser.parseEngineClause(ddl) + val expected = ReplacingMergeTreeEngineSpec( + engine_clause = "ReplacingMergeTree(ts, is_deleted)", + version_column = Some(FieldRef("ts")), + _sorting_key = TupleExpr(FieldRef("id") :: Nil), + _partition_key = TupleExpr(List(FuncExpr("toYYYYMM", List(FieldRef("created"))))), + _settings = Map("index_granularity" -> "8192") + ) + assert(actual === expected) + } + test("parse ReplicatedReplacingMergeTree - 1") { val ddl = "ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/wj_report/wj_respondent', '{replica}') " + "PARTITION BY toYYYYMM(created) ORDER BY id SETTINGS index_granularity = 8192" From c820b7d457920cbdf3bb983ea81596268bc04504 Mon Sep 17 00:00:00 2001 From: Alexandr Beitsiuk Date: Wed, 14 Aug 2024 09:31:29 +0200 Subject: [PATCH 2/2] Preserved is_deleted column in engine spec, added the same fix for ReplicatedReplacingMergeTree --- .../clickhouse/spark/parse/AstVisitor.scala | 4 +++- .../spark/spec/TableEngineSpec.scala | 2 ++ .../spark/parse/SQLParserSuite.scala | 20 +++++++++++++++++++ 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/clickhouse-core/src/main/scala/com/clickhouse/spark/parse/AstVisitor.scala b/clickhouse-core/src/main/scala/com/clickhouse/spark/parse/AstVisitor.scala index d4a71496..dc1a7331 100644 --- a/clickhouse-core/src/main/scala/com/clickhouse/spark/parse/AstVisitor.scala +++ b/clickhouse-core/src/main/scala/com/clickhouse/spark/parse/AstVisitor.scala @@ -103,6 +103,7 @@ class AstVisitor extends ClickHouseSQLBaseVisitor[AnyRef] with Logging { ReplacingMergeTreeEngineSpec( engine_clause = engineExpr, version_column = engineArgs.lift(0).map(_.asInstanceOf[FieldRef]), + is_deleted_column = engineArgs.lift(1).map(_.asInstanceOf[FieldRef]), _sorting_key = tupleIfNeeded(orderByOpt.toList), _primary_key = tupleIfNeeded(pkOpt.toList), _partition_key = tupleIfNeeded(partOpt.toList), @@ -127,7 +128,8 @@ class AstVisitor extends ClickHouseSQLBaseVisitor[AnyRef] with Logging { engine_clause = engineExpr, zk_path = engineArgs.head.asInstanceOf[StringLiteral].value, replica_name = engineArgs(1).asInstanceOf[StringLiteral].value, - version_column = seqToOption(engineArgs.drop(2)).map(_.asInstanceOf[FieldRef]), + version_column = engineArgs.lift(2).map(_.asInstanceOf[FieldRef]), + is_deleted_column = engineArgs.lift(3).map(_.asInstanceOf[FieldRef]), _sorting_key = tupleIfNeeded(orderByOpt.toList), _primary_key = tupleIfNeeded(pkOpt.toList), _partition_key = tupleIfNeeded(partOpt.toList), diff --git a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineSpec.scala b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineSpec.scala index 13534d76..48f56c3b 100644 --- a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineSpec.scala +++ b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineSpec.scala @@ -88,6 +88,7 @@ case class ReplicatedMergeTreeEngineSpec( case class ReplacingMergeTreeEngineSpec( engine_clause: String, version_column: Option[FieldRef] = None, + is_deleted_column: Option[FieldRef] = None, var _sorting_key: TupleExpr = TupleExpr(List.empty), var _primary_key: TupleExpr = TupleExpr(List.empty), var _partition_key: TupleExpr = TupleExpr(List.empty), @@ -109,6 +110,7 @@ case class ReplicatedReplacingMergeTreeEngineSpec( zk_path: String, replica_name: String, version_column: Option[FieldRef] = None, + is_deleted_column: Option[FieldRef] = None, var _sorting_key: TupleExpr = TupleExpr(List.empty), var _primary_key: TupleExpr = TupleExpr(List.empty), var _partition_key: TupleExpr = TupleExpr(List.empty), diff --git a/clickhouse-core/src/test/scala/com/clickhouse/spark/parse/SQLParserSuite.scala b/clickhouse-core/src/test/scala/com/clickhouse/spark/parse/SQLParserSuite.scala index 8ade7055..bbcdf245 100644 --- a/clickhouse-core/src/test/scala/com/clickhouse/spark/parse/SQLParserSuite.scala +++ b/clickhouse-core/src/test/scala/com/clickhouse/spark/parse/SQLParserSuite.scala @@ -90,6 +90,7 @@ class SQLParserSuite extends AnyFunSuite { val expected = ReplacingMergeTreeEngineSpec( engine_clause = "ReplacingMergeTree(ts, is_deleted)", version_column = Some(FieldRef("ts")), + is_deleted_column = Some(FieldRef("is_deleted")), _sorting_key = TupleExpr(FieldRef("id") :: Nil), _partition_key = TupleExpr(List(FuncExpr("toYYYYMM", List(FieldRef("created"))))), _settings = Map("index_granularity" -> "8192") @@ -129,6 +130,25 @@ class SQLParserSuite extends AnyFunSuite { assert(actual === expected) } + test("parse ReplicatedReplacingMergeTree - 3") { + val ddl = "ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/wj_report/wj_respondent', '{replica}', " + + "ts, is_deleted) PARTITION BY toYYYYMM(created) ORDER BY id SETTINGS index_granularity = 8192" + val actual = parser.parseEngineClause(ddl) + val expected = ReplicatedReplacingMergeTreeEngineSpec( + engine_clause = + "ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/wj_report/wj_respondent', '{replica}', " + + "ts, is_deleted)", + zk_path = "/clickhouse/tables/{shard}/wj_report/wj_respondent", + replica_name = "{replica}", + version_column = Some(FieldRef("ts")), + is_deleted_column = Some(FieldRef("is_deleted")), + _sorting_key = TupleExpr(FieldRef("id") :: Nil), + _partition_key = TupleExpr(List(FuncExpr("toYYYYMM", List(FieldRef("created"))))), + _settings = Map("index_granularity" -> "8192") + ) + assert(actual === expected) + } + test("parse Distributed - 1") { val ddl = "Distributed('default', 'wj_report', 'wj_respondent_local')" val actual = parser.parseEngineClause(ddl)