[GLUTEN-7971][CH] Support using left side as the build table for the left anti/semi join

Vanilla Spark does not support right anti/semi joins, but the CH backend does. When AQE is enabled, runtime statistics make it possible to convert A left anti/semi join B into B right anti/semi join A whenever table A is smaller than table B.

Close #7971.
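A minimal usage sketch (table names small_tbl and big_tbl are hypothetical; the conf keys are the ones exercised by this commit's tests):

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("ch-build-left-anti-semi-sketch")
  // Both settings below are taken from the new tests in this commit.
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.gluten.sql.columnar.backend.ch.convert.left.anti_semi.to.right", "true")
  .getOrCreate()

// With AQE statistics available, a plan like `small LEFT SEMI JOIN big` may now
// be executed by the CH backend as `big RIGHT SEMI JOIN small`, building the
// hash table on the smaller (left) side.
spark.sql(
  """SELECT s.* FROM small_tbl s
    |WHERE EXISTS (SELECT 1 FROM big_tbl b WHERE b.key = s.key)""".stripMargin)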
zzcclp committed Nov 19, 2024
1 parent 90428b9 commit c2a8f00
Showing 10 changed files with 128 additions and 17 deletions.
@@ -137,6 +137,10 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
CHConf.prefixOf("delta.metadata.optimize")
val GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE_DEFAULT_VALUE: String = "true"

val GLUTEN_CLICKHOUSE_CONVERT_LEFT_ANTI_SEMI_TO_RIGHT: String =
CHConf.prefixOf("convert.left.anti_semi.to.right")
val GLUTEN_CLICKHOUSE_CONVERT_LEFT_ANTI_SEMI_TO_RIGHT_DEFAULT_VALUE: String = "false"

def affinityMode: String = {
SparkEnv.get.conf
.get(
@@ -377,6 +381,13 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
true
} else {
t match {
case LeftAnti | LeftSemi
if (SQLConf.get
.getConfString(
GLUTEN_CLICKHOUSE_CONVERT_LEFT_ANTI_SEMI_TO_RIGHT,
GLUTEN_CLICKHOUSE_CONVERT_LEFT_ANTI_SEMI_TO_RIGHT_DEFAULT_VALUE)
.toBoolean) =>
true
case LeftOuter => true
case _ => false
}
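For reference, a minimal standalone sketch of the gating logic this file now applies (the enclosing method is abbreviated in the diff; the conf key below is the fully prefixed form produced by CHConf.prefixOf, as used in the tests further down):

import org.apache.spark.sql.catalyst.plans.{JoinType, LeftAnti, LeftOuter, LeftSemi}
import org.apache.spark.sql.internal.SQLConf

// Sketch only: mirrors the match arms added above.
def supportsBuildOnLeft(t: JoinType): Boolean = t match {
  case LeftAnti | LeftSemi =>
    SQLConf.get
      .getConfString(
        "spark.gluten.sql.columnar.backend.ch.convert.left.anti_semi.to.right",
        "false")
      .toBoolean
  case LeftOuter => true
  case _ => false
}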
@@ -58,16 +58,23 @@ object JoinTypeTransform {
      } else {
        JoinRel.JoinType.JOIN_TYPE_RIGHT
      }
-     case LeftSemi | ExistenceJoin(_) =>
+     case LeftSemi =>
        if (!buildRight) {
-         throw new IllegalArgumentException("LeftSemi join should not switch children")
+         JoinRel.JoinType.JOIN_TYPE_RIGHT_SEMI
+       } else {
+         JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
        }
-       JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
      case LeftAnti =>
        if (!buildRight) {
-         throw new IllegalArgumentException("LeftAnti join should not switch children")
+         JoinRel.JoinType.JOIN_TYPE_RIGHT_ANTI
+       } else {
+         JoinRel.JoinType.JOIN_TYPE_LEFT_ANTI
        }
-       JoinRel.JoinType.JOIN_TYPE_LEFT_ANTI
+     case ExistenceJoin(_) =>
+       if (!buildRight) {
+         throw new IllegalArgumentException("Existence join should not switch children")
+       }
+       JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
      case _ =>
        // TODO: Support cross join with Cross Rel
        JoinRel.JoinType.UNRECOGNIZED
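The resulting mapping, summarized as a runnable sketch (plain strings and booleans stand in for the project's Spark and substrait enum types):

// (Spark join type, buildRight) -> substrait JoinRel join type.
// The buildRight = false rows are the new cases enabled by this commit.
val joinTypeMapping: Map[(String, Boolean), String] = Map(
  ("LeftSemi", true) -> "JOIN_TYPE_LEFT_SEMI",
  ("LeftSemi", false) -> "JOIN_TYPE_RIGHT_SEMI",
  ("LeftAnti", true) -> "JOIN_TYPE_LEFT_ANTI",
  ("LeftAnti", false) -> "JOIN_TYPE_RIGHT_ANTI"
)
// ExistenceJoin still rejects buildRight = false and maps to JOIN_TYPE_LEFT_SEMI.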
@@ -35,9 +35,6 @@ class GlutenClickHouseTPCDSParquetAQESuite
.set("spark.io.compression.codec", "snappy")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
-     // Currently, it can not support to read multiple partitioned file in one task.
-     // .set("spark.sql.files.maxPartitionBytes", "134217728")
-     // .set("spark.sql.files.openCostInBytes", "134217728")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.memory.offHeap.size", "4g")
}
@@ -20,6 +20,8 @@ import org.apache.gluten.execution._

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
import org.apache.spark.sql.catalyst.optimizer.BuildLeft
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
import org.apache.spark.sql.execution.{ColumnarSubqueryBroadcastExec, ReusedSubqueryExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
@@ -35,9 +37,6 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
.set("spark.io.compression.codec", "LZ4")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
-     // Currently, it can not support to read multiple partitioned file in one task.
-     // .set("spark.sql.files.maxPartitionBytes", "134217728")
-     // .set("spark.sql.files.openCostInBytes", "134217728")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.memory.offHeap.size", "4g")
}
@@ -265,4 +264,78 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
}
})
}

test("GLUTEN-7971: Support using left side as the build table for the left anti/semi join") {
withSQLConf(
("spark.sql.autoBroadcastJoinThreshold", "-1"),
("spark.gluten.sql.columnar.backend.ch.convert.left.anti_semi.to.right", "true")) {
val sql1 =
s"""
|select
| cd_gender,
| cd_marital_status,
| cd_education_status,
| count(*) cnt1
| from
| customer c,customer_address ca,customer_demographics
| where
| c.c_current_addr_sk = ca.ca_address_sk and
| ca_county in ('Walker County','Richland County','Gaines County','Douglas County')
| and cd_demo_sk = c.c_current_cdemo_sk and
| exists (select *
| from store_sales
| where c.c_customer_sk = ss_customer_sk)
| group by cd_gender,
| cd_marital_status,
| cd_education_status
| order by cd_gender,
| cd_marital_status,
| cd_education_status
| LIMIT 100 ;
|""".stripMargin
runQueryAndCompare(sql1)(
df => {
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
val shuffledHashJoinExecs = collect(df.queryExecution.executedPlan) {
case h: CHShuffledHashJoinExecTransformer if h.joinType == LeftSemi => h
}
assertResult(1)(shuffledHashJoinExecs.size)
assertResult(BuildLeft)(shuffledHashJoinExecs(0).buildSide)
})

val sql2 =
s"""
|select
| cd_gender,
| cd_marital_status,
| cd_education_status,
| count(*) cnt1
| from
| customer c,customer_address ca,customer_demographics
| where
| c.c_current_addr_sk = ca.ca_address_sk and
| ca_county in ('Walker County','Richland County','Gaines County','Douglas County')
| and cd_demo_sk = c.c_current_cdemo_sk and
| not exists (select *
| from store_sales
| where c.c_customer_sk = ss_customer_sk)
| group by cd_gender,
| cd_marital_status,
| cd_education_status
| order by cd_gender,
| cd_marital_status,
| cd_education_status
| LIMIT 100 ;
|""".stripMargin
runQueryAndCompare(sql2)(
df => {
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
val shuffledHashJoinExecs = collect(df.queryExecution.executedPlan) {
case h: CHShuffledHashJoinExecTransformer if h.joinType == LeftAnti => h
}
assertResult(1)(shuffledHashJoinExecs.size)
assertResult(BuildLeft)(shuffledHashJoinExecs(0).buildSide)
})
}
}
}
@@ -32,9 +32,6 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleSuite extends GlutenClickHouseT
.set("spark.io.compression.codec", "LZ4")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
-     // Currently, it can not support to read multiple partitioned file in one task.
-     // .set("spark.sql.files.maxPartitionBytes", "134217728")
-     // .set("spark.sql.files.openCostInBytes", "134217728")
.set("spark.memory.offHeap.size", "4g")
// .set("spark.sql.planChangeLog.level", "error")
}
@@ -35,9 +35,6 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
.set("spark.io.compression.codec", "snappy")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
-     // Currently, it can not support to read multiple partitioned file in one task.
-     // .set("spark.sql.files.maxPartitionBytes", "134217728")
-     // .set("spark.sql.files.openCostInBytes", "134217728")
.set("spark.memory.offHeap.size", "4g")
.set("spark.gluten.sql.validation.logLevel", "ERROR")
.set("spark.gluten.sql.validation.printStackOnFailure", "true")
@@ -20,6 +20,7 @@ import org.apache.gluten.execution._

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.optimizer.BuildLeft
import org.apache.spark.sql.catalyst.plans.LeftSemi
import org.apache.spark.sql.execution.{ReusedSubqueryExec, SubqueryExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper}

@@ -213,6 +214,23 @@ class GlutenClickHouseTPCHParquetAQESuite
runTPCHQuery(21) { df => }
}

test(
"TPCH Q21 with GLUTEN-7971: Support using left side as the build table for the left anti/semi join") {
withSQLConf(
("spark.sql.autoBroadcastJoinThreshold", "-1"),
("spark.gluten.sql.columnar.backend.ch.convert.left.anti_semi.to.right", "true")) {
runTPCHQuery(21, compareResult = false) {
df =>
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
val shuffledHashJoinExecs = collect(df.queryExecution.executedPlan) {
case h: CHShuffledHashJoinExecTransformer if h.joinType == LeftSemi => h
}
assertResult(1)(shuffledHashJoinExecs.size)
assertResult(BuildLeft)(shuffledHashJoinExecs(0).buildSide)
}
}
}

test("TPCH Q22") {
runTPCHQuery(22) {
df =>
4 changes: 4 additions & 0 deletions cpp-ch/local-engine/Common/CHUtil.cpp
@@ -1095,8 +1095,12 @@ JoinUtil::getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type, bool i
return {DB::JoinKind::Left, DB::JoinStrictness::Any};
return {DB::JoinKind::Left, DB::JoinStrictness::Semi};
}
case substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT_SEMI:
return {DB::JoinKind::Right, DB::JoinStrictness::Semi};
case substrait::JoinRel_JoinType_JOIN_TYPE_LEFT_ANTI:
return {DB::JoinKind::Left, DB::JoinStrictness::Anti};
case substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT_ANTI:
return {DB::JoinKind::Right, DB::JoinStrictness::Anti};
case substrait::JoinRel_JoinType_JOIN_TYPE_LEFT:
return {DB::JoinKind::Left, DB::JoinStrictness::All};
case substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT:
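Combined with the Scala-side mapping above, the end-to-end translation for the new build-left cases is, as a sketch (plain strings stand in for the actual Spark, substrait, and ClickHouse enums):

// (Spark join type, build side) -> (substrait join type, CH kind/strictness)
val endToEnd: Map[(String, String), (String, String)] = Map(
  ("LeftSemi", "BuildLeft") -> ("JOIN_TYPE_RIGHT_SEMI", "Right/Semi"), // new
  ("LeftAnti", "BuildLeft") -> ("JOIN_TYPE_RIGHT_ANTI", "Right/Anti"), // new
  ("LeftSemi", "BuildRight") -> ("JOIN_TYPE_LEFT_SEMI", "Left/Semi"),
  ("LeftAnti", "BuildRight") -> ("JOIN_TYPE_LEFT_ANTI", "Left/Anti")
)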
4 changes: 4 additions & 0 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -331,6 +331,10 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPla
if (root_rel.root().input().has_write())
addSinkTransform(parser_context->queryContext(), root_rel.root().input().write(), builder);
LOG_INFO(getLogger("SerializedPlanParser"), "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0);
LOG_DEBUG(
getLogger("SerializedPlanParser"),
"clickhouse plan \n{}",
PlanUtil::explainPlan(*query_plan));

auto config = ExecutorConfig::loadFromContext(parser_context->queryContext());
return std::make_unique<LocalExecutor>(std::move(query_plan), std::move(builder), config.dump_pipeline);
@@ -265,6 +265,9 @@ object JoinUtils {
case _: ExistenceJoin =>
inputBuildOutput.indices.map(ExpressionBuilder.makeSelection(_)) :+
ExpressionBuilder.makeSelection(buildOutput.size)
case LeftSemi | LeftAnti =>
// When a left semi/anti join builds on the left side, the left output follows
// the streamed (right) output, hence the index offset below.
leftOutput.indices.map(idx => ExpressionBuilder.makeSelection(idx + streamedOutput.size))
case LeftExistence(_) =>
leftOutput.indices.map(ExpressionBuilder.makeSelection(_))
case _ =>
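A tiny sketch of that index arithmetic (column counts are hypothetical):

// Assumed output layout when the left side is the build side:
// [streamed (right) columns ++ build (left) columns], so left column i is
// selected at position i + streamedOutput.size.
val streamedOutputSize = 3        // e.g. the streamed (right) side has 3 columns
val leftOutputIndices = 0 until 2 // e.g. the left side has 2 columns
val selections = leftOutputIndices.map(_ + streamedOutputSize) // Vector(3, 4)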
