-
Notifications
You must be signed in to change notification settings - Fork 111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
hotfix(exporter): ensure thread-safe committee observer management #1883
base: stage
Are you sure you want to change the base?
Changes from 5 commits
a33c9bf
4da54b8
5711ac6
d36e736
7e39964
24c78eb
37f9e53
fd7c764
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ import ( | |
genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types" | ||
specqbft "github.com/ssvlabs/ssv-spec/qbft" | ||
spectypes "github.com/ssvlabs/ssv-spec/types" | ||
|
||
"github.com/ssvlabs/ssv/exporter/convert" | ||
"github.com/ssvlabs/ssv/ibft/genesisstorage" | ||
"github.com/ssvlabs/ssv/ibft/storage" | ||
|
@@ -210,7 +211,7 @@ type controller struct { | |
historySyncBatchSize int | ||
messageValidator validation.MessageValidator | ||
|
||
// nonCommittees is a cache of initialized committeeObserver instances | ||
// committeeObservers is a cache of initialized committeeObserver instances | ||
committeesObservers *ttlcache.Cache[spectypes.MessageID, *committeeObserver] | ||
committeesObserversMutex sync.Mutex | ||
attesterRoots *ttlcache.Cache[phase0.Root, struct{}] | ||
|
@@ -344,7 +345,7 @@ func NewController(logger *zap.Logger, options ControllerOptions) Controller { | |
} | ||
ctrl.genesisCtx, ctrl.cancelGenesisCtx = context.WithCancel(options.Context) | ||
|
||
// Start automatic expired item deletion in nonCommitteeValidators. | ||
// Start automatic expired item deletion in committeeObserverValidators. | ||
go ctrl.committeesObservers.Start() | ||
// Delete old root and domain entries. | ||
go ctrl.attesterRoots.Start() | ||
|
@@ -448,57 +449,74 @@ func (c *controller) handleRouterMessages() { | |
} | ||
} | ||
|
||
var nonCommitteeValidatorTTLs = map[spectypes.RunnerRole]int{ | ||
spectypes.RoleCommittee: 64, | ||
spectypes.RoleProposer: 4, | ||
spectypes.RoleAggregator: 4, | ||
//spectypes.BNRoleSyncCommittee: 4, | ||
var committeeObserverValidatorTTLs = map[spectypes.RunnerRole]int{ | ||
spectypes.RoleCommittee: 64, | ||
spectypes.RoleProposer: 4, | ||
spectypes.RoleAggregator: 4, | ||
spectypes.RoleSyncCommitteeContribution: 4, | ||
} | ||
|
||
func (c *controller) handleWorkerMessages(msg network.DecodedSSVMessage) error { | ||
var ncv *committeeObserver | ||
ssvMsg := msg.(*queue.SSVMessage) | ||
|
||
item := c.getNonCommitteeValidators(ssvMsg.GetID()) | ||
if item == nil { | ||
committeeObserverOptions := validator.CommitteeObserverOptions{ | ||
Logger: c.logger, | ||
NetworkConfig: c.networkConfig, | ||
ValidatorStore: c.validatorStore, | ||
Network: c.validatorOptions.Network, | ||
Storage: c.validatorOptions.Storage, | ||
FullNode: c.validatorOptions.FullNode, | ||
Operator: c.validatorOptions.Operator, | ||
OperatorSigner: c.validatorOptions.OperatorSigner, | ||
NewDecidedHandler: c.validatorOptions.NewDecidedHandler, | ||
AttesterRoots: c.attesterRoots, | ||
SyncCommRoots: c.syncCommRoots, | ||
DomainCache: c.domainCache, | ||
} | ||
ncv = &committeeObserver{ | ||
CommitteeObserver: validator.NewCommitteeObserver(convert.MessageID(ssvMsg.MsgID), committeeObserverOptions), | ||
} | ||
ttlSlots := nonCommitteeValidatorTTLs[ssvMsg.MsgID.GetRoleType()] | ||
c.committeesObservers.Set( | ||
ssvMsg.GetID(), | ||
ncv, | ||
time.Duration(ttlSlots)*c.beacon.GetBeaconNetwork().SlotDurationSec(), | ||
) | ||
} else { | ||
ncv = item | ||
} | ||
if err := c.handleNonCommitteeMessages(ssvMsg, ncv); err != nil { | ||
return err | ||
observer := c.getCommitteeObserver(ssvMsg) | ||
|
||
if err := c.handleCommitteeObserverMessage(ssvMsg, observer); err != nil { | ||
return fmt.Errorf("failed to handle committee observer message: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (c *controller) handleNonCommitteeMessages(msg *queue.SSVMessage, ncv *committeeObserver) error { | ||
func (c *controller) getCommitteeObserver(ssvMsg *queue.SSVMessage) *committeeObserver { | ||
c.committeesObserversMutex.Lock() | ||
defer c.committeesObserversMutex.Unlock() | ||
|
||
if msg.MsgType == spectypes.SSVConsensusMsgType { | ||
// Check if the observer already exists | ||
existingObserver := c.committeesObservers.Get(ssvMsg.GetID()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. isn't There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Without the mutex, there’s a risk of race conditions where multiple goroutines might attempt to create and set the same observer simultaneously. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this can be also solved by the cache's There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right that |
||
if existingObserver != nil { | ||
return existingObserver.Value() | ||
} | ||
|
||
// Create a new committee observer if it doesn't exist | ||
committeeObserverOptions := validator.CommitteeObserverOptions{ | ||
Logger: c.logger, | ||
NetworkConfig: c.networkConfig, | ||
ValidatorStore: c.validatorStore, | ||
Network: c.validatorOptions.Network, | ||
Storage: c.validatorOptions.Storage, | ||
FullNode: c.validatorOptions.FullNode, | ||
Operator: c.validatorOptions.Operator, | ||
OperatorSigner: c.validatorOptions.OperatorSigner, | ||
NewDecidedHandler: c.validatorOptions.NewDecidedHandler, | ||
AttesterRoots: c.attesterRoots, | ||
SyncCommRoots: c.syncCommRoots, | ||
DomainCache: c.domainCache, | ||
} | ||
newObserver := &committeeObserver{ | ||
CommitteeObserver: validator.NewCommitteeObserver(convert.MessageID(ssvMsg.MsgID), committeeObserverOptions), | ||
} | ||
|
||
c.committeesObservers.Set( | ||
ssvMsg.GetID(), | ||
newObserver, | ||
c.calculateObserverTTL(ssvMsg.MsgID.GetRoleType()), | ||
) | ||
|
||
return newObserver | ||
} | ||
|
||
func (c *controller) calculateObserverTTL(roleType spectypes.RunnerRole) time.Duration { | ||
ttlSlots := committeeObserverValidatorTTLs[roleType] | ||
return time.Duration(ttlSlots) * c.beacon.GetBeaconNetwork().SlotDurationSec() | ||
} | ||
Comment on lines
+410
to
+413
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's probably better to "panic" than "default to 0" in case there is a new type introduced and we forgot to add it to |
||
|
||
func (c *controller) handleCommitteeObserverMessage(msg *queue.SSVMessage, observer *committeeObserver) error { | ||
observer.Lock() | ||
defer observer.Unlock() | ||
|
||
switch msg.GetType() { | ||
case spectypes.SSVConsensusMsgType: | ||
// Process proposal messages for committee consensus only to get the roots | ||
if msg.MsgID.GetRoleType() != spectypes.RoleCommittee { | ||
return nil | ||
|
@@ -509,24 +527,17 @@ func (c *controller) handleNonCommitteeMessages(msg *queue.SSVMessage, ncv *comm | |
return nil | ||
} | ||
|
||
return ncv.OnProposalMsg(msg) | ||
} else if msg.MsgType == spectypes.SSVPartialSignatureMsgType { | ||
return observer.OnProposalMsg(msg) | ||
case spectypes.SSVPartialSignatureMsgType: | ||
pSigMessages := &spectypes.PartialSignatureMessages{} | ||
if err := pSigMessages.Decode(msg.SignedSSVMessage.SSVMessage.GetData()); err != nil { | ||
return err | ||
return fmt.Errorf("failed to decode partial signature messages: %w", err) | ||
} | ||
|
||
return ncv.ProcessMessage(msg) | ||
} | ||
return nil | ||
} | ||
|
||
func (c *controller) getNonCommitteeValidators(messageId spectypes.MessageID) *committeeObserver { | ||
item := c.committeesObservers.Get(messageId) | ||
if item != nil { | ||
return item.Value() | ||
return observer.ProcessMessage(msg) | ||
default: | ||
return nil | ||
} | ||
return nil | ||
} | ||
|
||
// StartValidators loads all persisted shares and setup the corresponding validators | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe clarify that's denominated in slots (and not seconds or something)