Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/materialized views/adap 608 #7985

Closed
wants to merge 30 commits into from
Closed
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
fd02a34
added updates from snowflake to core, pushed down to postgres, draft
mikealfare Jun 27, 2023
0146343
added updates from snowflake to core, pushed down to postgres, draft
mikealfare Jun 27, 2023
96cac81
added updates from snowflake to core, pushed down to postgres, working
mikealfare Jun 28, 2023
6b4fbec
added updates from snowflake to core, pushed down to postgres, final …
mikealfare Jun 29, 2023
6dfe0f3
added updates from snowflake to core, pushed down to postgres, final …
mikealfare Jun 29, 2023
459d7ff
changie
mikealfare Jun 29, 2023
c0f52b5
docs
mikealfare Jun 29, 2023
7e63255
created a Materialization class, analogous to BaseRelation
mikealfare Jun 29, 2023
f9bcd8c
updated docs
mikealfare Jun 29, 2023
82b54cb
create relation-materialization framework - final draft
mikealfare Jul 11, 2023
bc98787
Merge branch 'main' into feature/materialized-views/adap-608
mikealfare Jul 11, 2023
9400cdc
fixed using actual OrderDict for typehint
mikealfare Jul 11, 2023
e7df122
mypy
mikealfare Jul 11, 2023
1cd4e6d
removed centralized testing logic or moved it to `util`
mikealfare Jul 11, 2023
0e66a67
rename `RelationStub` to `RelationRef`
mikealfare Jul 11, 2023
83f1393
remove quote_policy and include_policy from PostgresRelation (restore…
mikealfare Jul 11, 2023
0de3d29
update `assert_message_in_logs` signature to be more intuitive
mikealfare Jul 12, 2023
a360045
updated test fixture to correctly indicated relations which can be re…
mikealfare Jul 12, 2023
e66da43
moved common logic from `MaterializedViewRelation` up into `Relation`
mikealfare Jul 12, 2023
cff8612
remove unnecessary abstraction of relations._materialized_view, pushe…
mikealfare Jul 12, 2023
a71403e
sensible defaults for relations, materializations, and materialized v…
mikealfare Jul 12, 2023
c2bc44d
updated readme
mikealfare Jul 12, 2023
44d5c52
fixed test collector issue for unit tests
mikealfare Jul 12, 2023
fe1f498
naming conventions, move from RuntimeConfigObject to CompiledNode, ad…
mikealfare Jul 13, 2023
e75886b
loosen typing to allow for snowflake custom DTs and align with typing…
mikealfare Jul 13, 2023
1a14011
Merge branch 'main' into feature/materialized-views/adap-608
mikealfare Jul 13, 2023
35c14be
loosen typing to allow for snowflake custom DTs and align with typing…
mikealfare Jul 14, 2023
045ba27
fixing index.html
mikealfare Jul 14, 2023
119ef04
fixing index.html
mikealfare Jul 14, 2023
71731db
remove accidentally added test data file
mikealfare Jul 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230629-033005.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Establish framework for materialized views and materialization change management
time: 2023-06-29T03:30:05.527325-04:00
custom:
Author: mikealfare
Issue: "6911"
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ ignore =
E741
E501 # long line checking is done in black
exclude = test/
per-file-ignores =
*/__init__.py: F401
16 changes: 7 additions & 9 deletions core/dbt/adapters/base/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
# these are all just exports, #noqa them so flake8 will be happy

# TODO: Should we still include this in the `adapters` namespace?
from dbt.contracts.connection import Credentials # noqa: F401
from dbt.adapters.base.meta import available # noqa: F401
from dbt.adapters.base.connections import BaseConnectionManager # noqa: F401
from dbt.adapters.base.relation import ( # noqa: F401
from dbt.contracts.connection import Credentials
from dbt.adapters.base.meta import available
from dbt.adapters.base.connections import BaseConnectionManager
from dbt.adapters.base.relation import (
BaseRelation,
RelationType,
SchemaSearchMap,
)
from dbt.adapters.base.column import Column # noqa: F401
from dbt.adapters.base.impl import ( # noqa: F401
from dbt.adapters.base.column import Column
from dbt.adapters.base.impl import (
AdapterConfig,
BaseAdapter,
PythonJobHelper,
ConstraintSupport,
)
from dbt.adapters.base.plugin import AdapterPlugin # noqa: F401
from dbt.adapters.base.plugin import AdapterPlugin
219 changes: 186 additions & 33 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from contextlib import contextmanager
from datetime import datetime
from enum import Enum
import time
from itertools import chain
import time
from typing import (
Any,
Callable,
Expand All @@ -20,11 +20,42 @@
Union,
)

from dbt.contracts.graph.nodes import ColumnLevelConstraint, ConstraintType, ModelLevelConstraint

import agate
import pytz

from dbt import deprecations
from dbt.adapters.base import Credentials, Column as BaseColumn
from dbt.adapters.base.connections import AdapterResponse, Connection
from dbt.adapters.base.meta import AdapterMeta, available
from dbt.adapters.base.relation import (
ComponentName,
BaseRelation,
InformationSchema,
SchemaSearchMap,
)
from dbt.adapters.cache import RelationsCache, _make_ref_key_dict
from dbt.adapters.materialization import MaterializationFactory, models as materialization_models
from dbt.adapters.protocol import AdapterConfig, ConnectionManagerProtocol
from dbt.adapters.relation import RelationFactory, models as relation_models
from dbt.clients.agate_helper import empty_table, merge_tables, table_from_rows
from dbt.clients.jinja import MacroGenerator
from dbt.contracts.graph.manifest import MacroManifest, Manifest
from dbt.contracts.graph.nodes import (
ColumnLevelConstraint,
ConstraintType,
ModelLevelConstraint,
ResultNode,
)
from dbt.events.functions import fire_event, warn_or_error
from dbt.events.types import (
CacheMiss,
CatalogGenerationError,
CodeExecution,
CodeExecutionStatus,
ConstraintNotEnforced,
ConstraintNotSupported,
ListRelations,
)
from dbt.exceptions import (
DbtInternalError,
DbtRuntimeError,
Expand All @@ -42,36 +73,8 @@
UnexpectedNonTimestampError,
UnexpectedNullError,
)
from dbt.utils import AttrDict, cast_to_str, filter_null_values, executor

from dbt.adapters.protocol import AdapterConfig, ConnectionManagerProtocol
from dbt.clients.agate_helper import empty_table, merge_tables, table_from_rows
from dbt.clients.jinja import MacroGenerator
from dbt.contracts.graph.manifest import Manifest, MacroManifest
from dbt.contracts.graph.nodes import ResultNode
from dbt.events.functions import fire_event, warn_or_error
from dbt.events.types import (
CacheMiss,
ListRelations,
CodeExecution,
CodeExecutionStatus,
CatalogGenerationError,
ConstraintNotSupported,
ConstraintNotEnforced,
)
from dbt.utils import filter_null_values, executor, cast_to_str, AttrDict

from dbt.adapters.base.connections import Connection, AdapterResponse
from dbt.adapters.base.meta import AdapterMeta, available
from dbt.adapters.base.relation import (
ComponentName,
BaseRelation,
InformationSchema,
SchemaSearchMap,
)
from dbt.adapters.base import Column as BaseColumn
from dbt.adapters.base import Credentials
from dbt.adapters.cache import RelationsCache, _make_ref_key_dict
from dbt import deprecations

GET_CATALOG_MACRO_NAME = "get_catalog"
FRESHNESS_MACRO_NAME = "collect_freshness"
Expand Down Expand Up @@ -222,6 +225,14 @@ class BaseAdapter(metaclass=AdapterMeta):
ConstraintType.foreign_key: ConstraintSupport.ENFORCED,
}

@property
def relation_factory(self):
return RelationFactory(relation_models={})

@property
def materialization_factory(self):
return MaterializationFactory(relation_factory=self.relation_factory)

def __init__(self, config):
self.config = config
self.cache = RelationsCache()
Expand Down Expand Up @@ -1168,7 +1179,7 @@ def post_model_hook(self, config: Mapping[str, Any], context: Any) -> None:
available in the materialization context). It should be considered
read-only.

The second parameter is the value returned by pre_mdoel_hook.
The second parameter is the value returned by pre_model_hook.
"""
pass

Expand Down Expand Up @@ -1420,6 +1431,148 @@ def render_model_constraint(cls, constraint: ModelLevelConstraint) -> Optional[s
else:
return None

"""
Pass-through methods to access `MaterializationFactory` and `RelationFactory` functionality
"""

@available
def make_materialization_from_runtime_config(
self, runtime_config, materialization_type: materialization_models.MaterializationType
) -> materialization_models.Materialization:
"""
Produce a `Materialization` instance along with whatever associated `Relation` and `RelationRef`
instances are needed.

Args:
runtime_config: the `config` (`RuntimeConfigObject`) in the global jinja context
materialization_type: the name of the materialization

Returns:
a `Materialization` instance that contains all the information required to execute the materialization
"""
existing_relation_ref = self._get_existing_relation_ref_from_model_node(
runtime_config.model
)

materialization = self.materialization_factory.make_from_runtime_config(
runtime_config, materialization_type, existing_relation_ref
)

return materialization

def _get_existing_relation_ref_from_model_node(
self, model_node
) -> Optional[relation_models.RelationRef]:
"""
We need to get `existing_relation_ref` from `Adapter` because we need access to a bunch of `cache`
things, in particular `get_relations`.

TODO: if we refactor the interaction between `Adapter` and `cache`, the calculation of `existing_relation_ref`
could be moved here, which is a more intuitive spot (like `target_relation`) for it
(and removes the concern of creating a `RelationRef` from `Adapter` where it doesn't belong
"""
existing_base_relation: BaseRelation = self.get_relation(
database=model_node.database,
schema=model_node.schema,
identifier=model_node.identifier,
)

if existing_base_relation:
existing_relation_ref = self.relation_factory.make_ref(
name=existing_base_relation.identifier,
schema_name=existing_base_relation.schema,
database_name=existing_base_relation.database,
relation_type=existing_base_relation.type,
)
else:
existing_relation_ref = None

return existing_relation_ref

@available
def make_changeset(
self,
existing_relation: relation_models.Relation,
target_relation: relation_models.Relation,
) -> relation_models.RelationChangeset:
"""

Args:
existing_relation: the current implementation of the relation in the database
target_relation: the new implementation that should exist in the database going forward

Returns:
a `RelationChangeset` instance that collects all the changes required to turn `existing_relation`
into `target_relation`
"""
return self.relation_factory.make_changeset(existing_relation, target_relation)

"""
Implementation of cache methods for `Relation` instances (versus `BaseRelation` instances)
"""

@available
def cache_created_relation_model(self, relation: relation_models.Relation) -> str:
base_relation = self.base_relation_from_relation_model(relation)
return self.cache_added(base_relation)

@available
def cache_dropped_relation_model(self, relation: relation_models.Relation) -> str:
base_relation = self.base_relation_from_relation_model(relation)
return self.cache_dropped(base_relation)

@available
def cache_renamed_relation_model(
self, relation: relation_models.Relation, new_name: str
) -> str:
from_relation = self.base_relation_from_relation_model(relation)
to_relation = from_relation.incorporate(path={"identifier": new_name})
return self.cache_renamed(from_relation, to_relation)

"""
Methods to swap back and forth between `Relation` and `BaseRelation` instances
"""

@available
def is_base_relation(self, relation: Union[BaseRelation, relation_models.Relation]) -> bool:
"""
Convenient for templating, given the mix of `BaseRelation` and `Relation`
"""
return isinstance(relation, BaseRelation)

@available
def is_relation_model(self, relation: Union[BaseRelation, relation_models.Relation]) -> bool:
"""
Convenient for templating, given the mix of `BaseRelation` and `Relation`
"""
return isinstance(relation, relation_models.Relation)

@available
def base_relation_from_relation_model(
self, relation: relation_models.Relation
) -> BaseRelation:
"""
Produce a `BaseRelation` instance from a `Relation` instance. This is primarily done to
reuse existing functionality based on `BaseRelation` while working with `Relation` instances.

Useful in combination with `is_relation_model`/`is_base_relation`

Args:
relation: a `Relation` instance or subclass to be converted

Returns:
a converted `BaseRelation` instance
"""
base_relation: BaseRelation = self.Relation.create(
database=relation.database_name,
schema=relation.schema_name,
identifier=relation.name,
quote_policy=self.relation_factory.render_policy.quote_policy,
type=relation.type,
)
assert isinstance(base_relation, BaseRelation) # mypy
return base_relation


COLUMNS_EQUAL_SQL = """
with diff_count as (
Expand Down
Loading