fix: prune static s3 locations before CTA (Tomme#88)
* prune static s3 locations when CTA

* refactor s3 deletion methods

* feat: prune s3 location prior to CTA for all table_types except iceberg

* test: add test for s3 path parsing

* test: move test to right class

* test: use pytest parametrize instead of loop

* refactor: move debug logger into delete from s3 method

* chore: remove logger from delete table method
antruo authored Dec 21, 2022
1 parent ba1cbd2 commit ef6e7c5
Showing 3 changed files with 95 additions and 43 deletions.
103 changes: 66 additions & 37 deletions dbt/adapters/athena/impl.py
@@ -1,8 +1,8 @@
import re
from itertools import chain
from os import path
from threading import Lock
from typing import Dict, Iterator, List, Optional, Set
from typing import Any, Dict, Iterator, List, Optional, Set, Tuple
from urllib.parse import urlparse
from uuid import uuid4

import agate
@@ -100,7 +100,6 @@ def clean_up_partitions(self, database_name: str, table_name: str, where_conditi

with boto3_client_lock:
glue_client = client.session.client("glue", region_name=client.region_name, config=get_boto3_config())
s3_resource = client.session.resource("s3", region_name=client.region_name, config=get_boto3_config())
paginator = glue_client.get_paginator("get_partitions")
partition_params = {
"DatabaseName": database_name,
@@ -110,32 +109,8 @@ def clean_up_partitions(self, database_name: str, table_name: str, where_conditi
}
partition_pg = paginator.paginate(**partition_params)
partitions = partition_pg.build_full_result().get("Partitions")
s3_rg = re.compile("s3://([^/]*)/(.*)")
for partition in partitions:
logger.debug(
f"Deleting objects for partition '{partition['Values']}' "
f"at '{partition['StorageDescriptor']['Location']}'"
)
m = s3_rg.match(partition["StorageDescriptor"]["Location"])
if m is not None:
bucket_name = m.group(1)
prefix = m.group(2)
s3_bucket = s3_resource.Bucket(bucket_name)
response = s3_bucket.objects.filter(Prefix=prefix).delete()
is_all_successful = True
for res in response:
if "Errors" in res:
for err in res["Errors"]:
is_all_successful = False
logger.error(
"Failed to clean up partitions: Key='{}', Code='{}', Message='{}', s3_bucket='{}'",
err["Key"],
err["Code"],
err["Message"],
bucket_name,
)
if is_all_successful is False:
raise RuntimeException("Failed to clean up table partitions.")
self._delete_from_s3(client, partition["StorageDescriptor"]["Location"])

@available
def clean_up_table(self, database_name: str, table_name: str):
@@ -152,20 +127,74 @@ def clean_up_table(self, database_name: str, table_name: str):
return

if table is not None:
p = re.compile("s3://([^/]*)/(.*)")
m = p.match(table["Table"]["StorageDescriptor"]["Location"])
if m is not None:
bucket_name = m.group(1)
prefix = m.group(2).rstrip("/") + "/"
logger.debug(f"Deleting table data from 's3://{bucket_name}/{prefix}'")
s3_resource = client.session.resource("s3", region_name=client.region_name, config=get_boto3_config())
s3_bucket = s3_resource.Bucket(bucket_name)
s3_bucket.objects.filter(Prefix=prefix).delete()
s3_location = table["Table"]["StorageDescriptor"]["Location"]
self._delete_from_s3(client, s3_location)

@available
def prune_s3_table_location(self, s3_table_location: str):
"""
Prunes an s3 table location.
This is necessary to resolve the HIVE_PARTITION_ALREADY_EXISTS error
that occurs when a CTA command is retried after a 503 Slow Down error
and partial files have already been written to s3.
"""
conn = self.connections.get_thread_connection()
client = conn.handle
self._delete_from_s3(client, s3_table_location)

@available
def quote_seed_column(self, column: str, quote_config: Optional[bool]) -> str:
return super().quote_seed_column(column, False)

def _delete_from_s3(self, client: Any, s3_path: str):
"""
Deletes files from s3.
It also parses the response from the s3 delete request and raises
a RuntimeException if it contains errors.
"""
bucket_name, prefix = self._parse_s3_path(s3_path)
if self._s3_path_exists(client, bucket_name, prefix):
s3_resource = client.session.resource("s3", region_name=client.region_name, config=get_boto3_config())
s3_bucket = s3_resource.Bucket(bucket_name)
logger.debug(f"Deleting table data: path='{s3_path}', bucket='{bucket_name}', prefix='{prefix}'")
response = s3_bucket.objects.filter(Prefix=prefix).delete()
is_all_successful = True
for res in response:
if "Errors" in res:
for err in res["Errors"]:
is_all_successful = False
logger.error(
"Failed to delete files: Key='{}', Code='{}', Message='{}', s3_bucket='{}'",
err["Key"],
err["Code"],
err["Message"],
bucket_name,
)
if is_all_successful is False:
raise RuntimeException("Failed to delete files from S3.")
else:
logger.debug("S3 path does not exist")

@staticmethod
def _parse_s3_path(s3_path: str) -> Tuple[str, str]:
"""
Parses and splits an s3 path into bucket name and prefix.
This assumes that s3_path points to a prefix rather than to a single
object key, and adds a trailing slash to the prefix if there is none.
"""
o = urlparse(s3_path, allow_fragments=False)
bucket_name = o.netloc
prefix = o.path.lstrip("/").rstrip("/") + "/"
return bucket_name, prefix

@staticmethod
def _s3_path_exists(client: Any, s3_bucket: str, s3_prefix: str) -> bool:
"""Checks whether a given s3 path exists."""
response = client.session.client(
"s3", region_name=client.region_name, config=get_boto3_config()
).list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)
return "Contents" in response

def _join_catalog_table_owners(self, table: agate.Table, manifest: Manifest) -> agate.Table:
owners = []
# Get the owner for each model from the manifest
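The new helpers in impl.py boil down to three steps: parse the s3:// location into bucket and prefix, skip the delete entirely when nothing exists under the prefix, and surface any per-key errors from the batched delete. The following standalone sketch mirrors that flow outside the adapter; `parse_s3_path`, `prune_location`, and the bucket/prefix values are illustrative names, and a default boto3 session with valid credentials is assumed.

```python
from typing import Tuple
from urllib.parse import urlparse

import boto3


def parse_s3_path(s3_path: str) -> Tuple[str, str]:
    # "s3://my-bucket/test-dbt/tables/schema/table" -> ("my-bucket", "test-dbt/tables/schema/table/")
    parsed = urlparse(s3_path, allow_fragments=False)
    return parsed.netloc, parsed.path.lstrip("/").rstrip("/") + "/"


def prune_location(session: boto3.Session, s3_path: str) -> None:
    bucket_name, prefix = parse_s3_path(s3_path)
    # Skip the delete entirely when nothing exists under the prefix.
    listing = session.client("s3").list_objects_v2(Bucket=bucket_name, Prefix=prefix)
    if "Contents" not in listing:
        return
    # objects.filter(...).delete() issues batched delete_objects calls and returns
    # one response dict per batch, each of which may carry per-key errors.
    responses = session.resource("s3").Bucket(bucket_name).objects.filter(Prefix=prefix).delete()
    errors = [err for res in responses for err in res.get("Errors", [])]
    if errors:
        raise RuntimeError(f"Failed to delete {len(errors)} object(s) under s3://{bucket_name}/{prefix}")


if __name__ == "__main__":
    prune_location(boto3.Session(), "s3://my-bucket/test-dbt/tables/schema/table")
```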
@@ -13,6 +13,7 @@

{%- set location_property = 'external_location' -%}
{%- set partition_property = 'partitioned_by' -%}
{%- set location = adapter.s3_table_location(s3_data_dir, s3_data_naming, relation.schema, relation.identifier, external_location, temporary) -%}

{%- if table_type == 'iceberg' -%}
{%- set location_property = 'location' -%}
@@ -35,12 +36,16 @@
{%- endif -%}
{%- endif %}

{%- if table_type != 'iceberg' -%}
{% do adapter.prune_s3_table_location(location) %}
{%- endif -%}

create table
{{ relation }}
with (
table_type='{{ table_type }}',
is_external={%- if table_type == 'iceberg' -%}false{%- else -%}true{%- endif %},
{{ location_property }}='{{ adapter.s3_table_location(s3_data_dir, s3_data_naming, relation.schema, relation.identifier, external_location, temporary) }}',
{{ location_property }}='{{ location }}',
{%- if partitioned_by is not none %}
{{ partition_property }}=ARRAY{{ partitioned_by | tojson | replace('\"', '\'') }},
{%- endif %}
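For table types other than iceberg, the macro now computes the location once, prunes it, and only then issues the CTAS statement. A rough Python rendering of that control flow follows; `materialize_table`, `execute_ctas`, and the parameter names are hypothetical stand-ins, not adapter or macro APIs.

```python
def materialize_table(adapter, relation, table_type: str, location: str) -> None:
    if table_type != "iceberg":
        # Clear any partial objects a previously failed/throttled CTAS attempt may
        # have left behind, so the retry does not hit HIVE_PARTITION_ALREADY_EXISTS.
        adapter.prune_s3_table_location(location)
    execute_ctas(relation, table_type, location)


def execute_ctas(relation, table_type: str, location: str) -> None:
    # Placeholder for the rendered `create table {relation} with (table_type=..., location=...)`.
    ...
```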
28 changes: 23 additions & 5 deletions tests/unit/test_adapter.py
@@ -279,17 +279,21 @@ def test_clean_up_partitions_will_work(self, caplog, aws_credentials):
self.adapter.acquire_connection("dummy")
self.adapter.clean_up_partitions(DATABASE_NAME, table_name, "dt < '2022-01-03'")
assert (
"Deleting objects for partition '['2022-01-01']' at "
"'s3://test-dbt-athena-test-delete-partitions/tables/table/dt=2022-01-01'" in caplog.text
"Deleting table data: path="
"'s3://test-dbt-athena-test-delete-partitions/tables/table/dt=2022-01-01', "
"bucket='test-dbt-athena-test-delete-partitions', "
"prefix='tables/table/dt=2022-01-01/'" in caplog.text
)
assert (
"Calling s3:delete_objects with {'Bucket': 'test-dbt-athena-test-delete-partitions', "
"'Delete': {'Objects': [{'Key': 'tables/table/dt=2022-01-01/data1.parquet'}, "
"{'Key': 'tables/table/dt=2022-01-01/data2.parquet'}]}}" in caplog.text
)
assert (
"Deleting objects for partition '['2022-01-02']' at "
"'s3://test-dbt-athena-test-delete-partitions/tables/table/dt=2022-01-02'" in caplog.text
"Deleting table data: path="
"'s3://test-dbt-athena-test-delete-partitions/tables/table/dt=2022-01-02', "
"bucket='test-dbt-athena-test-delete-partitions', "
"prefix='tables/table/dt=2022-01-02/'" in caplog.text
)
assert (
"Calling s3:delete_objects with {'Bucket': 'test-dbt-athena-test-delete-partitions', "
@@ -320,7 +324,11 @@ def test_clean_up_table_delete_table(self, caplog, aws_credentials):
self.mock_aws_service.add_data_in_table("table")
self.adapter.acquire_connection("dummy")
self.adapter.clean_up_table(DATABASE_NAME, "table")
assert "Deleting table data from 's3://test-dbt-athena-test-delete-partitions/tables/table/'" in caplog.text
assert (
"Deleting table data: path='s3://test-dbt-athena-test-delete-partitions/tables/table', "
"bucket='test-dbt-athena-test-delete-partitions', "
"prefix='tables/table/'" in caplog.text
)
s3 = boto3.client("s3", region_name=AWS_REGION)
objs = s3.list_objects_v2(Bucket=BUCKET)
assert objs["KeyCount"] == 0
@@ -463,6 +471,16 @@ def test_list_relations_without_caching_with_non_glue_data_catalog(self, parent_
self.adapter.list_relations_without_caching(schema_relation)
parent_list_relations_without_caching.assert_called_once_with(schema_relation)

@pytest.mark.parametrize(
"s3_path,expected",
[
("s3://my-bucket/test-dbt/tables/schema/table", ("my-bucket", "test-dbt/tables/schema/table/")),
("s3://my-bucket/test-dbt/tables/schema/table/", ("my-bucket", "test-dbt/tables/schema/table/")),
],
)
def test_parse_s3_path(self, s3_path, expected):
assert self.adapter._parse_s3_path(s3_path) == expected


class TestAthenaFilterCatalog:
def test__catalog_filter_table(self):
