Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore/update upstream #1

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8a2731f
Distributed incremental materialization (#172)
gladkikhtutu Jul 27, 2023
3a28a66
Update version and tweak docs
genzgd Jul 27, 2023
1a0649e
Lw delete set fix (#174)
genzgd Jul 27, 2023
4b8a202
Fix legacy incremental materialization (#178)
genzgd Aug 9, 2023
9c8139f
fix: distributed_table materialization issue (#184)
zli06160 Aug 22, 2023
80cba25
Bump version and changelog (#185)
genzgd Aug 22, 2023
b79669a
cluster names containing dash characters (#198) (#200)
the4thamigo-uk Oct 21, 2023
d63285a
Add basic error test, fix minor merge conflict (#202)
genzgd Oct 26, 2023
96474f1
Cluster setting and Distributed Table tests (#186)
gfunc Oct 26, 2023
af72593
Update version and CHANGELOG, incorporate cluster name fix (#203)
genzgd Oct 26, 2023
9edb554
Release 1 5 0 (#210)
genzgd Nov 23, 2023
5e8e54b
Update test and dependency versions. (#211)
genzgd Nov 23, 2023
588e5ca
Adjust the wrapper parenthesis around the table materialization sql c…
kris947 Nov 27, 2023
1f17ec2
Update for 1.5.1 bug fix
genzgd Nov 27, 2023
9997b82
Fix creation of replicated tables when using legacy materialization (…
StevenReitsma Nov 28, 2023
3fec9a4
On cluster sync cleanup
genzgd Nov 28, 2023
bf11cbe
Bug fixes related to model settings. (#214)
genzgd Nov 29, 2023
8561210
Add materialization macro for materialized view (#207)
SoryRawyer Nov 29, 2023
246a4d8
Release 1 6 0 (#215)
genzgd Nov 30, 2023
08bbbf9
Release 1 6 1 (#217)
genzgd Dec 5, 2023
e6e74e4
Release 1 6 2 (#219)
genzgd Dec 6, 2023
2e72a00
Release 1 7 0 (#220)
genzgd Dec 7, 2023
5ccdad5
Correctly warn or error if light weight deletes not available
genzgd Dec 8, 2023
2d5c675
Wrap columns_in_query query in select statement (#222)
ptemarvelde Dec 13, 2023
ca9da0b
Update changelog
genzgd Dec 13, 2023
36ae17e
fix: incremental on cluster clause
Savid Jul 12, 2023
c522eb3
feat(icremental): add distrubted table
Savid Jul 18, 2023
6125272
fix(incremental): replicated delete_insert
Savid Jul 24, 2023
665f184
feat(incremental): distrubted append
Savid Jul 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
feat(incremental): distrubted append
Savid committed Jan 17, 2024
commit 665f18478e751830ea364e97b3b120e7d4aee294
Original file line number Diff line number Diff line change
@@ -36,26 +36,40 @@
{% if existing_relation is none %}
-- No existing table, simply create a new one
{% call statement('main') %}
{{ get_create_table_as_sql(False, target_relation, sql) }}
{{ create_empty_table(target_relation, sql) }}
{% endcall %}
{% if config.get('distributed') %}
{% do clickhouse__incremental_create_distributed(target_relation) %}
{% endif %}

{% call statement('main') %}
{{ clickhouse__insert_into(target_relation, sql, model['name']) }}
{% endcall %}

{% elif full_refresh_mode %}
-- Completely replacing the old table, so create a temporary table and then swap it
{% call statement('main') %}
{{ get_create_table_as_sql(False, intermediate_relation, sql) }}
{% endcall %}
{% if config.get('distributed') %}
{% do clickhouse__incremental_create_distributed(target_relation) %}
{% endif %}
{% set need_swap = true %}

{% elif inserts_only or unique_key is none -%}
-- There are no updates/deletes or duplicate keys are allowed. Simply add all of the new rows to the existing
-- table. It is the user's responsibility to avoid duplicates. Note that "inserts_only" is a ClickHouse adapter
-- specific configurable that is used to avoid creating an expensive intermediate table.
{% call statement('main') %}
{{ clickhouse__insert_into(target_relation, sql) }}
{{ clickhouse__insert_into(target_relation, sql, model['name']) }}
{% endcall %}

{% else %}
{% set column_changes = none %}
{% if config.get('distributed') %}
{% do clickhouse__incremental_create_distributed(target_relation) %}
{% endif %}
{% set schema_changes = none %}
{% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{%- if on_schema_change != 'ignore' %}
@@ -77,7 +91,7 @@
{% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates) %}
{% elif incremental_strategy == 'append' %}
{% call statement('main') %}
{{ clickhouse__insert_into(target_relation, sql) }}
{{ clickhouse__insert_into(target_relation, sql, model['name']) }}
{% endcall %}
{% endif %}
{% endif %}
@@ -93,10 +107,6 @@
{% do to_drop.append(backup_relation) %}
{% endif %}

{% if config.get('distributed') %}
{% do clickhouse__incremental_create_distributed(target_relation) %}
{% endif %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

@@ -256,17 +266,31 @@
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
<<<<<<< HEAD:dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
{%- endif -%}
{{ adapter.get_model_query_settings(model) }}
=======
{%- endif %}
SETTINGS mutations_sync = 2, allow_nondeterministic_mutations = 1
>>>>>>> bd0556f (feat(incremental): distrubted append):dbt/include/clickhouse/macros/materializations/incremental.sql
{% endcall %}

{%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
{% call statement('insert_new_data') %}
<<<<<<< HEAD:dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
insert into {{ existing_relation }} {{ adapter.get_model_query_settings(model) }} select {{ dest_cols_csv }} from {{ inserting_relation }}
{% endcall %}
{% do adapter.drop_relation(new_data_relation) %}
{{ drop_relation_if_exists(distributed_new_data_relation) }}
=======
insert into {{ model['name'] }} select {{ dest_cols_csv}} from {{ new_data_relation }} SETTINGS mutations_sync = 2, insert_distributed_sync = 1
{% endcall %}
{% do adapter.drop_relation(new_data_relation) %}
{% call statement('optimize_table') %}
optimize table {{ existing_relation }} {{ on_cluster_clause() }} FINAL DEDUPLICATE
{% endcall %}
>>>>>>> bd0556f (feat(incremental): distrubted append):dbt/include/clickhouse/macros/materializations/incremental.sql
{% endmacro %}

{% macro clickhouse__incremental_create_distributed(relation) %}
@@ -283,21 +307,25 @@
{% endif %}
)
{% endcall %}
{% else %}
{% call statement('create_temp_distributed_table') %}
CREATE TABLE {{ model['name'] }}_dbt_temp {{ on_cluster_clause() }} AS {{ relation }}
ENGINE = Distributed('{{ cluster}}', '{{ relation.schema }}', '{{ relation.name }}'
{% if sharding is not none %}
, {{ sharding }}
{% endif %}
)
{% endcall %}
{% call statement('exchange_distributed_table') %}
EXCHANGE TABLES {{ model['name'] }}_dbt_temp AND {{ model['name'] }} {{ on_cluster_clause() }}
{% endcall %}
{% call statement('drop_temp_distributed_table') %}
DROP TABLE {{ model['name'] }}_dbt_temp {{ on_cluster_clause() }}
{% endcall %}
{% endif %}

{% endmacro %}


{% macro create_empty_table(relation, sql) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{{ sql_header if sql_header is not none }}

create table {{ relation.include(database=False) }}
{{ on_cluster_clause() }}
{{ engine_clause() }}
{{ order_cols(label="order by") }}
{{ primary_key_clause(label="primary key") }}
{{ partition_cols(label="partition by") }}
{{ adapter.get_model_settings(model) }}
empty
as (
{{ sql }}
)
{%- endmacro %}
3 changes: 2 additions & 1 deletion dbt/include/clickhouse/macros/materializations/table.sql
Original file line number Diff line number Diff line change
@@ -184,9 +184,10 @@

{%- endmacro %}

{% macro clickhouse__insert_into(target_relation, sql, has_contract) %}
{% macro clickhouse__insert_into(target_relation, sql, override_name) %}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
{%- set target_name = override_name or target_relation.name -%}

insert into {{ target_relation }}
({{ dest_cols_csv }})