From f5d8cc8ad3a10ac716c48ff09c0e502c17f67a17 Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Mon, 6 Feb 2023 12:01:48 +0100 Subject: [PATCH] chore: upgrade to dbt core v1.4.0 (#134) Co-authored-by: Jeremy Guiselin --- .github/workflows/ci.yml | 2 +- Makefile | 3 +- README.md | 4 +-- dbt/adapters/athena/__version__.py | 2 +- dbt/adapters/athena/connections.py | 6 ++-- dbt/adapters/athena/impl.py | 8 ++--- dbt/adapters/athena/relation.py | 4 +-- dev-requirements.txt | 2 +- requirements.txt | 2 +- setup.py | 6 ++-- tests/conftest.py | 25 ++++++++++++++ tests/unit/test_adapter.py | 54 +++++++++++------------------- 12 files changed, 64 insertions(+), 54 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ec0deacf..39b2feb7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,7 +21,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [ "3.7", "3.8", "3.9", "3.10" ] + python-version: [ "3.7", "3.8", "3.9", "3.10", "3.11" ] steps: - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} diff --git a/Makefile b/Makefile index fa2a6dc0..8d93e130 100644 --- a/Makefile +++ b/Makefile @@ -7,8 +7,7 @@ CHANGED_FILES_IN_BRANCH := $(shell git diff --name-only $(shell git merge-base o .PHONY : install_deps setup pre-commit pre-commit-in-branch pre-commit-all test help install_deps: ## Install all dependencies. - pip install -r dev-requirements.txt - pip install -r requirements.txt + pip install -r requirements.txt -r dev-requirements.txt pip install -e . setup: ## Install all dependencies and setup pre-commit diff --git a/README.md b/README.md index a52906ff..dad9e070 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ # dbt-athena -* Supports dbt version `1.3.*` +* Supports dbt version `1.4.*` * Supports [Seeds][seeds] * Correctly detects views and their columns * Supports [table materialization][table] @@ -245,7 +245,7 @@ The only way, from a dbt perspective, is to do a full-refresh of the incremental ### Contributing -This connector works with Python from 3.7 to 3.10. +This connector works with Python from 3.7 to 3.11. #### Getting started diff --git a/dbt/adapters/athena/__version__.py b/dbt/adapters/athena/__version__.py index a19185e4..d619c757 100644 --- a/dbt/adapters/athena/__version__.py +++ b/dbt/adapters/athena/__version__.py @@ -1 +1 @@ -version = "1.3.5" +version = "1.4.0" diff --git a/dbt/adapters/athena/connections.py b/dbt/adapters/athena/connections.py index 84a67cf4..9d4c09b1 100644 --- a/dbt/adapters/athena/connections.py +++ b/dbt/adapters/athena/connections.py @@ -30,7 +30,7 @@ from dbt.adapters.sql import SQLConnectionManager from dbt.contracts.connection import AdapterResponse, Connection, ConnectionState from dbt.events import AdapterLogger -from dbt.exceptions import FailedToConnectException, RuntimeException +from dbt.exceptions import ConnectionError, DbtRuntimeError logger = AdapterLogger("Athena") @@ -141,7 +141,7 @@ def exception_handler(self, sql: str) -> ContextManager: yield except Exception as e: logger.debug(f"Error running SQL: {sql}") - raise RuntimeException(str(e)) from e + raise DbtRuntimeError(str(e)) from e @classmethod def open(cls, connection: Connection) -> Connection: @@ -179,7 +179,7 @@ def open(cls, connection: Connection) -> Connection: logger.exception(f"Got an error when attempting to open a Athena connection due to {exc}") connection.handle = None connection.state = ConnectionState.FAIL - raise FailedToConnectException(str(exc)) + raise ConnectionError(str(exc)) return connection diff --git a/dbt/adapters/athena/impl.py b/dbt/adapters/athena/impl.py index 3e2fb434..3f1d4914 100755 --- a/dbt/adapters/athena/impl.py +++ b/dbt/adapters/athena/impl.py @@ -15,10 +15,10 @@ from dbt.adapters.base.impl import GET_CATALOG_MACRO_NAME from dbt.adapters.base.relation import BaseRelation, InformationSchema from dbt.adapters.sql import SQLAdapter -from dbt.contracts.graph.compiled import CompileResultNode from dbt.contracts.graph.manifest import Manifest +from dbt.contracts.graph.nodes import CompiledNode from dbt.events import AdapterLogger -from dbt.exceptions import RuntimeException +from dbt.exceptions import DbtRuntimeError logger = AdapterLogger("Athena") @@ -183,7 +183,7 @@ def _delete_from_s3(self, client: Any, s3_path: str): bucket_name, ) if is_all_successful is False: - raise RuntimeException("Failed to delete files from S3.") + raise DbtRuntimeError("Failed to delete files from S3.") else: logger.debug("S3 path does not exist") @@ -250,7 +250,7 @@ def _get_one_catalog( def _get_catalog_schemas(self, manifest: Manifest) -> AthenaSchemaSearchMap: info_schema_name_map = AthenaSchemaSearchMap() - nodes: Iterator[CompileResultNode] = chain( + nodes: Iterator[CompiledNode] = chain( [node for node in manifest.nodes.values() if (node.is_relational and not node.is_ephemeral_model)], manifest.sources.values(), ) diff --git a/dbt/adapters/athena/relation.py b/dbt/adapters/athena/relation.py index 3ce9c29d..0c69fd7b 100644 --- a/dbt/adapters/athena/relation.py +++ b/dbt/adapters/athena/relation.py @@ -1,4 +1,4 @@ -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Dict, Optional, Set from dbt.adapters.base.relation import BaseRelation, InformationSchema, Policy @@ -14,7 +14,7 @@ class AthenaIncludePolicy(Policy): @dataclass(frozen=True, eq=False, repr=False) class AthenaRelation(BaseRelation): quote_character: str = "" - include_policy: Policy = AthenaIncludePolicy() + include_policy: Policy = field(default_factory=lambda: AthenaIncludePolicy()) class AthenaSchemaSearchMap(Dict[InformationSchema, Dict[str, Set[Optional[str]]]]): diff --git a/dev-requirements.txt b/dev-requirements.txt index 7f2b1538..b700cc92 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,6 +1,6 @@ autoflake~=1.7 black~=22.12 -dbt-tests-adapter~=1.3.2 +dbt-tests-adapter~=1.4.1 flake8~=5.0 Flake8-pyproject~=1.2 isort~=5.11 diff --git a/requirements.txt b/requirements.txt index fe7384c5..cf99a122 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ boto3~=1.26 -dbt-core~=1.3.2 +dbt-core~=1.4.1 pyathena~=2.20 tenacity~=8.1 diff --git a/setup.py b/setup.py index 03406a9d..8d2b3df7 100644 --- a/setup.py +++ b/setup.py @@ -30,7 +30,7 @@ def _get_package_version(): return f'{parts["major"]}.{parts["minor"]}.{parts["patch"]}' -dbt_version = "1.3" +dbt_version = "1.4" package_version = _get_package_version() description = "The athena adapter plugin for dbt (data build tool)" @@ -50,8 +50,8 @@ def _get_package_version(): install_requires=[ # In order to control dbt-core version and package version "boto3~=1.26", - "dbt-core~=1.3.2", - "pyathena~=2.19", + "dbt-core~=1.4.1", + "pyathena~=2.20", "tenacity~=8.1", ], ) diff --git a/tests/conftest.py b/tests/conftest.py index 1f5c0923..2233c410 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,12 @@ import os +from io import StringIO import pytest +from dbt.events.base_types import EventLevel +from dbt.events.eventmgr import NoFilter +from dbt.events.functions import EVENT_MANAGER, _get_stdout_config + # Import the fuctional fixtures as a plugin # Note: fixtures with session scope need to be local @@ -22,3 +27,23 @@ def dbt_profile_target(): "work_group": os.getenv("DBT_TEST_ATHENA_WORK_GROUND"), "aws_profile_name": os.getenv("DBT_TEST_ATHENA_AWS_PROFILE_NAME") or None, } + + +@pytest.fixture(scope="function") +def dbt_error_caplog() -> StringIO: + return _setup_custom_caplog("dbt_error", EventLevel.ERROR) + + +@pytest.fixture(scope="function") +def dbt_debug_caplog() -> StringIO: + return _setup_custom_caplog("dbt_debug", EventLevel.DEBUG) + + +def _setup_custom_caplog(name: str, level: EventLevel): + capture_config = _get_stdout_config(level) + capture_config.name = name + capture_config.filter = NoFilter + stringbuf = StringIO() + capture_config.output_stream = stringbuf + EVENT_MANAGER.add_logger(capture_config) + return stringbuf diff --git a/tests/unit/test_adapter.py b/tests/unit/test_adapter.py index 0ed004ed..4b6120d7 100644 --- a/tests/unit/test_adapter.py +++ b/tests/unit/test_adapter.py @@ -1,5 +1,4 @@ import decimal -import logging import os from unittest import mock from unittest.mock import patch @@ -16,9 +15,8 @@ from dbt.clients import agate_helper from dbt.contracts.connection import ConnectionState from dbt.contracts.files import FileHash -from dbt.contracts.graph.compiled import CompiledModelNode -from dbt.contracts.graph.parsed import DependsOn, NodeConfig -from dbt.exceptions import FailedToConnectException, ValidationException +from dbt.contracts.graph.nodes import CompiledNode, DependsOn, NodeConfig +from dbt.exceptions import ConnectionError, DbtRuntimeError from dbt.node_types import NodeType from .constants import AWS_REGION, BUCKET, DATA_CATALOG_NAME, DATABASE_NAME @@ -61,7 +59,7 @@ def setup_method(self, _): self.mock_manifest = mock.MagicMock() self.mock_manifest.get_used_schemas.return_value = {("dbt", "foo"), ("dbt", "quux")} self.mock_manifest.nodes = { - "model.root.model1": CompiledModelNode( + "model.root.model1": CompiledNode( name="model1", database="dbt", schema="foo", @@ -70,7 +68,6 @@ def setup_method(self, _): alias="bar", fqn=["root", "model1"], package_name="root", - root_path="/usr/src/app", refs=[], sources=[], depends_on=DependsOn(), @@ -98,7 +95,7 @@ def setup_method(self, _): raw_code="select * from source_table", language="", ), - "model.root.model2": CompiledModelNode( + "model.root.model2": CompiledNode( name="model2", database="dbt", schema="quux", @@ -107,7 +104,6 @@ def setup_method(self, _): alias="bar", fqn=["root", "model2"], package_name="root", - root_path="/usr/src/app", refs=[], sources=[], depends_on=DependsOn(), @@ -148,7 +144,7 @@ def adapter(self): def test_acquire_connection_validations(self, connection_cls): try: connection = self.adapter.acquire_connection("dummy") - except ValidationException as e: + except DbtRuntimeError as e: pytest.fail(f"got ValidationException: {e}") except BaseException as e: pytest.fail(f"acquiring connection failed with unknown exception: {e}") @@ -182,17 +178,17 @@ def test_acquire_connection(self, connection_cls): connection_cls.assert_called_once() @mock.patch("dbt.adapters.athena.connections.AthenaConnection") - def test_acquire_connection_exc(self, connection_cls, caplog): - caplog.set_level(logging.ERROR) + def test_acquire_connection_exc(self, connection_cls, dbt_error_caplog): connection_cls.side_effect = lambda **_: (_ for _ in ()).throw(Exception("foobar")) connection = self.adapter.acquire_connection("dummy") conn_res = None - with pytest.raises(FailedToConnectException) as exc: + with pytest.raises(ConnectionError) as exc: conn_res = connection.handle + assert conn_res is None assert connection.state == ConnectionState.FAIL - assert exc.value.msg == "foobar" - assert "Got an error when attempting to open a Athena connection due to foobar" in caplog.text + assert exc.value.__str__() == "foobar" + assert "Got an error when attempting to open a Athena connection due to foobar" in dbt_error_caplog.getvalue() @pytest.mark.parametrize( ("s3_data_dir", "s3_data_naming", "external_location", "is_temporary_table", "expected"), @@ -269,8 +265,7 @@ def aws_credentials(self): @mock_glue @mock_s3 @mock_athena - def test_clean_up_partitions_will_work(self, caplog, aws_credentials): - caplog.set_level("DEBUG") + def test_clean_up_partitions_will_work(self, dbt_debug_caplog, aws_credentials): table_name = "table" self.mock_aws_service.create_data_catalog() self.mock_aws_service.create_database() @@ -278,26 +273,18 @@ def test_clean_up_partitions_will_work(self, caplog, aws_credentials): self.mock_aws_service.add_data_in_table(table_name) self.adapter.acquire_connection("dummy") self.adapter.clean_up_partitions(DATABASE_NAME, table_name, "dt < '2022-01-03'") + log_records = dbt_debug_caplog.getvalue() assert ( "Deleting table data: path=" "'s3://test-dbt-athena-test-delete-partitions/tables/table/dt=2022-01-01', " "bucket='test-dbt-athena-test-delete-partitions', " - "prefix='tables/table/dt=2022-01-01/'" in caplog.text - ) - assert ( - "Calling s3:delete_objects with {'Bucket': 'test-dbt-athena-test-delete-partitions', " - "'Delete': {'Objects': [{'Key': 'tables/table/dt=2022-01-01/data1.parquet'}, " - "{'Key': 'tables/table/dt=2022-01-01/data2.parquet'}]}}" in caplog.text + "prefix='tables/table/dt=2022-01-01/'" in log_records ) assert ( "Deleting table data: path=" "'s3://test-dbt-athena-test-delete-partitions/tables/table/dt=2022-01-02', " "bucket='test-dbt-athena-test-delete-partitions', " - "prefix='tables/table/dt=2022-01-02/'" in caplog.text - ) - assert ( - "Calling s3:delete_objects with {'Bucket': 'test-dbt-athena-test-delete-partitions', " - "'Delete': {'Objects': [{'Key': 'tables/table/dt=2022-01-02/data.parquet'}]}}" in caplog.text + "prefix='tables/table/dt=2022-01-02/'" in log_records ) s3 = boto3.client("s3", region_name=AWS_REGION) keys = [obj["Key"] for obj in s3.list_objects_v2(Bucket=BUCKET)["Contents"]] @@ -305,19 +292,18 @@ def test_clean_up_partitions_will_work(self, caplog, aws_credentials): @mock_glue @mock_athena - def test_clean_up_table_table_does_not_exist(self, caplog, aws_credentials): - caplog.set_level("DEBUG") + def test_clean_up_table_table_does_not_exist(self, dbt_debug_caplog, aws_credentials): self.mock_aws_service.create_data_catalog() self.mock_aws_service.create_database() self.adapter.acquire_connection("dummy") - self.adapter.clean_up_table(DATABASE_NAME, "table") - assert "Table 'table' does not exists - Ignoring" in caplog.text + result = self.adapter.clean_up_table(DATABASE_NAME, "table") + assert result is None + assert "Table 'table' does not exists - Ignoring" in dbt_debug_caplog.getvalue() @mock_glue @mock_s3 @mock_athena - def test_clean_up_table_delete_table(self, caplog, aws_credentials): - caplog.set_level("DEBUG") + def test_clean_up_table_delete_table(self, dbt_debug_caplog, aws_credentials): self.mock_aws_service.create_data_catalog() self.mock_aws_service.create_database() self.mock_aws_service.create_table("table") @@ -327,7 +313,7 @@ def test_clean_up_table_delete_table(self, caplog, aws_credentials): assert ( "Deleting table data: path='s3://test-dbt-athena-test-delete-partitions/tables/table', " "bucket='test-dbt-athena-test-delete-partitions', " - "prefix='tables/table/'" in caplog.text + "prefix='tables/table/'" in dbt_debug_caplog.getvalue() ) s3 = boto3.client("s3", region_name=AWS_REGION) objs = s3.list_objects_v2(Bucket=BUCKET)