From cf5369c34b04e5f2093ff527365b7899ba9199b1 Mon Sep 17 00:00:00 2001 From: Andrew Davis <1709934+Savid@users.noreply.github.com> Date: Wed, 26 Jul 2023 08:29:43 +1000 Subject: [PATCH] feat(incremental): distrubted append --- .../incremental/incremental.sql | 56 ++++++++++++------- .../macros/materializations/table.sql | 2 +- 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql index cf3ff395..023b3beb 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql @@ -66,6 +66,10 @@ {% else %} {% set column_changes = none %} + {% if config.get('distributed') %} + {% do clickhouse__incremental_create_distributed(target_relation) %} + {% endif %} + {% set schema_changes = none %} {% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %} {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %} {%- if on_schema_change != 'ignore' %} @@ -103,10 +107,6 @@ {% do to_drop.append(backup_relation) %} {% endif %} - {% if config.get('distributed') %} - {% do clickhouse__incremental_create_distributed(target_relation) %} - {% endif %} - {% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %} {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} @@ -266,17 +266,31 @@ {% for predicate in incremental_predicates %} and {{ predicate }} {% endfor %} +<<<<<<< HEAD:dbt/include/clickhouse/macros/materializations/incremental/incremental.sql {%- endif -%} {{ adapter.get_model_query_settings(model) }} +======= + {%- endif %} + SETTINGS mutations_sync = 2, allow_nondeterministic_mutations = 1 +>>>>>>> bd0556f (feat(incremental): distrubted append):dbt/include/clickhouse/macros/materializations/incremental.sql {% endcall %} {%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%} {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} {% call statement('insert_new_data') %} +<<<<<<< HEAD:dbt/include/clickhouse/macros/materializations/incremental/incremental.sql insert into {{ existing_relation }} {{ adapter.get_model_query_settings(model) }} select {{ dest_cols_csv }} from {{ inserting_relation }} {% endcall %} {% do adapter.drop_relation(new_data_relation) %} {{ drop_relation_if_exists(distributed_new_data_relation) }} +======= + insert into {{ model['name'] }} select {{ dest_cols_csv}} from {{ new_data_relation }} SETTINGS mutations_sync = 2, insert_distributed_sync = 1 + {% endcall %} + {% do adapter.drop_relation(new_data_relation) %} + {% call statement('optimize_table') %} + optimize table {{ existing_relation }} {{ on_cluster_clause() }} FINAL DEDUPLICATE + {% endcall %} +>>>>>>> bd0556f (feat(incremental): distrubted append):dbt/include/clickhouse/macros/materializations/incremental.sql {% endmacro %} {% macro clickhouse__incremental_create_distributed(relation) %} @@ -293,21 +307,25 @@ {% endif %} ) {% endcall %} - {% else %} - {% call statement('create_temp_distributed_table') %} - CREATE TABLE {{ model['name'] }}_dbt_temp {{ on_cluster_clause() }} AS {{ relation }} - ENGINE = Distributed('{{ cluster}}', '{{ relation.schema }}', '{{ relation.name }}' - {% if sharding is not none %} - , {{ sharding }} - {% endif %} - ) - {% endcall %} - {% call statement('exchange_distributed_table') %} - EXCHANGE TABLES {{ model['name'] }}_dbt_temp AND {{ model['name'] }} {{ on_cluster_clause() }} - {% endcall %} - {% call statement('drop_temp_distributed_table') %} - DROP TABLE {{ model['name'] }}_dbt_temp {{ on_cluster_clause() }} - {% endcall %} {% endif %} {% endmacro %} + + +{% macro create_empty_table(relation, sql) -%} + {%- set sql_header = config.get('sql_header', none) -%} + + {{ sql_header if sql_header is not none }} + + create table {{ relation.include(database=False) }} + {{ on_cluster_clause() }} + {{ engine_clause() }} + {{ order_cols(label="order by") }} + {{ primary_key_clause(label="primary key") }} + {{ partition_cols(label="partition by") }} + {{ adapter.get_model_settings(model) }} + empty + as ( + {{ sql }} + ) +{%- endmacro %} diff --git a/dbt/include/clickhouse/macros/materializations/table.sql b/dbt/include/clickhouse/macros/materializations/table.sql index 73f8ddee..3cb95cff 100644 --- a/dbt/include/clickhouse/macros/materializations/table.sql +++ b/dbt/include/clickhouse/macros/materializations/table.sql @@ -184,7 +184,7 @@ {%- endmacro %} -{% macro clickhouse__insert_into(target_relation, sql, has_contract) %} +{% macro clickhouse__insert_into(target_relation, sql, override_name) %} {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} {%- set target_name = override_name or target_relation.name -%}