Skip to content

Commit

Permalink
apacheGH-37378: [C++] Add A Dictionary Compaction Function For Dictio…
Browse files Browse the repository at this point in the history
…naryArray (apache#37418)

### Rationale for this change
A Dictionary Compaction Function for DictionaryArray is supported.

### What changes are included in this PR?
Add a Function for Dictionary Compaction

### Are these changes tested?
Yes

Are there any user-facing changes?
No
* Closes: apache#37378

Lead-authored-by: Junming Chen <[email protected]>
Co-authored-by: Ben Harkins <[email protected]>
Co-authored-by: Benjamin Kietzman <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
  • Loading branch information
3 people authored Oct 11, 2023
1 parent 1cc0f14 commit 73454b7
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 0 deletions.
114 changes: 114 additions & 0 deletions cpp/src/arrow/array/array_dict.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "arrow/array/util.h"
#include "arrow/buffer.h"
#include "arrow/chunked_array.h"
#include "arrow/compute/api.h"
#include "arrow/datum.h"
#include "arrow/status.h"
#include "arrow/table.h"
Expand Down Expand Up @@ -211,6 +212,106 @@ Result<std::shared_ptr<ArrayData>> TransposeDictIndices(
return out_data;
}

struct CompactTransposeMapVistor {
const std::shared_ptr<ArrayData>& data;
arrow::MemoryPool* pool;
std::unique_ptr<Buffer> output_map;
std::shared_ptr<Array> out_compact_dictionary;

template <typename IndexArrowType>
Status CompactTransposeMapImpl() {
int64_t index_length = data->length;
int64_t dict_length = data->dictionary->length;
if (dict_length == 0) {
output_map = nullptr;
out_compact_dictionary = nullptr;
return Status::OK();
} else if (index_length == 0) {
ARROW_ASSIGN_OR_RAISE(out_compact_dictionary,
MakeEmptyArray(data->dictionary->type, pool));
ARROW_ASSIGN_OR_RAISE(output_map, AllocateBuffer(0, pool))
return Status::OK();
}

using CType = typename IndexArrowType::c_type;
const CType* indices_data = data->GetValues<CType>(1);
std::vector<bool> dict_used(dict_length, false);
CType dict_len = static_cast<CType>(dict_length);
int64_t dict_used_count = 0;
for (int64_t i = 0; i < index_length; i++) {
if (data->IsNull(i)) {
continue;
}

CType current_index = indices_data[i];
if (current_index < 0 || current_index >= dict_len) {
return Status::IndexError(
"Index out of bounds while compacting dictionary array: ", current_index,
"(dictionary is ", dict_length, " long) at position ", i);
}
if (dict_used[current_index]) continue;
dict_used[current_index] = true;
dict_used_count++;

if (dict_used_count == dict_length) {
// The dictionary is already compact, so just return here
output_map = nullptr;
out_compact_dictionary = nullptr;
return Status::OK();
}
}

using BuilderType = NumericBuilder<IndexArrowType>;
using arrow::compute::Take;
using arrow::compute::TakeOptions;
BuilderType dict_indices_builder(pool);
ARROW_RETURN_NOT_OK(dict_indices_builder.Reserve(dict_used_count));
ARROW_ASSIGN_OR_RAISE(output_map,
AllocateBuffer(dict_length * sizeof(int32_t), pool));
auto* output_map_raw = output_map->mutable_data_as<int32_t>();
int32_t current_index = 0;
for (CType i = 0; i < dict_len; i++) {
if (dict_used[i]) {
dict_indices_builder.UnsafeAppend(i);
output_map_raw[i] = current_index;
current_index++;
} else {
output_map_raw[i] = -1;
}
}
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> compacted_dict_indices,
dict_indices_builder.Finish());
ARROW_ASSIGN_OR_RAISE(auto compacted_dict_res,
Take(Datum(data->dictionary), compacted_dict_indices,
TakeOptions::NoBoundsCheck()));
out_compact_dictionary = compacted_dict_res.make_array();
return Status::OK();
}

template <typename Type>
enable_if_integer<Type, Status> Visit(const Type&) {
return CompactTransposeMapImpl<Type>();
}

Status Visit(const DataType& type) {
return Status::TypeError("Expected an Index Type of Int or UInt");
}
};

Result<std::unique_ptr<Buffer>> CompactTransposeMap(
const std::shared_ptr<ArrayData>& data, MemoryPool* pool,
std::shared_ptr<Array>& out_compact_dictionary) {
if (data->type->id() != Type::DICTIONARY) {
return Status::TypeError("Expected dictionary type");
}

const auto& dict_type = checked_cast<const DictionaryType&>(*data->type);
CompactTransposeMapVistor vistor{data, pool, nullptr, nullptr};
RETURN_NOT_OK(VisitTypeInline(*dict_type.index_type(), &vistor));

out_compact_dictionary = vistor.out_compact_dictionary;
return std::move(vistor.output_map);
}
} // namespace

Result<std::shared_ptr<Array>> DictionaryArray::Transpose(
Expand All @@ -222,6 +323,19 @@ Result<std::shared_ptr<Array>> DictionaryArray::Transpose(
return MakeArray(std::move(transposed));
}

Result<std::shared_ptr<Array>> DictionaryArray::Compact(MemoryPool* pool) const {
std::shared_ptr<Array> compact_dictionary;
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> transpose_map,
CompactTransposeMap(this->data_, pool, compact_dictionary));

if (transpose_map == nullptr) {
return std::make_shared<DictionaryArray>(this->data_);
} else {
return this->Transpose(this->type(), compact_dictionary,
transpose_map->data_as<int32_t>(), pool);
}
}

// ----------------------------------------------------------------------
// Dictionary unification

Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/array/array_dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class ARROW_EXPORT DictionaryArray : public Array {
const std::shared_ptr<DataType>& type, const std::shared_ptr<Array>& dictionary,
const int32_t* transpose_map, MemoryPool* pool = default_memory_pool()) const;

Result<std::shared_ptr<Array>> Compact(MemoryPool* pool = default_memory_pool()) const;

/// \brief Determine whether dictionary arrays may be compared without unification
bool CanCompareIndices(const DictionaryArray& other) const;

Expand Down
71 changes: 71 additions & 0 deletions cpp/src/arrow/array/array_dict_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,77 @@ TEST(TestDictionary, IndicesArray) {
ASSERT_OK(arr->indices()->ValidateFull());
}

void CheckDictionaryCompact(const std::shared_ptr<DataType>& dict_type,
const std::string& input_dictionary_json,
const std::string& input_index_json,
const std::string& expected_dictionary_json,
const std::string& expected_index_json) {
auto input = DictArrayFromJSON(dict_type, input_index_json, input_dictionary_json);
const DictionaryArray& input_ref = checked_cast<const DictionaryArray&>(*input);

auto expected =
DictArrayFromJSON(dict_type, expected_index_json, expected_dictionary_json);

ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> actual, input_ref.Compact());
AssertArraysEqual(*expected, *actual, /*verbose=*/true);
}

TEST(TestDictionary, Compact) {
std::shared_ptr<arrow::DataType> type;
std::shared_ptr<arrow::DataType> dict_type;

for (const auto& index_type : all_dictionary_index_types()) {
ARROW_SCOPED_TRACE("index_type = ", index_type->ToString());

type = boolean();
dict_type = dictionary(index_type, type);

// input is compacted
CheckDictionaryCompact(dict_type, "[]", "[]", "[]", "[]");
CheckDictionaryCompact(dict_type, "[true, false]", "[0, 1, 0]", "[true, false]",
"[0, 1, 0]");
CheckDictionaryCompact(dict_type, "[true, null, false]", "[2, 1, 0]",
"[true, null, false]", "[2, 1, 0]");
CheckDictionaryCompact(dict_type, "[true, false]", "[0, null, 1, 0]", "[true, false]",
"[0, null, 1, 0]");
CheckDictionaryCompact(dict_type, "[true, null, false]", "[2, null, 1, 0]",
"[true, null, false]", "[2, null, 1, 0]");

// input isn't compacted
CheckDictionaryCompact(dict_type, "[null]", "[]", "[]", "[]");
CheckDictionaryCompact(dict_type, "[false]", "[null]", "[]", "[null]");
CheckDictionaryCompact(dict_type, "[true, false]", "[0]", "[true]", "[0]");
CheckDictionaryCompact(dict_type, "[true, false]", "[0, null]", "[true]",
"[0, null]");

// input isn't compacted && its indices needs to be adjusted
CheckDictionaryCompact(dict_type, "[true, null, false]", "[2, 1]", "[null, false]",
"[1, 0]");
CheckDictionaryCompact(dict_type, "[true, null, false]", "[2, null, 1]",
"[null, false]", "[1, null, 0]");

// indices out of bound
auto temp_indices = ArrayFromJSON(index_type, "[8, 0]");
auto temp_dictionary = ArrayFromJSON(type, "[true, false]");
DictionaryArray input(dict_type, temp_indices, temp_dictionary);
ASSERT_RAISES(IndexError, input.Compact());

type = int64();
dict_type = dictionary(index_type, type);

// input isn't compacted && its indices needs to be adjusted
CheckDictionaryCompact(dict_type, "[3, 4, 7, 0, 12, 191, 21, 8]",
"[0, 2, 4, 4, 6, 4, 2, 0, 6]", "[3, 7, 12, 21]",
"[0, 1, 2, 2, 3, 2, 1, 0, 3]");
CheckDictionaryCompact(dict_type, "[3, 4, 7, 0, 12, 191, 21, 8]",
"[4, 6, 7, 7, 6, 4, 6, 6, 6]", "[12, 21, 8]",
"[0, 1, 2, 2, 1, 0, 1, 1, 1]");
CheckDictionaryCompact(dict_type, "[3, 4, 7, 0, 12, 191, 21, 8]",
"[7, 4, 7, 7, 7, 7, 4, 7, 7]", "[12, 8]",
"[1, 0, 1, 1, 1, 1, 0, 1, 1]");
}
}

TEST(TestDictionaryUnifier, Numeric) {
auto dict_ty = int64();

Expand Down

0 comments on commit 73454b7

Please sign in to comment.