Skip to content

Commit

Permalink
PR feedback re: custom vs. standard offset_window
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Jan 22, 2025
1 parent 55eda7e commit 74b183b
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,10 @@ class JoinToTimeSpineDescription:
join_type: SqlJoinType
offset_window: Optional[MetricTimeWindow]
offset_to_grain: Optional[TimeGranularity]

@property
def standard_offset_window(self) -> Optional[MetricTimeWindow]:
"""Return the standard offset window if it is a standard granularity."""
if self.offset_window and self.offset_window.is_standard_granularity:
return self.offset_window
return None
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Optional

from dbt_semantic_interfaces.implementations.metric import PydanticMetricTimeWindow
from dbt_semantic_interfaces.protocols.metric import MetricTimeWindow
from dbt_semantic_interfaces.references import MetricReference
from dbt_semantic_interfaces.type_enums import TimeGranularity

Expand Down Expand Up @@ -67,3 +68,10 @@ def without_filter_specs(self) -> MetricSpec: # noqa: D102
offset_window=self.offset_window,
offset_to_grain=self.offset_to_grain,
)

@property
def standard_offset_window(self) -> Optional[MetricTimeWindow]:
"""Return the offset window if it exists and uses a standard granularity."""
if self.offset_window and self.offset_window.is_standard_granularity:
return self.offset_window
return None
18 changes: 3 additions & 15 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,9 +673,7 @@ def _build_derived_metric_output_node(
time_spine_node=time_spine_node,
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
join_on_time_dimension_spec=self._sort_by_base_granularity(queried_agg_time_dimension_specs)[0],
offset_window=(
metric_spec.offset_window if not self._offset_window_is_custom(metric_spec.offset_window) else None
),
standard_offset_window=metric_spec.standard_offset_window,
offset_to_grain=metric_spec.offset_to_grain,
join_type=SqlJoinType.INNER,
)
Expand Down Expand Up @@ -1685,11 +1683,7 @@ def _build_aggregated_measure_from_measure_source_node(
time_spine_node=time_spine_node,
requested_agg_time_dimension_specs=base_queried_agg_time_dimension_specs,
join_on_time_dimension_spec=join_on_time_dimension_spec,
offset_window=(
before_aggregation_time_spine_join_description.offset_window
if not self._offset_window_is_custom(before_aggregation_time_spine_join_description.offset_window)
else None
),
standard_offset_window=(before_aggregation_time_spine_join_description.standard_offset_window),
offset_to_grain=before_aggregation_time_spine_join_description.offset_to_grain,
join_type=before_aggregation_time_spine_join_description.join_type,
)
Expand Down Expand Up @@ -1907,7 +1901,7 @@ def _build_time_spine_node(

should_dedupe = False
filter_to_specs = tuple(queried_time_spine_specs)
if offset_window and self._offset_window_is_custom(offset_window):
if offset_window and not offset_window.is_standard_granularity:
time_spine_node = self._build_custom_offset_time_spine_node(
offset_window=offset_window, required_time_spine_specs=required_time_spine_specs
)
Expand Down Expand Up @@ -2012,9 +2006,3 @@ def _determine_time_spine_join_spec(
sample_agg_time_dimension_spec = required_time_spine_specs[0]
join_on_time_dimension_spec = sample_agg_time_dimension_spec.with_grain(time_granularity=join_spec_grain)
return join_on_time_dimension_spec

def _offset_window_is_custom(self, offset_window: Optional[MetricTimeWindow]) -> bool:
return (
offset_window is not None
and offset_window.granularity in self._semantic_model_lookup.custom_granularity_names
)
25 changes: 15 additions & 10 deletions metricflow/dataflow/nodes/join_to_time_spine.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class JoinToTimeSpineNode(DataflowPlanNode, ABC):
requested_agg_time_dimension_specs: Time dimensions requested in the query.
join_type: Join type to use when joining to time spine.
join_on_time_dimension_spec: The time dimension to use in the join ON condition.
offset_window: Time window to offset the parent dataset by when joining to time spine.
standard_offset_window: Time window to offset the parent dataset by when joining to time spine.
Only standard granularities are accepted for standard_offset_window in this node.
offset_to_grain: Granularity period to offset the parent dataset to when joining to time spine.
"""

Expand All @@ -33,18 +34,22 @@ class JoinToTimeSpineNode(DataflowPlanNode, ABC):
requested_agg_time_dimension_specs: Sequence[TimeDimensionSpec]
join_on_time_dimension_spec: TimeDimensionSpec
join_type: SqlJoinType
offset_window: Optional[MetricTimeWindow]
standard_offset_window: Optional[MetricTimeWindow]
offset_to_grain: Optional[TimeGranularity]

def __post_init__(self) -> None: # noqa: D105
super().__post_init__()

assert not (
self.offset_window and self.offset_to_grain
), "Can't set both offset_window and offset_to_grain when joining to time spine. Choose one or the other."
self.standard_offset_window and self.offset_to_grain
), "Can't set both standard_offset_window and offset_to_grain when joining to time spine. Choose one or the other."
assert (
len(self.requested_agg_time_dimension_specs) > 0
), "Must have at least one value in requested_agg_time_dimension_specs for JoinToTimeSpineNode."
if self.standard_offset_window and not self.standard_offset_window.is_standard_granularity:
raise RuntimeError(
f"JoinToTimeSpineNode should not accept a custom standard_offset_window. Got: {self.standard_offset_window}"
)

@staticmethod
def create( # noqa: D102
Expand All @@ -53,7 +58,7 @@ def create( # noqa: D102
requested_agg_time_dimension_specs: Sequence[TimeDimensionSpec],
join_on_time_dimension_spec: TimeDimensionSpec,
join_type: SqlJoinType,
offset_window: Optional[MetricTimeWindow] = None,
standard_offset_window: Optional[MetricTimeWindow] = None,
offset_to_grain: Optional[TimeGranularity] = None,
) -> JoinToTimeSpineNode:
return JoinToTimeSpineNode(
Expand All @@ -63,7 +68,7 @@ def create( # noqa: D102
requested_agg_time_dimension_specs=tuple(requested_agg_time_dimension_specs),
join_on_time_dimension_spec=join_on_time_dimension_spec,
join_type=join_type,
offset_window=offset_window,
standard_offset_window=standard_offset_window,
offset_to_grain=offset_to_grain,
)

Expand All @@ -85,16 +90,16 @@ def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102
DisplayedProperty("join_on_time_dimension_spec", self.join_on_time_dimension_spec),
DisplayedProperty("join_type", self.join_type),
)
if self.offset_window:
props += (DisplayedProperty("offset_window", self.offset_window),)
if self.standard_offset_window:
props += (DisplayedProperty("standard_offset_window", self.standard_offset_window),)
if self.offset_to_grain:
props += (DisplayedProperty("offset_to_grain", self.offset_to_grain),)
return props

def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa: D102
return (
isinstance(other_node, self.__class__)
and other_node.offset_window == self.offset_window
and other_node.standard_offset_window == self.standard_offset_window
and other_node.offset_to_grain == self.offset_to_grain
and other_node.requested_agg_time_dimension_specs == self.requested_agg_time_dimension_specs
and other_node.join_on_time_dimension_spec == self.join_on_time_dimension_spec
Expand All @@ -107,7 +112,7 @@ def with_new_parents(self, new_parent_nodes: Sequence[DataflowPlanNode]) -> Join
metric_source_node=self.metric_source_node,
time_spine_node=self.time_spine_node,
requested_agg_time_dimension_specs=self.requested_agg_time_dimension_specs,
offset_window=self.offset_window,
standard_offset_window=self.standard_offset_window,
offset_to_grain=self.offset_to_grain,
join_type=self.join_type,
join_on_time_dimension_spec=self.join_on_time_dimension_spec,
Expand Down
6 changes: 3 additions & 3 deletions metricflow/plan_conversion/sql_join_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,11 +536,11 @@ def make_join_to_time_spine_join_description(
left_expr: SqlExpressionNode = SqlColumnReferenceExpression.create(
col_ref=SqlColumnReference(table_alias=time_spine_alias, column_name=time_spine_column_name)
)
if node.offset_window:
if node.standard_offset_window:
left_expr = SqlSubtractTimeIntervalExpression.create(
arg=left_expr,
count=node.offset_window.count,
granularity=error_if_not_standard_grain(input_granularity=node.offset_window.granularity),
count=node.standard_offset_window.count,
granularity=error_if_not_standard_grain(input_granularity=node.standard_offset_window.granularity),
)
elif node.offset_to_grain:
left_expr = SqlDateTruncExpression.create(time_granularity=node.offset_to_grain, arg=left_expr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ docstring:
<!-- time_granularity=ExpandedTimeGranularity(name='day', base_granularity=DAY), -->
<!-- ) -->
<!-- join_type = INNER -->
<!-- offset_window = PydanticMetricTimeWindow(count=5, granularity='day') -->
<!-- standard_offset_window = PydanticMetricTimeWindow(count=5, granularity='day') -->
<MetricTimeDimensionTransformNode>
<!-- description = "Metric Time Dimension 'ds'" -->
<!-- node_id = NodeId(id_str='sma_28009') -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ test_filename: test_dataflow_plan_builder.py
<!-- time_granularity=ExpandedTimeGranularity(name='day', base_granularity=DAY), -->
<!-- ) -->
<!-- join_type = INNER -->
<!-- offset_window = PydanticMetricTimeWindow(count=5, granularity='day') -->
<!-- standard_offset_window = PydanticMetricTimeWindow(count=5, granularity='day') -->
<MetricTimeDimensionTransformNode>
<!-- description = "Metric Time Dimension 'ds'" -->
<!-- node_id = NodeId(id_str='sma_28009') -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ test_filename: test_dataflow_plan_builder.py
<!-- time_granularity=ExpandedTimeGranularity(name='day', base_granularity=DAY), -->
<!-- ) -->
<!-- join_type = INNER -->
<!-- offset_window = PydanticMetricTimeWindow(count=2, granularity='day') -->
<!-- standard_offset_window = PydanticMetricTimeWindow(count=2, granularity='day') -->
<JoinOverTimeRangeNode>
<!-- description = 'Join Self Over Time Range' -->
<!-- node_id = NodeId(id_str='jotr_0') -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ test_filename: test_dataflow_plan_builder.py
<!-- time_granularity=ExpandedTimeGranularity(name='day', base_granularity=DAY), -->
<!-- ) -->
<!-- join_type = INNER -->
<!-- offset_window = PydanticMetricTimeWindow(count=14, granularity='day') -->
<!-- standard_offset_window = PydanticMetricTimeWindow(count=14, granularity='day') -->
<MetricTimeDimensionTransformNode>
<!-- description = "Metric Time Dimension 'ds'" -->
<!-- node_id = NodeId(id_str='sma_28009') -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ test_filename: test_dataflow_plan_builder.py
<!-- time_granularity=ExpandedTimeGranularity(name='day', base_granularity=DAY), -->
<!-- ) -->
<!-- join_type = INNER -->
<!-- offset_window = PydanticMetricTimeWindow(count=2, granularity='day') -->
<!-- standard_offset_window = PydanticMetricTimeWindow(count=2, granularity='day') -->
<ComputeMetricsNode>
<!-- description = 'Compute Metrics via Expressions' -->
<!-- node_id = NodeId(id_str='cm_1') -->
Expand Down Expand Up @@ -73,7 +73,7 @@ test_filename: test_dataflow_plan_builder.py
<!-- time_granularity=ExpandedTimeGranularity(name='day', base_granularity=DAY), -->
<!-- ) -->
<!-- join_type = INNER -->
<!-- offset_window = PydanticMetricTimeWindow(count=5, granularity='day') -->
<!-- standard_offset_window = PydanticMetricTimeWindow(count=5, granularity='day') -->
<MetricTimeDimensionTransformNode>
<!-- description = "Metric Time Dimension 'ds'" -->
<!-- node_id = NodeId(id_str='sma_28009') -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ docstring:
<!-- time_granularity=ExpandedTimeGranularity(name='day', base_granularity=DAY), -->
<!-- ) -->
<!-- join_type = INNER -->
<!-- offset_window = PydanticMetricTimeWindow(count=1, granularity='week') -->
<!-- standard_offset_window = PydanticMetricTimeWindow(count=1, granularity='week') -->
<MetricTimeDimensionTransformNode>
<!-- description = "Metric Time Dimension 'ds'" -->
<!-- node_id = NodeId(id_str='sma_28009') -->
Expand Down

0 comments on commit 74b183b

Please sign in to comment.