diff --git a/.changes/unreleased/Features-20240925-232238.yaml b/.changes/unreleased/Features-20240925-232238.yaml
new file mode 100644
index 000000000..903884196
--- /dev/null
+++ b/.changes/unreleased/Features-20240925-232238.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: Add Microbatch Strategy to dbt-bigquery
+time: 2024-09-25T23:22:38.216277+01:00
+custom:
+  Author: michelleark
+  Issue: "1354"
diff --git a/dbt/include/bigquery/macros/materializations/incremental.sql b/dbt/include/bigquery/macros/materializations/incremental.sql
index 3908bedc2..935280d63 100644
--- a/dbt/include/bigquery/macros/materializations/incremental.sql
+++ b/dbt/include/bigquery/macros/materializations/incremental.sql
@@ -4,12 +4,16 @@

   {% set invalid_strategy_msg -%}
     Invalid incremental strategy provided: {{ strategy }}
-    Expected one of: 'merge', 'insert_overwrite'
+    Expected one of: 'merge', 'insert_overwrite', 'microbatch'
   {%- endset %}
-  {% if strategy not in ['merge', 'insert_overwrite'] %}
+  {% if strategy not in ['merge', 'insert_overwrite', 'microbatch'] %}
     {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
   {% endif %}

+  {% if strategy == 'microbatch' %}
+    {% do bq_validate_microbatch_config(config) %}
+  {% endif %}
+
   {% do return(strategy) %}
 {% endmacro %}
@@ -48,8 +52,13 @@
       tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
     ) %}

-  {% else %} {# strategy == 'merge' #}
+  {% elif strategy == 'microbatch' %}
+    {% set build_sql = bq_generate_microbatch_build_sql(
+      tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
+    ) %}
+
+  {% else %} {# strategy == 'merge' #}
     {% set build_sql = bq_generate_incremental_merge_build_sql(
       tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, incremental_predicates
     ) %}
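Note: the validation hook above runs while the strategy is being resolved, so a misconfigured model fails at compile time rather than mid-run. A minimal sketch of a model config that passes bq_validate_microbatch_config (the model and column names below are illustrative, not part of this diff):

-- Hypothetical daily model: passes bq_validate_microbatch_config because
-- partition_by.granularity ('day') matches batch_size ('day').
{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    event_time='session_start',
    batch_size='day',
    begin='2020-01-01',
    partition_by={
      'field': 'session_start',
      'data_type': 'timestamp',
      'granularity': 'day'
    }
) }}
select * from {{ ref('raw_sessions') }}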
diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql
new file mode 100644
index 000000000..d4c4b7453
--- /dev/null
+++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql
@@ -0,0 +1,28 @@
+{% macro bq_validate_microbatch_config(config) %}
+  {% if config.get("partition_by") is none %}
+    {% set missing_partition_msg -%}
+      The 'microbatch' strategy requires a `partition_by` config.
+    {%- endset %}
+    {% do exceptions.raise_compiler_error(missing_partition_msg) %}
+  {% endif %}
+
+  {% if config.get("partition_by").granularity != config.get('batch_size') %}
+    {% set invalid_partition_by_granularity_msg -%}
+      The 'microbatch' strategy requires a `partition_by` config with the same granularity as its configured `batch_size`.
+      Got:
+        `batch_size`: {{ config.get('batch_size') }}
+        `partition_by.granularity`: {{ config.get("partition_by").granularity }}
+    {%- endset %}
+    {% do exceptions.raise_compiler_error(invalid_partition_by_granularity_msg) %}
+  {% endif %}
+{% endmacro %}
+
+{% macro bq_generate_microbatch_build_sql(
+    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
+) %}
+  {% set build_sql = bq_insert_overwrite_sql(
+    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
+  ) %}
+
+  {{ return(build_sql) }}
+{% endmacro %}
diff --git a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py
index 17391b48d..02efbb6c2 100644
--- a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py
+++ b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py
@@ -555,3 +555,59 @@ select * from data
 """.lstrip()
+
+microbatch_model_no_unique_id_sql = """
+{{ config(
+    materialized='incremental',
+    incremental_strategy='microbatch',
+    partition_by={
+      'field': 'event_time',
+      'data_type': 'timestamp',
+      'granularity': 'day'
+    },
+    event_time='event_time',
+    batch_size='day',
+    begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)
+    )
+}}
+select * from {{ ref('input_model') }}
+"""
+
+microbatch_input_sql = """
+{{ config(materialized='table', event_time='event_time') }}
+select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time
+union all
+select 2 as id, TIMESTAMP '2020-01-02 00:00:00-0' as event_time
+union all
+select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time
+"""
+
+microbatch_model_no_partition_by_sql = """
+{{ config(
+    materialized='incremental',
+    incremental_strategy='microbatch',
+    event_time='event_time',
+    batch_size='day',
+    begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)
+    )
+}}
+select * from {{ ref('input_model') }}
+"""
+
+
+microbatch_model_invalid_partition_by_sql = """
+{{ config(
+    materialized='incremental',
+    incremental_strategy='microbatch',
+    event_time='event_time',
+    batch_size='day',
+    begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0),
+    partition_by={
+      'field': 'event_time',
+      'data_type': 'timestamp',
+      'granularity': 'hour'
+    }
+    )
+}}
+select * from {{ ref('input_model') }}
+"""
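Note: bq_generate_microbatch_build_sql is a thin wrapper, so each batch is written by replacing whole partitions via the existing insert_overwrite path. As a rough sketch only (the relation names and columns below are invented, and the real statement is rendered by bq_insert_overwrite_sql), the SQL BigQuery runs for one daily batch looks approximately like:

-- Simplified sketch for a 2020-01-01 daily batch against a hypothetical
-- `proj`.`ds`.`tgt` table with columns (id, event_time).
merge into `proj`.`ds`.`tgt` as DBT_INTERNAL_DEST
using (
  select * from `proj`.`ds`.`tgt__dbt_tmp`
) as DBT_INTERNAL_SOURCE
on FALSE
when not matched by source
  and timestamp_trunc(DBT_INTERNAL_DEST.event_time, day) in (timestamp('2020-01-01'))
  then delete
when not matched then insert (`id`, `event_time`)
  values (`id`, `event_time`)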
diff --git a/tests/functional/adapter/incremental/test_incremental_microbatch.py b/tests/functional/adapter/incremental/test_incremental_microbatch.py
new file mode 100644
index 000000000..d1bbbcea3
--- /dev/null
+++ b/tests/functional/adapter/incremental/test_incremental_microbatch.py
@@ -0,0 +1,55 @@
+import os
+import pytest
+from unittest import mock
+
+from dbt.tests.util import run_dbt_and_capture
+from dbt.tests.adapter.incremental.test_incremental_microbatch import (
+    BaseMicrobatch,
+    patch_microbatch_end_time,
+)
+
+from tests.functional.adapter.incremental.incremental_strategy_fixtures import (
+    microbatch_model_no_unique_id_sql,
+    microbatch_input_sql,
+    microbatch_model_no_partition_by_sql,
+    microbatch_model_invalid_partition_by_sql,
+)
+
+
+class TestBigQueryMicrobatch(BaseMicrobatch):
+    @pytest.fixture(scope="class")
+    def microbatch_model_sql(self) -> str:
+        return microbatch_model_no_unique_id_sql
+
+
+class TestBigQueryMicrobatchMissingPartitionBy:
+    @pytest.fixture(scope="class")
+    def models(self) -> dict:
+        return {
+            "microbatch.sql": microbatch_model_no_partition_by_sql,
+            "input_model.sql": microbatch_input_sql,
+        }
+
+    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
+    def test_execution_failure_no_partition_by(self, project):
+        with patch_microbatch_end_time("2020-01-03 13:57:00"):
+            _, stdout = run_dbt_and_capture(["run"], expect_pass=False)
+        assert "The 'microbatch' strategy requires a `partition_by` config" in stdout
+
+
+class TestBigQueryMicrobatchInvalidPartitionByGranularity:
+    @pytest.fixture(scope="class")
+    def models(self) -> dict:
+        return {
+            "microbatch.sql": microbatch_model_invalid_partition_by_sql,
+            "input_model.sql": microbatch_input_sql,
+        }
+
+    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
+    def test_execution_failure_invalid_partition_by_granularity(self, project):
+        with patch_microbatch_end_time("2020-01-03 13:57:00"):
+            _, stdout = run_dbt_and_capture(["run"], expect_pass=False)
+        assert (
+            "The 'microbatch' strategy requires a `partition_by` config with the same granularity as its configured `batch_size`"
+            in stdout
+        )
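Note: the granularity check is a plain equality comparison between partition_by.granularity and batch_size, so any matching pair passes, not just 'day'. A hypothetical hourly variant of the fixtures above (not part of this diff) that would also satisfy bq_validate_microbatch_config:

-- Hypothetical hourly model: passes validation because both
-- granularities are 'hour'.
{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    event_time='event_time',
    batch_size='hour',
    begin='2020-01-01',
    partition_by={
      'field': 'event_time',
      'data_type': 'timestamp',
      'granularity': 'hour'
    }
) }}
select * from {{ ref('input_model') }}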