Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3970 RD fixed memory leak in filters #12756

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<TInputType> {
// Clear cache after each object because
// values allocated on another allocator and should be released
Cache.Clear();
Worker->GetGraph().Invalidate();
Worker->Invalidate();
};

auto& holderFactory = Worker->GetGraph().GetHolderFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ struct TComputationMutables {
std::vector<ui32> SerializableValues; // Indices of values that need to be saved in IComputationGraph::SaveGraphState() and restored in IComputationGraph::LoadGraphState().
ui32 CurWideFieldsIndex = 0U;
std::vector<TWideFieldsInitInfo> WideFieldInitialize;
std::vector<ui32> CachedValues; // Indices of values that holds temporary cached data and unreachable by dependencies
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Правки в /yql/essentials надо вносить в аркадию сначала

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

и /yt тоже

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Сорри, это ваш бранч, делайте что хотите:)

Предлагаю поправить CODEOWNERS чтобы не было ревьюеров обычных


void DeferWideFieldsInit(ui32 count, std::set<ui32> used) {
Y_DEBUG_ABORT_UNLESS(AllOf(used, [count](ui32 i) { return i < count; }));
Expand Down Expand Up @@ -251,7 +252,8 @@ class IComputationGraph {
virtual IComputationExternalNode* GetEntryPoint(size_t index, bool require) = 0;
virtual const TArrowKernelsTopology* GetKernelsTopology() = 0;
virtual const TComputationNodePtrDeque& GetNodes() const = 0;
virtual void Invalidate() = 0;
virtual void Invalidate() = 0; // Invalidate all mutable values in graph (may lead to udf recreation)
virtual void InvalidateCaches() = 0; // Invalidate only cached values
virtual TMemoryUsageInfo& GetMemInfo() const = 0;
virtual const THolderFactory& GetHolderFactory() const = 0;
virtual ITerminator* GetTerminator() const = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,12 @@ class TComputationGraph final : public IComputationGraph {
std::fill_n(Ctx->MutableValues.get(), PatternNodes->GetMutables().CurValueIndex, NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid()));
}

void InvalidateCaches() override {
for (const auto cachedIndex : Ctx->Mutables.CachedValues) {
Ctx->MutableValues[cachedIndex] = NUdf::TUnboxedValuePod::Invalid();
}
}

const TComputationNodePtrDeque& GetNodes() const override {
return PatternNodes->GetNodes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ TContainerCacheOnContext::TContainerCacheOnContext(TComputationMutables& mutable
: Index(mutables.CurValueIndex++)
{
++++mutables.CurValueIndex;
mutables.CachedValues.insert(mutables.CachedValues.end(), {Index, Index + 1, Index + 2});
}

NUdf::TUnboxedValuePod TContainerCacheOnContext::NewArray(TComputationContext& ctx, ui64 size, NUdf::TUnboxedValue*& items) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace NMiniKQL {

void ThrowNotSupportedImplForClass(const TString& className, const char *func) {
THROW yexception() << "Unsupported access to '" << func << "' method of: " << className;
}
}

template <class IComputationNodeInterface>
void TRefCountedComputationNode<IComputationNodeInterface>::Ref() {
Expand Down Expand Up @@ -100,7 +100,7 @@ Y_NO_INLINE void TStatefulComputationNodeBase::AddDependenceImpl(const IComputat
Dependencies.emplace_back(node);
}

Y_NO_INLINE void TStatefulComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
Y_NO_INLINE void TStatefulComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
IComputationNode::TIndexesMap& dependencies, bool stateless) const {
if (self == owner)
return;
Expand Down Expand Up @@ -188,7 +188,7 @@ Y_NO_INLINE TStatefulFlowComputationNodeBase::TStatefulFlowComputationNodeBase(u
, StateKind(stateKind)
{}

Y_NO_INLINE void TStatefulFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
Y_NO_INLINE void TStatefulFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
if (self == owner)
return;
Expand All @@ -205,7 +205,7 @@ Y_NO_INLINE TPairStateFlowComputationNodeBase::TPairStateFlowComputationNodeBase
, SecondKind(secondKind)
{}

Y_NO_INLINE void TPairStateFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
Y_NO_INLINE void TPairStateFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
if (self == owner)
return;
Expand All @@ -221,7 +221,7 @@ Y_NO_INLINE ui32 TStatelessWideFlowComputationNodeBase::GetIndexImpl() const {
THROW yexception() << "Failed to get stateless node index.";
}

Y_NO_INLINE void TStatelessWideFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
Y_NO_INLINE void TStatelessWideFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
if (self == owner)
return;
Expand Down Expand Up @@ -263,7 +263,7 @@ Y_NO_INLINE TPairStateWideFlowComputationNodeBase::TPairStateWideFlowComputation
{}

Y_NO_INLINE void TPairStateWideFlowComputationNodeBase::CollectDependentIndexesImpl(
const IComputationNode* self, const IComputationNode* owner,
const IComputationNode* self, const IComputationNode* owner,
IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
if (self == owner)
return;
Expand Down Expand Up @@ -308,7 +308,9 @@ void TExternalComputationNode::CollectDependentIndexes(const IComputationNode*,

TExternalComputationNode::TExternalComputationNode(TComputationMutables& mutables, EValueRepresentation kind)
: TStatefulComputationNode(mutables, kind)
{}
{
mutables.CachedValues.push_back(ValueIndex);
}

NUdf::TUnboxedValue TExternalComputationNode::GetValue(TComputationContext& ctx) const {
return Getter ? Getter(ctx) : ValueRef(ctx);
Expand Down
5 changes: 5 additions & 0 deletions yql/essentials/public/purecalc/common/interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,11 @@ namespace NYql {
* Get time provider
*/
virtual ITimeProvider* GetTimeProvider() const = 0;

/**
* Release all input data from worker state
*/
virtual void Invalidate() = 0;
};

/**
Expand Down
11 changes: 11 additions & 0 deletions yql/essentials/public/purecalc/common/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,17 @@ void TWorker<TBase>::Release() {
}
}

template <typename TBase>
void TWorker<TBase>::Invalidate() {
auto& ctx = Graph_.ComputationGraph_->GetContext();
for (const auto* selfNode : Graph_.SelfNodes_) {
if (selfNode) {
selfNode->InvalidateValue(ctx);
}
}
Graph_.ComputationGraph_->InvalidateCaches();
}

TPullStreamWorker::~TPullStreamWorker() {
auto guard = Guard(GetScopedAlloc());
Output_.Clear();
Expand Down
1 change: 1 addition & 0 deletions yql/essentials/public/purecalc/common/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ namespace NYql {
const TString& GetLLVMSettings() const override;
ui64 GetNativeYtTypeFlags() const override;
ITimeProvider* GetTimeProvider() const override;
void Invalidate() override;
protected:
void Release() override;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ namespace {

// Clear graph after each object because
// values allocated on another allocator and should be released
Worker_->GetGraph().Invalidate();
Worker_->Invalidate();
}
}

Expand Down
3 changes: 3 additions & 0 deletions yt/yql/providers/yt/lib/lambda_builder/lambda_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ class TComputationGraphProxy: public IComputationGraph {
void Invalidate() final {
return Graph->Invalidate();
}
void InvalidateCaches() final {
return Graph->InvalidateCaches();
}
TMemoryUsageInfo& GetMemInfo() const final {
return Graph->GetMemInfo();
}
Expand Down
Loading