// Patch overview (this text sits before the first "diff --git" header).
//
// NOTE(review): the patch text below has had its line breaks collapsed, so
// several hunks share one physical line. It must be re-split into one diff
// line per row before it can be applied; the hunk counts in the "@@" headers
// give the expected number of rows per hunk.
//
// Files touched and what each hunk changes:
//
// 1. services/storage/in_memory_storage.go (hunk header names
//    ClearBlockHeightLogs): while storage.mutext is held, and only when
//    blockHeight > 0, the entries keyed by blockHeight-1 are deleted from
//    preprepareStorage, prepareStorage, commitStorage and viewChangeStorage.
//    The existing reset*Storage(blockHeight) calls still run afterwards.
//    The blockHeight > 0 guard keeps blockHeight-1 from being evaluated at 0.
//
// 2. services/termincommittee/term_in_committee.go, Dispose(): the height is
//    read once from tic.State.Height() into a local, passed to
//    ClearBlockHeightLogs, and then logged at debug level
//    ("LHFLOW Dispose() for H=%d").
//
// 3. test/goroutineleak/goroutine_leak_test.go:
//    - test2HeavyNetworks sets runtime.MemProfileRate = 1 and writes heap
//      profiles to /tmp/lh-mem-before.prof and /tmp/lh-mem-after4.prof, one
//      before and one after the network run, each preceded by four
//      runtime.GC() calls. A 20ms sleep is added after the run.
//    - net1 no longer logs to console and now waits via
//      WaitUntilSubsetOfNodesEventuallyReachASpecificHeight(ctx, 70, 1)
//      instead of WaitUntilNodesEventuallyReachASpecificHeight(ctx, 20).
//    - The second network (net2) is commented out entirely.
//    - TestGoroutinesLeaks samples HeapAlloc via runtime.ReadMemStats before
//      and after and reports both with t.Logf; no assertion is made on them.
//    NOTE(review): the os.Create errors are discarded with "_" and the /tmp
//    paths are hard-coded; this looks like temporary profiling
//    instrumentation - confirm before merging.
//
// 4. workerloop.go, onNewConsensusRound(): the "dispose and nil out
//    lh.leanHelixTerm" block moves from after lh.state.SetHeightAndResetView
//    to before it. The "+ 1" expression is also reformatted to "+1".
//    Consequence visible in the hunk: when SetHeightAndResetView returns an
//    error, the function returns with lh.leanHelixTerm already disposed and
//    set to nil (previously the old term was left in place on that path).
//    NOTE(review): presumably the term's State is the same lh.state, in which
//    case Dispose() now clears logs for the previous height rather than the
//    newly incremented one - verify against NewLeanHelixTerm.
//
diff --git a/services/storage/in_memory_storage.go b/services/storage/in_memory_storage.go index 7ba3925..e8d93ef 100644 --- a/services/storage/in_memory_storage.go +++ b/services/storage/in_memory_storage.go @@ -309,6 +309,13 @@ func (storage *InMemoryStorage) ClearBlockHeightLogs(blockHeight primitives.Bloc storage.mutext.Lock() defer storage.mutext.Unlock() + if blockHeight > 0 { + delete(storage.preprepareStorage, blockHeight-1) + delete(storage.prepareStorage, blockHeight-1) + delete(storage.commitStorage, blockHeight-1) + delete(storage.viewChangeStorage, blockHeight-1) + } + storage.resetPreprepareStorage(blockHeight) storage.resetPrepareStorage(blockHeight) storage.resetCommitStorage(blockHeight) diff --git a/services/termincommittee/term_in_committee.go b/services/termincommittee/term_in_committee.go index 9863709..0350735 100644 --- a/services/termincommittee/term_in_committee.go +++ b/services/termincommittee/term_in_committee.go @@ -209,7 +209,9 @@ func (tic *TermInCommittee) initView(ctx context.Context, newView primitives.Vie func (tic *TermInCommittee) Dispose() { tic.electionTrigger.Stop() - tic.storage.ClearBlockHeightLogs(tic.State.Height()) + height := tic.State.Height() + tic.storage.ClearBlockHeightLogs(height) + tic.logger.Debug("LHFLOW Dispose() for H=%d", height) } func (tic *TermInCommittee) calcLeaderMemberId(view primitives.View) primitives.MemberId { diff --git a/test/goroutineleak/goroutine_leak_test.go b/test/goroutineleak/goroutine_leak_test.go index 1e57801..58dfcdf 100644 --- a/test/goroutineleak/goroutine_leak_test.go +++ b/test/goroutineleak/goroutine_leak_test.go @@ -21,23 +21,47 @@ import ( ) func test2HeavyNetworks(t *testing.T) { + runtime.MemProfileRate = 1 + before, _ := os.Create("/tmp/lh-mem-before.prof") + defer before.Close() + after, _ := os.Create("/tmp/lh-mem-after4.prof") + defer after.Close() + + runtime.GC() + runtime.GC() + runtime.GC() + runtime.GC() + pprof.WriteHeapProfile(before) + test.WithContext(func(ctx 
context.Context) { net1 := network.ATestNetworkBuilder(21). - LogToConsole(t). // This is a very long test, running with logs lets you view progress + //LogToConsole(t). // This is a very long test, running with logs lets you view progress Build(ctx). StartConsensus(ctx) - net1.WaitUntilNodesEventuallyReachASpecificHeight(ctx, 20) + net1.WaitUntilSubsetOfNodesEventuallyReachASpecificHeight(ctx, 70, 1) - net2 := network.ATestNetworkBuilder(31). - LogToConsole(t). - Build(ctx). - StartConsensus(ctx) - net2.WaitUntilNodesEventuallyReachASpecificHeight(ctx, 20) + //net2 := network.ATestNetworkBuilder(4). + // LogToConsole(t). + // Build(ctx). + // StartConsensus(ctx) + //net2.WaitUntilNodesEventuallyReachASpecificHeight(ctx, 20) }) + + time.Sleep(20 * time.Millisecond) // give goroutines time to terminate + + runtime.GC() + runtime.GC() + runtime.GC() + runtime.GC() + pprof.WriteHeapProfile(after) + } // TODO Incorrect test, it should be updated to take into consideration there are now mainloop and workerloop goroutines func TestGoroutinesLeaks(t *testing.T) { + var memBefore, memAfter runtime.MemStats + runtime.ReadMemStats(&memBefore) + heapBefore := int64(memBefore.HeapAlloc) before, _ := os.Create("/tmp/leanhelix-goroutine-shutdown-before.out") defer before.Close() after, _ := os.Create("/tmp/leanhelix-goroutine-shutdown-after.out") @@ -52,8 +76,11 @@ func TestGoroutinesLeaks(t *testing.T) { runtime.GC() time.Sleep(200 * time.Millisecond) // give goroutines time to terminate + runtime.ReadMemStats(&memAfter) + heapAfter := int64(memAfter.HeapAlloc) numGoroutineAfter := runtime.NumGoroutine() pprof.Lookup("goroutine").WriteTo(after, 1) + t.Logf("Memory: Before=%d After=%d", heapBefore, heapAfter) require.Equal(t, numGoroutineBefore, numGoroutineAfter, "number of goroutines should be equal, compare /tmp/leanhelix-goroutine-shutdown-before.out and /tmp/leanhelix-goroutine-shutdown-after.out to see stack traces of the leaks") } diff --git a/workerloop.go 
b/workerloop.go index 4247498..178501a 100644 --- a/workerloop.go +++ b/workerloop.go @@ -237,17 +237,19 @@ func (lh *WorkerLoop) onCommit(ctx context.Context, block interfaces.Block, bloc func (lh *WorkerLoop) onNewConsensusRound(ctx context.Context, prevBlock interfaces.Block, prevBlockProofBytes []byte, canBeFirstLeader bool) { - current, err := lh.state.SetHeightAndResetView(ctx, blockheight.GetBlockHeight(prevBlock) + 1) + if lh.leanHelixTerm != nil { + lh.leanHelixTerm.Dispose() + lh.leanHelixTerm = nil + } + + current, err := lh.state.SetHeightAndResetView(ctx, blockheight.GetBlockHeight(prevBlock)+1) if err != nil { lh.logger.Info("onNewConsensusRound() failed height increment %d: %s", current.Height(), err) return } lh.logger.Debug("onNewConsensusRound() INCREMENTED HEIGHT TO %d", current.Height()) - if lh.leanHelixTerm != nil { - lh.leanHelixTerm.Dispose() - lh.leanHelixTerm = nil - } + lh.leanHelixTerm = leanhelixterm.NewLeanHelixTerm(ctx, lh.logger, lh.config, lh.state, lh.electionTrigger, lh.onCommit, prevBlock, prevBlockProofBytes, canBeFirstLeader) lh.logger.Debug("onNewConsensusRound() Calling ConsumeCacheMessages for H=%d", lh.state.Height()) lh.filter.ConsumeCacheMessages(ctx, lh.leanHelixTerm)