Skip to content

Commit

Permalink
dont create temporary sparsed array on merging (ydb-platform#8174)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Aug 24, 2024
1 parent 92ff6c1 commit b5dba78
Show file tree
Hide file tree
Showing 13 changed files with 317 additions and 133 deletions.
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/accessor/abstract/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class IChunkedArray {
}

ui32 GetLocalIndex(const ui32 position) const {
AFL_VERIFY(Contains(position));
AFL_VERIFY(Contains(position))("pos", position)("start", GlobalStartPosition);
return position - GlobalStartPosition;
}

Expand Down
53 changes: 35 additions & 18 deletions ydb/core/formats/arrow/accessor/sparsed/accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,14 @@ TSparsedArray::TSparsedArray(const IChunkedArray& defaultArray, const std::share
pos = current->GetAddress().GetGlobalFinishPosition();
AFL_VERIFY(pos <= GetRecordsCount());
}
std::vector<std::shared_ptr<arrow::Field>> fields = { std::make_shared<arrow::Field>("index", arrow::uint32()),
std::make_shared<arrow::Field>("value", GetDataType()) };
auto schema = std::make_shared<arrow::Schema>(fields);
std::vector<std::shared_ptr<arrow::Array>> columns = { NArrow::TStatusValidator::GetValid(builderIndex->Finish()),
NArrow::TStatusValidator::GetValid(builderValue->Finish()) };
records = arrow::RecordBatch::Make(schema, sparsedRecordsCount, columns);
records = arrow::RecordBatch::Make(BuildSchema(GetDataType()), sparsedRecordsCount, columns);
AFL_VERIFY_DEBUG(records->ValidateFull().ok());
return true;
}));
AFL_VERIFY(records);
Records.emplace_back(TSparsedArrayChunk(0, GetRecordsCount(), records, DefaultValue));
Records.emplace_back(0, GetRecordsCount(), records, DefaultValue);
}

std::vector<NKikimr::NArrow::NAccessor::TChunkedArraySerialized> TSparsedArray::DoSplitBySizes(
Expand Down Expand Up @@ -136,27 +133,44 @@ ui32 TSparsedArray::GetLastIndex(const std::shared_ptr<arrow::RecordBatch>& batc
return ui32Column->Value(ui32Column->length() - 1);
}

namespace {
// Per-thread cache of record batches keyed by the string form of an arrow data type.
// TSparsedArray::MakeDefaultChunk fills it on first use with an empty batch built via
// NArrow::MakeEmptyBatch(BuildSchema(type)) and afterwards reuses the stored batch for that type.
static thread_local THashMap<TString, std::shared_ptr<arrow::RecordBatch>> SimpleBatchesCache;
}

// Builds a chunk of recordsCount rows starting at position 0 that is backed by an empty record batch,
// i.e. a chunk with no explicitly stored values for the given default value.
// The empty batch for each data type is created once per thread, validated, and then reused from
// SimpleBatchesCache on subsequent calls.
NKikimr::NArrow::NAccessor::TSparsedArrayChunk TSparsedArray::MakeDefaultChunk(
    const std::shared_ptr<arrow::Scalar>& defaultValue, const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount) {
    // The textual type description is the cache key; compute it a single time for both lookup and insertion.
    const auto typeKey = type->ToString();
    auto it = SimpleBatchesCache.find(typeKey);
    const bool alreadyCached = (it != SimpleBatchesCache.end());
    if (!alreadyCached) {
        auto emptyBatch = NArrow::MakeEmptyBatch(BuildSchema(type));
        it = SimpleBatchesCache.emplace(typeKey, emptyBatch).first;
        AFL_VERIFY(it->second->ValidateFull().ok());
    }
    return TSparsedArrayChunk(0, recordsCount, it->second, defaultValue);
}

IChunkedArray::TLocalDataAddress TSparsedArrayChunk::GetChunk(
const std::optional<IChunkedArray::TCommonChunkAddress>& /*chunkCurrent*/, const ui64 position, const ui32 chunkIdx) const {
auto it = RemapExternalToInternal.upper_bound(position);
const auto predCompare = [](const ui32 position, const TInternalChunkInfo& item) {
return position < item.GetStartExt();
};
auto it = std::upper_bound(RemapExternalToInternal.begin(), RemapExternalToInternal.end(), position, predCompare);
AFL_VERIFY(it != RemapExternalToInternal.begin());
--it;
if (it->second.GetIsDefault()) {
if (it->GetIsDefault()) {
return IChunkedArray::TLocalDataAddress(
NArrow::TThreadSimpleArraysCache::Get(ColValue->type(), DefaultValue, it->second.GetSize()), StartPosition + it->first, chunkIdx);
NArrow::TThreadSimpleArraysCache::Get(ColValue->type(), DefaultValue, it->GetSize()), StartPosition + it->GetStartExt(), chunkIdx);
} else {
return IChunkedArray::TLocalDataAddress(
ColValue->Slice(it->second.GetStart(), it->second.GetSize()), StartPosition + it->first, chunkIdx);
ColValue->Slice(it->GetStartInt(), it->GetSize()), StartPosition + it->GetStartExt(), chunkIdx);
}
}

std::vector<std::shared_ptr<arrow::Array>> TSparsedArrayChunk::GetChunkedArray() const {
std::vector<std::shared_ptr<arrow::Array>> chunks;
for (auto&& i : RemapExternalToInternal) {
if (i.second.GetIsDefault()) {
chunks.emplace_back(NArrow::TThreadSimpleArraysCache::Get(ColValue->type(), DefaultValue, i.second.GetSize()));
if (i.GetIsDefault()) {
chunks.emplace_back(NArrow::TThreadSimpleArraysCache::Get(ColValue->type(), DefaultValue, i.GetSize()));
} else {
chunks.emplace_back(ColValue->Slice(i.second.GetStart(), i.second.GetSize()));
chunks.emplace_back(ColValue->Slice(i.GetStartInt(), i.GetSize()));
}
}
return chunks;
Expand Down Expand Up @@ -189,23 +203,26 @@ TSparsedArrayChunk::TSparsedArrayChunk(const ui32 posStart, const ui32 recordsCo
for (ui32 idx = 0; idx < UI32ColIndex->length(); ++idx) {
if (nextIndex != UI32ColIndex->Value(idx)) {
if (idx - startIndexInt) {
AFL_VERIFY(RemapExternalToInternal.emplace(startIndexExt, TInternalChunkInfo(startIndexInt, idx - startIndexInt, false)).second);
RemapExternalToInternal.emplace_back(startIndexExt, startIndexInt, idx - startIndexInt, false);
}
AFL_VERIFY(RemapExternalToInternal.emplace(nextIndex, TInternalChunkInfo(0, UI32ColIndex->Value(idx) - nextIndex, true)).second);
RemapExternalToInternal.emplace_back(nextIndex, 0, UI32ColIndex->Value(idx) - nextIndex, true);
startIndexExt = UI32ColIndex->Value(idx);
startIndexInt = idx;
}
nextIndex = UI32ColIndex->Value(idx) + 1;
}
if (UI32ColIndex->length() > startIndexInt) {
AFL_VERIFY(RemapExternalToInternal.emplace(startIndexExt, TInternalChunkInfo(startIndexInt, UI32ColIndex->length() - startIndexInt, false)).second);
RemapExternalToInternal.emplace_back(startIndexExt, startIndexInt, UI32ColIndex->length() - startIndexInt, false);
}
if (nextIndex != RecordsCount) {
AFL_VERIFY(RemapExternalToInternal.emplace(nextIndex, TInternalChunkInfo(0, RecordsCount - nextIndex, true)).second);
RemapExternalToInternal.emplace_back(nextIndex, 0, RecordsCount - nextIndex, true);
}
ui32 count = 0;
for (auto&& i : RemapExternalToInternal) {
count += i.second.GetSize();
count += i.GetSize();
}
for (ui32 i = 0; i + 1 < RemapExternalToInternal.size(); ++i) {
AFL_VERIFY(RemapExternalToInternal[i + 1].GetStartExt() == RemapExternalToInternal[i].GetStartExt() + RemapExternalToInternal[i].GetSize());
}
AFL_VERIFY(count == RecordsCount)("count", count)("records_count", RecordsCount);
AFL_VERIFY(ColValue);
Expand Down Expand Up @@ -256,7 +273,7 @@ void TSparsedArray::TBuilder::AddChunk(const ui32 recordsCount, const std::share
auto* arr = static_cast<const arrow::UInt32Array*>(data->column(0).get());
AFL_VERIFY(arr->Value(arr->length() - 1) < recordsCount)("val", arr->Value(arr->length() - 1))("count", recordsCount);
}
Chunks.emplace_back(TSparsedArrayChunk(RecordsCount, recordsCount, data, DefaultValue));
Chunks.emplace_back(RecordsCount, recordsCount, data, DefaultValue);
RecordsCount += recordsCount;
}

Expand Down
64 changes: 40 additions & 24 deletions ydb/core/formats/arrow/accessor/sparsed/accessor.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once
#include <ydb/core/formats/arrow/accessor/abstract/accessor.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>

#include <ydb/library/accessor/accessor.h>

Expand All @@ -9,7 +10,7 @@

namespace NKikimr::NArrow::NAccessor {

class TSparsedArrayChunk {
class TSparsedArrayChunk: public TMoveOnly {
private:
YDB_READONLY(ui32, RecordsCount, 0);
YDB_READONLY(ui32, StartPosition, 0);
Expand All @@ -24,20 +25,26 @@ class TSparsedArrayChunk {

class TInternalChunkInfo {
private:
YDB_READONLY(ui32, Start, 0);
YDB_READONLY(ui32, StartExt, 0);
YDB_READONLY(ui32, StartInt, 0);
YDB_READONLY(ui32, Size, 0);
YDB_READONLY(bool, IsDefault, false);

public:
TInternalChunkInfo(const ui32 start, const ui32 size, const bool defaultFlag)
: Start(start)
TInternalChunkInfo(const ui32 startExt, const ui32 startInt, const ui32 size, const bool defaultFlag)
: StartExt(startExt)
, StartInt(startInt)
, Size(size)
, IsDefault(defaultFlag) {
AFL_VERIFY(Size);
}

bool operator<(const TInternalChunkInfo& item) const {
return StartExt < item.StartExt;
}
};

std::map<ui32, TInternalChunkInfo> RemapExternalToInternal;
std::vector<TInternalChunkInfo> RemapExternalToInternal;

public:
ui32 GetFinishPosition() const {
Expand Down Expand Up @@ -87,8 +94,7 @@ class TSparsedArray: public IChunkedArray {
virtual std::vector<TChunkedArraySerialized> DoSplitBySizes(
const TColumnSaver& saver, const TString& fullSerializedData, const std::vector<ui64>& splitSizes) override;

virtual TLocalDataAddress DoGetLocalData(
const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const override {
virtual TLocalDataAddress DoGetLocalData(const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const override {
ui32 currentIdx = 0;
for (ui32 i = 0; i < Records.size(); ++i) {
if (currentIdx <= position && position < currentIdx + Records[i].GetRecordsCount()) {
Expand All @@ -115,38 +121,48 @@ class TSparsedArray: public IChunkedArray {
return bytes;
}

TSparsedArray(std::vector<TSparsedArrayChunk>&& data, const std::shared_ptr<arrow::Scalar>& /*defaultValue*/,
TSparsedArray(std::vector<TSparsedArrayChunk>&& data, const std::shared_ptr<arrow::Scalar>& defaultValue,
const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount)
: TBase(recordsCount, EType::SparsedArray, type)
, DefaultValue(defaultValue)
, Records(std::move(data)) {
}

static ui32 GetLastIndex(const std::shared_ptr<arrow::RecordBatch>& batch);

// Produces the two-column schema of a sparsed record batch:
// a uint32 "index" column followed by a "value" column of the requested data type.
static std::shared_ptr<arrow::Schema> BuildSchema(const std::shared_ptr<arrow::DataType>& type) {
    const auto indexField = std::make_shared<arrow::Field>("index", arrow::uint32());
    const auto valueField = std::make_shared<arrow::Field>("value", type);
    return std::make_shared<arrow::Schema>(std::vector<std::shared_ptr<arrow::Field>>{ indexField, valueField });
}

static TSparsedArrayChunk MakeDefaultChunk(
const std::shared_ptr<arrow::Scalar>& defaultValue, const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount);

public:
TSparsedArray(const IChunkedArray& defaultArray, const std::shared_ptr<arrow::Scalar>& defaultValue);
TSparsedArray(const std::shared_ptr<arrow::Scalar>& defaultValue,
const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount)
: TSparsedArray({}, defaultValue, type, recordsCount)
{


// Constructs a sparsed array of recordsCount rows of the given type whose only chunk is the one
// returned by MakeDefaultChunk for this default value, type and row count.
TSparsedArray(const std::shared_ptr<arrow::Scalar>& defaultValue, const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount)
: TBase(recordsCount, EType::SparsedArray, type)
, DefaultValue(defaultValue) {
Records.emplace_back(MakeDefaultChunk(defaultValue, type, recordsCount));
}

virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 index) const override {
auto chunk = GetSparsedChunk(index);
auto& chunk = GetSparsedChunk(index);
return chunk.GetScalar(index - chunk.GetStartPosition());
}

TSparsedArrayChunk GetSparsedChunk(const ui64 position) const {
ui32 currentIdx = 0;
for (ui32 i = 0; i < Records.size(); ++i) {
if (currentIdx <= position && position < currentIdx + Records[i].GetRecordsCount()) {
return Records[i];
}
currentIdx += Records[i].GetRecordsCount();
}
AFL_VERIFY(false);
return Records.back();
// Returns a reference to the chunk in Records that covers the given position.
// The lookup is a binary search over chunk start positions, so it presumably relies on Records being
// ordered by GetStartPosition() — TODO confirm against the code that fills Records.
// Aborts via AFL_VERIFY when no chunk starts at or before position, or when the found chunk does not cover it.
const TSparsedArrayChunk& GetSparsedChunk(const ui64 position) const {
// Comparator in the (value, element) argument order expected by std::upper_bound.
const auto pred = [](const ui64 position, const TSparsedArrayChunk& item) {
return position < item.GetStartPosition();
};
// First chunk that starts strictly after position; the chunk just before it is the candidate.
auto it = std::upper_bound(Records.begin(), Records.end(), position, pred);
AFL_VERIFY(it != Records.begin());
--it;
// The candidate must span [start, start + records count) around position.
AFL_VERIFY(position < it->GetStartPosition() + it->GetRecordsCount());
AFL_VERIFY(it->GetStartPosition() <= position);
return *it;
}

class TBuilder {
Expand Down
32 changes: 32 additions & 0 deletions ydb/core/formats/arrow/arrow_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,38 @@ bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y) {
return ScalarCompare(x, y) < 0;
}

// Checks whether the cell of column `c` at row `position` equals scalar `s`.
//
// Parameters:
//   c        - column to read from; must not be a null pointer (verified).
//   position - row index inside `c`.
//   s        - scalar to compare with; a null pointer is treated as a null value.
//
// Returns true when both sides are null, or when both are non-null and their values match.
// A null cell never equals a non-null scalar and vice versa.
// When `s` is provided its type must equal the column type (verified).
// Only types exposing a string view and arithmetic value types are supported; any other type aborts.
bool ColumnEqualsScalar(
    const std::shared_ptr<arrow::Array>& c, const ui32 position, const std::shared_ptr<arrow::Scalar>& s) {
    AFL_VERIFY(c);
    if (!s) {
        return c->IsNull(position);
    }
    AFL_VERIFY(c->type()->Equals(s->type))("s", s->type->ToString())("c", c->type()->ToString());

    // Nullness has to be resolved before any value is read: the payload of a null cell
    // (and of an invalid scalar) is not defined, so comparing it would give arbitrary results.
    const bool cellIsNull = c->IsNull(position);
    const bool scalarIsNull = !s->is_valid;
    if (cellIsNull || scalarIsNull) {
        return cellIsNull && scalarIsNull;
    }

    return SwitchTypeImpl<bool, 0>(c->type()->id(), [&](const auto& type) {
        using TWrap = std::decay_t<decltype(type)>;
        using TScalar = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
        using TArrayType = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;
        using TValue = std::decay_t<decltype(static_cast<const TScalar&>(*s).value)>;

        if constexpr (arrow::has_string_view<typename TWrap::T>()) {
            // String-like types: compare the raw bytes of the cell view and of the scalar buffer.
            const auto& cval = static_cast<const TArrayType&>(*c).GetView(position);
            const auto& sval = static_cast<const TScalar&>(*s).value;
            AFL_VERIFY(sval);
            TStringBuf cBuf(reinterpret_cast<const char*>(cval.data()), cval.size());
            TStringBuf sBuf(reinterpret_cast<const char*>(sval->data()), sval->size());
            return cBuf == sBuf;
        } else if constexpr (std::is_arithmetic_v<TValue>) {
            const auto cval = static_cast<const TArrayType&>(*c).GetView(position);
            const auto sval = static_cast<const TScalar&>(*s).value;
            return (cval == sval);
        } else {
            Y_ABORT_UNLESS(false);   // TODO: non primitive types
            return false;
        }
    });
}

int ScalarCompare(const arrow::Scalar& x, const arrow::Scalar& y) {
Y_VERIFY_S(x.type->Equals(y.type), x.type->ToString() + " vs " + y.type->ToString());

Expand Down
5 changes: 4 additions & 1 deletion ydb/core/formats/arrow/arrow_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ bool IsGoodScalar(const std::shared_ptr<arrow::Scalar>& x);
int ScalarCompare(const arrow::Scalar& x, const arrow::Scalar& y);
int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
int ScalarCompareNullable(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
std::partial_ordering ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow);
std::partial_ordering ColumnsCompare(
const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow);
bool ColumnEqualsScalar(
const std::shared_ptr<arrow::Array>& c, const ui32 position, const std::shared_ptr<arrow::Scalar>& s);
bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y);
std::shared_ptr<arrow::RecordBatch> ReallocateBatch(std::shared_ptr<arrow::RecordBatch> original);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ void IColumnMerger::Start(const std::vector<std::shared_ptr<NArrow::NAccessor::I
AFL_VERIFY(!Started);
Started = true;
for (auto&& i : input) {
if (!i) {
continue;
}
AFL_VERIFY(i->GetDataType()->id() == Context.GetResultField()->type()->id())("input", i->GetDataType()->ToString())(
"result", Context.GetResultField()->ToString());
}
Expand Down
41 changes: 26 additions & 15 deletions ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared

ui32 idx = 0;
for (auto&& batch : Batches) {
AFL_VERIFY(batch->GetColumnsCount() == resultFiltered->GetColumnsCount())("data", batch->GetColumnsCount())(
"schema", resultFiltered->GetColumnsCount());
{
NArrow::NConstruction::IArrayBuilder::TPtr column =
std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntConstFiller<arrow::UInt16Type>>>(
Expand All @@ -53,9 +51,31 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared

std::vector<std::map<ui32, std::vector<TColumnPortionResult>>> chunkGroups;
chunkGroups.resize(batchResults.size());
for (auto&& columnId : resultFiltered->GetColumnIds()) {
NActors::TLogContextGuard logGuard(
NActors::TLogContextBuilder::Build()("field_name", resultFiltered->GetIndexInfo().GetColumnName(columnId)));

using TColumnData = std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>;
THashMap<ui32, TColumnData> columnsData;
{
ui32 batchIdx = 0;
for (auto&& p : Batches) {
ui32 columnIdx = 0;
for (auto&& i : p->GetSchema()->GetFields()) {
const std::optional<ui32> columnId = resultFiltered->GetIndexInfo().GetColumnIdOptional(i->name());
if (columnId) {
auto it = columnsData.find(*columnId);
if (it == columnsData.end()) {
it = columnsData.emplace(*columnId, TColumnData(Batches.size())).first;
}
it->second[batchIdx] = p->GetColumnVerified(columnIdx);
}
++columnIdx;
}
++batchIdx;
}
}

for (auto&& [columnId, columnData] : columnsData) {
const TString& columnName = resultFiltered->GetIndexInfo().GetColumnName(columnId);
NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("field_name", columnName));
auto columnInfo = stats->GetColumnInfo(columnId);

TColumnMergeContext commonContext(
Expand All @@ -72,16 +92,7 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared
AFL_VERIFY(!!merger)("problem", "cannot create merger")(
"class_name", commonContext.GetLoader()->GetAccessorConstructor().GetClassName());

{
std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> parts;
for (auto&& p : Batches) {
parts.emplace_back(p->GetColumnVerified(resultFiltered->GetFieldIndex(columnId)));
}

merger->Start(parts);
}

std::map<std::string, std::vector<NCompaction::TColumnPortionResult>> columnChunks;
merger->Start(columnData);
ui32 batchIdx = 0;
for (auto&& batchResult : batchResults) {
const ui32 portionRecordsCountLimit =
Expand Down
Loading

0 comments on commit b5dba78

Please sign in to comment.