diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index e972a86ccf0ef..a3cef4b4ce856 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -56,6 +56,8 @@ using arrow::Status; using arrow::VisitNullBitmapInline; using arrow::internal::AddWithOverflow; using arrow::internal::checked_cast; +using arrow::internal::MultiplyWithOverflow; +using arrow::internal::SubtractWithOverflow; using arrow::util::SafeLoad; using arrow::util::SafeLoadAs; using std::string_view; @@ -1178,41 +1180,126 @@ int PlainBooleanDecoder::Decode(bool* buffer, int max_values) { return max_values; } -struct ArrowBinaryHelper { - explicit ArrowBinaryHelper(typename EncodingTraits::Accumulator* out) { - this->out = out; - this->builder = out->builder.get(); - this->chunk_space_remaining = - ::arrow::kBinaryMemoryLimit - this->builder->value_data_length(); +// A helper class to abstract away differences between EncodingTraits::Accumulator +// for ByteArrayType and FLBAType. +template +struct ArrowBinaryHelper; + +template <> +struct ArrowBinaryHelper { + using Accumulator = typename EncodingTraits::Accumulator; + + ArrowBinaryHelper(Accumulator* acc, int64_t length) + : acc_(acc), + entries_remaining_(length), + chunk_space_remaining_(::arrow::kBinaryMemoryLimit - + acc_->builder->value_data_length()) {} + + Status Prepare(std::optional estimated_data_length = {}) { + RETURN_NOT_OK(acc_->builder->Reserve(entries_remaining_)); + if (estimated_data_length.has_value()) { + RETURN_NOT_OK(acc_->builder->ReserveData( + std::min(*estimated_data_length, ::arrow::kBinaryMemoryLimit))); + } + return Status::OK(); + } + + Status PrepareNextInput(int64_t next_value_length, + std::optional estimated_remaining_data_length = {}) { + if (ARROW_PREDICT_FALSE(!CanFit(next_value_length))) { + // This element would exceed the capacity of a chunk + RETURN_NOT_OK(PushChunk()); + RETURN_NOT_OK(acc_->builder->Reserve(entries_remaining_)); + if (estimated_remaining_data_length.has_value()) { + RETURN_NOT_OK(acc_->builder->ReserveData( + std::min(*estimated_remaining_data_length, chunk_space_remaining_))); + } + } + return Status::OK(); + } + + void UnsafeAppend(const uint8_t* data, int32_t length) { + DCHECK(CanFit(length)); + DCHECK_GT(entries_remaining_, 0); + chunk_space_remaining_ -= length; + --entries_remaining_; + acc_->builder->UnsafeAppend(data, length); + } + + Status Append(const uint8_t* data, int32_t length) { + DCHECK(CanFit(length)); + DCHECK_GT(entries_remaining_, 0); + chunk_space_remaining_ -= length; + --entries_remaining_; + return acc_->builder->Append(data, length); + } + + void UnsafeAppendNull() { + --entries_remaining_; + acc_->builder->UnsafeAppendNull(); } + Status AppendNull() { + --entries_remaining_; + return acc_->builder->AppendNull(); + } + + private: Status PushChunk() { - std::shared_ptr<::arrow::Array> result; - RETURN_NOT_OK(builder->Finish(&result)); - out->chunks.push_back(result); - chunk_space_remaining = ::arrow::kBinaryMemoryLimit; + ARROW_ASSIGN_OR_RAISE(auto chunk, acc_->builder->Finish()); + acc_->chunks.push_back(std::move(chunk)); + chunk_space_remaining_ = ::arrow::kBinaryMemoryLimit; return Status::OK(); } - bool CanFit(int64_t length) const { return length <= chunk_space_remaining; } + bool CanFit(int64_t length) const { return length <= chunk_space_remaining_; } - void UnsafeAppend(const uint8_t* data, int32_t length) { - chunk_space_remaining -= length; - builder->UnsafeAppend(data, length); + Accumulator* acc_; + int64_t entries_remaining_; + int64_t chunk_space_remaining_; +}; + +template <> +struct ArrowBinaryHelper { + using Accumulator = typename EncodingTraits::Accumulator; + + ArrowBinaryHelper(Accumulator* acc, int64_t length) + : acc_(acc), entries_remaining_(length) {} + + Status Prepare(std::optional estimated_data_length = {}) { + return acc_->Reserve(entries_remaining_); } - void UnsafeAppendNull() { builder->UnsafeAppendNull(); } + Status PrepareNextInput(int64_t next_value_length, + std::optional estimated_remaining_data_length = {}) { + return Status::OK(); + } + + void UnsafeAppend(const uint8_t* data, int32_t length) { + DCHECK_GT(entries_remaining_, 0); + --entries_remaining_; + acc_->UnsafeAppend(data); + } Status Append(const uint8_t* data, int32_t length) { - chunk_space_remaining -= length; - return builder->Append(data, length); + DCHECK_GT(entries_remaining_, 0); + --entries_remaining_; + return acc_->Append(data); + } + + void UnsafeAppendNull() { + --entries_remaining_; + acc_->UnsafeAppendNull(); } - Status AppendNull() { return builder->AppendNull(); } + Status AppendNull() { + --entries_remaining_; + return acc_->AppendNull(); + } - typename EncodingTraits::Accumulator* out; - ::arrow::BinaryBuilder* builder; - int64_t chunk_space_remaining; + private: + Accumulator* acc_; + int64_t entries_remaining_; }; template <> @@ -1313,12 +1400,10 @@ class PlainByteArrayDecoder : public PlainDecoder, int64_t valid_bits_offset, typename EncodingTraits::Accumulator* out, int* out_values_decoded) { - ArrowBinaryHelper helper(out); + ArrowBinaryHelper helper(out, num_values); int values_decoded = 0; - RETURN_NOT_OK(helper.builder->Reserve(num_values)); - RETURN_NOT_OK(helper.builder->ReserveData( - std::min(len_, helper.chunk_space_remaining))); + RETURN_NOT_OK(helper.Prepare(len_)); int i = 0; RETURN_NOT_OK(VisitNullBitmapInline( @@ -1335,13 +1420,7 @@ class PlainByteArrayDecoder : public PlainDecoder, if (ARROW_PREDICT_FALSE(len_ < increment)) { ParquetException::EofException(); } - if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) { - // This element would exceed the capacity of a chunk - RETURN_NOT_OK(helper.PushChunk()); - RETURN_NOT_OK(helper.builder->Reserve(num_values - i)); - RETURN_NOT_OK(helper.builder->ReserveData( - std::min(len_, helper.chunk_space_remaining))); - } + RETURN_NOT_OK(helper.PrepareNextInput(value_len, len_)); helper.UnsafeAppend(data_ + 4, value_len); data_ += increment; len_ -= increment; @@ -1421,7 +1500,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder { byte_array_offsets_(AllocateBuffer(pool, 0)), indices_scratch_space_(AllocateBuffer(pool, 0)) {} - // Perform type-specific initiatialization + // Perform type-specific initialization void SetDict(TypedDecoder* dictionary) override; void SetData(int num_values, const uint8_t* data, int len) override { @@ -1834,7 +1913,8 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, constexpr int32_t kBufferSize = 1024; int32_t indices[kBufferSize]; - ArrowBinaryHelper helper(out); + ArrowBinaryHelper helper(out, num_values); + RETURN_NOT_OK(helper.Prepare()); auto dict_values = reinterpret_cast(dictionary_->data()); int values_decoded = 0; @@ -1855,9 +1935,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, const auto index = indices[pos_indices++]; RETURN_NOT_OK(IndexInBounds(index)); const auto& val = dict_values[index]; - if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { - RETURN_NOT_OK(helper.PushChunk()); - } + RETURN_NOT_OK(helper.PrepareNextInput(val.len)); RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); ++values_decoded; return Status::OK(); @@ -1903,20 +1981,21 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, int32_t indices[kBufferSize]; int values_decoded = 0; - ArrowBinaryHelper helper(out); + ArrowBinaryHelper helper(out, num_values); + RETURN_NOT_OK(helper.Prepare(len_)); + auto dict_values = reinterpret_cast(dictionary_->data()); while (values_decoded < num_values) { - int32_t batch_size = std::min(kBufferSize, num_values - values_decoded); - int num_indices = idx_decoder_.GetBatch(indices, batch_size); + const int32_t batch_size = + std::min(kBufferSize, num_values - values_decoded); + const int num_indices = idx_decoder_.GetBatch(indices, batch_size); if (num_indices == 0) ParquetException::EofException(); for (int i = 0; i < num_indices; ++i) { auto idx = indices[i]; RETURN_NOT_OK(IndexInBounds(idx)); const auto& val = dict_values[idx]; - if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { - RETURN_NOT_OK(helper.PushChunk()); - } + RETURN_NOT_OK(helper.PrepareNextInput(val.len)); RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); } values_decoded += num_indices; @@ -2746,7 +2825,8 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, int64_t valid_bits_offset, typename EncodingTraits::Accumulator* out, int* out_num_values) { - ArrowBinaryHelper helper(out); + ArrowBinaryHelper helper(out, num_values); + RETURN_NOT_OK(helper.Prepare()); std::vector values(num_values - null_count); const int num_valid_values = Decode(values.data(), num_values - null_count); @@ -2762,9 +2842,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, valid_bits, valid_bits_offset, num_values, null_count, [&]() { const auto& val = values_ptr[value_idx]; - if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { - RETURN_NOT_OK(helper.PushChunk()); - } + RETURN_NOT_OK(helper.PrepareNextInput(val.len)); RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); ++value_idx; return Status::OK(); @@ -2782,8 +2860,8 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, std::shared_ptr<::arrow::bit_util::BitReader> decoder_; DeltaBitPackDecoder len_decoder_; - int num_valid_values_; - uint32_t length_idx_; + int num_valid_values_{0}; + uint32_t length_idx_{0}; std::shared_ptr buffered_length_; }; @@ -2977,12 +3055,238 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { // ---------------------------------------------------------------------- // DELTA_BYTE_ARRAY -class DeltaByteArrayDecoder : public DecoderImpl, - virtual public TypedDecoder { +/// Delta Byte Array encoding also known as incremental encoding or front compression: +/// for each element in a sequence of strings, store the prefix length of the previous +/// entry plus the suffix. +/// +/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED), +/// followed by the suffixes encoded as delta length byte arrays +/// (DELTA_LENGTH_BYTE_ARRAY). + +// ---------------------------------------------------------------------- +// DeltaByteArrayEncoder + +template +class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder { + static constexpr std::string_view kEmpty = ""; + public: - explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr, + using T = typename DType::c_type; + + explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool = ::arrow::default_memory_pool()) + : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool), + sink_(pool), + prefix_length_encoder_(/*descr=*/nullptr, pool), + suffix_encoder_(descr, pool), + last_value_(""), + empty_(static_cast(kEmpty.size()), + reinterpret_cast(kEmpty.data())) {} + + std::shared_ptr FlushValues() override; + + int64_t EstimatedDataEncodedSize() override { + return prefix_length_encoder_.EstimatedDataEncodedSize() + + suffix_encoder_.EstimatedDataEncodedSize(); + } + + using TypedEncoder::Put; + + void Put(const ::arrow::Array& values) override; + + void Put(const T* buffer, int num_values) override; + + void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset) override { + if (valid_bits != nullptr) { + if (buffer_ == nullptr) { + PARQUET_ASSIGN_OR_THROW(buffer_, + ::arrow::AllocateResizableBuffer(num_values * sizeof(T), + this->memory_pool())); + } else { + PARQUET_THROW_NOT_OK(buffer_->Resize(num_values * sizeof(T), false)); + } + T* data = reinterpret_cast(buffer_->mutable_data()); + int num_valid_values = ::arrow::util::internal::SpacedCompress( + src, num_values, valid_bits, valid_bits_offset, data); + Put(data, num_valid_values); + } else { + Put(src, num_values); + } + } + + protected: + template + void PutInternal(const T* src, int num_values, const VisitorType visitor) { + if (num_values == 0) { + return; + } + + std::string_view last_value_view = last_value_; + constexpr int kBatchSize = 256; + std::array prefix_lengths; + std::array suffixes; + + for (int i = 0; i < num_values; i += kBatchSize) { + const int batch_size = std::min(kBatchSize, num_values - i); + + for (int j = 0; j < batch_size; ++j) { + const int idx = i + j; + const auto view = visitor[idx]; + const auto len = static_cast(view.length()); + + uint32_t common_prefix_length = 0; + const uint32_t maximum_common_prefix_length = + std::min(len, static_cast(last_value_view.length())); + while (common_prefix_length < maximum_common_prefix_length) { + if (last_value_view[common_prefix_length] != view[common_prefix_length]) { + break; + } + common_prefix_length++; + } + + last_value_view = view; + prefix_lengths[j] = common_prefix_length; + const uint32_t suffix_length = len - common_prefix_length; + const uint8_t* suffix_ptr = src[idx].ptr + common_prefix_length; + + // Convert to ByteArray, so it can be passed to the suffix_encoder_. + const ByteArray suffix(suffix_length, suffix_ptr); + suffixes[j] = suffix; + } + suffix_encoder_.Put(suffixes.data(), batch_size); + prefix_length_encoder_.Put(prefix_lengths.data(), batch_size); + } + last_value_ = last_value_view; + } + + template + void PutBinaryArray(const ArrayType& array) { + auto previous_len = static_cast(last_value_.length()); + std::string_view last_value_view = last_value_; + + PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline( + *array.data(), + [&](::std::string_view view) { + if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) { + return Status::Invalid("Parquet cannot store strings with size 2GB or more"); + } + const ByteArray src{view}; + + uint32_t common_prefix_length = 0; + const uint32_t len = src.len; + const uint32_t maximum_common_prefix_length = std::min(previous_len, len); + while (common_prefix_length < maximum_common_prefix_length) { + if (last_value_view[common_prefix_length] != view[common_prefix_length]) { + break; + } + common_prefix_length++; + } + previous_len = len; + prefix_length_encoder_.Put({static_cast(common_prefix_length)}, 1); + + last_value_view = view; + const auto suffix_length = static_cast(len - common_prefix_length); + if (suffix_length == 0) { + suffix_encoder_.Put(&empty_, 1); + return Status::OK(); + } + const uint8_t* suffix_ptr = src.ptr + common_prefix_length; + // Convert to ByteArray, so it can be passed to the suffix_encoder_. + const ByteArray suffix(suffix_length, suffix_ptr); + suffix_encoder_.Put(&suffix, 1); + + return Status::OK(); + }, + []() { return Status::OK(); })); + last_value_ = last_value_view; + } + + ::arrow::BufferBuilder sink_; + DeltaBitPackEncoder prefix_length_encoder_; + DeltaLengthByteArrayEncoder suffix_encoder_; + std::string last_value_; + const ByteArray empty_; + std::unique_ptr buffer_; +}; + +struct ByteArrayVisitor { + const ByteArray* src; + + std::string_view operator[](int i) const { + if (ARROW_PREDICT_FALSE(src[i].len >= kMaxByteArraySize)) { + throw ParquetException("Parquet cannot store strings with size 2GB or more"); + } + return std::string_view{src[i]}; + } + + uint32_t len(int i) const { return src[i].len; } +}; + +struct FLBAVisitor { + const FLBA* src; + const uint32_t type_length; + + std::string_view operator[](int i) const { + return std::string_view{reinterpret_cast(src[i].ptr), type_length}; + } + + uint32_t len(int i) const { return type_length; } +}; + +template <> +void DeltaByteArrayEncoder::Put(const ByteArray* src, int num_values) { + auto visitor = ByteArrayVisitor{src}; + PutInternal(src, num_values, visitor); +} + +template <> +void DeltaByteArrayEncoder::Put(const FLBA* src, int num_values) { + auto visitor = FLBAVisitor{src, static_cast(descr_->type_length())}; + PutInternal(src, num_values, visitor); +} + +template +void DeltaByteArrayEncoder::Put(const ::arrow::Array& values) { + if (::arrow::is_binary_like(values.type_id())) { + PutBinaryArray(checked_cast(values)); + } else if (::arrow::is_large_binary_like(values.type_id())) { + PutBinaryArray(checked_cast(values)); + } else if (::arrow::is_fixed_size_binary(values.type_id())) { + PutBinaryArray(checked_cast(values)); + } else { + throw ParquetException("Only BaseBinaryArray and subclasses supported"); + } +} + +template +std::shared_ptr DeltaByteArrayEncoder::FlushValues() { + PARQUET_THROW_NOT_OK(sink_.Resize(EstimatedDataEncodedSize(), false)); + + std::shared_ptr prefix_lengths = prefix_length_encoder_.FlushValues(); + PARQUET_THROW_NOT_OK(sink_.Append(prefix_lengths->data(), prefix_lengths->size())); + + std::shared_ptr suffixes = suffix_encoder_.FlushValues(); + PARQUET_THROW_NOT_OK(sink_.Append(suffixes->data(), suffixes->size())); + + std::shared_ptr buffer; + PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true)); + last_value_.clear(); + return buffer; +} + +// ---------------------------------------------------------------------- +// DeltaByteArrayDecoder + +template +class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public TypedDecoder { + using T = typename DType::c_type; + + public: + explicit DeltaByteArrayDecoderImpl(const ColumnDescriptor* descr, + MemoryPool* pool = ::arrow::default_memory_pool()) : DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY), + pool_(pool), prefix_len_decoder_(nullptr, pool), suffix_decoder_(nullptr, pool), last_value_in_previous_page_(""), @@ -3017,27 +3321,22 @@ class DeltaByteArrayDecoder : public DecoderImpl, last_value_ = ""; } - int Decode(ByteArray* buffer, int max_values) override { - return GetInternal(buffer, max_values); - } - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, - typename EncodingTraits::Accumulator* out) override { + typename EncodingTraits::Accumulator* out) override { int result = 0; PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits, valid_bits_offset, out, &result)); return result; } - int DecodeArrow( - int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename EncodingTraits::DictAccumulator* builder) override { + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename EncodingTraits::DictAccumulator* builder) override { ParquetException::NYI("DecodeArrow of DictAccumulator for DeltaByteArrayDecoder"); } - private: + protected: int GetInternal(ByteArray* buffer, int max_values) { // Decode up to `max_values` strings into an internal buffer // and reference them into `buffer`. @@ -3080,7 +3379,7 @@ class DeltaByteArrayDecoder : public DecoderImpl, buffer[i].ptr = data_ptr; buffer[i].len += prefix_len_ptr[i]; data_ptr += buffer[i].len; - prefix = string_view{reinterpret_cast(buffer[i].ptr), buffer[i].len}; + prefix = std::string_view{buffer[i]}; } prefix_len_offset_ += max_values; this->num_values_ -= max_values; @@ -3095,9 +3394,10 @@ class DeltaByteArrayDecoder : public DecoderImpl, Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, - typename EncodingTraits::Accumulator* out, + typename EncodingTraits::Accumulator* out, int* out_num_values) { - ArrowBinaryHelper helper(out); + ArrowBinaryHelper helper(out, num_values); + RETURN_NOT_OK(helper.Prepare()); std::vector values(num_values); const int num_valid_values = GetInternal(values.data(), num_values - null_count); @@ -3110,9 +3410,7 @@ class DeltaByteArrayDecoder : public DecoderImpl, valid_bits, valid_bits_offset, num_values, null_count, [&]() { const auto& val = values_ptr[value_idx]; - if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { - RETURN_NOT_OK(helper.PushChunk()); - } + RETURN_NOT_OK(helper.PrepareNextInput(val.len)); RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); ++value_idx; return Status::OK(); @@ -3128,18 +3426,54 @@ class DeltaByteArrayDecoder : public DecoderImpl, return Status::OK(); } + MemoryPool* pool_; + + private: std::shared_ptr<::arrow::bit_util::BitReader> decoder_; DeltaBitPackDecoder prefix_len_decoder_; DeltaLengthByteArrayDecoder suffix_decoder_; std::string last_value_; // string buffer for last value in previous page std::string last_value_in_previous_page_; - int num_valid_values_; - uint32_t prefix_len_offset_; + int num_valid_values_{0}; + uint32_t prefix_len_offset_{0}; std::shared_ptr buffered_prefix_length_; std::shared_ptr buffered_data_; }; +class DeltaByteArrayDecoder : public DeltaByteArrayDecoderImpl { + public: + using Base = DeltaByteArrayDecoderImpl; + using Base::DeltaByteArrayDecoderImpl; + + int Decode(ByteArray* buffer, int max_values) override { + return GetInternal(buffer, max_values); + } +}; + +class DeltaByteArrayFLBADecoder : public DeltaByteArrayDecoderImpl, + virtual public FLBADecoder { + public: + using Base = DeltaByteArrayDecoderImpl; + using Base::DeltaByteArrayDecoderImpl; + using Base::pool_; + + int Decode(FixedLenByteArray* buffer, int max_values) override { + // GetInternal currently only support ByteArray. + std::vector decode_byte_array(max_values); + const int decoded_values_size = GetInternal(decode_byte_array.data(), max_values); + const uint32_t type_length = descr_->type_length(); + + for (int i = 0; i < decoded_values_size; i++) { + if (ARROW_PREDICT_FALSE(decode_byte_array[i].len != type_length)) { + throw ParquetException("Fixed length byte array length mismatch"); + } + buffer[i].ptr = decode_byte_array[i].ptr; + } + return decoded_values_size; + } +}; + // ---------------------------------------------------------------------- // BYTE_STREAM_SPLIT @@ -3353,6 +3687,16 @@ std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encodin default: throw ParquetException("RLE only supports BOOLEAN"); } + } else if (encoding == Encoding::DELTA_BYTE_ARRAY) { + switch (type_num) { + case Type::BYTE_ARRAY: + return std::make_unique>(descr, pool); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::make_unique>(descr, pool); + default: + throw ParquetException( + "DELTA_BYTE_ARRAY only supports BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY"); + } } else { ParquetException::NYI("Selected encoding is not supported"); } @@ -3404,10 +3748,15 @@ std::unique_ptr MakeDecoder(Type::type type_num, Encoding::type encodin "DELTA_BINARY_PACKED decoder only supports INT32 and INT64"); } } else if (encoding == Encoding::DELTA_BYTE_ARRAY) { - if (type_num == Type::BYTE_ARRAY) { - return std::make_unique(descr, pool); + switch (type_num) { + case Type::BYTE_ARRAY: + return std::make_unique(descr, pool); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::make_unique(descr, pool); + default: + throw ParquetException( + "DELTA_BYTE_ARRAY only supports BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY"); } - throw ParquetException("DELTA_BYTE_ARRAY only supports BYTE_ARRAY"); } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) { if (type_num == Type::BYTE_ARRAY) { return std::make_unique(descr, pool); diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index 9f9b740ff3424..6cdfe37920200 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -141,13 +141,13 @@ struct EncodingTraits { using Encoder = ByteArrayEncoder; using Decoder = ByteArrayDecoder; + using ArrowType = ::arrow::BinaryType; /// \brief Internal helper class for decoding BYTE_ARRAY data where we can /// overflow the capacity of a single arrow::BinaryArray struct Accumulator { std::unique_ptr<::arrow::BinaryBuilder> builder; std::vector> chunks; }; - using ArrowType = ::arrow::BinaryType; using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::BinaryType>; }; diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 0ac5fd76e79c9..71dc40d33ac47 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -36,6 +36,7 @@ #include "arrow/util/bitmap_writer.h" #include "arrow/util/checked_cast.h" #include "arrow/util/endian.h" +#include "arrow/util/span.h" #include "arrow/util/string.h" #include "parquet/encoding.h" #include "parquet/platform.h" @@ -181,7 +182,7 @@ class TestEncodingBase : public ::testing::Test { void TearDown() {} - void InitData(int nvalues, int repeats) { + virtual void InitData(int nvalues, int repeats) { num_values_ = nvalues * repeats; input_bytes_.resize(num_values_ * sizeof(c_type)); output_bytes_.resize(num_values_ * sizeof(c_type)); @@ -1705,11 +1706,13 @@ class TestDeltaLengthByteArrayEncoding : public TestEncodingBase { using c_type = typename Type::c_type; static constexpr int TYPE = Type::type_num; + virtual Encoding::type GetEncoding() { return Encoding::DELTA_LENGTH_BYTE_ARRAY; } + virtual void CheckRoundtrip() { - auto encoder = MakeTypedEncoder(Encoding::DELTA_LENGTH_BYTE_ARRAY, + auto encoding = GetEncoding(); + auto encoder = MakeTypedEncoder(encoding, /*use_dictionary=*/false, descr_.get()); - auto decoder = - MakeTypedDecoder(Encoding::DELTA_LENGTH_BYTE_ARRAY, descr_.get()); + auto decoder = MakeTypedDecoder(encoding, descr_.get()); encoder->Put(draws_, num_values_); encode_buffer_ = encoder->FlushValues(); @@ -1722,10 +1725,10 @@ class TestDeltaLengthByteArrayEncoding : public TestEncodingBase { } void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) { - auto encoder = MakeTypedEncoder(Encoding::DELTA_LENGTH_BYTE_ARRAY, + auto encoding = GetEncoding(); + auto encoder = MakeTypedEncoder(encoding, /*use_dictionary=*/false, descr_.get()); - auto decoder = - MakeTypedDecoder(Encoding::DELTA_LENGTH_BYTE_ARRAY, descr_.get()); + auto decoder = MakeTypedDecoder(encoding, descr_.get()); int null_count = 0; for (auto i = 0; i < num_values_; i++) { if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) { @@ -1771,6 +1774,19 @@ std::shared_ptr DeltaEncode(std::vector lengths) { return encoder->FlushValues(); } +std::shared_ptr DeltaEncode(::arrow::util::span lengths) { + auto encoder = MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED); + encoder->Put(lengths.data(), static_cast(lengths.size())); + return encoder->FlushValues(); +} + +std::shared_ptr DeltaEncode(std::shared_ptr<::arrow::Array>& lengths) { + auto data = ::arrow::internal::checked_pointer_cast(lengths); + auto span = ::arrow::util::span{data->raw_values(), + static_cast(lengths->length())}; + return DeltaEncode(span); +} + TEST(TestDeltaLengthByteArrayEncoding, AdHocRoundTrip) { const std::shared_ptr<::arrow::Array> cases[] = { ::arrow::ArrayFromJSON(::arrow::utf8(), R"([])"), @@ -1780,10 +1796,10 @@ TEST(TestDeltaLengthByteArrayEncoding, AdHocRoundTrip) { }; std::string expected_encoded_vals[] = { - DeltaEncode({})->ToString(), - DeltaEncode({3, 2, 0})->ToString() + "abcde", - DeltaEncode({0, 0, 0})->ToString(), - DeltaEncode({0, 3})->ToString() + "xyz", + DeltaEncode(std::vector({}))->ToString(), + DeltaEncode(std::vector({3, 2, 0}))->ToString() + "abcde", + DeltaEncode(std::vector({0, 0, 0}))->ToString(), + DeltaEncode(std::vector({0, 3}))->ToString() + "xyz", }; auto encoder = MakeTypedEncoder(Encoding::DELTA_LENGTH_BYTE_ARRAY, @@ -1894,7 +1910,6 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryDirectPut) { ASSERT_EQ(values->length(), result->length()); ASSERT_OK(result->ValidateFull()); - auto upcast_result = CastBinaryTypesHelper(result, values->type()); ::arrow::AssertArraysEqual(*values, *result); }; @@ -1977,4 +1992,263 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) { CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values)); } +// ---------------------------------------------------------------------- +// DELTA_BYTE_ARRAY encode/decode tests. + +template +class TestDeltaByteArrayEncoding : public TestDeltaLengthByteArrayEncoding { + public: + using c_type = typename Type::c_type; + static constexpr int TYPE = Type::type_num; + static constexpr double prefixed_probability = 0.5; + + void InitData(int nvalues, int repeats) override { + num_values_ = nvalues * repeats; + input_bytes_.resize(num_values_ * sizeof(c_type)); + output_bytes_.resize(num_values_ * sizeof(c_type)); + draws_ = reinterpret_cast(input_bytes_.data()); + decode_buf_ = reinterpret_cast(output_bytes_.data()); + GeneratePrefixedData(nvalues, draws_, &data_buffer_, prefixed_probability); + + // add some repeated values + for (int j = 1; j < repeats; ++j) { + for (int i = 0; i < nvalues; ++i) { + draws_[nvalues * j + i] = draws_[i]; + } + } + } + + Encoding::type GetEncoding() override { return Encoding::DELTA_BYTE_ARRAY; } + + protected: + USING_BASE_MEMBERS(); + std::vector input_bytes_; + std::vector output_bytes_; +}; + +using TestDeltaByteArrayEncodingTypes = ::testing::Types; +TYPED_TEST_SUITE(TestDeltaByteArrayEncoding, TestDeltaByteArrayEncodingTypes); + +TYPED_TEST(TestDeltaByteArrayEncoding, BasicRoundTrip) { + ASSERT_NO_FATAL_FAILURE(this->Execute(0, /*repeats=*/0)); + ASSERT_NO_FATAL_FAILURE(this->Execute(250, 5)); + ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 1)); + ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced( + /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_probability*/ + 0)); + ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced( + /*nvalues*/ 1234, /*repeats*/ 10, /*valid_bits_offset*/ 64, + /*null_probability*/ 0.5)); +} + +template +class TestDeltaByteArrayEncodingDirectPut : public TestEncodingBase { + using ArrowType = typename EncodingTraits::ArrowType; + using Accumulator = typename EncodingTraits::Accumulator; + using BuilderType = typename ::arrow::TypeTraits::BuilderType; + + public: + std::unique_ptr> encoder = + MakeTypedEncoder(Encoding::DELTA_BYTE_ARRAY); + std::unique_ptr> decoder = + MakeTypedDecoder(Encoding::DELTA_BYTE_ARRAY); + + void CheckDirectPut(std::shared_ptr<::arrow::Array> array); + + void CheckRoundtrip() override; + + protected: + USING_BASE_MEMBERS(); +}; + +template <> +void TestDeltaByteArrayEncodingDirectPut::CheckDirectPut( + std::shared_ptr<::arrow::Array> array) { + ASSERT_NO_THROW(encoder->Put(*array)); + auto buf = encoder->FlushValues(); + + int num_values = static_cast(array->length() - array->null_count()); + decoder->SetData(num_values, buf->data(), static_cast(buf->size())); + + Accumulator acc; + acc.builder = std::make_unique(array->type(), default_memory_pool()); + + ASSERT_EQ(num_values, + decoder->DecodeArrow(static_cast(array->length()), + static_cast(array->null_count()), + array->null_bitmap_data(), array->offset(), &acc)); + + ASSERT_EQ(acc.chunks.size(), 0) << "Accumulator shouldn't have overflowed chunks"; + ASSERT_OK_AND_ASSIGN(auto result, acc.builder->Finish()); + ASSERT_EQ(array->length(), result->length()); + ASSERT_OK(result->ValidateFull()); + + ::arrow::AssertArraysEqual(*array, *result); +} + +template <> +void TestDeltaByteArrayEncodingDirectPut::CheckDirectPut( + std::shared_ptr<::arrow::Array> array) { + ASSERT_NO_THROW(encoder->Put(*array)); + auto buf = encoder->FlushValues(); + + int num_values = static_cast(array->length() - array->null_count()); + decoder->SetData(num_values, buf->data(), static_cast(buf->size())); + + Accumulator acc(array->type(), default_memory_pool()); + + ASSERT_EQ(num_values, + decoder->DecodeArrow(static_cast(array->length()), + static_cast(array->null_count()), + array->null_bitmap_data(), array->offset(), &acc)); + + ASSERT_OK_AND_ASSIGN(auto result, acc.Finish()); + ASSERT_EQ(array->length(), result->length()); + ASSERT_OK(result->ValidateFull()); + + ::arrow::AssertArraysEqual(*array, *result); +} + +template <> +void TestDeltaByteArrayEncodingDirectPut::CheckRoundtrip() { + constexpr int64_t kSize = 500; + constexpr int32_t kMinLength = 0; + constexpr int32_t kMaxLength = 10; + constexpr int32_t kNumUnique = 10; + constexpr double kNullProbability = 0.25; + constexpr int kSeed = 42; + ::arrow::random::RandomArrayGenerator rag{kSeed}; + std::shared_ptr<::arrow::Array> values = rag.BinaryWithRepeats( + /*size=*/1, /*unique=*/1, kMinLength, kMaxLength, kNullProbability); + CheckDirectPut(values); + + for (int i = 0; i < 10; ++i) { + values = rag.BinaryWithRepeats(kSize, kNumUnique, kMinLength, kMaxLength, + kNullProbability); + CheckDirectPut(values); + } +} + +template <> +void TestDeltaByteArrayEncodingDirectPut::CheckRoundtrip() { + constexpr int64_t kSize = 50; + constexpr int kSeed = 42; + constexpr int kByteWidth = 4; + ::arrow::random::RandomArrayGenerator rag{kSeed}; + std::shared_ptr<::arrow::Array> values = + rag.FixedSizeBinary(/*size=*/0, /*byte_width=*/kByteWidth); + CheckDirectPut(values); + + for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { + values = rag.FixedSizeBinary(kSize + seed, kByteWidth); + CheckDirectPut(values); + } +} + +TYPED_TEST_SUITE(TestDeltaByteArrayEncodingDirectPut, TestDeltaByteArrayEncodingTypes); + +TYPED_TEST(TestDeltaByteArrayEncodingDirectPut, DirectPut) { + ASSERT_NO_FATAL_FAILURE(this->CheckRoundtrip()); +} + +TEST(DeltaByteArrayEncodingAdHoc, ArrowDirectPut) { + auto CheckEncode = [](const std::shared_ptr<::arrow::Array>& values, + const std::shared_ptr& encoded) { + auto encoder = MakeTypedEncoder(Encoding::DELTA_BYTE_ARRAY); + ASSERT_NO_THROW(encoder->Put(*values)); + auto buf = encoder->FlushValues(); + ASSERT_TRUE(encoded->Equals(*buf)); + }; + + auto CheckDecode = [](std::shared_ptr buf, + std::shared_ptr<::arrow::Array> values) { + int num_values = static_cast(values->length()); + auto decoder = MakeTypedDecoder(Encoding::DELTA_BYTE_ARRAY); + decoder->SetData(num_values, buf->data(), static_cast(buf->size())); + + typename EncodingTraits::Accumulator acc; + if (::arrow::is_string(values->type()->id())) { + acc.builder = std::make_unique<::arrow::StringBuilder>(); + } else { + acc.builder = std::make_unique<::arrow::BinaryBuilder>(); + } + + ASSERT_EQ(num_values, + decoder->DecodeArrow(static_cast(values->length()), + static_cast(values->null_count()), + values->null_bitmap_data(), values->offset(), &acc)); + + std::shared_ptr<::arrow::Array> result; + ASSERT_OK(acc.builder->Finish(&result)); + ASSERT_EQ(num_values, result->length()); + ASSERT_OK(result->ValidateFull()); + + auto upcast_result = CastBinaryTypesHelper(result, values->type()); + ::arrow::AssertArraysEqual(*values, *upcast_result); + }; + + auto CheckEncodeDecode = [&](std::string_view values, + std::shared_ptr<::arrow::Array> prefix_lengths, + std::shared_ptr<::arrow::Array> suffix_lengths, + std::string_view suffix_data) { + auto encoded = ::arrow::ConcatenateBuffers({DeltaEncode(prefix_lengths), + DeltaEncode(suffix_lengths), + std::make_shared(suffix_data)}) + .ValueOrDie(); + + CheckEncode(::arrow::ArrayFromJSON(::arrow::utf8(), values), encoded); + CheckEncode(::arrow::ArrayFromJSON(::arrow::large_utf8(), values), encoded); + CheckEncode(::arrow::ArrayFromJSON(::arrow::binary(), values), encoded); + CheckEncode(::arrow::ArrayFromJSON(::arrow::large_binary(), values), encoded); + + CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::utf8(), values)); + CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_utf8(), values)); + CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::binary(), values)); + CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values)); + }; + + { + auto values = R"(["axis", "axle", "babble", "babyhood"])"; + auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 2, 0, 3])"); + auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([4, 2, 6, 5])"); + + constexpr std::string_view suffix_data = "axislebabbleyhood"; + CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data); + } + + { + auto values = R"(["axis", "axis", "axis", "axis"])"; + auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 4, 4, 4])"); + auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([4, 0, 0, 0])"); + + constexpr std::string_view suffix_data = "axis"; + CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data); + } + + { + auto values = R"(["axisba", "axis", "axis", "axis"])"; + auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 4, 4, 4])"); + auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([6, 0, 0, 0])"); + + constexpr std::string_view suffix_data = "axisba"; + CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data); + } + + { + auto values = R"(["baaxis", "axis", "axis", "axis"])"; + auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 0, 4, 4])"); + auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([6, 4, 0, 0])"); + + constexpr std::string_view suffix_data = "baaxisaxis"; + CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data); + } + + { + auto values = R"(["καλημέρα", "καμηλιέρη", "καμηλιέρη", "καλημέρα"])"; + auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 5, 18, 5])"); + auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([16, 13, 0, 11])"); + const std::string suffix_data = "καλημέρα\xbcηλιέρη\xbbημέρα"; + CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data); + } +} } // namespace parquet::test diff --git a/cpp/src/parquet/test_util.cc b/cpp/src/parquet/test_util.cc index 9d104618bfd7e..b65945cc7329f 100644 --- a/cpp/src/parquet/test_util.cc +++ b/cpp/src/parquet/test_util.cc @@ -30,6 +30,7 @@ #include #include +#include "arrow/testing/uniform_real.h" #include "parquet/column_page.h" #include "parquet/column_reader.h" #include "parquet/column_writer.h" @@ -132,5 +133,56 @@ void random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, int m random_byte_array(n, seed, buf, out, 0, max_size); } +void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, + int min_size, int max_size, double prefixed_probability) { + std::default_random_engine gen(seed); + std::uniform_int_distribution dist_size(min_size, max_size); + std::uniform_int_distribution dist_byte(0, 255); + std::bernoulli_distribution dist_has_prefix(prefixed_probability); + std::uniform_real_distribution dist_prefix_length(0, 1); + + for (int i = 0; i < n; ++i) { + int len = dist_size(gen); + out[i].len = len; + out[i].ptr = buf; + + bool do_prefix = dist_has_prefix(gen) && i > 0; + int prefix_len = 0; + if (do_prefix) { + int max_prefix_len = std::min(len, static_cast(out[i - 1].len)); + prefix_len = static_cast(std::ceil(max_prefix_len * dist_prefix_length(gen))); + } + for (int j = 0; j < prefix_len; ++j) { + buf[j] = out[i - 1].ptr[j]; + } + for (int j = prefix_len; j < len; ++j) { + buf[j] = static_cast(dist_byte(gen)); + } + buf += len; + } +} + +void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, int len, FLBA* out, + double prefixed_probability) { + std::default_random_engine gen(seed); + std::uniform_int_distribution dist_byte(0, 255); + std::bernoulli_distribution dist_has_prefix(prefixed_probability); + std::uniform_int_distribution dist_size(0, len); + + for (int i = 0; i < n; ++i) { + out[i].ptr = buf; + + bool do_prefix = dist_has_prefix(gen) && i > 0; + int prefix_len = do_prefix ? dist_size(gen) : 0; + for (int j = 0; j < prefix_len; ++j) { + buf[j] = out[i - 1].ptr[j]; + } + for (int j = prefix_len; j < len; ++j) { + buf[j] = static_cast(dist_byte(gen)); + } + buf += len; + } +} + } // namespace test } // namespace parquet diff --git a/cpp/src/parquet/test_util.h b/cpp/src/parquet/test_util.h index b0aafa037ead1..c8578609e9b1d 100644 --- a/cpp/src/parquet/test_util.h +++ b/cpp/src/parquet/test_util.h @@ -155,6 +155,12 @@ void random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, int m void random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, int max_size); +void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, + int min_size, int max_size, double prefixed_probability); + +void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, int len, FLBA* out, + double prefixed_probability); + template std::shared_ptr EncodeValues(Encoding::type encoding, bool use_dictionary, const Sequence& values, int length, @@ -777,18 +783,46 @@ inline void GenerateData(int num_values, Int96* out, std::vector template <> inline void GenerateData(int num_values, ByteArray* out, std::vector* heap) { - // seed the prng so failure is deterministic int max_byte_array_len = 12; heap->resize(num_values * max_byte_array_len); + // seed the prng so failure is deterministic random_byte_array(num_values, 0, heap->data(), out, 2, max_byte_array_len); } +// Generate ByteArray or FLBA data where there is a given probability +// for each value to share a common prefix with its predecessor. +// This is useful to exercise prefix-based encodings such as DELTA_BYTE_ARRAY. +template +inline void GeneratePrefixedData(int num_values, T* out, std::vector* heap, + double prefixed_probability); + +template <> +inline void GeneratePrefixedData(int num_values, ByteArray* out, + std::vector* heap, + double prefixed_probability) { + int max_byte_array_len = 12; + heap->resize(num_values * max_byte_array_len); + // seed the prng so failure is deterministic + prefixed_random_byte_array(num_values, /*seed=*/0, heap->data(), out, /*min_size=*/2, + /*max_size=*/max_byte_array_len, prefixed_probability); +} + static constexpr int kGenerateDataFLBALength = 8; template <> -inline void GenerateData(int num_values, FLBA* out, std::vector* heap) { +inline void GeneratePrefixedData(int num_values, FLBA* out, + std::vector* heap, + double prefixed_probability) { + heap->resize(num_values * kGenerateDataFLBALength); // seed the prng so failure is deterministic + prefixed_random_byte_array(num_values, /*seed=*/0, heap->data(), + kGenerateDataFLBALength, out, prefixed_probability); +} + +template <> +inline void GenerateData(int num_values, FLBA* out, std::vector* heap) { heap->resize(num_values * kGenerateDataFLBALength); + // seed the prng so failure is deterministic random_fixed_byte_array(num_values, 0, heap->data(), kGenerateDataFLBALength, out); } diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index e81e9de0a1efa..0315376a883e9 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -577,6 +577,11 @@ struct ByteArray { ByteArray(::std::string_view view) // NOLINT implicit conversion : ByteArray(static_cast(view.size()), reinterpret_cast(view.data())) {} + + explicit operator std::string_view() const { + return std::string_view{reinterpret_cast(ptr), len}; + } + uint32_t len; const uint8_t* ptr; }; diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst index 95f2d8d98dc0a..23fca8fd73010 100644 --- a/docs/source/cpp/parquet.rst +++ b/docs/source/cpp/parquet.rst @@ -401,7 +401,7 @@ Encodings +--------------------------+----------+----------+---------+ | DELTA_BINARY_PACKED | ✓ | ✓ | | +--------------------------+----------+----------+---------+ -| DELTA_BYTE_ARRAY | ✓ | | | +| DELTA_BYTE_ARRAY | ✓ | ✓ | | +--------------------------+----------+----------+---------+ | DELTA_LENGTH_BYTE_ARRAY | ✓ | ✓ | | +--------------------------+----------+----------+---------+ diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py index 9bc59cbcf96eb..dd12a2661656a 100644 --- a/python/pyarrow/tests/parquet/test_basic.py +++ b/python/pyarrow/tests/parquet/test_basic.py @@ -392,9 +392,13 @@ def test_byte_stream_split(use_legacy_dataset): def test_column_encoding(use_legacy_dataset): arr_float = pa.array(list(map(float, range(100)))) arr_int = pa.array(list(map(int, range(100)))) - arr_bin = pa.array([str(x) for x in range(100)]) - mixed_table = pa.Table.from_arrays([arr_float, arr_int, arr_bin], - names=['a', 'b', 'c']) + arr_bin = pa.array([str(x) for x in range(100)], type=pa.binary()) + arr_flba = pa.array( + [str(x).zfill(10) for x in range(100)], type=pa.binary(10)) + arr_bool = pa.array([False, True, False, False] * 25) + mixed_table = pa.Table.from_arrays( + [arr_float, arr_int, arr_bin, arr_flba, arr_bool], + names=['a', 'b', 'c', 'd', 'e']) # Check "BYTE_STREAM_SPLIT" for column 'a' and "PLAIN" column_encoding for # column 'b' and 'c'. @@ -426,6 +430,21 @@ def test_column_encoding(use_legacy_dataset): 'c': "DELTA_LENGTH_BYTE_ARRAY"}, use_legacy_dataset=use_legacy_dataset) + # Check "DELTA_BYTE_ARRAY" for byte columns. + _check_roundtrip(mixed_table, expected=mixed_table, + use_dictionary=False, + column_encoding={'a': "PLAIN", + 'b': "DELTA_BINARY_PACKED", + 'c': "DELTA_BYTE_ARRAY", + 'd': "DELTA_BYTE_ARRAY"}, + use_legacy_dataset=use_legacy_dataset) + + # Check "RLE" for boolean columns. + _check_roundtrip(mixed_table, expected=mixed_table, + use_dictionary=False, + column_encoding={'e': "RLE"}, + use_legacy_dataset=use_legacy_dataset) + # Try to pass "BYTE_STREAM_SPLIT" column encoding for integer column 'b'. # This should throw an error as it is only supports FLOAT and DOUBLE. with pytest.raises(IOError,