diff --git a/CHANGELOG.md b/CHANGELOG.md index 09ae63d4..648668de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ### Improvement * Added support for [range_hashed](https://clickhouse.com/docs/en/sql-reference/dictionaries#range_hashed) and [complex_key_range_hashed](https://clickhouse.com/docs/en/sql-reference/dictionaries#complex_key_range_hashed) layouts to the dictionary materialization. ([#361](https://github.com/ClickHouse/dbt-clickhouse/pull/361)) +### New Features +* Added support for the creation of more than one materialized view inserting records into the same target table. ([#364](https://github.com/ClickHouse/dbt-clickhouse/pull/364)) + ### Release [1.8.4], 2024-09-17 ### Improvement * The S3 help macro now support a `role_arn` parameter as an alternative way to provide authentication for S3 based models. Thanks to diff --git a/README.md b/README.md index 99da7ff6..d058f9c8 100644 --- a/README.md +++ b/README.md @@ -246,6 +246,21 @@ no corresponding REFRESH operation). Instead, it acts as an "insert trigger", a (https://github.com/ClickHouse/dbt-clickhouse/blob/main/tests/integration/adapter/materialized_view/test_materialized_view.py) for an introductory example of how to use this functionality. +ClickHouse provides the ability for more than one materialized view to write records to the same target table. To support this in dbt-clickhouse, you can construct a `UNION` in your model file, such that the SQL for each of your materialized views is wrapped with comments of the form `--my_mv_name:begin` and `--my_mv_name:end`. + +For example, the following will build two materialized views, both writing data to the same destination table of the model. 
The names of the materialized views will take the form `_mv1` and `_mv2` : + +``` +--mv1:begin +select a,b,c from {{ source('raw', 'table_1') }} +--mv1:end +union all +--mv2:begin +select a,b,c from {{ source('raw', 'table_2') }} +--mv2:end +``` + + # Dictionary materializations (experimental) See the tests in https://github.com/ClickHouse/dbt-clickhouse/blob/main/tests/integration/adapter/dictionary/test_dictionary.py for examples of how to implement materializations for ClickHouse dictionaries diff --git a/dbt/include/clickhouse/macros/materializations/materialized_view.sql b/dbt/include/clickhouse/macros/materializations/materialized_view.sql index 28cb626d..513070ca 100644 --- a/dbt/include/clickhouse/macros/materializations/materialized_view.sql +++ b/dbt/include/clickhouse/macros/materializations/materialized_view.sql @@ -6,7 +6,6 @@ {%- materialization materialized_view, adapter='clickhouse' -%} {%- set target_relation = this.incorporate(type='table') -%} - {%- set mv_relation = target_relation.derivative('_mv', 'materialized_view') -%} {%- set cluster_clause = on_cluster_clause(target_relation) -%} {# look for an existing relation for the target table and create backup relations if necessary #} @@ -35,16 +34,26 @@ -- `BEGIN` happens here: {{ run_hooks(pre_hooks, inside_transaction=True) }} + -- extract the names of the materialized views from the "--NAME:begin" markers in the sql (a name cannot span lines, so an ordinary comment ahead of a marker is not mistaken for part of the name) + {% set view_names = modules.re.findall('--([^:\n]+):begin', sql) %} + + -- extract the sql for each of the materialized views into a map of view name -> sql; without markers the whole model sql becomes the single view "mv" + {% set views = {} %} + {% if view_names %} + {% for view_name in view_names %} + {% set view_sql = modules.re.findall('--' + modules.re.escape(view_name) + ':begin(.*)--' + modules.re.escape(view_name) + ':end', sql, flags=modules.re.DOTALL)[0] %} + {%- set _ = views.update({view_name: view_sql}) -%} + {% endfor %} + {% else %} + {%- set _ = views.update({"mv": sql}) -%} + {% endif %} + {% if backup_relation is none %} {{ log('Creating new materialized view ' + target_relation.name )}} - {% call statement('main') 
-%} - {{ clickhouse__get_create_materialized_view_as_sql(target_relation, sql) }} - {%- endcall %} + {{ clickhouse__get_create_materialized_view_as_sql(target_relation, sql, views) }} {% elif existing_relation.can_exchange %} {{ log('Replacing existing materialized view ' + target_relation.name) }} - {% call statement('drop existing materialized view') %} - drop view if exists {{ mv_relation }} {{ cluster_clause }} - {% endcall %} + {{ clickhouse__drop_mvs(target_relation, cluster_clause, views) }} {% if should_full_refresh() %} {% call statement('main') -%} {{ get_create_table_as_sql(False, backup_relation, sql) }} @@ -56,12 +65,10 @@ select 1 {%- endcall %} {% endif %} - {% call statement('create new materialized view') %} - {{ clickhouse__create_mv_sql(mv_relation, existing_relation, cluster_clause, sql) }} - {% endcall %} + {{ clickhouse__create_mvs(existing_relation, cluster_clause, views) }} {% else %} {{ log('Replacing existing materialized view ' + target_relation.name) }} - {{ clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql) }} + {{ clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql, views) }} {% endif %} -- cleanup @@ -78,7 +85,12 @@ {{ run_hooks(post_hooks, inside_transaction=False) }} - {{ return({'relations': [target_relation, mv_relation]}) }} + {% set relations = [target_relation] %} + {% for view in views %} + {%- set _ = relations.append(target_relation.derivative('_' + view, 'materialized_view')) -%} + {% endfor %} + + {{ return({'relations': relations}) }} {%- endmaterialization -%} @@ -89,30 +101,47 @@ 2. 
Create a materialized view using the SQL in the model that inserts data into the table creating during step 1 #} -{% macro clickhouse__get_create_materialized_view_as_sql(relation, sql) -%} - {% call statement('create_target_table') %} +{% macro clickhouse__get_create_materialized_view_as_sql(relation, sql, views) -%} + {% call statement('main') %} {{ get_create_table_as_sql(False, relation, sql) }} {% endcall %} {%- set cluster_clause = on_cluster_clause(relation) -%} {%- set mv_relation = relation.derivative('_mv', 'materialized_view') -%} - {{ clickhouse__create_mv_sql(mv_relation, relation, cluster_clause, sql) }} + {{ clickhouse__create_mvs(relation, cluster_clause, views) }} {%- endmacro %} +{% macro clickhouse__drop_mv(mv_relation, cluster_clause) -%} + drop view if exists {{ mv_relation }} {{ cluster_clause }} +{%- endmacro %} -{% macro clickhouse__create_mv_sql(mv_relation, target_table, cluster_clause, sql) -%} +{% macro clickhouse__create_mv(mv_relation, target_table, cluster_clause, sql) -%} create materialized view if not exists {{ mv_relation }} {{ cluster_clause }} to {{ target_table }} as {{ sql }} {%- endmacro %} +{% macro clickhouse__drop_mvs(target_relation, cluster_clause, views) -%} {# runs one drop statement per key of views; each view is named target_relation plus '_' plus the key #} + {% for view in views.keys() %} + {%- set mv_relation = target_relation.derivative('_' + view, 'materialized_view') -%} + {% call statement('drop existing mv: ' + view) -%} + {{ clickhouse__drop_mv(mv_relation, cluster_clause) }}; + {% endcall %} + {% endfor %} +{%- endmacro %} + +{% macro clickhouse__create_mvs(target_relation, cluster_clause, views) -%} {# runs one create statement per entry of views (name -> select sql); every view writes to target_relation #} + {% for view, view_sql in views.items() %} + {%- set mv_relation = target_relation.derivative('_' + view, 'materialized_view') -%} + {% call statement('create new mv: ' + view) -%} + {{ clickhouse__create_mv(mv_relation, target_relation, cluster_clause, view_sql) }}; + {% endcall %} + {% endfor %} +{%- endmacro %} -{% macro clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, 
backup_relation, sql) %} +{% macro clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql, views) %} {# drop existing materialized view while we recreate the target table #} {%- set cluster_clause = on_cluster_clause(target_relation) -%} - {%- set mv_relation = target_relation.derivative('_mv', 'materialized_view') -%} - {% call statement('drop existing mv') -%} - drop view if exists {{ mv_relation }} {{ cluster_clause }} - {%- endcall %} + {{ clickhouse__drop_mvs(target_relation, cluster_clause, views) }} {# NOTE(review): only views named in the views map passed in are dropped here; a view created earlier under a different name (for example the former default "_mv") would presumably stay attached to the target table - confirm #} {# recreate the target table #} {% call statement('main') -%} @@ -122,5 +151,5 @@ {{ adapter.rename_relation(intermediate_relation, target_relation) }} {# now that the target table is recreated, we can finally create our new view #} - {{ clickhouse__create_mv_sql(mv_relation, target_relation, cluster_clause, sql) }} + {{ clickhouse__create_mvs(target_relation, cluster_clause, views) }} {# one view is created per entry of views #} {% endmacro %} diff --git a/tests/integration/adapter/materialized_view/test_materialized_view.py b/tests/integration/adapter/materialized_view/test_materialized_view.py index ce651ff3..1e9cc99f 100644 --- a/tests/integration/adapter/materialized_view/test_materialized_view.py +++ b/tests/integration/adapter/materialized_view/test_materialized_view.py @@ -15,6 +15,9 @@ 1231,Dade,33,engineering 6666,Ksenia,48,engineering 8888,Kate,50,engineering +1000,Alfie,10,sales +2000,Bill,20,sales +3000,Charlie,30,sales """.lstrip() # This model is parameterized, in a way, by the "run_type" dbt project variable @@ -40,8 +43,7 @@ from {{ source('raw', 'people') }} where department = 'engineering' -{% else %} - +{% elif var('run_type', '') == 'extended_schema' %} select id, name, @@ -55,6 +57,33 @@ from {{ source('raw', 'people') }} where department = 'engineering' +{% elif var('run_type', '') == 'multiple_materialized_views' %} + +--mv1:begin +select + id, + name, + case + when name like 'Dade' then 'crash_override' + when name like 'Kate' then 
'acid burn' + else 'N/A' + end as hacker_alias +from {{ source('raw', 'people') }} +where department = 'engineering' +--mv1:end + +union all + +--mv2:begin +select + id, + name, + -- sales people are not cool enough to have a hacker alias + 'N/A' as hacker_alias +from {{ source('raw', 'people') }} +where department = 'sales' +--mv2:end + {% endif %} """ @@ -191,3 +220,78 @@ def test_update_full_refresh(self, project): f"select distinct hacker_alias from {schema}.hackers where name = 'Dade'", fetch="all" ) assert len(result) == 2 + + +class TestMultipleMV: + @pytest.fixture(scope="class") + def seeds(self): + """ + we need a base table to pull from + """ + return { + "people.csv": PEOPLE_SEED_CSV, + "schema.yml": SEED_SCHEMA_YML, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "hackers.sql": MV_MODEL, + } + + def test_create(self, project): + """ + 1. create a base table via dbt seed + 2. create a model as a materialized view, selecting from the table created in (1) + 3. 
insert data into the base table and make sure it's there in the target table created in (2) + """ + schema = quote_identifier(project.test_schema + "_custom_schema") + results = run_dbt(["seed"]) + assert len(results) == 1 + columns = project.run_sql("DESCRIBE TABLE people", fetch="all") + assert columns[0][1] == "Int32" + + # create the model + run_vars = {"run_type": "multiple_materialized_views"} + run_dbt(["run", "--vars", json.dumps(run_vars)]) + assert len(results) == 1 + + columns = project.run_sql(f"DESCRIBE TABLE {schema}.hackers", fetch="all") + assert columns[0][1] == "Int32" + + columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv1", fetch="all") + assert columns[0][1] == "Int32" + + columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv2", fetch="all") + assert columns[0][1] == "Int32" + + with pytest.raises(Exception): + columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv", fetch="all") + + check_relation_types( + project.adapter, + { + "hackers_mv": "view", + "hackers": "table", + }, + ) + + # insert some data and make sure it reaches the target table + project.run_sql( + f""" + insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department") + values (4000,'Dave',40,'sales'), (9999,'Eugene',40,'engineering'); + """ + ) + + result = project.run_sql(f"select * from {schema}.hackers order by id", fetch="all") + assert result == [ + (1000, 'Alfie', 'N/A'), + (1231, 'Dade', 'crash_override'), + (2000, 'Bill', 'N/A'), + (3000, 'Charlie', 'N/A'), + (4000, 'Dave', 'N/A'), + (6666, 'Ksenia', 'N/A'), + (8888, 'Kate', 'acid burn'), + (9999, 'Eugene', 'N/A'), + ]