diff --git a/metrics_layer/core/model/filter.py b/metrics_layer/core/model/filter.py index 9be8867..1cb9abd 100644 --- a/metrics_layer/core/model/filter.py +++ b/metrics_layer/core/model/filter.py @@ -59,8 +59,12 @@ class MetricsLayerFilterExpressionType(str, Enum): IsNotNull = "is_not_null" IsIn = "isin" IsNotIn = "isnotin" + IsInQuery = "is_in_query" + IsNotInQuery = "is_not_in_query" BooleanTrue = "boolean_true" BooleanFalse = "boolean_false" + IsTrue = "is_true" + IsFalse = "is_false" Matches = "matches" def __hash__(self): @@ -575,6 +579,8 @@ def sql_query(sql_to_compare: str, expression_type: str, value, field_datatype: MetricsLayerFilterExpressionType.IsNotIn: lambda f: f.isin(value).negate(), MetricsLayerFilterExpressionType.BooleanTrue: lambda f: LiteralValueCriterion(f), MetricsLayerFilterExpressionType.BooleanFalse: lambda f: f.negate(), + MetricsLayerFilterExpressionType.IsTrue: lambda f: LiteralValueCriterion(f), + MetricsLayerFilterExpressionType.IsFalse: lambda f: f.negate(), } try: diff --git a/metrics_layer/core/sql/query_filter.py b/metrics_layer/core/sql/query_filter.py index 9a8d4d2..fdf8b0d 100644 --- a/metrics_layer/core/sql/query_filter.py +++ b/metrics_layer/core/sql/query_filter.py @@ -57,7 +57,6 @@ def __init__( else: self.query_type = definition["query_type"] self.filter_type = filter_type - self._extra_group_by_filter_conditions = [] self.validate(definition) @@ -80,7 +79,10 @@ def conditions(self): @property def is_group_by(self): - return self.group_by is not None + return self.group_by is not None or self.expression in { + MetricsLayerFilterExpressionType.IsInQuery.value, + MetricsLayerFilterExpressionType.IsNotInQuery.value, + } @property def is_funnel(self): @@ -243,6 +245,33 @@ def _handle_cte_alias_replacement( def isin_sql_query(self): cte_alias = self.group_by_filter_cte_lookup[hash(self)] + if self.group_by: + return self._create_legacy_group_by_is_in_query(cte_alias) + else: + return self._create_is_in_query(cte_alias) + + def _create_is_in_query(self, cte_alias): + connection_field_id = self.value["field"] + connection_field = self.design.get_field(connection_field_id) + base = query_lookup[self.query_type] + subquery = base.from_(Table(cte_alias)).select(connection_field.alias(with_view=True)).distinct() + if self.expression == MetricsLayerFilterExpressionType.IsNotInQuery.value: + expression = MetricsLayerFilterExpressionType.IsNotIn.value + elif self.expression == MetricsLayerFilterExpressionType.IsInQuery.value: + expression = MetricsLayerFilterExpressionType.IsIn.value + else: + raise QueryError(f"Invalid expression for subquery filter: {self.expression}") + + definition = { + "query_type": self.query_type, + "field": self.field.id(), + "expression": expression, + "value": subquery, + } + f = MetricsLayerFilter(definition=definition, design=None, filter_type="where") + return f.criterion(self.field.sql_query(self.query_type)) + + def _create_legacy_group_by_is_in_query(self, cte_alias): group_by_field = self.design.get_field(self.group_by) base = query_lookup[self.query_type] subquery = base.from_(Table(cte_alias)).select(group_by_field.alias(with_view=True)).distinct() @@ -292,33 +321,31 @@ def criterion(self, field_sql: str) -> Criterion: field_datatype = "unknown" return Filter.sql_query(field_sql, self.expression_type, self.value, field_datatype) - def consolidate_group_by_filter(self, filter_class_to_consolidate: "MetricsLayerFilter") -> None: - """ - Consolidate a group_by filter with another filter - """ - if not self.is_group_by: - raise QueryError("A group_by filter is invalid for a filter with no group_by property") - - if self.group_by != filter_class_to_consolidate.group_by: - raise QueryError("The group_by field must be the same for both filters") - - joinable_graphs = [jg for jg in self.field.join_graphs() if "merged_result" not in jg] - consolidate_joinable_graphs = [ - jg for jg in filter_class_to_consolidate.field.join_graphs() if "merged_result" not in jg - ] - join_overlap = set.intersection(*map(set, [joinable_graphs, consolidate_joinable_graphs])) - if len(join_overlap) == 0: - raise QueryError("The filters must have a join path in common to be consolidated") - - self._extra_group_by_filter_conditions.append(filter_class_to_consolidate) - def cte(self, query_class, design_class): if not self.is_group_by: - raise QueryError("A CTE is invalid for a filter with no group_by property") + raise QueryError( + "A CTE is invalid for a filter with no group_by property or is_in_query/is_not_in_query" + " expression" + ) + if self.group_by: + return self._create_subquery_from_group_by_property(query_class, design_class) + elif self.expression in { + MetricsLayerFilterExpressionType.IsInQuery.value, + MetricsLayerFilterExpressionType.IsNotInQuery.value, + }: + return self._create_subquery_from_query_property() + else: + raise QueryError( + "A CTE is invalid for a filter with no group_by property or is_in_query/is_not_in_query" + " expression" + ) + + def _create_subquery_from_query_property(self): + # This is a subquery that's compiled in the `resolve.py` file in the initial parsing step. + return self.value["sql_query"] + def _create_subquery_from_group_by_property(self, query_class, design_class): group_by_filters = [{k: v for k, v in self._definition.items() if k != "group_by"}] - for f in self._extra_group_by_filter_conditions: - group_by_filters.append({k: v for k, v in f._definition.items() if k != "group_by"}) field_lookup = {} group_by_field = self.design.get_field(self.group_by) diff --git a/metrics_layer/core/sql/query_generator.py b/metrics_layer/core/sql/query_generator.py index 97327d7..cb17972 100644 --- a/metrics_layer/core/sql/query_generator.py +++ b/metrics_layer/core/sql/query_generator.py @@ -49,7 +49,10 @@ def parse_definition(self, definition: dict): access_filter_literal, _ = self.design.get_access_filter() if where or access_filter_literal: wheres, group_by_wheres, group_by_where_cte_lookup = self._parse_filter_object( - where, "where", access_filter=access_filter_literal + where, + "where", + access_filter=access_filter_literal, + nesting_depth=definition.get("nesting_depth", 0), ) self.where_filters.extend([f for f in wheres if not f.is_funnel]) self.funnel_filters.extend([f for f in wheres if f.is_funnel]) @@ -102,7 +105,9 @@ def parse_definition(self, definition: dict): } ) - def _parse_filter_object(self, filter_object, filter_type: str, access_filter: str = None): + def _parse_filter_object( + self, filter_object, filter_type: str, access_filter: str = None, nesting_depth: int = 0 + ): results, group_by_results, group_by_cte_lookup = [], [], {} extra_kwargs = dict(filter_type=filter_type, design=self.design) @@ -121,10 +126,14 @@ def _parse_filter_object(self, filter_object, filter_type: str, access_filter: s for filter_dict in filter_object: flattened_filters = flatten_filters(filter_dict) for sub_filter in flattened_filters: - if "group_by" in sub_filter: - gb_f = MetricsLayerFilter(definition=sub_filter, **extra_kwargs) - group_by_cte_lookup[hash(gb_f)] = f"filter_subquery_{cte_counter}" - group_by_results.append(gb_f) + f = MetricsLayerFilter(definition=sub_filter, **extra_kwargs) + if f.is_group_by: + if nesting_depth > 0: + cte_alias = f"filter_subquery_{nesting_depth}_{cte_counter}" + else: + cte_alias = f"filter_subquery_{cte_counter}" + group_by_cte_lookup[hash(f)] = cte_alias + group_by_results.append(f) cte_counter += 1 for filter_dict in filter_object: diff --git a/metrics_layer/core/sql/resolve.py b/metrics_layer/core/sql/resolve.py index 83d9bd5..d06db5a 100644 --- a/metrics_layer/core/sql/resolve.py +++ b/metrics_layer/core/sql/resolve.py @@ -3,7 +3,7 @@ from typing import List, Union from metrics_layer.core.exceptions import JoinError, QueryError -from metrics_layer.core.model.filter import Filter +from metrics_layer.core.model.filter import Filter, MetricsLayerFilterExpressionType from metrics_layer.core.model.project import Project from metrics_layer.core.sql.merged_query_resolve import MergedSQLQueryResolver from metrics_layer.core.sql.query_base import QueryKindTypes @@ -314,7 +314,9 @@ def _replace_field_value_in_group_by_filter(self): optimal_join_graph_connection = [ o for o in optimal_join_graph_connection if "merged_result" not in o ] - flattened_conditions = SingleSQLQueryResolver.flatten_filters(self.where) + flattened_conditions = SingleSQLQueryResolver.flatten_filters( + self.where, return_nesting_depth=True + ) for cond in flattened_conditions: if "group_by" in cond: # Only the group by field needs to be joinable or merge-able to the query @@ -332,6 +334,55 @@ def _replace_field_value_in_group_by_filter(self): ) self.field_id_mapping[cond["field"]] = replace_with.id() cond["field"] = replace_with.id() + elif cond["expression"] in { + MetricsLayerFilterExpressionType.IsInQuery.value, + MetricsLayerFilterExpressionType.IsNotInQuery.value, + }: + defaults = { + "project": self.project, + "connections": self.connections, + "model_name": self.model.name, + "return_pypika_query": False, + } + if "query_type" in self.kwargs: + defaults["query_type"] = self.kwargs["query_type"] + + # This handles the case where the passed filter is incomplete, and + # does not apply the filter + if "query" not in cond["value"]: + continue + + if "query" in cond["value"] and not isinstance(cond["value"]["query"], dict): + raise QueryError( + "Subquery filter value for the key 'query' must be a dictionary. It was" + f" {cond['value']['query']}" + ) + + if "apply_limit" in cond["value"] and not bool(cond["value"]["apply_limit"]): + cond["value"]["query"]["limit"] = None + + if "nesting_depth" in cond and cond["nesting_depth"] > 0: + defaults["nesting_depth"] = cond["nesting_depth"] + + resolver = SQLQueryResolver(**cond["value"]["query"], **defaults) + jg_connection = set.intersection(*map(set, resolver.field_lookup.values())) + optimal_jg_connection = [o for o in jg_connection if "merged_result" not in o] + + mapped_field = self.project.get_mapped_field(cond["value"]["field"], model=self.model) + if mapped_field: + field = self.determine_field_to_replace_with( + mapped_field, optimal_jg_connection, jg_connection + ) + self.field_id_mapping[cond["value"]["field"]] = field.id() + cond["value"]["field"] = field.id() + else: + field = self.project.get_field(cond["value"]["field"]) + if field.id() not in {self.project.get_field(d).id() for d in resolver.dimensions}: + raise QueryError( + f"Field {field.id()} not found in subquery dimensions {resolver.dimensions}. You" + " must specify a dimension that is present in the subquery." + ) + cond["value"]["sql_query"] = resolver.get_query(semicolon=False) def _get_field_from_lookup(self, field_name: str, only_search_lookup: bool = False): if field_name in self.field_object_lookup: @@ -538,6 +589,7 @@ def _deduplicate_always_where_filters(filters: list): def _clean_conditional_filter_syntax(self, filters: Union[str, None, List]): if not filters or isinstance(filters, str): return filters + if isinstance(filters, dict): return [filters] diff --git a/metrics_layer/core/sql/single_query_resolve.py b/metrics_layer/core/sql/single_query_resolve.py index 3acc8d9..1b8f320 100644 --- a/metrics_layer/core/sql/single_query_resolve.py +++ b/metrics_layer/core/sql/single_query_resolve.py @@ -36,6 +36,7 @@ def __init__( self.funnel, self.is_funnel_query = self.parse_funnel(funnel) self.parse_field_names(where, having, order_by) self.model = model + self.nesting_depth = kwargs.get("nesting_depth", 0) self.query_type = kwargs.get("query_type") if self.query_type is None: raise QueryError( @@ -64,6 +65,7 @@ def get_query(self, semicolon: bool = True): "select_raw_sql": self.select_raw_sql, "limit": self.limit, "return_pypika_query": self.return_pypika_query, + "nesting_depth": self.nesting_depth, } if self.has_cumulative_metric and self.is_funnel_query: raise QueryError("Cumulative metrics cannot be used with funnel queries") @@ -242,8 +244,8 @@ def parse_identifiers_from_dicts(conditions: list): raise QueryError(f"Identifier was missing required 'field' key: {cond}") @staticmethod - def flatten_filters(filters: list): - return flatten_filters(filters) + def flatten_filters(filters: list, return_nesting_depth: bool = False): + return flatten_filters(filters, return_nesting_depth=return_nesting_depth) @staticmethod def _check_for_dict(conditions: list): diff --git a/metrics_layer/core/utils.py b/metrics_layer/core/utils.py index 89d1dd4..c8d9646 100644 --- a/metrics_layer/core/utils.py +++ b/metrics_layer/core/utils.py @@ -16,19 +16,25 @@ def generate_random_password(length): return result_str -def flatten_filters(filters: list): +def flatten_filters(filters: list, return_nesting_depth: bool = False): + nesting_depth = 0 flat_list = [] - def recurse(filter_obj): + def recurse(filter_obj, return_nesting_depth: bool): + nonlocal nesting_depth if isinstance(filter_obj, dict): if "conditions" in filter_obj: + nesting_depth += 1 for f in filter_obj["conditions"]: - recurse(f) + recurse(f, return_nesting_depth) else: + if return_nesting_depth: + filter_obj["nesting_depth"] = nesting_depth flat_list.append(filter_obj) elif isinstance(filter_obj, list): + nesting_depth += 1 for item in filter_obj: - recurse(item) + recurse(item, return_nesting_depth) - recurse(filters) + recurse(filters, return_nesting_depth=return_nesting_depth) return flat_list diff --git a/pyproject.toml b/pyproject.toml index d44e575..0ee19b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metrics_layer" -version = "0.12.47" +version = "0.12.48" description = "The open source metrics layer." authors = ["Paul Blankley "] keywords = ["Metrics Layer", "Business Intelligence", "Analytics"] diff --git a/tests/test_subquery_filter_query.py b/tests/test_subquery_filter_query.py new file mode 100644 index 0000000..5b9aa91 --- /dev/null +++ b/tests/test_subquery_filter_query.py @@ -0,0 +1,618 @@ +# from datetime import datetime + +import pytest + +from metrics_layer.core.exceptions import ( + AccessDeniedOrDoesNotExistException, + JoinError, + QueryError, +) + +# from metrics_layer.core.model import Definitions +# from metrics_layer.core.sql.query_errors import ParseError + + +@pytest.mark.query +def test_query_subquery_filter(connection): + query = connection.get_sql_query( + metrics=["number_of_orders"], + dimensions=["region"], + where=[ + { + "conditional_filter_logic": { + "conditions": [ + { + "field": "orders.order_id", + "expression": "is_in_query", + "value": { + "query": { + "metrics": [], + "dimensions": ["orders.order_id"], + "where": [ + { + "field": "channel", + "expression": "contains_case_insensitive", + "value": "social", + } + ], + }, + "field": "orders.order_id", + }, + }, + ], + "logical_operator": "AND", + } + }, + {"field": "product_name", "expression": "not_equal_to", "value": "Shipping Protection"}, + ], + having=[{"field": "total_item_revenue", "expression": "less_than", "value": 300_000}], + ) + + correct = ( + "WITH filter_subquery_0 AS (SELECT orders.id as orders_order_id FROM analytics.order_line_items" + " order_lines LEFT JOIN analytics.orders orders ON order_lines.order_unique_id=orders.id WHERE" + " LOWER(order_lines.sales_channel) LIKE LOWER('%social%') GROUP BY orders.id ORDER BY orders_order_id" + " ASC NULLS LAST) SELECT customers.region as customers_region,NULLIF(COUNT(DISTINCT CASE WHEN " + " (orders.id) IS NOT NULL THEN orders.id ELSE NULL END), 0) as orders_number_of_orders FROM" + " analytics.order_line_items order_lines LEFT JOIN analytics.orders orders ON" + " order_lines.order_unique_id=orders.id LEFT JOIN analytics.customers customers ON" + " order_lines.customer_id=customers.customer_id WHERE orders.id IN (SELECT DISTINCT orders_order_id" + " FROM filter_subquery_0) AND order_lines.product_name<>'Shipping Protection' GROUP BY" + " customers.region HAVING SUM(order_lines.revenue)<300000 ORDER BY orders_number_of_orders DESC NULLS" + " LAST;" + ) + assert query == correct + + +@pytest.mark.query +def test_query_subquery_filter_with_or_syntax(connection): + query = connection.get_sql_query( + metrics=["number_of_orders"], + dimensions=["region"], + where=[ + { + "conditional_filter_logic": { + "conditions": [ + { + "field": "orders.order_id", + "expression": "is_in_query", + "value": { + "query": { + "metrics": [], + "dimensions": ["orders.order_id"], + "where": [ + { + "field": "channel", + "expression": "contains_case_insensitive", + "value": "social", + } + ], + }, + "field": "orders.order_id", + }, + }, + { + "field": "orders.order_id", + "expression": "is_in_query", + "value": { + "query": { + "metrics": [], + "dimensions": ["orders.order_id"], + "where": [ + { + "field": "channel", + "expression": "contains_case_insensitive", + "value": "email", + }, + { + "field": "orders.order_date", + "expression": "greater_than", + "value": "2024-01-01", + }, + { + "field": "orders.order_date", + "expression": "less_than", + "value": "2024-01-31", + }, + ], + }, + "field": "orders.order_id", + }, + }, + { + "field": "product_name", + "expression": "not_equal_to", + "value": "Shipping Protection", + }, + ], + "logical_operator": "OR", + } + }, + ], + ) + + correct = ( + "WITH filter_subquery_0 AS (SELECT orders.id as orders_order_id FROM analytics.order_line_items" + " order_lines LEFT JOIN analytics.orders orders ON order_lines.order_unique_id=orders.id WHERE" + " LOWER(order_lines.sales_channel) LIKE LOWER('%social%') GROUP BY orders.id ORDER BY orders_order_id" + " ASC NULLS LAST) ,filter_subquery_1 AS (SELECT orders.id as orders_order_id FROM" + " analytics.order_line_items order_lines LEFT JOIN analytics.orders orders ON" + " order_lines.order_unique_id=orders.id WHERE LOWER(order_lines.sales_channel) LIKE LOWER('%email%')" + " AND DATE_TRUNC('DAY', orders.order_date)>'2024-01-01' AND DATE_TRUNC('DAY'," + " orders.order_date)<'2024-01-31' GROUP BY orders.id ORDER BY orders_order_id ASC NULLS LAST) SELECT" + " customers.region as customers_region,NULLIF(COUNT(DISTINCT CASE WHEN (orders.id) IS NOT NULL THEN" + " orders.id ELSE NULL END), 0) as orders_number_of_orders FROM analytics.order_line_items" + " order_lines LEFT JOIN analytics.orders orders ON order_lines.order_unique_id=orders.id LEFT JOIN" + " analytics.customers customers ON order_lines.customer_id=customers.customer_id WHERE orders.id IN" + " (SELECT DISTINCT orders_order_id FROM filter_subquery_0) OR orders.id IN (SELECT DISTINCT" + " orders_order_id FROM filter_subquery_1) OR order_lines.product_name<>'Shipping Protection' GROUP BY" + " customers.region ORDER BY orders_number_of_orders DESC NULLS LAST;" + ) + assert query == correct + + +@pytest.mark.query +def test_query_subquery_filter_with_is_not_in_query(connection): + query = connection.get_sql_query( + metrics=["total_revenue"], + dimensions=["new_vs_repeat"], + where=[ + { + "conditional_filter_logic": { + "conditions": [ + { + "field": "orders.customer_id", + "expression": "is_not_in_query", + "value": { + "query": { + "metrics": [], + "dimensions": ["customers.customer_id"], + "where": [{"field": "gender", "expression": "equal_to", "value": "F"}], + }, + "field": "customers.customer_id", + }, + }, + { + "logical_operator": "OR", + "conditions": [ + {"field": "date", "expression": "less_than", "value": "2023-09-02"}, + {"field": "new_vs_repeat", "expression": "equal_to", "value": "New"}, + { + "field": "customers.customer_id", + "expression": "is_in_query", + "value": { + "query": { + "metrics": [], + "dimensions": ["customers.customer_id"], + "where": [ + {"field": "gender", "expression": "equal_to", "value": "M"} + ], + }, + "field": "customers.customer_id", + }, + }, + ], + }, + ], + "logical_operator": "AND", + } + }, + ], + ) + + correct = ( + "WITH filter_subquery_0 AS (SELECT customers.customer_id as customers_customer_id FROM" + " analytics.customers customers WHERE customers.gender='F' GROUP BY customers.customer_id ORDER BY" + " customers_customer_id ASC NULLS LAST) ,filter_subquery_1 AS (SELECT customers.customer_id as" + " customers_customer_id FROM analytics.customers customers WHERE customers.gender='M' GROUP BY" + " customers.customer_id ORDER BY customers_customer_id ASC NULLS LAST) SELECT orders.new_vs_repeat as" + " orders_new_vs_repeat,SUM(orders.revenue) as orders_total_revenue FROM analytics.orders orders LEFT" + " JOIN analytics.customers customers ON orders.customer_id=customers.customer_id WHERE" + " orders.customer_id NOT IN (SELECT DISTINCT customers_customer_id FROM filter_subquery_0) AND" + " (DATE_TRUNC('DAY', orders.order_date)<'2023-09-02' OR orders.new_vs_repeat='New' OR" + " customers.customer_id IN (SELECT DISTINCT customers_customer_id FROM filter_subquery_1)) GROUP BY" + " orders.new_vs_repeat ORDER BY orders_total_revenue DESC NULLS LAST;" + ) + assert query == correct + + +@pytest.mark.query +@pytest.mark.parametrize("apply_limit", [True, False]) +def test_query_subquery_filter_limit_and_non_limit(connection, apply_limit): + query = connection.get_sql_query( + metrics=["number_of_orders"], + dimensions=[], + where=[ + { + "conditional_filter_logic": { + "conditions": [ + { + "field": "channel", + "expression": "is_in_query", + "value": { + "query": { + "metrics": [], + "dimensions": ["channel"], + "having": [ + { + "field": "orders.total_revenue", + "expression": "greater_than", + "value": 100000, + } + ], + "order_by": [{"field": "orders.total_revenue", "expression": "desc"}], + "limit": 3, + }, + "apply_limit": apply_limit, # defaults to True + "field": "channel", + }, + }, + ], + "logical_operator": "AND", + } + }, + ], + ) + + if apply_limit: + correct = ( + "WITH filter_subquery_0 AS (SELECT order_lines.sales_channel as order_lines_channel FROM" + " analytics.order_line_items order_lines LEFT JOIN analytics.orders orders ON" + " order_lines.order_unique_id=orders.id GROUP BY order_lines.sales_channel HAVING" + " COALESCE(CAST((SUM(DISTINCT (CAST(FLOOR(COALESCE(orders.revenue, 0) * (1000000 * 1.0)) AS" + " DECIMAL(38,0))) + (TO_NUMBER(MD5(orders.id), 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX') %" + " 1.0e27)::NUMERIC(38, 0)) - SUM(DISTINCT (TO_NUMBER(MD5(orders.id)," + " 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX') % 1.0e27)::NUMERIC(38, 0))) AS DOUBLE PRECISION) /" + " CAST((1000000*1.0) AS DOUBLE PRECISION), 0)>100000 ORDER BY orders_total_revenue ASC NULLS LAST" + " LIMIT 3) SELECT NULLIF(COUNT(DISTINCT CASE WHEN (orders.id) IS NOT NULL THEN orders.id ELSE" + " NULL END), 0) as orders_number_of_orders FROM analytics.order_line_items order_lines LEFT JOIN" + " analytics.orders orders ON order_lines.order_unique_id=orders.id WHERE" + " order_lines.sales_channel IN (SELECT DISTINCT order_lines_channel FROM filter_subquery_0) ORDER" + " BY orders_number_of_orders DESC NULLS LAST;" + ) + else: + correct = ( + "WITH filter_subquery_0 AS (SELECT order_lines.sales_channel as order_lines_channel FROM" + " analytics.order_line_items order_lines LEFT JOIN analytics.orders orders ON" + " order_lines.order_unique_id=orders.id GROUP BY order_lines.sales_channel HAVING" + " COALESCE(CAST((SUM(DISTINCT (CAST(FLOOR(COALESCE(orders.revenue, 0) * (1000000 * 1.0)) AS" + " DECIMAL(38,0))) + (TO_NUMBER(MD5(orders.id), 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX') %" + " 1.0e27)::NUMERIC(38, 0)) - SUM(DISTINCT (TO_NUMBER(MD5(orders.id)," + " 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX') % 1.0e27)::NUMERIC(38, 0))) AS DOUBLE PRECISION) /" + " CAST((1000000*1.0) AS DOUBLE PRECISION), 0)>100000 ORDER BY orders_total_revenue ASC NULLS" + " LAST) SELECT NULLIF(COUNT(DISTINCT CASE WHEN (orders.id) IS NOT NULL THEN orders.id ELSE" + " NULL END), 0) as orders_number_of_orders FROM analytics.order_line_items order_lines LEFT JOIN" + " analytics.orders orders ON order_lines.order_unique_id=orders.id WHERE" + " order_lines.sales_channel IN (SELECT DISTINCT order_lines_channel FROM filter_subquery_0) ORDER" + " BY orders_number_of_orders DESC NULLS LAST;" + ) + + assert query == correct + + +@pytest.mark.query +def test_query_subquery_filter_with_mapping(connection): + query = connection.get_sql_query( + metrics=["number_of_sessions"], + dimensions=[], + where=[ + { + "conditional_filter_logic": { + "conditions": [ + { + "field": "campaign", + "expression": "is_in_query", + "value": { + "query": { + "metrics": [], + "dimensions": ["campaign", "orders.order_id"], + "having": [ + { + "field": "orders.total_revenue", + "expression": "greater_than", + "value": 100, + } + ], + }, + "field": "campaign", + }, + }, + ], + "logical_operator": "AND", + } + }, + ], + ) + + correct = ( + "WITH filter_subquery_0 AS (SELECT orders.campaign as orders_campaign,orders.id as orders_order_id" + " FROM analytics.orders orders GROUP BY orders.campaign,orders.id HAVING SUM(orders.revenue)>100" + " ORDER BY orders_campaign ASC NULLS LAST) SELECT COUNT(sessions.id) as sessions_number_of_sessions" + " FROM analytics.sessions sessions WHERE sessions.utm_campaign IN (SELECT DISTINCT orders_campaign" + " FROM filter_subquery_0) ORDER BY sessions_number_of_sessions DESC NULLS LAST;" + ) + + assert query == correct + + +@pytest.mark.query +def test_query_subquery_filter_invalid_query(connection): + with pytest.raises(TypeError) as exc_info: + connection.get_sql_query( + metrics=["number_of_orders"], + dimensions=[], + where=[ + { + "conditional_filter_logic": { + "conditions": [ + { + "field": "channel", + "expression": "is_in_query", + "value": { + "query": { + "measures": [], + "dims": ["channel"], + "having": [ + { + "field": "orders.total_revenue", + "expression": "greater_than", + "value": 100000, + } + ], + "order_by": [{"field": "orders.total_revenue", "expression": "desc"}], + }, + "field": "channel", + }, + }, + ], + "logical_operator": "AND", + } + }, + ], + ) + + assert exc_info.value + assert "missing 1 required positional argument: 'metrics'" in str(exc_info.value) + + +@pytest.mark.query +def test_query_subquery_filter_invalid_field(connection): + with pytest.raises(AccessDeniedOrDoesNotExistException) as exc_info: + connection.get_sql_query( + metrics=["number_of_orders"], + dimensions=[], + where=[ + { + "conditional_filter_logic": { + "conditions": [ + { + "field": "channel", + "expression": "is_in_query", + "value": { + "query": { + "metrics": [], + "dimensions": ["channel"], + "having": [ + { + "field": "orders.total_revenue", + "expression": "greater_than", + "value": 100000, + } + ], + "order_by": [{"field": "orders.total_revenue", "expression": "desc"}], + }, + "field": "orders.fake_field", + }, + }, + ], + "logical_operator": "AND", + } + }, + ], + ) + + assert exc_info.value + assert "Field fake_field not found in view orders" in str(exc_info.value) + + +@pytest.mark.query +def test_query_subquery_filter_invalid_type(connection): + with pytest.raises(QueryError) as exc_info: + connection.get_sql_query( + metrics=["number_of_orders"], + dimensions=[], + where=[ + { + "conditional_filter_logic": { + "conditions": [ + { + "field": "channel", + "expression": "is_in_query", + "value": { + "query": "select * from myquery", + }, + }, + ], + "logical_operator": "AND", + } + }, + ], + ) + + assert exc_info.value + assert "Subquery filter value for the key 'query' must be a dict" in str(exc_info.value) + + +@pytest.mark.query +def test_query_subquery_filter_field_not_in_query(connection): + with pytest.raises(QueryError) as exc_info: + connection.get_sql_query( + metrics=["number_of_orders"], + dimensions=[], + where=[ + { + "conditional_filter_logic": { + "conditions": [ + { + "field": "orders.new_vs_repeat", + "expression": "is_in_query", + "value": { + "query": { + "metrics": ["orders.total_revenue"], + "dimensions": ["orders.order_date"], + "having": [ + { + "field": "orders.new_vs_repeat", + "expression": "equal_to", + "value": "New", + } + ], + }, + "field": "orders.new_vs_repeat", + }, + }, + ], + "logical_operator": "AND", + } + }, + ], + ) + + assert exc_info.value + assert "Field orders.new_vs_repeat not found in subquery dimensions" in str(exc_info.value) + + +@pytest.mark.query +def test_query_subquery_filter_nested_simple_case(connection): + query = connection.get_sql_query( + metrics=["number_of_orders"], + dimensions=["region"], + where=[ + { + "conditional_filter_logic": { + "conditions": [ + { + "field": "orders.order_id", + "expression": "is_in_query", + "value": { + "query": { + "metrics": [], + "dimensions": ["orders.order_id"], + "where": [ + { + "field": "orders.order_id", + "expression": "is_in_query", + "value": { + "query": { + "metrics": [], + "dimensions": ["orders.order_id"], + "where": [ + { + "field": "channel", + "expression": "contains_case_insensitive", + "value": "email", + } + ], + }, + "field": "orders.order_id", + }, + }, + { + "field": "product_name", + "expression": "not_equal_to", + "value": "Shipping Protection", + }, + ], + }, + "field": "orders.order_id", + }, + }, + ], + "logical_operator": "AND", + } + }, + ], + ) + + correct = ( + "WITH filter_subquery_0 AS (WITH filter_subquery_2_0 AS (SELECT orders.id as orders_order_id FROM" + " analytics.order_line_items order_lines LEFT JOIN analytics.orders orders ON" + " order_lines.order_unique_id=orders.id WHERE LOWER(order_lines.sales_channel) LIKE LOWER('%email%')" + " GROUP BY orders.id ORDER BY orders_order_id ASC NULLS LAST) SELECT orders.id as orders_order_id" + " FROM analytics.order_line_items order_lines LEFT JOIN analytics.orders orders ON" + " order_lines.order_unique_id=orders.id WHERE orders.id IN (SELECT DISTINCT orders_order_id FROM" + " filter_subquery_2_0) AND order_lines.product_name<>'Shipping Protection' GROUP BY orders.id ORDER" + " BY orders_order_id ASC NULLS LAST) SELECT customers.region as customers_region,COUNT(orders.id) as" + " orders_number_of_orders FROM analytics.orders orders LEFT JOIN analytics.customers customers ON" + " orders.customer_id=customers.customer_id WHERE orders.id IN (SELECT DISTINCT orders_order_id FROM" + " filter_subquery_0) GROUP BY customers.region ORDER BY orders_number_of_orders DESC NULLS LAST;" + ) + assert query == correct + + +@pytest.mark.query +def test_query_subquery_filter_nested(connection): + query = connection.get_sql_query( + metrics=["number_of_orders"], + dimensions=["region"], + where=[ + { + "field": "orders.order_id", + "expression": "is_in_query", + "value": { + "query": { + "metrics": [], + "dimensions": ["orders.order_id"], + "where": [ + { + "conditional_filter_logic": { + "logical_operator": "AND", + "conditions": [ + { + "field": "orders.order_id", + "expression": "is_in_query", + "value": { + "query": { + "metrics": [], + "dimensions": ["orders.order_id"], + "where": [ + { + "field": "channel", + "expression": "contains_case_insensitive", + "value": "email", + } + ], + }, + "field": "orders.order_id", + }, + }, + { + "field": "product_name", + "expression": "not_equal_to", + "value": "Shipping Protection", + }, + ], + }, + } + ], + }, + "field": "orders.order_id", + }, + }, + ], + ) + + correct = ( + "WITH filter_subquery_0 AS (WITH filter_subquery_1_0 AS (SELECT orders.id as orders_order_id FROM" + " analytics.order_line_items order_lines LEFT JOIN analytics.orders orders ON" + " order_lines.order_unique_id=orders.id WHERE LOWER(order_lines.sales_channel) LIKE LOWER('%email%')" + " GROUP BY orders.id ORDER BY orders_order_id ASC NULLS LAST) SELECT orders.id as orders_order_id" + " FROM analytics.order_line_items order_lines LEFT JOIN analytics.orders orders ON" + " order_lines.order_unique_id=orders.id WHERE orders.id IN (SELECT DISTINCT orders_order_id FROM" + " filter_subquery_1_0) AND order_lines.product_name<>'Shipping Protection' GROUP BY orders.id ORDER" + " BY orders_order_id ASC NULLS LAST) SELECT customers.region as customers_region,COUNT(orders.id) as" + " orders_number_of_orders FROM analytics.orders orders LEFT JOIN analytics.customers customers ON" + " orders.customer_id=customers.customer_id WHERE orders.id IN (SELECT DISTINCT orders_order_id FROM" + " filter_subquery_0) GROUP BY customers.region ORDER BY orders_number_of_orders DESC NULLS LAST;" + ) + assert query == correct