Skip to content

Commit

Permalink
fixup! Build DataflowPlan for custom offset window with most grains
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Dec 19, 2024
1 parent 81ae325 commit e01ed6e
Showing 1 changed file with 28 additions and 29 deletions.
57 changes: 28 additions & 29 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1938,35 +1938,34 @@ def _build_custom_offset_time_spine_node(
time_spine_source = self._choose_time_spine_source((DataSet.metric_time_dimension_spec(custom_grain),))
time_spine_read_node = self._choose_time_spine_read_node(time_spine_source)
if {spec.time_granularity for spec in required_time_spine_specs} == {custom_grain}:
# If querying with only the same grain as is used in the offset_window, can use a simpler plan.
raise NotImplementedError("Support for offset windows with custom granularities is still in progress.")
else:
# For custom offset windows queried with other granularities, first, build CustomGranularityBoundsNode.
# This will be used twice in the output node, and ideally will be turned into a CTE.
bounds_node = CustomGranularityBoundsNode.create(
parent_node=time_spine_read_node, custom_granularity_name=custom_grain.name
)
# Build a FilterElementsNode from bounds node to get required unique rows.
bounds_data_set = self._node_data_set_resolver.get_output_data_set(bounds_node)
bounds_specs = tuple(
bounds_data_set.instance_from_window_function(window_func).spec
for window_func in (SqlWindowFunction.FIRST_VALUE, SqlWindowFunction.LAST_VALUE)
)
custom_grain_spec = bounds_data_set.instance_from_time_dimension_grain_and_date_part(
time_granularity_name=custom_grain.name, date_part=None
).spec
filter_elements_node = FilterElementsNode.create(
parent_node=bounds_node,
include_specs=InstanceSpecSet(time_dimension_specs=(custom_grain_spec,) + bounds_specs),
distinct=True,
)
# Pass both the CustomGranularityBoundsNode and the FilterElementsNode into the OffsetByCustomGranularityNode.
return OffsetByCustomGranularityNode.create(
custom_granularity_bounds_node=bounds_node,
filter_elements_node=filter_elements_node,
offset_window=offset_window,
required_time_spine_specs=required_time_spine_specs,
)
# TODO: If querying with only the same grain as is used in the offset_window, can use a simpler plan.
pass
# For custom offset windows queried with other granularities, first, build CustomGranularityBoundsNode.
# This will be used twice in the output node, and ideally will be turned into a CTE.
bounds_node = CustomGranularityBoundsNode.create(
parent_node=time_spine_read_node, custom_granularity_name=custom_grain.name
)
# Build a FilterElementsNode from bounds node to get required unique rows.
bounds_data_set = self._node_data_set_resolver.get_output_data_set(bounds_node)
bounds_specs = tuple(
bounds_data_set.instance_from_window_function(window_func).spec
for window_func in (SqlWindowFunction.FIRST_VALUE, SqlWindowFunction.LAST_VALUE)
)
custom_grain_spec = bounds_data_set.instance_from_time_dimension_grain_and_date_part(
time_granularity_name=custom_grain.name, date_part=None
).spec
filter_elements_node = FilterElementsNode.create(
parent_node=bounds_node,
include_specs=InstanceSpecSet(time_dimension_specs=(custom_grain_spec,) + bounds_specs),
distinct=True,
)
# Pass both the CustomGranularityBoundsNode and the FilterElementsNode into the OffsetByCustomGranularityNode.
return OffsetByCustomGranularityNode.create(
custom_granularity_bounds_node=bounds_node,
filter_elements_node=filter_elements_node,
offset_window=offset_window,
required_time_spine_specs=required_time_spine_specs,
)

def _sort_by_base_granularity(self, time_dimension_specs: Sequence[TimeDimensionSpec]) -> List[TimeDimensionSpec]:
"""Sort the time dimensions by their base granularity.
Expand Down

0 comments on commit e01ed6e

Please sign in to comment.