Skip to content

Commit

Permalink
Upgrade to DataFusion (DF) 43
Browse files (browse the repository at this point in the history)
  • Loading branch information
andygrove committed Dec 14, 2024
1 parent 546a4c0 commit 64e46c1
Show file tree
Hide file tree
Showing 22 changed files with 247 additions and 235 deletions.
186 changes: 101 additions & 85 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ rust-version = "1.62"
build = "build.rs"

[dependencies]
datafusion = { version = "42.0.0", features = ["pyarrow", "avro"] }
datafusion-proto = "42.0.0"
datafusion = { version = "43.0", features = ["pyarrow", "avro"] }
datafusion-proto = "43.0"
futures = "0.3"
glob = "0.3.1"
log = "0.4"
Expand Down
6 changes: 1 addition & 5 deletions src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ mod test {
do_test(6).await
}


#[tokio::test]
async fn test_q7() -> TestResult<()> {
do_test(7).await
Expand All @@ -302,7 +301,6 @@ mod test {
do_test(11).await
}


#[tokio::test]
async fn test_q12() -> TestResult<()> {
do_test(12).await
Expand All @@ -324,7 +322,6 @@ mod test {
do_test(15).await
}


#[tokio::test]
async fn test_q16() -> TestResult<()> {
do_test(16).await
Expand All @@ -340,7 +337,6 @@ mod test {
do_test(18).await
}


#[tokio::test]
async fn test_q19() -> TestResult<()> {
do_test(19).await
Expand Down Expand Up @@ -375,7 +371,7 @@ mod test {
];
for table in tables {
ctx.register_parquet(
table,
*table,
&format!("{data_path}/{table}.parquet"),
ParquetReadOptions::default(),
)
Expand Down
8 changes: 4 additions & 4 deletions testdata/expected-plans/q1.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST
DataFusion Physical Plan
========================

SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]
SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST], preserve_partitioning=[true]
SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS LAST]
SortExec: expr=[l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS LAST], preserve_partitioning=[true]
ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty, avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as avg_disc, count(*)@9 as count_order]
AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)]
CoalesceBatchesExec: target_batch_size=8192
Expand All @@ -36,13 +36,13 @@ ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "l_return

Query Stage #1 (2 -> 2):
ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))
SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST], preserve_partitioning=[true]
SortExec: expr=[l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS LAST], preserve_partitioning=[true]
ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty, avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as avg_disc, count(*)@9 as count_order]
AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=0, input_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))

Query Stage #2 (2 -> 1):
SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]
SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS LAST]
ShuffleReaderExec(stage_id=1, input_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))

4 changes: 2 additions & 2 deletions testdata/expected-plans/q10.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ Sort: revenue DESC NULLS FIRST, fetch=20
Filter: orders.o_orderdate >= Date32("1993-07-01") AND orders.o_orderdate < Date32("1993-10-01")
TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate], partial_filters=[orders.o_orderdate >= Date32("1993-07-01"), orders.o_orderdate < Date32("1993-10-01")]
Projection: lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount
Filter: lineitem.l_returnflag = Utf8("R")
TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_returnflag], partial_filters=[lineitem.l_returnflag = Utf8("R")]
Filter: lineitem.l_returnflag = Utf8View("R")
TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_returnflag], partial_filters=[lineitem.l_returnflag = Utf8View("R")]
TableScan: nation projection=[n_nationkey, n_name]

DataFusion Physical Plan
Expand Down
20 changes: 10 additions & 10 deletions testdata/expected-plans/q11.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ Sort: value DESC NULLS FIRST
TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]
TableScan: supplier projection=[s_suppkey, s_nationkey]
Projection: nation.n_nationkey
Filter: nation.n_name = Utf8("ALGERIA")
TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("ALGERIA")]
Filter: nation.n_name = Utf8View("ALGERIA")
TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8View("ALGERIA")]
SubqueryAlias: __scalar_sq_1
Projection: CAST(CAST(sum(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS Decimal128(38, 15))
Aggregate: groupBy=[[]], aggr=[[sum(partsupp.ps_supplycost * CAST(partsupp.ps_availqty AS Decimal128(10, 0)))]]
Expand All @@ -24,8 +24,8 @@ Sort: value DESC NULLS FIRST
TableScan: partsupp projection=[ps_suppkey, ps_availqty, ps_supplycost]
TableScan: supplier projection=[s_suppkey, s_nationkey]
Projection: nation.n_nationkey
Filter: nation.n_name = Utf8("ALGERIA")
TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("ALGERIA")]
Filter: nation.n_name = Utf8View("ALGERIA")
TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8View("ALGERIA")]

DataFusion Physical Plan
========================
Expand All @@ -42,9 +42,9 @@ SortPreservingMergeExec: [value@1 DESC]
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(n_nationkey@0, s_nationkey@2)], projection=[ps_availqty@1, ps_supplycost@2]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([n_nationkey@0], 2), input_partitions=2
RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
CoalesceBatchesExec: target_batch_size=8192
FilterExec: n_name@1 = ALGERIA, projection=[n_nationkey@0]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: n_name@1 = ALGERIA, projection=[n_nationkey@0]
RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
ParquetExec: file_groups={ ... }, projection=[n_nationkey, n_name], predicate=n_name@1 = ALGERIA, pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN false ELSE n_name_min@0 <= ALGERIA AND ALGERIA <= n_name_max@1 END, required_guarantees=[n_name in (ALGERIA)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([s_nationkey@2], 2), input_partitions=2
Expand All @@ -66,9 +66,9 @@ SortPreservingMergeExec: [value@1 DESC]
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(n_nationkey@0, s_nationkey@3)], projection=[ps_partkey@1, ps_availqty@2, ps_supplycost@3]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([n_nationkey@0], 2), input_partitions=2
RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
CoalesceBatchesExec: target_batch_size=8192
FilterExec: n_name@1 = ALGERIA, projection=[n_nationkey@0]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: n_name@1 = ALGERIA, projection=[n_nationkey@0]
RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
ParquetExec: file_groups={ ... }, projection=[n_nationkey, n_name], predicate=n_name@1 = ALGERIA, pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN false ELSE n_name_min@0 <= ALGERIA AND ALGERIA <= n_name_max@1 END, required_guarantees=[n_name in (ALGERIA)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([s_nationkey@3], 2), input_partitions=2
Expand Down
10 changes: 5 additions & 5 deletions testdata/expected-plans/q12.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ DataFusion Logical Plan

Sort: lineitem.l_shipmode ASC NULLS LAST
Projection: lineitem.l_shipmode, sum(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS high_line_count, sum(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS low_line_count
Aggregate: groupBy=[[lineitem.l_shipmode]], aggr=[[sum(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]]
Aggregate: groupBy=[[lineitem.l_shipmode]], aggr=[[sum(CASE WHEN orders.o_orderpriority = Utf8View("1-URGENT") OR orders.o_orderpriority = Utf8View("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS sum(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN orders.o_orderpriority != Utf8View("1-URGENT") AND orders.o_orderpriority != Utf8View("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS sum(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]]
Projection: orders.o_orderpriority, lineitem.l_shipmode
Inner Join: orders.o_orderkey = lineitem.l_orderkey
TableScan: orders projection=[o_orderkey, o_orderpriority]
Projection: lineitem.l_orderkey, lineitem.l_shipmode
Filter: (lineitem.l_shipmode = Utf8("FOB") OR lineitem.l_shipmode = Utf8("SHIP")) AND lineitem.l_receiptdate > lineitem.l_commitdate AND lineitem.l_shipdate < lineitem.l_commitdate AND lineitem.l_receiptdate >= Date32("1995-01-01") AND lineitem.l_receiptdate < Date32("1996-01-01")
TableScan: lineitem projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode], partial_filters=[lineitem.l_shipmode = Utf8("FOB") OR lineitem.l_shipmode = Utf8("SHIP"), lineitem.l_receiptdate > lineitem.l_commitdate, lineitem.l_shipdate < lineitem.l_commitdate, lineitem.l_receiptdate >= Date32("1995-01-01"), lineitem.l_receiptdate < Date32("1996-01-01")]
Filter: (lineitem.l_shipmode = Utf8View("FOB") OR lineitem.l_shipmode = Utf8View("SHIP")) AND lineitem.l_receiptdate > lineitem.l_commitdate AND lineitem.l_shipdate < lineitem.l_commitdate AND lineitem.l_receiptdate >= Date32("1995-01-01") AND lineitem.l_receiptdate < Date32("1996-01-01")
TableScan: lineitem projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode], partial_filters=[lineitem.l_shipmode = Utf8View("FOB") OR lineitem.l_shipmode = Utf8View("SHIP"), lineitem.l_receiptdate > lineitem.l_commitdate, lineitem.l_shipdate < lineitem.l_commitdate, lineitem.l_receiptdate >= Date32("1995-01-01"), lineitem.l_receiptdate < Date32("1996-01-01")]

DataFusion Physical Plan
========================
Expand All @@ -28,7 +28,7 @@ SortPreservingMergeExec: [l_shipmode@0 ASC NULLS LAST]
RepartitionExec: partitioning=Hash([l_orderkey@0], 2), input_partitions=2
CoalesceBatchesExec: target_batch_size=8192
FilterExec: (l_shipmode@4 = FOB OR l_shipmode@4 = SHIP) AND l_receiptdate@3 > l_commitdate@2 AND l_shipdate@1 < l_commitdate@2 AND l_receiptdate@3 >= 1995-01-01 AND l_receiptdate@3 < 1996-01-01, projection=[l_orderkey@0, l_shipmode@4]
ParquetExec: file_groups={ ... }, projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode], predicate=(l_shipmode@14 = FOB OR l_shipmode@14 = SHIP) AND l_receiptdate@12 > l_commitdate@11 AND l_shipdate@10 < l_commitdate@11 AND l_receiptdate@12 >= 1995-01-01 AND l_receiptdate@12 < 1996-01-01, pruning_predicate=(CASE WHEN l_shipmode_null_count@2 = l_shipmode_row_count@3 THEN false ELSE l_shipmode_min@0 <= FOB AND FOB <= l_shipmode_max@1 END OR CASE WHEN l_shipmode_null_count@2 = l_shipmode_row_count@3 THEN false ELSE l_shipmode_min@0 <= SHIP AND SHIP <= l_shipmode_max@1 END) AND CASE WHEN l_receiptdate_null_count@5 = l_receiptdate_row_count@6 THEN false ELSE l_receiptdate_max@4 >= 1995-01-01 END AND CASE WHEN l_receiptdate_null_count@5 = l_receiptdate_row_count@6 THEN false ELSE l_receiptdate_min@7 < 1996-01-01 END, required_guarantees=[l_shipmode in (SHIP, FOB)]
ParquetExec: file_groups={ ... }, projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode], predicate=(l_shipmode@14 = FOB OR l_shipmode@14 = SHIP) AND l_receiptdate@12 > l_commitdate@11 AND l_shipdate@10 < l_commitdate@11 AND l_receiptdate@12 >= 1995-01-01 AND l_receiptdate@12 < 1996-01-01, pruning_predicate=(CASE WHEN l_shipmode_null_count@2 = l_shipmode_row_count@3 THEN false ELSE l_shipmode_min@0 <= FOB AND FOB <= l_shipmode_max@1 END OR CASE WHEN l_shipmode_null_count@2 = l_shipmode_row_count@3 THEN false ELSE l_shipmode_min@0 <= SHIP AND SHIP <= l_shipmode_max@1 END) AND CASE WHEN l_receiptdate_null_count@5 = l_receiptdate_row_count@6 THEN false ELSE l_receiptdate_max@4 >= 1995-01-01 END AND CASE WHEN l_receiptdate_null_count@5 = l_receiptdate_row_count@6 THEN false ELSE l_receiptdate_min@7 < 1996-01-01 END, required_guarantees=[l_shipmode in (FOB, SHIP)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([o_orderkey@0], 2), input_partitions=2
ParquetExec: file_groups={ ... }, projection=[o_orderkey, o_orderpriority]
Expand All @@ -40,7 +40,7 @@ Query Stage #0 (2 -> 2):
ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2))
CoalesceBatchesExec: target_batch_size=8192
FilterExec: (l_shipmode@4 = FOB OR l_shipmode@4 = SHIP) AND l_receiptdate@3 > l_commitdate@2 AND l_shipdate@1 < l_commitdate@2 AND l_receiptdate@3 >= 1995-01-01 AND l_receiptdate@3 < 1996-01-01, projection=[l_orderkey@0, l_shipmode@4]
ParquetExec: file_groups={ ... }, projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode], predicate=(l_shipmode@14 = FOB OR l_shipmode@14 = SHIP) AND l_receiptdate@12 > l_commitdate@11 AND l_shipdate@10 < l_commitdate@11 AND l_receiptdate@12 >= 1995-01-01 AND l_receiptdate@12 < 1996-01-01, pruning_predicate=(CASE WHEN l_shipmode_null_count@2 = l_shipmode_row_count@3 THEN false ELSE l_shipmode_min@0 <= FOB AND FOB <= l_shipmode_max@1 END OR CASE WHEN l_shipmode_null_count@2 = l_shipmode_row_count@3 THEN false ELSE l_shipmode_min@0 <= SHIP AND SHIP <= l_shipmode_max@1 END) AND CASE WHEN l_receiptdate_null_count@5 = l_receiptdate_row_count@6 THEN false ELSE l_receiptdate_max@4 >= 1995-01-01 END AND CASE WHEN l_receiptdate_null_count@5 = l_receiptdate_row_count@6 THEN false ELSE l_receiptdate_min@7 < 1996-01-01 END, required_guarantees=[l_shipmode in (SHIP, FOB)]
ParquetExec: file_groups={ ... }, projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode], predicate=(l_shipmode@14 = FOB OR l_shipmode@14 = SHIP) AND l_receiptdate@12 > l_commitdate@11 AND l_shipdate@10 < l_commitdate@11 AND l_receiptdate@12 >= 1995-01-01 AND l_receiptdate@12 < 1996-01-01, pruning_predicate=(CASE WHEN l_shipmode_null_count@2 = l_shipmode_row_count@3 THEN false ELSE l_shipmode_min@0 <= FOB AND FOB <= l_shipmode_max@1 END OR CASE WHEN l_shipmode_null_count@2 = l_shipmode_row_count@3 THEN false ELSE l_shipmode_min@0 <= SHIP AND SHIP <= l_shipmode_max@1 END) AND CASE WHEN l_receiptdate_null_count@5 = l_receiptdate_row_count@6 THEN false ELSE l_receiptdate_max@4 >= 1995-01-01 END AND CASE WHEN l_receiptdate_null_count@5 = l_receiptdate_row_count@6 THEN false ELSE l_receiptdate_min@7 < 1996-01-01 END, required_guarantees=[l_shipmode in (FOB, SHIP)]

Query Stage #1 (2 -> 2):
ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "o_orderkey", index: 0 }], 2))
Expand Down
12 changes: 6 additions & 6 deletions testdata/expected-plans/q13.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ Sort: custdist DESC NULLS FIRST, c_orders.c_count DESC NULLS FIRST
Left Join: customer.c_custkey = orders.o_custkey
TableScan: customer projection=[c_custkey]
Projection: orders.o_orderkey, orders.o_custkey
Filter: orders.o_comment NOT LIKE Utf8("%express%requests%")
TableScan: orders projection=[o_orderkey, o_custkey, o_comment], partial_filters=[orders.o_comment NOT LIKE Utf8("%express%requests%")]
Filter: orders.o_comment NOT LIKE Utf8View("%express%requests%")
TableScan: orders projection=[o_orderkey, o_custkey, o_comment], partial_filters=[orders.o_comment NOT LIKE Utf8View("%express%requests%")]

DataFusion Physical Plan
========================

SortPreservingMergeExec: [custdist@1 DESC,c_count@0 DESC]
SortExec: expr=[custdist@1 DESC,c_count@0 DESC], preserve_partitioning=[true]
SortPreservingMergeExec: [custdist@1 DESC, c_count@0 DESC]
SortExec: expr=[custdist@1 DESC, c_count@0 DESC], preserve_partitioning=[true]
ProjectionExec: expr=[c_count@0 as c_count, count(*)@1 as custdist]
AggregateExec: mode=FinalPartitioned, gby=[c_count@0 as c_count], aggr=[count(*)]
CoalesceBatchesExec: target_batch_size=8192
Expand Down Expand Up @@ -64,13 +64,13 @@ ShuffleWriterExec(stage_id=2, output_partitioning=Hash([Column { name: "c_count"

Query Stage #3 (2 -> 2):
ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name: "c_count", index: 0 }], 2))
SortExec: expr=[custdist@1 DESC,c_count@0 DESC], preserve_partitioning=[true]
SortExec: expr=[custdist@1 DESC, c_count@0 DESC], preserve_partitioning=[true]
ProjectionExec: expr=[c_count@0 as c_count, count(*)@1 as custdist]
AggregateExec: mode=FinalPartitioned, gby=[c_count@0 as c_count], aggr=[count(*)]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=2, input_partitioning=Hash([Column { name: "c_count", index: 0 }], 2))

Query Stage #4 (2 -> 1):
SortPreservingMergeExec: [custdist@1 DESC,c_count@0 DESC]
SortPreservingMergeExec: [custdist@1 DESC, c_count@0 DESC]
ShuffleReaderExec(stage_id=3, input_partitioning=Hash([Column { name: "c_count", index: 0 }], 2))

Loading

0 comments on commit 64e46c1

Please sign in to comment.