Skip to content

Commit

Permalink
Change array_compact serialization, delete Rust custom impl
Browse files Browse the repository at this point in the history
  • Loading branch information
Kazantsev Maksim committed Feb 28, 2025
1 parent c7b2488 commit d1eefe6
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 43 deletions.
19 changes: 0 additions & 19 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,25 +830,6 @@ impl PhysicalPlanner {
));
Ok(array_has_any_expr)
}
ExprStruct::ArrayCompact(expr) => {
let src_array_expr =
self.create_expr(expr.array_expr.as_ref().unwrap(), Arc::clone(&input_schema))?;
let datatype = to_arrow_datatype(expr.item_datatype.as_ref().unwrap());

let null_literal_expr: Arc<dyn PhysicalExpr> =
Arc::new(Literal::new(ScalarValue::Null.cast_to(&datatype)?));
let args = vec![Arc::clone(&src_array_expr), null_literal_expr];
let return_type = src_array_expr.data_type(&input_schema)?;

let array_compact_expr = Arc::new(ScalarFunctionExpr::new(
"array_compact",
array_remove_all_udf(),
args,
return_type,
));

Ok(array_compact_expr)
}
expr => Err(ExecutionError::GeneralError(format!(
"Not implemented: {:?}",
expr
Expand Down
6 changes: 0 additions & 6 deletions native/proto/src/proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ message Expr {
BinaryExpr array_intersect = 62;
ArrayJoin array_join = 63;
BinaryExpr arrays_overlap = 64;
ArrayCompact array_compact = 65;
}
}

Expand Down Expand Up @@ -424,11 +423,6 @@ message ArrayJoin {
Expr null_replacement_expr = 3;
}

message ArrayCompact {
Expr array_expr = 1;
DataType item_datatype = 2;
}

message DataType {
enum DataTypeId {
BOOL = 0;
Expand Down
36 changes: 18 additions & 18 deletions spark/src/main/scala/org/apache/comet/serde/arrays.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

package org.apache.comet.serde

import org.apache.spark.sql.catalyst.expressions.{ArrayJoin, ArrayRemove, Attribute, Expression}
import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, StructType}
import org.apache.spark.sql.catalyst.expressions.{ArrayJoin, ArrayRemove, Attribute, Expression, Literal}
import org.apache.spark.sql.types._

import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProto, serializeDataType}
import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProto, optExprWithInfo, scalarExprToProtoWithReturnType}
import org.apache.comet.shims.CometExprShim

object CometArrayRemove extends CometExpressionSerde with CometExprShim {
Expand Down Expand Up @@ -132,21 +132,21 @@ object CometArrayCompact extends CometExpressionSerde with IncompatExpr {
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
val child = expr.children.head
val elementType = serializeDataType(child.dataType.asInstanceOf[ArrayType].elementType)
val srcExprProto = exprToProto(child, inputs, binding)
if (elementType.isDefined && srcExprProto.isDefined) {
val arrayCompactBuilder = ExprOuterClass.ArrayCompact
.newBuilder()
.setArrayExpr(srcExprProto.get)
.setItemDatatype(elementType.get)
Some(
ExprOuterClass.Expr
.newBuilder()
.setArrayCompact(arrayCompactBuilder)
.build())
} else {
withInfo(expr, "unsupported arguments for ArrayCompact", expr.children: _*)
None
val elementType = child.dataType.asInstanceOf[ArrayType].elementType

val arrayExprProto = exprToProto(child, inputs, binding)
val nullLiteralProto = exprToProto(Literal(null, elementType), Seq.empty)

val arrayCompactScalarExpr = scalarExprToProtoWithReturnType(
"array_remove_all",
ArrayType(elementType = elementType),
arrayExprProto,
nullLiteralProto)
arrayCompactScalarExpr match {
case None =>
withInfo(expr, "unsupported arguments for ArrayCompact", expr.children: _*)
None
case expr => expr
}
}
}
Expand Down

0 comments on commit d1eefe6

Please sign in to comment.