diff --git a/CHANGELOG.md b/CHANGELOG.md index 09ae63d4..648668de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ### Improvement * Added support for [range_hashed](https://clickhouse.com/docs/en/sql-reference/dictionaries#range_hashed) and [complex_key_range_hashed](https://clickhouse.com/docs/en/sql-reference/dictionaries#complex_key_range_hashed) layouts to the dictionary materialization. ([#361](https://github.com/ClickHouse/dbt-clickhouse/pull/361)) +### New Features +* Added support for the creation of more than one materialized view inserting records into the same target table. ([#364](https://github.com/ClickHouse/dbt-clickhouse/pull/364)) + ### Release [1.8.4], 2024-09-17 ### Improvement * The S3 help macro now support a `role_arn` parameter as an alternative way to provide authentication for S3 based models. Thanks to diff --git a/README.md b/README.md index 99da7ff6..7855fbb2 100644 --- a/README.md +++ b/README.md @@ -246,6 +246,27 @@ no corresponding REFRESH operation). Instead, it acts as an "insert trigger", a (https://github.com/ClickHouse/dbt-clickhouse/blob/main/tests/integration/adapter/materialized_view/test_materialized_view.py) for an introductory example of how to use this functionality. +ClickHouse provides the ability for more than one materialized view to write records to the same target table. To support this in dbt-clickhouse, you can construct a `UNION` in your model file, such that the SQL for each of your materialized views is wrapped with comments of the form `--my_mv_name:begin` and `--my_mv_name:end`. + +For example, the following will build two materialized views both writing data to the same destination table of the model. The names of the materialized views will take the form `<model_name>_mv1` and `<model_name>_mv2`: + +``` +--mv1:begin +select a,b,c from {{ source('raw', 'table_1') }} +--mv1:end +union all +--mv2:begin +select a,b,c from {{ source('raw', 'table_2') }} +--mv2:end +``` + +> IMPORTANT! 
+> +> When updating a model with multiple materialized views (MVs), especially when renaming one of the MV names, dbt-clickhouse does not automatically drop the old MV. Instead, +> you will encounter the following warning: `Warning - Table was detected with the same pattern as model name but was not found in this run. In case it is a renamed mv that was previously part of this model, drop it manually (!!!) ` + + + # Dictionary materializations (experimental) See the tests in https://github.com/ClickHouse/dbt-clickhouse/blob/main/tests/integration/adapter/dictionary/test_dictionary.py for examples of how to implement materializations for ClickHouse dictionaries diff --git a/dbt/include/clickhouse/macros/materializations/materialized_view.sql b/dbt/include/clickhouse/macros/materializations/materialized_view.sql index 28cb626d..4583918b 100644 --- a/dbt/include/clickhouse/macros/materializations/materialized_view.sql +++ b/dbt/include/clickhouse/macros/materializations/materialized_view.sql @@ -6,7 +6,6 @@ {%- materialization materialized_view, adapter='clickhouse' -%} {%- set target_relation = this.incorporate(type='table') -%} - {%- set mv_relation = target_relation.derivative('_mv', 'materialized_view') -%} {%- set cluster_clause = on_cluster_clause(target_relation) -%} {# look for an existing relation for the target table and create backup relations if necessary #} @@ -35,16 +34,57 @@ -- `BEGIN` happens here: {{ run_hooks(pre_hooks, inside_transaction=True) }} + -- extract the names of the materialized views from the sql + {% set view_names = modules.re.findall('--([^:]+):begin', sql) %} + + -- extract the sql for each of the materialized view into a map + {% set views = {} %} + {% if view_names %} + {% for view_name in view_names %} + {% set view_sql = modules.re.findall('--' + view_name + ':begin(.*)--' + view_name + ':end', sql, flags=modules.re.DOTALL)[0] %} + {%- set _ = views.update({view_name: view_sql}) -%} + {% endfor %} + {% else %} + {%- set _ = 
views.update({"mv": sql}) -%} + {% endif %} + {% if backup_relation is none %} {{ log('Creating new materialized view ' + target_relation.name )}} - {% call statement('main') -%} - {{ clickhouse__get_create_materialized_view_as_sql(target_relation, sql) }} - {%- endcall %} + {{ clickhouse__get_create_materialized_view_as_sql(target_relation, sql, views) }} {% elif existing_relation.can_exchange %} {{ log('Replacing existing materialized view ' + target_relation.name) }} - {% call statement('drop existing materialized view') %} - drop view if exists {{ mv_relation }} {{ cluster_clause }} - {% endcall %} + -- in this section, we look for mvs that has the same pattern as this model, but for some reason, + -- are not listed in the model. This might happen when using multiple mv, and renaming one of the mv in the model. + -- In case such mv found, we raise a warning to the user, that they might need to drop the mv manually. + {{ log('Searching for existing materialized views with the pattern of ' + target_relation.name) }} + {{ log('Views dictionary contents: ' + views | string) }} + + {% set tables_query %} + select table_name + from information_schema.tables + where table_schema = '{{ existing_relation.schema }}' + and table_name like '%{{ target_relation.name }}%' + and table_type = 'VIEW' + {% endset %} + + {% set tables_result = run_query(tables_query) %} + {% if tables_result is not none %} + {% set tables = tables_result.columns[0].values() %} + {{ log('Current mvs found in ClickHouse are: ' + tables | join(', ')) }} + {% set mv_names = [] %} + {% for key in views.keys() %} + {% do mv_names.append(target_relation.name ~ "_" ~ key) %} + {% endfor %} + {{ log('Model mvs to replace ' + mv_names | string) }} + {% for table in tables %} + {% if table not in mv_names %} + {{ log('Warning - Table "' + table + '" was detected with the same pattern as model name "' + target_relation.name + '" but was not found in this run. 
In case it is a renamed mv that was previously part of this model, drop it manually (!!!)', info=True) }} + {% endif %} + {% endfor %} + {% else %} + {{ log('No existing mvs found matching the pattern. continuing..', info=True) }} + {% endif %} + {{ clickhouse__drop_mvs(target_relation, cluster_clause, views) }} {% if should_full_refresh() %} {% call statement('main') -%} {{ get_create_table_as_sql(False, backup_relation, sql) }} @@ -56,12 +96,10 @@ select 1 {%- endcall %} {% endif %} - {% call statement('create new materialized view') %} - {{ clickhouse__create_mv_sql(mv_relation, existing_relation, cluster_clause, sql) }} - {% endcall %} + {{ clickhouse__create_mvs(existing_relation, cluster_clause, views) }} {% else %} {{ log('Replacing existing materialized view ' + target_relation.name) }} - {{ clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql) }} + {{ clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql, views) }} {% endif %} -- cleanup @@ -78,7 +116,12 @@ {{ run_hooks(post_hooks, inside_transaction=False) }} - {{ return({'relations': [target_relation, mv_relation]}) }} + {% set relations = [target_relation] %} + {% for view in views %} + {% do relations.append(target_relation.derivative('_' + view, 'materialized_view')) %} + {% endfor %} + + {{ return({'relations': relations}) }} {%- endmaterialization -%} @@ -89,30 +132,47 @@ 2. 
Create a materialized view using the SQL in the model that inserts data into the table creating during step 1 #} -{% macro clickhouse__get_create_materialized_view_as_sql(relation, sql) -%} - {% call statement('create_target_table') %} +{% macro clickhouse__get_create_materialized_view_as_sql(relation, sql, views) -%} + {% call statement('main') %} {{ get_create_table_as_sql(False, relation, sql) }} {% endcall %} {%- set cluster_clause = on_cluster_clause(relation) -%} {%- set mv_relation = relation.derivative('_mv', 'materialized_view') -%} - {{ clickhouse__create_mv_sql(mv_relation, relation, cluster_clause, sql) }} + {{ clickhouse__create_mvs(relation, cluster_clause, views) }} {%- endmacro %} +{% macro clickhouse__drop_mv(mv_relation, cluster_clause) -%} + drop view if exists {{ mv_relation }} {{ cluster_clause }} +{%- endmacro %} -{% macro clickhouse__create_mv_sql(mv_relation, target_table, cluster_clause, sql) -%} +{% macro clickhouse__create_mv(mv_relation, target_table, cluster_clause, sql) -%} create materialized view if not exists {{ mv_relation }} {{ cluster_clause }} to {{ target_table }} as {{ sql }} {%- endmacro %} +{% macro clickhouse__drop_mvs(target_relation, cluster_clause, views) -%} + {% for view in views.keys() %} + {%- set mv_relation = target_relation.derivative('_' + view, 'materialized_view') -%} + {% call statement('drop existing mv: ' + view) -%} + {{ clickhouse__drop_mv(mv_relation, cluster_clause) }}; + {% endcall %} + {% endfor %} +{%- endmacro %} + +{% macro clickhouse__create_mvs(target_relation, cluster_clause, views) -%} + {% for view, view_sql in views.items() %} + {%- set mv_relation = target_relation.derivative('_' + view, 'materialized_view') -%} + {% call statement('create new mv: ' + view) -%} + {{ clickhouse__create_mv(mv_relation, target_relation, cluster_clause, view_sql) }}; + {% endcall %} + {% endfor %} +{%- endmacro %} -{% macro clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, 
backup_relation, sql) %} +{% macro clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql, views) %} {# drop existing materialized view while we recreate the target table #} {%- set cluster_clause = on_cluster_clause(target_relation) -%} - {%- set mv_relation = target_relation.derivative('_mv', 'materialized_view') -%} - {% call statement('drop existing mv') -%} - drop view if exists {{ mv_relation }} {{ cluster_clause }} - {%- endcall %} + {{ clickhouse__drop_mvs(target_relation, cluster_clause, views) }} {# recreate the target table #} {% call statement('main') -%} @@ -122,5 +182,6 @@ {{ adapter.rename_relation(intermediate_relation, target_relation) }} {# now that the target table is recreated, we can finally create our new view #} - {{ clickhouse__create_mv_sql(mv_relation, target_relation, cluster_clause, sql) }} + {{ clickhouse__create_mvs(target_relation, cluster_clause, views) }} {% endmacro %} + diff --git a/tests/integration/adapter/materialized_view/test_materialized_view.py b/tests/integration/adapter/materialized_view/test_materialized_view.py index ce651ff3..9c9ffdb6 100644 --- a/tests/integration/adapter/materialized_view/test_materialized_view.py +++ b/tests/integration/adapter/materialized_view/test_materialized_view.py @@ -15,6 +15,9 @@ 1231,Dade,33,engineering 6666,Ksenia,48,engineering 8888,Kate,50,engineering +1000,Alfie,10,sales +2000,Bill,20,sales +3000,Charlie,30,sales """.lstrip() # This model is parameterized, in a way, by the "run_type" dbt project variable @@ -40,8 +43,7 @@ from {{ source('raw', 'people') }} where department = 'engineering' -{% else %} - +{% elif var('run_type', '') == 'extended_schema' %} select id, name, @@ -58,6 +60,73 @@ {% endif %} """ +MULTIPLE_MV_MODEL = """ +{{ config( + materialized='materialized_view', + engine='MergeTree()', + order_by='(id)', + schema='custom_schema_for_multiple_mv', +) }} + +{% if var('run_type', '') == '' %} + +--mv1:begin +select + id, + 
name, + case + when name like 'Dade' then 'crash_override' + when name like 'Kate' then 'acid burn' + else 'N/A' + end as hacker_alias +from {{ source('raw', 'people') }} +where department = 'engineering' +--mv1:end + +union all + +--mv2:begin +select + id, + name, + -- sales people are not cool enough to have a hacker alias + 'N/A' as hacker_alias +from {{ source('raw', 'people') }} +where department = 'sales' +--mv2:end + +{% elif var('run_type', '') == 'extended_schema' %} + +--mv1:begin +select + id, + name, + case + -- Dade wasn't always known as 'crash override'! + when name like 'Dade' and age = 11 then 'zero cool' + when name like 'Dade' and age != 11 then 'crash override' + when name like 'Kate' then 'acid burn' + else 'N/A' + end as hacker_alias +from {{ source('raw', 'people') }} +where department = 'engineering' +--mv1:end + +union all + +--mv2:begin +select + id, + name, + -- sales people are not cool enough to have a hacker alias + 'N/A' as hacker_alias +from {{ source('raw', 'people') }} +where department = 'sales' +--mv2:end + +{% endif %} +""" + SEED_SCHEMA_YML = """ version: 2 @@ -191,3 +260,148 @@ def test_update_full_refresh(self, project): f"select distinct hacker_alias from {schema}.hackers where name = 'Dade'", fetch="all" ) assert len(result) == 2 + + +class TestMultipleMV: + @pytest.fixture(scope="class") + def seeds(self): + """ + we need a base table to pull from + """ + return { + "people.csv": PEOPLE_SEED_CSV, + "schema.yml": SEED_SCHEMA_YML, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "hackers.sql": MULTIPLE_MV_MODEL, + } + + def test_create(self, project): + """ + 1. create a base table via dbt seed + 2. create a model as a materialized view, selecting from the table created in (1) + 3. 
insert data into the base table and make sure it's there in the target table created in (2) + """ + schema = quote_identifier(project.test_schema + "_custom_schema_for_multiple_mv") + results = run_dbt(["seed"]) + assert len(results) == 1 + columns = project.run_sql("DESCRIBE TABLE people", fetch="all") + assert columns[0][1] == "Int32" + + # create the model + run_dbt(["run"]) + assert len(results) == 1 + + columns = project.run_sql(f"DESCRIBE TABLE {schema}.hackers", fetch="all") + assert columns[0][1] == "Int32" + + columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv1", fetch="all") + assert columns[0][1] == "Int32" + + columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv2", fetch="all") + assert columns[0][1] == "Int32" + + with pytest.raises(Exception): + columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv", fetch="all") + + check_relation_types( + project.adapter, + { + "hackers_mv": "view", + "hackers": "table", + }, + ) + + # insert some data and make sure it reaches the target table + project.run_sql( + f""" + insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department") + values (4000,'Dave',40,'sales'), (9999,'Eugene',40,'engineering'); + """ + ) + + result = project.run_sql(f"select * from {schema}.hackers order by id", fetch="all") + assert result == [ + (1000, 'Alfie', 'N/A'), + (1231, 'Dade', 'crash_override'), + (2000, 'Bill', 'N/A'), + (3000, 'Charlie', 'N/A'), + (4000, 'Dave', 'N/A'), + (6666, 'Ksenia', 'N/A'), + (8888, 'Kate', 'acid burn'), + (9999, 'Eugene', 'N/A'), + ] + + +class TestUpdateMultipleMV: + @pytest.fixture(scope="class") + def seeds(self): + """ + we need a base table to pull from + """ + return { + "people.csv": PEOPLE_SEED_CSV, + "schema.yml": SEED_SCHEMA_YML, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "hackers.sql": MULTIPLE_MV_MODEL, + } + + def test_update_incremental(self, project): + schema = quote_identifier(project.test_schema + 
"_custom_schema_for_multiple_mv") + # create our initial materialized view + run_dbt(["seed"]) + run_dbt() + + # re-run dbt but this time with the new MV SQL + run_vars = {"run_type": "extended_schema"} + run_dbt(["run", "--vars", json.dumps(run_vars)]) + + project.run_sql( + f""" + insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department") + values (1232,'Dade',11,'engineering'), (9999,'eugene',40,'malware'); + """ + ) + + # assert that we now have both of Dade's aliases in our hackers table + result = project.run_sql( + f"select distinct hacker_alias from {schema}.hackers where name = 'Dade' order by hacker_alias", + fetch="all", + ) + assert len(result) == 2 + assert result[0][0] == "crash_override" + assert result[1][0] == "zero cool" + + def test_update_full_refresh(self, project): + schema = quote_identifier(project.test_schema + "_custom_schema_for_multiple_mv") + # create our initial materialized view + run_dbt(["seed"]) + run_dbt() + + # re-run dbt but this time with the new MV SQL + run_vars = {"run_type": "extended_schema"} + run_dbt(["run", "--full-refresh", "--vars", json.dumps(run_vars)]) + + project.run_sql( + f""" + insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department") + values (1232,'Dade',11,'engineering'), (9999,'eugene',40,'malware'); + """ + ) + + # assert that we now have both of Dade's aliases in our hackers table + result = project.run_sql( + f"select distinct hacker_alias from {schema}.hackers where name = 'Dade' order by hacker_alias", + fetch="all", + ) + print(result) + assert len(result) == 2 + assert result[0][0] == "crash override" + assert result[1][0] == "zero cool"