Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Snowflake Implementation of Catalog Support #792

Draft
wants to merge 15 commits into
base: 02-05-add_base_implementation_of_catalog_support
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from dbt.adapters.snowflake.catalogs._aws_glue import AWSGlueCatalog
from dbt.adapters.snowflake.catalogs._iceberg_rest import IcebergRESTCatalog


# these are the valid values for `catalog_type`
CATALOG_INTEGRATIONS = {
"iceberg_rest": IcebergRESTCatalog,
"glue": AWSGlueCatalog,
}
76 changes: 76 additions & 0 deletions dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_aws_glue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from dataclasses import dataclass

from typing import Optional, Union

from dbt.adapters.base import BaseRelation
from dbt.adapters.catalogs import CatalogIntegration, CatalogIntegrationConfig
from dbt.adapters.contracts.relation import RelationConfig
from dbt_common.exceptions import DbtInternalError

from dbt.adapters.snowflake.utils import set_boolean


@dataclass
class IcebergGlueConfig(CatalogIntegrationConfig):
name: str
table_name: str
catalog_type: str = "glue"
external_volume: Optional[str] = None
namespace: Optional[str] = None
replace_invalid_characters: Optional[Union[bool, str]] = None
auto_refresh: Optional[Union[bool, str]] = None


class AWSGlueCatalog(CatalogIntegration):
"""
Implement Snowflake's AWS Glue Catalog Integration:
https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table-aws-glue
https://docs.snowflake.com/en/sql-reference/sql/create-catalog-integration-glue
"""

catalog_type = "glue"
table_format = "iceberg"

def __init__(self, config: CatalogIntegrationConfig):
super().__init__(config)
if config.catalog_type != "iceberg_rest":
raise DbtInternalError(
f"Attempting to create AWS Glue catalog integration for catalog {self.name} with catalog type {config.catalog_type}."
)
if isinstance(config, IcebergGlueConfig):
self.table_name = config.table_name
self.external_volume = config.external_volume
self.namespace = config.namespace
self.auto_refresh = config.auto_refresh # type:ignore
self.replace_invalid_characters = config.replace_invalid_characters # type:ignore

@property
def auto_refresh(self) -> bool:
return self._auto_refresh

@auto_refresh.setter
def auto_refresh(self, value: Union[bool, str]) -> None:
self._auto_refresh = set_boolean("auto_refresh", value, default=False)

@property
def replace_invalid_characters(self) -> bool:
return self._replace_invalid_characters

@replace_invalid_characters.setter
def replace_invalid_characters(self, value: Union[bool, str]) -> None:
self._replace_invalid_characters = set_boolean(
"replace_invalid_characters", value, default=False
)

def render_ddl_predicates(self, relation: BaseRelation, config: RelationConfig) -> str:
ddl_predicate = f"""
external_volume = '{self.external_volume}'
catalog = '{self.integration_name}'
"""
if self.namespace:
ddl_predicate += f"CATALOG_NAMESPACE = '{self.namespace}'\n"
if self.auto_refresh:
ddl_predicate += f"auto_refresh = {self.auto_refresh}\n"
if self.replace_invalid_characters:
ddl_predicate += f"replace_invalid_characters = {self.replace_invalid_characters}\n"
return ddl_predicate
119 changes: 119 additions & 0 deletions dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_iceberg_rest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from dataclasses import dataclass
import textwrap
from typing import Any, Dict, Optional, Union, TYPE_CHECKING

from dbt.adapters.base import BaseRelation
from dbt.adapters.catalogs import CatalogIntegration, CatalogIntegrationConfig
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.relation_configs import RelationResults
from dbt_common.exceptions import DbtInternalError

from dbt.adapters.snowflake.utils import set_boolean

if TYPE_CHECKING:
import agate


@dataclass
class IcebergRESTConfig(CatalogIntegrationConfig):
name: str
table_name: str
catalog_type: str = "iceberg_rest"
external_volume: Optional[str] = None
namespace: Optional[str] = None
replace_invalid_characters: Optional[Union[bool, str]] = None
auto_refresh: Optional[Union[bool, str]] = None


class IcebergRESTCatalog(CatalogIntegration):
"""
Implement Snowflake's Iceberg REST Catalog Integration:
https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table-rest
https://docs.snowflake.com/en/sql-reference/sql/create-catalog-integration-rest
"""

catalog_type: str = "iceberg_rest"
table_format: str = "iceberg"

def __init__(self, config: CatalogIntegrationConfig):
super().__init__(config)
if config.catalog_type != "iceberg_rest":
raise DbtInternalError(
f"Attempting to create IcebergREST catalog integration for catalog {self.name} with catalog type {config.catalog_type}."
)
if isinstance(config, IcebergRESTConfig):
self.table_name = config.table_name
self.external_volume = config.external_volume
self.namespace = config.namespace
self.auto_refresh = config.auto_refresh # type:ignore
self.replace_invalid_characters = config.replace_invalid_characters # type:ignore

@property
def auto_refresh(self) -> bool:
return self._auto_refresh

@auto_refresh.setter
def auto_refresh(self, value: Union[bool, str]) -> None:
self._auto_refresh = set_boolean("auto_refresh", value, default=False)

@property
def replace_invalid_characters(self) -> bool:
return self._replace_invalid_characters

@replace_invalid_characters.setter
def replace_invalid_characters(self, value: Union[bool, str]) -> None:
self._replace_invalid_characters = set_boolean(
"replace_invalid_characters", value, default=False
)

def render_ddl_predicates(self, relation: BaseRelation, config: RelationConfig) -> str:
"""
{{ optional('external_volume', dynamic_table.catalog.external_volume) }}
{{ optional('catalog', dynamic_table.catalog.name) }}
base_location = '{{ dynamic_table.catalog.base_location }}'
:param config:
:param relation:
:return:
"""
base_location: str = f"{config.get('base_location_root', '_dbt')}"
base_location += f"/{relation.schema}/{relation.name}"

if sub_path := config.get("base_location_subpath"):
base_location += f"/{sub_path}"

ddl_predicate = f"""
external_volume = '{self.external_volume}'
catalog = 'snowflake'
base_location = '{base_location}'
"""
if self.auto_refresh:
ddl_predicate += f"auto_refresh = {self.auto_refresh}\n"
if self.replace_invalid_characters:
ddl_predicate += f"replace_invalid_characters = {self.replace_invalid_characters}\n"
return textwrap.indent(textwrap.dedent(ddl_predicate), " " * 10)

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
# this try block can be removed once enable_iceberg_materializations is retired
try:
catalog_results: "agate.Table" = relation_results["catalog"]
except KeyError:
# this happens when `enable_iceberg_materializations` is turned off
return {}

if len(catalog_results) == 0:
# this happens when the dynamic table is a standard dynamic table (e.g. not iceberg)
return {}

# for now, if we get catalog results, it's because this is an iceberg table
# this is because we only run `show iceberg tables` to get catalog metadata
# this will need to be updated once this is in `show objects`
catalog: "agate.Row" = catalog_results.rows[0]
config_dict = {
"table_format": "iceberg",
"name": catalog.get("catalog_name"),
"external_volume": catalog.get("external_volume_name"),
"base_location": catalog.get("base_location"),
}

return config_dict
4 changes: 3 additions & 1 deletion dbt-snowflake/src/dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dbt.adapters.base.meta import available
from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.snowflake.catalogs import CATALOG_INTEGRATIONS
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.sql.impl import (
LIST_SCHEMAS_MACRO_NAME,
Expand Down Expand Up @@ -56,6 +57,7 @@ class SnowflakeConfig(AdapterConfig):
external_volume: Optional[str] = None
base_location_root: Optional[str] = None
base_location_subpath: Optional[str] = None
catalog_name: Optional[str] = None


class SnowflakeAdapter(SQLAdapter):
Expand All @@ -64,7 +66,7 @@ class SnowflakeAdapter(SQLAdapter):
ConnectionManager = SnowflakeConnectionManager

AdapterSpecificConfigs = SnowflakeConfig

CATALOG_INTEGRATIONS = CATALOG_INTEGRATIONS
CONSTRAINT_SUPPORT = {
ConstraintType.check: ConstraintSupport.NOT_SUPPORTED,
ConstraintType.not_null: ConstraintSupport.ENFORCED,
Expand Down
47 changes: 29 additions & 18 deletions dbt-snowflake/src/dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import textwrap

from dataclasses import dataclass, field
from typing import FrozenSet, Optional, Type, Iterator, Tuple


from dbt.adapters.clients import catalogs as catalogs_client
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.catalogs import CatalogIntegrationConfig, CatalogIntegrationType
from dbt.adapters.contracts.relation import ComponentName, RelationConfig
from dbt.adapters.events.types import AdapterEventWarning, AdapterEventDebug
from dbt.adapters.relation_configs import (
RelationConfigBase,
RelationConfigChangeAction,
RelationResults,
)
from dbt.adapters.snowflake.catalogs.managed_iceberg import (
SnowflakeManagedIcebergCatalogIntegration,
)
from dbt.adapters.utils import classproperty
from dbt_common.exceptions import DbtRuntimeError
from dbt_common.events.functions import fire_event, warn_or_error
Expand Down Expand Up @@ -64,6 +66,10 @@ def is_dynamic_table(self) -> bool:

@property
def is_iceberg_format(self) -> bool:
if self.catalog_name:
return (
catalogs_client.get_catalog(self.catalog_name).table_format == TableFormat.ICEBERG
)
return self.table_format == TableFormat.ICEBERG

@classproperty
Expand Down Expand Up @@ -168,6 +174,12 @@ def get_ddl_prefix_for_create(self, config: RelationConfig, temporary: bool) ->

transient_explicitly_set_true: bool = config.get("transient", False) # type:ignore

catalog_name = config.get("catalog_name", None)
if catalog_name:
catalog = catalogs_client.get_catalog(catalog_name)
if catalog.table_format == TableFormat.ICEBERG:
return "iceberg"

# Temporary tables are a Snowflake feature that do not exist in the
# Iceberg framework. We ignore the Iceberg status of the model.
if temporary:
Expand Down Expand Up @@ -203,22 +215,21 @@ def get_ddl_prefix_for_alter(self) -> str:
else:
return ""

def get_iceberg_ddl_options(self, config: RelationConfig) -> str:
# If the base_location_root config is supplied, overwrite the default value ("_dbt/")
base_location: str = (
f"{config.get('base_location_root', '_dbt')}/{self.schema}/{self.name}" # type:ignore
def add_managed_catalog_integration(self, config: RelationConfig) -> str:
catalog_name = "snowflake_managed"
external_volume = config.get("external_volume")
integration_config = CatalogIntegrationConfig(
catalog_name=catalog_name,
integration_name=catalog_name,
table_format=self.table_format,
catalog_type=CatalogIntegrationType.managed.value,
external_volume=external_volume,
)

if subpath := config.get("base_location_subpath"): # type:ignore
base_location += f"/{subpath}"

external_volume = config.get("external_volume") # type:ignore
iceberg_ddl_predicates: str = f"""
external_volume = '{external_volume}'
catalog = 'snowflake'
base_location = '{base_location}'
"""
return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10)
catalogs_client.add_catalog(
SnowflakeManagedIcebergCatalogIntegration(integration_config),
catalog_name=catalog_name,
)
return catalog_name

def __drop_conditions(self, old_relation: "SnowflakeRelation") -> Iterator[Tuple[bool, str]]:
drop_view_message: str = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any

return config_dict

@classmethod
def from_relation_config(cls, relation_config: RelationConfig) -> Self:
return cls.from_dict(cls.parse_relation_config(relation_config))

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
# this try block can be removed once enable_iceberg_materializations is retired
Expand Down
Loading
Loading