From 53fac28d9e3989645c35dc084c2101ee93afeb56 Mon Sep 17 00:00:00 2001 From: Courtney Holcomb Date: Fri, 17 Jan 2025 11:18:03 -0800 Subject: [PATCH] WIP - combine 2 nodes into 1 --- .../metricflow_semantics/sql/sql_exprs.py | 6 + .../dataflow/builder/dataflow_plan_builder.py | 25 +- metricflow/dataflow/dataflow_plan_visitor.py | 9 - .../nodes/custom_granularity_bounds.py | 64 ---- .../nodes/offset_by_custom_granularity.py | 33 +- .../optimizer/predicate_pushdown_optimizer.py | 6 - .../source_scan/cm_branch_combiner.py | 7 - .../source_scan/source_scan_optimizer.py | 7 - metricflow/execution/dataflow_to_execution.py | 5 - metricflow/plan_conversion/dataflow_to_sql.py | 309 +++++++----------- metricflow/sql/sql_plan.py | 9 + .../source_scan/test_source_scan_optimizer.py | 4 - .../integration/test_configured_cases.py | 2 +- .../test_custom_granularity.py | 4 +- 14 files changed, 149 insertions(+), 341 deletions(-) delete mode 100644 metricflow/dataflow/nodes/custom_granularity_bounds.py diff --git a/metricflow-semantics/metricflow_semantics/sql/sql_exprs.py b/metricflow-semantics/metricflow_semantics/sql/sql_exprs.py index be7a366ced..1301153e77 100644 --- a/metricflow-semantics/metricflow_semantics/sql/sql_exprs.py +++ b/metricflow-semantics/metricflow_semantics/sql/sql_exprs.py @@ -561,6 +561,12 @@ def matches(self, other: SqlExpressionNode) -> bool: # noqa: D102 def from_table_and_column_names(table_alias: str, column_name: str) -> SqlColumnReferenceExpression: # noqa: D102 return SqlColumnReferenceExpression.create(SqlColumnReference(table_alias=table_alias, column_name=column_name)) + def with_new_table_alias(self, new_table_alias: str) -> SqlColumnReferenceExpression: + """Returns a new column reference expression with the same column name but a new table alias.""" + return SqlColumnReferenceExpression.from_table_and_column_names( + table_alias=new_table_alias, column_name=self.col_ref.column_name + ) + @dataclass(frozen=True, eq=False) class 
SqlColumnAliasReferenceExpression(SqlExpressionNode): diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index 9720709925..7582603519 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -54,7 +54,6 @@ from metricflow_semantics.specs.where_filter.where_filter_spec import WhereFilterSpec from metricflow_semantics.specs.where_filter.where_filter_spec_set import WhereFilterSpecSet from metricflow_semantics.specs.where_filter.where_filter_transform import WhereSpecFactory -from metricflow_semantics.sql.sql_exprs import SqlWindowFunction from metricflow_semantics.sql.sql_join_type import SqlJoinType from metricflow_semantics.sql.sql_table import SqlTable from metricflow_semantics.time.dateutil_adjuster import DateutilTimePeriodAdjuster @@ -85,7 +84,6 @@ from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode -from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode @@ -1951,29 +1949,8 @@ def _build_custom_offset_time_spine_node( if {spec.time_granularity for spec in required_time_spine_specs} == {custom_grain}: # TODO: If querying with only the same grain as is used in the offset_window, can use a simpler plan. pass - # For custom offset windows queried with other granularities, first, build CustomGranularityBoundsNode. - # This will be used twice in the output node, and ideally will be turned into a CTE. 
- bounds_node = CustomGranularityBoundsNode.create( - parent_node=time_spine_read_node, custom_granularity_name=custom_grain.name - ) - # Build a FilterElementsNode from bounds node to get required unique rows. - bounds_data_set = self._node_data_set_resolver.get_output_data_set(bounds_node) - bounds_specs = tuple( - bounds_data_set.instance_from_window_function(window_func).spec - for window_func in (SqlWindowFunction.FIRST_VALUE, SqlWindowFunction.LAST_VALUE) - ) - custom_grain_spec = bounds_data_set.instance_from_time_dimension_grain_and_date_part( - time_granularity_name=custom_grain.name, date_part=None - ).spec - filter_elements_node = FilterElementsNode.create( - parent_node=bounds_node, - include_specs=InstanceSpecSet(time_dimension_specs=(custom_grain_spec,) + bounds_specs), - distinct=True, - ) - # Pass both the CustomGranularityBoundsNode and the FilterElementsNode into the OffsetByCustomGranularityNode. return OffsetByCustomGranularityNode.create( - custom_granularity_bounds_node=bounds_node, - filter_elements_node=filter_elements_node, + time_spine_node=time_spine_read_node, offset_window=offset_window, required_time_spine_specs=required_time_spine_specs, ) diff --git a/metricflow/dataflow/dataflow_plan_visitor.py b/metricflow/dataflow/dataflow_plan_visitor.py index 1e3a86bfef..b52af4217d 100644 --- a/metricflow/dataflow/dataflow_plan_visitor.py +++ b/metricflow/dataflow/dataflow_plan_visitor.py @@ -15,7 +15,6 @@ from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode - from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import 
JoinOverTimeRangeNode @@ -128,10 +127,6 @@ def visit_join_to_custom_granularity_node(self, node: JoinToCustomGranularityNod def visit_alias_specs_node(self, node: AliasSpecsNode) -> VisitorOutputT: # noqa: D102 raise NotImplementedError - @abstractmethod - def visit_custom_granularity_bounds_node(self, node: CustomGranularityBoundsNode) -> VisitorOutputT: # noqa: D102 - raise NotImplementedError - @abstractmethod def visit_offset_by_custom_granularity_node( # noqa: D102 self, node: OffsetByCustomGranularityNode @@ -235,10 +230,6 @@ def visit_join_to_custom_granularity_node(self, node: JoinToCustomGranularityNod def visit_alias_specs_node(self, node: AliasSpecsNode) -> VisitorOutputT: # noqa: D102 return self._default_handler(node) - @override - def visit_custom_granularity_bounds_node(self, node: CustomGranularityBoundsNode) -> VisitorOutputT: # noqa: D102 - return self._default_handler(node) - @override def visit_offset_by_custom_granularity_node( # noqa: D102 self, node: OffsetByCustomGranularityNode diff --git a/metricflow/dataflow/nodes/custom_granularity_bounds.py b/metricflow/dataflow/nodes/custom_granularity_bounds.py deleted file mode 100644 index 5dbde2a886..0000000000 --- a/metricflow/dataflow/nodes/custom_granularity_bounds.py +++ /dev/null @@ -1,64 +0,0 @@ -from __future__ import annotations - -from abc import ABC -from dataclasses import dataclass -from typing import Sequence - -from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix -from metricflow_semantics.dag.mf_dag import DisplayedProperty -from metricflow_semantics.visitor import VisitorOutputT - -from metricflow.dataflow.dataflow_plan import DataflowPlanNode -from metricflow.dataflow.dataflow_plan_visitor import DataflowPlanNodeVisitor - - -@dataclass(frozen=True, eq=False) -class CustomGranularityBoundsNode(DataflowPlanNode, ABC): - """Calculate the start and end of a custom granularity period and each row number within that period.""" - - custom_granularity_name: str - - def 
__post_init__(self) -> None: # noqa: D105 - super().__post_init__() - assert len(self.parent_nodes) == 1 - - @staticmethod - def create( # noqa: D102 - parent_node: DataflowPlanNode, custom_granularity_name: str - ) -> CustomGranularityBoundsNode: - return CustomGranularityBoundsNode(parent_nodes=(parent_node,), custom_granularity_name=custom_granularity_name) - - @classmethod - def id_prefix(cls) -> IdPrefix: # noqa: D102 - return StaticIdPrefix.DATAFLOW_NODE_CUSTOM_GRANULARITY_BOUNDS_ID_PREFIX - - def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102 - return visitor.visit_custom_granularity_bounds_node(self) - - @property - def description(self) -> str: # noqa: D102 - return """Calculate Custom Granularity Bounds""" - - @property - def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102 - return tuple(super().displayed_properties) + ( - DisplayedProperty("custom_granularity_name", self.custom_granularity_name), - ) - - @property - def parent_node(self) -> DataflowPlanNode: # noqa: D102 - return self.parent_nodes[0] - - def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa: D102 - return ( - isinstance(other_node, self.__class__) - and other_node.custom_granularity_name == self.custom_granularity_name - ) - - def with_new_parents( # noqa: D102 - self, new_parent_nodes: Sequence[DataflowPlanNode] - ) -> CustomGranularityBoundsNode: - assert len(new_parent_nodes) == 1 - return CustomGranularityBoundsNode.create( - parent_node=new_parent_nodes[0], custom_granularity_name=self.custom_granularity_name - ) diff --git a/metricflow/dataflow/nodes/offset_by_custom_granularity.py b/metricflow/dataflow/nodes/offset_by_custom_granularity.py index f31322f76c..efb777bc47 100644 --- a/metricflow/dataflow/nodes/offset_by_custom_granularity.py +++ b/metricflow/dataflow/nodes/offset_by_custom_granularity.py @@ -2,7 +2,7 @@ from abc import ABC from dataclasses import dataclass -from typing 
import Optional, Sequence +from typing import Sequence from dbt_semantic_interfaces.protocols.metric import MetricTimeWindow from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix @@ -12,36 +12,31 @@ from metricflow.dataflow.dataflow_plan import DataflowPlanNode from metricflow.dataflow.dataflow_plan_visitor import DataflowPlanNodeVisitor -from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode -from metricflow.dataflow.nodes.filter_elements import FilterElementsNode @dataclass(frozen=True, eq=False) class OffsetByCustomGranularityNode(DataflowPlanNode, ABC): """For a given custom grain, offset its base grain by the requested number of custom grain periods. - Only accepts CustomGranularityBoundsNode as parent node. + Only accepts DataflowPlanNode as parent node. """ offset_window: MetricTimeWindow required_time_spine_specs: Sequence[TimeDimensionSpec] - custom_granularity_bounds_node: CustomGranularityBoundsNode - filter_elements_node: FilterElementsNode + time_spine_node: DataflowPlanNode def __post_init__(self) -> None: # noqa: D105 super().__post_init__() @staticmethod def create( # noqa: D102 - custom_granularity_bounds_node: CustomGranularityBoundsNode, - filter_elements_node: FilterElementsNode, + time_spine_node: DataflowPlanNode, offset_window: MetricTimeWindow, required_time_spine_specs: Sequence[TimeDimensionSpec], ) -> OffsetByCustomGranularityNode: return OffsetByCustomGranularityNode( - parent_nodes=(custom_granularity_bounds_node, filter_elements_node), - custom_granularity_bounds_node=custom_granularity_bounds_node, - filter_elements_node=filter_elements_node, + parent_nodes=(time_spine_node,), + time_spine_node=time_spine_node, offset_window=offset_window, required_time_spine_specs=required_time_spine_specs, ) @@ -74,22 +69,10 @@ def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa: def with_new_parents( # noqa: D102 self, new_parent_nodes: Sequence[DataflowPlanNode] ) -> 
OffsetByCustomGranularityNode: - custom_granularity_bounds_node: Optional[CustomGranularityBoundsNode] = None - filter_elements_node: Optional[FilterElementsNode] = None - for parent_node in new_parent_nodes: - if isinstance(parent_node, CustomGranularityBoundsNode): - custom_granularity_bounds_node = parent_node - elif isinstance(parent_node, FilterElementsNode): - filter_elements_node = parent_node - assert custom_granularity_bounds_node and filter_elements_node, ( - "Can't rewrite OffsetByCustomGranularityNode because the node requires a CustomGranularityBoundsNode and a " - f"FilterElementsNode as parents. Instead, got: {new_parent_nodes}" - ) - + assert len(new_parent_nodes) == 1 return OffsetByCustomGranularityNode( parent_nodes=tuple(new_parent_nodes), - custom_granularity_bounds_node=custom_granularity_bounds_node, - filter_elements_node=filter_elements_node, + time_spine_node=new_parent_nodes[0], offset_window=self.offset_window, required_time_spine_specs=self.required_time_spine_specs, ) diff --git a/metricflow/dataflow/optimizer/predicate_pushdown_optimizer.py b/metricflow/dataflow/optimizer/predicate_pushdown_optimizer.py index 97a9eae41c..960ecba967 100644 --- a/metricflow/dataflow/optimizer/predicate_pushdown_optimizer.py +++ b/metricflow/dataflow/optimizer/predicate_pushdown_optimizer.py @@ -23,7 +23,6 @@ from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode -from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode @@ -474,11 +473,6 @@ def visit_join_to_custom_granularity_node( # noqa: D102 def 
visit_alias_specs_node(self, node: AliasSpecsNode) -> OptimizeBranchResult: # noqa: D102 raise NotImplementedError - def visit_custom_granularity_bounds_node( # noqa: D102 - self, node: CustomGranularityBoundsNode - ) -> OptimizeBranchResult: - raise NotImplementedError - def visit_offset_by_custom_granularity_node( # noqa: D102 self, node: OffsetByCustomGranularityNode ) -> OptimizeBranchResult: diff --git a/metricflow/dataflow/optimizer/source_scan/cm_branch_combiner.py b/metricflow/dataflow/optimizer/source_scan/cm_branch_combiner.py index d153899d95..e724df5533 100644 --- a/metricflow/dataflow/optimizer/source_scan/cm_branch_combiner.py +++ b/metricflow/dataflow/optimizer/source_scan/cm_branch_combiner.py @@ -17,7 +17,6 @@ from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode -from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode @@ -475,12 +474,6 @@ def visit_alias_specs_node(self, node: AliasSpecsNode) -> ComputeMetricsBranchCo self._log_visit_node_type(node) return self._default_handler(node) - def visit_custom_granularity_bounds_node( # noqa: D102 - self, node: CustomGranularityBoundsNode - ) -> ComputeMetricsBranchCombinerResult: - self._log_visit_node_type(node) - return self._default_handler(node) - def visit_offset_by_custom_granularity_node( # noqa: D102 self, node: OffsetByCustomGranularityNode ) -> ComputeMetricsBranchCombinerResult: diff --git a/metricflow/dataflow/optimizer/source_scan/source_scan_optimizer.py b/metricflow/dataflow/optimizer/source_scan/source_scan_optimizer.py index 
4fea885d5f..14a6aaec0c 100644 --- a/metricflow/dataflow/optimizer/source_scan/source_scan_optimizer.py +++ b/metricflow/dataflow/optimizer/source_scan/source_scan_optimizer.py @@ -19,7 +19,6 @@ from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode -from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode @@ -366,12 +365,6 @@ def visit_alias_specs_node(self, node: AliasSpecsNode) -> OptimizeBranchResult: self._log_visit_node_type(node) return self._default_base_output_handler(node) - def visit_custom_granularity_bounds_node( # noqa: D102 - self, node: CustomGranularityBoundsNode - ) -> OptimizeBranchResult: - self._log_visit_node_type(node) - return self._default_base_output_handler(node) - def visit_offset_by_custom_granularity_node( # noqa: D102 self, node: OffsetByCustomGranularityNode ) -> OptimizeBranchResult: diff --git a/metricflow/execution/dataflow_to_execution.py b/metricflow/execution/dataflow_to_execution.py index 4d8807738b..95add97437 100644 --- a/metricflow/execution/dataflow_to_execution.py +++ b/metricflow/execution/dataflow_to_execution.py @@ -16,7 +16,6 @@ from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode -from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from 
metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode @@ -208,10 +207,6 @@ def visit_join_to_custom_granularity_node(self, node: JoinToCustomGranularityNod def visit_alias_specs_node(self, node: AliasSpecsNode) -> ConvertToExecutionPlanResult: raise NotImplementedError - @override - def visit_custom_granularity_bounds_node(self, node: CustomGranularityBoundsNode) -> ConvertToExecutionPlanResult: - raise NotImplementedError - @override def visit_offset_by_custom_granularity_node( self, node: OffsetByCustomGranularityNode diff --git a/metricflow/plan_conversion/dataflow_to_sql.py b/metricflow/plan_conversion/dataflow_to_sql.py index 9eeb6e0eff..4df4a78070 100644 --- a/metricflow/plan_conversion/dataflow_to_sql.py +++ b/metricflow/plan_conversion/dataflow_to_sql.py @@ -84,7 +84,6 @@ from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode -from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode @@ -544,7 +543,7 @@ def visit_join_over_time_range_node(self, node: JoinOverTimeRangeNode) -> SqlDat agg_time_dimension_instances = parent_data_set.instances_for_time_dimensions( node.queried_agg_time_dimension_specs ) - time_spine_data_set_alias = self._next_unique_table_alias() + time_spine_alias = self._next_unique_table_alias() time_spine_data_set = self._make_time_spine_data_set( agg_time_dimension_instances=agg_time_dimension_instances, time_range_constraint=node.time_range_constraint ) @@ -552,13 +551,13 @@ def 
visit_join_over_time_range_node(self, node: JoinOverTimeRangeNode) -> SqlDat # Build the join description. join_spec = self._choose_instance_for_time_spine_join(agg_time_dimension_instances).spec annotated_parent = parent_data_set.annotate(alias=parent_data_set_alias, metric_time_spec=join_spec) - annotated_time_spine = time_spine_data_set.annotate(alias=time_spine_data_set_alias, metric_time_spec=join_spec) + annotated_time_spine = time_spine_data_set.annotate(alias=time_spine_alias, metric_time_spec=join_spec) join_desc = SqlPlanJoinBuilder.make_cumulative_metric_time_range_join_description( node=node, metric_data_set=annotated_parent, time_spine_data_set=annotated_time_spine ) # Build select columns, replacing agg_time_dimensions from the parent node with columns from the time spine. - table_alias_to_instance_set[time_spine_data_set_alias] = time_spine_data_set.instance_set + table_alias_to_instance_set[time_spine_alias] = time_spine_data_set.instance_set table_alias_to_instance_set[parent_data_set_alias] = parent_data_set.instance_set.transform( FilterElements(exclude_specs=InstanceSpecSet(time_dimension_specs=node.queried_agg_time_dimension_specs)) ) @@ -572,7 +571,7 @@ def visit_join_over_time_range_node(self, node: JoinOverTimeRangeNode) -> SqlDat description=node.description, select_columns=select_columns, from_source=time_spine_data_set.checked_sql_select_node, - from_source_alias=time_spine_data_set_alias, + from_source_alias=time_spine_alias, join_descs=(join_desc,), ), ) @@ -1282,9 +1281,9 @@ def visit_metric_time_dimension_transform_node(self, node: MetricTimeDimensionTr spec=metric_time_dimension_spec, ) ) - output_column_to_input_column[metric_time_dimension_column_association.column_name] = ( - matching_time_dimension_instance.associated_column.column_name - ) + output_column_to_input_column[ + metric_time_dimension_column_association.column_name + ] = matching_time_dimension_instance.associated_column.column_name output_instance_set = InstanceSet( 
measure_instances=tuple(output_measure_instances), @@ -1462,9 +1461,9 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet node.join_on_time_dimension_spec ).column_name time_spine_jon_column_name = time_spine_data_set.instance_from_time_dimension_grain_and_date_part( - time_granularity_name=node.join_on_time_dimension_spec.time_granularity.name, date_part=None + time_granularity_name=node.join_on_time_dimension_spec.time_granularity_name, date_part=None ).associated_column.column_name - join_description = SqlQueryPlanJoinBuilder.make_join_to_time_spine_join_description( + join_description = SqlPlanJoinBuilder.make_join_to_time_spine_join_description( node=node, time_spine_alias=time_spine_alias, time_spine_column_name=time_spine_jon_column_name, @@ -2053,7 +2052,8 @@ def strip_time_from_dt(ts: dt.datetime) -> dt.datetime: ), ) - def visit_custom_granularity_bounds_node(self, node: CustomGranularityBoundsNode) -> SqlDataSet: + def visit_offset_by_custom_granularity_node(self, node: OffsetByCustomGranularityNode) -> SqlDataSet: + # TODO: update this docstring with thorough SQL example. """Build columns that will be needed for custom offset windows. This includes columns that represent the start and end of a custom grain period, as well as the row number of the base @@ -2066,36 +2066,48 @@ def visit_custom_granularity_bounds_node(self, node: CustomGranularityBoundsNode ROW_NUMBER() OVER (PARTITION BY fiscal_quarter ORDER BY ds) AS ds__day__row_number FROM time_spine_read_node """ - parent_data_set = node.parent_node.accept(self) - parent_instance_set = parent_data_set.instance_set - parent_data_set_alias = self._next_unique_table_alias() + """For a given custom grain, offset its base grain by the requested number of custom grain periods. 
+ + Example: if the custom grain is `fiscal_quarter` with a base grain of DAY and we're offsetting by 1 period, the + output SQL should look something like this: - custom_granularity_name = node.custom_granularity_name - time_spine = self._get_time_spine_for_custom_granularity(custom_granularity_name) - custom_grain_instance_from_parent = parent_data_set.instance_from_time_dimension_grain_and_date_part( - time_granularity_name=custom_granularity_name, date_part=None + SELECT + CASE + WHEN DATEADD(day, ds__day__row_number - 1, ds__fiscal_quarter__first_value__offset) <= ds__fiscal_quarter__last_value__offset + THEN DATEADD(day, ds__day__row_number - 1, ds__fiscal_quarter__first_value__offset) + ELSE ds__fiscal_quarter__last_value__offset + END AS date_day + FROM custom_granularity_bounds_node + INNER JOIN filter_elements_node ON filter_elements_node.fiscal_quarter = custom_granularity_bounds_node.fiscal_quarter + """ + time_spine_data_set = node.time_spine_node.accept(self) + time_spine_alias = self._next_unique_table_alias() + offset_window = node.offset_window + custom_grain_name = offset_window.granularity + base_grain = ExpandedTimeGranularity.from_time_granularity( + self._get_time_spine_for_custom_granularity(custom_grain_name).base_granularity + ) + time_spine = self._get_time_spine_for_custom_granularity(custom_grain_name) + custom_grain_instance = time_spine_data_set.instance_from_time_dimension_grain_and_date_part( + time_granularity_name=custom_grain_name, date_part=None ) - base_grain_instance_from_parent = parent_data_set.instance_from_time_dimension_grain_and_date_part( + custom_grain_column_name = custom_grain_instance.associated_column.column_name + base_grain_instance = time_spine_data_set.instance_from_time_dimension_grain_and_date_part( time_granularity_name=time_spine.base_granularity.value, date_part=None ) + base_grain_column_name = base_grain_instance.associated_column.column_name + + # Build columns that get start and end of the custom grain 
period. + # Ex: FIRST_VALUE(ds) OVER (PARTITION BY fiscal_quarter ORDER BY ds) AS ds__fiscal_quarter__first_value + new_select_columns: Tuple[SqlSelectColumn, ...] = tuple() + bounds_columns: Tuple[SqlSelectColumn, ...] = () custom_column_expr = SqlColumnReferenceExpression.from_table_and_column_names( - table_alias=parent_data_set_alias, - column_name=custom_grain_instance_from_parent.associated_column.column_name, + table_alias=time_spine_alias, column_name=custom_grain_column_name ) base_column_expr = SqlColumnReferenceExpression.from_table_and_column_names( - table_alias=parent_data_set_alias, column_name=base_grain_instance_from_parent.associated_column.column_name + table_alias=time_spine_alias, column_name=base_grain_column_name ) - - new_instances: Tuple[TimeDimensionInstance, ...] = tuple() - new_select_columns: Tuple[SqlSelectColumn, ...] = tuple() - - # Build columns that get start and end of the custom grain period. - # Ex: "FIRST_VALUE(ds) OVER (PARTITION BY martian_day ORDER BY ds) AS ds__fiscal_quarter__first_value" for window_func in (SqlWindowFunction.FIRST_VALUE, SqlWindowFunction.LAST_VALUE): - new_instance = custom_grain_instance_from_parent.with_new_spec( - new_spec=custom_grain_instance_from_parent.spec.with_window_function(window_func), - column_association_resolver=self._column_association_resolver, - ) select_column = SqlSelectColumn( expr=SqlWindowFunctionExpression.create( sql_function=window_func, @@ -2103,152 +2115,93 @@ def visit_custom_granularity_bounds_node(self, node: CustomGranularityBoundsNode partition_by_args=(custom_column_expr,), order_by_args=(SqlWindowOrderByArgument(base_column_expr),), ), - column_alias=new_instance.associated_column.column_name, + column_alias=self._column_association_resolver.resolve_spec( + custom_grain_instance.spec.with_window_function(window_func) + ).column_name, ) - new_instances += (new_instance,) + bounds_columns += (select_column,) new_select_columns += (select_column,) # Build a column that 
tracks the row number for the base grain column within the custom grain period. # This will be offset by 1 to represent the number of base grain periods since the start of the custom grain period. - # Ex: "ROW_NUMBER() OVER (PARTITION BY martian_day ORDER BY ds) AS ds__day__row_number" - new_instance = base_grain_instance_from_parent.with_new_spec( - new_spec=base_grain_instance_from_parent.spec.with_window_function(SqlWindowFunction.ROW_NUMBER), - column_association_resolver=self._column_association_resolver, - ) - window_func_expr = SqlWindowFunctionExpression.create( - sql_function=SqlWindowFunction.ROW_NUMBER, - partition_by_args=(custom_column_expr,), - order_by_args=(SqlWindowOrderByArgument(base_column_expr),), - ) - new_select_column = SqlSelectColumn( - expr=window_func_expr, - column_alias=new_instance.associated_column.column_name, - ) - new_instances += (new_instance,) - new_select_columns += (new_select_column,) - - return SqlDataSet( - instance_set=InstanceSet.merge([InstanceSet(time_dimension_instances=new_instances), parent_instance_set]), - sql_select_node=SqlSelectStatementNode.create( - description=node.description, - select_columns=parent_data_set.checked_sql_select_node.select_columns + new_select_columns, - from_source=parent_data_set.checked_sql_select_node, - from_source_alias=parent_data_set_alias, + # Ex: ROW_NUMBER() OVER (PARTITION BY fiscal_quarter ORDER BY ds) AS ds__day__row_number + row_number_column = SqlSelectColumn( + expr=SqlWindowFunctionExpression.create( + sql_function=SqlWindowFunction.ROW_NUMBER, + partition_by_args=(custom_column_expr,), + order_by_args=(SqlWindowOrderByArgument(base_column_expr),), + ), + column_alias=self._column_association_resolver.resolve_spec( + base_grain_instance.spec.with_window_function(SqlWindowFunction.ROW_NUMBER) + ).column_name, + ) + new_select_columns += (row_number_column,) + + # Built a CTE for the new select statement. 
+ cte_alias = self._next_unique_table_alias() # TODO: do we use a different alias scheme for CTEs? + cte = SqlCteNode.create( + SqlSelectStatementNode.create( + description="Get Custom Granularity Bounds", + select_columns=time_spine_data_set.checked_sql_select_node.select_columns + new_select_columns, + from_source=time_spine_data_set.checked_sql_select_node, + from_source_alias=time_spine_alias, ), + cte_alias=cte_alias, ) - def visit_offset_by_custom_granularity_node(self, node: OffsetByCustomGranularityNode) -> SqlDataSet: - """For a given custom grain, offset its base grain by the requested number of custom grain periods. - - Example: if the custom grain is `fiscal_quarter` with a base grain of DAY and we're offsetting by 1 period, the - output SQL should look something like this: - - SELECT - CASE - WHEN DATEADD(day, ds__day__row_number - 1, ds__fiscal_quarter__first_value__offset) <= ds__fiscal_quarter__last_value__offset - THEN DATEADD(day, ds__day__row_number - 1, ds__fiscal_quarter__first_value__offset) - ELSE ds__fiscal_quarter__last_value__offset - END AS date_day - FROM custom_granularity_bounds_node - INNER JOIN filter_elements_node ON filter_elements_node.fiscal_quarter = custom_granularity_bounds_node.fiscal_quarter - """ - bounds_data_set = node.custom_granularity_bounds_node.accept(self) - bounds_instance_set = bounds_data_set.instance_set - bounds_data_set_alias = self._next_unique_table_alias() - filter_elements_data_set = node.filter_elements_node.accept(self) - filter_elements_instance_set = filter_elements_data_set.instance_set - filter_elements_data_set_alias = self._next_unique_table_alias() - offset_window = node.offset_window - custom_grain_name = offset_window.granularity - base_grain = ExpandedTimeGranularity.from_time_granularity( - self._get_time_spine_for_custom_granularity(custom_grain_name).base_granularity + # Create a subquery that gets unique rows for the custom grain and bounds columns. 
+ # Ex: SELECT first_value, last_value, custom_grain FROM cte GROUP BY first_value, last_value, custom_grain + unique_rows_columns = tuple( + SqlSelectColumn.from_table_and_column_names(column_name=column_name, table_alias=cte_alias) + for column_name in ([custom_grain_column_name] + [col.column_alias for col in bounds_columns]) ) - - # Find the required instances in the parent data sets. - first_value_instance: Optional[TimeDimensionInstance] = None - last_value_instance: Optional[TimeDimensionInstance] = None - row_number_instance: Optional[TimeDimensionInstance] = None - custom_grain_instance: Optional[TimeDimensionInstance] = None - base_grain_instance: Optional[TimeDimensionInstance] = None - for instance in filter_elements_instance_set.time_dimension_instances: - if instance.spec.window_function is SqlWindowFunction.FIRST_VALUE: - first_value_instance = instance - elif instance.spec.window_function is SqlWindowFunction.LAST_VALUE: - last_value_instance = instance - elif instance.spec.time_granularity and instance.spec.time_granularity.name == custom_grain_name: - custom_grain_instance = instance - if custom_grain_instance and first_value_instance and last_value_instance: - break - for instance in bounds_instance_set.time_dimension_instances: - if instance.spec.window_function is SqlWindowFunction.ROW_NUMBER: - row_number_instance = instance - elif ( - instance.spec.time_granularity - and instance.spec.time_granularity == base_grain - and instance.spec.date_part is None - ): - base_grain_instance = instance - if base_grain_instance and row_number_instance: - break - assert ( - custom_grain_instance - and base_grain_instance - and first_value_instance - and last_value_instance - and row_number_instance - ), ( - "Did not find all required time dimension instances in parent data sets for OffsetByCustomGranularityNode. " - f"This indicates internal misconfiguration. 
Got custom grain instance: {custom_grain_instance}; base grain " - f"instance: {base_grain_instance}; first value instance: {first_value_instance}; last value instance: " - f"{last_value_instance}; row number instance: {row_number_instance}\n" - f"Available instances:{bounds_instance_set.time_dimension_instances}." + unique_rows_alias = self._next_unique_table_alias() + unique_rows_subquery = SqlSelectStatementNode.create( + description="Get Unique Rows for Custom Granularity Bounds", + select_columns=unique_rows_columns, + from_source=SqlTableNode.create(sql_table=SqlTable(schema_name=None, table_name=cte_alias)), + cte_sources=(cte,), + from_source_alias=cte_alias, + group_bys=unique_rows_columns, ) - # First, build a subquery that offsets the first and last value columns. - custom_grain_column_name = custom_grain_instance.associated_column.column_name + # Build a subquery that offsets the first and last value columns. + # Ex: LEAD(ds__fiscal_quarter__first_value, 1) OVER (ORDER BY ds__fiscal_quarter) AS ds__fiscal_quarter__first_value__offset custom_grain_column = SqlSelectColumn.from_table_and_column_names( - column_name=custom_grain_column_name, table_alias=filter_elements_data_set_alias + column_name=custom_grain_column_name, table_alias=unique_rows_alias ) first_value_offset_column, last_value_offset_column = tuple( SqlSelectColumn( expr=SqlWindowFunctionExpression.create( sql_function=SqlWindowFunction.LEAD, sql_function_args=( - SqlColumnReferenceExpression.from_table_and_column_names( - column_name=instance.associated_column.column_name, - table_alias=filter_elements_data_set_alias, - ), + bounds_column.ref_with_new_table_alias(unique_rows_alias), SqlIntegerExpression.create(node.offset_window.count), ), order_by_args=(SqlWindowOrderByArgument(custom_grain_column.expr),), ), - column_alias=f"{instance.associated_column.column_name}{DUNDER}offset", + column_alias=f"{bounds_column.column_alias}{DUNDER}offset", ) - for instance in (first_value_instance, 
last_value_instance) + for bounds_column in bounds_columns ) offset_bounds_subquery_alias = self._next_unique_table_alias() offset_bounds_subquery = SqlSelectStatementNode.create( description="Offset Custom Granularity Bounds", select_columns=(custom_grain_column, first_value_offset_column, last_value_offset_column), - from_source=filter_elements_data_set.checked_sql_select_node, - from_source_alias=filter_elements_data_set_alias, + from_source=unique_rows_subquery, + from_source_alias=unique_rows_alias, ) - offset_bounds_subquery_alias = self._next_unique_table_alias() - # Offset the base column by the requested window. If the offset date is not within the offset custom grain period, - # default to the last value in that period. - first_value_offset_expr, last_value_offset_expr = [ - SqlColumnReferenceExpression.from_table_and_column_names( - column_name=offset_column.column_alias, table_alias=offset_bounds_subquery_alias - ) - for offset_column in (first_value_offset_column, last_value_offset_column) - ] + # Offset the base column by the requested window. If offset date is not within the offset grain period, use NULL. 
Ex: + # CASE + # WHEN DATEADD(day, (ds__day__row_number - 1), ds__fiscal_quarter__first_value__offset) + # <= ds__fiscal_quarter__last_value__offset + # THEN DATEADD(day, (ds__day__row_number - 1), ds__fiscal_quarter__first_value__offset) + # ELSE NULL offset_base_grain_expr = SqlAddTimeExpression.create( - arg=first_value_offset_expr, + arg=first_value_offset_column.ref_with_new_table_alias(offset_bounds_subquery_alias), count_expr=SqlArithmeticExpression.create( - left_expr=SqlColumnReferenceExpression.from_table_and_column_names( - table_alias=bounds_data_set_alias, column_name=row_number_instance.associated_column.column_name - ), + left_expr=row_number_column.ref_with_new_table_alias(time_spine_alias), operator=SqlArithmeticOperator.SUBTRACT, right_expr=SqlIntegerExpression.create(1), ), @@ -2257,89 +2210,77 @@ def visit_offset_by_custom_granularity_node(self, node: OffsetByCustomGranularit is_below_last_value_expr = SqlComparisonExpression.create( left_expr=offset_base_grain_expr, comparison=SqlComparison.LESS_THAN_OR_EQUALS, - right_expr=last_value_offset_expr, - ) - offset_base_instance = base_grain_instance.with_new_spec( - # LEAD isn't quite accurate here, but this will differentiate the offset instance (and column) from the original one. - new_spec=base_grain_instance.spec.with_window_function(SqlWindowFunction.LEAD), - column_association_resolver=self._column_association_resolver, + right_expr=last_value_offset_column.ref_with_new_table_alias(offset_bounds_subquery_alias), ) + # LEAD isn't quite accurate here, but this will differentiate the offset instance (and column) from the original one. 
+ offset_base_column_name = self._column_association_resolver.resolve_spec( + base_grain_instance.spec.with_window_function(SqlWindowFunction.LEAD) + ).column_name offset_base_column = SqlSelectColumn( expr=SqlCaseExpression.create( when_to_then_exprs={is_below_last_value_expr: offset_base_grain_expr}, else_expr=SqlNullExpression.create(), ), - column_alias=offset_base_instance.associated_column.column_name, + column_alias=offset_base_column_name, ) original_base_grain_column = SqlSelectColumn.from_table_and_column_names( - column_name=base_grain_instance.associated_column.column_name, table_alias=bounds_data_set_alias + column_name=base_grain_column_name, table_alias=time_spine_alias ) join_desc = SqlJoinDescription( right_source=offset_bounds_subquery, right_source_alias=offset_bounds_subquery_alias, join_type=SqlJoinType.INNER, on_condition=SqlComparisonExpression.create( - left_expr=SqlColumnReferenceExpression.from_table_and_column_names( - table_alias=bounds_data_set_alias, column_name=custom_grain_column_name - ), + left_expr=custom_grain_column.ref_with_new_table_alias(cte_alias), comparison=SqlComparison.EQUALS, - right_expr=SqlColumnReferenceExpression.from_table_and_column_names( - table_alias=offset_bounds_subquery_alias, column_name=custom_grain_column_name - ), + right_expr=custom_grain_column.ref_with_new_table_alias(offset_bounds_subquery_alias), ), ) offset_base_grain_subquery = SqlSelectStatementNode.create( description=node.description, select_columns=(original_base_grain_column, offset_base_column), - from_source=bounds_data_set.checked_sql_select_node, - from_source_alias=bounds_data_set_alias, + from_source=time_spine_data_set.checked_sql_select_node, + from_source_alias=time_spine_alias, join_descs=(join_desc,), ) offset_base_grain_subquery_alias = self._next_unique_table_alias() # Apply standard grains & date parts requested in the query. Use base grain for any custom grains. - standard_grain_instances: Tuple[TimeDimensionInstance, ...] 
= () - standard_grain_columns: Tuple[SqlSelectColumn, ...] = () - offset_base_column_ref = SqlSelectColumn( - expr=SqlColumnReferenceExpression.from_table_and_column_names( - column_name=offset_base_instance.associated_column.column_name, - table_alias=offset_base_grain_subquery_alias, - ), - column_alias=base_grain_instance.associated_column.column_name, - ) + requested_instances: Tuple[TimeDimensionInstance, ...] = () + requested_columns: Tuple[SqlSelectColumn, ...] = () + offset_base_column_ref = offset_base_column.ref_with_new_table_alias(offset_base_grain_subquery_alias) for spec in node.required_time_spine_specs: new_instance = base_grain_instance.with_new_spec( new_spec=spec, column_association_resolver=self._column_association_resolver ) - standard_grain_instances += (new_instance,) if spec.date_part: expr: SqlExpressionNode = SqlExtractExpression.create( - date_part=spec.date_part, arg=offset_base_column_ref.expr + date_part=spec.date_part, arg=offset_base_column_ref ) else: assert ( spec.time_granularity is not None ), "Got no time granularity or date part for required time spine spec." if spec.time_granularity.base_granularity == base_grain.base_granularity: - expr = offset_base_column_ref.expr + expr = offset_base_column_ref else: expr = SqlDateTruncExpression.create( - time_granularity=spec.time_granularity.base_granularity, arg=offset_base_column_ref.expr + time_granularity=spec.time_granularity.base_granularity, arg=offset_base_column_ref ) - standard_grain_columns += ( - SqlSelectColumn(expr=expr, column_alias=new_instance.associated_column.column_name), - ) + requested_columns += (SqlSelectColumn(expr=expr, column_alias=new_instance.associated_column.column_name),) + requested_instances += (new_instance,) # Need to keep the non-offset base grain column in the output. This will be used to join to the source data set. + # TODO: how do we differentiate this column name from the offset ones?
non_offset_base_grain_column = SqlSelectColumn.from_table_and_column_names( - column_name=base_grain_instance.associated_column.column_name, table_alias=offset_base_grain_subquery_alias + column_name=base_grain_column_name, table_alias=offset_base_grain_subquery_alias ) return SqlDataSet( - instance_set=InstanceSet(time_dimension_instances=(base_grain_instance,) + standard_grain_instances), + instance_set=InstanceSet(time_dimension_instances=(base_grain_instance,) + requested_instances), sql_select_node=SqlSelectStatementNode.create( description="Apply Requested Granularities", - select_columns=(non_offset_base_grain_column,) + standard_grain_columns, + select_columns=(non_offset_base_grain_column,) + requested_columns, from_source=offset_base_grain_subquery, from_source_alias=offset_base_grain_subquery_alias, ), @@ -2540,12 +2481,6 @@ def visit_join_to_custom_granularity_node(self, node: JoinToCustomGranularityNod def visit_alias_specs_node(self, node: AliasSpecsNode) -> SqlDataSet: # noqa: D102 return self._default_handler(node=node, node_to_select_subquery_function=super().visit_alias_specs_node) - @override - def visit_custom_granularity_bounds_node(self, node: CustomGranularityBoundsNode) -> SqlDataSet: # noqa: D102 - return self._default_handler( - node=node, node_to_select_subquery_function=super().visit_custom_granularity_bounds_node - ) - @override def visit_offset_by_custom_granularity_node(self, node: OffsetByCustomGranularityNode) -> SqlDataSet: # noqa: D102 return self._default_handler( diff --git a/metricflow/sql/sql_plan.py b/metricflow/sql/sql_plan.py index d1d24d2dc6..e01280b5ee 100644 --- a/metricflow/sql/sql_plan.py +++ b/metricflow/sql/sql_plan.py @@ -112,6 +112,15 @@ def from_table_and_column_names(table_alias: str, column_name: str) -> SqlSelect column_alias=column_name, ) + def ref_with_new_table_alias(self, new_table_alias: str) -> SqlColumnReferenceExpression: + """Return a column reference expression for this column with a new table alias. 
+ + Useful when you already have access to the select column from a subquery and want to reference it in an outer query. + """ + return SqlColumnReferenceExpression.from_table_and_column_names( + column_name=self.column_alias, table_alias=new_table_alias + ) + @dataclass(frozen=True) class SqlJoinDescription: diff --git a/tests_metricflow/dataflow/optimizer/source_scan/test_source_scan_optimizer.py b/tests_metricflow/dataflow/optimizer/source_scan/test_source_scan_optimizer.py index a66e5a9e51..d267ec2aeb 100644 --- a/tests_metricflow/dataflow/optimizer/source_scan/test_source_scan_optimizer.py +++ b/tests_metricflow/dataflow/optimizer/source_scan/test_source_scan_optimizer.py @@ -24,7 +24,6 @@ from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode -from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode @@ -116,9 +115,6 @@ def visit_join_to_custom_granularity_node(self, node: JoinToCustomGranularityNod def visit_alias_specs_node(self, node: AliasSpecsNode) -> int: # noqa: D102 return self._sum_parents(node) - def visit_custom_granularity_bounds_node(self, node: CustomGranularityBoundsNode) -> int: # noqa: D102 - return self._sum_parents(node) - def visit_offset_by_custom_granularity_node(self, node: OffsetByCustomGranularityNode) -> int: # noqa: D102 return self._sum_parents(node) diff --git a/tests_metricflow/integration/test_configured_cases.py b/tests_metricflow/integration/test_configured_cases.py index 069d902088..7f540b02b1 100644 --- a/tests_metricflow/integration/test_configured_cases.py +++ 
b/tests_metricflow/integration/test_configured_cases.py @@ -111,7 +111,7 @@ def render_date_add( count_expr=SqlStringExpression.create(sql_expr=count_column, requires_parenthesis=False), granularity=granularity, ) - return self._sql_client.sql_query_plan_renderer.expr_renderer.render_sql_expr(expr).sql + return self._sql_client.sql_plan_renderer.expr_renderer.render_sql_expr(expr).sql def render_date_trunc(self, expr: str, granularity: TimeGranularity) -> str: """Return the DATE_TRUNC() call that can be used for converting the given expr to the granularity.""" diff --git a/tests_metricflow/query_rendering/test_custom_granularity.py b/tests_metricflow/query_rendering/test_custom_granularity.py index fad3f56ccb..1e0badf069 100644 --- a/tests_metricflow/query_rendering/test_custom_granularity.py +++ b/tests_metricflow/query_rendering/test_custom_granularity.py @@ -619,7 +619,7 @@ def test_custom_offset_window( # noqa: D103 request: FixtureRequest, mf_test_configuration: MetricFlowTestConfiguration, dataflow_plan_builder: DataflowPlanBuilder, - dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter, + dataflow_to_sql_converter: DataflowToSqlPlanConverter, sql_client: SqlClient, query_parser: MetricFlowQueryParser, ) -> None: @@ -643,7 +643,7 @@ def test_custom_offset_window_with_granularity_and_date_part( # noqa: D103 request: FixtureRequest, mf_test_configuration: MetricFlowTestConfiguration, dataflow_plan_builder: DataflowPlanBuilder, - dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter, + dataflow_to_sql_converter: DataflowToSqlPlanConverter, sql_client: SqlClient, query_parser: MetricFlowQueryParser, ) -> None: