diff --git a/dbt/adapters/clickhouse/column.py b/dbt/adapters/clickhouse/column.py
index 835b0f0d..c90a22ec 100644
--- a/dbt/adapters/clickhouse/column.py
+++ b/dbt/adapters/clickhouse/column.py
@@ -1,6 +1,6 @@
 import re
-from dataclasses import dataclass
-from typing import Any, TypeVar
+from dataclasses import dataclass, field
+from typing import Any, List, Literal, TypeVar
 
 from dbt.adapters.base.column import Column
 from dbt_common.exceptions import DbtRuntimeError
@@ -134,3 +134,32 @@ def _inner_dtype(self, dtype) -> str:
             inner_dtype = null_match.group(1)
 
         return inner_dtype
+
+
+@dataclass(frozen=True)
+class ClickHouseColumnChanges:
+    on_schema_change: Literal['ignore', 'fail', 'append_new_columns', 'sync_all_columns']
+    columns_to_add: List[Column] = field(default_factory=list)
+    columns_to_drop: List[Column] = field(default_factory=list)
+    columns_to_modify: List[Column] = field(default_factory=list)
+
+    def __bool__(self) -> bool:
+        return bool(self.columns_to_add or self.columns_to_drop or self.columns_to_modify)
+
+    @property
+    def has_schema_changes(self) -> bool:
+        return bool(self)
+
+    @property
+    def has_sync_changes(self) -> bool:
+        return bool(self.columns_to_drop or self.columns_to_modify)
+
+    @property
+    def has_conflicting_changes(self) -> bool:
+        if self.on_schema_change == 'fail' and self.has_schema_changes:
+            return True
+
+        if self.on_schema_change != 'sync_all_columns' and self.has_sync_changes:
+            return True
+
+        return False
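The truthiness and conflict rules of the new dataclass drive everything else in this patch. The following standalone sketch mirrors the class above so its truth table can be checked in isolation; plain strings stand in for dbt `Column` objects, since the predicates only test list truthiness.

    from dataclasses import dataclass, field
    from typing import List, Literal

    # Mirror of ClickHouseColumnChanges above; strings stand in for dbt Columns.
    @dataclass(frozen=True)
    class ColumnChangesSketch:
        on_schema_change: Literal['ignore', 'fail', 'append_new_columns', 'sync_all_columns']
        columns_to_add: List[str] = field(default_factory=list)
        columns_to_drop: List[str] = field(default_factory=list)
        columns_to_modify: List[str] = field(default_factory=list)

        def __bool__(self) -> bool:
            return bool(self.columns_to_add or self.columns_to_drop or self.columns_to_modify)

        @property
        def has_sync_changes(self) -> bool:
            return bool(self.columns_to_drop or self.columns_to_modify)

        @property
        def has_conflicting_changes(self) -> bool:
            # 'fail' conflicts on any change; anything short of 'sync_all_columns'
            # conflicts on drops or type modifications.
            if self.on_schema_change == 'fail' and bool(self):
                return True
            if self.on_schema_change != 'sync_all_columns' and self.has_sync_changes:
                return True
            return False

    # append_new_columns tolerates additions but conflicts on drops/modifies:
    assert not ColumnChangesSketch('append_new_columns', columns_to_add=['col_3']).has_conflicting_changes
    assert ColumnChangesSketch('append_new_columns', columns_to_drop=['col_2']).has_conflicting_changes
    # sync_all_columns never conflicts; fail conflicts on any difference at all:
    assert not ColumnChangesSketch('sync_all_columns', columns_to_modify=['col_1']).has_conflicting_changes
    assert ColumnChangesSketch('fail', columns_to_add=['col_3']).has_conflicting_changes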
diff --git a/dbt/adapters/clickhouse/errors.py b/dbt/adapters/clickhouse/errors.py
index bfcd5f95..29a572f3 100644
--- a/dbt/adapters/clickhouse/errors.py
+++ b/dbt/adapters/clickhouse/errors.py
@@ -1,14 +1,14 @@
 schema_change_fail_error = """
 The source and target schemas on this incremental model are out of sync.
-    They can be reconciled in several ways:
-    - set the `on_schema_change` config to `append_new_columns`. (ClickHouse does not support `sync_all_columns`)
-    - Re-run the incremental model with `full_refresh: True` to update the target schema.
-    - update the schema manually and re-run the process.
-
-    Additional troubleshooting context:
-    Source columns not in target: {0}
-    Target columns not in source: {1}
-    New column types: {2}
+They can be reconciled in several ways:
+  - Set the `on_schema_change` config to `append_new_columns` or `sync_all_columns`.
+  - Re-run the incremental model with `full_refresh: True` to update the target schema.
+  - Update the schema manually and re-run the process.
+
+Additional troubleshooting context:
+  Source columns not in target: {0}
+  Target columns not in source: {1}
+  New column types: {2}
 """
 
 schema_change_datatype_error = """
diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py
index bb5414b4..bd65ad57 100644
--- a/dbt/adapters/clickhouse/impl.py
+++ b/dbt/adapters/clickhouse/impl.py
@@ -29,7 +29,7 @@
 from dbt_common.utils import filter_null_values
 
 from dbt.adapters.clickhouse.cache import ClickHouseRelationsCache
-from dbt.adapters.clickhouse.column import ClickHouseColumn
+from dbt.adapters.clickhouse.column import ClickHouseColumn, ClickHouseColumnChanges
 from dbt.adapters.clickhouse.connections import ClickHouseConnectionManager
 from dbt.adapters.clickhouse.errors import (
     schema_change_datatype_error,
@@ -39,7 +39,7 @@
 from dbt.adapters.clickhouse.logger import logger
 from dbt.adapters.clickhouse.query import quote_identifier
 from dbt.adapters.clickhouse.relation import ClickHouseRelation, ClickHouseRelationType
-from dbt.adapters.clickhouse.util import NewColumnDataType, compare_versions
+from dbt.adapters.clickhouse.util import compare_versions
 
 if TYPE_CHECKING:
     import agate
@@ -193,35 +193,41 @@ def calculate_incremental_strategy(self, strategy: str) -> str:
     @available.parse_none
     def check_incremental_schema_changes(
         self, on_schema_change, existing, target_sql
-    ) -> List[ClickHouseColumn]:
-        if on_schema_change not in ('fail', 'ignore', 'append_new_columns'):
+    ) -> ClickHouseColumnChanges:
+        if on_schema_change not in ('fail', 'ignore', 'append_new_columns', 'sync_all_columns'):
             raise DbtRuntimeError(
-                "Only `fail`, `ignore`, and `append_new_columns` supported for `on_schema_change`"
+                "Only `fail`, `ignore`, `append_new_columns`, and `sync_all_columns` supported for `on_schema_change`."
             )
+
         source = self.get_columns_in_relation(existing)
         source_map = {column.name: column for column in source}
         target = self.get_column_schema_from_query(target_sql)
-        target_map = {column.name: column for column in source}
+        target_map = {column.name: column for column in target}
+
         source_not_in_target = [column for column in source if column.name not in target_map.keys()]
         target_not_in_source = [column for column in target if column.name not in source_map.keys()]
-        new_column_data_types = []
-        for target_column in target:
-            source_column = source_map.get(target_column.name)
-            if source_column and source_column.dtype != target_column.dtype:
-                new_column_data_types.append(
-                    NewColumnDataType(source_column.name, target_column.dtype)
-                )
-        if new_column_data_types:
-            raise DbtRuntimeError(schema_change_datatype_error.format(new_column_data_types))
-        if source_not_in_target:
-            raise DbtRuntimeError(schema_change_missing_source_error.format(source_not_in_target))
-        if target_not_in_source and on_schema_change == 'fail':
+        target_in_source = [column for column in target if column.name in source_map.keys()]
+        changed_data_types = []
+        for column in target_in_source:
+            source_column = source_map.get(column.name)
+            if source_column is not None and column.dtype != source_column.dtype:
+                changed_data_types.append(column)
+
+        clickhouse_column_changes = ClickHouseColumnChanges(
+            columns_to_add=target_not_in_source,
+            columns_to_drop=source_not_in_target,
+            columns_to_modify=changed_data_types,
+            on_schema_change=on_schema_change,
+        )
+
+        if clickhouse_column_changes.has_conflicting_changes:
             raise DbtRuntimeError(
                 schema_change_fail_error.format(
-                    source_not_in_target, target_not_in_source, new_column_data_types
+                    source_not_in_target, target_not_in_source, changed_data_types
                 )
             )
-        return target_not_in_source
+
+        return clickhouse_column_changes
 
     @available.parse_none
     def s3source_clause(
diff --git a/dbt/adapters/clickhouse/util.py b/dbt/adapters/clickhouse/util.py
index b730b9fd..9410ad7d 100644
--- a/dbt/adapters/clickhouse/util.py
+++ b/dbt/adapters/clickhouse/util.py
@@ -13,9 +13,3 @@ def compare_versions(v1: str, v2: str) -> int:
         except ValueError:
             raise DbtRuntimeError("Version must consist of only numbers separated by '.'")
     return 0
-
-
-@dataclass
-class NewColumnDataType:
-    column_name: str
-    new_type: str
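Two things in the impl.py hunk above are easy to miss: the old `target_map` was built from `source` (a bug that masked added columns), and the method now returns the full change set rather than only the new columns. Here is the diffing step in isolation, a standalone sketch using `(name, dtype)` tuples in place of dbt column objects, with data matching the complex-schema-change test further below:

    # The diffing step from check_incremental_schema_changes, in isolation.
    source = [('col_1', 'UInt8'), ('col_2', 'UInt64')]    # existing table
    target = [('col_1', 'Float32'), ('col_3', 'UInt64')]  # model query

    source_map = dict(source)
    target_map = dict(target)  # previously built from `source`, hiding added columns

    columns_to_add = [name for name, _ in target if name not in source_map]
    columns_to_drop = [name for name, _ in source if name not in target_map]
    columns_to_modify = [
        name for name, dtype in target if name in source_map and dtype != source_map[name]
    ]

    assert columns_to_add == ['col_3']
    assert columns_to_drop == ['col_2']
    assert columns_to_modify == ['col_1']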
diff --git a/dbt/include/clickhouse/macros/materializations/distributed_table.sql b/dbt/include/clickhouse/macros/materializations/distributed_table.sql
index 3181d40f..43b2d7ff 100644
--- a/dbt/include/clickhouse/macros/materializations/distributed_table.sql
+++ b/dbt/include/clickhouse/macros/materializations/distributed_table.sql
@@ -96,11 +96,13 @@
     )
 {% endmacro %}
 
-{% macro create_empty_table_from_relation(relation, source_relation) -%}
+{% macro create_empty_table_from_relation(relation, source_relation, sql=none) -%}
     {%- set sql_header = config.get('sql_header', none) -%}
-    {%- set columns = adapter.get_columns_in_relation(source_relation) | list -%}
-
-
+    {%- if sql -%}
+        {%- set columns = adapter.get_column_schema_from_query(sql) | list -%}
+    {%- else -%}
+        {%- set columns = adapter.get_columns_in_relation(source_relation) | list -%}
+    {%- endif -%}
     {%- set col_list = [] -%}
     {% for col in columns %}
         {{col_list.append(col.name + ' ' + col.data_type) or '' }}
@@ -123,7 +125,7 @@
     {{ drop_relation_if_exists(shard_relation) }}
     {{ drop_relation_if_exists(distributed_relation) }}
     {{ create_schema(shard_relation) }}
-    {% do run_query(create_empty_table_from_relation(shard_relation, structure_relation)) or '' %}
+    {% do run_query(create_empty_table_from_relation(shard_relation, structure_relation, sql_query)) or '' %}
     {% do run_query(create_distributed_table(distributed_relation, shard_relation)) or '' %}
     {% if sql_query is not none %}
         {% do run_query(clickhouse__insert_into(distributed_relation, sql_query)) or '' %}
diff --git a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql
index a8aba321..ef31a76c 100644
--- a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql
+++ b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql
@@ -76,18 +76,18 @@
   {% else %}
     {% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %}
     {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
-    {% if on_schema_change != 'ignore' %}
-      {%- set schema_changes = check_for_schema_changes(existing_relation, target_relation) -%}
-      {% if schema_changes['schema_changed'] and incremental_strategy in ('append', 'delete_insert') %}
-        {% set incremental_strategy = 'legacy' %}
-        {% do log('Schema changes detected, switching to legacy incremental strategy') %}
+    {%- if on_schema_change != 'ignore' %}
+      {%- set local_column_changes = adapter.check_incremental_schema_changes(on_schema_change, existing_relation_local, sql) -%}
+      {% if local_column_changes and incremental_strategy != 'legacy' %}
+        {% do clickhouse__apply_column_changes(local_column_changes, existing_relation, True) %}
+        {% set existing_relation = load_cached_relation(this) %}
       {% endif %}
     {% endif %}
     {% if incremental_strategy != 'delete_insert' and incremental_predicates %}
      {% do exceptions.raise_compiler_error('Cannot apply incremental predicates with ' + incremental_strategy + ' strategy.') %}
     {% endif %}
     {% if incremental_strategy == 'legacy' %}
-      {% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, schema_changes, unique_key, True) %}
+      {% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, local_column_changes, unique_key, True) %}
       {% set need_swap = true %}
     {% elif incremental_strategy == 'delete_insert' %}
       {% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates, True) %}
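For context on the `create_empty_table_from_relation` change above: when a `sql` argument is supplied, the column list for the empty shard table now comes from the target query's schema rather than from an existing relation, so freshly applied schema changes are reflected. The DDL fragment the macro assembles is just name/type pairs joined together; a small illustration (column names and types are made up):

    # How the macro assembles its column clause; names/types are illustrative.
    columns = [('col_1', 'UInt64'), ('col_2', 'UInt64'), ('col_3', 'UInt64')]
    col_list = [f'{name} {data_type}' for name, data_type in columns]
    print(f"({', '.join(col_list)})")  # -> (col_1 UInt64, col_2 UInt64, col_3 UInt64)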
diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
index 5ce35a5d..c4dcb1b8 100644
--- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
+++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
@@ -59,11 +59,9 @@
     {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
     {%- if on_schema_change != 'ignore' %}
       {%- set column_changes = adapter.check_incremental_schema_changes(on_schema_change, existing_relation, sql) -%}
-      {%- if column_changes %}
-        {%- if incremental_strategy in ('append', 'delete_insert') %}
-          {% set incremental_strategy = 'legacy' %}
-          {{ log('Schema changes detected, switching to legacy incremental strategy') }}
-        {%- endif %}
+      {% if column_changes and incremental_strategy != 'legacy' %}
+        {% do clickhouse__apply_column_changes(column_changes, existing_relation) %}
+        {% set existing_relation = load_cached_relation(this) %}
       {% endif %}
     {% endif %}
     {% if incremental_strategy != 'delete_insert' and incremental_predicates %}
diff --git a/dbt/include/clickhouse/macros/materializations/incremental/schema_changes.sql b/dbt/include/clickhouse/macros/materializations/incremental/schema_changes.sql
new file mode 100644
index 00000000..c8e982ff
--- /dev/null
+++ b/dbt/include/clickhouse/macros/materializations/incremental/schema_changes.sql
@@ -0,0 +1,66 @@
+{% macro clickhouse__apply_column_changes(column_changes, existing_relation, is_distributed=False) %}
+  {{ log('Schema changes detected. Trying to apply the following changes: ' ~ column_changes) }}
+  {%- set existing_local = none -%}
+  {% if is_distributed %}
+    {%- set local_suffix = adapter.get_clickhouse_local_suffix() -%}
+    {%- set local_db_prefix = adapter.get_clickhouse_local_db_prefix() -%}
+    {%- set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none -%}
+  {% endif %}
+
+  {% if column_changes.on_schema_change == 'append_new_columns' %}
+    {% do clickhouse__add_columns(column_changes.columns_to_add, existing_relation, existing_local, is_distributed) %}
+
+  {% elif column_changes.on_schema_change == 'sync_all_columns' %}
+    {% do clickhouse__drop_columns(column_changes.columns_to_drop, existing_relation, existing_local, is_distributed) %}
+    {% do clickhouse__add_columns(column_changes.columns_to_add, existing_relation, existing_local, is_distributed) %}
+    {% do clickhouse__modify_columns(column_changes.columns_to_modify, existing_relation, existing_local, is_distributed) %}
+  {% endif %}
+
+{% endmacro %}
+
+{% macro clickhouse__add_columns(columns, existing_relation, existing_local=none, is_distributed=False) %}
+  {% for column in columns %}
+    {% set alter_action -%}
+      add column if not exists `{{ column.name }}` {{ column.data_type }}
+    {%- endset %}
+    {% do clickhouse__run_alter_table_command(alter_action, existing_relation, existing_local, is_distributed) %}
+  {% endfor %}
+
+{% endmacro %}
+
+{% macro clickhouse__drop_columns(columns, existing_relation, existing_local=none, is_distributed=False) %}
+  {% for column in columns %}
+    {% set alter_action -%}
+      drop column if exists `{{ column.name }}`
+    {%- endset %}
+    {% do clickhouse__run_alter_table_command(alter_action, existing_relation, existing_local, is_distributed) %}
+  {% endfor %}
+
+{% endmacro %}
+
+{% macro clickhouse__modify_columns(columns, existing_relation, existing_local=none, is_distributed=False) %}
+  {% for column in columns %}
+    {% set alter_action -%}
+      modify column if exists `{{ column.name }}` {{ column.data_type }}
+    {%- endset %}
+    {% do clickhouse__run_alter_table_command(alter_action, existing_relation, existing_local, is_distributed) %}
+  {% endfor %}
+
+{% endmacro %}
+
+{% macro clickhouse__run_alter_table_command(alter_action, existing_relation, existing_local=none, is_distributed=False) %}
+  {% if is_distributed %}
+    {% call statement('alter_table') %}
+      alter table {{ existing_local }} {{ on_cluster_clause(existing_relation) }} {{ alter_action }}
+    {% endcall %}
+    {% call statement('alter_table') %}
+      alter table {{ existing_relation }} {{ on_cluster_clause(existing_relation) }} {{ alter_action }}
+    {% endcall %}
+
+  {% else %}
+    {% call statement('alter_table') %}
+      alter table {{ existing_relation }} {{ alter_action }}
+    {% endcall %}
+  {% endif %}
+
+{% endmacro %}
\ No newline at end of file
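The four macros in the new file reduce to per-column ALTER actions: drop, then add, then modify for `sync_all_columns`; add only for `append_new_columns`. Here is a sketch of the statements a `sync_all_columns` run would issue against a non-distributed table, with illustrative table and column names; for distributed tables the macros instead issue each ALTER twice (against the local table and the distributed wrapper) with an ON CLUSTER clause.

    # Statements clickhouse__apply_column_changes drives for sync_all_columns,
    # in the order the macro applies them; table/column names are illustrative.
    def render_sync_alters(table, to_add, to_drop, to_modify):
        stmts = [f'alter table {table} drop column if exists `{name}`' for name in to_drop]
        stmts += [f'alter table {table} add column if not exists `{name}` {dtype}'
                  for name, dtype in to_add]
        stmts += [f'alter table {table} modify column if exists `{name}` {dtype}'
                  for name, dtype in to_modify]
        return stmts

    for stmt in render_sync_alters('my_model', [('col_3', 'UInt64')], ['col_2'],
                                   [('col_1', 'Float32')]):
        print(stmt)
    # alter table my_model drop column if exists `col_2`
    # alter table my_model add column if not exists `col_3` UInt64
    # alter table my_model modify column if exists `col_1` Float32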
diff --git a/tests/integration/adapter/incremental/test_schema_change.py b/tests/integration/adapter/incremental/test_schema_change.py
index 9bccaf4e..93ca1828 100644
--- a/tests/integration/adapter/incremental/test_schema_change.py
+++ b/tests/integration/adapter/incremental/test_schema_change.py
@@ -1,27 +1,29 @@
+from functools import reduce
+
 import pytest
 from dbt.tests.util import run_dbt, run_dbt_and_capture
 
 schema_change_sql = """
 {{
     config(
-        materialized='incremental',
+        materialized='%s',
         unique_key='col_1',
-        on_schema_change='%schema_change%'
+        on_schema_change='%s'
     )
 }}
 
-{% if not is_incremental() %}
+{%% if not is_incremental() %%}
 select
     number as col_1,
     number + 1 as col_2
 from numbers(3)
-{% else %}
+{%% else %%}
 select
     number as col_1,
     number + 1 as col_2,
     number + 2 as col_3
 from numbers(2, 3)
-{% endif %}
+{%% endif %%}
 """
 
 
@@ -29,43 +31,175 @@ class TestOnSchemaChange:
     @pytest.fixture(scope="class")
     def models(self):
         return {
-            "schema_change_ignore.sql": schema_change_sql.replace("%schema_change%", "ignore"),
-            "schema_change_fail.sql": schema_change_sql.replace("%schema_change%", "fail"),
-            "schema_change_append.sql": schema_change_sql.replace(
-                "%schema_change%", "append_new_columns"
-            ),
+            "schema_change_ignore.sql": schema_change_sql % ("incremental", "ignore"),
+            "schema_change_fail.sql": schema_change_sql % ("incremental", "fail"),
+            "schema_change_append.sql": schema_change_sql % ("incremental", "append_new_columns"),
+            "schema_change_distributed_ignore.sql": schema_change_sql
+            % ("distributed_incremental", "ignore"),
+            "schema_change_distributed_fail.sql": schema_change_sql
+            % ("distributed_incremental", "fail"),
+            "schema_change_distributed_append.sql": schema_change_sql
+            % ("distributed_incremental", "append_new_columns"),
         }
 
-    def test_ignore(self, project):
-        run_dbt(["run", "--select", "schema_change_ignore"])
-        result = project.run_sql("select * from schema_change_ignore order by col_1", fetch="all")
+    @pytest.mark.parametrize("model", ("schema_change_ignore", "schema_change_distributed_ignore"))
+    def test_ignore(self, project, model):
+        run_dbt(["run", "--select", model])
+        result = project.run_sql(f"select * from {model} order by col_1", fetch="all")
         assert len(result) == 3
         assert result[0][1] == 1
-        run_dbt(["run", "--select", "schema_change_ignore"])
-        result = project.run_sql("select * from schema_change_ignore", fetch="all")
+        run_dbt(["run", "--select", model])
+        result = project.run_sql(f"select * from {model}", fetch="all")
         assert len(result) == 5
 
-    def test_fail(self, project):
-        run_dbt(["run", "--select", "schema_change_fail"])
-        result = project.run_sql("select * from schema_change_fail order by col_1", fetch="all")
+    @pytest.mark.parametrize("model", ("schema_change_fail", "schema_change_distributed_fail"))
+    def test_fail(self, project, model):
+        run_dbt(["run", "--select", model])
+        result = project.run_sql(f"select * from {model} order by col_1", fetch="all")
         assert len(result) == 3
         assert result[0][1] == 1
         _, log_output = run_dbt_and_capture(
             [
                 "run",
                 "--select",
-                "schema_change_fail",
+                model,
             ],
             expect_pass=False,
         )
         assert 'out of sync' in log_output.lower()
 
-    def test_append(self, project):
-        run_dbt(["run", "--select", "schema_change_append"])
-        result = project.run_sql("select * from schema_change_append order by col_1", fetch="all")
+    @pytest.mark.parametrize("model", ("schema_change_append", "schema_change_distributed_append"))
+    def test_append(self, project, model):
+        run_dbt(["run", "--select", model])
+        result = project.run_sql(f"select * from {model} order by col_1", fetch="all")
         assert len(result) == 3
         assert result[0][1] == 1
-        run_dbt(["--debug", "run", "--select", "schema_change_append"])
-        result = project.run_sql("select * from schema_change_append order by col_1", fetch="all")
+        run_dbt(["--debug", "run", "--select", model])
+        result = project.run_sql(f"select * from {model} order by col_1", fetch="all")
         assert result[0][2] == 0
         assert result[3][2] == 5
+
+
+# contains dropped, added, and changed (type) columns
+complex_schema_change_sql = """
+{{
+    config(
+        materialized='%s',
+        unique_key='col_1',
+        on_schema_change='%s'
+    )
+}}
+
+{%% if not is_incremental() %%}
+select
+    toUInt8(number) as col_1,
+    number + 1 as col_2
+from numbers(3)
+{%% else %%}
+select
+    toFloat32(number) as col_1,
+    number + 2 as col_3
+from numbers(2, 3)
+{%% endif %%}
+"""
+
+
+class TestComplexSchemaChange:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "complex_schema_change_fail.sql": complex_schema_change_sql % ("incremental", "fail"),
+            "complex_schema_change_append.sql": complex_schema_change_sql
+            % ("incremental", "append_new_columns"),
+            "complex_schema_change_sync.sql": complex_schema_change_sql
+            % ("incremental", "sync_all_columns"),
+            "complex_schema_change_distributed_fail.sql": complex_schema_change_sql
+            % ("distributed_incremental", "fail"),
+            "complex_schema_change_distributed_append.sql": complex_schema_change_sql
+            % ("distributed_incremental", "append_new_columns"),
+            "complex_schema_change_distributed_sync.sql": complex_schema_change_sql
+            % ("distributed_incremental", "sync_all_columns"),
+        }
+
+    @pytest.mark.parametrize(
+        "model",
+        (
+            "complex_schema_change_fail",
+            "complex_schema_change_distributed_fail",
+            "complex_schema_change_append",
+            "complex_schema_change_distributed_append",
+        ),
+    )
+    def test_fail(self, project, model):
+        run_dbt(["run", "--select", model])
+        result = project.run_sql(f"select * from {model} order by col_1", fetch="all")
+        assert len(result) == 3
+        assert result[0][1] == 1
+        _, log_output = run_dbt_and_capture(
+            [
+                "run",
+                "--select",
+                model,
+            ],
+            expect_pass=False,
+        )
+        assert 'out of sync' in log_output.lower()
+
+    @pytest.mark.parametrize(
+        "model", ("complex_schema_change_sync", "complex_schema_change_distributed_sync")
+    )
+    def test_sync(self, project, model):
+        run_dbt(["run", "--select", model])
+        result = project.run_sql(f"select * from {model} order by col_1", fetch="all")
+        assert len(result) == 3
+        assert result[0][1] == 1
+        run_dbt(["run", "--select", model])
+        result = project.run_sql(f"select * from {model} order by col_1", fetch="all")
+        assert all(len(row) == 2 for row in result)
+        assert result[0][1] == 0
+        assert result[3][1] == 5
+        result_types = project.run_sql(f"select toColumnTypeName(col_1) from {model}", fetch="one")
+        assert result_types[0] == 'Float32'
+
+
+out_of_order_columns_sql = """
+{{
+    config(
+        materialized='%s',
+        unique_key='col_1',
+        on_schema_change='fail'
+    )
+}}
+
+{%% if not is_incremental() %%}
+select
+    number as col_1,
+    number + 1 as col_2
+from numbers(3)
+{%% else %%}
+select
+    number + 1 as col_2,
+    number as col_1
+from numbers(2, 3)
+{%% endif %%}
+"""
+
+
+class TestReordering:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "out_of_order_columns.sql": out_of_order_columns_sql % "incremental",
+            "out_of_order_columns_distributed.sql": out_of_order_columns_sql
+            % "distributed_incremental",
+        }
+
+    @pytest.mark.parametrize("model", ("out_of_order_columns", "out_of_order_columns_distributed"))
+    def test_reordering(self, project, model):
+        run_dbt(["run", "--select", model])
+        result = project.run_sql(f"select * from {model} order by col_1", fetch="all")
+        assert result[0][1] == 1
+        run_dbt(["run", "--select", model])
+        result = project.run_sql(f"select * from {model} order by col_1", fetch="all")
+        assert result[0][1] == 1
+        assert result[3][1] == 4
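A note on the test refactor: the fixtures moved from `.replace('%schema_change%', ...)` to `%`-formatting so each template can take both a materialization and an `on_schema_change` value, which is why every literal Jinja tag now doubles its percent signs (`{%% ... %%}`). A quick illustration:

    # With the % operator, %% escapes a literal %, so the rendered model file
    # gets its Jinja tags back; {{ ... }} needs no escaping under %-formatting.
    template = "{{ config(materialized='%s') }}\n{%% if not is_incremental() %%}select 1{%% endif %%}"
    print(template % 'incremental')
    # {{ config(materialized='incremental') }}
    # {% if not is_incremental() %}select 1{% endif %}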