Skip to content

Commit

Permalink
chore: upgrade to dbt core v1.4.0 (Tomme#134)
Browse files Browse the repository at this point in the history
Co-authored-by: Jeremy Guiselin <[email protected]>
  • Loading branch information
nicor88 and Jrmyy authored Feb 6, 2023
1 parent f23c4c2 commit f5d8cc8
Show file tree
Hide file tree
Showing 12 changed files with 64 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [ "3.7", "3.8", "3.9", "3.10" ]
python-version: [ "3.7", "3.8", "3.9", "3.10", "3.11" ]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
Expand Down
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ CHANGED_FILES_IN_BRANCH := $(shell git diff --name-only $(shell git merge-base o
.PHONY : install_deps setup pre-commit pre-commit-in-branch pre-commit-all test help

install_deps: ## Install all dependencies.
pip install -r dev-requirements.txt
pip install -r requirements.txt
pip install -r requirements.txt -r dev-requirements.txt
pip install -e .

setup: ## Install all dependencies and setup pre-commit
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

# dbt-athena

* Supports dbt version `1.3.*`
* Supports dbt version `1.4.*`
* Supports [Seeds][seeds]
* Correctly detects views and their columns
* Supports [table materialization][table]
Expand Down Expand Up @@ -245,7 +245,7 @@ The only way, from a dbt perspective, is to do a full-refresh of the incremental

### Contributing

This connector works with Python from 3.7 to 3.10.
This connector works with Python from 3.7 to 3.11.

#### Getting started

Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/athena/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.3.5"
version = "1.4.0"
6 changes: 3 additions & 3 deletions dbt/adapters/athena/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from dbt.adapters.sql import SQLConnectionManager
from dbt.contracts.connection import AdapterResponse, Connection, ConnectionState
from dbt.events import AdapterLogger
from dbt.exceptions import FailedToConnectException, RuntimeException
from dbt.exceptions import ConnectionError, DbtRuntimeError

logger = AdapterLogger("Athena")

Expand Down Expand Up @@ -141,7 +141,7 @@ def exception_handler(self, sql: str) -> ContextManager:
yield
except Exception as e:
logger.debug(f"Error running SQL: {sql}")
raise RuntimeException(str(e)) from e
raise DbtRuntimeError(str(e)) from e

@classmethod
def open(cls, connection: Connection) -> Connection:
Expand Down Expand Up @@ -179,7 +179,7 @@ def open(cls, connection: Connection) -> Connection:
logger.exception(f"Got an error when attempting to open a Athena connection due to {exc}")
connection.handle = None
connection.state = ConnectionState.FAIL
raise FailedToConnectException(str(exc))
raise ConnectionError(str(exc))

return connection

Expand Down
8 changes: 4 additions & 4 deletions dbt/adapters/athena/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
from dbt.adapters.base.impl import GET_CATALOG_MACRO_NAME
from dbt.adapters.base.relation import BaseRelation, InformationSchema
from dbt.adapters.sql import SQLAdapter
from dbt.contracts.graph.compiled import CompileResultNode
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import CompiledNode
from dbt.events import AdapterLogger
from dbt.exceptions import RuntimeException
from dbt.exceptions import DbtRuntimeError

logger = AdapterLogger("Athena")

Expand Down Expand Up @@ -183,7 +183,7 @@ def _delete_from_s3(self, client: Any, s3_path: str):
bucket_name,
)
if is_all_successful is False:
raise RuntimeException("Failed to delete files from S3.")
raise DbtRuntimeError("Failed to delete files from S3.")
else:
logger.debug("S3 path does not exist")

Expand Down Expand Up @@ -250,7 +250,7 @@ def _get_one_catalog(

def _get_catalog_schemas(self, manifest: Manifest) -> AthenaSchemaSearchMap:
info_schema_name_map = AthenaSchemaSearchMap()
nodes: Iterator[CompileResultNode] = chain(
nodes: Iterator[CompiledNode] = chain(
[node for node in manifest.nodes.values() if (node.is_relational and not node.is_ephemeral_model)],
manifest.sources.values(),
)
Expand Down
4 changes: 2 additions & 2 deletions dbt/adapters/athena/relation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Dict, Optional, Set

from dbt.adapters.base.relation import BaseRelation, InformationSchema, Policy
Expand All @@ -14,7 +14,7 @@ class AthenaIncludePolicy(Policy):
@dataclass(frozen=True, eq=False, repr=False)
class AthenaRelation(BaseRelation):
quote_character: str = ""
include_policy: Policy = AthenaIncludePolicy()
include_policy: Policy = field(default_factory=lambda: AthenaIncludePolicy())


class AthenaSchemaSearchMap(Dict[InformationSchema, Dict[str, Set[Optional[str]]]]):
Expand Down
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
autoflake~=1.7
black~=22.12
dbt-tests-adapter~=1.3.2
dbt-tests-adapter~=1.4.1
flake8~=5.0
Flake8-pyproject~=1.2
isort~=5.11
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
boto3~=1.26
dbt-core~=1.3.2
dbt-core~=1.4.1
pyathena~=2.20
tenacity~=8.1
6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def _get_package_version():
return f'{parts["major"]}.{parts["minor"]}.{parts["patch"]}'


dbt_version = "1.3"
dbt_version = "1.4"
package_version = _get_package_version()
description = "The athena adapter plugin for dbt (data build tool)"

Expand All @@ -50,8 +50,8 @@ def _get_package_version():
install_requires=[
# In order to control dbt-core version and package version
"boto3~=1.26",
"dbt-core~=1.3.2",
"pyathena~=2.19",
"dbt-core~=1.4.1",
"pyathena~=2.20",
"tenacity~=8.1",
],
)
25 changes: 25 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import os
from io import StringIO

import pytest

from dbt.events.base_types import EventLevel
from dbt.events.eventmgr import NoFilter
from dbt.events.functions import EVENT_MANAGER, _get_stdout_config

# Import the fuctional fixtures as a plugin
# Note: fixtures with session scope need to be local

Expand All @@ -22,3 +27,23 @@ def dbt_profile_target():
"work_group": os.getenv("DBT_TEST_ATHENA_WORK_GROUND"),
"aws_profile_name": os.getenv("DBT_TEST_ATHENA_AWS_PROFILE_NAME") or None,
}


@pytest.fixture(scope="function")
def dbt_error_caplog() -> StringIO:
return _setup_custom_caplog("dbt_error", EventLevel.ERROR)


@pytest.fixture(scope="function")
def dbt_debug_caplog() -> StringIO:
return _setup_custom_caplog("dbt_debug", EventLevel.DEBUG)


def _setup_custom_caplog(name: str, level: EventLevel):
capture_config = _get_stdout_config(level)
capture_config.name = name
capture_config.filter = NoFilter
stringbuf = StringIO()
capture_config.output_stream = stringbuf
EVENT_MANAGER.add_logger(capture_config)
return stringbuf
54 changes: 20 additions & 34 deletions tests/unit/test_adapter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import decimal
import logging
import os
from unittest import mock
from unittest.mock import patch
Expand All @@ -16,9 +15,8 @@
from dbt.clients import agate_helper
from dbt.contracts.connection import ConnectionState
from dbt.contracts.files import FileHash
from dbt.contracts.graph.compiled import CompiledModelNode
from dbt.contracts.graph.parsed import DependsOn, NodeConfig
from dbt.exceptions import FailedToConnectException, ValidationException
from dbt.contracts.graph.nodes import CompiledNode, DependsOn, NodeConfig
from dbt.exceptions import ConnectionError, DbtRuntimeError
from dbt.node_types import NodeType

from .constants import AWS_REGION, BUCKET, DATA_CATALOG_NAME, DATABASE_NAME
Expand Down Expand Up @@ -61,7 +59,7 @@ def setup_method(self, _):
self.mock_manifest = mock.MagicMock()
self.mock_manifest.get_used_schemas.return_value = {("dbt", "foo"), ("dbt", "quux")}
self.mock_manifest.nodes = {
"model.root.model1": CompiledModelNode(
"model.root.model1": CompiledNode(
name="model1",
database="dbt",
schema="foo",
Expand All @@ -70,7 +68,6 @@ def setup_method(self, _):
alias="bar",
fqn=["root", "model1"],
package_name="root",
root_path="/usr/src/app",
refs=[],
sources=[],
depends_on=DependsOn(),
Expand Down Expand Up @@ -98,7 +95,7 @@ def setup_method(self, _):
raw_code="select * from source_table",
language="",
),
"model.root.model2": CompiledModelNode(
"model.root.model2": CompiledNode(
name="model2",
database="dbt",
schema="quux",
Expand All @@ -107,7 +104,6 @@ def setup_method(self, _):
alias="bar",
fqn=["root", "model2"],
package_name="root",
root_path="/usr/src/app",
refs=[],
sources=[],
depends_on=DependsOn(),
Expand Down Expand Up @@ -148,7 +144,7 @@ def adapter(self):
def test_acquire_connection_validations(self, connection_cls):
try:
connection = self.adapter.acquire_connection("dummy")
except ValidationException as e:
except DbtRuntimeError as e:
pytest.fail(f"got ValidationException: {e}")
except BaseException as e:
pytest.fail(f"acquiring connection failed with unknown exception: {e}")
Expand Down Expand Up @@ -182,17 +178,17 @@ def test_acquire_connection(self, connection_cls):
connection_cls.assert_called_once()

@mock.patch("dbt.adapters.athena.connections.AthenaConnection")
def test_acquire_connection_exc(self, connection_cls, caplog):
caplog.set_level(logging.ERROR)
def test_acquire_connection_exc(self, connection_cls, dbt_error_caplog):
connection_cls.side_effect = lambda **_: (_ for _ in ()).throw(Exception("foobar"))
connection = self.adapter.acquire_connection("dummy")
conn_res = None
with pytest.raises(FailedToConnectException) as exc:
with pytest.raises(ConnectionError) as exc:
conn_res = connection.handle

assert conn_res is None
assert connection.state == ConnectionState.FAIL
assert exc.value.msg == "foobar"
assert "Got an error when attempting to open a Athena connection due to foobar" in caplog.text
assert exc.value.__str__() == "foobar"
assert "Got an error when attempting to open a Athena connection due to foobar" in dbt_error_caplog.getvalue()

@pytest.mark.parametrize(
("s3_data_dir", "s3_data_naming", "external_location", "is_temporary_table", "expected"),
Expand Down Expand Up @@ -269,55 +265,45 @@ def aws_credentials(self):
@mock_glue
@mock_s3
@mock_athena
def test_clean_up_partitions_will_work(self, caplog, aws_credentials):
caplog.set_level("DEBUG")
def test_clean_up_partitions_will_work(self, dbt_debug_caplog, aws_credentials):
table_name = "table"
self.mock_aws_service.create_data_catalog()
self.mock_aws_service.create_database()
self.mock_aws_service.create_table(table_name)
self.mock_aws_service.add_data_in_table(table_name)
self.adapter.acquire_connection("dummy")
self.adapter.clean_up_partitions(DATABASE_NAME, table_name, "dt < '2022-01-03'")
log_records = dbt_debug_caplog.getvalue()
assert (
"Deleting table data: path="
"'s3://test-dbt-athena-test-delete-partitions/tables/table/dt=2022-01-01', "
"bucket='test-dbt-athena-test-delete-partitions', "
"prefix='tables/table/dt=2022-01-01/'" in caplog.text
)
assert (
"Calling s3:delete_objects with {'Bucket': 'test-dbt-athena-test-delete-partitions', "
"'Delete': {'Objects': [{'Key': 'tables/table/dt=2022-01-01/data1.parquet'}, "
"{'Key': 'tables/table/dt=2022-01-01/data2.parquet'}]}}" in caplog.text
"prefix='tables/table/dt=2022-01-01/'" in log_records
)
assert (
"Deleting table data: path="
"'s3://test-dbt-athena-test-delete-partitions/tables/table/dt=2022-01-02', "
"bucket='test-dbt-athena-test-delete-partitions', "
"prefix='tables/table/dt=2022-01-02/'" in caplog.text
)
assert (
"Calling s3:delete_objects with {'Bucket': 'test-dbt-athena-test-delete-partitions', "
"'Delete': {'Objects': [{'Key': 'tables/table/dt=2022-01-02/data.parquet'}]}}" in caplog.text
"prefix='tables/table/dt=2022-01-02/'" in log_records
)
s3 = boto3.client("s3", region_name=AWS_REGION)
keys = [obj["Key"] for obj in s3.list_objects_v2(Bucket=BUCKET)["Contents"]]
assert set(keys) == {"tables/table/dt=2022-01-03/data1.parquet", "tables/table/dt=2022-01-03/data2.parquet"}

@mock_glue
@mock_athena
def test_clean_up_table_table_does_not_exist(self, caplog, aws_credentials):
caplog.set_level("DEBUG")
def test_clean_up_table_table_does_not_exist(self, dbt_debug_caplog, aws_credentials):
self.mock_aws_service.create_data_catalog()
self.mock_aws_service.create_database()
self.adapter.acquire_connection("dummy")
self.adapter.clean_up_table(DATABASE_NAME, "table")
assert "Table 'table' does not exists - Ignoring" in caplog.text
result = self.adapter.clean_up_table(DATABASE_NAME, "table")
assert result is None
assert "Table 'table' does not exists - Ignoring" in dbt_debug_caplog.getvalue()

@mock_glue
@mock_s3
@mock_athena
def test_clean_up_table_delete_table(self, caplog, aws_credentials):
caplog.set_level("DEBUG")
def test_clean_up_table_delete_table(self, dbt_debug_caplog, aws_credentials):
self.mock_aws_service.create_data_catalog()
self.mock_aws_service.create_database()
self.mock_aws_service.create_table("table")
Expand All @@ -327,7 +313,7 @@ def test_clean_up_table_delete_table(self, caplog, aws_credentials):
assert (
"Deleting table data: path='s3://test-dbt-athena-test-delete-partitions/tables/table', "
"bucket='test-dbt-athena-test-delete-partitions', "
"prefix='tables/table/'" in caplog.text
"prefix='tables/table/'" in dbt_debug_caplog.getvalue()
)
s3 = boto3.client("s3", region_name=AWS_REGION)
objs = s3.list_objects_v2(Bucket=BUCKET)
Expand Down

0 comments on commit f5d8cc8

Please sign in to comment.