Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-32863: [C++][Parquet] Add DELTA_BYTE_ARRAY encoder to Parquet writer #14341

Merged
merged 78 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
c35b86c
Initial commit
rok Mar 1, 2023
0eeea96
Adding PutBinaryArray
rok Mar 1, 2023
450b0a6
Add FIXED_LEN_BYTE_ARRAY
rok Mar 8, 2023
ad7b35f
More FLBAType work
rok Mar 21, 2023
166ecf9
Review feedback
rok Mar 21, 2023
d365c7d
Review feedback
rok Mar 22, 2023
c4e2226
DeltaByteArrayDecoderImpl
rok Mar 22, 2023
d1fbd21
Add Python test for FLBA and boolean with RLE
rok Mar 23, 2023
ad63efc
Review feedback
rok Mar 23, 2023
665048f
Work
rok Mar 23, 2023
4c9b90f
Review feedback
rok Mar 24, 2023
23d663c
Review feedback
rok Mar 25, 2023
c6408f5
Refactoring
rok Mar 25, 2023
54053f9
Apply suggestions from code review
rok Mar 27, 2023
e88c838
Linting and adding a python flba test
rok Mar 27, 2023
c74e3f2
Update cpp/src/parquet/encoding.cc
rok Mar 31, 2023
8994bf6
Review feedback
rok Mar 31, 2023
925f1f7
CheckDecode
rok Mar 31, 2023
800c3f8
Work
rok Mar 31, 2023
10cadb1
Review feedback
rok Apr 4, 2023
597f567
Review feedback
rok Apr 4, 2023
74fbbdc
Review feedback
rok Apr 5, 2023
703b8b6
Work
rok Apr 5, 2023
6949b98
Review feedback
rok Apr 6, 2023
ea67049
Work
rok Apr 6, 2023
9c31398
Change to zero length suffix
rok Apr 6, 2023
0d5140c
Work
rok Apr 6, 2023
d2bfd7f
Review feedback
rok Apr 7, 2023
4d1debf
Review feedback
rok Apr 10, 2023
37e5436
Review feedback
rok Apr 11, 2023
58c89bc
Review feedback
rok Apr 11, 2023
562edd8
Change exception message.
rok May 2, 2023
01f8f94
Apply suggestions from code review
rok May 16, 2023
e6cd16b
Review feedback
rok May 16, 2023
c07c865
Chunk prefix lengths
rok May 17, 2023
ca3660d
Update cpp/src/parquet/encoding.cc
rok May 17, 2023
6951e03
Review feedback
rok May 17, 2023
02fe560
Change data distribution
rok May 17, 2023
0f49067
Refactor Put
rok May 19, 2023
ad90f19
Batch suffixes
rok May 19, 2023
9fdb3e6
Work
rok May 19, 2023
d78f3a9
Linting
rok May 19, 2023
7c02878
Rename length
rok May 19, 2023
a5be621
Update cpp/src/parquet/encoding.cc
rok May 26, 2023
e876222
ExecuteSpaced should use alternative InitData
rok Jun 3, 2023
744520c
Random data generator
rok Jun 16, 2023
33d5111
Concatenation probability
rok Jun 27, 2023
9ba2e0e
Update encoding_test.cc
rok Jun 27, 2023
bab955b
Change random strning generation
rok Jun 29, 2023
a3b3d0c
test
rok Jun 30, 2023
1867347
Minor change
rok Jul 3, 2023
bb54a87
Refactor DeltaByteArrayEncodingDirectPut
rok Jul 8, 2023
73316cf
Update cpp/src/parquet/encoding.cc
rok Jul 8, 2023
ba4538b
Review feedback
rok Jul 8, 2023
bc7fcde
Review feedback
rok Jul 8, 2023
3dc32b3
Apply suggestions from code review
rok Jul 8, 2023
51ff60a
Repeats
rok Jul 8, 2023
1d2fa5f
Enable DirectPut tests
rok Jul 9, 2023
b53e84e
GeneratePrefixedData
rok Jul 9, 2023
8911b5e
Work
rok Jul 11, 2023
2538ab8
Enable DeltaByteArrayEncodingDirectPut sans FLBAType
rok Jul 12, 2023
c19a528
Refactoring DeltaByteArrayEncodingDirectPut
rok Jul 12, 2023
74da1b7
random_byte_array etc
rok Jul 12, 2023
92b457a
Review feedback
rok Jul 14, 2023
e4b96aa
Apply suggestions from code review
rok Jul 20, 2023
45a6d51
Review feedback
rok Jul 20, 2023
1e10149
Review feedback
rok Jul 20, 2023
6c6fbde
Review feedback
rok Jul 20, 2023
6930a79
Review feedback
rok Jul 21, 2023
409a6ee
Review feedback
rok Jul 21, 2023
7dc32e1
Apply suggestions from code review
rok Aug 7, 2023
578d7de
Switch to resizable buffer
rok Aug 7, 2023
8167771
Apply suggestions from code review
rok Aug 17, 2023
0aaa6b4
Review feedback
rok Aug 17, 2023
9f0cdbb
Review feedback
rok Aug 17, 2023
46aa303
Review feedback
rok Aug 17, 2023
acc40ed
Review feedback
rok Aug 21, 2023
5039cf9
Rewrite Binary/FLBA specializations to correctly account for Accumula…
pitrou Aug 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
495 changes: 422 additions & 73 deletions cpp/src/parquet/encoding.cc

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cpp/src/parquet/encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,13 @@ struct EncodingTraits<ByteArrayType> {
using Encoder = ByteArrayEncoder;
using Decoder = ByteArrayDecoder;

using ArrowType = ::arrow::BinaryType;
/// \brief Internal helper class for decoding BYTE_ARRAY data where we can
/// overflow the capacity of a single arrow::BinaryArray
struct Accumulator {
std::unique_ptr<::arrow::BinaryBuilder> builder;
std::vector<std::shared_ptr<::arrow::Array>> chunks;
};
using ArrowType = ::arrow::BinaryType;
using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::BinaryType>;
};

Expand Down
298 changes: 286 additions & 12 deletions cpp/src/parquet/encoding_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "arrow/util/bitmap_writer.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/endian.h"
#include "arrow/util/span.h"
#include "arrow/util/string.h"
#include "parquet/encoding.h"
#include "parquet/platform.h"
Expand Down Expand Up @@ -181,7 +182,7 @@ class TestEncodingBase : public ::testing::Test {

void TearDown() {}

void InitData(int nvalues, int repeats) {
virtual void InitData(int nvalues, int repeats) {
num_values_ = nvalues * repeats;
input_bytes_.resize(num_values_ * sizeof(c_type));
output_bytes_.resize(num_values_ * sizeof(c_type));
Expand Down Expand Up @@ -1705,11 +1706,13 @@ class TestDeltaLengthByteArrayEncoding : public TestEncodingBase<Type> {
using c_type = typename Type::c_type;
static constexpr int TYPE = Type::type_num;

virtual Encoding::type GetEncoding() { return Encoding::DELTA_LENGTH_BYTE_ARRAY; }

virtual void CheckRoundtrip() {
auto encoder = MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
auto encoding = GetEncoding();
auto encoder = MakeTypedEncoder<Type>(encoding,
/*use_dictionary=*/false, descr_.get());
auto decoder =
MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, descr_.get());
auto decoder = MakeTypedDecoder<Type>(encoding, descr_.get());

encoder->Put(draws_, num_values_);
encode_buffer_ = encoder->FlushValues();
Expand All @@ -1722,10 +1725,10 @@ class TestDeltaLengthByteArrayEncoding : public TestEncodingBase<Type> {
}

void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) {
auto encoder = MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
auto encoding = GetEncoding();
auto encoder = MakeTypedEncoder<Type>(encoding,
/*use_dictionary=*/false, descr_.get());
auto decoder =
MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, descr_.get());
auto decoder = MakeTypedDecoder<Type>(encoding, descr_.get());
int null_count = 0;
for (auto i = 0; i < num_values_; i++) {
if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
Expand Down Expand Up @@ -1771,6 +1774,19 @@ std::shared_ptr<Buffer> DeltaEncode(std::vector<int32_t> lengths) {
return encoder->FlushValues();
}

// Encodes `lengths` as an Int32 DELTA_BINARY_PACKED stream and returns the
// flushed buffer.
std::shared_ptr<Buffer> DeltaEncode(::arrow::util::span<const int32_t> lengths) {
  const int num_lengths = static_cast<int>(lengths.size());
  auto delta_encoder = MakeTypedEncoder<Int32Type>(Encoding::DELTA_BINARY_PACKED);
  delta_encoder->Put(lengths.data(), num_lengths);
  return delta_encoder->FlushValues();
}

// Convenience overload: views `lengths` as an Int32Array and forwards its raw
// values to the span-based DeltaEncode overload.
std::shared_ptr<Buffer> DeltaEncode(std::shared_ptr<::arrow::Array>& lengths) {
  auto int32_lengths =
      ::arrow::internal::checked_pointer_cast<const ::arrow::Int32Array>(lengths);
  const auto num_lengths = static_cast<size_t>(lengths->length());
  return DeltaEncode(
      ::arrow::util::span<const int32_t>{int32_lengths->raw_values(), num_lengths});
}

TEST(TestDeltaLengthByteArrayEncoding, AdHocRoundTrip) {
const std::shared_ptr<::arrow::Array> cases[] = {
::arrow::ArrayFromJSON(::arrow::utf8(), R"([])"),
Expand All @@ -1780,10 +1796,10 @@ TEST(TestDeltaLengthByteArrayEncoding, AdHocRoundTrip) {
};

std::string expected_encoded_vals[] = {
DeltaEncode({})->ToString(),
DeltaEncode({3, 2, 0})->ToString() + "abcde",
DeltaEncode({0, 0, 0})->ToString(),
DeltaEncode({0, 3})->ToString() + "xyz",
DeltaEncode(std::vector<int>({}))->ToString(),
DeltaEncode(std::vector<int>({3, 2, 0}))->ToString() + "abcde",
DeltaEncode(std::vector<int>({0, 0, 0}))->ToString(),
DeltaEncode(std::vector<int>({0, 3}))->ToString() + "xyz",
};

auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
Expand Down Expand Up @@ -1894,7 +1910,6 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryDirectPut) {
ASSERT_EQ(values->length(), result->length());
ASSERT_OK(result->ValidateFull());

auto upcast_result = CastBinaryTypesHelper(result, values->type());
::arrow::AssertArraysEqual(*values, *result);
};

Expand Down Expand Up @@ -1977,4 +1992,263 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) {
CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values));
}

// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY encode/decode tests.

/// Round-trip fixture for DELTA_BYTE_ARRAY. It inherits the checks of
/// TestDeltaLengthByteArrayEncoding and overrides only the encoding under test
/// and the way input data is generated.
template <typename Type>
class TestDeltaByteArrayEncoding : public TestDeltaLengthByteArrayEncoding<Type> {
 public:
  using c_type = typename Type::c_type;
  static constexpr int TYPE = Type::type_num;
  // Forwarded to GeneratePrefixedData as its last argument.
  static constexpr double prefixed_probability = 0.5;

  Encoding::type GetEncoding() override { return Encoding::DELTA_BYTE_ARRAY; }

  /// Fills `draws_` with `nvalues` generated values followed by
  /// `repeats - 1` copies of that same run.
  void InitData(int nvalues, int repeats) override {
    num_values_ = nvalues * repeats;
    const auto num_bytes = num_values_ * sizeof(c_type);
    input_bytes_.resize(num_bytes);
    output_bytes_.resize(num_bytes);
    draws_ = reinterpret_cast<c_type*>(input_bytes_.data());
    decode_buf_ = reinterpret_cast<c_type*>(output_bytes_.data());
    GeneratePrefixedData<c_type>(nvalues, draws_, &data_buffer_, prefixed_probability);

    // Replicate the first run so the input contains repeated values.
    for (int rep = 1; rep < repeats; ++rep) {
      auto* repeated_run = draws_ + nvalues * rep;
      for (int idx = 0; idx < nvalues; ++idx) {
        repeated_run[idx] = draws_[idx];
      }
    }
  }

 protected:
  USING_BASE_MEMBERS();
  // NOTE(review): TestEncodingBase::InitData also uses members with these
  // names, so these look like redeclarations local to this fixture - confirm
  // that this is intended.
  std::vector<uint8_t> input_bytes_;
  std::vector<uint8_t> output_bytes_;
};

// Parquet types the DELTA_BYTE_ARRAY typed test suites are instantiated for.
using TestDeltaByteArrayEncodingTypes = ::testing::Types<ByteArrayType, FLBAType>;
// Registers the TestDeltaByteArrayEncoding fixture as a typed suite over that list.
TYPED_TEST_SUITE(TestDeltaByteArrayEncoding, TestDeltaByteArrayEncodingTypes);

TYPED_TEST(TestDeltaByteArrayEncoding, BasicRoundTrip) {
  constexpr int kSpacedNumValues = 1234;
  constexpr int kValidBitsOffset = 64;

  // Dense round trips: empty input, a repeated run, and one long run.
  ASSERT_NO_FATAL_FAILURE(this->Execute(/*nvalues=*/0, /*repeats=*/0));
  ASSERT_NO_FATAL_FAILURE(this->Execute(/*nvalues=*/250, /*repeats=*/5));
  ASSERT_NO_FATAL_FAILURE(this->Execute(/*nvalues=*/2000, /*repeats=*/1));

  // Spaced round trips: first without nulls, then with half of the slots null.
  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(kSpacedNumValues, /*repeats=*/1,
                                              kValidBitsOffset,
                                              /*null_probability=*/0));
  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(kSpacedNumValues, /*repeats=*/10,
                                              kValidBitsOffset,
                                              /*null_probability=*/0.5));
}

/// Fixture for feeding Arrow arrays straight into a DELTA_BYTE_ARRAY encoder
/// and decoding them back into Arrow. CheckDirectPut and CheckRoundtrip are
/// only declared here; they are provided per Parquet type as explicit
/// specializations.
template <typename Type>
class TestDeltaByteArrayEncodingDirectPut : public TestEncodingBase<Type> {
  using ArrowType = typename EncodingTraits<Type>::ArrowType;
  using Accumulator = typename EncodingTraits<Type>::Accumulator;
  using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;

 public:
  // One encoder/decoder pair per fixture instance, reused by every
  // CheckDirectPut call.
  std::unique_ptr<TypedEncoder<Type>> encoder{
      MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY)};
  std::unique_ptr<TypedDecoder<Type>> decoder{
      MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY)};

  /// Encodes `array`, decodes the result and asserts it equals `array`.
  void CheckDirectPut(std::shared_ptr<::arrow::Array> array);

  /// Runs CheckDirectPut over generated arrays.
  void CheckRoundtrip() override;

 protected:
  USING_BASE_MEMBERS();
};

// BYTE_ARRAY flavour: decodes into a builder-backed Accumulator and also
// asserts that no overflow chunks were produced.
template <>
void TestDeltaByteArrayEncodingDirectPut<ByteArrayType>::CheckDirectPut(
    std::shared_ptr<::arrow::Array> array) {
  ASSERT_NO_THROW(encoder->Put(*array));
  auto encoded = encoder->FlushValues();

  // Only non-null slots are expected in the encoded stream.
  const int non_null_count = static_cast<int>(array->length() - array->null_count());
  decoder->SetData(non_null_count, encoded->data(), static_cast<int>(encoded->size()));

  Accumulator accumulator;
  accumulator.builder =
      std::make_unique<BuilderType>(array->type(), default_memory_pool());

  const auto decoded_count = decoder->DecodeArrow(
      static_cast<int>(array->length()), static_cast<int>(array->null_count()),
      array->null_bitmap_data(), array->offset(), &accumulator);
  ASSERT_EQ(non_null_count, decoded_count);

  ASSERT_EQ(accumulator.chunks.size(), 0)
      << "Accumulator shouldn't have overflowed chunks";
  ASSERT_OK_AND_ASSIGN(auto decoded, accumulator.builder->Finish());
  ASSERT_EQ(array->length(), decoded->length());
  ASSERT_OK(decoded->ValidateFull());

  ::arrow::AssertArraysEqual(*array, *decoded);
}

// FIXED_LEN_BYTE_ARRAY flavour: here the Accumulator is constructed directly
// from the array type and a memory pool, and is finished directly.
template <>
void TestDeltaByteArrayEncodingDirectPut<FLBAType>::CheckDirectPut(
    std::shared_ptr<::arrow::Array> array) {
  ASSERT_NO_THROW(encoder->Put(*array));
  auto encoded = encoder->FlushValues();

  // Only non-null slots are expected in the encoded stream.
  const int non_null_count = static_cast<int>(array->length() - array->null_count());
  decoder->SetData(non_null_count, encoded->data(), static_cast<int>(encoded->size()));

  Accumulator accumulator(array->type(), default_memory_pool());

  const auto decoded_count = decoder->DecodeArrow(
      static_cast<int>(array->length()), static_cast<int>(array->null_count()),
      array->null_bitmap_data(), array->offset(), &accumulator);
  ASSERT_EQ(non_null_count, decoded_count);

  ASSERT_OK_AND_ASSIGN(auto decoded, accumulator.Finish());
  ASSERT_EQ(array->length(), decoded->length());
  ASSERT_OK(decoded->ValidateFull());

  ::arrow::AssertArraysEqual(*array, *decoded);
}

// Round-trips one single-element binary array, then ten larger generated
// arrays that contain repeated values and nulls.
template <>
void TestDeltaByteArrayEncodingDirectPut<ByteArrayType>::CheckRoundtrip() {
  constexpr int kSeed = 42;
  constexpr int64_t kSize = 500;
  constexpr int32_t kNumUnique = 10;
  constexpr int32_t kMinLength = 0;
  constexpr int32_t kMaxLength = 10;
  constexpr double kNullProbability = 0.25;
  constexpr int kNumRounds = 10;

  ::arrow::random::RandomArrayGenerator rag{kSeed};

  // Smallest non-empty case first.
  std::shared_ptr<::arrow::Array> values = rag.BinaryWithRepeats(
      /*size=*/1, /*unique=*/1, kMinLength, kMaxLength, kNullProbability);
  CheckDirectPut(values);

  for (int round = 0; round < kNumRounds; ++round) {
    values = rag.BinaryWithRepeats(kSize, kNumUnique, kMinLength, kMaxLength,
                                   kNullProbability);
    CheckDirectPut(values);
  }
}

// Round-trips an empty fixed-size binary array, then ten generated arrays of
// lengths kSize .. kSize + 9.
template <>
void TestDeltaByteArrayEncodingDirectPut<FLBAType>::CheckRoundtrip() {
  constexpr int kSeed = 42;
  constexpr int kByteWidth = 4;
  constexpr int64_t kSize = 50;
  constexpr int kNumRounds = 10;

  ::arrow::random::RandomArrayGenerator rag{kSeed};

  // Empty input first.
  std::shared_ptr<::arrow::Array> values =
      rag.FixedSizeBinary(/*size=*/0, /*byte_width=*/kByteWidth);
  CheckDirectPut(values);

  for (int extra = 0; extra < kNumRounds; ++extra) {
    values = rag.FixedSizeBinary(kSize + extra, kByteWidth);
    CheckDirectPut(values);
  }
}

// Registers the direct-put fixture over the same type list as the plain
// DELTA_BYTE_ARRAY round-trip suite.
TYPED_TEST_SUITE(TestDeltaByteArrayEncodingDirectPut, TestDeltaByteArrayEncodingTypes);

// Runs the per-type CheckRoundtrip specialization; any fatal assertion raised
// inside it fails this test.
TYPED_TEST(TestDeltaByteArrayEncodingDirectPut, DirectPut) {
ASSERT_NO_FATAL_FAILURE(this->CheckRoundtrip());
}

// Checks DELTA_BYTE_ARRAY against hand-computed encodings. The expected page
// for each case is built as
//   DeltaEncode(prefix lengths) ++ DeltaEncode(suffix lengths) ++ suffix bytes
// and verified in both directions (encode and decode) for the utf8,
// large_utf8, binary and large_binary Arrow types.
TEST(DeltaByteArrayEncodingAdHoc, ArrowDirectPut) {
  // Encodes `values` through the Arrow Put() overload and requires the flushed
  // bytes to equal `encoded`.
  auto CheckEncode = [](const std::shared_ptr<::arrow::Array>& values,
                        const std::shared_ptr<Buffer>& encoded) {
    auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
    ASSERT_NO_THROW(encoder->Put(*values));
    auto buf = encoder->FlushValues();
    ASSERT_TRUE(encoded->Equals(*buf));
  };

  // Decodes `buf` into an Arrow array and requires it to equal `values` once
  // the result has been cast to the type of `values`.
  auto CheckDecode = [](const std::shared_ptr<Buffer>& buf,
                        const std::shared_ptr<::arrow::Array>& values) {
    int num_values = static_cast<int>(values->length());
    auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));

    // Use a string builder for string-typed expectations, a binary builder
    // otherwise.
    typename EncodingTraits<ByteArrayType>::Accumulator acc;
    if (::arrow::is_string(values->type()->id())) {
      acc.builder = std::make_unique<::arrow::StringBuilder>();
    } else {
      acc.builder = std::make_unique<::arrow::BinaryBuilder>();
    }

    ASSERT_EQ(num_values,
              decoder->DecodeArrow(static_cast<int>(values->length()),
                                   static_cast<int>(values->null_count()),
                                   values->null_bitmap_data(), values->offset(), &acc));

    std::shared_ptr<::arrow::Array> result;
    ASSERT_OK(acc.builder->Finish(&result));
    ASSERT_EQ(num_values, result->length());
    ASSERT_OK(result->ValidateFull());

    auto upcast_result = CastBinaryTypesHelper(result, values->type());
    ::arrow::AssertArraysEqual(*values, *upcast_result);
  };

  // Builds the expected page from its three parts and runs both checks for
  // each of the four Arrow types. The length arrays are taken by value because
  // the DeltaEncode overload used here binds a non-const lvalue reference.
  auto CheckEncodeDecode = [&](std::string_view values,
                               std::shared_ptr<::arrow::Array> prefix_lengths,
                               std::shared_ptr<::arrow::Array> suffix_lengths,
                               std::string_view suffix_data) {
    auto encoded = ::arrow::ConcatenateBuffers({DeltaEncode(prefix_lengths),
                                                DeltaEncode(suffix_lengths),
                                                std::make_shared<Buffer>(suffix_data)})
                       .ValueOrDie();

    CheckEncode(::arrow::ArrayFromJSON(::arrow::utf8(), values), encoded);
    CheckEncode(::arrow::ArrayFromJSON(::arrow::large_utf8(), values), encoded);
    CheckEncode(::arrow::ArrayFromJSON(::arrow::binary(), values), encoded);
    CheckEncode(::arrow::ArrayFromJSON(::arrow::large_binary(), values), encoded);

    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::utf8(), values));
    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_utf8(), values));
    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::binary(), values));
    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values));
  };

  // Mixed case: some values share a prefix with their predecessor, some do not.
  {
    auto values = R"(["axis", "axle", "babble", "babyhood"])";
    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 2, 0, 3])");
    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([4, 2, 6, 5])");

    constexpr std::string_view suffix_data = "axislebabbleyhood";
    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
  }

  // Identical values: every value after the first is all prefix, empty suffix.
  {
    auto values = R"(["axis", "axis", "axis", "axis"])";
    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 4, 4, 4])");
    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([4, 0, 0, 0])");

    constexpr std::string_view suffix_data = "axis";
    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
  }

  // The first value is longer than the following ones, which are a prefix of it.
  {
    auto values = R"(["axisba", "axis", "axis", "axis"])";
    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 4, 4, 4])");
    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([6, 0, 0, 0])");

    constexpr std::string_view suffix_data = "axisba";
    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
  }

  // The second value is contained in the first but is not a prefix of it, so
  // it is stored in full.
  {
    auto values = R"(["baaxis", "axis", "axis", "axis"])";
    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 0, 4, 4])");
    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([6, 4, 0, 0])");

    constexpr std::string_view suffix_data = "baaxisaxis";
    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
  }

  // Multi-byte UTF-8: lengths are counted in bytes. The 5-byte shared prefix
  // ends in the middle of the third character, so the suffix starts with that
  // character's trailing byte (\xbc for "μ", \xbb for "λ").
  {
    auto values = R"(["καλημέρα", "καμηλιέρη", "καμηλιέρη", "καλημέρα"])";
    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 5, 18, 5])");
    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([16, 13, 0, 11])");
    const std::string suffix_data = "καλημέρα\xbcηλιέρη\xbbημέρα";
    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
  }
}
} // namespace parquet::test
Loading
Loading