From ccb004f4bf6550b563f0b3eb742dba64e189a1b3 Mon Sep 17 00:00:00 2001
From: Geoff Genz <geoff@clickhouse.com>
Date: Wed, 6 Dec 2023 13:39:46 -0700
Subject: [PATCH] Release 1 6 2 (#219)

* Limited fix to completely broken `on_schema_change`

* Tweak changelog
---
 CHANGELOG.md                                  | 12 ++++
 dbt/adapters/clickhouse/__version__.py        |  2 +-
 dbt/adapters/clickhouse/errors.py             | 24 +++++++
 dbt/adapters/clickhouse/impl.py               | 40 ++++++++++-
 dbt/adapters/clickhouse/util.py               |  8 +++
 .../incremental/incremental.sql               | 71 ++++++++-----------
 .../adapter/basic/test_incremental.py         |  2 +-
 .../adapter/incremental/test_schema_change.py | 71 +++++++++++++++++++
 8 files changed, 185 insertions(+), 45 deletions(-)
 create mode 100644 dbt/adapters/clickhouse/errors.py
 create mode 100644 tests/integration/adapter/incremental/test_schema_change.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1db44176..ec8ddc32 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,15 @@
+### Release [1.6.2], 2023-12-06
+#### Bug Fix
+- The dbt `on_schema_change` configuration value for incremental models was effectively being ignored.  This has been fixed
+with a very limited implementation.  Closes https://github.com/ClickHouse/dbt-clickhouse/issues/199.  Because of the way that
+ORDER BY/SORT BY/PARTITION BY/PRIMARY KEY work in ClickHouse, plus the complexities of correctly transforming ClickHouse data types,
+`sync_all_columns` is not currently supported (an implementation that works for non-key columns is theoretically possible,
+but such an enhancement is not currently planned).  Accordingly, only the `ignore`, `fail`, and `append_new_columns` values are
+supported for `on_schema_change`.  Schema change handling is also not currently supported for Distributed tables.
+
+Note that actually appending new columns requires a fallback to the `legacy` incremental strategy, which is quite inefficient,
+so using `append_new_columns` is not recommended except for very small data volumes.
+
 ### Release [1.6.1], 2023-12-04
 #### Bug Fixes
 - Identifier quoting was disabled for tables/databases etc.  This would cause failures for schemas or tables using reserved words
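[Editorial note: as a usage illustration only (not part of the patch), a model opting into the new behavior might look like the following, shown as a Python string constant in the same style as this patch's integration test fixtures. The model name and columns are hypothetical; only `ignore`, `fail`, and `append_new_columns` are accepted as of 1.6.2.]

```python
# Hypothetical dbt model (e.g. models/events.sql) embedded as a Python string,
# mirroring how the integration tests in this patch define model fixtures.
events_model_sql = """
{{
    config(
        materialized='incremental',
        unique_key='event_id',
        on_schema_change='append_new_columns'
    )
}}
select number as event_id, number + 1 as payload
from numbers(10)
"""
```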
diff --git a/dbt/adapters/clickhouse/__version__.py b/dbt/adapters/clickhouse/__version__.py
index 43239b87..5ccf9d1c 100644
--- a/dbt/adapters/clickhouse/__version__.py
+++ b/dbt/adapters/clickhouse/__version__.py
@@ -1 +1 @@
-version = '1.6.1'
+version = '1.6.2'
diff --git a/dbt/adapters/clickhouse/errors.py b/dbt/adapters/clickhouse/errors.py
new file mode 100644
index 00000000..1d3b5c69
--- /dev/null
+++ b/dbt/adapters/clickhouse/errors.py
@@ -0,0 +1,24 @@
+schema_change_fail_error = """
+The source and target schemas on this incremental model are out of sync.
+They can be reconciled in several ways:
+  - Set the `on_schema_change` config to `append_new_columns` (ClickHouse does not support `sync_all_columns`).
+  - Re-run the incremental model with `full_refresh: True` to update the target schema.
+  - Update the schema manually and re-run the process.
+
+Additional troubleshooting context:
+   Source columns not in target: {0}
+   Target columns not in source: {1}
+   New column types: {2}
+"""
+
+schema_change_datatype_error = """
+The source and target schemas on this incremental model contain different data types.  This is not supported.
+
+   Changed column types: {0}
+"""
+
+schema_change_missing_source_error = """
+The target schema of this incremental model is missing a column present in the source schema.  This is not supported.
+
+   Source columns not in target: {0}
+"""
diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py
index bd20fb03..ca0c3a44 100644
--- a/dbt/adapters/clickhouse/impl.py
+++ b/dbt/adapters/clickhouse/impl.py
@@ -20,10 +20,15 @@
 from dbt.adapters.clickhouse.cache import ClickHouseRelationsCache
 from dbt.adapters.clickhouse.column import ClickHouseColumn
 from dbt.adapters.clickhouse.connections import ClickHouseConnectionManager
+from dbt.adapters.clickhouse.errors import (
+    schema_change_datatype_error,
+    schema_change_fail_error,
+    schema_change_missing_source_error,
+)
 from dbt.adapters.clickhouse.logger import logger
 from dbt.adapters.clickhouse.query import quote_identifier
 from dbt.adapters.clickhouse.relation import ClickHouseRelation
-from dbt.adapters.clickhouse.util import compare_versions
+from dbt.adapters.clickhouse.util import NewColumnDataType, compare_versions
 
 GET_CATALOG_MACRO_NAME = 'get_catalog'
 LIST_SCHEMAS_MACRO_NAME = 'list_schemas'
@@ -151,6 +156,39 @@ def calculate_incremental_strategy(self, strategy: str) -> str:
             strategy = 'legacy'
         return strategy
 
+    @available.parse_none
+    def check_incremental_schema_changes(
+        self, on_schema_change, existing, target_sql
+    ) -> List[ClickHouseColumn]:
+        if on_schema_change not in ('fail', 'ignore', 'append_new_columns'):
+            raise DbtRuntimeError(
+                "Only `fail`, `ignore`, and `append_new_columns` supported for `on_schema_change`"
+            )
+        source = self.get_columns_in_relation(existing)
+        source_map = {column.name: column for column in source}
+        target = self.get_column_schema_from_query(target_sql)
+        target_map = {column.name: column for column in target}
+        source_not_in_target = [column for column in source if column.name not in target_map.keys()]
+        target_not_in_source = [column for column in target if column.name not in source_map.keys()]
+        new_column_data_types = []
+        for target_column in target:
+            source_column = source_map.get(target_column.name)
+            if source_column and source_column.dtype != target_column.dtype:
+                new_column_data_types.append(
+                    NewColumnDataType(source_column.name, target_column.dtype)
+                )
+        if new_column_data_types:
+            raise DbtRuntimeError(schema_change_datatype_error.format(new_column_data_types))
+        if source_not_in_target:
+            raise DbtRuntimeError(schema_change_missing_source_error.format(source_not_in_target))
+        if target_not_in_source and on_schema_change == 'fail':
+            raise DbtRuntimeError(
+                schema_change_fail_error.format(
+                    source_not_in_target, target_not_in_source, new_column_data_types
+                )
+            )
+        return target_not_in_source
+
     @available.parse_none
     def s3source_clause(
         self,
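[Editorial note: for readers skimming the hunk above, here is a minimal, self-contained sketch of the comparison that `check_incremental_schema_changes` performs, with plain `(name, dtype)` tuples standing in for `ClickHouseColumn` objects. This is an illustration under those simplifying assumptions, not the adapter code itself.]

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class NewColumnDataType:
    column_name: str
    new_type: str


def diff_schemas(source: List[Tuple[str, str]], target: List[Tuple[str, str]]):
    """Compare existing-table (source) and model-query (target) columns."""
    source_map = dict(source)
    target_map = dict(target)
    # Columns removed by the model SQL -> always an error
    source_not_in_target = [n for n, _ in source if n not in target_map]
    # Columns added by the model SQL -> appended (or an error under 'fail')
    target_not_in_source = [n for n, _ in target if n not in source_map]
    # Same column, different type -> always an error
    changed = [
        NewColumnDataType(n, t)
        for n, t in target
        if n in source_map and source_map[n] != t
    ]
    return source_not_in_target, target_not_in_source, changed


# Example matching the integration test below: the incremental run adds col_3.
print(diff_schemas(
    [('col_1', 'UInt64'), ('col_2', 'UInt64')],
    [('col_1', 'UInt64'), ('col_2', 'UInt64'), ('col_3', 'UInt64')],
))  # -> ([], ['col_3'], [])
```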
diff --git a/dbt/adapters/clickhouse/util.py b/dbt/adapters/clickhouse/util.py
index bfe7d239..7114dbde 100644
--- a/dbt/adapters/clickhouse/util.py
+++ b/dbt/adapters/clickhouse/util.py
@@ -1,3 +1,5 @@
+from dataclasses import dataclass
+
 from dbt.exceptions import DbtRuntimeError
 
 
@@ -11,3 +13,9 @@ def compare_versions(v1: str, v2: str) -> int:
         except ValueError:
             raise DbtRuntimeError("Version must consist of only numbers separated by '.'")
     return 0
+
+
+@dataclass
+class NewColumnDataType:
+    column_name: str
+    new_type: str
diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
index 17bdf8a0..b478e0c1 100644
--- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
+++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
@@ -65,24 +65,23 @@
     {% endcall %}
 
   {% else %}
-    {% if config.get('distributed') %}
-      {% do clickhouse__incremental_create_distributed(target_relation) %}
-    {% endif %}
-    {% set schema_changes = none %}
+    {% set column_changes = none %}
     {% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy'))  %}
     {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
-    {% if on_schema_change != 'ignore' %}
-      {%- set schema_changes = check_for_schema_changes(existing_relation, target_relation) -%}
-      {% if schema_changes['schema_changed'] and incremental_strategy in ('append', 'delete_insert') %}
-        {% set incremental_strategy = 'legacy' %}
-        {% do log('Schema changes detected, switching to legacy incremental strategy') %}
+    {%- if on_schema_change != 'ignore' %}
+      {%- set column_changes = adapter.check_incremental_schema_changes(on_schema_change, existing_relation, sql) -%}
+      {%- if column_changes %}
+        {%- if incremental_strategy in ('append', 'delete_insert') %}
+          {% set incremental_strategy = 'legacy' %}
+          {{ log('Schema changes detected, switching to legacy incremental strategy') }}
+        {%- endif %}
       {% endif %}
     {% endif %}
     {% if incremental_strategy != 'delete_insert' and incremental_predicates %}
       {% do exceptions.raise_compiler_error('Cannot apply incremental predicates with ' + incremental_strategy + ' strategy.') %}
     {% endif %}
     {% if incremental_strategy == 'legacy' %}
-      {% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, schema_changes, unique_key) %}
+      {% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, column_changes, unique_key) %}
       {% set need_swap = true %}
     {% elif incremental_strategy == 'delete_insert' %}
       {% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates) %}
@@ -127,32 +126,7 @@
 
 {%- endmaterialization %}
 
-
-{% macro process_schema_changes(on_schema_change, source_relation, target_relation) %}
-
-    {%- set schema_changes_dict = check_for_schema_changes(source_relation, target_relation) -%}
-    {% if not schema_changes_dict['schema_changed'] %}
-      {{ return }}
-    {% endif %}
-
-    {% if on_schema_change == 'fail' %}
-      {% set fail_msg %}
-          The source and target schemas on this incremental model are out of sync!
-          They can be reconciled in several ways:
-            - set the `on_schema_change` config to either append_new_columns or sync_all_columns, depending on your situation.
-            - Re-run the incremental model with `full_refresh: True` to update the target schema.
-            - update the schema manually and re-run the process.
-      {% endset %}
-      {% do exceptions.raise_compiler_error(fail_msg) %}
-      {{ return }}
-    {% endif %}
-
-    {% do sync_column_schemas(on_schema_change, target_relation, schema_changes_dict) %}
-
-{% endmacro %}
-
-
-{% macro clickhouse__incremental_legacy(existing_relation, intermediate_relation, on_schema_change, unique_key, is_distributed=False) %}
+{% macro clickhouse__incremental_legacy(existing_relation, intermediate_relation, column_changes, unique_key, is_distributed=False) %}
     {% set new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_new_data'}) %}
     {{ drop_relation_if_exists(new_data_relation) }}
 
@@ -161,10 +135,17 @@
 
     -- First create a temporary table for all of the new data
     {% if is_distributed %}
+      {% if column_changes %}
+        {% do exceptions.raise_compiler_error('Schema changes not supported with Distributed tables') %}
+      {% endif %}
       -- Need to use distributed table to have data on all shards
       {%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%}
       {%- set inserting_relation = distributed_new_data_relation -%}
       {{ create_distributed_local_table(distributed_new_data_relation, new_data_relation, existing_relation, sql) }}
+    {% elif column_changes %}
+      {% call statement('create_new_data_temp') %}
+        {{ get_create_table_as_sql(False, new_data_relation, sql) }}
+      {% endcall %}
     {% else %}
       {% call statement('create_new_data_temp') %}
         {{ get_create_table_as_sql(False, new_data_relation, sql) }}
@@ -186,11 +167,11 @@
 
     -- Insert all the existing rows into the new temporary table, ignoring any rows that have keys in the "new data"
     -- table.
-    {%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%}
-    {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
+    {%- set source_columns = adapter.get_columns_in_relation(existing_relation) -%}
+    {%- set source_columns_csv = source_columns | map(attribute='quoted') | join(', ') -%}
     {% call statement('insert_existing_data') %}
-        insert into {{ inserted_relation }} ({{ dest_cols_csv }})
-        select {{ dest_cols_csv }}
+        insert into {{ inserted_relation }} ({{ source_columns_csv }})
+        select {{ source_columns_csv }}
         from {{ existing_relation }}
           where ({{ unique_key }}) not in (
             select {{ unique_key }}
@@ -200,9 +181,15 @@
     {% endcall %}
 
     -- Insert all of the new data into the temporary table
+    {% if column_changes %}
+        {%- set dest_columns = adapter.get_columns_in_relation(new_data_relation) -%}
+        {%- set dest_columns_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
+    {% else %}
+        {%- set dest_columns_csv = source_columns_csv %}
+    {% endif %}
     {% call statement('insert_new_data') %}
-     insert into {{ inserted_relation }} ({{ dest_cols_csv }})
-        select {{ dest_cols_csv }}
+     insert into {{ inserted_relation }} ({{ dest_columns_csv }})
+        select {{ dest_columns_csv }}
         from {{ inserting_relation }}
       {{ adapter.get_model_query_settings(model) }}
     {% endcall %}
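[Editorial note: the reworked legacy path above effectively rebuilds the table in two inserts: existing rows whose unique key reappears in the new data are dropped, then every new row is appended (on kept rows, any newly appended column is filled with its ClickHouse column default). A rough pure-Python sketch of those merge semantics, with invented row data; it is not the macro itself.]

```python
def legacy_merge(existing_rows, new_rows, unique_key):
    """Sketch of the 'insert_existing_data' + 'insert_new_data' steps in
    clickhouse__incremental_legacy: keep existing rows whose key does not
    appear in the new data, then append all new rows."""
    new_keys = {row[unique_key] for row in new_rows}
    kept = [row for row in existing_rows if row[unique_key] not in new_keys]
    return kept + new_rows


existing = [{'col_1': 0, 'col_2': 1}, {'col_1': 1, 'col_2': 2}, {'col_1': 2, 'col_2': 3}]
new = [{'col_1': 2, 'col_2': 3, 'col_3': 4}, {'col_1': 3, 'col_2': 4, 'col_3': 5}]
print(legacy_merge(existing, new, 'col_1'))
# 4 rows: keys 0 and 1 kept, key 2 replaced, key 3 appended.
```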
diff --git a/tests/integration/adapter/basic/test_incremental.py b/tests/integration/adapter/basic/test_incremental.py
index 3cc4cce9..c50d477a 100644
--- a/tests/integration/adapter/basic/test_incremental.py
+++ b/tests/integration/adapter/basic/test_incremental.py
@@ -7,7 +7,7 @@ class TestIncremental(BaseIncremental):
 
 
 incremental_not_schema_change_sql = """
-{{ config(materialized="incremental", unique_key="user_id_current_time",on_schema_change="sync_all_columns") }}
+{{ config(materialized="incremental", unique_key="user_id_current_time",on_schema_change="append_new_columns") }}
 select
     toString(1) || '-' || toString(now64()) as user_id_current_time,
     {% if is_incremental() %}
diff --git a/tests/integration/adapter/incremental/test_schema_change.py b/tests/integration/adapter/incremental/test_schema_change.py
new file mode 100644
index 00000000..9bccaf4e
--- /dev/null
+++ b/tests/integration/adapter/incremental/test_schema_change.py
@@ -0,0 +1,71 @@
+import pytest
+from dbt.tests.util import run_dbt, run_dbt_and_capture
+
+schema_change_sql = """
+{{
+    config(
+        materialized='incremental',
+        unique_key='col_1',
+        on_schema_change='%schema_change%'
+    )
+}}
+
+{% if not is_incremental() %}
+select
+    number as col_1,
+    number + 1 as col_2
+from numbers(3)
+{% else %}
+select
+    number as col_1,
+    number + 1 as col_2,
+    number + 2 as col_3
+from numbers(2, 3)
+{% endif %}
+"""
+
+
+class TestOnSchemaChange:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "schema_change_ignore.sql": schema_change_sql.replace("%schema_change%", "ignore"),
+            "schema_change_fail.sql": schema_change_sql.replace("%schema_change%", "fail"),
+            "schema_change_append.sql": schema_change_sql.replace(
+                "%schema_change%", "append_new_columns"
+            ),
+        }
+
+    def test_ignore(self, project):
+        run_dbt(["run", "--select", "schema_change_ignore"])
+        result = project.run_sql("select * from schema_change_ignore order by col_1", fetch="all")
+        assert len(result) == 3
+        assert result[0][1] == 1
+        run_dbt(["run", "--select", "schema_change_ignore"])
+        result = project.run_sql("select * from schema_change_ignore", fetch="all")
+        assert len(result) == 5
+
+    def test_fail(self, project):
+        run_dbt(["run", "--select", "schema_change_fail"])
+        result = project.run_sql("select * from schema_change_fail order by col_1", fetch="all")
+        assert len(result) == 3
+        assert result[0][1] == 1
+        _, log_output = run_dbt_and_capture(
+            [
+                "run",
+                "--select",
+                "schema_change_fail",
+            ],
+            expect_pass=False,
+        )
+        assert 'out of sync' in log_output.lower()
+
+    def test_append(self, project):
+        run_dbt(["run", "--select", "schema_change_append"])
+        result = project.run_sql("select * from schema_change_append order by col_1", fetch="all")
+        assert len(result) == 3
+        assert result[0][1] == 1
+        run_dbt(["--debug", "run", "--select", "schema_change_append"])
+        result = project.run_sql("select * from schema_change_append order by col_1", fetch="all")
+        assert result[0][2] == 0
+        assert result[3][2] == 5