Skip to content

Commit

Permalink
fix: return only "current" iceberg columns (#2982)
Browse files Browse the repository at this point in the history
* fix: do not return hidden iceberg columns

* add parameter

* add test case

* fix parentheses

* pr feedback

* rename param
  • Loading branch information
kukushking authored Oct 2, 2024
1 parent ed1abde commit cc77561
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 4 deletions.
8 changes: 7 additions & 1 deletion awswrangler/athena/_write_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,13 @@ def _determine_differences(

catalog_column_types = typing.cast(
Dict[str, str],
catalog.get_table_types(database=database, table=table, catalog_id=catalog_id, boto3_session=boto3_session),
catalog.get_table_types(
database=database,
table=table,
catalog_id=catalog_id,
filter_iceberg_current=True,
boto3_session=boto3_session,
),
)

original_column_names = set(catalog_column_types)
Expand Down
9 changes: 8 additions & 1 deletion awswrangler/catalog/_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def get_table_types(
database: str,
table: str,
catalog_id: str | None = None,
filter_iceberg_current: bool = False,
boto3_session: boto3.Session | None = None,
) -> dict[str, str] | None:
"""Get all columns and types from a table.
Expand All @@ -120,6 +121,9 @@ def get_table_types(
catalog_id
The ID of the Data Catalog from which to retrieve Databases.
If ``None`` is provided, the AWS account ID is used by default.
filter_iceberg_current
If True, returns only current iceberg fields (fields marked with iceberg.field.current: true).
Otherwise, returns all fields. False by default (return all fields).
boto3_session
The default boto3 session will be used if **boto3_session** receive ``None``.
Expand All @@ -139,7 +143,10 @@ def get_table_types(
response = client_glue.get_table(**_catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table))
except client_glue.exceptions.EntityNotFoundException:
return None
return _extract_dtypes_from_table_details(response=response)
return _extract_dtypes_from_table_details(
response=response,
filter_iceberg_current=filter_iceberg_current,
)


def get_databases(
Expand Down
10 changes: 8 additions & 2 deletions awswrangler/catalog/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,16 @@ def _sanitize_name(name: str) -> str:
return re.sub("[^A-Za-z0-9_]+", "_", name).lower() # Replacing non alphanumeric characters by underscore


def _extract_dtypes_from_table_details(response: "GetTableResponseTypeDef") -> dict[str, str]:
def _extract_dtypes_from_table_details(
response: "GetTableResponseTypeDef",
filter_iceberg_current: bool = False,
) -> dict[str, str]:
dtypes: dict[str, str] = {}
for col in response["Table"]["StorageDescriptor"]["Columns"]:
dtypes[col["Name"]] = col["Type"]
# Only return current fields if flag is enabled
if not filter_iceberg_current or col.get("Parameters", {}).get("iceberg.field.current") == "true":
dtypes[col["Name"]] = col["Type"]
# Add partition keys as columns
if "PartitionKeys" in response["Table"]:
for par in response["Table"]["PartitionKeys"]:
dtypes[par["Name"]] = par["Type"]
Expand Down
52 changes: 52 additions & 0 deletions tests/unit/test_athena_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -1159,3 +1159,55 @@ def test_to_iceberg_fill_missing_columns_with_complex_types(
schema_evolution=True,
fill_missing_columns_in_df=True,
)


def test_athena_to_iceberg_alter_schema(
    path: str,
    path2: str,
    glue_database: str,
    glue_table: str,
) -> None:
    """Verify Iceberg writes keep working after a column rename.

    Writes a first batch of rows, renames ``id`` to ``new_id`` via an
    Athena ``ALTER TABLE`` statement, writes the remaining rows under the
    renamed schema, and asserts the full table reads back as expected.
    """
    frame = pd.DataFrame(
        {
            "id": pd.Series([1, 2, 3, 4, 5], dtype="Int64"),
            "name": pd.Series(["a", "b", "c", "d", "e"], dtype="string"),
        },
    ).reset_index(drop=True)

    cutoff = 3

    # Keyword arguments shared by both to_iceberg calls.
    write_kwargs = dict(
        database=glue_database,
        table=glue_table,
        table_location=path,
        temp_path=path2,
        schema_evolution=True,
        keep_files=False,
    )

    # First batch, written under the original schema.
    wr.athena.to_iceberg(df=frame[:cutoff], **write_kwargs)

    # Rename the id column via Athena DDL.
    wr.athena.start_query_execution(
        sql=f"ALTER TABLE {glue_table} CHANGE COLUMN id new_id bigint",
        database=glue_database,
        wait=True,
    )

    frame = frame.rename(columns={"id": "new_id"})

    # Second batch, written under the renamed schema.
    wr.athena.to_iceberg(df=frame[cutoff:], **write_kwargs)

    df_actual = wr.athena.read_sql_query(
        sql=f"SELECT new_id, name FROM {glue_table} ORDER BY new_id",
        database=glue_database,
        ctas_approach=False,
    )

    assert_pandas_equals(frame, df_actual)

0 comments on commit cc77561

Please sign in to comment.