Skip to content

Commit

Permalink
CT-2691 Fix the populating of a Metric's depends_on property (#8015)
Browse files Browse the repository at this point in the history
* Add metrics from metric type params to a metric's depends_on

* Add Lookup utility for finding `SemanticModel`s by measure names

* Add the `SemanticModel` of a `Metric`'s measure property to the `Metric`'s `depends_on`

* Add `SemanticModelConfig` to `SemanticModel`

Some tests were failing due to `Metric`'s referencing `SemanticModels`.
Specifically there was a check to see if a referenced node was disabled,
and because `SemanticModel`'s didn't have a `config` holding the `enabled`
boolean attr, core would blow up.

* Checkpoint on test fixing

* Correct metricflow_time_spine_sql in test fixtures

* Add check for `SemanticModel` nodes in `Linker.link_node`

Now that `Metrics` depend on `SemanticModels` and `SemanticModels`
have their own dependencies on `Models` they need to be checked for
in the `Linker.link_node`. I forget the details but things blow up
without it. Basically it adds the SemanticModels to the dependency
graph.

* Fix artifacts/test_previous_version_state.py tests

* fix access/test_access.py tests

* Fix function metric tests

* Fix functional partial_parsing tests

* Add time dimension to semantic model in exposures fixture

* Bump DSI version to a minimum of 0.1.0dev10

DSI 0.1.0dev10 fixes an incoherence issue in DSI around `agg_time_dimension`
setting. This incoherence was that `measure.agg_time_dimension` was being
required, even though it was no longer supposed to be a required attribute
(it's specifically typed as optional in the protocol). This was causing
a handful of tests to fail because the `semantic_model.defaults.agg_time_dimension`
value wasn't being respected. Pulling in the fix from DSI 0.1.0dev10 fixes
the issue.

Interestingly after bumping the DSI version, the integration tests were
still failing. If I ran the tests individually they passed though. To get
`make integration` to run properly I ended up having to clear my `.tox`
cache, as it seems some outdated state was being persisted.

* Add test specifically for checking the `depends_on` of `Metric` nodes

* Re-enable test asserting calling metric nodes in models

* Migrate `checked_agg_time_dimension` to `checked_agg_time_dimension_for_measure`

DSI 0.1.0dev10 moved `checked_agg_time_dimension` from the `Measure`
protocol to the `SemanticModel` protocol as `checked_agg_time_dimension_for_measure`.
This finishes a change where for a given measure either the `Measure.agg_time_dimension`
or the measure's parent `SemanticModel.defaults.agg_time_dimension` needs to be
set, instead of always requiring the measure's `Measure.agg_time_dimension`.

* Add changie doc for populating metric

---------

Co-authored-by: Gerda Shank <[email protected]>
  • Loading branch information
QMalcolm and gshank authored Jul 12, 2023
1 parent 6bdf983 commit 5310d37
Show file tree
Hide file tree
Showing 20 changed files with 462 additions and 34 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230712-123724.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Begin populating `depends_on` of metric nodes
time: 2023-07-12T12:37:24.01449-07:00
custom:
Author: QMalcolm gshank
Issue: "7854"
2 changes: 2 additions & 0 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ def link_node(self, node: GraphMemberNode, manifest: Manifest):
self.dependency(node.unique_id, (manifest.sources[dependency].unique_id))
elif dependency in manifest.metrics:
self.dependency(node.unique_id, (manifest.metrics[dependency].unique_id))
elif dependency in manifest.semantic_models:
self.dependency(node.unique_id, (manifest.semantic_models[dependency].unique_id))
else:
raise GraphDependencyNotFoundError(node, dependency)

Expand Down
75 changes: 75 additions & 0 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import enum
from collections import defaultdict
from dataclasses import dataclass, field
from itertools import chain, islice
from mashumaro.mixins.msgpack import DataClassMessagePackMixin
from multiprocessing.synchronize import Lock
from typing import (
DefaultDict,
Dict,
List,
Optional,
Expand Down Expand Up @@ -297,6 +299,49 @@ def perform_lookup(self, unique_id: UniqueID, manifest: "Manifest") -> Metric:
return manifest.metrics[unique_id]


class SemanticModelByMeasureLookup(dbtClassMixin):
    """Lookup utility mapping measure names to their owning SemanticModel.

    Measure names are expected to be unique across all semantic models in a
    manifest, which is what makes this reverse lookup possible.
    """

    def __init__(self, manifest: "Manifest"):
        # measure name -> {package name -> semantic model unique_id}
        self.storage: DefaultDict[str, Dict[PackageName, UniqueID]] = defaultdict(dict)
        self.populate(manifest)

    def get_unique_id(self, search_name: str, package: Optional[PackageName]):
        """Resolve a measure name (optionally scoped to a package) to a unique_id."""
        return find_unique_id_for_package(self.storage, search_name, package)

    def find(
        self, search_name: str, package: Optional[PackageName], manifest: "Manifest"
    ) -> Optional[SemanticModel]:
        """Tries to find a SemanticModel based on a measure name"""
        unique_id = self.get_unique_id(search_name, package)
        if unique_id is None:
            return None
        return self.perform_lookup(unique_id, manifest)

    def add(self, semantic_model: SemanticModel):
        """Sets all measures for a SemanticModel as paths to the SemanticModel's `unique_id`"""
        unique_id = semantic_model.unique_id
        package_name = semantic_model.package_name
        for measure in semantic_model.measures:
            self.storage[measure.name][package_name] = unique_id

    def populate(self, manifest: "Manifest"):
        """Populate storage with all the measure + package paths to the Manifest's SemanticModels"""
        for semantic_model in manifest.semantic_models.values():
            self.add(semantic_model=semantic_model)

    def perform_lookup(self, unique_id: UniqueID, manifest: "Manifest") -> SemanticModel:
        """Fetch a SemanticModel from the manifest by unique_id, failing loudly if missing."""
        semantic_model = manifest.semantic_models.get(unique_id)
        if semantic_model is None:
            raise dbt.exceptions.DbtInternalError(
                f"Semantic model `{unique_id}` found in cache but not found in manifest"
            )
        return semantic_model


# This handles both models/seeds/snapshots and sources/metrics/exposures
class DisabledLookup(dbtClassMixin):
def __init__(self, manifest: "Manifest"):
Expand Down Expand Up @@ -710,6 +755,9 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
_metric_lookup: Optional[MetricLookup] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
)
_semantic_model_by_measure_lookup: Optional[SemanticModelByMeasureLookup] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
)
_disabled_lookup: Optional[DisabledLookup] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
)
Expand Down Expand Up @@ -960,6 +1008,13 @@ def metric_lookup(self) -> MetricLookup:
self._metric_lookup = MetricLookup(self)
return self._metric_lookup

@property
def semantic_model_by_measure_lookup(self) -> SemanticModelByMeasureLookup:
    """Gets (and creates if necessary) the lookup utility for getting SemanticModels by measures"""
    lookup = self._semantic_model_by_measure_lookup
    if lookup is None:
        # Build lazily on first access and cache on the manifest instance.
        lookup = SemanticModelByMeasureLookup(self)
        self._semantic_model_by_measure_lookup = lookup
    return lookup

def rebuild_ref_lookup(self):
self._ref_lookup = RefableLookup(self)

Expand Down Expand Up @@ -1087,6 +1142,25 @@ def resolve_metric(
return Disabled(disabled[0])
return None

def resolve_semantic_model_for_measure(
    self,
    target_measure_name: str,
    current_project: str,
    node_package: str,
    target_package: Optional[str] = None,
) -> Optional[SemanticModel]:
    """Tries to find the SemanticModel that a measure belongs to.

    Candidate packages are searched in the order produced by
    `_packages_to_search`; the first package containing the measure wins.
    Returns None when no package defines the measure.
    """
    for candidate_package in _packages_to_search(current_project, node_package, target_package):
        found = self.semantic_model_by_measure_lookup.find(
            target_measure_name, candidate_package, self
        )
        if found is not None:
            return found
    return None

# Called by DocsRuntimeContext.doc
def resolve_doc(
self,
Expand Down Expand Up @@ -1328,6 +1402,7 @@ def __reduce_ex__(self, protocol):
self._source_lookup,
self._ref_lookup,
self._metric_lookup,
self._semantic_model_by_measure_lookup,
self._disabled_lookup,
self._analysis_lookup,
)
Expand Down
5 changes: 5 additions & 0 deletions core/dbt/contracts/graph/model_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,11 @@ def replace(self, **kwargs):
return self.from_dict(dct)


@dataclass
class SemanticModelConfig(BaseConfig):
    # Whether the semantic model node is enabled. Mirrors the `enabled`
    # flag on other node configs (e.g. MetricConfig below) so that
    # disabled-node checks can treat SemanticModel nodes uniformly.
    enabled: bool = True


@dataclass
class MetricConfig(BaseConfig):
enabled: bool = True
Expand Down
26 changes: 26 additions & 0 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
MeasureReference,
LinkableElementReference,
SemanticModelReference,
TimeDimensionReference,
)
from dbt_semantic_interfaces.references import MetricReference as DSIMetricReference
from dbt_semantic_interfaces.type_enums import MetricType, TimeGranularity
Expand All @@ -65,6 +66,7 @@
ExposureConfig,
EmptySnapshotConfig,
SnapshotConfig,
SemanticModelConfig,
)


Expand Down Expand Up @@ -1487,6 +1489,7 @@ class SemanticModel(GraphNode):
depends_on: DependsOn = field(default_factory=DependsOn)
refs: List[RefArgs] = field(default_factory=list)
created_at: float = field(default_factory=lambda: time.time())
config: SemanticModelConfig = field(default_factory=SemanticModelConfig)

@property
def entity_references(self) -> List[LinkableElementReference]:
Expand Down Expand Up @@ -1545,6 +1548,29 @@ def depends_on_nodes(self):
def depends_on_macros(self):
return self.depends_on.macros

def checked_agg_time_dimension_for_measure(
    self, measure_reference: MeasureReference
) -> TimeDimensionReference:
    """Return the aggregation time dimension reference for the given measure.

    Resolution order: the measure's own `agg_time_dimension`, falling back
    to this semantic model's `defaults.agg_time_dimension` (if defaults are
    set). Asserts if the measure is not part of this semantic model, or if
    no aggregation time dimension can be resolved.
    """
    # Find the measure matching the reference. NOTE: the previous code
    # rebound the loop variable to itself (`measure = measure`), which left
    # `measure` pointing at the *last* measure in the list regardless of
    # whether it matched — and the not-found assert could never fire for a
    # non-empty measure list. Use a separate variable and stop at the match.
    matched_measure: Optional[Measure] = None
    for measure in self.measures:
        if measure.reference == measure_reference:
            matched_measure = measure
            break

    assert (
        matched_measure is not None
    ), f"No measure with name ({measure_reference.element_name}) in semantic_model with name ({self.name})"

    # Guard against `self.defaults` being None: the previous code left
    # `default_agg_time_dimesion` unbound in that case, raising
    # UnboundLocalError instead of the intended assertion below.
    default_agg_time_dimension = (
        self.defaults.agg_time_dimension if self.defaults is not None else None
    )

    agg_time_dimension_name = matched_measure.agg_time_dimension or default_agg_time_dimension
    assert agg_time_dimension_name is not None, (
        f"Aggregation time dimension for measure {matched_measure.name} is not set! This should either be set directly on "
        f"the measure specification in the model, or else defaulted to the primary time dimension in the data "
        f"source containing the measure."
    )
    return TimeDimensionReference(element_name=agg_time_dimension_name)


# ====================================
# Patches
Expand Down
7 changes: 0 additions & 7 deletions core/dbt/contracts/graph/semantic_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,6 @@ class Measure(dbtClassMixin):
non_additive_dimension: Optional[NonAdditiveDimension] = None
agg_time_dimension: Optional[str] = None

@property
def checked_agg_time_dimension(self) -> TimeDimensionReference:
if self.agg_time_dimension is not None:
return TimeDimensionReference(element_name=self.agg_time_dimension)
else:
raise Exception("Measure is missing agg_time_dimension!")

@property
def reference(self) -> MeasureReference:
    """A `MeasureReference` pointing at this measure by its name."""
    return MeasureReference(element_name=self.name)
15 changes: 14 additions & 1 deletion core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1454,7 +1454,7 @@ def _process_metric_node(
current_project: str,
metric: Metric,
) -> None:
"""Sets a metric's input_measures"""
"""Sets a metric's `input_measures` and `depends_on` properties"""

# This ensures that if this metrics input_measures have already been set
# we skip the work. This could happen either due to recursion or if multiple
Expand All @@ -1468,6 +1468,18 @@ def _process_metric_node(
metric.type_params.measure is not None
), f"{metric} should have a measure defined, but it does not."
metric.type_params.input_measures.append(metric.type_params.measure)
target_semantic_model = manifest.resolve_semantic_model_for_measure(
target_measure_name=metric.type_params.measure.name,
current_project=current_project,
node_package=metric.package_name,
)
if target_semantic_model is None:
raise dbt.exceptions.ParsingError(
f"A semantic model having a measure `{metric.type_params.measure.name}` does not exist but was referenced.",
node=metric,
)

metric.depends_on.add_node(target_semantic_model.unique_id)

elif metric.type is MetricType.DERIVED or metric.type is MetricType.RATIO:
input_metrics = metric.input_metrics
Expand Down Expand Up @@ -1502,6 +1514,7 @@ def _process_metric_node(
manifest=manifest, current_project=current_project, metric=target_metric
)
metric.type_params.input_measures.extend(target_metric.type_params.input_measures)
metric.depends_on.add_node(target_metric.unique_id)
else:
assert_values_exhausted(metric.type)

Expand Down
2 changes: 1 addition & 1 deletion core/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
"minimal-snowplow-tracker~=0.0.2",
# DSI is under active development, so we're pinning to specific dev versions for now.
# TODO: Before RC/final release, update to use ~= pinning.
"dbt-semantic-interfaces==0.1.0.dev8",
"dbt-semantic-interfaces~=0.1.0.dev10",
# ----
# Expect compatibility with all new versions of these packages, so lower bounds only.
"packaging>20.9",
Expand Down
35 changes: 33 additions & 2 deletions tests/functional/access/test_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
group: analytics
- name: people_model
description: "some people"
access: private
access: public
group: analytics
"""

Expand All @@ -124,6 +124,31 @@
select 1 as id, 'Callum' as first_name, 'McCann' as last_name, 'emerald' as favorite_color, true as loves_dbt, 0 as tenure, current_timestamp as created_at
"""

people_semantic_model_yml = """
semantic_models:
- name: semantic_people
model: ref('people_model')
dimensions:
- name: favorite_color
type: categorical
- name: created_at
type: TIME
type_params:
time_granularity: day
measures:
- name: years_tenure
agg: SUM
expr: tenure
- name: people
agg: count
expr: id
entities:
- name: id
type: primary
defaults:
agg_time_dimension: created_at
"""

people_metric_yml = """
metrics:
Expand Down Expand Up @@ -203,6 +228,10 @@
group: package
"""

metricflow_time_spine_sql = """
SELECT to_date('02/20/2023', 'mm/dd/yyyy') as date_day
"""


class TestAccess:
@pytest.fixture(scope="class")
Expand Down Expand Up @@ -278,10 +307,12 @@ def test_access_attribute(self, project):
write_file(v5_schema_yml, project.project_root, "models", "schema.yml")
rm_file(project.project_root, "models", "simple_exposure.yml")
write_file(people_model_sql, "models", "people_model.sql")
write_file(people_semantic_model_yml, "models", "people_semantic_model.yml")
write_file(people_metric_yml, "models", "people_metric.yml")
write_file(metricflow_time_spine_sql, "models", "metricflow_time_spine.sql")
# Should succeed
manifest = run_dbt(["parse"])
assert len(manifest.nodes) == 4
assert len(manifest.nodes) == 5
manifest = get_manifest(project.project_root)
metric_id = "metric.test.number_of_people"
assert manifest.metrics[metric_id].group == "analytics"
Expand Down
Loading

0 comments on commit 5310d37

Please sign in to comment.