Chore/update upstream #1

Open

wants to merge 29 commits into base: main
Changes from 1 commit
Commits
29 commits
8a2731f
Distributed incremental materialization (#172)
gladkikhtutu Jul 27, 2023
3a28a66
Update version and tweak docs
genzgd Jul 27, 2023
1a0649e
Lw delete set fix (#174)
genzgd Jul 27, 2023
4b8a202
Fix legacy incremental materialization (#178)
genzgd Aug 9, 2023
9c8139f
fix: distributed_table materialization issue (#184)
zli06160 Aug 22, 2023
80cba25
Bump version and changelog (#185)
genzgd Aug 22, 2023
b79669a
cluster names containing dash characters (#198) (#200)
the4thamigo-uk Oct 21, 2023
d63285a
Add basic error test, fix minor merge conflict (#202)
genzgd Oct 26, 2023
96474f1
Cluster setting and Distributed Table tests (#186)
gfunc Oct 26, 2023
af72593
Update version and CHANGELOG, incorporate cluster name fix (#203)
genzgd Oct 26, 2023
9edb554
Release 1 5 0 (#210)
genzgd Nov 23, 2023
5e8e54b
Update test and dependency versions. (#211)
genzgd Nov 23, 2023
588e5ca
Adjust the wrapper parenthesis around the table materialization sql c…
kris947 Nov 27, 2023
1f17ec2
Update for 1.5.1 bug fix
genzgd Nov 27, 2023
9997b82
Fix creation of replicated tables when using legacy materialization (…
StevenReitsma Nov 28, 2023
3fec9a4
On cluster sync cleanup
genzgd Nov 28, 2023
bf11cbe
Bug fixes related to model settings. (#214)
genzgd Nov 29, 2023
8561210
Add materialization macro for materialized view (#207)
SoryRawyer Nov 29, 2023
246a4d8
Release 1 6 0 (#215)
genzgd Nov 30, 2023
08bbbf9
Release 1 6 1 (#217)
genzgd Dec 5, 2023
e6e74e4
Release 1 6 2 (#219)
genzgd Dec 6, 2023
2e72a00
Release 1 7 0 (#220)
genzgd Dec 7, 2023
5ccdad5
Correctly warn or error if light weight deletes not available
genzgd Dec 8, 2023
2d5c675
Wrap columns_in_query query in select statement (#222)
ptemarvelde Dec 13, 2023
ca9da0b
Update changelog
genzgd Dec 13, 2023
36ae17e
fix: incremental on cluster clause
Savid Jul 12, 2023
c522eb3
feat(icremental): add distrubted table
Savid Jul 18, 2023
6125272
fix(incremental): replicated delete_insert
Savid Jul 24, 2023
665f184
feat(incremental): distrubted append
Savid Jul 25, 2023
Release 1 6 2 (ClickHouse#219)
* Limited fix to completely broken `on_schema_change`

* Tweak changelog
genzgd authored Dec 6, 2023
commit e6e74e494ae09f6ff56ff106f166933e2114bb34
12 changes: 12 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,15 @@
### Release [1.6.2], 2023-12-06
#### Bug Fix
- The dbt `on_schema_change` configuration value for incremental models was effectively being ignored. This has been fixed
with a very limited implementation. Closes https://github.com/ClickHouse/dbt-clickhouse/issues/199. Because of the way that
ORDER BY/SORT BY/PARTITION BY/PRIMARY KEYS work in ClickHouse, plus the complexities of correctly transforming ClickHouse data types,
`sync_all_columns` is not currently supported (although an implementation that works for non-key columns is theoretically possible,
such an enhancement is not currently planned). Accordingly, only the `ignore`, `fail`, and `append_new_columns` values are supported
for `on_schema_change`. Schema change handling is also not currently supported for Distributed tables.

Note that actually appending new columns requires a fallback to the `legacy` incremental strategy, which is quite inefficient,
so using `append_new_columns` is not recommended except for very small data volumes.

### Release [1.6.1], 2023-12-04
#### Bug Fixes
- Identifier quoting was disabled for tables/databases etc. This would cause failures for schemas or tables using reserved words
2 changes: 1 addition & 1 deletion dbt/adapters/clickhouse/__version__.py
@@ -1 +1 @@
version = '1.6.1'
version = '1.6.2'
24 changes: 24 additions & 0 deletions dbt/adapters/clickhouse/errors.py
@@ -0,0 +1,24 @@
schema_change_fail_error = """
The source and target schemas on this incremental model are out of sync.
They can be reconciled in several ways:
- set the `on_schema_change` config to `append_new_columns`. (ClickHouse does not support `sync_all_columns`)
- Re-run the incremental model with `full_refresh: True` to update the target schema.
- update the schema manually and re-run the process.

Additional troubleshooting context:
Source columns not in target: {0}
Target columns not in source: {1}
New column types: {2}
"""

schema_change_datatype_error = """
The source and target schemas on this incremental model contain different data types. This is not supported.

Changed column types: {0}
"""

schema_change_missing_source_error = """
The target schema on this incremental model contains a column not in the source schema. This is not supported.

Source columns not in target: {0}
"""
40 changes: 39 additions & 1 deletion dbt/adapters/clickhouse/impl.py
@@ -20,10 +20,15 @@
from dbt.adapters.clickhouse.cache import ClickHouseRelationsCache
from dbt.adapters.clickhouse.column import ClickHouseColumn
from dbt.adapters.clickhouse.connections import ClickHouseConnectionManager
from dbt.adapters.clickhouse.errors import (
schema_change_datatype_error,
schema_change_fail_error,
schema_change_missing_source_error,
)
from dbt.adapters.clickhouse.logger import logger
from dbt.adapters.clickhouse.query import quote_identifier
from dbt.adapters.clickhouse.relation import ClickHouseRelation
from dbt.adapters.clickhouse.util import compare_versions
from dbt.adapters.clickhouse.util import NewColumnDataType, compare_versions

GET_CATALOG_MACRO_NAME = 'get_catalog'
LIST_SCHEMAS_MACRO_NAME = 'list_schemas'
@@ -151,6 +156,39 @@ def calculate_incremental_strategy(self, strategy: str) -> str:
strategy = 'legacy'
return strategy

@available.parse_none
def check_incremental_schema_changes(
self, on_schema_change, existing, target_sql
) -> List[ClickHouseColumn]:
if on_schema_change not in ('fail', 'ignore', 'append_new_columns'):
raise DbtRuntimeError(
"Only `fail`, `ignore`, and `append_new_columns` supported for `on_schema_change`"
)
source = self.get_columns_in_relation(existing)
source_map = {column.name: column for column in source}
target = self.get_column_schema_from_query(target_sql)
target_map = {column.name: column for column in target}
source_not_in_target = [column for column in source if column.name not in target_map.keys()]
target_not_in_source = [column for column in target if column.name not in source_map.keys()]
new_column_data_types = []
for target_column in target:
source_column = source_map.get(target_column.name)
if source_column and source_column.dtype != target_column.dtype:
new_column_data_types.append(
NewColumnDataType(source_column.name, target_column.dtype)
)
if new_column_data_types:
raise DbtRuntimeError(schema_change_datatype_error.format(new_column_data_types))
if source_not_in_target:
raise DbtRuntimeError(schema_change_missing_source_error.format(source_not_in_target))
if target_not_in_source and on_schema_change == 'fail':
raise DbtRuntimeError(
schema_change_fail_error.format(
source_not_in_target, target_not_in_source, new_column_data_types
)
)
return target_not_in_source

@available.parse_none
def s3source_clause(
self,
8 changes: 8 additions & 0 deletions dbt/adapters/clickhouse/util.py
@@ -1,3 +1,5 @@
from dataclasses import dataclass

from dbt.exceptions import DbtRuntimeError


@@ -11,3 +13,9 @@ def compare_versions(v1: str, v2: str) -> int:
except ValueError:
raise DbtRuntimeError("Version must consist of only numbers separated by '.'")
return 0


@dataclass
class NewColumnDataType:
column_name: str
new_type: str
Changes to the incremental materialization macros (file path not shown in the capture)
@@ -50,21 +50,23 @@
{% endcall %}

{% else %}
{% set schema_changes = none %}
{% set column_changes = none %}
{% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% if on_schema_change != 'ignore' %}
{%- set schema_changes = check_for_schema_changes(existing_relation, target_relation) -%}
{% if schema_changes['schema_changed'] and incremental_strategy in ('append', 'delete_insert') %}
{% set incremental_strategy = 'legacy' %}
{% do log('Schema changes detected, switching to legacy incremental strategy') %}
{%- if on_schema_change != 'ignore' %}
{%- set column_changes = adapter.check_incremental_schema_changes(on_schema_change, existing_relation, sql) -%}
{%- if column_changes %}
{%- if incremental_strategy in ('append', 'delete_insert') %}
{% set incremental_strategy = 'legacy' %}
{{ log('Schema changes detected, switching to legacy incremental strategy') }}
{%- endif %}
{% endif %}
{% endif %}
{% if incremental_strategy != 'delete_insert' and incremental_predicates %}
{% do exceptions.raise_compiler_error('Cannot apply incremental predicates with ' + incremental_strategy + ' strategy.') %}
{% endif %}
{% if incremental_strategy == 'legacy' %}
{% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, schema_changes, unique_key) %}
{% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, column_changes, unique_key) %}
{% set need_swap = true %}
{% elif incremental_strategy == 'delete_insert' %}
{% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates) %}
@@ -109,32 +111,7 @@

{%- endmaterialization %}


{% macro process_schema_changes(on_schema_change, source_relation, target_relation) %}

{%- set schema_changes_dict = check_for_schema_changes(source_relation, target_relation) -%}
{% if not schema_changes_dict['schema_changed'] %}
{{ return }}
{% endif %}

{% if on_schema_change == 'fail' %}
{% set fail_msg %}
The source and target schemas on this incremental model are out of sync!
They can be reconciled in several ways:
- set the `on_schema_change` config to either append_new_columns or sync_all_columns, depending on your situation.
- Re-run the incremental model with `full_refresh: True` to update the target schema.
- update the schema manually and re-run the process.
{% endset %}
{% do exceptions.raise_compiler_error(fail_msg) %}
{{ return }}
{% endif %}

{% do sync_column_schemas(on_schema_change, target_relation, schema_changes_dict) %}

{% endmacro %}


{% macro clickhouse__incremental_legacy(existing_relation, intermediate_relation, on_schema_change, unique_key, is_distributed=False) %}
{% macro clickhouse__incremental_legacy(existing_relation, intermediate_relation, column_changes, unique_key, is_distributed=False) %}
{% set new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_new_data'}) %}
{{ drop_relation_if_exists(new_data_relation) }}

@@ -143,10 +120,17 @@

-- First create a temporary table for all of the new data
{% if is_distributed %}
{% if column_changes %}
{% do exceptions.raise_compiler_error('Schema changes not supported with Distributed tables ') %}
{% endif %}
-- Need to use distributed table to have data on all shards
{%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%}
{%- set inserting_relation = distributed_new_data_relation -%}
{{ create_distributed_local_table(distributed_new_data_relation, new_data_relation, existing_relation, sql) }}
{% elif column_changes %}
{% call statement('create_new_data_temp') %}
{{ get_create_table_as_sql(False, new_data_relation, sql) }}
{% endcall %}
{% else %}
{% call statement('create_new_data_temp') %}
{{ get_create_table_as_sql(False, new_data_relation, sql) }}
@@ -168,11 +152,11 @@

-- Insert all the existing rows into the new temporary table, ignoring any rows that have keys in the "new data"
-- table.
{%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
{%- set source_columns = adapter.get_columns_in_relation(existing_relation) -%}
{%- set source_columns_csv = source_columns | map(attribute='quoted') | join(', ') -%}
{% call statement('insert_existing_data') %}
insert into {{ inserted_relation }} ({{ dest_cols_csv }})
select {{ dest_cols_csv }}
insert into {{ inserted_relation }} ({{ source_columns_csv }})
select {{ source_columns_csv }}
from {{ existing_relation }}
where ({{ unique_key }}) not in (
select {{ unique_key }}
@@ -182,9 +166,15 @@
{% endcall %}

-- Insert all of the new data into the temporary table
{% if column_changes %}
{%- set dest_columns = adapter.get_columns_in_relation(new_data_relation) -%}
{%- set dest_columns_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
{% else %}
{%- set dest_columns_csv = source_columns_csv %}
{% endif %}
{% call statement('insert_new_data') %}
insert into {{ inserted_relation }} ({{ dest_cols_csv }})
select {{ dest_cols_csv }}
insert into {{ inserted_relation }} ({{ dest_columns_csv }})
select {{ dest_columns_csv }}
from {{ inserting_relation }}
{{ adapter.get_model_query_settings(model) }}
{% endcall %}
2 changes: 1 addition & 1 deletion tests/integration/adapter/basic/test_incremental.py
@@ -7,7 +7,7 @@ class TestIncremental(BaseIncremental):


incremental_not_schema_change_sql = """
{{ config(materialized="incremental", unique_key="user_id_current_time",on_schema_change="sync_all_columns") }}
{{ config(materialized="incremental", unique_key="user_id_current_time",on_schema_change="append_new_columns") }}
select
toString(1) || '-' || toString(now64()) as user_id_current_time,
{% if is_incremental() %}
71 changes: 71 additions & 0 deletions tests/integration/adapter/incremental/test_schema_change.py
@@ -0,0 +1,71 @@
import pytest
from dbt.tests.util import run_dbt, run_dbt_and_capture

schema_change_sql = """
{{
config(
materialized='incremental',
unique_key='col_1',
on_schema_change='%schema_change%'
)
}}

{% if not is_incremental() %}
select
number as col_1,
number + 1 as col_2
from numbers(3)
{% else %}
select
number as col_1,
number + 1 as col_2,
number + 2 as col_3
from numbers(2, 3)
{% endif %}
"""


class TestOnSchemaChange:
@pytest.fixture(scope="class")
def models(self):
return {
"schema_change_ignore.sql": schema_change_sql.replace("%schema_change%", "ignore"),
"schema_change_fail.sql": schema_change_sql.replace("%schema_change%", "fail"),
"schema_change_append.sql": schema_change_sql.replace(
"%schema_change%", "append_new_columns"
),
}

def test_ignore(self, project):
run_dbt(["run", "--select", "schema_change_ignore"])
result = project.run_sql("select * from schema_change_ignore order by col_1", fetch="all")
assert len(result) == 3
assert result[0][1] == 1
run_dbt(["run", "--select", "schema_change_ignore"])
result = project.run_sql("select * from schema_change_ignore", fetch="all")
assert len(result) == 5

def test_fail(self, project):
run_dbt(["run", "--select", "schema_change_fail"])
result = project.run_sql("select * from schema_change_fail order by col_1", fetch="all")
assert len(result) == 3
assert result[0][1] == 1
_, log_output = run_dbt_and_capture(
[
"run",
"--select",
"schema_change_fail",
],
expect_pass=False,
)
assert 'out of sync' in log_output.lower()

def test_append(self, project):
run_dbt(["run", "--select", "schema_change_append"])
result = project.run_sql("select * from schema_change_append order by col_1", fetch="all")
assert len(result) == 3
assert result[0][1] == 1
run_dbt(["--debug", "run", "--select", "schema_change_append"])
result = project.run_sql("select * from schema_change_append order by col_1", fetch="all")
assert result[0][2] == 0
assert result[3][2] == 5