Skip to content

Commit

Permalink
[SPARK-36280][SQL] Remove redundant aliases after RewritePredicateSub…
Browse files Browse the repository at this point in the history
…query

### What changes were proposed in this pull request?

Remove redundant aliases after `RewritePredicateSubquery`. For example:
```scala
sql("CREATE TABLE t1 USING parquet AS SELECT id AS a, id AS b, id AS c FROM range(10)")
sql("CREATE TABLE t2 USING parquet AS SELECT id AS x, id AS y FROM range(8)")
sql(
  """
    |SELECT *
    |FROM  t1
    |WHERE  a IN (SELECT x
    |  FROM  (SELECT x AS x,
    |           Rank() OVER (partition BY x ORDER BY Sum(y) DESC) AS ranking
    |    FROM   t2
    |    GROUP  BY x) tmp1
    |  WHERE  ranking <= 5)
    |""".stripMargin).explain
```
Before this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [a#10L], [x#7L], LeftSemi, BuildRight, false
   :- FileScan parquet default.t1[a#10L,b#11L,c#12L]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=Kyligence#68]
      +- Project [x#7L]
         +- Filter (ranking#8 <= 5)
            +- Window [rank(_w2#25L) windowspecdefinition(x#15L, _w2#25L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ranking#8], [x#15L], [_w2#25L DESC NULLS LAST]
               +- Sort [x#15L ASC NULLS FIRST, _w2#25L DESC NULLS LAST], false, 0
                  +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=Kyligence#62]
                     +- HashAggregate(keys=[x#15L], functions=[sum(y#16L)])
                        +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=Kyligence#59]
                           +- HashAggregate(keys=[x#15L], functions=[partial_sum(y#16L)])
                              +- FileScan parquet default.t2[x#15L,y#16L]
```

After this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [a#10L], [x#15L], LeftSemi, BuildRight, false
   :- FileScan parquet default.t1[a#10L,b#11L,c#12L]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=Kyligence#67]
      +- Project [x#15L]
         +- Filter (ranking#8 <= 5)
            +- Window [rank(_w2#25L) windowspecdefinition(x#15L, _w2#25L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ranking#8], [x#15L], [_w2#25L DESC NULLS LAST]
               +- Sort [x#15L ASC NULLS FIRST, _w2#25L DESC NULLS LAST], false, 0
                  +- HashAggregate(keys=[x#15L], functions=[sum(y#16L)])
                     +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=Kyligence#59]
                        +- HashAggregate(keys=[x#15L], functions=[partial_sum(y#16L)])
                           +- FileScan parquet default.t2[x#15L,y#16L]
```

### Why are the changes needed?

Reduce shuffle to improve query performance. This change can benefit TPC-DS q70.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes apache#33509 from wangyum/SPARK-36280.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
wangyum authored and dongjoon-hyun committed Aug 3, 2021
1 parent 67cbc93 commit 4a6afb4
Show file tree
Hide file tree
Showing 26 changed files with 2,394 additions and 2,460 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
RewritePredicateSubquery,
ColumnPruning,
CollapseProject,
RemoveRedundantAliases,
RemoveNoopOperators) :+
// This batch must be executed after the `RewriteSubquery` batch, which creates joins.
Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers) :+
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -100,35 +100,34 @@ TakeOrderedAndProject [customer_preferred_cust_flag]
InputAdapter
Exchange [customer_id] #10
WholeStageCodegen (24)
Project [customer_id,year_total]
Filter [year_total]
HashAggregate [c_customer_id,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_country,c_login,c_email_address,d_year,sum] [sum(UnscaledValue(CheckOverflow((promote_precision(cast(ws_ext_list_price as decimal(8,2))) - promote_precision(cast(ws_ext_discount_amt as decimal(8,2)))), DecimalType(8,2), true))),customer_id,year_total,sum]
InputAdapter
Exchange [c_customer_id,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_country,c_login,c_email_address,d_year] #11
WholeStageCodegen (23)
HashAggregate [c_customer_id,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_country,c_login,c_email_address,d_year,ws_ext_list_price,ws_ext_discount_amt] [sum,sum]
Project [c_customer_id,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_country,c_login,c_email_address,ws_ext_discount_amt,ws_ext_list_price,d_year]
SortMergeJoin [ws_bill_customer_sk,c_customer_sk]
InputAdapter
WholeStageCodegen (20)
Sort [ws_bill_customer_sk]
InputAdapter
Exchange [ws_bill_customer_sk] #12
WholeStageCodegen (19)
Project [ws_bill_customer_sk,ws_ext_discount_amt,ws_ext_list_price,d_year]
BroadcastHashJoin [ws_sold_date_sk,d_date_sk]
Filter [ws_bill_customer_sk]
ColumnarToRow
InputAdapter
Scan parquet default.web_sales [ws_bill_customer_sk,ws_ext_discount_amt,ws_ext_list_price,ws_sold_date_sk]
ReusedSubquery [d_date_sk] #1
InputAdapter
ReusedExchange [d_date_sk,d_year] #4
InputAdapter
WholeStageCodegen (22)
Sort [c_customer_sk]
InputAdapter
ReusedExchange [c_customer_sk,c_customer_id,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_country,c_login,c_email_address] #5
Filter [year_total]
HashAggregate [c_customer_id,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_country,c_login,c_email_address,d_year,sum] [sum(UnscaledValue(CheckOverflow((promote_precision(cast(ws_ext_list_price as decimal(8,2))) - promote_precision(cast(ws_ext_discount_amt as decimal(8,2)))), DecimalType(8,2), true))),customer_id,year_total,sum]
InputAdapter
Exchange [c_customer_id,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_country,c_login,c_email_address,d_year] #11
WholeStageCodegen (23)
HashAggregate [c_customer_id,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_country,c_login,c_email_address,d_year,ws_ext_list_price,ws_ext_discount_amt] [sum,sum]
Project [c_customer_id,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_country,c_login,c_email_address,ws_ext_discount_amt,ws_ext_list_price,d_year]
SortMergeJoin [ws_bill_customer_sk,c_customer_sk]
InputAdapter
WholeStageCodegen (20)
Sort [ws_bill_customer_sk]
InputAdapter
Exchange [ws_bill_customer_sk] #12
WholeStageCodegen (19)
Project [ws_bill_customer_sk,ws_ext_discount_amt,ws_ext_list_price,d_year]
BroadcastHashJoin [ws_sold_date_sk,d_date_sk]
Filter [ws_bill_customer_sk]
ColumnarToRow
InputAdapter
Scan parquet default.web_sales [ws_bill_customer_sk,ws_ext_discount_amt,ws_ext_list_price,ws_sold_date_sk]
ReusedSubquery [d_date_sk] #1
InputAdapter
ReusedExchange [d_date_sk,d_year] #4
InputAdapter
WholeStageCodegen (22)
Sort [c_customer_sk]
InputAdapter
ReusedExchange [c_customer_sk,c_customer_id,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_country,c_login,c_email_address] #5
InputAdapter
WholeStageCodegen (34)
Sort [customer_id]
Expand Down
Loading

0 comments on commit 4a6afb4

Please sign in to comment.