Skip to content

Commit

Permalink
use struct approach
Browse files Browse the repository at this point in the history
  • Loading branch information
js8544 committed Aug 7, 2023
1 parent 2db10e7 commit 851c25f
Showing 1 changed file with 18 additions and 22 deletions.
40 changes: 18 additions & 22 deletions cpp/src/arrow/compute/kernels/vector_selection_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -748,18 +748,12 @@ struct SparseUnionSelectionImpl

TypedBufferBuilder<int8_t> child_id_buffer_builder_;
std::vector<int8_t> type_codes_;
std::vector<Int64Builder> child_indices_builders_;

SparseUnionSelectionImpl(KernelContext* ctx, const ExecSpan& batch,
int64_t output_length, ExecResult* out)
: Base(ctx, batch, output_length, out),
child_id_buffer_builder_(ctx->memory_pool()),
type_codes_(checked_cast<const UnionType&>(*this->values.type).type_codes()),
child_indices_builders_(type_codes_.size()) {
for (auto& child_indices_builder : child_indices_builders_) {
child_indices_builder = Int64Builder(ctx->memory_pool());
}
}
type_codes_(checked_cast<const UnionType&>(*this->values.type).type_codes()) {}

template <typename Adapter>
Status GenerateOutput() {
Expand All @@ -772,27 +766,18 @@ struct SparseUnionSelectionImpl
// TODO(jinshang): We use a naive approach for now: apply take for each child
// array. There is room for optimization because the unselected child arrays can
// have any value at this slot.
for (auto& child_indices_builder : child_indices_builders_) {
child_indices_builder.UnsafeAppend(index);
}
return Status::OK();
},
[&]() {
int8_t child_id = 0;
child_id_buffer_builder_.UnsafeAppend(type_codes_[child_id]);
for (auto& child_indices_builder : child_indices_builders_) {
child_indices_builder.UnsafeAppendNull();
}
return Status::OK();
}));
return Status::OK();
}

Status Init() override {
RETURN_NOT_OK(child_id_buffer_builder_.Reserve(output_length));
for (auto& child_index_builder : child_indices_builders_) {
RETURN_NOT_OK(child_index_builder.Reserve(output_length));
}
return Status::OK();
}

Expand All @@ -803,12 +788,11 @@ struct SparseUnionSelectionImpl
auto num_rows = child_ids_buffer->size();
BufferVector buffers{nullptr, std::move(child_ids_buffer)};
*out = ArrayData(typed_values.type(), num_rows, std::move(buffers), 0);
out->child_data.reserve(num_fields);
for (auto i = 0; i < num_fields; i++) {
ARROW_ASSIGN_OR_RAISE(auto child_indices_array,
child_indices_builders_[i].Finish());
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> child_array,
Take(*typed_values.field(i), *child_indices_array));
out->child_data.push_back(child_array->data());
ARROW_ASSIGN_OR_RAISE(auto child_datum,
Take(*typed_values.field(i), *this->selection.ToArrayData()));
out->child_data.emplace_back(std::move(child_datum).array());
}
return Status::OK();
}
Expand Down Expand Up @@ -937,7 +921,19 @@ Status DenseUnionFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResul
}

Status SparseUnionFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
return FilterExec<SparseUnionSelectionImpl>(ctx, batch, out);
// Transform filter to selection indices and then use Take.
std::shared_ptr<ArrayData> indices;
RETURN_NOT_OK(GetTakeIndices(batch[1].array,
FilterState::Get(ctx).null_selection_behavior,
ctx->memory_pool())
.Value(&indices));

Datum result;
RETURN_NOT_OK(Take(batch[0].array.ToArrayData(), Datum(indices),
TakeOptions::NoBoundsCheck(), ctx->exec_context())
.Value(&result));
out->value = result.array();
return Status::OK();
}

Status MapFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
Expand Down

0 comments on commit 851c25f

Please sign in to comment.