From 8106bb1499c4088b938b0fdc2e3b3e984169db20 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Fri, 27 Sep 2024 17:18:03 -0400 Subject: [PATCH 1/6] add skeleton test case --- .../unreleased/Fixes-20240927-171725.yaml | 7 +++ .../columns_in_relation_tests/__init__.py | 0 .../test_columns_in_relation.py | 0 .../test_incremental_on_schema_change.py | 63 +++++++++++++++++++ tests/functional/utils.py | 8 +++ 5 files changed, 78 insertions(+) create mode 100644 .changes/unreleased/Fixes-20240927-171725.yaml create mode 100644 tests/functional/columns_in_relation_tests/__init__.py rename tests/functional/{ => columns_in_relation_tests}/test_columns_in_relation.py (100%) create mode 100644 tests/functional/columns_in_relation_tests/test_incremental_on_schema_change.py create mode 100644 tests/functional/utils.py diff --git a/.changes/unreleased/Fixes-20240927-171725.yaml b/.changes/unreleased/Fixes-20240927-171725.yaml new file mode 100644 index 000000000..e414b0057 --- /dev/null +++ b/.changes/unreleased/Fixes-20240927-171725.yaml @@ -0,0 +1,7 @@ +kind: Fixes +body: Fix scenario where dbt attempts to add existing columns to relations when using + the SDK for column metadata +time: 2024-09-27T17:17:25.584838-04:00 +custom: + Author: mikealfare + Issue: "914" diff --git a/tests/functional/columns_in_relation_tests/__init__.py b/tests/functional/columns_in_relation_tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/functional/test_columns_in_relation.py b/tests/functional/columns_in_relation_tests/test_columns_in_relation.py similarity index 100% rename from tests/functional/test_columns_in_relation.py rename to tests/functional/columns_in_relation_tests/test_columns_in_relation.py diff --git a/tests/functional/columns_in_relation_tests/test_incremental_on_schema_change.py b/tests/functional/columns_in_relation_tests/test_incremental_on_schema_change.py new file mode 100644 index 000000000..c66579285 --- /dev/null +++ b/tests/functional/columns_in_relation_tests/test_incremental_on_schema_change.py @@ -0,0 +1,63 @@ +from dbt.tests.util import run_dbt +import pytest + +from tests.functional.utils import update_model + + +SEED = """ +column_a,column_b,column_c,column_d +1,thunder,ho,Cheetara +2,THUNDER,HO,Tygra +3,THUNDERCATS,HOOOO,Lion-O +""".strip() + + +MODEL_INITIAL = """ +{{ config( + materialized='incremental', + on_schema_change='sync_all_columns', +) }} +select + column_a, + column_b, + column_c +from {{ ref('my_seed') }} +""" + + +MODEL_UPDATE = """ +{{ config( + materialized='incremental', + on_schema_change='sync_all_columns', +) }} +select + column_b as column_B, + column_c as "COLUMN_C", + column_D +from {{ ref('my_seed') }} +""" + + +class TestIncrementalOnSchemaChange: + """ + This addresses: https://github.com/dbt-labs/dbt-redshift/issues/914 + """ + + @pytest.fixture(scope="class") + def project_config_update(self): + return {"flags": {"restrict_direct_pg_catalog_access": False}} + + @pytest.fixture(scope="class") + def seeds(self): + return {"my_seed.csv": SEED} + + @pytest.fixture(scope="class") + def models(self): + return {"my_model.sql": MODEL_INITIAL} + + def test_columns_in_relation(self, project): + run_dbt(["seed"]) + run_dbt(["run"]) + update_model(project, "my_model", MODEL_UPDATE) + run_dbt(["run"]) + # a successful run is a pass diff --git a/tests/functional/utils.py b/tests/functional/utils.py new file mode 100644 index 000000000..c2fdde9f1 --- /dev/null +++ b/tests/functional/utils.py @@ -0,0 +1,8 @@ +from dbt.tests.util import get_model_file, relation_from_name, set_model_file + + +def update_model(project, name: str, model: str) -> str: + relation = relation_from_name(project.adapter, name) + original_model = get_model_file(project, relation) + set_model_file(project, relation, model) + return original_model From cb62bbba87038281fe4e35626c0aa79a018c75a7 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Fri, 27 Sep 2024 18:06:47 -0400 Subject: [PATCH 2/6] changie --- .changes/unreleased/Fixes-20240927-171725.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.changes/unreleased/Fixes-20240927-171725.yaml b/.changes/unreleased/Fixes-20240927-171725.yaml index e414b0057..d417b8801 100644 --- a/.changes/unreleased/Fixes-20240927-171725.yaml +++ b/.changes/unreleased/Fixes-20240927-171725.yaml @@ -1,6 +1,5 @@ kind: Fixes -body: Fix scenario where dbt attempts to add existing columns to relations when using - the SDK for column metadata +body: Fix scenario where dbt attempts to add existing columns to relations when using the SDK for column metadata time: 2024-09-27T17:17:25.584838-04:00 custom: Author: mikealfare From b0c1a0538e400e68e42e97651ef2b04f799e9e12 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Wed, 9 Oct 2024 14:13:05 -0400 Subject: [PATCH 3/6] alias dtypes back to legacy method names --- dbt/adapters/redshift/connections.py | 9 +++++ dbt/adapters/redshift/impl.py | 2 +- .../test_columns_in_relation.py | 26 +++----------- .../test_incremental_on_schema_change.py | 34 ++++++++++++------- 4 files changed, 37 insertions(+), 34 deletions(-) diff --git a/dbt/adapters/redshift/connections.py b/dbt/adapters/redshift/connections.py index 8e7ae36d2..e445bdce6 100644 --- a/dbt/adapters/redshift/connections.py +++ b/dbt/adapters/redshift/connections.py @@ -503,6 +503,15 @@ def _parse_column_results(record: Tuple[Any, ...]) -> Dict[str, Any]: char_dtypes = [1, 12] num_dtypes = [2, 3, 4, 5, 6, 7, 8, -5, 2003] + # the results from `get_columns` vary slightly from the pg_catalog tables for dtype names + dtype_alias = { + "bool": "boolean", + "int4": "integer", + "timestamp": "timestamp without time zone", + "varchar": "character varying", + } + dtype_name = dtype_alias.get(dtype_name, dtype_name) + if dtype_code in char_dtypes: return {"column": column_name, "dtype": dtype_name, "char_size": column_size} elif dtype_code in num_dtypes: diff --git a/dbt/adapters/redshift/impl.py b/dbt/adapters/redshift/impl.py index dd80a7f4e..666dec68b 100644 --- a/dbt/adapters/redshift/impl.py +++ b/dbt/adapters/redshift/impl.py @@ -73,7 +73,7 @@ def _behavior_flags(self) -> List[BehaviorFlag]: return [ { "name": "restrict_direct_pg_catalog_access", - "default": False, + "default": True, "description": ( "The dbt-redshift adapter is migrating from using pg_ tables " "to using Redshift Metadata API and information_schema tables " diff --git a/tests/functional/columns_in_relation_tests/test_columns_in_relation.py b/tests/functional/columns_in_relation_tests/test_columns_in_relation.py index 60aeaa2aa..9f9f90a94 100644 --- a/tests/functional/columns_in_relation_tests/test_columns_in_relation.py +++ b/tests/functional/columns_in_relation_tests/test_columns_in_relation.py @@ -15,11 +15,7 @@ def models(self): def setup(self, project): run_dbt(["run"]) - @pytest.fixture(scope="class") - def expected_columns(self): - return [] - - def test_columns_in_relation(self, project, expected_columns): + def test_columns_in_relation(self, project): my_relation = RedshiftRelation.create( database=project.database, schema=project.test_schema, @@ -28,6 +24,10 @@ def test_columns_in_relation(self, project, expected_columns): ) with project.adapter.connection_named("_test"): actual_columns = project.adapter.get_columns_in_relation(my_relation) + expected_columns = [ + Column(column="my_num", dtype="numeric", numeric_precision=3, numeric_scale=2), + Column(column="my_char", dtype="character varying", char_size=1), + ] assert actual_columns == expected_columns @@ -36,24 +36,8 @@ class TestColumnsInRelationBehaviorFlagOff(ColumnsInRelation): def project_config_update(self): return {"flags": {}} - @pytest.fixture(scope="class") - def expected_columns(self): - # the SDK query returns "varchar" whereas our custom query returns "character varying" - return [ - Column(column="my_num", dtype="numeric", numeric_precision=3, numeric_scale=2), - Column(column="my_char", dtype="character varying", char_size=1), - ] - class TestColumnsInRelationBehaviorFlagOn(ColumnsInRelation): @pytest.fixture(scope="class") def project_config_update(self): return {"flags": {"restrict_direct_pg_catalog_access": True}} - - @pytest.fixture(scope="class") - def expected_columns(self): - # the SDK query returns "varchar" whereas our custom query returns "character varying" - return [ - Column(column="my_num", dtype="numeric", numeric_precision=3, numeric_scale=2), - Column(column="my_char", dtype="varchar", char_size=1), - ] diff --git a/tests/functional/columns_in_relation_tests/test_incremental_on_schema_change.py b/tests/functional/columns_in_relation_tests/test_incremental_on_schema_change.py index c66579285..35dcadb5d 100644 --- a/tests/functional/columns_in_relation_tests/test_incremental_on_schema_change.py +++ b/tests/functional/columns_in_relation_tests/test_incremental_on_schema_change.py @@ -5,36 +5,46 @@ SEED = """ -column_a,column_b,column_c,column_d -1,thunder,ho,Cheetara -2,THUNDER,HO,Tygra -3,THUNDERCATS,HOOOO,Lion-O +id,col7,col6,occurred_at +1,Cheetara,thunder,'2024-01-01' +2,Tygra,thunder,'2024-01-01' +2,Tygra,THUNDER,'2024-02-01' +3,Lion-O,thunder,'2024-01-01' +3,Lion-O,THUNDER,'2024-02-01' +3,Lion-O,THUNDERCATS,'2024-03-01' """.strip() MODEL_INITIAL = """ {{ config( materialized='incremental', - on_schema_change='sync_all_columns', + dist='col6', + on_schema_change='append_new_columns', ) }} select - column_a, - column_b, - column_c + id::bigint as id, + col6::varchar(128) as col6, + occurred_at::timestamptz as occurred_at from {{ ref('my_seed') }} +where occurred_at::timestamptz >= '2024-01-01'::timestamptz +and occurred_at::timestamptz < '2024-02-01'::timestamptz """ MODEL_UPDATE = """ {{ config( materialized='incremental', - on_schema_change='sync_all_columns', + dist='col6', + on_schema_change='append_new_columns', ) }} select - column_b as column_B, - column_c as "COLUMN_C", - column_D + id::bigint as id, + col6::varchar(128) as col6, + occurred_at::timestamptz as occurred_at, + col7::varchar(56) as col7 from {{ ref('my_seed') }} +where occurred_at::timestamptz >= '2024-02-01'::timestamptz +and occurred_at::timestamptz < '2024-03-01'::timestamptz """ From d92c9ad88d1b793ed0a99820616c284df5795ca3 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Wed, 9 Oct 2024 14:23:31 -0400 Subject: [PATCH 4/6] add a test case for both settings of the flag --- .../test_incremental_on_schema_change.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/functional/columns_in_relation_tests/test_incremental_on_schema_change.py b/tests/functional/columns_in_relation_tests/test_incremental_on_schema_change.py index 35dcadb5d..a335114bb 100644 --- a/tests/functional/columns_in_relation_tests/test_incremental_on_schema_change.py +++ b/tests/functional/columns_in_relation_tests/test_incremental_on_schema_change.py @@ -51,6 +51,9 @@ class TestIncrementalOnSchemaChange: """ This addresses: https://github.com/dbt-labs/dbt-redshift/issues/914 + + We test it with the `restrict_direct_pg_catalog_access` flag both off and on since the bug + only emerges when the flag is on (the former is a control). """ @pytest.fixture(scope="class") @@ -71,3 +74,10 @@ def test_columns_in_relation(self, project): update_model(project, "my_model", MODEL_UPDATE) run_dbt(["run"]) # a successful run is a pass + + +class TestIncrementalOnSchemaChangeFlagOn: + + @pytest.fixture(scope="class") + def project_config_update(self): + return {"flags": {"restrict_direct_pg_catalog_access": True}} From bef7bcc32787eaa46926d1f3ce5062c706adb358 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Tue, 15 Oct 2024 18:01:06 -0400 Subject: [PATCH 5/6] instead of testing schema updates, test data updates --- dbt/adapters/redshift/impl.py | 2 +- .../columns_in_relation_tests/test_columns_in_relation.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/adapters/redshift/impl.py b/dbt/adapters/redshift/impl.py index 7e24f6a89..e0cefb989 100644 --- a/dbt/adapters/redshift/impl.py +++ b/dbt/adapters/redshift/impl.py @@ -73,7 +73,7 @@ def _behavior_flags(self) -> List[BehaviorFlag]: return [ { "name": "restrict_direct_pg_catalog_access", - "default": True, + "default": False, "description": ( "The dbt-redshift adapter is migrating from using pg_ tables " "to using Redshift Metadata API and information_schema tables " diff --git a/tests/functional/columns_in_relation_tests/test_columns_in_relation.py b/tests/functional/columns_in_relation_tests/test_columns_in_relation.py index 9f9f90a94..246a4c65a 100644 --- a/tests/functional/columns_in_relation_tests/test_columns_in_relation.py +++ b/tests/functional/columns_in_relation_tests/test_columns_in_relation.py @@ -34,7 +34,7 @@ def test_columns_in_relation(self, project): class TestColumnsInRelationBehaviorFlagOff(ColumnsInRelation): @pytest.fixture(scope="class") def project_config_update(self): - return {"flags": {}} + return {"flags": {"restrict_direct_pg_catalog_access": False}} class TestColumnsInRelationBehaviorFlagOn(ColumnsInRelation): From b946cddf171f36d534f1c0ca3fa27a2ec55c601c Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Tue, 15 Oct 2024 18:02:07 -0400 Subject: [PATCH 6/6] instead of testing schema updates, test data updates --- .../test_incremental_on_schema_change.py | 83 ----------------- .../test_incremental_updates.py | 92 +++++++++++++++++++ 2 files changed, 92 insertions(+), 83 deletions(-) delete mode 100644 tests/functional/columns_in_relation_tests/test_incremental_on_schema_change.py create mode 100644 tests/functional/columns_in_relation_tests/test_incremental_updates.py diff --git a/tests/functional/columns_in_relation_tests/test_incremental_on_schema_change.py b/tests/functional/columns_in_relation_tests/test_incremental_on_schema_change.py deleted file mode 100644 index a335114bb..000000000 --- a/tests/functional/columns_in_relation_tests/test_incremental_on_schema_change.py +++ /dev/null @@ -1,83 +0,0 @@ -from dbt.tests.util import run_dbt -import pytest - -from tests.functional.utils import update_model - - -SEED = """ -id,col7,col6,occurred_at -1,Cheetara,thunder,'2024-01-01' -2,Tygra,thunder,'2024-01-01' -2,Tygra,THUNDER,'2024-02-01' -3,Lion-O,thunder,'2024-01-01' -3,Lion-O,THUNDER,'2024-02-01' -3,Lion-O,THUNDERCATS,'2024-03-01' -""".strip() - - -MODEL_INITIAL = """ -{{ config( - materialized='incremental', - dist='col6', - on_schema_change='append_new_columns', -) }} -select - id::bigint as id, - col6::varchar(128) as col6, - occurred_at::timestamptz as occurred_at -from {{ ref('my_seed') }} -where occurred_at::timestamptz >= '2024-01-01'::timestamptz -and occurred_at::timestamptz < '2024-02-01'::timestamptz -""" - - -MODEL_UPDATE = """ -{{ config( - materialized='incremental', - dist='col6', - on_schema_change='append_new_columns', -) }} -select - id::bigint as id, - col6::varchar(128) as col6, - occurred_at::timestamptz as occurred_at, - col7::varchar(56) as col7 -from {{ ref('my_seed') }} -where occurred_at::timestamptz >= '2024-02-01'::timestamptz -and occurred_at::timestamptz < '2024-03-01'::timestamptz -""" - - -class TestIncrementalOnSchemaChange: - """ - This addresses: https://github.com/dbt-labs/dbt-redshift/issues/914 - - We test it with the `restrict_direct_pg_catalog_access` flag both off and on since the bug - only emerges when the flag is on (the former is a control). - """ - - @pytest.fixture(scope="class") - def project_config_update(self): - return {"flags": {"restrict_direct_pg_catalog_access": False}} - - @pytest.fixture(scope="class") - def seeds(self): - return {"my_seed.csv": SEED} - - @pytest.fixture(scope="class") - def models(self): - return {"my_model.sql": MODEL_INITIAL} - - def test_columns_in_relation(self, project): - run_dbt(["seed"]) - run_dbt(["run"]) - update_model(project, "my_model", MODEL_UPDATE) - run_dbt(["run"]) - # a successful run is a pass - - -class TestIncrementalOnSchemaChangeFlagOn: - - @pytest.fixture(scope="class") - def project_config_update(self): - return {"flags": {"restrict_direct_pg_catalog_access": True}} diff --git a/tests/functional/columns_in_relation_tests/test_incremental_updates.py b/tests/functional/columns_in_relation_tests/test_incremental_updates.py new file mode 100644 index 000000000..0416a87e7 --- /dev/null +++ b/tests/functional/columns_in_relation_tests/test_incremental_updates.py @@ -0,0 +1,92 @@ +from dbt.tests.util import run_dbt +import pytest + +from tests.functional.utils import update_model + + +SEED = """ +id,col7,col6,occurred_at +1,a,green,'2024-01-01' +2,b,green,'2024-01-01' +3,c,green,'2024-01-01' +""".strip() + + +SEED_UPDATES = """ +id,col7,col6,occurred_at +4,b,red,'2024-02-01' +5,c,red,'2024-02-01' +6,c,blue,'2024-03-01' +""".strip() + + +MODEL = """ +{{ config(materialized='incremental') }} +select * from {{ ref('my_seed') }} +where occurred_at::timestamptz >= '2024-01-01'::timestamptz +and occurred_at::timestamptz < '2024-02-01'::timestamptz +""" + + +MODEL_UPDATES = """ +{{ config(materialized='incremental') }} +select * from {{ ref('my_seed') }} +where occurred_at::timestamptz >= '2024-02-01'::timestamptz +and occurred_at::timestamptz < '2024-03-01'::timestamptz +""" + + +class TestIncrementalUpdates: + """ + This addresses: https://github.com/dbt-labs/dbt-redshift/issues/914 + + We test it with the `restrict_direct_pg_catalog_access` flag both off and on since the bug + only emerges when the flag is on (the former is a control). + """ + + @pytest.fixture(scope="class") + def project_config_update(self): + return {"flags": {"restrict_direct_pg_catalog_access": False}} + + @pytest.fixture(scope="class") + def seeds(self): + return {"my_seed.csv": SEED, "my_seed_updates.csv": SEED_UPDATES} + + @pytest.fixture(scope="class") + def models(self): + return {"my_model.sql": MODEL} + + def test_columns_in_relation(self, project): + # create the initial table + run_dbt(["seed"]) + run_dbt(["run"]) + + # verify the table starts with the initial records + sql = ( + f"select count(*) as row_count from {project.database}.{project.test_schema}.my_model" + ) + assert project.run_sql(sql, fetch="one")[0] == 3 + + # move forward in time and pick up records in the source that should generate an incremental + sql = f""" + insert into {project.database}.{project.test_schema}.my_seed + select * from {project.database}.{project.test_schema}.my_seed_updates + """ + project.run_sql(sql) + update_model(project, "my_model", MODEL_UPDATES) + + # apply the incremental + run_dbt(["run"]) + + # verify the new records made it into the table + sql = ( + f"select count(*) as row_count from {project.database}.{project.test_schema}.my_model" + ) + assert project.run_sql(sql, fetch="one")[0] == 5 + + +class TestIncrementalUpdatesFlagOn(TestIncrementalUpdates): + + @pytest.fixture(scope="class") + def project_config_update(self): + return {"flags": {"restrict_direct_pg_catalog_access": True}}