From 717c2758aa5ea15fe14ad34ea2849165f19eaf75 Mon Sep 17 00:00:00 2001
From: Christophe Oudar
Date: Mon, 7 Aug 2023 17:58:16 +0200
Subject: [PATCH] Add an option to use INFORMATION_SCHEMA for partition info
 retrieval

---
 .changes/unreleased/Features-20230807-235539.yaml |   6 +
 dbt/adapters/bigquery/connections.py              |   9 +-
 .../bigquery/relation_configs/_partition.py       |   1 +
 dbt/include/bigquery/macros/etc.sql               |   7 +
 .../incremental_strategy/common.sql               |  62 ++++-
 .../incremental_strategy/insert_overwrite.sql     |  26 +-
 .../test_incremental_partition_information.py     | 244 ++++++++++++++++++
 tests/unit/test_bigquery_adapter.py               |  12 +
 8 files changed, 352 insertions(+), 15 deletions(-)
 create mode 100644 .changes/unreleased/Features-20230807-235539.yaml
 create mode 100644 tests/functional/adapter/incremental/test_incremental_partition_information.py

diff --git a/.changes/unreleased/Features-20230807-235539.yaml b/.changes/unreleased/Features-20230807-235539.yaml
new file mode 100644
index 000000000..0fbde028f
--- /dev/null
+++ b/.changes/unreleased/Features-20230807-235539.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: Add an option to use INFORMATION_SCHEMA for partition info retrieval
+time: 2023-08-07T23:55:39.31409+02:00
+custom:
+  Author: Kayrnt
+  Issue: "867"
diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py
index de84e4bf8..2667fdc26 100644
--- a/dbt/adapters/bigquery/connections.py
+++ b/dbt/adapters/bigquery/connections.py
@@ -621,14 +621,11 @@ def _bq_job_link(location, project_id, job_id) -> str:
         return f"https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults"
 
     def get_partitions_metadata(self, table):
-        def standard_to_legacy(table):
-            return table.project + ":" + table.dataset + "." + table.identifier
+        query_sql = f"SELECT * FROM `{table.project}.{table.dataset}.INFORMATION_SCHEMA.PARTITIONS` WHERE TABLE_NAME = '{table.identifier}'"
 
-        legacy_sql = "SELECT * FROM [" + standard_to_legacy(table) + "$__PARTITIONS_SUMMARY__]"
-
-        sql = self._add_query_comment(legacy_sql)
+        sql = self._add_query_comment(query_sql)
         # auto_begin is ignored on bigquery, and only included for consistency
-        _, iterator = self.raw_execute(sql, use_legacy_sql=True)
+        _, iterator = self.raw_execute(sql)
         return self.get_table_from_response(iterator)
 
     def copy_bq_table(self, source, destination, write_disposition):
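With this change, get_partitions_metadata builds a standard-SQL query against the INFORMATION_SCHEMA.PARTITIONS view instead of the legacy `__PARTITIONS_SUMMARY__` meta-table. As a rough sketch (the project, dataset and table names below are placeholders, not values from this patch), the generated statement looks like:

-- Sketch of the query issued by get_partitions_metadata, with placeholder identifiers
SELECT *
FROM `my-project.my_dataset.INFORMATION_SCHEMA.PARTITIONS`
WHERE TABLE_NAME = 'my_table'

Because the query is standard SQL, raw_execute no longer needs the use_legacy_sql flag.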
diff --git a/dbt/adapters/bigquery/relation_configs/_partition.py b/dbt/adapters/bigquery/relation_configs/_partition.py
index 094e4f1c4..2b390fcc9 100644
--- a/dbt/adapters/bigquery/relation_configs/_partition.py
+++ b/dbt/adapters/bigquery/relation_configs/_partition.py
@@ -16,6 +16,7 @@ class PartitionConfig(dbtClassMixin):
     range: Optional[Dict[str, Any]] = None
     time_ingestion_partitioning: bool = False
     copy_partitions: bool = False
+    partition_information: str = "model"
 
     PARTITION_DATE = "_PARTITIONDATE"
     PARTITION_TIME = "_PARTITIONTIME"
diff --git a/dbt/include/bigquery/macros/etc.sql b/dbt/include/bigquery/macros/etc.sql
index 59b61473e..0b4ba015f 100644
--- a/dbt/include/bigquery/macros/etc.sql
+++ b/dbt/include/bigquery/macros/etc.sql
@@ -6,6 +6,13 @@
     {% do adapter.grant_access_to(entity, entity_type, role, grant_target_dict) %}
 {% endmacro %}
 
+{#
+    This macro returns the partition metadata for the provided table.
+    The expected input is a table object (i.e. obtained through `source` or `ref`).
+    The output contains the partition information for the input table, retrieved
+    from the INFORMATION_SCHEMA.PARTITIONS view. The returned columns are documented at
+    https://cloud.google.com/bigquery/docs/managing-partitioned-tables
+#}
 {%- macro get_partitions_metadata(table) -%}
   {%- if execute -%}
     {%- set res = adapter.get_partitions_metadata(table) -%}
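The macro only wraps the adapter call, so it can be used from any model, analysis, or operation. A hypothetical usage sketch follows; the `my_source`/`events` source and the logging loop are illustrative and not part of this patch, and it assumes the macro returns the agate table produced by the adapter call:

{# Hypothetical sketch: log partition metadata for a source table #}
{%- set partitions = get_partitions_metadata(source('my_source', 'events')) -%}
{%- if execute -%}
  {%- for row in partitions.rows -%}
    {# partition_id and total_rows are columns of INFORMATION_SCHEMA.PARTITIONS #}
    {% do log('partition ' ~ row['partition_id'] ~ ': ' ~ row['total_rows'] ~ ' rows', info=True) %}
  {%- endfor -%}
{%- endif -%}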
diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql
index 9d71ba7c0..6992e4bd2 100644
--- a/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql
+++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql
@@ -2,12 +2,68 @@
 
 {#-- TODO: revisit partitioning with python models --#}
 {%- if '_dbt_max_partition' in compiled_code and language == 'sql' -%}
 
+  {%- if partition_by.partition_information == "information_schema" -%}
+    {{ dbt_max_partition_from_information_schema_data_sql(relation, partition_by) }}
+  {%- else -%}
+    {{ dbt_max_partition_from_model_data_sql(relation, partition_by) }}
+  {%- endif -%}
 
-  declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default (
-    select max({{ partition_by.field }}) from {{ this }}
+{%- endif -%}
+
+{% endmacro %}
+
+{% macro dbt_max_partition_from_model_data_sql(relation, partition_by) %}
+  declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default (
+    select max({{ partition_by.field }}) from {{ relation }}
     where {{ partition_by.field }} is not null
-  );
+  );
+{% endmacro %}
+
+{% macro max_partition_wrapper(field) %}
+  MAX({{ field }}) AS max_partition
+{% endmacro %}
+
+{% macro array_distinct_partition_wrapper(field) %}
+  as struct
+    -- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores null
+    array_agg(distinct {{ field }} IGNORE NULLS)
+{% endmacro %}
+
+{% macro dbt_max_partition_from_information_schema_data_sql(relation, partition_by) %}
+  declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default (
+    {{ partition_from_information_schema_data_sql(relation, partition_by, max_partition_wrapper) }}
+  );
+{% endmacro %}
+
+{% macro partition_from_model_data_sql(relation, partition_by, field_function) %}
+  select {{ field_function(partition_by.render_wrapped()) }}
+  from {{ relation }}
+{% endmacro %}
+
+{% macro partition_from_information_schema_data_sql(relation, partition_by, field_function) %}
+  {%- set data_type = partition_by.data_type -%}
+  {%- set granularity = partition_by.granularity -%}
+
+  {# Format partition_id to match the declared variable type #}
+  {%- if data_type | lower in ('date', 'timestamp', 'datetime') -%}
+    {# Datetime with time ingestion partitioning requires timestamp #}
+    {%- if partition_by.time_ingestion_partitioning and partition_by.data_type == 'datetime' -%}
+      {%- set data_type = 'timestamp' -%}
+    {%- endif -%}
+    {%- if granularity == "day" -%}
+      {%- set format = "%Y%m%d" -%}
+    {%- else -%}
+      {%- set format = "%Y%m%d%H" -%}
+    {%- endif -%}
+    {%- set field = "parse_" ~ data_type ~ "('" ~ format ~ "', partition_id)" -%}
+  {%- else -%}
+    {%- set field = "CAST(partition_id AS INT64)" -%}
   {%- endif -%}
+
+  SELECT {{ field_function(field) }}
+  FROM `{{relation.project}}.{{relation.dataset}}.INFORMATION_SCHEMA.PARTITIONS`
+  WHERE TABLE_NAME = '{{relation.identifier}}'
+    AND NOT(STARTS_WITH(partition_id, "__"))
 
 {% endmacro %}
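For a concrete sense of what the information_schema path declares, here is an approximate rendering of dbt_max_partition_from_information_schema_data_sql for a model partitioned by day on a `date` column; project, dataset and table names are placeholders and the exact whitespace differs:

-- Approximate rendering for a DATE column with day granularity, placeholder identifiers
declare _dbt_max_partition date default (
  SELECT MAX(parse_date('%Y%m%d', partition_id)) AS max_partition
  FROM `my-project.my_dataset.INFORMATION_SCHEMA.PARTITIONS`
  WHERE TABLE_NAME = 'my_table'
    AND NOT(STARTS_WITH(partition_id, "__"))
);

Reading partition_id from the metadata view can be considerably cheaper than the default path, which runs select max({{ partition_by.field }}) over the target table itself.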
diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql
index 4c22fd376..bcf19df14 100644
--- a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql
+++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql
@@ -107,8 +107,7 @@
       {%- endcall %}
     {%- endif -%}
     {%- set partitions_sql -%}
-      select distinct {{ partition_by.render_wrapped() }}
-      from {{ tmp_relation }}
+      {{ bq_dynamic_copy_partitions_affected_partitions_sql(tmp_relation, partition_by) }}
     {%- endset -%}
     {%- set partitions = run_query(partitions_sql).columns[0].values() -%}
     {# We copy the partitions #}
@@ -117,6 +116,19 @@
     drop table if exists {{ tmp_relation }}
 {% endmacro %}
 
+{% macro distinct_partition_wrapper(field) %}
+  distinct {{ field }} AS partition_ids
+{% endmacro %}
+
+{% macro bq_dynamic_copy_partitions_affected_partitions_sql(tmp_relation, partition_by) %}
+{% if partition_by.partition_information == "information_schema" %}
+  {{ partition_from_information_schema_data_sql(tmp_relation, partition_by, distinct_partition_wrapper) }}
+{% else %}
+  select distinct {{ partition_by.render_wrapped() }}
+  from {{ tmp_relation }}
+{% endif %}
+{% endmacro %}
+
 {% macro bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) %}
   {%- if copy_partitions is true %}
     {{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
@@ -149,10 +161,12 @@
 
       -- 2. define partitions to update
       set (dbt_partitions_for_replacement) = (
-          select as struct
-              -- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores null
-              array_agg(distinct {{ partition_field }} IGNORE NULLS)
-          from {{ tmp_relation }}
+          {%- if partition_by.partition_information == "information_schema" -%}
+          {{ partition_from_information_schema_data_sql(tmp_relation, partition_by, array_distinct_partition_wrapper) }}
+          {%- else -%}
+          {# TODO fix datetime case to render_wrapped with timestamp #}
+          {{ partition_from_model_data_sql(tmp_relation, partition_by, array_distinct_partition_wrapper) }}
+          {%- endif -%}
       );
 
       -- 3. run the merge statement
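The new key is read straight from the partition_by config, so enabling the INFORMATION_SCHEMA path is a per-model choice. A configuration sketch along the lines of the functional test fixtures below (model and column names are illustrative placeholders):

{{
    config(
        materialized='incremental',
        incremental_strategy='insert_overwrite',
        partition_by={
            "field": "id",
            "data_type": "int64",
            "range": {"start": 1, "end": 7, "interval": 1},
            "partition_information": "information_schema",
            "copy_partitions": true
        }
    )
}}

select * from {{ ref('model_a') }}  -- placeholder upstream model

Leaving partition_information unset (or setting it to "model") keeps the existing behaviour of querying the model data directly.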
diff --git a/tests/functional/adapter/incremental/test_incremental_partition_information.py b/tests/functional/adapter/incremental/test_incremental_partition_information.py
new file mode 100644
index 000000000..4efc20d3b
--- /dev/null
+++ b/tests/functional/adapter/incremental/test_incremental_partition_information.py
@@ -0,0 +1,244 @@
+import pytest
+
+from dbt.tests.adapter.simple_seed.test_seed import SeedConfigBase
+
+from dbt.tests.adapter.incremental.fixtures import (
+    _MODELS__A,
+)
+
+from dbt.tests.util import (
+    check_relations_equal,
+    run_dbt,
+)
+
+
+class TestIncrementalPartitionInformation:
+    pass
+
+
+def incremental_model_int64_partition_information(partition_information: str):
+    return """
+    {{
+        config(
+            materialized='incremental',
+            unique_key='id',
+            partition_by={
+                "field": "id",
+                "data_type": "int64",
+                "partition_information": "{partition_information}",
+                "range": {
+                    "start": 1,
+                    "end": 7,
+                    "interval": 1
+                },
+                "copy_partitions": true
+            },
+            incremental_strategy='insert_overwrite'
+        )
+    }}
+
+    WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
+
+    {% set string_type = 'string' %}
+
+    {% if is_incremental() %}
+
+    SELECT id,
+           cast(field1 as {{string_type}}) as field1,
+
+    FROM source_data WHERE id > _dbt_max_partition
+
+    {% else %}
+
+    SELECT id,
+           cast(field1 as {{string_type}}) as field1,
+           cast(field2 as {{string_type}}) as field2
+
+    FROM source_data WHERE id <= 3
+
+    {% endif %}
+    """.replace(
+        "{partition_information}", partition_information
+    )
+
+
+def incremental_model_time_ingestion_partition_information(partition_information: str):
+    return """
+    {{
+        config(
+            materialized='incremental',
+            unique_key='id',
+            partition_by={
+                "field": "date_hour",
+                "data_type": "datetime",
+                "partition_information": "{partition_information}",
+                "granularity": "hour",
+                "time_ingestion_partitioning": true
+            },
+            incremental_strategy='insert_overwrite'
+        )
+    }}
+
+    with data as (
+
+    {% if not is_incremental() %}
+
+        select 1 as id,
+        cast('2020-01-01 01:00:00' as datetime) as date_hour,
+        2 as field_2 union all
+        select 2 as id,
+        cast('2020-01-01 01:00:00' as datetime) as date_hour,
+        2 as field_2 union all
+        select 3 as id,
+        cast('2020-01-01 01:00:00' as datetime) as date_hour,
+        2 as field_2 union all
+        select 4 as id,
+        cast('2020-01-01 01:00:00' as datetime) as date_hour,
+        2 as field_2
+
+    {% else %}
+
+        -- we want to overwrite the 4 records in the 2020-01-01 01:00:00 partition
+        -- with the 2 records below, but add two more in the 2020-01-01 02:00:00 partition
+        select 10 as id,
+        cast('2020-01-01 01:00:00' as datetime) as date_hour,
+        2 as field_2 union all
+        select 20 as id,
+        cast('2020-01-01 01:00:00' as datetime) as date_hour,
+        2 as field_2 union all
+        select 30 as id,
+        cast('2020-01-01 02:00:00' as datetime) as date_hour,
+        2 as field_2 union all
+        select 40 as id,
+        cast('2020-01-01 02:00:00' as datetime) as date_hour,
+        2 as field_2
+
+    {% endif %}
+
+    )
+
+    select * from data
+    """.replace(
+        "{partition_information}", partition_information
+    )
+
+
+def incremental_model_date_partition_information(partition_information: str):
+    return """
+    {{
+        config(
+            materialized='incremental',
+            unique_key='id',
+            partition_by={
+                "field": "date",
+                "data_type": "date",
+                "partition_information": "{partition_information}",
+                "granularity": "day",
+            },
+            incremental_strategy='insert_overwrite'
+        )
+    }}
+
+    with data as (
+
+    {% if not is_incremental() %}
+
+        select 1 as id,
+        cast('2020-01-01' as date) as date,
+        1 as field_1,
+        2 as field_2 union all
+        select 2 as id,
+        cast('2020-01-01' as date) as date,
+        1 as field_1,
+        2 as field_2 union all
+        select 3 as id,
+        cast('2020-01-01' as date) as date,
+        1 as field_1,
+        2 as field_2 union all
+        select 4 as id,
+        cast('2020-01-01' as date) as date,
+        1 as field_1,
+        2 as field_2
+
+    {% else %}
+
+        -- we want to overwrite the 4 records in the 2020-01-01 partition
+        -- with the 2 records below, but add two more in the 2020-01-02 partition
+        select 10 as id,
+        cast('2020-01-01' as date) as date,
+        2 as field_2 union all
+        select 20 as id,
+        cast('2020-01-01' as date) as date,
+        2 as field_2 union all
+        select 30 as id,
+        cast('2020-01-02' as date) as date,
+        2 as field_2 union all
+        select 40 as id,
+        cast('2020-01-02' as date) as date,
+        2 as field_2
+
+    {% endif %}
+
+    )
+
+    select * from data
+    """.replace(
+        "{partition_information}", partition_information
+    )
+
+
+class TestIncrementalPartitionInformationBigQuerySpecific(SeedConfigBase):
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "model_a.sql": _MODELS__A,
+            "incremental_model_int64_partition_information_information_schema.sql": incremental_model_int64_partition_information(
+                "information_schema"
+            ),
+            "incremental_model_int64_partition_information_model.sql": incremental_model_int64_partition_information(
+                "model"
+            ),
+            "incremental_model_time_ingestion_partition_information_information_schema.sql": incremental_model_time_ingestion_partition_information(
+                "information_schema"
+            ),
+            "incremental_model_time_ingestion_partition_information_model.sql": incremental_model_time_ingestion_partition_information(
+                "model"
+            ),
+        }
+
+    def run_twice_and_assert(
+        self, include, compare_source, compare_target, project, expected_model_count=3
+    ):
+        # dbt run (twice)
+        run_args = ["run"]
+        if include:
+            run_args.extend(("--select", include))
+        results_one = run_dbt(run_args)
+        assert len(results_one) == expected_model_count
+
+        results_two = run_dbt(run_args)
+        assert len(results_two) == expected_model_count
+
+        check_relations_equal(project.adapter, [compare_source, compare_target])
+
+    def test_run_incremental_model_int64_partition_information(self, project):
+        select = (
+            "model_a incremental_model_int64_partition_information_information_schema "
+            "incremental_model_int64_partition_information_model"
+        )
+        compare_source = "incremental_model_int64_partition_information_information_schema"
+        compare_target = "incremental_model_int64_partition_information_model"
+        self.run_twice_and_assert(select, compare_source, compare_target, project)
+
+    def test_run_incremental_model_time_ingestion_partition_information(self, project):
+        select = (
+            "incremental_model_time_ingestion_partition_information_information_schema "
+            "incremental_model_time_ingestion_partition_information_model"
+        )
+        compare_source = (
+            "incremental_model_time_ingestion_partition_information_information_schema"
+        )
+        compare_target = "incremental_model_time_ingestion_partition_information_model"
+        self.run_twice_and_assert(
+            select, compare_source, compare_target, project, expected_model_count=2
+        )
diff --git a/tests/unit/test_bigquery_adapter.py b/tests/unit/test_bigquery_adapter.py
index 926547e10..282e58ec8 100644
--- a/tests/unit/test_bigquery_adapter.py
+++ b/tests/unit/test_bigquery_adapter.py
@@ -572,6 +572,7 @@ def test_parse_partition_by(self):
                 "granularity": "day",
                 "time_ingestion_partitioning": False,
                 "copy_partitions": False,
+                "partition_information": "model",
             },
         )
@@ -588,6 +589,7 @@ def test_parse_partition_by(self):
                 "granularity": "day",
                 "time_ingestion_partitioning": False,
                 "copy_partitions": False,
+                "partition_information": "model",
             },
         )
@@ -601,6 +603,7 @@ def test_parse_partition_by(self):
                 "granularity": "month",
                 "time_ingestion_partitioning": False,
                 "copy_partitions": False,
+                "partition_information": "model",
             },
         )
@@ -614,6 +617,7 @@ def test_parse_partition_by(self):
                 "granularity": "year",
                 "time_ingestion_partitioning": False,
                 "copy_partitions": False,
+                "partition_information": "model",
             },
         )
@@ -627,6 +631,7 @@ def test_parse_partition_by(self):
                 "granularity": "hour",
                 "time_ingestion_partitioning": False,
                 "copy_partitions": False,
+                "partition_information": "model",
             },
         )
@@ -640,6 +645,7 @@ def test_parse_partition_by(self):
                 "granularity": "month",
                 "time_ingestion_partitioning": False,
                 "copy_partitions": False,
+                "partition_information": "model",
             },
         )
@@ -653,6 +659,7 @@ def test_parse_partition_by(self):
                 "granularity": "year",
                 "time_ingestion_partitioning": False,
                 "copy_partitions": False,
+                "partition_information": "model",
             },
         )
@@ -666,6 +673,7 @@ def test_parse_partition_by(self):
                 "granularity": "hour",
                 "time_ingestion_partitioning": False,
                 "copy_partitions": False,
+                "partition_information": "model",
             },
         )
@@ -679,6 +687,7 @@ def test_parse_partition_by(self):
                 "granularity": "month",
                 "time_ingestion_partitioning": False,
                 "copy_partitions": False,
+                "partition_information": "model",
             },
         )
@@ -692,6 +701,7 @@ def test_parse_partition_by(self):
                 "granularity": "year",
                 "time_ingestion_partitioning": False,
                 "copy_partitions": False,
+                "partition_information": "model",
             },
         )
@@ -705,6 +715,7 @@ def test_parse_partition_by(self):
                 "granularity": "day",
                 "time_ingestion_partitioning": True,
                 "copy_partitions": True,
+                "partition_information": "model",
             },
         )
@@ -728,6 +739,7 @@ def test_parse_partition_by(self):
                 "range": {"start": 1, "end": 100, "interval": 20},
                 "time_ingestion_partitioning": False,
                 "copy_partitions": False,
+                "partition_information": "model",
             },
         )
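To round out the picture, this is roughly what the dbt_partitions_for_replacement assignment becomes on the information_schema path for an hourly, time-ingestion-partitioned model; the project, dataset and `my_model__dbt_tmp` temp-table names are placeholders, and the real output depends on the surrounding merge script:

-- Approximate rendering for hourly time-ingestion partitioning, placeholder identifiers
set (dbt_partitions_for_replacement) = (
  SELECT as struct
    -- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores null
    array_agg(distinct parse_timestamp('%Y%m%d%H', partition_id) IGNORE NULLS)
  FROM `my-project.my_dataset.INFORMATION_SCHEMA.PARTITIONS`
  WHERE TABLE_NAME = 'my_model__dbt_tmp'
    AND NOT(STARTS_WITH(partition_id, "__"))
);

The parse_timestamp('%Y%m%d%H', ...) call comes from partition_from_information_schema_data_sql, which switches datetime to timestamp for time-ingestion partitioning so the parsed value lines up with _dbt_max_partition.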