Skip to content

Commit

Permalink
[SPARK-46526][SQL] Support LIMIT over correlated subqueries where pre…
Browse files Browse the repository at this point in the history
…dicates only reference outer table

### What changes were proposed in this pull request?

The type of query that this PR addresses is the following:

```
SELECT COUNT(DISTINCT(t1a))
FROM t1
WHERE t1d IN (SELECT t2d
              FROM   t2
              WHERE t1a IS NOT NULL
              LIMIT 10);
```

Here, the subquery predicate `t1a IS NOT NULL` does not reference the inner table at all, so our standard decorrelation technique of "compute 10 values of t2d for every value of the inner table" does not work. Such predicates can instead be lifted above the `LIMIT 10`, and this PR does exactly that.

### Why are the changes needed?

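To fix a bug: decorrelation did not handle a LIMIT over a correlated subquery whose predicates reference only the outer table.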

### Does this PR introduce _any_ user-facing change?

Yes. Correlated subqueries with a LIMIT whose predicates reference only the outer table, which were previously broken, now work.

### How was this patch tested?

Query tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#44514 from agubichev/SPARK-46526_limit_corr.

Authored-by: Andrey Gubichev <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
agubichev authored and cloud-fan committed Feb 7, 2024
1 parent a95aa7a commit dc73a8d
Show file tree
Hide file tree
Showing 10 changed files with 431 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -673,20 +673,28 @@ object DecorrelateInnerQuery extends PredicateHelper {
decorrelate(child, parentOuterReferences, aggregated = true, underSetOp)
val collectedChildOuterReferences = collectOuterReferencesInPlanTree(child)
// Add outer references to the PARTITION BY clause
val partitionFields = collectedChildOuterReferences.map(outerReferenceMap(_)).toSeq
val orderByFields = replaceOuterReferences(ordering, outerReferenceMap)
val partitionFields = collectedChildOuterReferences
.filter(outerReferenceMap.contains(_))
.map(outerReferenceMap(_)).toSeq
if (partitionFields.isEmpty) {
// Underlying subquery has no predicates connecting inner and outer query.
// In this case, limit can be computed over the inner query directly.
(Limit(limit, newChild), joinCond, outerReferenceMap)
} else {
val orderByFields = replaceOuterReferences(ordering, outerReferenceMap)

val rowNumber = WindowExpression(RowNumber(),
WindowSpecDefinition(partitionFields, orderByFields,
SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)))
val rowNumberAlias = Alias(rowNumber, "rn")()
// Window function computes row_number() when partitioning by correlated references,
// and projects all the other fields from the input.
val window = Window(Seq(rowNumberAlias),
partitionFields, orderByFields, newChild)
val filter = Filter(LessThanOrEqual(rowNumberAlias.toAttribute, limit), window)
val project = Project(newChild.output, filter)
(project, joinCond, outerReferenceMap)
val rowNumber = WindowExpression(RowNumber(),
WindowSpecDefinition(partitionFields, orderByFields,
SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)))
val rowNumberAlias = Alias(rowNumber, "rn")()
// Window function computes row_number() when partitioning by correlated references,
// and projects all the other fields from the input.
val window = Window(Seq(rowNumberAlias),
partitionFields, orderByFields, newChild)
val filter = Filter(LessThanOrEqual(rowNumberAlias.toAttribute, limit), window)
val project = Project(newChild.output, filter)
(project, joinCond, outerReferenceMap)
}

case w @ Window(projectList, partitionSpec, orderSpec, child) =>
val outerReferences = collectOuterReferences(w.expressions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,63 @@ Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]
+- LocalRelation [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]


-- !query
SELECT *
FROM emp
WHERE EXISTS (SELECT max(dept.dept_id)
FROM dept
WHERE emp.salary > 200
LIMIT 1)
-- !query analysis
Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]
+- Filter exists#x [salary#x]
: +- GlobalLimit 1
: +- LocalLimit 1
: +- Aggregate [max(dept_id#x) AS max(dept_id)#x]
: +- Filter (outer(salary#x) > cast(200 as double))
: +- SubqueryAlias dept
: +- View (`DEPT`, [dept_id#x, dept_name#x, state#x])
: +- Project [cast(dept_id#x as int) AS dept_id#x, cast(dept_name#x as string) AS dept_name#x, cast(state#x as string) AS state#x]
: +- Project [dept_id#x, dept_name#x, state#x]
: +- SubqueryAlias DEPT
: +- LocalRelation [dept_id#x, dept_name#x, state#x]
+- SubqueryAlias emp
+- View (`EMP`, [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x])
+- Project [cast(id#x as int) AS id#x, cast(emp_name#x as string) AS emp_name#x, cast(hiredate#x as date) AS hiredate#x, cast(salary#x as double) AS salary#x, cast(dept_id#x as int) AS dept_id#x]
+- Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]
+- SubqueryAlias EMP
+- LocalRelation [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]


-- !query
SELECT *
FROM emp
WHERE EXISTS (SELECT state, max(dept.dept_name)
FROM dept
WHERE emp.salary > 200
GROUP BY state
LIMIT 1)
-- !query analysis
Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]
+- Filter exists#x [salary#x]
: +- GlobalLimit 1
: +- LocalLimit 1
: +- Aggregate [state#x], [state#x, max(dept_name#x) AS max(dept_name)#x]
: +- Filter (outer(salary#x) > cast(200 as double))
: +- SubqueryAlias dept
: +- View (`DEPT`, [dept_id#x, dept_name#x, state#x])
: +- Project [cast(dept_id#x as int) AS dept_id#x, cast(dept_name#x as string) AS dept_name#x, cast(state#x as string) AS state#x]
: +- Project [dept_id#x, dept_name#x, state#x]
: +- SubqueryAlias DEPT
: +- LocalRelation [dept_id#x, dept_name#x, state#x]
+- SubqueryAlias emp
+- View (`EMP`, [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x])
+- Project [cast(id#x as int) AS id#x, cast(emp_name#x as string) AS emp_name#x, cast(hiredate#x as date) AS hiredate#x, cast(salary#x as double) AS salary#x, cast(dept_id#x as int) AS dept_id#x]
+- Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]
+- SubqueryAlias EMP
+- LocalRelation [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]


-- !query
SELECT *
FROM emp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,91 @@ Offset 1
+- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x]


-- !query
SELECT COUNT(DISTINCT(t1a))
FROM t1
WHERE t1d IN (SELECT t2d
FROM t2
WHERE t1a IS NOT NULL
LIMIT 10)
-- !query analysis
Aggregate [count(distinct t1a#x) AS count(DISTINCT t1a)#xL]
+- Filter t1d#xL IN (list#x [t1a#x])
: +- GlobalLimit 10
: +- LocalLimit 10
: +- Project [t2d#xL]
: +- Filter isnotnull(outer(t1a#x))
: +- SubqueryAlias t2
: +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x])
: +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]
: +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
: +- SubqueryAlias t2
: +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+- SubqueryAlias t1
+- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x])
+- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) AS t1i#x]
+- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x]
+- SubqueryAlias t1
+- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x]


-- !query
SELECT COUNT(DISTINCT(t1a))
FROM t1
WHERE t1d IN (SELECT MAX(t2d)
FROM t2
WHERE t1a IS NOT NULL
LIMIT 10)
-- !query analysis
Aggregate [count(distinct t1a#x) AS count(DISTINCT t1a)#xL]
+- Filter t1d#xL IN (list#x [t1a#x])
: +- GlobalLimit 10
: +- LocalLimit 10
: +- Aggregate [max(t2d#xL) AS max(t2d)#xL]
: +- Filter isnotnull(outer(t1a#x))
: +- SubqueryAlias t2
: +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x])
: +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]
: +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
: +- SubqueryAlias t2
: +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+- SubqueryAlias t1
+- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x])
+- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) AS t1i#x]
+- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x]
+- SubqueryAlias t1
+- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x]


-- !query
SELECT COUNT(DISTINCT(t1a))
FROM t1
WHERE t1d IN (SELECT DISTINCT t2d
FROM t2
WHERE t1a IS NOT NULL
LIMIT 10)
-- !query analysis
Aggregate [count(distinct t1a#x) AS count(DISTINCT t1a)#xL]
+- Filter t1d#xL IN (list#x [t1a#x])
: +- GlobalLimit 10
: +- LocalLimit 10
: +- Distinct
: +- Project [t2d#xL]
: +- Filter isnotnull(outer(t1a#x))
: +- SubqueryAlias t2
: +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x])
: +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]
: +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
: +- SubqueryAlias t2
: +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+- SubqueryAlias t1
+- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x])
+- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) AS t1i#x]
+- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x]
+- SubqueryAlias t1
+- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x]


-- !query
set spark.sql.optimizer.decorrelateExistsIn.enabled = false
-- !query analysis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,95 @@ Project [t1a#x, t1b#x]
+- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x]


-- !query
SELECT t1a, t1b
FROM t1
WHERE t1c = (SELECT t2c
FROM t2
WHERE t1b < t1d
ORDER BY t2c LIMIT 1)
-- !query analysis
Project [t1a#x, t1b#x]
+- Filter (t1c#x = scalar-subquery#x [t1b#x && t1d#xL])
: +- GlobalLimit 1
: +- LocalLimit 1
: +- Sort [t2c#x ASC NULLS FIRST], true
: +- Project [t2c#x]
: +- Filter (cast(outer(t1b#x) as bigint) < outer(t1d#xL))
: +- SubqueryAlias t2
: +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x])
: +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]
: +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
: +- SubqueryAlias t2
: +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+- SubqueryAlias t1
+- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x])
+- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) AS t1i#x]
+- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x]
+- SubqueryAlias t1
+- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x]


-- !query
SELECT t1a, t1b
FROM t1
WHERE t1c = (SELECT MAX(t2c)
FROM t2
WHERE t1b < t1d
ORDER BY min(t2c) LIMIT 1)
-- !query analysis
Project [t1a#x, t1b#x]
+- Filter (t1c#x = scalar-subquery#x [t1b#x && t1d#xL])
: +- GlobalLimit 1
: +- LocalLimit 1
: +- Project [max(t2c)#x]
: +- Sort [min(t2c#x)#x ASC NULLS FIRST], true
: +- Aggregate [max(t2c#x) AS max(t2c)#x, min(t2c#x) AS min(t2c#x)#x]
: +- Filter (cast(outer(t1b#x) as bigint) < outer(t1d#xL))
: +- SubqueryAlias t2
: +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x])
: +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]
: +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
: +- SubqueryAlias t2
: +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+- SubqueryAlias t1
+- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x])
+- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) AS t1i#x]
+- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x]
+- SubqueryAlias t1
+- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x]


-- !query
SELECT t1a, t1b
FROM t1
WHERE t1c = (SELECT DISTINCT t2c
FROM t2
WHERE t1b < t1d
ORDER BY t2c LIMIT 1)
-- !query analysis
Project [t1a#x, t1b#x]
+- Filter (t1c#x = scalar-subquery#x [t1b#x && t1d#xL])
: +- GlobalLimit 1
: +- LocalLimit 1
: +- Sort [t2c#x ASC NULLS FIRST], true
: +- Distinct
: +- Project [t2c#x]
: +- Filter (cast(outer(t1b#x) as bigint) < outer(t1d#xL))
: +- SubqueryAlias t2
: +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x])
: +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]
: +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
: +- SubqueryAlias t2
: +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+- SubqueryAlias t1
+- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x])
+- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) AS t1i#x]
+- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x]
+- SubqueryAlias t1
+- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x]


-- !query
CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0)
-- !query analysis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,24 @@ WHERE EXISTS (SELECT max(dept.dept_id)
GROUP BY state
LIMIT 1);

-- SPARK-46526: LIMIT over correlated predicate that references only the outer table.
-- The only correlated predicate (emp.salary > 200) uses a column of the outer table emp
-- and no column of dept; the subquery is a global aggregate followed by LIMIT 1.
SELECT *
FROM emp
WHERE EXISTS (SELECT max(dept.dept_id)
FROM dept
WHERE emp.salary > 200
LIMIT 1);

-- SPARK-46526: LIMIT over correlated predicate that references only the outer table,
-- and a group by.
-- The only correlated predicate (emp.salary > 200) uses a column of the outer table emp
-- and no column of dept; the subquery groups dept by state before applying LIMIT 1.
SELECT *
FROM emp
WHERE EXISTS (SELECT state, max(dept.dept_name)
FROM dept
WHERE emp.salary > 200
GROUP BY state
LIMIT 1);

-- limit and offset in the not exists subquery block.
-- TC.03.03
SELECT *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,27 @@ GROUP BY t1b
ORDER BY t1b NULLS last
OFFSET 1;

-- SPARK-46526: LIMIT over correlated predicate that references only the outer table.
-- The only correlated predicate (t1a IS NOT NULL) uses a column of the outer table t1
-- and no column of t2; the IN subquery is a plain projection of t2d under LIMIT 10.
SELECT COUNT(DISTINCT(t1a))
FROM t1
WHERE t1d IN (SELECT t2d
FROM t2
WHERE t1a IS NOT NULL
LIMIT 10);

-- SPARK-46526: LIMIT in a correlated IN subquery whose only correlated predicate
-- (t1a IS NOT NULL) references the outer table t1 alone; here the subquery is a
-- global aggregate (MAX) under LIMIT 10.
SELECT COUNT(DISTINCT(t1a))
FROM t1
WHERE t1d IN (SELECT MAX(t2d)
FROM t2
WHERE t1a IS NOT NULL
LIMIT 10);

-- SPARK-46526: LIMIT in a correlated IN subquery whose only correlated predicate
-- (t1a IS NOT NULL) references the outer table t1 alone; here the subquery applies
-- DISTINCT to t2d before LIMIT 10.
SELECT COUNT(DISTINCT(t1a))
FROM t1
WHERE t1d IN (SELECT DISTINCT t2d
FROM t2
WHERE t1a IS NOT NULL
LIMIT 10);

set spark.sql.optimizer.decorrelateExistsIn.enabled = false;
-- LIMIT is not supported in correlated IN, unless the DECORRELATE_EXISTS_AND_IN_SUBQUERIES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,28 @@ WHERE t1c = (SELECT t2c
WHERE t2c = t1c
ORDER BY t2c LIMIT 1);

-- SPARK-46526: LIMIT over correlated predicate that references only the outer table.
-- The only correlated predicate (t1b < t1d) compares two columns of the outer table t1
-- and uses no column of t2; the scalar subquery sorts by t2c and keeps one row.
SELECT t1a, t1b
FROM t1
WHERE t1c = (SELECT t2c
FROM t2
WHERE t1b < t1d
ORDER BY t2c LIMIT 1);

-- SPARK-46526: LIMIT in a correlated scalar subquery whose only correlated predicate
-- (t1b < t1d) compares two columns of the outer table t1; here the subquery is a
-- global aggregate (MAX) ordered by a different aggregate (min) under LIMIT 1.
SELECT t1a, t1b
FROM t1
WHERE t1c = (SELECT MAX(t2c)
FROM t2
WHERE t1b < t1d
ORDER BY min(t2c) LIMIT 1);

-- SPARK-46526: LIMIT in a correlated scalar subquery whose only correlated predicate
-- (t1b < t1d) compares two columns of the outer table t1; here the subquery applies
-- DISTINCT to t2c, then sorts by t2c and keeps one row.
SELECT t1a, t1b
FROM t1
WHERE t1c = (SELECT DISTINCT t2c
FROM t2
WHERE t1b < t1d
ORDER BY t2c LIMIT 1);

-- Set operations in correlation path

CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0);
Expand Down
Loading

0 comments on commit dc73a8d

Please sign in to comment.