Release 1.4.0 (support dbt 1.4.1) (#137)
* 1.4.0 checkpoint

* 1.4.0 checkpoint

* 1.4.0 checkpoint

* Cleanup

* Try to fix test environment

* Try to fix test environment

* Skip comment test, allow nondeterministic deletes

* Update client_name strings, update changelog re incremental predicates

* Update mypy version

* Fix requirements typo

* Update readme re incremental_projections
genzgd authored Feb 6, 2023
1 parent 3604565 commit 53a71ab
Showing 28 changed files with 366 additions and 156 deletions.
11 changes: 7 additions & 4 deletions .github/workflows/test_matrix.yml
@@ -26,20 +26,23 @@ jobs:
- '3.8'
- '3.9'
- '3.10'
- '3.11'
clickhouse-version:
- '22.3'
- '22.8'
- '22.9'
- '22.10'
- '22.11'
- '22.12'
- latest

steps:
- name: Checkout
uses: actions/checkout@v3

- name: Set legacy test settings file
- name: Set environment variables
if: ${{ matrix.clickhouse-version == '22.3' }}
run: echo "TEST_SETTINGS_FILE=22_3" >> $GITHUB_ENV
run: |
echo "TEST_SETTINGS_FILE=22_3" >> $GITHUB_ENV
echo "DBT_CH_TEST_CH_VERSION=22.3" >> $GITHUB_ENV
- name: Run ClickHouse Container
run: docker run
16 changes: 16 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,19 @@
### Release [1.4.0], 2023-02-06
#### Improvements
- Support dbt [1.4.1] https://github.com/ClickHouse/dbt-clickhouse/issues/135
- Adds support for Python 3.11
- Adds additional dbt 1.4.0 tests
- Adds support for incremental_predicates. This only applies to `delete+insert` incremental strategy. Note that incremental
predicates that depend on "non-deterministic" data (such as a subquery using a table that is accepting inserts) could lead to
unexpected results for ReplicatedMergeTree tables depending on the timing of the incremental materialization.
- Replaces deprecated Exception classes
- Setting the `use_lw_deletes` profile value to True will now attempt to enable the `allow_experimental_lightweight_delete`
setting for the dbt session (if user has such permissions on the ClickHouse server). See https://github.com/ClickHouse/dbt-clickhouse/issues/133

#### Bug Fix
- Composite unique keys specified as a list would not work with incremental materializations. This has been fixed.


### Release [1.3.3], 2023-01-18
#### Documentation Update
- The documentation has been updated to reflect that dbt-clickhouse does support ephemeral models, and ephemeral model tests do pass.
26 changes: 15 additions & 11 deletions README.md
@@ -75,16 +75,16 @@ your_profile_name:

## Model Configuration

| Option | Description | Required? |
|----------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------|
| engine | The table engine (type of table) to use when creating tables | Optional (default: `MergeTree()`) |
| order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | Optional (default: `tuple()`) |
| partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | Optional |
| primary_key | Like order_by, a ClickHouse primary key expression. If not specified, ClickHouse will use the order by expression as the primary key |
| unique_key | A tuple of column names that uniquely identify rows. Used with incremental models for updates. | Optional |
| inserts_only | If set to True for an incremental model, incremental updates will be inserted directly to the target table without creating intermediate table. It has been deprecated in favor of the `append` incremental `strategy`, which operates in the same way | Optional |
| incremental_strategy | Incremental model update strategy of `delete+insert` or `append`. See the following Incremental Model Strategies | Optional (default: `default`) |

| Option | Description | Required? |
|------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------|
| engine | The table engine (type of table) to use when creating tables | Optional (default: `MergeTree()`) |
| order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | Optional (default: `tuple()`) |
| partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | Optional |
| primary_key | Like order_by, a ClickHouse primary key expression. If not specified, ClickHouse will use the order by expression as the primary key | Optional |
| unique_key | A tuple of column names that uniquely identify rows. Used with incremental models for updates. | Optional |
| inserts_only | If set to True for an incremental model, incremental updates will be inserted directly to the target table without creating intermediate table. It has been deprecated in favor of the `append` incremental `strategy`, which operates in the same way | Optional |
| incremental_strategy | Incremental model update strategy of `delete+insert` or `append`. See the following Incremental Model Strategies | Optional (default: `default`) |
| incremental_predicates | Additional conditions to be applied to the incremental materialization (only applied to the `delete+insert` strategy) | Optional |
## Known Limitations

* Replicated tables (combined with the `cluster` profile setting) are available using the `on_cluster_clause` macro but are not included in the test suite and not formally tested.
@@ -107,11 +107,15 @@ goes wrong before the operation completes; however, since it involves a full cop
ClickHouse added "lightweight deletes" as an experimental feature in version 22.8. Lightweight deletes are significantly faster than ALTER TABLE ... DELETE
operations, because they don't require rewriting ClickHouse data parts. The incremental strategy `delete+insert` utilizes lightweight deletes to implement
incremental materializations that perform significantly better than the "legacy" strategy. However, there are important caveats to using this strategy:
- The setting `allow_experimental_lightweight_delete` must be enabled on your ClickHouse server or including in the `custom_settings` for your ClickHouse profile.
- Lightweight deletes must be enabled on your ClickHouse server using the setting `allow_experimental_lightweight_delete=1` or you
must set `use_lw_deletes=true` in your profile (which will enable that setting for your dbt sessions)
- As suggested by the setting name, lightweight delete functionality is still experimental and there are still known issues that must be resolved before the feature is considered production ready,
so usage should be limited to datasets that are easily recreated
- This strategy operates directly on the affected table/relation (without creating any intermediate or temporary tables), so if there is an issue during the operation, the
data in the incremental model is likely to be in an invalid state
- When using lightweight deletes, dbt-clickhouse enables the setting `allow_nondeterministic_mutations`. In some very rare cases involving non-deterministic incremental_predicates,
this could result in a race condition for the updated/deleted items (and related log messages in the ClickHouse logs). To ensure consistent results, the
incremental predicates should only include sub-queries on data that will not be modified during the incremental materialization.
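
The interaction between the server setting and the `use_lw_deletes` profile value can be sketched as a small decision function. This is only an illustration that mirrors the branches of `_check_lightweight_deletes` in this commit; the names `resolve_lw_deletes` and `try_enable` are hypothetical and not part of dbt-clickhouse, and `try_enable` stands in for issuing the session-level `SET` commands.

```python
from typing import Callable, Optional, Tuple


def resolve_lw_deletes(
    server_value: Optional[str],
    requested: bool,
    try_enable: Callable[[], bool],
) -> Tuple[bool, bool]:
    """Return (has_lw_deletes, use_lw_deletes) for a dbt session.

    server_value: current value of allow_experimental_lightweight_delete,
                  or None if the server does not know the setting.
    requested:    the use_lw_deletes value from the profile.
    try_enable:   hypothetical callback that attempts the session-level SET
                  and returns True on success.
    """
    if server_value is None:
        # The server has no such setting, so there is nothing to enable.
        return False, False
    if int(server_value) == 1:
        # Already enabled on the server; use it only if the profile asks for it.
        return True, requested
    if not requested:
        # Disabled on the server and not requested in the profile.
        return False, False
    # Disabled on the server but requested: try to enable it for this session.
    if try_enable():
        return True, True
    return False, False
```

For example, `resolve_lw_deletes('0', True, lambda: False)` models a user without permission to change the setting, in which case lightweight deletes are not used.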

### The Append Strategy

2 changes: 1 addition & 1 deletion dbt/adapters/clickhouse/__version__.py
@@ -1 +1 @@
version = '1.3.3'
version = '1.4.0'
4 changes: 2 additions & 2 deletions dbt/adapters/clickhouse/column.py
@@ -3,7 +3,7 @@
from typing import Any, TypeVar

from dbt.adapters.base.column import Column
from dbt.exceptions import RuntimeException
from dbt.exceptions import DbtRuntimeError

Self = TypeVar('Self', bound='ClickHouseColumn')

@@ -96,7 +96,7 @@ def is_float(self) -> bool:

def string_size(self) -> int:
if not self.is_string():
raise RuntimeException('Called string_size() on non-string field!')
raise DbtRuntimeError('Called string_size() on non-string field!')

if not self.dtype.lower().startswith('fixedstring') or self.char_size is None:
return 256
4 changes: 2 additions & 2 deletions dbt/adapters/clickhouse/connections.py
@@ -28,9 +28,9 @@ def exception_handler(self, sql):
yield
except Exception as exp:
logger.debug('Error running SQL: {}', sql)
if isinstance(exp, dbt.exceptions.RuntimeException):
if isinstance(exp, dbt.exceptions.DbtRuntimeError):
raise
raise dbt.exceptions.RuntimeException(exp) from exp
raise dbt.exceptions.DbtRuntimeError(str(exp)) from exp

@classmethod
def open(cls, connection):
5 changes: 2 additions & 3 deletions dbt/adapters/clickhouse/credentials.py
@@ -2,8 +2,7 @@
from typing import Any, Dict, Optional

from dbt.contracts.connection import Credentials

import dbt
from dbt.exceptions import DbtRuntimeError


@dataclass
@@ -44,7 +43,7 @@ def unique_field(self):

def __post_init__(self):
if self.database is not None and self.database != self.schema:
raise dbt.exceptions.RuntimeException(
raise DbtRuntimeError(
f' schema: {self.schema} \n'
f' database: {self.database} \n'
f' cluster: {self.cluster} \n'
50 changes: 36 additions & 14 deletions dbt/adapters/clickhouse/dbclient.py
@@ -1,8 +1,7 @@
import uuid
from abc import ABC, abstractmethod

from dbt.exceptions import DatabaseException as DBTDatabaseException
from dbt.exceptions import FailedToConnectException
from dbt.exceptions import DbtDatabaseError, FailedToConnectError

from dbt.adapters.clickhouse.credentials import ClickHouseCredentials
from dbt.adapters.clickhouse.logger import logger
@@ -23,7 +22,7 @@ def get_db_client(credentials: ClickHouseCredentials):
if not port:
port = 9440 if credentials.secure else 9000
else:
raise FailedToConnectException(f'Unrecognized ClickHouse driver {driver}')
raise FailedToConnectError(f'Unrecognized ClickHouse driver {driver}')

credentials.driver = driver
credentials.port = port
@@ -35,7 +34,7 @@ def get_db_client(credentials: ClickHouseCredentials):

return ChNativeClient(credentials)
except ImportError:
raise FailedToConnectException(
raise FailedToConnectError(
'Native adapter required but package clickhouse-driver is not installed'
)
try:
@@ -45,7 +44,7 @@ def get_db_client(credentials: ClickHouseCredentials):

return ChHttpClient(credentials)
except ImportError:
raise FailedToConnectException(
raise FailedToConnectError(
'HTTP adapter required but package clickhouse-connect is not installed'
)

@@ -69,9 +68,9 @@ def __init__(self, credentials: ClickHouseCredentials):
try:
self._ensure_database(credentials.database_engine)
self.server_version = self._server_version()
lw_deletes = self.get_ch_setting('allow_experimental_lightweight_delete')
self.has_lw_deletes = lw_deletes is not None and int(lw_deletes) > 0
self.use_lw_deletes = self.has_lw_deletes and credentials.use_lw_deletes
self.has_lw_deletes, self.use_lw_deletes = self._check_lightweight_deletes(
credentials.use_lw_deletes
)
self.atomic_exchange = not check_exchange or self._check_atomic_exchange()
except Exception as ex:
self.close()
@@ -108,6 +107,29 @@ def _set_client_database(self):
def _server_version(self):
pass

def _check_lightweight_deletes(self, requested: bool):
lw_deletes = self.get_ch_setting('allow_experimental_lightweight_delete')
if lw_deletes is None:
if requested:
logger.warning(
'use_lw_deletes requested but are not available on this ClickHouse server'
)
return False, False
lw_deletes = int(lw_deletes)
if lw_deletes == 1:
return True, requested
if not requested:
return False, False
try:
self.command('SET allow_experimental_lightweight_delete = 1')
self.command('SET allow_nondeterministic_mutations = 1')
return True, True
except DbtDatabaseError as ex:
logger.warning(
'use_lw_deletes requested but cannot enable on this ClickHouse server %s', str(ex)
)
return False, False

def _ensure_database(self, database_engine) -> None:
if not self.database:
return
@@ -119,11 +141,11 @@ def _ensure_database(self, database_engine) -> None:
self.command(f'CREATE DATABASE {self.database}{engine_clause}')
db_exists = self.command(check_db)
if not db_exists:
raise FailedToConnectException(
raise FailedToConnectError(
f'Failed to create database {self.database} for unknown reason'
)
except DBTDatabaseException as ex:
raise FailedToConnectException(
except DbtDatabaseError as ex:
raise FailedToConnectError(
f'Failed to create {self.database} database due to ClickHouse exception'
) from ex
self._set_client_database()
@@ -143,7 +165,7 @@ def _check_atomic_exchange(self) -> bool:
try:
self.command('EXCHANGE TABLES {} AND {}'.format(*swap_tables))
return True
except DBTDatabaseException:
except DbtDatabaseError:
logger.info('ClickHouse server does not support the EXCHANGE TABLES command')
logger.info(
'This can be caused by an obsolete ClickHouse version or by running ClickHouse on'
@@ -156,8 +178,8 @@ def _check_atomic_exchange(self) -> bool:
try:
for table in swap_tables:
self.command(f'DROP TABLE IF EXISTS {table}')
except DBTDatabaseException:
except DbtDatabaseError:
logger.info('Unexpected server exception dropping table', exc_info=True)
except DBTDatabaseException:
except DbtDatabaseError:
logger.warning('Failed to run exchange test', exc_info=True)
return False
9 changes: 5 additions & 4 deletions dbt/adapters/clickhouse/httpclient.py
@@ -1,8 +1,9 @@
import clickhouse_connect
from clickhouse_connect.driver.exceptions import DatabaseError, OperationalError
from dbt.exceptions import DatabaseException as DBTDatabaseException
from dbt.exceptions import DbtDatabaseError
from dbt.version import __version__ as dbt_version

from dbt.adapters.clickhouse.__version__ import version as dbt_clickhouse_version
from dbt.adapters.clickhouse.dbclient import ChClientWrapper, ChRetryableException


@@ -11,13 +12,13 @@ def query(self, sql, **kwargs):
try:
return self._client.query(sql, **kwargs)
except DatabaseError as ex:
raise DBTDatabaseException(str(ex).strip()) from ex
raise DbtDatabaseError(str(ex).strip()) from ex

def command(self, sql, **kwargs):
try:
return self._client.command(sql, **kwargs)
except DatabaseError as ex:
raise DBTDatabaseException(str(ex).strip()) from ex
raise DbtDatabaseError(str(ex).strip()) from ex

def get_ch_setting(self, setting_name):
setting = self._client.server_settings.get(setting_name)
@@ -43,7 +44,7 @@ def _create_client(self, credentials):
compress=False if credentials.compression == '' else bool(credentials.compression),
connect_timeout=credentials.connect_timeout,
send_receive_timeout=credentials.send_receive_timeout,
client_name=f'dbt/{dbt_version}',
client_name=f'dbt/{dbt_version} dbt-clickhouse/{dbt_clickhouse_version}',
verify=credentials.verify,
query_limit=0,
settings=self._conn_settings,