diff --git a/velox/expression/PeeledEncoding.cpp b/velox/expression/PeeledEncoding.cpp index 124a3737a343..c169c57278a6 100644 --- a/velox/expression/PeeledEncoding.cpp +++ b/velox/expression/PeeledEncoding.cpp @@ -104,7 +104,7 @@ void PeeledEncoding::setDictionaryWrapping( wrapNulls_ = firstWrapper.nulls(); return; } - auto wrapping = decoded.dictionaryWrapping(firstWrapper, rows.end()); + auto wrapping = decoded.dictionaryWrapping(*firstWrapper.pool(), rows.end()); wrap_ = std::move(wrapping.indices); wrapNulls_ = std::move(wrapping.nulls); } diff --git a/velox/vector/DecodedVector.cpp b/velox/vector/DecodedVector.cpp index 4c9c570572a7..8faad2616734 100644 --- a/velox/vector/DecodedVector.cpp +++ b/velox/vector/DecodedVector.cpp @@ -44,21 +44,26 @@ const std::vector& DecodedVector::zeroIndices() { return indices; } -void DecodedVector::decode( - const BaseVector& vector, +template +void DecodedVector::decodeImpl( + const T& vector, const SelectivityVector* rows, bool loadLazy) { - reset(end(vector.size(), rows)); + reset(end(vector->size(), rows)); partialRowsDecoded_ = rows != nullptr; loadLazy_ = loadLazy; - const bool isTopLevelLazyAndLoaded = - vector.isLazy() && vector.asUnchecked()->isLoaded(); - if (isTopLevelLazyAndLoaded || (loadLazy_ && isLazyNotLoaded(vector))) { - decode(*vector.loadedVector(), rows, loadLazy); + const bool isTopLevelLazyAndLoaded = vector->isLazy() && + vector->template asUnchecked()->isLoaded(); + if (isTopLevelLazyAndLoaded || (loadLazy_ && isLazyNotLoaded(*vector))) { + if constexpr (std::is_same_v) { + decodeImpl(BaseVector::loadedVectorShared(vector), rows, loadLazy); + } else { + decodeImpl(vector->loadedVector(), rows, loadLazy); + } return; } - const auto encoding = vector.encoding(); + const auto encoding = vector->encoding(); switch (encoding) { case VectorEncoding::Simple::FLAT: case VectorEncoding::Simple::BIASED: @@ -71,9 +76,10 @@ void DecodedVector::decode( return; case VectorEncoding::Simple::CONSTANT: { isConstantMapping_ = true; - if (isLazyNotLoaded(vector)) { - baseVector_ = vector.valueVector().get(); - constantIndex_ = vector.wrapInfo()->as()[0]; + if (isLazyNotLoaded(*vector)) { + baseVectorShared_ = vector->valueVector(); + baseVector_ = vector->valueVector().get(); + constantIndex_ = vector->wrapInfo()->template as()[0]; mayHaveNulls_ = true; } else { setBaseData(vector, rows); @@ -82,7 +88,11 @@ void DecodedVector::decode( } case VectorEncoding::Simple::DICTIONARY: case VectorEncoding::Simple::SEQUENCE: { - combineWrappers(&vector, rows); + if constexpr (std::is_same_v) { + combineWrappers(vector.get(), rows); + } else { + combineWrappers(vector, rows); + } break; } default: @@ -92,6 +102,32 @@ void DecodedVector::decode( } } +DecodedVector::DecodedVector( + const BaseVector& vector, + const SelectivityVector& rows, + bool loadLazy) { + decodeImpl(&vector, &rows, loadLazy); +} + +DecodedVector::DecodedVector(const BaseVector& vector, bool loadLazy) { + decodeImpl(&vector, nullptr, loadLazy); +} + +void DecodedVector::decode( + const BaseVector& vector, + const SelectivityVector& rows, + bool loadLazy) { + decodeImpl(&vector, &rows, loadLazy); +} + +void DecodedVector::decode(const BaseVector& vector, bool loadLazy) { + decodeImpl(&vector, nullptr, loadLazy); +} + +void DecodedVector::decode(const VectorPtr& vector, bool loadLazy) { + decodeImpl(vector, nullptr, loadLazy); +} + void DecodedVector::makeIndices( const BaseVector& vector, const SelectivityVector* rows, @@ -115,6 +151,7 @@ void DecodedVector::reset(vector_size_t size) { nulls_ = nullptr; allNulls_.reset(); baseVector_ = nullptr; + baseVectorShared_.reset(); mayHaveNulls_ = false; hasExtraNulls_ = false; isConstantMapping_ = false; @@ -138,10 +175,10 @@ void DecodedVector::combineWrappers( const SelectivityVector* rows, int numLevels) { auto topEncoding = vector->encoding(); - BaseVector* values = nullptr; + VectorPtr values; if (topEncoding == VectorEncoding::Simple::DICTIONARY) { indices_ = vector->wrapInfo()->as(); - values = vector->valueVector().get(); + values = vector->valueVector(); nulls_ = vector->rawNulls(); if (nulls_) { hasExtraNulls_ = true; @@ -155,14 +192,15 @@ void DecodedVector::combineWrappers( int32_t levelCounter = 0; for (;;) { if (numLevels != -1 && ++levelCounter == numLevels) { - baseVector_ = values; + baseVectorShared_ = values; + baseVector_ = values.get(); return; } auto encoding = values->encoding(); if (isLazy(encoding) && (loadLazy_ || values->asUnchecked()->isLoaded())) { - values = values->loadedVector(); + values = BaseVector::loadedVectorShared(values); encoding = values->encoding(); } @@ -174,11 +212,11 @@ void DecodedVector::combineWrappers( case VectorEncoding::Simple::ROW: case VectorEncoding::Simple::ARRAY: case VectorEncoding::Simple::MAP: - setBaseData(*values, rows); + setBaseData(values, rows); return; case VectorEncoding::Simple::DICTIONARY: { applyDictionaryWrapper(*values, rows); - values = values->valueVector().get(); + values = values->valueVector(); break; } default: @@ -226,7 +264,7 @@ void DecodedVector::applyDictionaryWrapper( }); } -void DecodedVector::fillInIndices() { +void DecodedVector::fillInIndices() const { if (isConstantMapping_) { if (size_ > zeroIndices().size() || constantIndex_ != 0) { copiedIndices_.resize(size_); @@ -284,24 +322,31 @@ void DecodedVector::setFlatNulls( } } +template void DecodedVector::setBaseData( - const BaseVector& vector, + const T& vector, const SelectivityVector* rows) { - auto encoding = vector.encoding(); - baseVector_ = &vector; + auto encoding = vector->encoding(); + if constexpr (std::is_same_v) { + baseVectorShared_ = vector; + baseVector_ = vector.get(); + } else { + baseVector_ = vector; + } switch (encoding) { case VectorEncoding::Simple::LAZY: break; case VectorEncoding::Simple::FLAT: { // values() may be nullptr if 'vector' is all nulls. - data_ = vector.values() ? vector.values()->as() : nullptr; - setFlatNulls(vector, rows); + data_ = + vector->values() ? vector->values()->template as() : nullptr; + setFlatNulls(*vector, rows); break; } case VectorEncoding::Simple::ROW: case VectorEncoding::Simple::ARRAY: case VectorEncoding::Simple::MAP: { - setFlatNulls(vector, rows); + setFlatNulls(*vector, rows); break; } case VectorEncoding::Simple::CONSTANT: { @@ -313,31 +358,37 @@ void DecodedVector::setBaseData( } } +template void DecodedVector::setBaseDataForConstant( - const BaseVector& vector, + const T& vector, const SelectivityVector* rows) { - if (!vector.isScalar()) { - baseVector_ = vector.wrappedVector(); - constantIndex_ = vector.wrappedIndex(0); + if (!vector->isScalar()) { + if constexpr (std::is_same_v) { + baseVectorShared_ = BaseVector::wrappedVectorShared(vector); + baseVector_ = baseVectorShared_.get(); + } else { + baseVector_ = vector->wrappedVector(); + } + constantIndex_ = vector->wrappedIndex(0); } - if (!hasExtraNulls_ || vector.isNullAt(0)) { + if (!hasExtraNulls_ || vector->isNullAt(0)) { // A mapping over a constant is constant except if the // mapping adds nulls and the constant is not null. isConstantMapping_ = true; hasExtraNulls_ = false; indices_ = nullptr; - nulls_ = vector.isNullAt(0) ? &constantNullMask_ : nullptr; + nulls_ = vector->isNullAt(0) ? &constantNullMask_ : nullptr; } else { makeIndicesMutable(); applyToRows(rows, [this](vector_size_t row) { copiedIndices_[row] = constantIndex_; }); - setFlatNulls(vector, rows); + setFlatNulls(*vector, rows); } - data_ = vector.valuesAsVoid(); + data_ = vector->valuesAsVoid(); if (!nulls_) { - nulls_ = vector.isNullAt(0) ? &constantNullMask_ : nullptr; + nulls_ = vector->isNullAt(0) ? &constantNullMask_ : nullptr; } mayHaveNulls_ = hasExtraNulls_ || nulls_; } @@ -374,25 +425,23 @@ BufferPtr copyNullsBuffer( } // namespace DecodedVector::DictionaryWrapping DecodedVector::dictionaryWrapping( - const BaseVector& wrapper, + memory::MemoryPool& pool, vector_size_t size) const { - VELOX_CHECK(!isIdentityMapping_); - VELOX_CHECK(!isConstantMapping_); VELOX_CHECK_LE(size, size_); // Make a copy of the indices and nulls buffers. - BufferPtr indices = copyIndicesBuffer(indices_, size, wrapper.pool()); + BufferPtr indices = copyIndicesBuffer(this->indices(), size, &pool); // Only copy nulls if we have nulls coming from one of the wrappers, don't // do it if nulls are missing or from the base vector. // TODO: remove the check for hasExtraNulls_ after #3553 is merged. BufferPtr nulls = - hasExtraNulls_ ? copyNullsBuffer(nulls_, size, wrapper.pool()) : nullptr; + hasExtraNulls_ ? copyNullsBuffer(nulls_, size, &pool) : nullptr; return {std::move(indices), std::move(nulls)}; } VectorPtr DecodedVector::wrap( VectorPtr data, - const BaseVector& wrapper, + memory::MemoryPool& pool, vector_size_t size) { if (isConstantMapping_) { if (isNullAt(0)) { @@ -406,7 +455,7 @@ VectorPtr DecodedVector::wrap( return BaseVector::wrapInConstant(size, constantIndex_, data); } - auto wrapping = dictionaryWrapping(wrapper, size); + auto wrapping = dictionaryWrapping(pool, size); return BaseVector::wrapInDictionary( std::move(wrapping.nulls), std::move(wrapping.indices), diff --git a/velox/vector/DecodedVector.h b/velox/vector/DecodedVector.h index 4da24b75d062..3090bf15fa85 100644 --- a/velox/vector/DecodedVector.h +++ b/velox/vector/DecodedVector.h @@ -107,26 +107,22 @@ class DecodedVector { DecodedVector( const BaseVector& vector, const SelectivityVector& rows, - bool loadLazy = true) { - decode(vector, &rows, loadLazy); - } + bool loadLazy = true); - DecodedVector(const BaseVector& vector, bool loadLazy = true) { - decode(vector, nullptr, loadLazy); - } + explicit DecodedVector(const BaseVector& vector, bool loadLazy = true); /// Resets the internal state and decodes 'vector' for 'rows'. See /// constructor. void decode( const BaseVector& vector, const SelectivityVector& rows, - bool loadLazy = true) { - decode(vector, &rows, loadLazy); - } + bool loadLazy = true); - void decode(const BaseVector& vector, bool loadLazy = true) { - decode(vector, nullptr, loadLazy); - } + void decode(const BaseVector& vector, bool loadLazy = true); + + /// Same as other `decode`, but allow we get the shared ownership of the base + /// vector via `sharedBase()` later. + void decode(const VectorPtr& vector, bool loadLazy = true); /// Returns the values buffer for the base vector. Assumes the vector is of /// scalar type and has been already decoded. Use indices() to access @@ -156,7 +152,7 @@ class DecodedVector { /// Returns the mapping from top-level rows to rows in the base vector or /// data() buffer. - const vector_size_t* indices() { + const vector_size_t* indices() const { if (!indices_) { fillInIndices(); } @@ -220,6 +216,11 @@ class DecodedVector { return baseVector_; } + /// Return a shared_ptr to the flat or constant base vector. + const VectorPtr& sharedBase() const { + return baseVectorShared_; + } + /// Returns true if the decoded vector was flat. bool isIdentityMapping() const { return isIdentityMapping_; @@ -259,13 +260,25 @@ class DecodedVector { /// have been previously decoded by 'this'. This is used when 'data' /// is a component of the base vector of 'wrapper' and must be used /// in the same context, thus with the same indirections. - VectorPtr wrap(VectorPtr data, const BaseVector& wrapper, vector_size_t size); + VectorPtr wrap(VectorPtr data, memory::MemoryPool& pool, vector_size_t size); + + VectorPtr wrap( + VectorPtr data, + memory::MemoryPool& pool, + const SelectivityVector& rows) { + return wrap(std::move(data), pool, rows.end()); + } + + VectorPtr + wrap(VectorPtr data, const BaseVector& wrapper, vector_size_t size) { + return wrap(std::move(data), *wrapper.pool(), size); + } VectorPtr wrap( VectorPtr data, const BaseVector& wrapper, const SelectivityVector& rows) { - return wrap(std::move(data), wrapper, rows.end()); + return wrap(std::move(data), *wrapper.pool(), rows.end()); } struct DictionaryWrapping { @@ -281,13 +294,13 @@ class DecodedVector { /// to use makeIndices() instead of decoded() when initializing the /// DecodedVector. DictionaryWrapping dictionaryWrapping( - const BaseVector& wrapper, + memory::MemoryPool& pool, vector_size_t size) const; DictionaryWrapping dictionaryWrapping( - const BaseVector& wrapper, + memory::MemoryPool& pool, const SelectivityVector& rows) const { - return dictionaryWrapping(wrapper, rows.end()); + return dictionaryWrapping(pool, rows.end()); } /// END: Members that must only be used by PeeledEncoding @@ -300,14 +313,13 @@ class DecodedVector { DecodedVector( const BaseVector& vector, const SelectivityVector* rows, - bool loadLazy = true) { - decode(vector, rows, loadLazy); + bool loadLazy) { + decodeImpl(&vector, rows, loadLazy); } - void decode( - const BaseVector& vector, - const SelectivityVector* rows, - bool loadLazy = true); + template + void + decodeImpl(const T& vector, const SelectivityVector* rows, bool loadLazy); void makeIndices( const BaseVector& vector, @@ -341,13 +353,13 @@ class DecodedVector { void copyNulls(vector_size_t size); - void fillInIndices(); + void fillInIndices() const; - void setBaseData(const BaseVector& vector, const SelectivityVector* rows); + template + void setBaseData(const T& vector, const SelectivityVector* rows); - void setBaseDataForConstant( - const BaseVector& vector, - const SelectivityVector* rows); + template + void setBaseDataForConstant(const T& vector, const SelectivityVector* rows); void reset(vector_size_t size); @@ -373,7 +385,7 @@ class DecodedVector { // The indices into 'data_' or 'baseVector_' for the rows in // 'rows' given to decode(). Only positions that are in // 'selection' are guaranteed to have valid values. - const vector_size_t* indices_ = nullptr; + mutable const vector_size_t* indices_ = nullptr; // The base array of 'vector' given to decode(), nullptr if vector is of // complex type. @@ -395,6 +407,8 @@ class DecodedVector { // after constant and dictionary vectors have been peeled off. const BaseVector* baseVector_ = nullptr; + VectorPtr baseVectorShared_; + // True if either the leaf vector has nulls or if nulls were added // by a dictionary wrapper. bool mayHaveNulls_ = false; @@ -419,7 +433,7 @@ class DecodedVector { // Holds indices if an array of indices needs to be materialized, // e.g. when combining nested dictionaries. - std::vector copiedIndices_; + mutable std::vector copiedIndices_; // Used as backing for 'nulls_' when null-ness is combined from // dictionary and base values. diff --git a/velox/vector/tests/DecodedVectorTest.cpp b/velox/vector/tests/DecodedVectorTest.cpp index c9c3f6be9f5b..1640a2474bc4 100644 --- a/velox/vector/tests/DecodedVectorTest.cpp +++ b/velox/vector/tests/DecodedVectorTest.cpp @@ -71,7 +71,7 @@ class DecodedVectorTest : public testing::Test, public VectorTestBase { void assertDecodedVector( const std::vector>& expected, const SelectivityVector& selection, - SimpleVector* outVector, + const FlatVectorPtr& outVector, bool dbgPrintVec) { auto check = [&](auto& decoded) { auto end = selection.end(); @@ -110,20 +110,21 @@ class DecodedVectorTest : public testing::Test, public VectorTestBase { check(decoded); } - { - if (selection.isAllSelected()) { - DecodedVector decoded(*outVector); - check(decoded); - } + if (selection.isAllSelected()) { + DecodedVector decoded(*outVector); + check(decoded); + ASSERT_FALSE(decoded.sharedBase()); + decoded.decode(outVector); + ASSERT_TRUE(decoded.sharedBase()); + check(decoded); } } template void assertDecodedVector( const std::vector>& expected, - BaseVector* outBaseVector, + const FlatVectorPtr& outVector, bool dbgPrintVec) { - auto* outVector = reinterpret_cast*>(outBaseVector); assertDecodedVector(expected, allSelected_, outVector, dbgPrintVec); assertDecodedVector(expected, halfSelected_, outVector, dbgPrintVec); } @@ -136,7 +137,7 @@ class DecodedVectorTest : public testing::Test, public VectorTestBase { const auto& data = cardData.data(); EXPECT_EQ(cardinality, data.size()); auto flatVector = makeNullableFlatVector(data, type); - assertDecodedVector(data, flatVector.get(), false); + assertDecodedVector(data, flatVector, false); } template @@ -164,6 +165,10 @@ class DecodedVectorTest : public testing::Test, public VectorTestBase { { DecodedVector decoded(*constantVector); check(decoded); + ASSERT_FALSE(decoded.sharedBase()); + decoded.decode(constantVector); + ASSERT_TRUE(decoded.sharedBase()); + check(decoded); } } @@ -204,6 +209,10 @@ class DecodedVectorTest : public testing::Test, public VectorTestBase { { DecodedVector decoded(*constantVector); check(decoded); + ASSERT_FALSE(decoded.sharedBase()); + decoded.decode(constantVector); + ASSERT_TRUE(decoded.sharedBase()); + check(decoded); } } @@ -234,6 +243,10 @@ class DecodedVectorTest : public testing::Test, public VectorTestBase { { DecodedVector decoded(*constantVector); check(decoded); + ASSERT_FALSE(decoded.sharedBase()); + decoded.decode(constantVector); + ASSERT_TRUE(decoded.sharedBase()); + check(decoded); } } @@ -270,6 +283,10 @@ class DecodedVectorTest : public testing::Test, public VectorTestBase { { DecodedVector decoded(*constantVector); check(decoded); + ASSERT_FALSE(decoded.sharedBase()); + decoded.decode(constantVector); + ASSERT_TRUE(decoded.sharedBase()); + check(decoded); } } @@ -317,6 +334,7 @@ class DecodedVectorTest : public testing::Test, public VectorTestBase { { DecodedVector decoded(*dictionaryVector, false); + ASSERT_TRUE(decoded.sharedBase()); check(decoded); } } @@ -361,6 +379,7 @@ class DecodedVectorTest : public testing::Test, public VectorTestBase { { DecodedVector decoded(*dictionaryVector, false); + ASSERT_TRUE(decoded.sharedBase()); check(decoded); } } @@ -394,6 +413,7 @@ class DecodedVectorTest : public testing::Test, public VectorTestBase { { DecodedVector decoded(*dictionaryVector, false); + ASSERT_TRUE(decoded.sharedBase()); check(decoded); } } @@ -432,6 +452,7 @@ class DecodedVectorTest : public testing::Test, public VectorTestBase { { DecodedVector decoded(*dictionaryVector); + ASSERT_TRUE(decoded.sharedBase()); check(decoded); } } @@ -463,6 +484,10 @@ void DecodedVectorTest::testConstant(const StringView& value) { { DecodedVector decoded(*constantVector); check(decoded); + ASSERT_FALSE(decoded.sharedBase()); + decoded.decode(constantVector); + ASSERT_TRUE(decoded.sharedBase()); + check(decoded); } } @@ -1380,7 +1405,7 @@ TEST_F(DecodedVectorTest, dictionaryWrapping) { DecodedVector decoded; decoded.decode(*dict, rows); - auto wrapping = decoded.dictionaryWrapping(*dict, rows); + auto wrapping = decoded.dictionaryWrapping(*dict->pool(), rows); auto wrapped = BaseVector::wrapInDictionary( std::move(wrapping.nulls), std::move(wrapping.indices), @@ -1393,7 +1418,7 @@ TEST_F(DecodedVectorTest, dictionaryWrapping) { { DecodedVector decoded; decoded.decode(*dict); - auto wrapping = decoded.dictionaryWrapping(*dict, outerDictSize); + auto wrapping = decoded.dictionaryWrapping(*dict->pool(), outerDictSize); auto wrapped = BaseVector::wrapInDictionary( std::move(wrapping.nulls), std::move(wrapping.indices), @@ -1404,6 +1429,32 @@ TEST_F(DecodedVectorTest, dictionaryWrapping) { } } +TEST_F(DecodedVectorTest, dictionaryWrappingForFlat) { + auto vector = makeFlatVector(10, folly::identity); + DecodedVector decoded; + decoded.decode(vector); + auto wrapping = decoded.dictionaryWrapping(*pool(), vector->size()); + ASSERT_FALSE(wrapping.nulls); + ASSERT_GE(wrapping.indices->size(), 10 * sizeof(vector_size_t)); + auto* indices = wrapping.indices->as(); + for (int i = 0; i < 10; ++i) { + ASSERT_EQ(indices[i], i); + } +} + +TEST_F(DecodedVectorTest, dictionaryWrappingForConstant) { + auto vector = makeConstant(42, 10); + DecodedVector decoded; + decoded.decode(vector); + auto wrapping = decoded.dictionaryWrapping(*pool(), vector->size()); + ASSERT_FALSE(wrapping.nulls); + ASSERT_GE(wrapping.indices->size(), 10 * sizeof(vector_size_t)); + auto* indices = wrapping.indices->as(); + for (int i = 0; i < 10; ++i) { + ASSERT_EQ(indices[i], 0); + } +} + TEST_F(DecodedVectorTest, previousIndicesInReUsedDecodedVector) { // Verify that when DecodedVector is re-used with different set of valid rows, // then the unselected indices would still have valid values. @@ -1433,7 +1484,7 @@ TEST_F(DecodedVectorTest, previousIndicesInReUsedDecodedVector) { rows.setValid(2, true); rows.updateBounds(); d.decode(*dict2, rows); - auto wrapping = d.dictionaryWrapping(*d.base(), d.base()->size()); + auto wrapping = d.dictionaryWrapping(*d.base()->pool(), d.base()->size()); auto rawIndices = wrapping.indices->as(); // Ensure the previous index on the unselected row is reset. EXPECT_EQ(rawIndices[0], 0);