diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index ec7cf59c1..db251c37e 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -1938,35 +1938,34 @@ def _build_custom_offset_time_spine_node( time_spine_source = self._choose_time_spine_source((DataSet.metric_time_dimension_spec(custom_grain),)) time_spine_read_node = self._choose_time_spine_read_node(time_spine_source) if {spec.time_granularity for spec in required_time_spine_specs} == {custom_grain}: - # If querying with only the same grain as is used in the offset_window, can use a simpler plan. - raise NotImplementedError("Support for offset windows with custom granularities is still in progress.") - else: - # For custom offset windows queried with other granularities, first, build CustomGranularityBoundsNode. - # This will be used twice in the output node, and ideally will be turned into a CTE. - bounds_node = CustomGranularityBoundsNode.create( - parent_node=time_spine_read_node, custom_granularity_name=custom_grain.name - ) - # Build a FilterElementsNode from bounds node to get required unique rows. - bounds_data_set = self._node_data_set_resolver.get_output_data_set(bounds_node) - bounds_specs = tuple( - bounds_data_set.instance_from_window_function(window_func).spec - for window_func in (SqlWindowFunction.FIRST_VALUE, SqlWindowFunction.LAST_VALUE) - ) - custom_grain_spec = bounds_data_set.instance_from_time_dimension_grain_and_date_part( - time_granularity_name=custom_grain.name, date_part=None - ).spec - filter_elements_node = FilterElementsNode.create( - parent_node=bounds_node, - include_specs=InstanceSpecSet(time_dimension_specs=(custom_grain_spec,) + bounds_specs), - distinct=True, - ) - # Pass both the CustomGranularityBoundsNode and the FilterElementsNode into the OffsetByCustomGranularityNode. - return OffsetByCustomGranularityNode.create( - custom_granularity_bounds_node=bounds_node, - filter_elements_node=filter_elements_node, - offset_window=offset_window, - required_time_spine_specs=required_time_spine_specs, - ) + # TODO: If querying with only the same grain as is used in the offset_window, can use a simpler plan. + pass + # For custom offset windows queried with other granularities, first, build CustomGranularityBoundsNode. + # This will be used twice in the output node, and ideally will be turned into a CTE. + bounds_node = CustomGranularityBoundsNode.create( + parent_node=time_spine_read_node, custom_granularity_name=custom_grain.name + ) + # Build a FilterElementsNode from bounds node to get required unique rows. + bounds_data_set = self._node_data_set_resolver.get_output_data_set(bounds_node) + bounds_specs = tuple( + bounds_data_set.instance_from_window_function(window_func).spec + for window_func in (SqlWindowFunction.FIRST_VALUE, SqlWindowFunction.LAST_VALUE) + ) + custom_grain_spec = bounds_data_set.instance_from_time_dimension_grain_and_date_part( + time_granularity_name=custom_grain.name, date_part=None + ).spec + filter_elements_node = FilterElementsNode.create( + parent_node=bounds_node, + include_specs=InstanceSpecSet(time_dimension_specs=(custom_grain_spec,) + bounds_specs), + distinct=True, + ) + # Pass both the CustomGranularityBoundsNode and the FilterElementsNode into the OffsetByCustomGranularityNode. + return OffsetByCustomGranularityNode.create( + custom_granularity_bounds_node=bounds_node, + filter_elements_node=filter_elements_node, + offset_window=offset_window, + required_time_spine_specs=required_time_spine_specs, + ) def _sort_by_base_granularity(self, time_dimension_specs: Sequence[TimeDimensionSpec]) -> List[TimeDimensionSpec]: """Sort the time dimensions by their base granularity.