From f4d8380913727f7615a0e424c4a29eaa058e9213 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Sat, 16 Nov 2024 20:39:14 +0800 Subject: [PATCH] extend encoder to collect bytes and simplify interface --- cpp/src/arrow/util/hashing.h | 8 + cpp/src/parquet/column_page.h | 13 +- cpp/src/parquet/column_writer.cc | 132 ++++++----- cpp/src/parquet/encoder.cc | 60 ++++- cpp/src/parquet/encoding.h | 4 + cpp/src/parquet/page_index.cc | 8 +- cpp/src/parquet/page_index.h | 3 +- cpp/src/parquet/page_index_test.cc | 12 +- cpp/src/parquet/size_statistics.cc | 176 ++------------- cpp/src/parquet/size_statistics.h | 65 +----- cpp/src/parquet/size_statistics_test.cc | 284 ++++-------------------- cpp/src/parquet/type_fwd.h | 1 - 12 files changed, 225 insertions(+), 541 deletions(-) diff --git a/cpp/src/arrow/util/hashing.h b/cpp/src/arrow/util/hashing.h index 4ead1a7283d81..52525a83aa2ea 100644 --- a/cpp/src/arrow/util/hashing.h +++ b/cpp/src/arrow/util/hashing.h @@ -843,6 +843,14 @@ class BinaryMemoTable : public MemoTable { } } + // Visit the stored value at a specific index in insertion order. + // The visitor function should have the signature `void(std::string_view)` + // or `void(const std::string_view&)`. + template + void VisitValue(int32_t idx, VisitFunc&& visit) const { + visit(binary_builder_.GetView(idx)); + } + protected: struct Payload { int32_t memo_index; diff --git a/cpp/src/parquet/column_page.h b/cpp/src/parquet/column_page.h index d4a9c708c41d0..111265a842ee7 100644 --- a/cpp/src/parquet/column_page.h +++ b/cpp/src/parquet/column_page.h @@ -70,9 +70,7 @@ class DataPage : public Page { /// Currently it is only present from data pages created by ColumnWriter in order /// to collect page index. 
std::optional first_row_index() const { return first_row_index_; } - const std::shared_ptr& size_statistics() const { - return size_statistics_; - } + const SizeStatistics& size_statistics() const { return size_statistics_; } virtual ~DataPage() = default; @@ -80,7 +78,7 @@ class DataPage : public Page { DataPage(PageType::type type, const std::shared_ptr& buffer, int32_t num_values, Encoding::type encoding, int64_t uncompressed_size, EncodedStatistics statistics, std::optional first_row_index, - std::shared_ptr size_statistics) + SizeStatistics size_statistics) : Page(buffer, type), num_values_(num_values), encoding_(encoding), @@ -95,8 +93,7 @@ class DataPage : public Page { EncodedStatistics statistics_; /// Row ordinal within the row group to the first row in the data page. std::optional first_row_index_; - /// Size statistics for the data page. It may be null if unavailable. - std::shared_ptr size_statistics_; + SizeStatistics size_statistics_; }; class DataPageV1 : public DataPage { @@ -106,7 +103,7 @@ class DataPageV1 : public DataPage { Encoding::type repetition_level_encoding, int64_t uncompressed_size, EncodedStatistics statistics = EncodedStatistics(), std::optional first_row_index = std::nullopt, - std::shared_ptr size_statistics = NULLPTR) + SizeStatistics size_statistics = SizeStatistics()) : DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, uncompressed_size, std::move(statistics), std::move(first_row_index), std::move(size_statistics)), @@ -130,7 +127,7 @@ class DataPageV2 : public DataPage { int64_t uncompressed_size, bool is_compressed = false, EncodedStatistics statistics = EncodedStatistics(), std::optional first_row_index = std::nullopt, - std::shared_ptr size_statistics = NULLPTR) + SizeStatistics size_statistics = SizeStatistics()) : DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding, uncompressed_size, std::move(statistics), std::move(first_row_index), std::move(size_statistics)), diff --git 
a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 291f016c03b60..3d5e7b1084e52 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -435,11 +435,10 @@ class SerializedPageWriter : public PageWriter { const int64_t header_size = thrift_serializer_->Serialize(&page_header, sink_.get(), meta_encryptor_.get()); PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len)); - const auto& page_size_stats = page.size_statistics(); /// Collect page index if (column_index_builder_ != nullptr) { - column_index_builder_->AddPage(page.statistics(), page_size_stats.get()); + column_index_builder_->AddPage(page.statistics(), page.size_statistics()); } if (offset_index_builder_ != nullptr) { const int64_t compressed_size = output_data_len + header_size; @@ -455,8 +454,7 @@ class SerializedPageWriter : public PageWriter { /// has flushed all data pages. offset_index_builder_->AddPage( start_pos, static_cast(compressed_size), *page.first_row_index(), - page_size_stats ? 
page_size_stats->unencoded_byte_array_data_bytes - : std::nullopt); + page.size_statistics().unencoded_byte_array_data_bytes); } total_uncompressed_size_ += uncompressed_size + header_size; @@ -778,10 +776,10 @@ class ColumnWriterImpl { // Serializes Dictionary Page if enabled virtual void WriteDictionaryPage() = 0; - // A convenience struct to combine the encoded statistics and the size statistics + // A convenience struct to combine the encoded statistics and size statistics struct StatisticsPair { - EncodedStatistics encoded_stats; // required - std::shared_ptr size_statistics; // may be null if disabled + EncodedStatistics encoded_stats; + SizeStatistics size_stats; }; // Plain-encoded statistics of the current page @@ -1095,7 +1093,7 @@ int64_t ColumnWriterImpl::Close() { FlushBufferedDataPages(); - auto [chunk_statistics, chunk_size_stats] = GetChunkStatistics(); + auto [chunk_statistics, chunk_size_statistics] = GetChunkStatistics(); chunk_statistics.ApplyStatSizeLimits( properties_->max_statistics_size(descr_->path())); chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order()); @@ -1104,8 +1102,8 @@ int64_t ColumnWriterImpl::Close() { if (rows_written_ > 0 && chunk_statistics.is_set()) { metadata_->SetStatistics(chunk_statistics); } - if (rows_written_ > 0 && chunk_size_stats) { - metadata_->SetSizeStatistics(*chunk_size_stats); + if (rows_written_ > 0 && chunk_size_statistics.is_set()) { + metadata_->SetSizeStatistics(chunk_size_statistics); } metadata_->SetKeyValueMetadata(key_value_metadata_); pager_->Close(has_dictionary_, fallback_); @@ -1232,15 +1230,14 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< page_statistics_ = MakeStatistics(descr_, allocator_); chunk_statistics_ = MakeStatistics(descr_, allocator_); } - pages_change_on_record_boundaries_ = - properties->data_page_version() == ParquetDataPageVersion::V2 || - properties->page_index_enabled(descr_->path()); - if 
(properties->size_statistics_level() == SizeStatisticsLevel::ColumnChunk || properties->size_statistics_level() == SizeStatisticsLevel::Page) { - page_size_stats_builder_ = SizeStatisticsBuilder::Make(descr_); - chunk_size_stats_ = page_size_stats_builder_->Build(); + page_size_statistics_ = MakeSizeStatistics(descr_); + chunk_size_statistics_ = MakeSizeStatistics(descr_); } + pages_change_on_record_boundaries_ = + properties->data_page_version() == ParquetDataPageVersion::V2 || + properties->page_index_enabled(descr_->path()); } int64_t Close() override { return ColumnWriterImpl::Close(); } @@ -1378,7 +1375,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< result.encoded_stats = page_statistics_->Encode(); } if (properties_->size_statistics_level() == SizeStatisticsLevel::Page) { - result.size_statistics = page_size_stats_builder_->Build(); + ARROW_DCHECK(page_size_statistics_ != nullptr); + result.size_stats = *page_size_statistics_; } return result; } @@ -1388,8 +1386,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< if (chunk_statistics_) { result.encoded_stats = chunk_statistics_->Encode(); } - if (chunk_size_stats_) { - result.size_statistics = chunk_size_stats_; + if (chunk_size_statistics_) { + result.size_stats = *chunk_size_statistics_; } return result; } @@ -1399,10 +1397,9 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< chunk_statistics_->Merge(*page_statistics_); page_statistics_->Reset(); } - if (page_size_stats_builder_ != nullptr) { - auto page_size_stats = page_size_stats_builder_->Build(); - chunk_size_stats_->Merge(*page_size_stats); - page_size_stats_builder_->Reset(); + if (page_size_statistics_ != nullptr) { + chunk_size_statistics_->Merge(*page_size_statistics_); + page_size_statistics_->Reset(); } } @@ -1457,6 +1454,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< DictEncoder* current_dict_encoder_; 
std::shared_ptr page_statistics_; std::shared_ptr chunk_statistics_; + std::unique_ptr page_size_statistics_; + std::shared_ptr chunk_size_statistics_; bool pages_change_on_record_boundaries_; // If writing a sequence of ::arrow::DictionaryArray to the writer, we keep the @@ -1465,10 +1464,6 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< // which case we call back to the dense write path) std::shared_ptr<::arrow::Array> preserved_dictionary_; - // Utility to collect and store SizeStatistics of page and chunk. - std::unique_ptr page_size_stats_builder_; - std::shared_ptr chunk_size_stats_; - int64_t WriteLevels(int64_t num_values, const int16_t* def_levels, const int16_t* rep_levels) { int64_t values_to_write = 0; @@ -1504,7 +1499,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< num_buffered_rows_ += num_values; } - CollectLevelHistogram(num_values, def_levels, rep_levels); + UpdateLevelHistogram(num_values, def_levels, rep_levels); return values_to_write; } @@ -1597,27 +1592,44 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< num_buffered_rows_ += num_levels; } - CollectLevelHistogram(num_levels, def_levels, rep_levels); + UpdateLevelHistogram(num_levels, def_levels, rep_levels); } - void CollectLevelHistogram(int64_t num_levels, const int16_t* def_levels, - const int16_t* rep_levels) { - if (page_size_stats_builder_ == nullptr) { + void UpdateLevelHistogram(int64_t num_levels, const int16_t* def_levels, + const int16_t* rep_levels) const { + if (page_size_statistics_ == nullptr) { return; } + auto add_levels = [](std::vector& level_histogram, + ::arrow::util::span levels) { + for (int16_t level : levels) { + ARROW_DCHECK_LT(level, static_cast(level_histogram.size())); + ++level_histogram[level]; + } + }; + if (descr_->max_definition_level() > 0) { - page_size_stats_builder_->AddDefinitionLevels( - {def_levels, static_cast(num_levels)}); + 
add_levels(page_size_statistics_->definition_level_histogram, + {def_levels, static_cast(num_levels)}); } else { - page_size_stats_builder_->AddRepeatedDefinitionLevels(num_levels, /*def_level=*/0); + page_size_statistics_->definition_level_histogram[0] += num_levels; } if (descr_->max_repetition_level() > 0) { - page_size_stats_builder_->AddRepetitionLevels( - {rep_levels, static_cast(num_levels)}); + add_levels(page_size_statistics_->repetition_level_histogram, + {rep_levels, static_cast(num_levels)}); } else { - page_size_stats_builder_->AddRepeatedRepetitionLevels(num_levels, /*rep_level=*/0); + page_size_statistics_->repetition_level_histogram[0] += num_levels; + } + } + + void UpdateUnencodedDataBytes() const { + if constexpr (std::is_same_v) { + if (page_size_statistics_ != nullptr) { + page_size_statistics_->IncrementUnencodedByteArrayDataBytes( + current_encoder_->ReportUnencodedDataBytes()); + } } } @@ -1672,11 +1684,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< if (page_statistics_ != nullptr) { page_statistics_->Update(values, num_values, num_nulls); } - if constexpr (std::is_same_v) { - if (page_size_stats_builder_ != nullptr) { - page_size_stats_builder_->AddValues(values, num_values); - } - } + UpdateUnencodedDataBytes(); } /// \brief Write values with spaces and update page statistics accordingly. 
@@ -1705,12 +1713,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, num_spaced_values, num_values, num_nulls); } - if constexpr (std::is_same_v) { - if (page_size_stats_builder_ != nullptr) { - page_size_stats_builder_->AddValuesSpaced(values, valid_bits, valid_bits_offset, - num_spaced_values); - } - } + UpdateUnencodedDataBytes(); } }; @@ -1769,14 +1772,8 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( exec_ctx.set_use_threads(false); std::shared_ptr<::arrow::Array> referenced_dictionary; - ::arrow::Datum referenced_indices; - if (page_size_stats_builder_) { - // SizeStatistics need to compute total bytes, so we cannot extract unique values. - referenced_indices = *chunk_indices; - } else { - PARQUET_ASSIGN_OR_THROW(referenced_indices, - ::arrow::compute::Unique(*chunk_indices, &exec_ctx)); - } + PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices, + ::arrow::compute::Unique(*chunk_indices, &exec_ctx)); // On first run, we might be able to re-use the existing dictionary if (referenced_indices.length() == dictionary->length()) { @@ -1790,15 +1787,10 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( referenced_dictionary = referenced_dictionary_datum.make_array(); } - if (page_statistics_) { - int64_t non_null_count = chunk_indices->length() - chunk_indices->null_count(); - page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count); - page_statistics_->IncrementNumValues(non_null_count); - page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false); - } - if (page_size_stats_builder_) { - page_size_stats_builder_->AddValues(*referenced_dictionary); - } + int64_t non_null_count = chunk_indices->length() - chunk_indices->null_count(); + page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count); + page_statistics_->IncrementNumValues(non_null_count); + page_statistics_->Update(*referenced_dictionary, 
/*update_counts=*/false); }; int64_t value_offset = 0; @@ -1815,13 +1807,15 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( AddIfNotNull(rep_levels, offset)); std::shared_ptr writeable_indices = indices->Slice(value_offset, batch_num_spaced_values); - if (page_statistics_ || page_size_stats_builder_) { + if (page_statistics_) { update_stats(/*num_chunk_levels=*/batch_size, writeable_indices); } PARQUET_ASSIGN_OR_THROW( writeable_indices, MaybeReplaceValidity(writeable_indices, null_count, ctx->memory_pool)); dict_encoder->PutIndices(*writeable_indices); + // Update unencoded byte array data size to size statistics + UpdateUnencodedDataBytes(); CommitWriteAndCheckPageLimit(batch_size, batch_num_values, null_count, check_page); value_offset += batch_num_spaced_values; }; @@ -2302,9 +2296,7 @@ Status TypedColumnWriterImpl::WriteArrowDense( page_statistics_->IncrementNullCount(batch_size - non_null); page_statistics_->IncrementNumValues(non_null); } - if (page_size_stats_builder_ != nullptr) { - page_size_stats_builder_->AddValues(*data_slice); - } + UpdateUnencodedDataBytes(); CommitWriteAndCheckPageLimit(batch_size, batch_num_values, batch_size - non_null, check_page); CheckDictionarySizeLimit(); diff --git a/cpp/src/parquet/encoder.cc b/cpp/src/parquet/encoder.cc index 89d5d44c5219c..9a4ea331d77f8 100644 --- a/cpp/src/parquet/encoder.cc +++ b/cpp/src/parquet/encoder.cc @@ -79,6 +79,12 @@ class EncoderImpl : virtual public Encoder { MemoryPool* memory_pool() const override { return pool_; } + int64_t ReportUnencodedDataBytes() override { + int64_t bytes = unencoded_data_bytes_; + unencoded_data_bytes_ = 0; + return bytes; + } + protected: // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY const ColumnDescriptor* descr_; @@ -87,6 +93,8 @@ class EncoderImpl : virtual public Encoder { /// Type length from descr const int type_length_; + /// Number of unencoded bytes written to the encoder + int64_t unencoded_data_bytes_ = 0; }; // 
---------------------------------------------------------------------- @@ -132,6 +140,7 @@ class PlainEncoder : public EncoderImpl, virtual public TypedEncoder { DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL"; sink_.UnsafeAppend(&length, sizeof(uint32_t)); sink_.UnsafeAppend(data, static_cast(length)); + unencoded_data_bytes_ += length; } void Put(const ByteArray& val) { @@ -170,6 +179,7 @@ template void PlainEncoder::Put(const T* buffer, int num_values) { if (num_values > 0) { PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T))); + unencoded_data_bytes_ += num_values * sizeof(T); } } @@ -181,7 +191,8 @@ inline void PlainEncoder::Put(const ByteArray* src, int num_value } template -void DirectPutImpl(const ::arrow::Array& values, ::arrow::BufferBuilder* sink) { +[[nodiscard]] int64_t DirectPutImpl(const ::arrow::Array& values, + ::arrow::BufferBuilder* sink) { if (values.type_id() != ArrayType::TypeClass::type_id) { std::string type_name = ArrayType::TypeClass::type_name(); throw ParquetException("direct put to " + type_name + " from " + @@ -192,6 +203,7 @@ void DirectPutImpl(const ::arrow::Array& values, ::arrow::BufferBuilder* sink) { constexpr auto value_size = sizeof(value_type); auto raw_values = checked_cast(values).raw_values(); + int64_t original_length = sink->length(); if (values.null_count() == 0) { // no nulls, just dump the data PARQUET_THROW_NOT_OK(sink->Append(raw_values, values.length() * value_size)); @@ -205,16 +217,17 @@ void DirectPutImpl(const ::arrow::Array& values, ::arrow::BufferBuilder* sink) { } } } + return sink->length() - original_length; } template <> void PlainEncoder::Put(const ::arrow::Array& values) { - DirectPutImpl<::arrow::Int32Array>(values, &sink_); + unencoded_data_bytes_ += DirectPutImpl<::arrow::Int32Array>(values, &sink_); } template <> void PlainEncoder::Put(const ::arrow::Array& values) { - DirectPutImpl<::arrow::Int64Array>(values, &sink_); + unencoded_data_bytes_ += 
DirectPutImpl<::arrow::Int64Array>(values, &sink_); } template <> @@ -224,12 +237,12 @@ void PlainEncoder::Put(const ::arrow::Array& values) { template <> void PlainEncoder::Put(const ::arrow::Array& values) { - DirectPutImpl<::arrow::FloatArray>(values, &sink_); + unencoded_data_bytes_ += DirectPutImpl<::arrow::FloatArray>(values, &sink_); } template <> void PlainEncoder::Put(const ::arrow::Array& values) { - DirectPutImpl<::arrow::DoubleArray>(values, &sink_); + unencoded_data_bytes_ += DirectPutImpl<::arrow::DoubleArray>(values, &sink_); } template @@ -275,6 +288,7 @@ inline void PlainEncoder::Put(const ::arrow::Array& values) { // no nulls, just dump the data PARQUET_THROW_NOT_OK( sink_.Append(data.raw_values(), data.length() * data.byte_width())); + unencoded_data_bytes_ += data.length() * data.byte_width(); } else { const int64_t total_bytes = data.length() * data.byte_width() - data.null_count() * data.byte_width(); @@ -284,6 +298,7 @@ inline void PlainEncoder::Put(const ::arrow::Array& values) { sink_.UnsafeAppend(data.Value(i), data.byte_width()); } } + unencoded_data_bytes_ += total_bytes; } } @@ -297,6 +312,7 @@ inline void PlainEncoder::Put(const FixedLenByteArray* src, int num_va DCHECK(src[i].ptr != nullptr) << "Value ptr cannot be NULL"; PARQUET_THROW_NOT_OK(sink_.Append(src[i].ptr, descr_->type_length())); } + unencoded_data_bytes_ += num_values * descr_->type_length(); } template <> @@ -346,6 +362,7 @@ class PlainEncoder : public EncoderImpl, virtual public BooleanEnco } } } + unencoded_data_bytes_ += sizeof(uint8_t) * (data.length() - data.null_count()); } private: @@ -361,6 +378,7 @@ void PlainEncoder::PutImpl(const SequenceType& src, int num_values) for (int i = 0; i < num_values; ++i) { sink_.UnsafeAppend(src[i]); } + unencoded_data_bytes_ += sizeof(SequenceType) * num_values; } int64_t PlainEncoder::EstimatedDataEncodedSize() { @@ -513,6 +531,22 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder { static_cast(values[i + 
position]); } }); + + // Track unencoded bytes based on dictionary value type + if constexpr (std::is_same_v) { + // For ByteArray, need to look up actual lengths from dictionary + for (size_t idx = + buffer_position - static_cast(data.length() - data.null_count()); + idx < buffer_position; ++idx) { + memo_table_.VisitValue(buffered_indices_[idx], [&](std::string_view value) { + unencoded_data_bytes_ += value.length(); + }); + } + } else if constexpr (std::is_same_v) { + unencoded_data_bytes_ += type_length_ * (data.length() - data.null_count()); + } else { + unencoded_data_bytes_ += sizeof(T) * (data.length() - data.null_count()); + } } void PutIndices(const ::arrow::Array& data) override { @@ -633,6 +667,7 @@ inline void DictEncoderImpl::Put(const T& v) { int32_t memo_index; PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(v, on_found, on_not_found, &memo_index)); buffered_indices_.push_back(memo_index); + unencoded_data_bytes_ += sizeof(T); } template @@ -656,6 +691,7 @@ inline void DictEncoderImpl::PutByteArray(const void* ptr, PARQUET_THROW_NOT_OK( memo_table_.GetOrInsert(ptr, length, on_found, on_not_found, &memo_index)); buffered_indices_.push_back(memo_index); + unencoded_data_bytes_ += length; } template <> @@ -676,6 +712,7 @@ inline void DictEncoderImpl::Put(const FixedLenByteArray& v) { PARQUET_THROW_NOT_OK( memo_table_.GetOrInsert(ptr, type_length_, on_found, on_not_found, &memo_index)); buffered_indices_.push_back(memo_index); + unencoded_data_bytes_ += type_length_; } template <> @@ -870,6 +907,7 @@ class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBase { this->sink_.Append(reinterpret_cast(buffer), num_values * static_cast(sizeof(T)))); this->num_values_in_buffer_ += num_values; + this->unencoded_data_bytes_ += num_values * sizeof(T); } } @@ -913,6 +951,7 @@ class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBasenum_values_in_buffer_ += num_values; + this->unencoded_data_bytes_ += num_values * byte_width_; } void Put(const 
::arrow::Array& values) override { @@ -922,6 +961,7 @@ class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBasenum_values_in_buffer_ += data.length(); + this->unencoded_data_bytes_ += data.length() * byte_width_; } else { const int64_t num_values = data.length() - data.null_count(); const int64_t total_bytes = num_values * byte_width_; @@ -933,6 +973,7 @@ class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBasenum_values_in_buffer_ += num_values; + this->unencoded_data_bytes_ += total_bytes; } } }; @@ -1075,6 +1116,8 @@ void DeltaBitPackEncoder::Put(const T* src, int num_values) { FlushBlock(); } } + + unencoded_data_bytes_ += num_values * sizeof(T); } template @@ -1268,6 +1311,7 @@ class DeltaLengthByteArrayEncoder : public EncoderImpl, } length_encoder_.Put({static_cast(view.length())}, 1); PARQUET_THROW_NOT_OK(sink_.Append(view.data(), view.length())); + unencoded_data_bytes_ += view.size(); return Status::OK(); }, []() { return Status::OK(); })); @@ -1313,6 +1357,7 @@ void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) { for (int idx = 0; idx < num_values; idx++) { sink_.UnsafeAppend(src[idx].ptr, src[idx].len); } + unencoded_data_bytes_ += total_increment_size; } void DeltaLengthByteArrayEncoder::PutSpaced(const T* src, int num_values, @@ -1444,6 +1489,8 @@ class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder
RleBooleanEncoder::FlushValues() { diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index 5717886f10759..0341ad10c5279 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -158,6 +158,10 @@ class Encoder { virtual Encoding::type encoding() const = 0; virtual void Put(const ::arrow::Array& values) = 0; + // Report the number of bytes before encoding that have been written + // to the encoder since the last report. Note that this call is not + // idempotent because it resets the internal counter. + virtual int64_t ReportUnencodedDataBytes() = 0; virtual MemoryPool* memory_pool() const = 0; }; diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index 1f97239e65bb7..9f40542c18728 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -485,7 +485,7 @@ class ColumnIndexBuilderImpl final : public ColumnIndexBuilder { } void AddPage(const EncodedStatistics& stats, - const SizeStatistics* size_stats) override { + const SizeStatistics& size_stats) override { if (state_ == BuilderState::kFinished) { throw ParquetException("Cannot add page to finished ColumnIndexBuilder."); } else if (state_ == BuilderState::kDiscarded) { @@ -519,9 +519,9 @@ class ColumnIndexBuilderImpl final : public ColumnIndexBuilder { column_index_.null_counts.clear(); } - if (size_stats != nullptr) { - const auto& page_ref_level_hist = size_stats->repetition_level_histogram; - const auto& page_def_level_hist = size_stats->definition_level_histogram; + if (size_stats.is_set()) { + const auto& page_ref_level_hist = size_stats.repetition_level_histogram; + const auto& page_def_level_hist = size_stats.definition_level_histogram; column_index_.repetition_level_histograms.insert( column_index_.repetition_level_histograms.end(), page_ref_level_hist.cbegin(), page_ref_level_hist.cend()); diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h index f612fb2f27c6b..2c51838730b30 100644 --- 
a/cpp/src/parquet/page_index.h +++ b/cpp/src/parquet/page_index.h @@ -21,6 +21,7 @@ #include "parquet/encryption/type_fwd.h" #include "parquet/type_fwd.h" #include "parquet/types.h" +#include "size_statistics.h" #include #include @@ -282,7 +283,7 @@ class PARQUET_EXPORT ColumnIndexBuilder { /// \param stats Page statistics in the encoded form. /// \param size_stats Size statistics of the page if available. virtual void AddPage(const EncodedStatistics& stats, - const SizeStatistics* size_stats = NULLPTR) = 0; + const SizeStatistics& size_stats = SizeStatistics{}) = 0; /// \brief Complete the column index. /// diff --git a/cpp/src/parquet/page_index_test.cc b/cpp/src/parquet/page_index_test.cc index daaccbcb93f55..9f2771e75793c 100644 --- a/cpp/src/parquet/page_index_test.cc +++ b/cpp/src/parquet/page_index_test.cc @@ -484,14 +484,14 @@ struct PageLevelHistogram { std::unique_ptr ConstructFakeSizeStatistics( const ColumnDescriptor* descr, const PageLevelHistogram& page_level_histogram) { - auto builder = SizeStatisticsBuilder::Make(descr); + auto stats = MakeSizeStatistics(descr); for (int16_t level = 0; level <= descr->max_repetition_level(); ++level) { - builder->AddRepeatedRepetitionLevels(page_level_histogram.rep_levels[level], level); + stats->repetition_level_histogram[level] = page_level_histogram.rep_levels[level]; } for (int16_t level = 0; level <= descr->max_definition_level(); ++level) { - builder->AddRepeatedDefinitionLevels(page_level_histogram.def_levels[level], level); + stats->definition_level_histogram[level] = page_level_histogram.def_levels[level]; } - return builder->Build(); + return stats; } void VerifyPageLevelHistogram(int16_t max_level, size_t page_id, @@ -519,8 +519,8 @@ void TestWriteTypedColumnIndex(schema::NodePtr node, for (size_t i = 0; i < page_stats.size(); ++i) { auto size_stats = build_size_stats ? 
ConstructFakeSizeStatistics(descr.get(), page_levels[i]) - : nullptr; - builder->AddPage(page_stats[i], size_stats.get()); + : std::make_unique(); + builder->AddPage(page_stats[i], *size_stats); } ASSERT_NO_THROW(builder->Finish()); diff --git a/cpp/src/parquet/size_statistics.cc b/cpp/src/parquet/size_statistics.cc index 925b3517547c1..48bb35b28b1cf 100644 --- a/cpp/src/parquet/size_statistics.cc +++ b/cpp/src/parquet/size_statistics.cc @@ -31,17 +31,15 @@ namespace parquet { void SizeStatistics::Merge(const SizeStatistics& other) { - if (repetition_level_histogram.size() != other.repetition_level_histogram.size() || - definition_level_histogram.size() != other.definition_level_histogram.size() || - unencoded_byte_array_data_bytes.has_value() != - other.unencoded_byte_array_data_bytes.has_value()) { - throw ParquetException("Cannot merge incompatible SizeStatistics"); - } - + ARROW_CHECK_EQ(repetition_level_histogram.size(), + other.repetition_level_histogram.size()); + ARROW_CHECK_EQ(definition_level_histogram.size(), + other.definition_level_histogram.size()); + ARROW_CHECK_EQ(unencoded_byte_array_data_bytes.has_value(), + other.unencoded_byte_array_data_bytes.has_value()); std::transform(repetition_level_histogram.begin(), repetition_level_histogram.end(), other.repetition_level_histogram.begin(), repetition_level_histogram.begin(), std::plus<>()); - std::transform(definition_level_histogram.begin(), definition_level_histogram.end(), other.definition_level_histogram.begin(), definition_level_histogram.begin(), std::plus<>()); @@ -51,155 +49,27 @@ void SizeStatistics::Merge(const SizeStatistics& other) { } } -class SizeStatisticsBuilder::Impl { - public: - explicit Impl(const ColumnDescriptor* descr) - : rep_level_histogram_(descr->max_repetition_level() + 1, 0), - def_level_histogram_(descr->max_definition_level() + 1, 0) { - if (descr->physical_type() == Type::BYTE_ARRAY) { - unencoded_byte_array_data_bytes_ = 0; - } - } - - void 
AddRepetitionLevels(::arrow::util::span rep_levels) { - for (int16_t rep_level : rep_levels) { - ARROW_DCHECK_LT(rep_level, static_cast(rep_level_histogram_.size())); - ++rep_level_histogram_[rep_level]; - } - } - - void AddDefinitionLevels(::arrow::util::span def_levels) { - for (int16_t def_level : def_levels) { - ARROW_DCHECK_LT(def_level, static_cast(def_level_histogram_.size())); - ++def_level_histogram_[def_level]; - } - } - - void AddRepeatedRepetitionLevels(int64_t num_levels, int16_t rep_level) { - ARROW_DCHECK_LT(rep_level, static_cast(rep_level_histogram_.size())); - rep_level_histogram_[rep_level] += num_levels; - } - - void AddRepeatedDefinitionLevels(int64_t num_levels, int16_t def_level) { - ARROW_DCHECK_LT(def_level, static_cast(def_level_histogram_.size())); - def_level_histogram_[def_level] += num_levels; - } - - void WriteValuesSpaced(const ByteArray* values, const uint8_t* valid_bits, - int64_t valid_bits_offset, int64_t num_spaced_values) { - int64_t total_bytes = 0; - ::arrow::internal::VisitSetBitRunsVoid(valid_bits, valid_bits_offset, - num_spaced_values, - [&](int64_t pos, int64_t length) { - for (int64_t i = 0; i < length; i++) { - // Don't bother to check unlikely overflow. 
- total_bytes += values[i + pos].len; - } - }); - IncrementUnencodedByteArrayDataBytes(total_bytes); - } - - void WriteValues(const ByteArray* values, int64_t num_values) { - int64_t total_bytes = 0; - std::for_each(values, values + num_values, - [&](const ByteArray& value) { total_bytes += values->len; }); - IncrementUnencodedByteArrayDataBytes(total_bytes); - } - - void WriteValues(const ::arrow::Array& values) { - int64_t total_bytes = 0; - const auto valid_func = [&](ByteArray val) { total_bytes += val.len; }; - const auto null_func = [&]() {}; - - if (::arrow::is_binary_like(values.type_id())) { - ::arrow::VisitArraySpanInline<::arrow::BinaryType>( - *values.data(), std::move(valid_func), std::move(null_func)); - } else if (::arrow::is_large_binary_like(values.type_id())) { - ::arrow::VisitArraySpanInline<::arrow::LargeBinaryType>( - *values.data(), std::move(valid_func), std::move(null_func)); - } else { - // TODO: support StringViewType and BinaryViewType - throw ParquetException("Unsupported type: " + values.type()->ToString()); - } - - IncrementUnencodedByteArrayDataBytes(total_bytes); - } - - std::unique_ptr Build() { - return std::make_unique(SizeStatistics{ - rep_level_histogram_, def_level_histogram_, unencoded_byte_array_data_bytes_}); - } - - void Reset() { - rep_level_histogram_.assign(rep_level_histogram_.size(), 0); - def_level_histogram_.assign(def_level_histogram_.size(), 0); - if (unencoded_byte_array_data_bytes_.has_value()) { - unencoded_byte_array_data_bytes_ = 0; - } - } - - private: - void IncrementUnencodedByteArrayDataBytes(int64_t total_bytes) { - ARROW_DCHECK(unencoded_byte_array_data_bytes_.has_value()); - if (::arrow::internal::AddWithOverflow( - total_bytes, unencoded_byte_array_data_bytes_.value(), &total_bytes)) { - throw ParquetException("unencoded byte array data bytes overflows to INT64_MAX"); - } - unencoded_byte_array_data_bytes_ = total_bytes; - } - - std::vector rep_level_histogram_; - std::vector def_level_histogram_; - 
std::optional<int64_t> unencoded_byte_array_data_bytes_; -}; - -void SizeStatisticsBuilder::AddRepetitionLevels( - ::arrow::util::span<const int16_t> rep_levels) { - impl_->AddRepetitionLevels(rep_levels); -} - -void SizeStatisticsBuilder::AddDefinitionLevels( - ::arrow::util::span<const int16_t> def_levels) { - impl_->AddDefinitionLevels(def_levels); -} - -void SizeStatisticsBuilder::AddRepeatedRepetitionLevels(int64_t num_levels, - int16_t rep_level) { - impl_->AddRepeatedRepetitionLevels(num_levels, rep_level); -} - -void SizeStatisticsBuilder::AddRepeatedDefinitionLevels(int64_t num_levels, - int16_t def_level) { - impl_->AddRepeatedDefinitionLevels(num_levels, def_level); +void SizeStatistics::IncrementUnencodedByteArrayDataBytes(int64_t value) { + ARROW_CHECK(unencoded_byte_array_data_bytes.has_value()); + unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes.value() + value; } -void SizeStatisticsBuilder::AddValuesSpaced(const ByteArray* values, - const uint8_t* valid_bits, - int64_t valid_bits_offset, - int64_t num_spaced_values) { - impl_->WriteValuesSpaced(values, valid_bits, valid_bits_offset, num_spaced_values); -} - -void SizeStatisticsBuilder::AddValues(const ByteArray* values, int64_t num_values) { - impl_->WriteValues(values, num_values); -} - -void SizeStatisticsBuilder::AddValues(const ::arrow::Array& values) { - impl_->WriteValues(values); +void SizeStatistics::Reset() { + repetition_level_histogram.assign(repetition_level_histogram.size(), 0); + definition_level_histogram.assign(definition_level_histogram.size(), 0); + if (unencoded_byte_array_data_bytes.has_value()) { + unencoded_byte_array_data_bytes = 0; + } } -std::unique_ptr<SizeStatistics> SizeStatisticsBuilder::Build() { return impl_->Build(); } - -void SizeStatisticsBuilder::Reset() { return impl_->Reset(); } - -SizeStatisticsBuilder::SizeStatisticsBuilder(const ColumnDescriptor* descr) - : impl_(std::make_unique<Impl>(descr)) {} - -SizeStatisticsBuilder::~SizeStatisticsBuilder() = default; - -std::unique_ptr<SizeStatisticsBuilder> 
SizeStatisticsBuilder::Make( - const ColumnDescriptor* descr) { - return std::unique_ptr<SizeStatisticsBuilder>(new SizeStatisticsBuilder(descr)); +std::unique_ptr<SizeStatistics> MakeSizeStatistics(const ColumnDescriptor* descr) { + auto size_stats = std::make_unique<SizeStatistics>(); + size_stats->repetition_level_histogram.resize(descr->max_repetition_level() + 1, 0); + size_stats->definition_level_histogram.resize(descr->max_definition_level() + 1, 0); + if (descr->physical_type() == Type::BYTE_ARRAY) { + size_stats->unencoded_byte_array_data_bytes = 0; + } + return size_stats; } } // namespace parquet diff --git a/cpp/src/parquet/size_statistics.h b/cpp/src/parquet/size_statistics.h index 174a4aec2d698..42b403e82cf69 100644 --- a/cpp/src/parquet/size_statistics.h +++ b/cpp/src/parquet/size_statistics.h @@ -64,63 +64,22 @@ struct PARQUET_EXPORT SizeStatistics { /// physical type. std::optional<int64_t> unencoded_byte_array_data_bytes; - /// \brief Merge two SizeStatistics. - void Merge(const SizeStatistics& other); -}; - -/// \brief Builder to create a SizeStatistics. -class PARQUET_EXPORT SizeStatisticsBuilder { - public: - /// \brief API convenience to get a SizeStatisticsBuilder. - static std::unique_ptr<SizeStatisticsBuilder> Make(const ColumnDescriptor* descr); - - ~SizeStatisticsBuilder(); - - /// \brief Add repetition levels to the histogram. - /// \param rep_levels repetition levels to add. - void AddRepetitionLevels(::arrow::util::span<const int16_t> rep_levels); - - /// \brief Add definition levels to the histogram. - /// \param def_levels definition levels to add. - void AddDefinitionLevels(::arrow::util::span<const int16_t> def_levels); - - /// \brief Add repeated repetition level to the histogram. - /// \param num_levels number of repetition levels to add. - /// \param rep_level repeated repetition level value. - void AddRepeatedRepetitionLevels(int64_t num_levels, int16_t rep_level); + /// \brief Check if the SizeStatistics is set. 
+ bool is_set() const { + return !repetition_level_histogram.empty() || !definition_level_histogram.empty() || + unencoded_byte_array_data_bytes.has_value(); + } - /// \brief Add repeated definition level to the histogram. - /// \param num_levels number of definition levels to add. - /// \param def_level repeated definition level value. - void AddRepeatedDefinitionLevels(int64_t num_levels, int16_t def_level); + /// \brief Increment the unencoded byte array data bytes. + void IncrementUnencodedByteArrayDataBytes(int64_t value); - /// \brief Add spaced BYTE_ARRAY values. - /// \param[in] values pointer to values of BYTE_ARRAY type. - /// \param[in] valid_bits pointer to bitmap representing if values are non-null. - /// \param[in] valid_bits_offset offset into valid_bits where the slice of data begins. - /// \param[in] num_spaced_values length of values in values/valid_bits to inspect. - void AddValuesSpaced(const ByteArray* values, const uint8_t* valid_bits, - int64_t valid_bits_offset, int64_t num_spaced_values); - - /// \brief Add dense BYTE_ARRAY values. - /// \param values pointer to values of BYTE_ARRAY type. - /// \param num_values length of values. - void AddValues(const ByteArray* values, int64_t num_values); - - /// \brief Add BYTE_ARRAY values in the arrow array. - void AddValues(const ::arrow::Array& values); - - /// \brief Build a SizeStatistics from collected data. - std::unique_ptr Build(); + /// \brief Merge two SizeStatistics. + void Merge(const SizeStatistics& other); - /// \brief Reset all collected data for reuse. 
void Reset(); - - private: - // PIMPL Idiom - explicit SizeStatisticsBuilder(const ColumnDescriptor* descr); - class Impl; - std::unique_ptr impl_; }; +PARQUET_EXPORT std::unique_ptr MakeSizeStatistics( + const ColumnDescriptor* descr); + } // namespace parquet diff --git a/cpp/src/parquet/size_statistics_test.cc b/cpp/src/parquet/size_statistics_test.cc index 0e762d8631ec6..fd5236eaccb6c 100644 --- a/cpp/src/parquet/size_statistics_test.cc +++ b/cpp/src/parquet/size_statistics_test.cc @@ -42,224 +42,41 @@ namespace parquet { -TEST(SizeStatistics, WriteBatchLevels) { - std::vector expected_def_level_histogram = {256, 128, 64, 32, 16, 8, 4, 2, 2}; - std::vector expected_rep_level_histogram = {256, 128, 64, 32, 32}; - constexpr int16_t kMaxDefLevel = 8; - constexpr int16_t kMaxRefLevel = 4; - auto descr = - std::make_unique(schema::Int32("a"), kMaxDefLevel, kMaxRefLevel); - auto builder = SizeStatisticsBuilder::Make(descr.get()); - - auto write_batch_levels = - [&](const std::vector& histogram, - const std::function)>& - write_levels_func) { - std::vector levels; - for (int16_t level = 0; level < static_cast(histogram.size()); level++) { - levels.insert(levels.end(), histogram[level], level); - } - - auto rng = std::default_random_engine{}; - std::shuffle(std::begin(levels), std::end(levels), rng); - - constexpr size_t kBatchSize = 64; - for (size_t i = 0; i < levels.size(); i += kBatchSize) { - auto batch_size = static_cast(std::min(kBatchSize, levels.size() - i)); - write_levels_func(builder.get(), - {levels.data() + i, static_cast(batch_size)}); - } - }; - - write_batch_levels(expected_def_level_histogram, - &SizeStatisticsBuilder::AddDefinitionLevels); - write_batch_levels(expected_rep_level_histogram, - &SizeStatisticsBuilder::AddRepetitionLevels); - auto size_statistics = builder->Build(); - EXPECT_EQ(size_statistics->definition_level_histogram, expected_def_level_histogram); - EXPECT_EQ(size_statistics->repetition_level_histogram, 
expected_rep_level_histogram); -} - -TEST(SizeStatistics, WriteRepeatedLevels) { - constexpr int16_t kMaxDefLevel = 2; - constexpr int16_t kMaxRepLevel = 3; - auto descr = - std::make_unique(schema::Int32("a"), kMaxDefLevel, kMaxRepLevel); - auto builder = SizeStatisticsBuilder::Make(descr.get()); - - constexpr int64_t kNumRounds = 10; - for (int64_t round = 1; round <= kNumRounds; round++) { - for (int16_t def_level = 0; def_level <= kMaxDefLevel; def_level++) { - builder->AddRepeatedDefinitionLevels(/*num_levels=*/round + def_level, def_level); - } - for (int16_t rep_level = 0; rep_level <= kMaxRepLevel; rep_level++) { - builder->AddRepeatedRepetitionLevels(/*num_levels=*/round + rep_level * rep_level, - rep_level); - } - } - - auto size_statistics = builder->Build(); - EXPECT_EQ(size_statistics->definition_level_histogram, - std::vector({55, 65, 75})); - EXPECT_EQ(size_statistics->repetition_level_histogram, - std::vector({55, 65, 95, 145})); -} - -TEST(SizeStatistics, WriteDenseByteArrayValues) { - constexpr std::string_view kValue = "foo"; - constexpr int kNumValues = 1000; - constexpr int kBatchSize = 64; - const std::vector values(kNumValues, kValue); - - auto descr = std::make_unique( - schema::ByteArray("a"), /*max_def_level=*/0, /*max_rep_level=*/0); - auto builder = SizeStatisticsBuilder::Make(descr.get()); - for (int i = 0; i < kNumValues; i += kBatchSize) { - auto batch_size = std::min(kBatchSize, kNumValues - i); - builder->AddValues(values.data() + i, batch_size); - } - - auto size_statistics = builder->Build(); - EXPECT_EQ(size_statistics->unencoded_byte_array_data_bytes.value_or(-1), - kNumValues * kValue.size()); -} - -TEST(SizeStatistics, WriteSpacedByteArrayValues) { - constexpr std::string_view kValue = "foo"; - constexpr int kNumValues = 1000; - constexpr int kBatchSize = 63; - const std::vector values(kNumValues, kValue); - ASSERT_OK_AND_ASSIGN(auto not_null_bitmap, ::arrow::AllocateBitmap(kNumValues)); - int not_null_count = 0; - for (int i 
= 0; i < kNumValues; i++) { - if (i % 3 == 0) { - ::arrow::bit_util::ClearBit(not_null_bitmap->mutable_data(), i); - } else { - ::arrow::bit_util::SetBit(not_null_bitmap->mutable_data(), i); - not_null_count++; - } - } - - auto descr = std::make_unique( - schema::ByteArray("a"), /*max_def_level=*/1, /*max_rep_level=*/0); - auto builder = SizeStatisticsBuilder::Make(descr.get()); - for (int i = 0; i < kNumValues; i += kBatchSize) { - auto batch_size = std::min(kBatchSize, kNumValues - i); - builder->AddValuesSpaced(values.data() + i, not_null_bitmap->data(), i, batch_size); - } - - auto size_statistics = builder->Build(); - EXPECT_EQ(size_statistics->unencoded_byte_array_data_bytes.value_or(-1), - not_null_count * kValue.size()); -} - -TEST(SizeStatistics, WriteBinaryArray) { - std::vector> arrays = { - ::arrow::ArrayFromJSON(::arrow::binary(), R"(["foo", null, "bar", "baz"])"), - ::arrow::ArrayFromJSON(::arrow::large_binary(), R"(["foo", null, "bar", "baz"])"), - }; - for (const auto& array : arrays) { - auto descr = std::make_unique( - schema::ByteArray("a"), /*max_def_level=*/1, /*max_rep_level=*/0); - auto builder = SizeStatisticsBuilder::Make(descr.get()); - builder->AddValues(*array); - auto size_statistics = builder->Build(); - EXPECT_EQ(size_statistics->unencoded_byte_array_data_bytes.value_or(-1), 9); - } -} - -TEST(SizeStatistics, MergeStatistics) { - constexpr int kNumValues = 16; - const std::array def_levels = {0, 0, 0, 0, 1, 1, 1, 1, - 2, 2, 2, 2, 3, 3, 3, 3}; - const std::array rep_levels = {0, 1, 2, 3, 0, 1, 2, 3, - 0, 1, 2, 3, 0, 1, 2, 3}; - const std::vector expected_histogram = {8, 8, 8, 8}; - constexpr std::string_view kByteArrayValue = "foo"; - const std::vector values(kNumValues, - parquet::ByteArray{kByteArrayValue}); - - for (const auto& descr : - {std::make_unique(schema::Int32("a"), /*max_def_level=*/3, - /*max_rep_level=*/3), - std::make_unique(schema::ByteArray("a"), /*max_def_level=*/3, - /*max_rep_level=*/3)}) { - auto builder = 
SizeStatisticsBuilder::Make(descr.get()); - builder->AddRepetitionLevels(rep_levels); - builder->AddDefinitionLevels(def_levels); - if (descr->physical_type() == Type::BYTE_ARRAY) { - builder->AddValues(values.data(), kNumValues); - } - auto size_statistics_1 = builder->Build(); - - builder->Reset(); - builder->AddRepetitionLevels(rep_levels); - builder->AddDefinitionLevels(def_levels); - if (descr->physical_type() == Type::BYTE_ARRAY) { - builder->AddValues(values.data(), kNumValues); - } - auto size_statistics_2 = builder->Build(); - - size_statistics_1->Merge(*size_statistics_2); - EXPECT_EQ(size_statistics_1->definition_level_histogram, expected_histogram); - EXPECT_EQ(size_statistics_1->repetition_level_histogram, expected_histogram); - if (descr->physical_type() == Type::BYTE_ARRAY) { - EXPECT_TRUE(size_statistics_1->unencoded_byte_array_data_bytes.has_value()); - EXPECT_EQ(size_statistics_1->unencoded_byte_array_data_bytes.value(), - kByteArrayValue.size() * kNumValues * 2); - } else { - EXPECT_FALSE(size_statistics_1->unencoded_byte_array_data_bytes.has_value()); - } - } -} - TEST(SizeStatistics, ThriftSerDe) { - constexpr int kNumValues = 16; - const std::array def_levels = {0, 0, 0, 0, 1, 1, 1, 1, - 2, 2, 2, 2, 3, 3, 3, 3}; - const std::array rep_levels = {0, 1, 2, 3, 0, 1, 2, 3, - 0, 1, 2, 3, 0, 1, 2, 3}; - const std::vector expected_histogram = {4, 4, 4, 4}; - constexpr std::string_view kByteArrayValue = "foo"; - const std::vector values(kNumValues, - parquet::ByteArray{kByteArrayValue}); + const std::vector kDefLevels = {128, 64, 32, 16}; + const std::vector kRepLevels = {100, 80, 60, 40, 20}; + constexpr int64_t kUnencodedByteArrayDataBytes = 1234; for (const auto& descr : {std::make_unique(schema::Int32("a"), /*max_def_level=*/3, - /*max_rep_level=*/3), + /*max_rep_level=*/4), std::make_unique(schema::ByteArray("a"), /*max_def_level=*/3, - /*max_rep_level=*/3)}) { - auto builder = SizeStatisticsBuilder::Make(descr.get()); - 
builder->AddRepetitionLevels(rep_levels); - builder->AddDefinitionLevels(def_levels); + /*max_rep_level=*/4)}) { + auto size_statistics = MakeSizeStatistics(descr.get()); + size_statistics->repetition_level_histogram = kRepLevels; + size_statistics->definition_level_histogram = kDefLevels; if (descr->physical_type() == Type::BYTE_ARRAY) { - builder->AddValues(values.data(), kNumValues); + size_statistics->IncrementUnencodedByteArrayDataBytes(kUnencodedByteArrayDataBytes); } - auto size_statistics = builder->Build(); auto thrift_statistics = ToThrift(*size_statistics); auto restored_statistics = FromThrift(thrift_statistics); - EXPECT_EQ(restored_statistics.definition_level_histogram, expected_histogram); - EXPECT_EQ(restored_statistics.repetition_level_histogram, expected_histogram); + EXPECT_EQ(restored_statistics.definition_level_histogram, kDefLevels); + EXPECT_EQ(restored_statistics.repetition_level_histogram, kRepLevels); if (descr->physical_type() == Type::BYTE_ARRAY) { EXPECT_TRUE(restored_statistics.unencoded_byte_array_data_bytes.has_value()); EXPECT_EQ(restored_statistics.unencoded_byte_array_data_bytes.value(), - kByteArrayValue.size() * kNumValues); + kUnencodedByteArrayDataBytes); } else { EXPECT_FALSE(restored_statistics.unencoded_byte_array_data_bytes.has_value()); } } } -struct RowGroupSizeStatistics { - std::vector ref_levels; - std::vector def_levels; - std::optional byte_array_bytes; - bool operator==(const RowGroupSizeStatistics& other) const { - return ref_levels == other.ref_levels && def_levels == other.def_levels && - byte_array_bytes == other.byte_array_bytes; - } -}; +bool operator==(const SizeStatistics& lhs, const SizeStatistics& rhs) { + return lhs.repetition_level_histogram == rhs.repetition_level_histogram && + lhs.definition_level_histogram == rhs.definition_level_histogram && + lhs.unencoded_byte_array_data_bytes == rhs.unencoded_byte_array_data_bytes; +} struct PageSizeStatistics { std::vector ref_levels; @@ -316,7 +133,7 @@ class 
SizeStatisticsRoundTripTest : public ::testing::Test { for (int j = 0; j < metadata->num_columns(); ++j) { auto column_metadata = row_group_metadata->ColumnChunk(j); auto size_stats = column_metadata->size_statistics(); - RowGroupSizeStatistics row_group_stats; + SizeStatistics row_group_stats; if (size_stats != nullptr) { row_group_stats = {size_stats->repetition_level_histogram, size_stats->definition_level_histogram, @@ -359,9 +176,9 @@ class SizeStatisticsRoundTripTest : public ::testing::Test { protected: std::shared_ptr buffer_; - std::vector row_group_stats_; + std::vector row_group_stats_; std::vector page_stats_; - inline static const RowGroupSizeStatistics kEmptyRowGroupStats{}; + inline static const SizeStatistics kEmptyRowGroupStats{}; inline static const PageSizeStatistics kEmptyPageStats{}; }; @@ -398,18 +215,17 @@ TEST_F(SizeStatisticsRoundTripTest, EnableColumnChunkSizeStats) { ])"})); ReadSizeStatistics(); - EXPECT_THAT( - row_group_stats_, - ::testing::ElementsAre(RowGroupSizeStatistics{/*ref_levels=*/{2, 2, 5}, + EXPECT_THAT(row_group_stats_, + ::testing::ElementsAre(SizeStatistics{/*ref_levels=*/{2, 2, 5}, /*def_levels=*/{0, 0, 0, 0, 1, 8}, /*byte_array_bytes=*/std::nullopt}, - RowGroupSizeStatistics{/*ref_levels=*/{2, 2, 5}, + SizeStatistics{/*ref_levels=*/{2, 2, 5}, /*def_levels=*/{0, 0, 0, 0, 1, 8}, /*byte_array_bytes=*/12}, - RowGroupSizeStatistics{/*ref_levels=*/{2, 2, 0}, + SizeStatistics{/*ref_levels=*/{2, 2, 0}, /*def_levels=*/{0, 1, 1, 1, 1, 0}, /*byte_array_bytes=*/std::nullopt}, - RowGroupSizeStatistics{/*ref_levels=*/{2, 2, 0}, + SizeStatistics{/*ref_levels=*/{2, 2, 0}, /*def_levels=*/{0, 1, 1, 1, 1, 0}, /*byte_array_bytes=*/0})); EXPECT_THAT(page_stats_, ::testing::ElementsAre(kEmptyPageStats, kEmptyPageStats, @@ -429,18 +245,17 @@ TEST_F(SizeStatisticsRoundTripTest, EnablePageSizeStats) { ])"})); ReadSizeStatistics(); - EXPECT_THAT( - row_group_stats_, - ::testing::ElementsAre(RowGroupSizeStatistics{/*ref_levels=*/{2, 2, 5}, + 
EXPECT_THAT(row_group_stats_, + ::testing::ElementsAre(SizeStatistics{/*ref_levels=*/{2, 2, 5}, /*def_levels=*/{0, 0, 0, 0, 1, 8}, /*byte_array_bytes=*/std::nullopt}, - RowGroupSizeStatistics{/*ref_levels=*/{2, 2, 5}, + SizeStatistics{/*ref_levels=*/{2, 2, 5}, /*def_levels=*/{0, 0, 0, 0, 1, 8}, /*byte_array_bytes=*/12}, - RowGroupSizeStatistics{/*ref_levels=*/{2, 2, 0}, + SizeStatistics{/*ref_levels=*/{2, 2, 0}, /*def_levels=*/{0, 1, 1, 1, 1, 0}, /*byte_array_bytes=*/std::nullopt}, - RowGroupSizeStatistics{/*ref_levels=*/{2, 2, 0}, + SizeStatistics{/*ref_levels=*/{2, 2, 0}, /*def_levels=*/{0, 1, 1, 1, 1, 0}, /*byte_array_bytes=*/0})); EXPECT_THAT(page_stats_, @@ -460,34 +275,23 @@ TEST_F(SizeStatisticsRoundTripTest, EnablePageSizeStats) { } TEST_F(SizeStatisticsRoundTripTest, WriteDictionaryArray) { - std::shared_ptr<::arrow::Array> dict; - std::vector dict_values = {"a", "aa", "aaa"}; - ::arrow::ArrayFromVector<::arrow::StringType, std::string>(dict_values, &dict); - std::shared_ptr<::arrow::DataType> dict_type = - ::arrow::dictionary(::arrow::int16(), ::arrow::utf8()); - - std::shared_ptr<::arrow::Array> indices; - std::vector is_valid = {true, true, false, true, true, true}; - std::vector indices_values = {1, 2, -1, 0, 2, 0}; - ::arrow::ArrayFromVector<::arrow::Int16Type, int16_t>(is_valid, indices_values, - &indices); - auto array = std::make_shared<::arrow::DictionaryArray>(dict_type, indices, dict); - - auto schema = ::arrow::schema({::arrow::field("a", dict_type)}); - auto table = ::arrow::Table::Make(schema, {array}, indices->length()); - WriteFile(SizeStatisticsLevel::Page, table); + auto schema = ::arrow::schema( + {::arrow::field("a", ::arrow::dictionary(::arrow::int16(), ::arrow::utf8()))}); + WriteFile( + SizeStatisticsLevel::Page, + ::arrow::TableFromJSON(schema, {R"([["aa"],["aaa"],[null],["a"],["aaa"],["a"]])"})); ReadSizeStatistics(); EXPECT_THAT(row_group_stats_, - ::testing::ElementsAre(RowGroupSizeStatistics{/*ref_levels=*/{2}, - 
/*def_levels=*/{0, 2}, - /*byte_array_bytes=*/5}, - RowGroupSizeStatistics{/*ref_levels=*/{2}, - /*def_levels=*/{1, 1}, - /*byte_array_bytes=*/1}, - RowGroupSizeStatistics{/*ref_levels=*/{2}, - /*def_levels=*/{0, 2}, - /*byte_array_bytes=*/4})); + ::testing::ElementsAre(SizeStatistics{/*ref_levels=*/{2}, + /*def_levels=*/{0, 2}, + /*byte_array_bytes=*/5}, + SizeStatistics{/*ref_levels=*/{2}, + /*def_levels=*/{1, 1}, + /*byte_array_bytes=*/1}, + SizeStatistics{/*ref_levels=*/{2}, + /*def_levels=*/{0, 2}, + /*byte_array_bytes=*/4})); EXPECT_THAT(page_stats_, ::testing::ElementsAre(PageSizeStatistics{/*ref_levels=*/{2}, /*def_levels=*/{0, 2}, diff --git a/cpp/src/parquet/type_fwd.h b/cpp/src/parquet/type_fwd.h index b12a86997c1db..cda0dc5a77e1f 100644 --- a/cpp/src/parquet/type_fwd.h +++ b/cpp/src/parquet/type_fwd.h @@ -68,7 +68,6 @@ struct ParquetVersion { }; }; -struct ByteArray; struct PageIndexLocation; class FileMetaData;