Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(exporter): memory leak and lock contentions #2034

Open
wants to merge 48 commits into
base: stage
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
c8c925c
don't allocate SeenSigners if not needed
nkryuchkov Nov 22, 2024
9f703dd
use SignersBitMask as SeenSigners key
nkryuchkov Nov 22, 2024
55c7525
store full data hash instead of full data
nkryuchkov Nov 22, 2024
8fbb447
compress MessageCounts from 48 bytes to 1
nkryuchkov Nov 22, 2024
5e15f09
fix file name
nkryuchkov Nov 22, 2024
8a92a27
get rid of consensusID
nkryuchkov Nov 22, 2024
9dc78fe
add comments for SeenSigners
nkryuchkov Nov 22, 2024
331d490
use slice instead of map in consensusState
nkryuchkov Nov 22, 2024
a33c9bf
draft
olegshmuelov Nov 26, 2024
4da54b8
revert concurrency
olegshmuelov Nov 26, 2024
5711ac6
lint
olegshmuelov Nov 26, 2024
1ae8380
fix panic message
nkryuchkov Nov 26, 2024
d36e736
lint fix
olegshmuelov Nov 26, 2024
7e39964
improve naming
olegshmuelov Nov 26, 2024
4d47d4d
refactor getting signer index in committee
nkryuchkov Nov 26, 2024
a658aaf
clarify ErrDuplicatedMessage
nkryuchkov Nov 26, 2024
5dd47ea
fix linter
nkryuchkov Nov 26, 2024
203b9be
get rid of panic
nkryuchkov Nov 27, 2024
4d9e546
fix imports
nkryuchkov Nov 27, 2024
dc7f0d6
reduce amount of stored slots
nkryuchkov Nov 27, 2024
66e970a
change RWMutex to Mutex
nkryuchkov Nov 27, 2024
6dc13bc
rename states
nkryuchkov Nov 27, 2024
91c43e1
use ttlcache for state
nkryuchkov Nov 27, 2024
055ac8a
fix linter
nkryuchkov Nov 27, 2024
6bb4f65
Merge branch 'stage' into msgvalidation-optimize-signer-state
nkryuchkov Dec 6, 2024
24c78eb
Merge branch 'stage' into exporter-race-fix
olegshmuelov Dec 16, 2024
eac592a
exporter with jemalloc
moshe-blox Feb 11, 2025
06411f7
remove jemalloc check
moshe-blox Feb 11, 2025
49ed538
increase worker count
moshe-blox Feb 11, 2025
875c9b7
revert worker count increase
moshe-blox Feb 11, 2025
a31ac82
Merge branch 'stage' into msgvalidation-optimize-signer-state
nkryuchkov Feb 11, 2025
26c9bc1
fix issues after merging
nkryuchkov Feb 11, 2025
49c44fd
udpate discard ratio; run until no err
anatolie-ssv Feb 11, 2025
0a23a0d
Merge branch 'msgvalidation-optimize-signer-state' into exporter-with…
moshe-blox Feb 11, 2025
37f9e53
Merge branch 'stage' into exporter-race-fix
y0sher Feb 11, 2025
fd7c764
imports and fmt
y0sher Feb 11, 2025
6b92f1e
Merge branch 'exporter-race-fix' into exporter-with-jemalloc
moshe-blox Feb 11, 2025
1cbbade
don't reopen database after migrations
moshe-blox Feb 11, 2025
b4a8e71
Merge branch 'fix/badger-quickgc-rate' into exporter-with-jemalloc
moshe-blox Feb 11, 2025
e4992fc
simple mtx usage
y0sher Feb 11, 2025
127aa5e
simple mtx usage
y0sher Feb 12, 2025
aa29eb7
cr: RW mutex is not needed here.
y0sher Feb 12, 2025
7995a8e
cr: remove unused code
y0sher Feb 12, 2025
1b28899
remove mutexes in ValidatorState and OperatorState
nkryuchkov Feb 12, 2025
550b344
cr: remove unused code
y0sher Feb 12, 2025
0addd00
don't save or get participates with tx.
y0sher Feb 12, 2025
93e78df
revert jemalloc
moshe-blox Feb 13, 2025
439da52
readd jemalloc check
moshe-blox Feb 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions cli/operator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,13 +538,6 @@ func setupDB(logger *zap.Logger, eth2Network beaconprotocol.Network) (*kv.Badger
if err != nil {
return nil, errors.Wrap(err, "failed to open db")
}
reopenDb := func() error {
if err := db.Close(); err != nil {
return errors.Wrap(err, "failed to close db")
}
db, err = kv.New(logger, cfg.DBOptions)
return errors.Wrap(err, "failed to reopen db")
}

migrationOpts := migrations.Options{
Db: db,
Expand All @@ -561,24 +554,13 @@ func setupDB(logger *zap.Logger, eth2Network beaconprotocol.Network) (*kv.Badger

// If migrations were applied, we run a full garbage collection cycle
// to reclaim any space that may have been freed up.
// Close & reopen the database to trigger any unknown internal
// startup/shutdown procedures that the storage engine may have.
start := time.Now()
if err := reopenDb(); err != nil {
return nil, err
}

// Run a long garbage collection cycle with a timeout.
ctx, cancel := context.WithTimeout(context.Background(), 6*time.Minute)
defer cancel()
if err := db.FullGC(ctx); err != nil {
return nil, errors.Wrap(err, "failed to collect garbage")
}

// Close & reopen again.
if err := reopenDb(); err != nil {
return nil, err
}
logger.Debug("post-migrations garbage collection completed", fields.Duration(start))

return db, nil
Expand Down
39 changes: 14 additions & 25 deletions ibft/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@
// participantStorage struct
// instanceType is what separates different iBFT eth2 duty types (attestation, proposal and aggregation)
type participantStorage struct {
prefix []byte
oldPrefix string // kept back for cleanup
db basedb.Database
participantsMu sync.Mutex
prefix []byte
oldPrefix string // kept back for cleanup
db basedb.Database
}

// New create new participant store
Expand Down Expand Up @@ -163,13 +162,7 @@
}

func (i *participantStorage) SaveParticipants(pk spectypes.ValidatorPK, slot phase0.Slot, newParticipants []spectypes.OperatorID) (updated bool, err error) {
i.participantsMu.Lock()
defer i.participantsMu.Unlock()

txn := i.db.Begin()
defer txn.Discard()

existingParticipants, err := i.getParticipants(txn, pk, slot)
existingParticipants, err := i.getParticipants(pk, slot)
if err != nil {
return false, fmt.Errorf("get participants %w", err)
}
Expand All @@ -179,14 +172,10 @@
return false, nil
}

if err := i.saveParticipants(txn, pk, slot, mergedParticipants); err != nil {
if err := i.saveParticipants(pk, slot, mergedParticipants); err != nil {
return false, fmt.Errorf("save participants: %w", err)
}

if err := txn.Commit(); err != nil {
return false, fmt.Errorf("commit transaction: %w", err)
}

return true, nil
}

Expand Down Expand Up @@ -237,11 +226,11 @@
}

func (i *participantStorage) GetParticipants(pk spectypes.ValidatorPK, slot phase0.Slot) ([]spectypes.OperatorID, error) {
return i.getParticipants(nil, pk, slot)
return i.getParticipants(pk, slot)

Check warning on line 229 in ibft/storage/store.go

View check run for this annotation

Codecov / codecov/patch

ibft/storage/store.go#L229

Added line #L229 was not covered by tests
}

func (i *participantStorage) getParticipants(txn basedb.ReadWriter, pk spectypes.ValidatorPK, slot phase0.Slot) ([]spectypes.OperatorID, error) {
val, found, err := i.get(txn, pk[:], slotToByteSlice(slot))
func (i *participantStorage) getParticipants(pk spectypes.ValidatorPK, slot phase0.Slot) ([]spectypes.OperatorID, error) {
val, found, err := i.get(pk[:], slotToByteSlice(slot))
if err != nil {
return nil, err
}
Expand All @@ -253,12 +242,12 @@
return operators, nil
}

func (i *participantStorage) saveParticipants(txn basedb.ReadWriter, pk spectypes.ValidatorPK, slot phase0.Slot, operators []spectypes.OperatorID) error {
func (i *participantStorage) saveParticipants(pk spectypes.ValidatorPK, slot phase0.Slot, operators []spectypes.OperatorID) error {
bytes, err := encodeOperators(operators)
if err != nil {
return fmt.Errorf("encode operators: %w", err)
}
if err := i.save(txn, bytes, pk[:], slotToByteSlice(slot)); err != nil {
if err := i.save(bytes, pk[:], slotToByteSlice(slot)); err != nil {
return fmt.Errorf("save to DB: %w", err)
}

Expand All @@ -271,14 +260,14 @@
return slices.Compact(allParticipants)
}

func (i *participantStorage) save(txn basedb.ReadWriter, value []byte, pk, slot []byte) error {
func (i *participantStorage) save(value []byte, pk, slot []byte) error {
prefix := i.makePrefix(slot)
return i.db.Using(txn).Set(prefix, pk, value)
return i.db.Set(prefix, pk, value)
}

func (i *participantStorage) get(txn basedb.ReadWriter, pk, slot []byte) ([]byte, bool, error) {
func (i *participantStorage) get(pk, slot []byte) ([]byte, bool, error) {
prefix := i.makePrefix(slot)
obj, found, err := i.db.Using(txn).Get(prefix, pk)
obj, found, err := i.db.Get(prefix, pk)
if err != nil {
return nil, false, err
}
Expand Down
36 changes: 36 additions & 0 deletions message/validation/committee_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package validation

import (
"github.com/attestantio/go-eth2-client/spec/phase0"
spectypes "github.com/ssvlabs/ssv-spec/types"
)

type CommitteeInfo struct {
committeeID spectypes.CommitteeID
committee []spectypes.OperatorID
signerIndices map[spectypes.OperatorID]int
validatorIndices []phase0.ValidatorIndex
}

func newCommitteeInfo(
committeeID spectypes.CommitteeID,
operators []spectypes.OperatorID,
validatorIndices []phase0.ValidatorIndex,
) CommitteeInfo {
signerIndices := make(map[spectypes.OperatorID]int)
for i, operator := range operators {
signerIndices[operator] = i
}

return CommitteeInfo{
committeeID: committeeID,
committee: operators,
signerIndices: signerIndices,
validatorIndices: validatorIndices,
}
}

// keeping the method for readability and the comment
func (ci *CommitteeInfo) signerIndex(signer spectypes.OperatorID) int {
return ci.signerIndices[signer] // existence must be checked by ErrSignerNotInCommittee
}
2 changes: 1 addition & 1 deletion message/validation/common_checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (mv *messageValidator) messageLateness(slot phase0.Slot, role spectypes.Run
case spectypes.RoleProposer, spectypes.RoleSyncCommitteeContribution:
ttl = 1 + LateSlotAllowance
case spectypes.RoleCommittee, spectypes.RoleAggregator:
ttl = phase0.Slot(mv.netCfg.Beacon.SlotsPerEpoch()) + LateSlotAllowance
ttl = MaxStoredSlots(mv.netCfg)
case spectypes.RoleValidatorRegistration, spectypes.RoleVoluntaryExit:
return 0
}
Expand Down
48 changes: 11 additions & 37 deletions message/validation/consensus_state.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,25 @@
package validation

import (
"sync"

"github.com/attestantio/go-eth2-client/spec/phase0"
spectypes "github.com/ssvlabs/ssv-spec/types"
)

// consensusID uniquely identifies a public key and role pair to keep track of state.
type consensusID struct {
DutyExecutorID string
Role spectypes.RunnerRole
}

// consensusState keeps track of the signers for a given public key and role.
type consensusState struct {
state map[spectypes.OperatorID]*OperatorState
// ValidatorState keeps track of the signers for a given public key and role.
type ValidatorState struct {
operators []*OperatorState
storedSlotCount phase0.Slot
mu sync.Mutex
}

func (cs *consensusState) GetOrCreate(signer spectypes.OperatorID) *OperatorState {
cs.mu.Lock()
defer cs.mu.Unlock()

if _, ok := cs.state[signer]; !ok {
cs.state[signer] = newOperatorState(cs.storedSlotCount)
func (cs *ValidatorState) Signer(idx int) *OperatorState {
if cs.operators[idx] == nil {
cs.operators[idx] = newOperatorState(cs.storedSlotCount)
}

return cs.state[signer]
return cs.operators[idx]
}

type OperatorState struct {
mu sync.RWMutex
state []*SignerState // the slice index is slot % storedSlotCount
signers []*SignerState // the slice index is slot % storedSlotCount
maxSlot phase0.Slot
maxEpoch phase0.Epoch
lastEpochDuties uint64
Expand All @@ -42,15 +28,12 @@ type OperatorState struct {

func newOperatorState(size phase0.Slot) *OperatorState {
return &OperatorState{
state: make([]*SignerState, size),
signers: make([]*SignerState, size),
}
}

func (os *OperatorState) Get(slot phase0.Slot) *SignerState {
os.mu.RLock()
defer os.mu.RUnlock()

s := os.state[(uint64(slot) % uint64(len(os.state)))]
s := os.signers[(uint64(slot) % uint64(len(os.signers)))]
if s == nil || s.Slot != slot {
return nil
}
Expand All @@ -59,10 +42,7 @@ func (os *OperatorState) Get(slot phase0.Slot) *SignerState {
}

func (os *OperatorState) Set(slot phase0.Slot, epoch phase0.Epoch, state *SignerState) {
os.mu.Lock()
defer os.mu.Unlock()

os.state[uint64(slot)%uint64(len(os.state))] = state
os.signers[uint64(slot)%uint64(len(os.signers))] = state
if slot > os.maxSlot {
os.maxSlot = slot
}
Expand All @@ -76,16 +56,10 @@ func (os *OperatorState) Set(slot phase0.Slot, epoch phase0.Epoch, state *Signer
}

func (os *OperatorState) MaxSlot() phase0.Slot {
os.mu.RLock()
defer os.mu.RUnlock()

return os.maxSlot
}

func (os *OperatorState) DutyCount(epoch phase0.Epoch) uint64 {
os.mu.RLock()
defer os.mu.RUnlock()

if epoch == os.maxEpoch {
return os.lastEpochDuties
}
Expand Down
10 changes: 5 additions & 5 deletions message/validation/consensus_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func TestOperatorState(t *testing.T) {
size := phase0.Slot(10)
os := newOperatorState(size)
require.NotNil(t, os)
require.Equal(t, len(os.state), int(size))
require.Equal(t, len(os.signers), int(size))
})

t.Run("TestGetAndSet", func(t *testing.T) {
Expand Down Expand Up @@ -58,9 +58,9 @@ func TestOperatorState(t *testing.T) {

slot := phase0.Slot(5)
epoch := phase0.Epoch(1)
signerState := &SignerState{Slot: slot}
signerState1 := &SignerState{Slot: slot}

os.Set(slot, epoch, signerState)
os.Set(slot, epoch, signerState1)

require.Equal(t, os.DutyCount(epoch), uint64(1))
require.Equal(t, os.DutyCount(epoch-1), uint64(0))
Expand All @@ -82,9 +82,9 @@ func TestOperatorState(t *testing.T) {

slot := phase0.Slot(5)
epoch := phase0.Epoch(1)
signerState := &SignerState{Slot: slot}
signerState1 := &SignerState{Slot: slot}

os.Set(slot, epoch, signerState)
os.Set(slot, epoch, signerState1)
require.Equal(t, os.DutyCount(epoch), uint64(1))

slot2 := phase0.Slot(6)
Expand Down
Loading