Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow multiple materialized views to write to same target (#280) #364

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
### Improvement
* Added support for [range_hashed](https://clickhouse.com/docs/en/sql-reference/dictionaries#range_hashed) and [complex_key_range_hashed](https://clickhouse.com/docs/en/sql-reference/dictionaries#complex_key_range_hashed) layouts to the dictionary materialization. ([#361](https://github.com/ClickHouse/dbt-clickhouse/pull/361))

### New Features
* Added support for the creation of more than one materialized view inserting records into the same target table. ([#360](https://github.com/ClickHouse/dbt-clickhouse/pull/364))

### Release [1.8.4], 2024-09-17
### Improvement
* The S3 help macro now support a `role_arn` parameter as an alternative way to provide authentication for S3 based models. Thanks to
Expand Down
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,21 @@ no corresponding REFRESH operation). Instead, it acts as an "insert trigger", a
(https://github.com/ClickHouse/dbt-clickhouse/blob/main/tests/integration/adapter/materialized_view/test_materialized_view.py) for an introductory example
of how to use this functionality.

Clickhouse provides the ability for more than one materialized view to write records to the same target table. To support this in dbt-clickhouse, you can construct a `UNION` in your model file, such that the SQL for each of your materialized views is wrapped with comments of the form `--my_mv_name:begin` and `--my_mv_name:end`.

For example the following will build two materialized views both writing data to the same destination table of the model. The names of the materialized views will take the form `<model_name>_mv1` and `<model_name>_mv2` :

```
--mv1:begin
select a,b,c from {{ source('raw', 'table_1') }}
--mv1:end
union all
--mv2:begin
select a,b,c from {{ source('raw', 'table_2') }}
--mv2:end
```


# Dictionary materializations (experimental)
See the tests in https://github.com/ClickHouse/dbt-clickhouse/blob/main/tests/integration/adapter/dictionary/test_dictionary.py for examples of how to
implement materializations for ClickHouse dictionaries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
{%- materialization materialized_view, adapter='clickhouse' -%}

{%- set target_relation = this.incorporate(type='table') -%}
{%- set mv_relation = target_relation.derivative('_mv', 'materialized_view') -%}
{%- set cluster_clause = on_cluster_clause(target_relation) -%}

{# look for an existing relation for the target table and create backup relations if necessary #}
Expand Down Expand Up @@ -35,16 +34,26 @@
-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- extract the names of the materialized views from the sql
{% set view_names = modules.re.findall('--([^:]+):begin', sql) %}

-- extract the sql for each of the materialized view into a map
{% set views = {} %}
{% if view_names %}
{% for view_name in view_names %}
{% set view_sql = modules.re.findall('--' + view_name + ':begin(.*)--' + view_name + ':end', sql, flags=modules.re.DOTALL)[0] %}
{%- set _ = views.update({view_name: view_sql}) -%}
{% endfor %}
{% else %}
{%- set _ = views.update({"mv": sql}) -%}
{% endif %}

{% if backup_relation is none %}
{{ log('Creating new materialized view ' + target_relation.name )}}
{% call statement('main') -%}
{{ clickhouse__get_create_materialized_view_as_sql(target_relation, sql) }}
{%- endcall %}
{{ clickhouse__get_create_materialized_view_as_sql(target_relation, sql, views) }}
{% elif existing_relation.can_exchange %}
{{ log('Replacing existing materialized view ' + target_relation.name) }}
{% call statement('drop existing materialized view') %}
drop view if exists {{ mv_relation }} {{ cluster_clause }}
{% endcall %}
{{ clickhouse__drop_mvs(target_relation, cluster_clause, views) }}
{% if should_full_refresh() %}
{% call statement('main') -%}
{{ get_create_table_as_sql(False, backup_relation, sql) }}
Expand All @@ -56,12 +65,10 @@
select 1
{%- endcall %}
{% endif %}
{% call statement('create new materialized view') %}
{{ clickhouse__create_mv_sql(mv_relation, existing_relation, cluster_clause, sql) }}
{% endcall %}
{{ clickhouse__create_mvs(existing_relation, cluster_clause, views) }}
{% else %}
{{ log('Replacing existing materialized view ' + target_relation.name) }}
{{ clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql) }}
{{ clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql, views) }}
{% endif %}

-- cleanup
Expand All @@ -78,7 +85,12 @@

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation, mv_relation]}) }}
{% set relations = [target_relation] %}
{% for view in views %}
{{ relations.append(target_relation.derivative('_' + view, 'materialized_view')) }}
{% endfor %}

{{ return({'relations': relations}) }}

{%- endmaterialization -%}

Expand All @@ -89,30 +101,47 @@
2. Create a materialized view using the SQL in the model that inserts
data into the table creating during step 1
#}
{% macro clickhouse__get_create_materialized_view_as_sql(relation, sql) -%}
{% call statement('create_target_table') %}
{% macro clickhouse__get_create_materialized_view_as_sql(relation, sql, views) -%}
{% call statement('main') %}
{{ get_create_table_as_sql(False, relation, sql) }}
{% endcall %}
{%- set cluster_clause = on_cluster_clause(relation) -%}
{%- set mv_relation = relation.derivative('_mv', 'materialized_view') -%}
{{ clickhouse__create_mv_sql(mv_relation, relation, cluster_clause, sql) }}
{{ clickhouse__create_mvs(relation, cluster_clause, views) }}
{%- endmacro %}

{% macro clickhouse__drop_mv(mv_relation, cluster_clause) -%}
drop view if exists {{ mv_relation }} {{ cluster_clause }}
{%- endmacro %}u

{% macro clickhouse__create_mv_sql(mv_relation, target_table, cluster_clause, sql) -%}
{% macro clickhouse__create_mv(mv_relation, target_table, cluster_clause, sql) -%}
create materialized view if not exists {{ mv_relation }} {{ cluster_clause }}
to {{ target_table }}
as {{ sql }}
{%- endmacro %}

{% macro clickhouse__drop_mvs(target_relation, cluster_clause, views) -%}
{% for view in views.keys() %}
{%- set mv_relation = target_relation.derivative('_' + view, 'materialized_view') -%}
{% call statement('drop existing mv: ' + view) -%}
{{ clickhouse__drop_mv(mv_relation, cluster_clause) }};
{% endcall %}
{% endfor %}
{%- endmacro %}

{% macro clickhouse__create_mvs(target_relation, cluster_clause, views) -%}
{% for view, view_sql in views.items() %}
{%- set mv_relation = target_relation.derivative('_' + view, 'materialized_view') -%}
{% call statement('create existing mv: ' + view) -%}
{{ clickhouse__create_mv(mv_relation, target_relation, cluster_clause, view_sql) }};
{% endcall %}
{% endfor %}
{%- endmacro %}

{% macro clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql) %}
{% macro clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql, views) %}
{# drop existing materialized view while we recreate the target table #}
{%- set cluster_clause = on_cluster_clause(target_relation) -%}
{%- set mv_relation = target_relation.derivative('_mv', 'materialized_view') -%}
{% call statement('drop existing mv') -%}
drop view if exists {{ mv_relation }} {{ cluster_clause }}
{%- endcall %}
{{ clickhouse__drop_mvs(target_relation, cluster_clause, views) }}

{# recreate the target table #}
{% call statement('main') -%}
Expand All @@ -122,5 +151,5 @@
{{ adapter.rename_relation(intermediate_relation, target_relation) }}

{# now that the target table is recreated, we can finally create our new view #}
{{ clickhouse__create_mv_sql(mv_relation, target_relation, cluster_clause, sql) }}
{{ clickhouse__create_mvs(target_relation, cluster_clause, views) }}
{% endmacro %}
108 changes: 106 additions & 2 deletions tests/integration/adapter/materialized_view/test_materialized_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
1231,Dade,33,engineering
6666,Ksenia,48,engineering
8888,Kate,50,engineering
1000,Alfie,10,sales
2000,Bill,20,sales
3000,Charlie,30,sales
""".lstrip()

# This model is parameterized, in a way, by the "run_type" dbt project variable
Expand All @@ -40,8 +43,7 @@
from {{ source('raw', 'people') }}
where department = 'engineering'

{% else %}

{% elif var('run_type', '') == 'extended_schema' %}
select
id,
name,
Expand All @@ -55,6 +57,33 @@
from {{ source('raw', 'people') }}
where department = 'engineering'

{% elif var('run_type', '') == 'multiple_materialized_views' %}

--mv1:begin
select
id,
name,
case
when name like 'Dade' then 'crash_override'
when name like 'Kate' then 'acid burn'
else 'N/A'
end as hacker_alias
from {{ source('raw', 'people') }}
where department = 'engineering'
--mv1:end

union all

--mv2:begin
select
id,
name,
-- sales people are not cool enough to have a hacker alias
'N/A' as hacker_alias
from {{ source('raw', 'people') }}
where department = 'sales'
--mv2:end

{% endif %}
"""

Expand Down Expand Up @@ -191,3 +220,78 @@ def test_update_full_refresh(self, project):
f"select distinct hacker_alias from {schema}.hackers where name = 'Dade'", fetch="all"
)
assert len(result) == 2


class TestMultipleMV:
@pytest.fixture(scope="class")
def seeds(self):
"""
we need a base table to pull from
"""
return {
"people.csv": PEOPLE_SEED_CSV,
"schema.yml": SEED_SCHEMA_YML,
}

@pytest.fixture(scope="class")
def models(self):
return {
"hackers.sql": MV_MODEL,
}

def test_create(self, project):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also test for updates and full refresh?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think if you change the model to add or rename MVs, then old ones will be left running, which is incorrect.

"""
1. create a base table via dbt seed
2. create a model as a materialized view, selecting from the table created in (1)
3. insert data into the base table and make sure it's there in the target table created in (2)
"""
schema = quote_identifier(project.test_schema + "_custom_schema")
results = run_dbt(["seed"])
assert len(results) == 1
columns = project.run_sql("DESCRIBE TABLE people", fetch="all")
assert columns[0][1] == "Int32"

# create the model
run_vars = {"run_type": "multiple_materialized_views"}
run_dbt(["run", "--vars", json.dumps(run_vars)])
assert len(results) == 1

columns = project.run_sql(f"DESCRIBE TABLE {schema}.hackers", fetch="all")
assert columns[0][1] == "Int32"

columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv1", fetch="all")
assert columns[0][1] == "Int32"

columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv2", fetch="all")
assert columns[0][1] == "Int32"

with pytest.raises(Exception):
columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv", fetch="all")

check_relation_types(
project.adapter,
{
"hackers_mv": "view",
"hackers": "table",
},
)

# insert some data and make sure it reaches the target table
project.run_sql(
f"""
insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department")
values (4000,'Dave',40,'sales'), (9999,'Eugene',40,'engineering');
"""
)

result = project.run_sql(f"select * from {schema}.hackers order by id", fetch="all")
assert result == [
(1000, 'Alfie', 'N/A'),
(1231, 'Dade', 'crash_override'),
(2000, 'Bill', 'N/A'),
(3000, 'Charlie', 'N/A'),
(4000, 'Dave', 'N/A'),
(6666, 'Ksenia', 'N/A'),
(8888, 'Kate', 'acid burn'),
(9999, 'Eugene', 'N/A'),
]