diff --git a/.github/workflows/test_matrix.yml b/.github/workflows/test_matrix.yml index 0e0a3b35..72760bfb 100644 --- a/.github/workflows/test_matrix.yml +++ b/.github/workflows/test_matrix.yml @@ -26,20 +26,23 @@ jobs: - '3.8' - '3.9' - '3.10' + - '3.11' clickhouse-version: - '22.3' - '22.8' - - '22.9' - - '22.10' + - '22.11' + - '22.12' - latest steps: - name: Checkout uses: actions/checkout@v3 - - name: Set legacy test settings file + - name: Set environment variables if: ${{ matrix.clickhouse-version == '22.3' }} - run: echo "TEST_SETTINGS_FILE=22_3" >> $GITHUB_ENV + run: | + echo "TEST_SETTINGS_FILE=22_3" >> $GITHUB_ENV + echo "DBT_CH_TEST_CH_VERSION=22.3" >> $GITHUB_ENV - name: Run ClickHouse Container run: docker run diff --git a/CHANGELOG.md b/CHANGELOG.md index d1f2d33e..17ca0b6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,19 @@ +### Release [1.4.0], 2023-02-06 +#### Improvements +- Support dbt [1.4.1] https://github.com/ClickHouse/dbt-clickhouse/issues/135 + - Adds support for Python 3.11 + - Adds additional dbt 1.4.0 tests + - Adds support for incremental_predicates. This only applies to the `delete+insert` incremental strategy. Note that incremental +predicates that depend on "non-deterministic" data (such as a subquery using a table that is accepting inserts) could lead to +unexpected results for ReplicatedMergeTree tables depending on the timing of the incremental materialization. + - Replaces deprecated Exception classes +- Setting the `use_lw_deletes` profile value to True will now attempt to enable the `allow_experimental_lightweight_delete` +setting for the dbt session (if the user has such permissions on the ClickHouse server). See https://github.com/ClickHouse/dbt-clickhouse/issues/133 + +#### Bug Fix +- Composite unique keys specified as a list would not work with incremental materializations. This has been fixed. + + ### Release [1.3.3], 2023-01-18 #### Documentation Update - The documentation has been updated to reflect that dbt-clickhouse does support ephemeral models, and ephemeral model tests do pass. diff --git a/README.md b/README.md index 5b496b4b..1b757247 100644 --- a/README.md +++ b/README.md @@ -75,16 +75,16 @@ your_profile_name: ## Model Configuration -| Option | Description | Required? | -|----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------| -| engine | The table engine (type of table) to use when creating tables | Optional (default: `MergeTree()`) | -| order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | Optional (default: `tuple()`) | -| partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | Optional | -| primary_key | Like order_by, a ClickHouse primary key expression. If not specified, ClickHouse will use the order by expression as the primary key | -| unique_key | A tuple of column names that uniquely identify rows. Used with incremental models for updates. | Optional | -| inserts_only | If set to True for an incremental model, incremental updates will be inserted directly to the target table without creating intermediate table. 
It has been deprecated in favor of the `append` incremental `strategy`, which operates in the same way | Optional | -| incremental_strategy | Incremental model update strategy of `delete+insert` or `append`. See the following Incremental Model Strategies | Optional (default: `default`) | - +| Option | Description | Required? | +|------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------| +| engine | The table engine (type of table) to use when creating tables | Optional (default: `MergeTree()`) | +| order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | Optional (default: `tuple()`) | +| partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | Optional | +| primary_key | Like order_by, a ClickHouse primary key expression. If not specified, ClickHouse will use the order by expression as the primary key | Optional | +| unique_key | A tuple of column names that uniquely identify rows. Used with incremental models for updates. | Optional | +| inserts_only | If set to True for an incremental model, incremental updates will be inserted directly to the target table without creating an intermediate table. It has been deprecated in favor of the `append` incremental `strategy`, which operates in the same way | Optional | +| incremental_strategy | Incremental model update strategy of `delete+insert` or `append`. See the following Incremental Model Strategies | Optional (default: `default`) | +| incremental_predicates | Additional conditions to be applied to the incremental materialization (only applied to the `delete+insert` strategy) | Optional | ## Known Limitations * Replicated tables (combined with the `cluster` profile setting) are available using the `on_cluster_clause` macro but are not included in the test suite and not formally tested. @@ -107,11 +107,15 @@ goes wrong before the operation completes; however, since it involves a full copy ClickHouse added "lightweight deletes" as an experimental feature in version 22.8. Lightweight deletes are significantly faster than ALTER TABLE ... DELETE operations, because they don't require rewriting ClickHouse data parts. The incremental strategy `delete+insert` utilizes lightweight deletes to implement incremental materializations that perform significantly better than the "legacy" strategy. However, there are important caveats to using this strategy: -- The setting `allow_experimental_lightweight_delete` must be enabled on your ClickHouse server or including in the `custom_settings` for your ClickHouse profile. 
+- Lightweight deletes must be enabled on your ClickHouse server using the setting `allow_experimental_lightweight_delete=1` or you +must set `use_lw_deletes=true` in your profile (which will enable that setting for your dbt sessions) - As suggested by the setting name, lightweight delete functionality is still experimental and there are still known issues that must be resolved before the feature is considered production ready, so usage should be limited to datasets that are easily recreated - This strategy operates directly on the affected table/relation (without creating any intermediate or temporary tables), so if there is an issue during the operation, the data in the incremental model is likely to be in an invalid state +- When using lightweight deletes, dbt-clickhouse enables the setting `allow_nondeterministic_mutations`. In some very rare cases using non-deterministic incremental_predicates +this could result in a race condition for the updated/deleted items (and related log messages in the ClickHouse logs). To ensure consistent results, the +incremental predicates should only include sub-queries on data that will not be modified during the incremental materialization. ### The Append Strategy diff --git a/dbt/adapters/clickhouse/__version__.py b/dbt/adapters/clickhouse/__version__.py index 6cb37e65..f5a0a88b 100644 --- a/dbt/adapters/clickhouse/__version__.py +++ b/dbt/adapters/clickhouse/__version__.py @@ -1 +1 @@ -version = '1.3.3' +version = '1.4.0' diff --git a/dbt/adapters/clickhouse/column.py b/dbt/adapters/clickhouse/column.py index 720c7fd7..43df6aa3 100644 --- a/dbt/adapters/clickhouse/column.py +++ b/dbt/adapters/clickhouse/column.py @@ -3,7 +3,7 @@ from typing import Any, TypeVar from dbt.adapters.base.column import Column -from dbt.exceptions import RuntimeException +from dbt.exceptions import DbtRuntimeError Self = TypeVar('Self', bound='ClickHouseColumn') @@ -96,7 +96,7 @@ def is_float(self) -> bool: def string_size(self) -> int: if not self.is_string(): - raise RuntimeException('Called string_size() on non-string field!') + raise DbtRuntimeError('Called string_size() on non-string field!') if not self.dtype.lower().startswith('fixedstring') or self.char_size is None: return 256 diff --git a/dbt/adapters/clickhouse/connections.py b/dbt/adapters/clickhouse/connections.py index ec1ef53c..c61766b3 100644 --- a/dbt/adapters/clickhouse/connections.py +++ b/dbt/adapters/clickhouse/connections.py @@ -28,9 +28,9 @@ def exception_handler(self, sql): yield except Exception as exp: logger.debug('Error running SQL: {}', sql) - if isinstance(exp, dbt.exceptions.RuntimeException): + if isinstance(exp, dbt.exceptions.DbtRuntimeError): raise - raise dbt.exceptions.RuntimeException(exp) from exp + raise dbt.exceptions.DbtRuntimeError(str(exp)) from exp @classmethod def open(cls, connection): diff --git a/dbt/adapters/clickhouse/credentials.py b/dbt/adapters/clickhouse/credentials.py index 84a426de..178625f1 100644 --- a/dbt/adapters/clickhouse/credentials.py +++ b/dbt/adapters/clickhouse/credentials.py @@ -2,8 +2,7 @@ from typing import Any, Dict, Optional from dbt.contracts.connection import Credentials - -import dbt +from dbt.exceptions import DbtRuntimeError @dataclass @@ -44,7 +43,7 @@ def unique_field(self): def __post_init__(self): if self.database is not None and self.database != self.schema: - raise dbt.exceptions.RuntimeException( + raise DbtRuntimeError( f' schema: {self.schema} \n' f' database: {self.database} \n' f' cluster: {self.cluster} \n' diff --git
a/dbt/adapters/clickhouse/dbclient.py b/dbt/adapters/clickhouse/dbclient.py index 1a335a25..0628fff6 100644 --- a/dbt/adapters/clickhouse/dbclient.py +++ b/dbt/adapters/clickhouse/dbclient.py @@ -1,8 +1,7 @@ import uuid from abc import ABC, abstractmethod -from dbt.exceptions import DatabaseException as DBTDatabaseException -from dbt.exceptions import FailedToConnectException +from dbt.exceptions import DbtDatabaseError, FailedToConnectError from dbt.adapters.clickhouse.credentials import ClickHouseCredentials from dbt.adapters.clickhouse.logger import logger @@ -23,7 +22,7 @@ def get_db_client(credentials: ClickHouseCredentials): if not port: port = 9440 if credentials.secure else 9000 else: - raise FailedToConnectException(f'Unrecognized ClickHouse driver {driver}') + raise FailedToConnectError(f'Unrecognized ClickHouse driver {driver}') credentials.driver = driver credentials.port = port @@ -35,7 +34,7 @@ def get_db_client(credentials: ClickHouseCredentials): return ChNativeClient(credentials) except ImportError: - raise FailedToConnectException( + raise FailedToConnectError( 'Native adapter required but package clickhouse-driver is not installed' ) try: @@ -45,7 +44,7 @@ def get_db_client(credentials: ClickHouseCredentials): return ChHttpClient(credentials) except ImportError: - raise FailedToConnectException( + raise FailedToConnectError( 'HTTP adapter required but package clickhouse-connect is not installed' ) @@ -69,9 +68,9 @@ def __init__(self, credentials: ClickHouseCredentials): try: self._ensure_database(credentials.database_engine) self.server_version = self._server_version() - lw_deletes = self.get_ch_setting('allow_experimental_lightweight_delete') - self.has_lw_deletes = lw_deletes is not None and int(lw_deletes) > 0 - self.use_lw_deletes = self.has_lw_deletes and credentials.use_lw_deletes + self.has_lw_deletes, self.use_lw_deletes = self._check_lightweight_deletes( + credentials.use_lw_deletes + ) self.atomic_exchange = not check_exchange or self._check_atomic_exchange() except Exception as ex: self.close() @@ -108,6 +107,29 @@ def _set_client_database(self): def _server_version(self): pass + def _check_lightweight_deletes(self, requested: bool): + lw_deletes = self.get_ch_setting('allow_experimental_lightweight_delete') + if lw_deletes is None: + if requested: + logger.warning( + 'use_lw_deletes requested but lightweight deletes are not available on this ClickHouse server' + ) + return False, False + lw_deletes = int(lw_deletes) + if lw_deletes == 1: + return True, requested + if not requested: + return False, False + try: + self.command('SET allow_experimental_lightweight_delete = 1') + self.command('SET allow_nondeterministic_mutations = 1') + return True, True + except DbtDatabaseError as ex: + logger.warning( + 'use_lw_deletes requested but could not be enabled on this ClickHouse server: %s', str(ex) + ) + return False, False + def _ensure_database(self, database_engine) -> None: if not self.database: return @@ -119,11 +141,11 @@ def _ensure_database(self, database_engine) -> None: self.command(f'CREATE DATABASE {self.database}{engine_clause}') db_exists = self.command(check_db) if not db_exists: - raise FailedToConnectException( + raise FailedToConnectError( f'Failed to create database {self.database} for unknown reason' ) - except DBTDatabaseException as ex: - raise FailedToConnectException( + except DbtDatabaseError as ex: + raise FailedToConnectError( f'Failed to create {self.database} database due to ClickHouse exception' ) from ex self._set_client_database() @@ -143,7 +165,7 @@ def
_check_atomic_exchange(self) -> bool: try: self.command('EXCHANGE TABLES {} AND {}'.format(*swap_tables)) return True - except DBTDatabaseException: + except DbtDatabaseError: logger.info('ClickHouse server does not support the EXCHANGE TABLES command') logger.info( 'This can be caused by an obsolete ClickHouse version or by running ClickHouse on' @@ -156,8 +178,8 @@ def _check_atomic_exchange(self) -> bool: try: for table in swap_tables: self.command(f'DROP TABLE IF EXISTS {table}') - except DBTDatabaseException: + except DbtDatabaseError: logger.info('Unexpected server exception dropping table', exc_info=True) - except DBTDatabaseException: + except DbtDatabaseError: logger.warning('Failed to run exchange test', exc_info=True) return False diff --git a/dbt/adapters/clickhouse/httpclient.py b/dbt/adapters/clickhouse/httpclient.py index b6a2ffc0..6a1991e9 100644 --- a/dbt/adapters/clickhouse/httpclient.py +++ b/dbt/adapters/clickhouse/httpclient.py @@ -1,8 +1,9 @@ import clickhouse_connect from clickhouse_connect.driver.exceptions import DatabaseError, OperationalError -from dbt.exceptions import DatabaseException as DBTDatabaseException +from dbt.exceptions import DbtDatabaseError from dbt.version import __version__ as dbt_version +from dbt.adapters.clickhouse.__version__ import version as dbt_clickhouse_version from dbt.adapters.clickhouse.dbclient import ChClientWrapper, ChRetryableException @@ -11,13 +12,13 @@ def query(self, sql, **kwargs): try: return self._client.query(sql, **kwargs) except DatabaseError as ex: - raise DBTDatabaseException(str(ex).strip()) from ex + raise DbtDatabaseError(str(ex).strip()) from ex def command(self, sql, **kwargs): try: return self._client.command(sql, **kwargs) except DatabaseError as ex: - raise DBTDatabaseException(str(ex).strip()) from ex + raise DbtDatabaseError(str(ex).strip()) from ex def get_ch_setting(self, setting_name): setting = self._client.server_settings.get(setting_name) @@ -43,7 +44,7 @@ def _create_client(self, credentials): compress=False if credentials.compression == '' else bool(credentials.compression), connect_timeout=credentials.connect_timeout, send_receive_timeout=credentials.send_receive_timeout, - client_name=f'dbt/{dbt_version}', + client_name=f'dbt/{dbt_version} dbt-clickhouse/{dbt_clickhouse_version}', verify=credentials.verify, query_limit=0, settings=self._conn_settings, diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index 19aa2013..7bbc2569 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -13,6 +13,7 @@ from dbt.clients.agate_helper import table_from_rows from dbt.contracts.graph.manifest import Manifest from dbt.contracts.relation import RelationType +from dbt.exceptions import DbtInternalError, DbtRuntimeError, NotImplementedError from dbt.utils import executor, filter_null_values from dbt.adapters.clickhouse.column import ClickHouseColumn @@ -65,9 +66,7 @@ def convert_date_type(cls, agate_table: agate.Table, col_idx: int) -> str: @classmethod def convert_time_type(cls, agate_table: agate.Table, col_idx: int) -> str: - raise dbt.exceptions.NotImplementedException( - '`convert_time_type` is not implemented for this adapter!' 
- ) + raise NotImplementedError('`convert_time_type` is not implemented for this adapter!') @available.parse(lambda *a, **k: {}) def get_clickhouse_cluster_name(self): @@ -109,7 +108,7 @@ def calculate_incremental_strategy(self, strategy: str) -> str: strategy = 'delete_insert' if conn.handle.use_lw_deletes else 'legacy' strategy = strategy.replace('+', '_') if strategy not in ['legacy', 'append', 'delete_insert']: - raise dbt.exceptions.RuntimeException( + raise DbtRuntimeError( f"The incremental strategy '{strategy}' is not valid for ClickHouse" ) if not conn.handle.has_lw_deletes and strategy == 'delete_insert': @@ -156,13 +155,9 @@ def s3source_clause( url = f'https://{url}' access = '' if aws_access_key_id and not aws_secret_access_key: - raise dbt.exceptions.RuntimeException( - 'S3 aws_access_key_id specified without aws_secret_access_key' - ) + raise DbtRuntimeError('S3 aws_access_key_id specified without aws_secret_access_key') if aws_secret_access_key and not aws_access_key_id: - raise dbt.exceptions.RuntimeException( - 'S3 aws_secret_access_key specified without aws_access_key_id' - ) + raise DbtRuntimeError('S3 aws_secret_access_key specified without aws_access_key_id') if aws_access_key_id: access = f", '{aws_access_key_id}', '{aws_secret_access_key}'" comp = compression or s3config.get('compression', '') @@ -216,9 +211,7 @@ def list_relations_without_caching( return relations def get_relation(self, database: Optional[str], schema: str, identifier: str): - if not self.Relation.include_policy.database: - database = None - return super().get_relation(database, schema, identifier) + return super().get_relation(None, schema, identifier) @available.parse_none def get_ch_database(self, schema: str): @@ -227,7 +220,7 @@ def get_ch_database(self, schema: str): if len(results.rows): return ClickHouseDatabase(**results.rows[0]) return None - except dbt.exceptions.RuntimeException: + except DbtRuntimeError: return None def get_catalog(self, manifest): @@ -277,7 +270,7 @@ def get_rows_different_sql( relation_a: ClickHouseRelation, relation_b: ClickHouseRelation, column_names: Optional[List[str]] = None, - except_operator: str = None, + except_operator: Optional[str] = None, ) -> str: names: List[str] if column_names is None: @@ -367,9 +360,7 @@ class ClickHouseDatabase: def _expect_row_value(key: str, row: agate.Row): if key not in row.keys(): - raise dbt.exceptions.InternalException( - f'Got a row without \'{key}\' column, columns: {row.keys()}' - ) + raise DbtInternalError(f'Got a row without \'{key}\' column, columns: {row.keys()}') return row[key] @@ -395,9 +386,7 @@ def compare_versions(v1: str, v2: str) -> int: if int(part1) != int(part2): return 1 if int(part1) > int(part2) else -1 except ValueError: - raise dbt.exceptions.RuntimeException( - "Version must consist of only numbers separated by '.'" - ) + raise DbtRuntimeError("Version must consist of only numbers separated by '.'") return 0 diff --git a/dbt/adapters/clickhouse/nativeclient.py b/dbt/adapters/clickhouse/nativeclient.py index cfff8a84..0f400a09 100644 --- a/dbt/adapters/clickhouse/nativeclient.py +++ b/dbt/adapters/clickhouse/nativeclient.py @@ -1,19 +1,26 @@ import clickhouse_driver +import pkg_resources from clickhouse_driver.errors import NetworkError, SocketTimeoutError -from dbt.exceptions import DatabaseException as DBTDatabaseException +from dbt.exceptions import DbtDatabaseError from dbt.version import __version__ as dbt_version from dbt.adapters.clickhouse import ClickHouseCredentials +from 
dbt.adapters.clickhouse.__version__ import version as dbt_clickhouse_version from dbt.adapters.clickhouse.dbclient import ChClientWrapper, ChRetryableException from dbt.adapters.clickhouse.logger import logger +try: + driver_version = pkg_resources.get_distribution('clickhouse-driver').version +except pkg_resources.ResolutionError: + driver_version = 'unknown' + class ChNativeClient(ChClientWrapper): def query(self, sql, **kwargs): try: return NativeClientResult(self._client.execute(sql, with_column_types=True, **kwargs)) except clickhouse_driver.errors.Error as ex: - raise DBTDatabaseException(str(ex).strip()) from ex + raise DbtDatabaseError(str(ex).strip()) from ex def command(self, sql, **kwargs): try: @@ -21,7 +28,7 @@ def command(self, sql, **kwargs): if len(result) and len(result[0]): return result[0][0] except clickhouse_driver.errors.Error as ex: - raise DBTDatabaseException(str(ex).strip()) from ex + raise DbtDatabaseError(str(ex).strip()) from ex def get_ch_setting(self, setting_name): try: @@ -42,7 +49,7 @@ def _create_client(self, credentials: ClickHouseCredentials): port=credentials.port, user=credentials.user, password=credentials.password, - client_name=f'dbt-{dbt_version}', + client_name=f'dbt/{dbt_version} dbt-clickhouse/{dbt_clickhouse_version} clickhouse-driver/{driver_version}', secure=credentials.secure, verify=credentials.verify, connect_timeout=credentials.connect_timeout, diff --git a/dbt/adapters/clickhouse/relation.py b/dbt/adapters/clickhouse/relation.py index 152ba29b..fb929806 100644 --- a/dbt/adapters/clickhouse/relation.py +++ b/dbt/adapters/clickhouse/relation.py @@ -1,8 +1,8 @@ -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Optional -import dbt.exceptions from dbt.adapters.base.relation import BaseRelation, Policy +from dbt.exceptions import DbtRuntimeError @dataclass @@ -21,20 +21,18 @@ class ClickHouseIncludePolicy(Policy): @dataclass(frozen=True, eq=False, repr=False) class ClickHouseRelation(BaseRelation): - quote_policy: ClickHouseQuotePolicy = ClickHouseQuotePolicy() - include_policy: ClickHouseIncludePolicy = ClickHouseIncludePolicy() + quote_policy: Policy = field(default_factory=lambda: ClickHouseQuotePolicy()) + include_policy: Policy = field(default_factory=lambda: ClickHouseIncludePolicy()) quote_character: str = '' can_exchange: bool = False def __post_init__(self): if self.database != self.schema and self.database: - raise dbt.exceptions.RuntimeException( - f'Cannot set database {self.database} in clickhouse!' 
- ) + raise DbtRuntimeError(f'Cannot set database {self.database} in clickhouse!') def render(self): if self.include_policy.database and self.include_policy.schema: - raise dbt.exceptions.RuntimeException( + raise DbtRuntimeError( 'Got a clickhouse relation with schema and database set to ' 'include, but only one can be set' ) @@ -47,7 +45,5 @@ def matches( identifier: Optional[str] = None, ): if schema: - raise dbt.exceptions.RuntimeException( - f'Passed unexpected schema value {schema} to Relation.matches' - ) + raise DbtRuntimeError(f'Passed unexpected schema value {schema} to Relation.matches') return self.database == database and self.identifier == identifier diff --git a/dbt/include/clickhouse/macros/adapters.sql b/dbt/include/clickhouse/macros/adapters.sql index 205edc73..8b832728 100644 --- a/dbt/include/clickhouse/macros/adapters.sql +++ b/dbt/include/clickhouse/macros/adapters.sql @@ -87,10 +87,6 @@ {% do return(None) %} {%- endmacro %} -{% macro clickhouse__current_timestamp() -%} - now() -{%- endmacro %} - {% macro clickhouse__get_columns_in_query(select_sql) %} {% call statement('get_columns_in_query', fetch_result=True, auto_begin=False) -%} select * from ( diff --git a/dbt/include/clickhouse/macros/materializations/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental.sql index ee18bde7..afa66250 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental.sql @@ -4,6 +4,12 @@ {%- set target_relation = this.incorporate(type='table') -%} {%- set unique_key = config.get('unique_key') -%} + {% if unique_key is not none and unique_key|length == 0 %} + {% set unique_key = none %} + {% endif %} + {% if unique_key is iterable and (unique_key is not string and unique_key is not mapping) %} + {% set unique_key = unique_key|join(', ') %} + {% endif %} {%- set inserts_only = config.get('inserts_only') -%} {%- set grant_config = config.get('grants') -%} {%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%} @@ -44,25 +50,29 @@ {% endcall %} {% else %} - {% set schema_changes = none %} - {% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %} - {% if on_schema_change != 'ignore' %} - {%- set schema_changes = check_for_schema_changes(existing_relation, target_relation) -%} - {% if schema_changes['schema_changed'] and incremental_strategy in ('append', 'delete_insert') %} - {% set incremental_strategy = 'legacy' %} - {% do log('Schema changes detected, switching to legacy incremental strategy') %} - {% endif %} - {% endif %} - {% if incremental_strategy == 'legacy' %} - {% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, schema_changes, unique_key) %} - {% set need_swap = true %} - {% elif incremental_strategy == 'delete_insert' %} - {% do clickhouse__incremental_delete_insert(existing_relation, unique_key) %} - {% elif incremental_strategy == 'append' %} - {% call statement('main') %} - {{ clickhouse__insert_into(target_relation, sql) }} - {% endcall %} - {% endif %} + {% set schema_changes = none %} + {% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %} + {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %} + {% if on_schema_change != 'ignore' %} + {%- set schema_changes = check_for_schema_changes(existing_relation, target_relation) -%} + {% if 
schema_changes['schema_changed'] and incremental_strategy in ('append', 'delete_insert') %} + {% set incremental_strategy = 'legacy' %} + {% do log('Schema changes detected, switching to legacy incremental strategy') %} + {% endif %} + {% endif %} + {% if incremental_strategy != 'delete_insert' and incremental_predicates %} + {% do exceptions.raise_compiler_error('Cannot apply incremental predicates with ' + incremental_strategy + ' strategy.') %} + {% endif %} + {% if incremental_strategy == 'legacy' %} + {% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, schema_changes, unique_key) %} + {% set need_swap = true %} + {% elif incremental_strategy == 'delete_insert' %} + {% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates) %} + {% elif incremental_strategy == 'append' %} + {% call statement('main') %} + {{ clickhouse__insert_into(target_relation, sql) }} + {% endcall %} + {% endif %} {% endif %} {% if need_swap %} @@ -70,7 +80,7 @@ {% do adapter.rename_relation(intermediate_relation, backup_relation) %} {% do exchange_tables_atomic(backup_relation, target_relation) %} {% else %} - {% do adapter.rename_relation(target_relation, backup_relation) %} + {% do adapter.rename_relation(target_relation, backup_relation) %} {% do adapter.rename_relation(intermediate_relation, target_relation) %} {% endif %} {% do to_drop.append(backup_relation) %} @@ -125,7 +135,7 @@ {% macro clickhouse__incremental_legacy(existing_relation, intermediate_relation, on_schema_change, unique_key) %} - -- First create a temporary table with all of the new data + -- First create a temporary table for all of the new data {% set new_data_relation = existing_relation.incorporate(path={"identifier": model['name'] + '__dbt_new_data'}) %} {{ drop_relation_if_exists(new_data_relation) }} {% call statement('create_new_data_temp') %} @@ -167,15 +177,20 @@ {% endmacro %} -{% macro clickhouse__incremental_delete_insert(existing_relation, unique_key) %} +{% macro clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates) %} {% set new_data_relation = existing_relation.incorporate(path={"identifier": model['name'] + '__dbt_new_data'}) %} {{ drop_relation_if_exists(new_data_relation) }} {% call statement('main') %} {{ get_create_table_as_sql(False, new_data_relation, sql) }} {% endcall %} {% call statement('delete_existing_data') %} - delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }} - from {{ new_data_relation }}) + delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }} + from {{ new_data_relation }}) + {%- if incremental_predicates %} + {% for predicate in incremental_predicates %} + and {{ predicate }} + {% endfor %} + {%- endif -%}; {% endcall %} {%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%} diff --git a/dbt/include/clickhouse/macros/materializations/snapshot.sql b/dbt/include/clickhouse/macros/materializations/snapshot.sql index 62946f63..2a317736 100644 --- a/dbt/include/clickhouse/macros/materializations/snapshot.sql +++ b/dbt/include/clickhouse/macros/materializations/snapshot.sql @@ -5,12 +5,6 @@ {%- endfor -%}) {%- endmacro %} -{% macro clickhouse__snapshot_string_as_time(timestamp) -%} - {%- set result = "toDateTime('" ~ timestamp ~ "')" -%} - {{ return(result) }} -{%- endmacro %} - - {% macro clickhouse__post_snapshot(staging_relation) %} {{ drop_relation_if_exists(staging_relation) }} {% endmacro %} diff --git 
a/dbt/include/clickhouse/macros/utils/timestamps.sql b/dbt/include/clickhouse/macros/utils/timestamps.sql new file mode 100644 index 00000000..b17c759c --- /dev/null +++ b/dbt/include/clickhouse/macros/utils/timestamps.sql @@ -0,0 +1,8 @@ +{% macro clickhouse__current_timestamp() -%} + now() +{%- endmacro %} + +{% macro clickhouse__snapshot_string_as_time(timestamp) -%} + {%- set result = "toDateTime('" ~ timestamp ~ "')" -%} + {{ return(result) }} +{%- endmacro %} \ No newline at end of file diff --git a/dev_requirements.txt b/dev_requirements.txt index b3a6148d..d24f715c 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1,15 +1,16 @@ -dbt-core~=1.3.2 -clickhouse-connect>=0.4.7 +dbt-core~=1.4.1 +clickhouse-connect>=0.5.5 clickhouse-driver>=0.2.3 pytest>=7.2.0 pytest-dotenv==0.5.2 -dbt-tests-adapter~=1.3.2 +dbt-tests-adapter~=1.4.1 black==22.3.0 isort==5.10.1 -mypy==0.960 +mypy==0.991 yamllint==1.26.3 flake8==4.0.1 types-requests==2.27.29 agate~=1.6.3 requests~=2.27.1 -setuptools~=65.3.0 \ No newline at end of file +setuptools~=65.3.0 +types-setuptools==67.1.0.0 \ No newline at end of file diff --git a/setup.py b/setup.py index 1dbf594e..b0809283 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ def _dbt_clickhouse_version(): package_version = _dbt_clickhouse_version() description = '''The Clickhouse plugin for dbt (data build tool)''' -dbt_version = '1.3.0' +dbt_version = '1.4.0' dbt_minor = '.'.join(dbt_version.split('.')[0:2]) if not package_version.startswith(dbt_minor): @@ -55,7 +55,7 @@ def _dbt_clickhouse_version(): }, install_requires=[ f'dbt-core~={dbt_version}', - 'clickhouse-connect>=0.4.7', + 'clickhouse-connect>=0.5.5', 'clickhouse-driver>=0.2.3', ], python_requires=">=3.7", @@ -70,5 +70,6 @@ def _dbt_clickhouse_version(): 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', ], ) diff --git a/tests/integration/adapter/test_incremental.py b/tests/integration/adapter/incremental/test_incremental.py similarity index 73% rename from tests/integration/adapter/test_incremental.py rename to tests/integration/adapter/incremental/test_incremental.py index e1f9f4e5..bfa97fab 100644 --- a/tests/integration/adapter/test_incremental.py +++ b/tests/integration/adapter/incremental/test_incremental.py @@ -1,4 +1,6 @@ import pytest +from dbt.tests.adapter.basic.files import model_incremental, schema_base_yml +from dbt.tests.adapter.basic.test_incremental import BaseIncremental, BaseIncrementalNotSchemaChange from dbt.tests.util import run_dbt uniq_schema = """ @@ -131,3 +133,34 @@ def test_compound_key(self, project): run_dbt() result = project.run_sql("select count(*) as num_rows from compound_key_inc", fetch="one") assert result[0] == 180 + + +class TestInsertsOnlyIncrementalMaterialization(BaseIncremental): + @pytest.fixture(scope="class") + def models(self): + config_materialized_incremental = """ + {{ config(order_by='(some_date, id, name)', inserts_only=True, materialized='incremental', unique_key='id') }} + """ + incremental_sql = config_materialized_incremental + model_incremental + return { + "incremental.sql": incremental_sql, + "schema.yml": schema_base_yml, + } + + +incremental_not_schema_change_sql = """ +{{ config(materialized="incremental", unique_key="user_id_current_time",on_schema_change="sync_all_columns") }} +select + toString(1) || '-' || toString(now64()) as user_id_current_time, + {% if is_incremental() %} + 'thisis18characters' as 
platform + {% else %} + 'okthisis20characters' as platform + {% endif %} +""" + + +class TestIncrementalNotSchemaChange(BaseIncrementalNotSchemaChange): + @pytest.fixture(scope="class") + def models(self): + return {"incremental_not_schema_change.sql": incremental_not_schema_change_sql} diff --git a/tests/integration/adapter/incremental/test_incremental_predicates.py b/tests/integration/adapter/incremental/test_incremental_predicates.py new file mode 100644 index 00000000..44106b32 --- /dev/null +++ b/tests/integration/adapter/incremental/test_incremental_predicates.py @@ -0,0 +1,8 @@ +from dbt.tests.adapter.incremental.test_incremental_predicates import BaseIncrementalPredicates + + +class TestIncrementalPredicates(BaseIncrementalPredicates): + def test__incremental_predicates(self, project, ch_test_version): + if ch_test_version.startswith('22.3'): + return # lightweight deletes not supported in 22.3 + super().test__incremental_predicates(project) diff --git a/tests/integration/adapter/incremental/test_incremental_unique_key.py b/tests/integration/adapter/incremental/test_incremental_unique_key.py new file mode 100644 index 00000000..8c182075 --- /dev/null +++ b/tests/integration/adapter/incremental/test_incremental_unique_key.py @@ -0,0 +1,85 @@ +import pytest +from dbt.tests.adapter.incremental.test_incremental_unique_id import ( + BaseIncrementalUniqueKey, + models__duplicated_unary_unique_key_list_sql, + models__empty_str_unique_key_sql, + models__empty_unique_key_list_sql, + models__no_unique_key_sql, + models__nontyped_trinary_unique_key_list_sql, + models__not_found_unique_key_list_sql, + models__not_found_unique_key_sql, + models__str_unique_key_sql, + models__trinary_unique_key_list_sql, + models__unary_unique_key_list_sql, +) + +models__expected__one_str__overwrite_sql = """ +{{ + config( + materialized='table' + ) +}} + +select + 'CT' as state, + 'Hartford' as county, + 'Hartford' as city, + toDate('2022-02-14') as last_visit_date +union all +select 'MA','Suffolk','Boston',toDate('2020-02-12') +union all +select 'NJ','Mercer','Trenton',toDate('2022-01-01') +union all +select 'NY','Kings','Brooklyn',toDate('2021-04-02') +union all +select 'NY','New York','Manhattan',toDate('2021-04-01') +union all +select 'PA','Philadelphia','Philadelphia',toDate('2021-05-21') +""" + + +models__expected__unique_key_list__inplace_overwrite_sql = """ +{{ + config( + materialized='table' + ) +}} + +select + 'CT' as state, + 'Hartford' as county, + 'Hartford' as city, + toDate('2022-02-14') as last_visit_date +union all +select 'MA','Suffolk','Boston',toDate('2020-02-12') +union all +select 'NJ','Mercer','Trenton',toDate('2022-01-01') +union all +select 'NY','Kings','Brooklyn',toDate('2021-04-02') +union all +select 'NY','New York','Manhattan',toDate('2021-04-01') +union all +select 'PA','Philadelphia','Philadelphia',toDate('2021-05-21') + +""" + + +class TestIncrementalUniqueKey(BaseIncrementalUniqueKey): + @pytest.fixture(scope="class") + def models(self): + return { + "trinary_unique_key_list.sql": models__trinary_unique_key_list_sql, + "nontyped_trinary_unique_key_list.sql": models__nontyped_trinary_unique_key_list_sql, + "unary_unique_key_list.sql": models__unary_unique_key_list_sql, + "not_found_unique_key.sql": models__not_found_unique_key_sql, + "empty_unique_key_list.sql": models__empty_unique_key_list_sql, + "no_unique_key.sql": models__no_unique_key_sql, + "empty_str_unique_key.sql": models__empty_str_unique_key_sql, + "str_unique_key.sql": models__str_unique_key_sql, + 
"duplicated_unary_unique_key_list.sql": models__duplicated_unary_unique_key_list_sql, + "not_found_unique_key_list.sql": models__not_found_unique_key_list_sql, + "expected": { + "one_str__overwrite.sql": models__expected__one_str__overwrite_sql, + "unique_key_list__inplace_overwrite.sql": models__expected__unique_key_list__inplace_overwrite_sql, + }, + } diff --git a/tests/integration/adapter/test_basic.py b/tests/integration/adapter/test_basic.py index 79eeb73f..fc11e146 100644 --- a/tests/integration/adapter/test_basic.py +++ b/tests/integration/adapter/test_basic.py @@ -1,11 +1,11 @@ import pytest -from dbt.tests.adapter.basic.files import model_base, model_incremental, schema_base_yml +from dbt.tests.adapter.basic.files import model_base, schema_base_yml from dbt.tests.adapter.basic.test_adapter_methods import BaseAdapterMethod from dbt.tests.adapter.basic.test_base import BaseSimpleMaterializations from dbt.tests.adapter.basic.test_empty import BaseEmpty from dbt.tests.adapter.basic.test_ephemeral import BaseEphemeral from dbt.tests.adapter.basic.test_generic_tests import BaseGenericTests -from dbt.tests.adapter.basic.test_incremental import BaseIncremental, BaseIncrementalNotSchemaChange +from dbt.tests.adapter.basic.test_incremental import BaseIncremental from dbt.tests.adapter.basic.test_singular_tests import BaseSingularTests from dbt.tests.adapter.basic.test_snapshot_check_cols import BaseSnapshotCheckCols from dbt.tests.adapter.basic.test_snapshot_timestamp import BaseSnapshotTimestamp @@ -112,19 +112,6 @@ def test_base(self, project): assert result[0] == 10 -class TestInsertsOnlyIncrementalMaterialization(BaseIncremental): - @pytest.fixture(scope="class") - def models(self): - config_materialized_incremental = """ - {{ config(order_by='(some_date, id, name)', inserts_only=True, materialized='incremental', unique_key='id') }} - """ - incremental_sql = config_materialized_incremental + model_incremental - return { - "incremental.sql": incremental_sql, - "schema.yml": schema_base_yml, - } - - class TestCSVSeed: @pytest.fixture(scope="class") def seeds(self): @@ -141,21 +128,3 @@ def test_seed(self, project): columns = project.run_sql("DESCRIBE TABLE empty", fetch='all') assert columns[2][1] == 'Nullable(UInt32)' assert columns[3][1] == 'Nullable(String)' - - -incremental_not_schema_change_sql = """ -{{ config(materialized="incremental", unique_key="user_id_current_time",on_schema_change="sync_all_columns") }} -select - toString(1) || '-' || toString(now64()) as user_id_current_time, - {% if is_incremental() %} - 'thisis18characters' as platform - {% else %} - 'okthisis20characters' as platform - {% endif %} -""" - - -class TestIncrementalNotSchemaChange(BaseIncrementalNotSchemaChange): - @pytest.fixture(scope="class") - def models(self): - return {"incremental_not_schema_change.sql": incremental_not_schema_change_sql} diff --git a/tests/integration/adapter/test_changing_relation_type.py b/tests/integration/adapter/test_changing_relation_type.py new file mode 100644 index 00000000..13a87fcb --- /dev/null +++ b/tests/integration/adapter/test_changing_relation_type.py @@ -0,0 +1,5 @@ +from dbt.tests.adapter.relations.test_changing_relation_type import BaseChangeRelationTypeValidator + + +class TestChangeRelationTypes(BaseChangeRelationTypeValidator): + pass diff --git a/tests/integration/adapter/test_comments.py b/tests/integration/adapter/test_comments.py index 6a0f7bc6..2e310c0c 100644 --- a/tests/integration/adapter/test_comments.py +++ b/tests/integration/adapter/test_comments.py 
@@ -1,4 +1,5 @@ import json +import os import pytest from dbt.tests.util import run_dbt @@ -66,6 +67,8 @@ def models(self): ['table_comment', 'view_comment'], ) def test_comment(self, project, model_name): + if '_cloud' in os.environ.get('GITHUB_REF', ''): + pytest.skip('Not running comment test for cloud') run_dbt(["run"]) run_dbt(["docs", "generate"]) with open("target/catalog.json") as fp: diff --git a/tests/integration/adapter/test_query_comments.py b/tests/integration/adapter/test_query_comments.py new file mode 100644 index 00000000..8c376c88 --- /dev/null +++ b/tests/integration/adapter/test_query_comments.py @@ -0,0 +1,32 @@ +from dbt.tests.adapter.query_comment.test_query_comment import ( + BaseEmptyQueryComments, + BaseMacroArgsQueryComments, + BaseMacroInvalidQueryComments, + BaseMacroQueryComments, + BaseNullQueryComments, + BaseQueryComments, +) + + +class TestQueryComments(BaseQueryComments): + pass + + +class TestMacroQueryComments(BaseMacroQueryComments): + pass + + +class TestMacroArgsQueryComments(BaseMacroArgsQueryComments): + pass + + +class TestMacroInvalidQueryComments(BaseMacroInvalidQueryComments): + pass + + +class TestNullQueryComments(BaseNullQueryComments): + pass + + +class TestEmptyQueryComments(BaseEmptyQueryComments): + pass diff --git a/tests/integration/adapter/utils/test_listagg.py b/tests/integration/adapter/utils/test_listagg.py index 67a6138e..eb60a177 100644 --- a/tests/integration/adapter/utils/test_listagg.py +++ b/tests/integration/adapter/utils/test_listagg.py @@ -1,5 +1,5 @@ import pytest -from dbt.exceptions import CompilationException +from dbt.exceptions import CompilationError from dbt.tests.adapter.utils.fixture_listagg import ( models__test_listagg_yml, seeds__data_listagg_csv, @@ -32,5 +32,5 @@ def models(self): def test_listagg_exception(self, project): try: run_dbt(["build"], False) - except CompilationException as e: + except CompilationError as e: assert 'does not support' in e.msg diff --git a/tests/integration/adapter/utils/test_timestamps.py b/tests/integration/adapter/utils/test_timestamps.py new file mode 100644 index 00000000..4b116a6e --- /dev/null +++ b/tests/integration/adapter/utils/test_timestamps.py @@ -0,0 +1,16 @@ +import pytest +from dbt.tests.adapter.utils.test_timestamps import BaseCurrentTimestamps + + +class TestCurrentTimestamps(BaseCurrentTimestamps): + @pytest.fixture(scope="class") + def expected_schema(self): + return { + "current_timestamp": "DateTime", + "current_timestamp_in_utc_backcompat": "DateTime", + "current_timestamp_backcompat": "DateTime", + } + + @pytest.fixture(scope="class") + def expected_sql(self): + return None diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index b7c83fe3..5e79e256 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -20,10 +20,15 @@ def ch_test_users(): yield test_users +@pytest.fixture(scope="session", autouse=True) +def ch_test_version(): + yield os.environ.get('DBT_CH_TEST_CH_VERSION', 'latest') + + # This fixture is for customizing tests that need overrides in adapter # repos. Example in dbt.tests.adapter.basic.test_base. 
@pytest.fixture(scope="session") -def test_config(ch_test_users): +def test_config(ch_test_users, ch_test_version): compose_file = f'{Path(__file__).parent}/docker-compose.yml' test_host = os.environ.get('DBT_CH_TEST_HOST', 'localhost') test_port = int(os.environ.get('DBT_CH_TEST_PORT', 8123)) @@ -37,11 +42,12 @@ def test_config(ch_test_users): 'true', 'yes', ) + if ch_test_version.startswith('22.3'): + os.environ['DBT_CH_TEST_SETTINGS'] = '22_3' + docker = os.environ.get('DBT_CH_TEST_USE_DOCKER', '').lower() in ('1', 'true', 'yes') if docker: - if os.environ.get('DBT_CH_TEST_CH_VERSION', '').startswith('22.3'): - os.environ['DBT_CH_TEST_SETTINGS'] = '22_3' client_port = 10723 test_port = 10900 if test_driver == 'native' else client_port try: @@ -109,6 +115,7 @@ def dbt_profile_target(test_config): 'cluster_mode': test_config['cluster_mode'], 'secure': test_config['secure'], 'check_exchange': False, + 'use_lw_deletes': True, 'custom_settings': { 'distributed_ddl_task_timeout': 300, 'input_format_skip_unknown_fields': 1,