From d1eefe6d1e8f2df16507984468e69081a6954fe8 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Fri, 28 Feb 2025 21:19:18 +0400 Subject: [PATCH] Change array_compact serialization, delete rust custom impl --- native/core/src/execution/planner.rs | 19 ---------- native/proto/src/proto/expr.proto | 6 ---- .../scala/org/apache/comet/serde/arrays.scala | 36 +++++++++---------- 3 files changed, 18 insertions(+), 43 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index b4f4ff5632..f42a9ed192 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -830,25 +830,6 @@ impl PhysicalPlanner { )); Ok(array_has_any_expr) } - ExprStruct::ArrayCompact(expr) => { - let src_array_expr = - self.create_expr(expr.array_expr.as_ref().unwrap(), Arc::clone(&input_schema))?; - let datatype = to_arrow_datatype(expr.item_datatype.as_ref().unwrap()); - - let null_literal_expr: Arc = - Arc::new(Literal::new(ScalarValue::Null.cast_to(&datatype)?)); - let args = vec![Arc::clone(&src_array_expr), null_literal_expr]; - let return_type = src_array_expr.data_type(&input_schema)?; - - let array_compact_expr = Arc::new(ScalarFunctionExpr::new( - "array_compact", - array_remove_all_udf(), - args, - return_type, - )); - - Ok(array_compact_expr) - } expr => Err(ExecutionError::GeneralError(format!( "Not implemented: {:?}", expr diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 1a10a9f012..fd928fd8a3 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -89,7 +89,6 @@ message Expr { BinaryExpr array_intersect = 62; ArrayJoin array_join = 63; BinaryExpr arrays_overlap = 64; - ArrayCompact array_compact = 65; } } @@ -424,11 +423,6 @@ message ArrayJoin { Expr null_replacement_expr = 3; } -message ArrayCompact { - Expr array_expr = 1; - DataType item_datatype = 2; -} - message DataType { enum DataTypeId { BOOL = 0; diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 5108ea8493..56d2642645 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -19,11 +19,11 @@ package org.apache.comet.serde -import org.apache.spark.sql.catalyst.expressions.{ArrayJoin, ArrayRemove, Attribute, Expression} -import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, StructType} +import org.apache.spark.sql.catalyst.expressions.{ArrayJoin, ArrayRemove, Attribute, Expression, Literal} +import org.apache.spark.sql.types._ import org.apache.comet.CometSparkSessionExtensions.withInfo -import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProto, serializeDataType} +import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProto, optExprWithInfo, scalarExprToProtoWithReturnType} import org.apache.comet.shims.CometExprShim object CometArrayRemove extends CometExpressionSerde with CometExprShim { @@ -132,21 +132,21 @@ object CometArrayCompact extends CometExpressionSerde with IncompatExpr { inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { val child = expr.children.head - val elementType = serializeDataType(child.dataType.asInstanceOf[ArrayType].elementType) - val srcExprProto = exprToProto(child, inputs, binding) - if (elementType.isDefined && srcExprProto.isDefined) { - val arrayCompactBuilder = ExprOuterClass.ArrayCompact - .newBuilder() - .setArrayExpr(srcExprProto.get) - .setItemDatatype(elementType.get) - Some( - ExprOuterClass.Expr - .newBuilder() - .setArrayCompact(arrayCompactBuilder) - .build()) - } else { - withInfo(expr, "unsupported arguments for ArrayCompact", expr.children: _*) - None + val elementType = child.dataType.asInstanceOf[ArrayType].elementType + + val arrayExprProto = exprToProto(child, inputs, binding) + val nullLiteralProto = exprToProto(Literal(null, elementType), Seq.empty) + + val arrayCompactScalarExpr = scalarExprToProtoWithReturnType( + "array_remove_all", + ArrayType(elementType = elementType), + arrayExprProto, + nullLiteralProto) + arrayCompactScalarExpr match { + case None => + withInfo(expr, "unsupported arguments for ArrayCompact", expr.children: _*) + None + case expr => expr } } }