diff --git a/README.md b/README.md index 11c1f728..585ca188 100644 --- a/README.md +++ b/README.md @@ -137,9 +137,23 @@ if `cluster` is set in profile, `on_cluster_clause` now will return cluster info - Distributed materializations - Models with Replicated engines + +By default, tables and incremental materializations with non-replicated engines will not be affected by the `cluster` setting (model would be created on the connected node only). + +To force relations to be created on a cluster regardless of their engine or materialization, use the `force_on_cluster` argument: +```sql +{{ config( + engine='Null', + materialized='materialized_view', + force_on_cluster='true' + ) +}} +``` + table and incremental materializations with non-replicated engine will not be affected by `cluster` setting (model would be created on the connected node only). + ### Compatibility If a model has been created without a `cluster` setting, dbt-clickhouse will detect the situation and run all DDL/DML diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index c40a01f6..ad0ea9e1 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -51,6 +51,7 @@ @dataclass class ClickHouseConfig(AdapterConfig): engine: str = 'MergeTree()' + force_on_cluster: Optional[bool] = False order_by: Optional[Union[List[str], str]] = 'tuple()' partition_by: Optional[Union[List[str], str]] = None sharding_key: Optional[Union[List[str], str]] = 'rand()' diff --git a/dbt/adapters/clickhouse/relation.py b/dbt/adapters/clickhouse/relation.py index 8ad3b39a..87134c5a 100644 --- a/dbt/adapters/clickhouse/relation.py +++ b/dbt/adapters/clickhouse/relation.py @@ -112,20 +112,20 @@ def create_from( # If the database is set, and the source schema is "defaulted" to the source.name, override the # schema with the database instead, since that's presumably what's intended for clickhouse schema = relation_config.schema + + cluster = quoting.credentials.cluster or '' can_on_cluster = None # We placed a hardcoded const (instead of importing it from dbt-core) in order to decouple the packages if relation_config.resource_type == NODE_TYPE_SOURCE: if schema == relation_config.source_name and relation_config.database: schema = relation_config.database + if cluster and str(relation_config.config.get("force_on_cluster")).lower() == "true": + can_on_cluster = True + else: - cluster = quoting.credentials.cluster if quoting.credentials.cluster else '' - materialized = ( - relation_config.config.materialized if relation_config.config.materialized else '' - ) - engine = ( - relation_config.config.get('engine') if relation_config.config.get('engine') else '' - ) + materialized = relation_config.config.get('materialized') or '' + engine = relation_config.config.get('engine') or '' can_on_cluster = cls.get_on_cluster(cluster, materialized, engine) return cls.create( diff --git a/tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py b/tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py index ff6e2efb..c75396ff 100644 --- a/tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py +++ b/tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py @@ -249,3 +249,105 @@ def test_base(self, project): assert len(results) == 1 self.assert_total_count_correct(project) + + +class TestMergeTreeForceClusterMaterialization(BaseSimpleMaterializations): + '''Test MergeTree materialized view is created across a cluster using the + `force_on_cluster` config argument + ''' + + @pytest.fixture(scope="class") + def models(self): + config_force_on_cluster = """ + {{ config( + engine='MergeTree', + materialized='materialized_view', + force_on_cluster='true' + ) + }} + """ + + return { + "force_on_cluster.sql": config_force_on_cluster + model_base, + "schema.yml": schema_base_yml, + } + + @pytest.fixture(scope="class") + def seeds(self): + return { + "schema.yml": base_seeds_schema_yml, + "base.csv": seeds_base_csv, + } + + def assert_total_count_correct(self, project): + '''Check if table is created on cluster''' + cluster = project.test_config['cluster'] + + # check if data is properly distributed/replicated + table_relation = relation_from_name(project.adapter, "force_on_cluster") + # ClickHouse cluster in the docker-compose file + # under tests/integration is configured with 3 nodes + host_count = project.run_sql( + f"select count(host_name) as host_count from system.clusters where cluster='{cluster}'", + fetch="one", + ) + assert host_count[0] > 1 + + table_count = project.run_sql( + f"select count() From clusterAllReplicas('{cluster}', system.tables) " + f"where database='{table_relation.schema}' and name='{table_relation.identifier}'", + fetch="one", + ) + + assert table_count[0] == 3 + + mv_count = project.run_sql( + f"select count() From clusterAllReplicas('{cluster}', system.tables) " + f"where database='{table_relation.schema}' and name='{table_relation.identifier}_mv'", + fetch="one", + ) + + assert mv_count[0] == 3 + + @pytest.mark.skipif( + os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster' + ) + def test_base(self, project): + # cluster setting must exist + cluster = project.test_config['cluster'] + assert cluster + + # seed command + results = run_dbt(["seed"]) + # seed result length + assert len(results) == 1 + + # run command + results = run_dbt() + # run result length + assert len(results) == 1 + + # names exist in result nodes + check_result_nodes_by_name(results, ["force_on_cluster"]) + + # check relation types + expected = { + "base": "table", + "replicated": "table", + } + check_relation_types(project.adapter, expected) + + relation = relation_from_name(project.adapter, "base") + # table rowcount + result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") + assert result[0] == 10 + + # relations_equal + self.assert_total_count_correct(project) + + # run full refresh + results = run_dbt(['--debug', 'run', '--full-refresh']) + # run result length + assert len(results) == 1 + + self.assert_total_count_correct(project)