Skip to content

Commit

Permalink
Merge pull request #77 from orbs-network/feature/add-mem-reporting-to…
Browse files Browse the repository at this point in the history
…-test

Fixed memory leak in message storage
  • Loading branch information
IdoZilberberg authored Sep 23, 2019
2 parents 80d5f7f + b2836aa commit c35178c
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 13 deletions.
7 changes: 7 additions & 0 deletions services/storage/in_memory_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,13 @@ func (storage *InMemoryStorage) ClearBlockHeightLogs(blockHeight primitives.Bloc
storage.mutext.Lock()
defer storage.mutext.Unlock()

if blockHeight > 0 {
delete(storage.preprepareStorage, blockHeight-1)
delete(storage.prepareStorage, blockHeight-1)
delete(storage.commitStorage, blockHeight-1)
delete(storage.viewChangeStorage, blockHeight-1)
}

storage.resetPreprepareStorage(blockHeight)
storage.resetPrepareStorage(blockHeight)
storage.resetCommitStorage(blockHeight)
Expand Down
4 changes: 3 additions & 1 deletion services/termincommittee/term_in_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,9 @@ func (tic *TermInCommittee) initView(ctx context.Context, newView primitives.Vie

func (tic *TermInCommittee) Dispose() {
tic.electionTrigger.Stop()
tic.storage.ClearBlockHeightLogs(tic.State.Height())
height := tic.State.Height()
tic.storage.ClearBlockHeightLogs(height)
tic.logger.Debug("LHFLOW Dispose() for H=%d", height)
}

func (tic *TermInCommittee) calcLeaderMemberId(view primitives.View) primitives.MemberId {
Expand Down
41 changes: 34 additions & 7 deletions test/goroutineleak/goroutine_leak_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,47 @@ import (
)

func test2HeavyNetworks(t *testing.T) {
runtime.MemProfileRate = 1
before, _ := os.Create("/tmp/lh-mem-before.prof")
defer before.Close()
after, _ := os.Create("/tmp/lh-mem-after4.prof")
defer after.Close()

runtime.GC()
runtime.GC()
runtime.GC()
runtime.GC()
pprof.WriteHeapProfile(before)

test.WithContext(func(ctx context.Context) {
net1 := network.ATestNetworkBuilder(21).
LogToConsole(t). // This is a very long test, running with logs lets you view progress
//LogToConsole(t). // This is a very long test, running with logs lets you view progress
Build(ctx).
StartConsensus(ctx)
net1.WaitUntilNodesEventuallyReachASpecificHeight(ctx, 20)
net1.WaitUntilSubsetOfNodesEventuallyReachASpecificHeight(ctx, 70, 1)

net2 := network.ATestNetworkBuilder(31).
LogToConsole(t).
Build(ctx).
StartConsensus(ctx)
net2.WaitUntilNodesEventuallyReachASpecificHeight(ctx, 20)
//net2 := network.ATestNetworkBuilder(4).
// LogToConsole(t).
// Build(ctx).
// StartConsensus(ctx)
//net2.WaitUntilNodesEventuallyReachASpecificHeight(ctx, 20)
})

time.Sleep(20 * time.Millisecond) // give goroutines time to terminate

runtime.GC()
runtime.GC()
runtime.GC()
runtime.GC()
pprof.WriteHeapProfile(after)

}

// TODO Incorrect test, it should be updated to take into consideration there are now mainloop and workerloop goroutines
func TestGoroutinesLeaks(t *testing.T) {
var memBefore, memAfter runtime.MemStats
runtime.ReadMemStats(&memBefore)
heapBefore := int64(memBefore.HeapAlloc)
before, _ := os.Create("/tmp/leanhelix-goroutine-shutdown-before.out")
defer before.Close()
after, _ := os.Create("/tmp/leanhelix-goroutine-shutdown-after.out")
Expand All @@ -52,8 +76,11 @@ func TestGoroutinesLeaks(t *testing.T) {
runtime.GC()
time.Sleep(200 * time.Millisecond) // give goroutines time to terminate

runtime.ReadMemStats(&memAfter)
heapAfter := int64(memAfter.HeapAlloc)
numGoroutineAfter := runtime.NumGoroutine()
pprof.Lookup("goroutine").WriteTo(after, 1)
t.Logf("Memory: Before=%d After=%d", heapBefore, heapAfter)

require.Equal(t, numGoroutineBefore, numGoroutineAfter, "number of goroutines should be equal, compare /tmp/leanhelix-goroutine-shutdown-before.out and /tmp/leanhelix-goroutine-shutdown-after.out to see stack traces of the leaks")
}
12 changes: 7 additions & 5 deletions workerloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,17 +237,19 @@ func (lh *WorkerLoop) onCommit(ctx context.Context, block interfaces.Block, bloc

func (lh *WorkerLoop) onNewConsensusRound(ctx context.Context, prevBlock interfaces.Block, prevBlockProofBytes []byte, canBeFirstLeader bool) {

current, err := lh.state.SetHeightAndResetView(ctx, blockheight.GetBlockHeight(prevBlock) + 1)
if lh.leanHelixTerm != nil {
lh.leanHelixTerm.Dispose()
lh.leanHelixTerm = nil
}

current, err := lh.state.SetHeightAndResetView(ctx, blockheight.GetBlockHeight(prevBlock)+1)
if err != nil {
lh.logger.Info("onNewConsensusRound() failed height increment %d: %s", current.Height(), err)
return
}

lh.logger.Debug("onNewConsensusRound() INCREMENTED HEIGHT TO %d", current.Height())
if lh.leanHelixTerm != nil {
lh.leanHelixTerm.Dispose()
lh.leanHelixTerm = nil
}

lh.leanHelixTerm = leanhelixterm.NewLeanHelixTerm(ctx, lh.logger, lh.config, lh.state, lh.electionTrigger, lh.onCommit, prevBlock, prevBlockProofBytes, canBeFirstLeader)
lh.logger.Debug("onNewConsensusRound() Calling ConsumeCacheMessages for H=%d", lh.state.Height())
lh.filter.ConsumeCacheMessages(ctx, lh.leanHelixTerm)
Expand Down

0 comments on commit c35178c

Please sign in to comment.