Skip to content

Commit

Permalink
extend encoder to collect bytes and simplify interface
Browse files Browse the repository at this point in the history
  • Loading branch information
wgtmac committed Nov 17, 2024
1 parent d0ea0bd commit f4d8380
Show file tree
Hide file tree
Showing 12 changed files with 225 additions and 541 deletions.
8 changes: 8 additions & 0 deletions cpp/src/arrow/util/hashing.h
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,14 @@ class BinaryMemoTable : public MemoTable {
}
}

// Visit the stored value at a specific index in insertion order.
// The visitor function should have the signature `void(std::string_view)`
// or `void(const std::string_view&)`.
template <typename VisitFunc>
void VisitValue(int32_t idx, VisitFunc&& visit) const {
visit(binary_builder_.GetView(idx));
}

protected:
struct Payload {
int32_t memo_index;
Expand Down
13 changes: 5 additions & 8 deletions cpp/src/parquet/column_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,15 @@ class DataPage : public Page {
/// Currently it is only present from data pages created by ColumnWriter in order
/// to collect page index.
std::optional<int64_t> first_row_index() const { return first_row_index_; }
const std::shared_ptr<SizeStatistics>& size_statistics() const {
return size_statistics_;
}
const SizeStatistics& size_statistics() const { return size_statistics_; }

virtual ~DataPage() = default;

protected:
DataPage(PageType::type type, const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, int64_t uncompressed_size,
EncodedStatistics statistics, std::optional<int64_t> first_row_index,
std::shared_ptr<SizeStatistics> size_statistics)
SizeStatistics size_statistics)
: Page(buffer, type),
num_values_(num_values),
encoding_(encoding),
Expand All @@ -95,8 +93,7 @@ class DataPage : public Page {
EncodedStatistics statistics_;
/// Row ordinal within the row group to the first row in the data page.
std::optional<int64_t> first_row_index_;
/// Size statistics for the data page. It may be null if unavailable.
std::shared_ptr<SizeStatistics> size_statistics_;
SizeStatistics size_statistics_;
};

class DataPageV1 : public DataPage {
Expand All @@ -106,7 +103,7 @@ class DataPageV1 : public DataPage {
Encoding::type repetition_level_encoding, int64_t uncompressed_size,
EncodedStatistics statistics = EncodedStatistics(),
std::optional<int64_t> first_row_index = std::nullopt,
std::shared_ptr<SizeStatistics> size_statistics = NULLPTR)
SizeStatistics size_statistics = SizeStatistics())
: DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, uncompressed_size,
std::move(statistics), std::move(first_row_index),
std::move(size_statistics)),
Expand All @@ -130,7 +127,7 @@ class DataPageV2 : public DataPage {
int64_t uncompressed_size, bool is_compressed = false,
EncodedStatistics statistics = EncodedStatistics(),
std::optional<int64_t> first_row_index = std::nullopt,
std::shared_ptr<SizeStatistics> size_statistics = NULLPTR)
SizeStatistics size_statistics = SizeStatistics())
: DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding, uncompressed_size,
std::move(statistics), std::move(first_row_index),
std::move(size_statistics)),
Expand Down
132 changes: 62 additions & 70 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,10 @@ class SerializedPageWriter : public PageWriter {
const int64_t header_size =
thrift_serializer_->Serialize(&page_header, sink_.get(), meta_encryptor_.get());
PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len));
const auto& page_size_stats = page.size_statistics();

/// Collect page index
if (column_index_builder_ != nullptr) {
column_index_builder_->AddPage(page.statistics(), page_size_stats.get());
column_index_builder_->AddPage(page.statistics(), page.size_statistics());
}
if (offset_index_builder_ != nullptr) {
const int64_t compressed_size = output_data_len + header_size;
Expand All @@ -455,8 +454,7 @@ class SerializedPageWriter : public PageWriter {
/// has flushed all data pages.
offset_index_builder_->AddPage(
start_pos, static_cast<int32_t>(compressed_size), *page.first_row_index(),
page_size_stats ? page_size_stats->unencoded_byte_array_data_bytes
: std::nullopt);
page.size_statistics().unencoded_byte_array_data_bytes);
}

total_uncompressed_size_ += uncompressed_size + header_size;
Expand Down Expand Up @@ -778,10 +776,10 @@ class ColumnWriterImpl {
// Serializes Dictionary Page if enabled
virtual void WriteDictionaryPage() = 0;

// A convenience struct to combine the encoded statistics and the size statistics
// A convenience struct to combine the encoded statistics and size statistics
struct StatisticsPair {
EncodedStatistics encoded_stats; // required
std::shared_ptr<SizeStatistics> size_statistics; // may be null if disabled
EncodedStatistics encoded_stats;
SizeStatistics size_stats;
};

// Plain-encoded statistics of the current page
Expand Down Expand Up @@ -1095,7 +1093,7 @@ int64_t ColumnWriterImpl::Close() {

FlushBufferedDataPages();

auto [chunk_statistics, chunk_size_stats] = GetChunkStatistics();
auto [chunk_statistics, chunk_size_statistics] = GetChunkStatistics();
chunk_statistics.ApplyStatSizeLimits(
properties_->max_statistics_size(descr_->path()));
chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
Expand All @@ -1104,8 +1102,8 @@ int64_t ColumnWriterImpl::Close() {
if (rows_written_ > 0 && chunk_statistics.is_set()) {
metadata_->SetStatistics(chunk_statistics);
}
if (rows_written_ > 0 && chunk_size_stats) {
metadata_->SetSizeStatistics(*chunk_size_stats);
if (rows_written_ > 0 && chunk_size_statistics.is_set()) {
metadata_->SetSizeStatistics(chunk_size_statistics);
}
metadata_->SetKeyValueMetadata(key_value_metadata_);
pager_->Close(has_dictionary_, fallback_);
Expand Down Expand Up @@ -1232,15 +1230,14 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
page_statistics_ = MakeStatistics<DType>(descr_, allocator_);
chunk_statistics_ = MakeStatistics<DType>(descr_, allocator_);
}
pages_change_on_record_boundaries_ =
properties->data_page_version() == ParquetDataPageVersion::V2 ||
properties->page_index_enabled(descr_->path());

if (properties->size_statistics_level() == SizeStatisticsLevel::ColumnChunk ||
properties->size_statistics_level() == SizeStatisticsLevel::Page) {
page_size_stats_builder_ = SizeStatisticsBuilder::Make(descr_);
chunk_size_stats_ = page_size_stats_builder_->Build();
page_size_statistics_ = MakeSizeStatistics(descr_);
chunk_size_statistics_ = MakeSizeStatistics(descr_);
}
pages_change_on_record_boundaries_ =
properties->data_page_version() == ParquetDataPageVersion::V2 ||
properties->page_index_enabled(descr_->path());
}

int64_t Close() override { return ColumnWriterImpl::Close(); }
Expand Down Expand Up @@ -1378,7 +1375,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
result.encoded_stats = page_statistics_->Encode();
}
if (properties_->size_statistics_level() == SizeStatisticsLevel::Page) {
result.size_statistics = page_size_stats_builder_->Build();
ARROW_DCHECK(page_size_statistics_ != nullptr);
result.size_stats = *page_size_statistics_;
}
return result;
}
Expand All @@ -1388,8 +1386,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
if (chunk_statistics_) {
result.encoded_stats = chunk_statistics_->Encode();
}
if (chunk_size_stats_) {
result.size_statistics = chunk_size_stats_;
if (chunk_size_statistics_) {
result.size_stats = *chunk_size_statistics_;
}
return result;
}
Expand All @@ -1399,10 +1397,9 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
chunk_statistics_->Merge(*page_statistics_);
page_statistics_->Reset();
}
if (page_size_stats_builder_ != nullptr) {
auto page_size_stats = page_size_stats_builder_->Build();
chunk_size_stats_->Merge(*page_size_stats);
page_size_stats_builder_->Reset();
if (page_size_statistics_ != nullptr) {
chunk_size_statistics_->Merge(*page_size_statistics_);
page_size_statistics_->Reset();
}
}

Expand Down Expand Up @@ -1457,6 +1454,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
DictEncoder<DType>* current_dict_encoder_;
std::shared_ptr<TypedStats> page_statistics_;
std::shared_ptr<TypedStats> chunk_statistics_;
std::unique_ptr<SizeStatistics> page_size_statistics_;
std::shared_ptr<SizeStatistics> chunk_size_statistics_;
bool pages_change_on_record_boundaries_;

// If writing a sequence of ::arrow::DictionaryArray to the writer, we keep the
Expand All @@ -1465,10 +1464,6 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
// which case we call back to the dense write path)
std::shared_ptr<::arrow::Array> preserved_dictionary_;

// Utility to collect and store SizeStatistics of page and chunk.
std::unique_ptr<SizeStatisticsBuilder> page_size_stats_builder_;
std::shared_ptr<SizeStatistics> chunk_size_stats_;

int64_t WriteLevels(int64_t num_values, const int16_t* def_levels,
const int16_t* rep_levels) {
int64_t values_to_write = 0;
Expand Down Expand Up @@ -1504,7 +1499,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
num_buffered_rows_ += num_values;
}

CollectLevelHistogram(num_values, def_levels, rep_levels);
UpdateLevelHistogram(num_values, def_levels, rep_levels);
return values_to_write;
}

Expand Down Expand Up @@ -1597,27 +1592,44 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
num_buffered_rows_ += num_levels;
}

CollectLevelHistogram(num_levels, def_levels, rep_levels);
UpdateLevelHistogram(num_levels, def_levels, rep_levels);
}

void CollectLevelHistogram(int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels) {
if (page_size_stats_builder_ == nullptr) {
void UpdateLevelHistogram(int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels) const {
if (page_size_statistics_ == nullptr) {
return;
}

auto add_levels = [](std::vector<int64_t>& level_histogram,
::arrow::util::span<const int16_t> levels) {
for (int16_t level : levels) {
ARROW_DCHECK_LT(level, static_cast<int16_t>(level_histogram.size()));
++level_histogram[level];
}
};

if (descr_->max_definition_level() > 0) {
page_size_stats_builder_->AddDefinitionLevels(
{def_levels, static_cast<size_t>(num_levels)});
add_levels(page_size_statistics_->definition_level_histogram,
{def_levels, static_cast<size_t>(num_levels)});
} else {
page_size_stats_builder_->AddRepeatedDefinitionLevels(num_levels, /*def_level=*/0);
page_size_statistics_->definition_level_histogram[0] += num_levels;
}

if (descr_->max_repetition_level() > 0) {
page_size_stats_builder_->AddRepetitionLevels(
{rep_levels, static_cast<size_t>(num_levels)});
add_levels(page_size_statistics_->repetition_level_histogram,
{rep_levels, static_cast<size_t>(num_levels)});
} else {
page_size_stats_builder_->AddRepeatedRepetitionLevels(num_levels, /*rep_level=*/0);
page_size_statistics_->repetition_level_histogram[0] += num_levels;
}
}

void UpdateUnencodedDataBytes() const {
if constexpr (std::is_same_v<T, ByteArray>) {
if (page_size_statistics_ != nullptr) {
page_size_statistics_->IncrementUnencodedByteArrayDataBytes(
current_encoder_->ReportUnencodedDataBytes());
}
}
}

Expand Down Expand Up @@ -1672,11 +1684,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
if (page_statistics_ != nullptr) {
page_statistics_->Update(values, num_values, num_nulls);
}
if constexpr (std::is_same_v<T, ByteArray>) {
if (page_size_stats_builder_ != nullptr) {
page_size_stats_builder_->AddValues(values, num_values);
}
}
UpdateUnencodedDataBytes();
}

/// \brief Write values with spaces and update page statistics accordingly.
Expand Down Expand Up @@ -1705,12 +1713,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset,
num_spaced_values, num_values, num_nulls);
}
if constexpr (std::is_same_v<T, ByteArray>) {
if (page_size_stats_builder_ != nullptr) {
page_size_stats_builder_->AddValuesSpaced(values, valid_bits, valid_bits_offset,
num_spaced_values);
}
}
UpdateUnencodedDataBytes();
}
};

Expand Down Expand Up @@ -1769,14 +1772,8 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
exec_ctx.set_use_threads(false);

std::shared_ptr<::arrow::Array> referenced_dictionary;
::arrow::Datum referenced_indices;
if (page_size_stats_builder_) {
// SizeStatistics need to compute total bytes, so we cannot extract unique values.
referenced_indices = *chunk_indices;
} else {
PARQUET_ASSIGN_OR_THROW(referenced_indices,
::arrow::compute::Unique(*chunk_indices, &exec_ctx));
}
PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices,
::arrow::compute::Unique(*chunk_indices, &exec_ctx));

// On first run, we might be able to re-use the existing dictionary
if (referenced_indices.length() == dictionary->length()) {
Expand All @@ -1790,15 +1787,10 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
referenced_dictionary = referenced_dictionary_datum.make_array();
}

if (page_statistics_) {
int64_t non_null_count = chunk_indices->length() - chunk_indices->null_count();
page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count);
page_statistics_->IncrementNumValues(non_null_count);
page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
}
if (page_size_stats_builder_) {
page_size_stats_builder_->AddValues(*referenced_dictionary);
}
int64_t non_null_count = chunk_indices->length() - chunk_indices->null_count();
page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count);
page_statistics_->IncrementNumValues(non_null_count);
page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
};

int64_t value_offset = 0;
Expand All @@ -1815,13 +1807,15 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
AddIfNotNull(rep_levels, offset));
std::shared_ptr<Array> writeable_indices =
indices->Slice(value_offset, batch_num_spaced_values);
if (page_statistics_ || page_size_stats_builder_) {
if (page_statistics_) {
update_stats(/*num_chunk_levels=*/batch_size, writeable_indices);
}
PARQUET_ASSIGN_OR_THROW(
writeable_indices,
MaybeReplaceValidity(writeable_indices, null_count, ctx->memory_pool));
dict_encoder->PutIndices(*writeable_indices);
// Update unencoded byte array data size to size statistics
UpdateUnencodedDataBytes();
CommitWriteAndCheckPageLimit(batch_size, batch_num_values, null_count, check_page);
value_offset += batch_num_spaced_values;
};
Expand Down Expand Up @@ -2302,9 +2296,7 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
page_statistics_->IncrementNullCount(batch_size - non_null);
page_statistics_->IncrementNumValues(non_null);
}
if (page_size_stats_builder_ != nullptr) {
page_size_stats_builder_->AddValues(*data_slice);
}
UpdateUnencodedDataBytes();
CommitWriteAndCheckPageLimit(batch_size, batch_num_values, batch_size - non_null,
check_page);
CheckDictionarySizeLimit();
Expand Down
Loading

0 comments on commit f4d8380

Please sign in to comment.