-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-36905: [C++] Add support for SparseUnion to selection functions #36906
Changes from 3 commits
cba22d6
2db10e7
851c25f
6262f79
cddf9c8
200fd5f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -741,6 +741,63 @@ struct DenseUnionSelectionImpl | |
} | ||
}; | ||
|
||
struct SparseUnionSelectionImpl | ||
: public Selection<SparseUnionSelectionImpl, SparseUnionType> { | ||
using Base = Selection<SparseUnionSelectionImpl, SparseUnionType>; | ||
LIFT_BASE_MEMBERS(); | ||
|
||
TypedBufferBuilder<int8_t> child_id_buffer_builder_; | ||
std::vector<int8_t> type_codes_; | ||
|
||
SparseUnionSelectionImpl(KernelContext* ctx, const ExecSpan& batch, | ||
int64_t output_length, ExecResult* out) | ||
: Base(ctx, batch, output_length, out), | ||
child_id_buffer_builder_(ctx->memory_pool()), | ||
type_codes_(checked_cast<const UnionType&>(*this->values.type).type_codes()) {} | ||
|
||
template <typename Adapter> | ||
Status GenerateOutput() { | ||
SparseUnionArray typed_values(this->values.ToArrayData()); | ||
Adapter adapter(this); | ||
RETURN_NOT_OK(adapter.Generate( | ||
[&](int64_t index) { | ||
int8_t child_id = typed_values.child_id(index); | ||
child_id_buffer_builder_.UnsafeAppend(type_codes_[child_id]); | ||
// TODO(jinshang): We use a naive approach for now: apply take for each child | ||
// array. There is room for optimization because the unselected child arrays can | ||
// have any value at this slot. | ||
return Status::OK(); | ||
}, | ||
[&]() { | ||
int8_t child_id = 0; | ||
child_id_buffer_builder_.UnsafeAppend(type_codes_[child_id]); | ||
return Status::OK(); | ||
})); | ||
return Status::OK(); | ||
} | ||
|
||
Status Init() override { | ||
RETURN_NOT_OK(child_id_buffer_builder_.Reserve(output_length)); | ||
return Status::OK(); | ||
} | ||
|
||
Status Finish() override { | ||
ARROW_ASSIGN_OR_RAISE(auto child_ids_buffer, child_id_buffer_builder_.Finish()); | ||
SparseUnionArray typed_values(this->values.ToArrayData()); | ||
auto num_fields = typed_values.num_fields(); | ||
auto num_rows = child_ids_buffer->size(); | ||
BufferVector buffers{nullptr, std::move(child_ids_buffer)}; | ||
*out = ArrayData(typed_values.type(), num_rows, std::move(buffers), 0); | ||
out->child_data.reserve(num_fields); | ||
for (auto i = 0; i < num_fields; i++) { | ||
ARROW_ASSIGN_OR_RAISE(auto child_datum, | ||
Take(*typed_values.field(i), *this->selection.ToArrayData())); | ||
out->child_data.emplace_back(std::move(child_datum).array()); | ||
} | ||
return Status::OK(); | ||
} | ||
}; | ||
|
||
struct FSLSelectionImpl : public Selection<FSLSelectionImpl, FixedSizeListType> { | ||
Int64Builder child_index_builder; | ||
|
||
|
@@ -863,6 +920,22 @@ Status DenseUnionFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResul | |
return FilterExec<DenseUnionSelectionImpl>(ctx, batch, out); | ||
} | ||
|
||
Status SparseUnionFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: move this into There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved and extracted a |
||
// Transform filter to selection indices and then use Take. | ||
std::shared_ptr<ArrayData> indices; | ||
RETURN_NOT_OK(GetTakeIndices(batch[1].array, | ||
FilterState::Get(ctx).null_selection_behavior, | ||
ctx->memory_pool()) | ||
.Value(&indices)); | ||
|
||
Datum result; | ||
RETURN_NOT_OK(Take(batch[0].array.ToArrayData(), Datum(indices), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps we can call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
TakeOptions::NoBoundsCheck(), ctx->exec_context()) | ||
.Value(&result)); | ||
out->value = result.array(); | ||
return Status::OK(); | ||
} | ||
|
||
Status MapFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { | ||
return FilterExec<ListSelectionImpl<MapType>>(ctx, batch, out); | ||
} | ||
|
@@ -909,6 +982,10 @@ Status DenseUnionTakeExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* | |
return TakeExec<DenseUnionSelectionImpl>(ctx, batch, out); | ||
} | ||
|
||
Status SparseUnionTakeExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { | ||
return TakeExec<SparseUnionSelectionImpl>(ctx, batch, out); | ||
} | ||
|
||
Status StructTakeExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { | ||
return TakeExec<StructSelectionImpl>(ctx, batch, out); | ||
} | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -1680,28 +1680,26 @@ These functions select and return a subset of their input. | |||||
+---------------+--------+--------------+--------------+--------------+-------------------------+-----------+ | ||||||
| Function name | Arity | Input type 1 | Input type 2 | Output type | Options class | Notes | | ||||||
+===============+========+==============+==============+==============+=========================+===========+ | ||||||
| array_filter | Binary | Any | Boolean | Input type 1 | :struct:`FilterOptions` | \(1) \(3) | | ||||||
| array_filter | Binary | Any | Boolean | Input type 1 | :struct:`FilterOptions` | \(2) | | ||||||
+---------------+--------+--------------+--------------+--------------+-------------------------+-----------+ | ||||||
| array_take | Binary | Any | Integer | Input type 1 | :struct:`TakeOptions` | \(1) \(4) | | ||||||
| array_take | Binary | Any | Integer | Input type 1 | :struct:`TakeOptions` | \(3) | | ||||||
+---------------+--------+--------------+--------------+--------------+-------------------------+-----------+ | ||||||
| drop_null | Unary | Any | - | Input type 1 | | \(1) \(2) | | ||||||
| drop_null | Unary | Any | - | Input type 1 | | \(1) | | ||||||
+---------------+--------+--------------+--------------+--------------+-------------------------+-----------+ | ||||||
| filter | Binary | Any | Boolean | Input type 1 | :struct:`FilterOptions` | \(1) \(3) | | ||||||
| filter | Binary | Any | Boolean | Input type 1 | :struct:`FilterOptions` | \(3) | | ||||||
+---------------+--------+--------------+--------------+--------------+-------------------------+-----------+ | ||||||
| take | Binary | Any | Integer | Input type 1 | :struct:`TakeOptions` | \(1) \(4) | | ||||||
| take | Binary | Any | Integer | Input type 1 | :struct:`TakeOptions` | \(4) | | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suppose this should be
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed, and the previous line was also wrong. |
||||||
+---------------+--------+--------------+--------------+--------------+-------------------------+-----------+ | ||||||
|
||||||
* \(1) Sparse unions are unsupported. | ||||||
|
||||||
* \(2) Each element in the input is appended to the output iff it is non-null. | ||||||
* \(1) Each element in the input is appended to the output iff it is non-null. | ||||||
If the input is a record batch or table, any null value in a column drops | ||||||
the entire row. | ||||||
|
||||||
* \(3) Each element in input 1 (the values) is appended to the output iff | ||||||
* \(2) Each element in input 1 (the values) is appended to the output iff | ||||||
the corresponding element in input 2 (the filter) is true. How | ||||||
nulls in the filter are handled can be configured using FilterOptions. | ||||||
|
||||||
* \(4) For each element *i* in input 2 (the indices), the *i*'th element | ||||||
* \(3) For each element *i* in input 2 (the indices), the *i*'th element | ||||||
in input 1 (the values) is appended to the output. | ||||||
|
||||||
Containment tests | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to be doing a pointless back-and-forth between type codes and child ids?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed