diff --git a/operator/validator/controller.go b/operator/validator/controller.go index 0b088c9ad9..b8b3654345 100644 --- a/operator/validator/controller.go +++ b/operator/validator/controller.go @@ -175,7 +175,7 @@ type controller struct { historySyncBatchSize int messageValidator validation.MessageValidator - // nonCommittees is a cache of initialized committeeObserver instances + // committeesObservers is a cache of initialized committeeObserver instances committeesObservers *ttlcache.Cache[spectypes.MessageID, *committeeObserver] committeesObserversMutex sync.Mutex attesterRoots *ttlcache.Cache[phase0.Root, struct{}] @@ -274,7 +274,7 @@ func NewController(logger *zap.Logger, options ControllerOptions) Controller { messageValidator: options.MessageValidator, } - // Start automatic expired item deletion in nonCommitteeValidators. + // Start automatic expired item deletion in committeesObservers. go ctrl.committeesObservers.Start() // Delete old root and domain entries. go ctrl.attesterRoots.Start() @@ -350,57 +350,74 @@ func (c *controller) handleRouterMessages() { } } -var nonCommitteeValidatorTTLs = map[spectypes.RunnerRole]int{ - spectypes.RoleCommittee: 64, - spectypes.RoleProposer: 4, - spectypes.RoleAggregator: 4, - //spectypes.BNRoleSyncCommittee: 4, +var committeeObserverValidatorTTLs = map[spectypes.RunnerRole]int{ + spectypes.RoleCommittee: 64, + spectypes.RoleProposer: 4, + spectypes.RoleAggregator: 4, spectypes.RoleSyncCommitteeContribution: 4, } func (c *controller) handleWorkerMessages(msg network.DecodedSSVMessage) error { - var ncv *committeeObserver ssvMsg := msg.(*queue.SSVMessage) - item := c.getNonCommitteeValidators(ssvMsg.GetID()) - if item == nil { - committeeObserverOptions := validator.CommitteeObserverOptions{ - Logger: c.logger, - NetworkConfig: c.networkConfig, - ValidatorStore: c.validatorStore, - Network: c.validatorOptions.Network, - Storage: c.validatorOptions.Storage, - FullNode: c.validatorOptions.FullNode, - Operator: 
c.validatorOptions.Operator, - OperatorSigner: c.validatorOptions.OperatorSigner, - NewDecidedHandler: c.validatorOptions.NewDecidedHandler, - AttesterRoots: c.attesterRoots, - SyncCommRoots: c.syncCommRoots, - DomainCache: c.domainCache, - } - ncv = &committeeObserver{ - CommitteeObserver: validator.NewCommitteeObserver(ssvMsg.GetID(), committeeObserverOptions), - } - ttlSlots := nonCommitteeValidatorTTLs[ssvMsg.MsgID.GetRoleType()] - c.committeesObservers.Set( - ssvMsg.GetID(), - ncv, - time.Duration(ttlSlots)*c.beacon.GetBeaconNetwork().SlotDurationSec(), - ) - } else { - ncv = item - } - if err := c.handleNonCommitteeMessages(ssvMsg, ncv); err != nil { - return err + observer := c.getCommitteeObserver(ssvMsg) + + if err := c.handleCommitteeObserverMessage(ssvMsg, observer); err != nil { + return fmt.Errorf("failed to handle committee observer message: %w", err) } + return nil } -func (c *controller) handleNonCommitteeMessages(msg *queue.SSVMessage, ncv *committeeObserver) error { +func (c *controller) getCommitteeObserver(ssvMsg *queue.SSVMessage) *committeeObserver { c.committeesObserversMutex.Lock() defer c.committeesObserversMutex.Unlock() - if msg.MsgType == spectypes.SSVConsensusMsgType { + // Check if the observer already exists + existingObserver := c.committeesObservers.Get(ssvMsg.GetID()) + if existingObserver != nil { + return existingObserver.Value() + } + + // Create a new committee observer if it doesn't exist + committeeObserverOptions := validator.CommitteeObserverOptions{ + Logger: c.logger, + NetworkConfig: c.networkConfig, + ValidatorStore: c.validatorStore, + Network: c.validatorOptions.Network, + Storage: c.validatorOptions.Storage, + FullNode: c.validatorOptions.FullNode, + Operator: c.validatorOptions.Operator, + OperatorSigner: c.validatorOptions.OperatorSigner, + NewDecidedHandler: c.validatorOptions.NewDecidedHandler, + AttesterRoots: c.attesterRoots, + SyncCommRoots: c.syncCommRoots, + DomainCache: c.domainCache, + } + newObserver := 
&committeeObserver{ + CommitteeObserver: validator.NewCommitteeObserver(ssvMsg.GetID(), committeeObserverOptions), + } + + c.committeesObservers.Set( + ssvMsg.GetID(), + newObserver, + c.calculateObserverTTL(ssvMsg.MsgID.GetRoleType()), + ) + + return newObserver +} + +func (c *controller) calculateObserverTTL(roleType spectypes.RunnerRole) time.Duration { + ttlSlots := committeeObserverValidatorTTLs[roleType] + return time.Duration(ttlSlots) * c.beacon.GetBeaconNetwork().SlotDurationSec() +} + +func (c *controller) handleCommitteeObserverMessage(msg *queue.SSVMessage, observer *committeeObserver) error { + observer.Lock() + defer observer.Unlock() + + switch msg.GetType() { + case spectypes.SSVConsensusMsgType: // Process proposal messages for committee consensus only to get the roots if msg.MsgID.GetRoleType() != spectypes.RoleCommittee { return nil @@ -411,24 +428,17 @@ func (c *controller) handleNonCommitteeMessages(msg *queue.SSVMessage, ncv *comm return nil } - return ncv.OnProposalMsg(msg) - } else if msg.MsgType == spectypes.SSVPartialSignatureMsgType { + return observer.OnProposalMsg(msg) + case spectypes.SSVPartialSignatureMsgType: pSigMessages := &spectypes.PartialSignatureMessages{} if err := pSigMessages.Decode(msg.SignedSSVMessage.SSVMessage.GetData()); err != nil { - return err + return fmt.Errorf("failed to decode partial signature messages: %w", err) } - return ncv.ProcessMessage(msg) - } - return nil -} - -func (c *controller) getNonCommitteeValidators(messageId spectypes.MessageID) *committeeObserver { - item := c.committeesObservers.Get(messageId) - if item != nil { - return item.Value() + return observer.ProcessMessage(msg) + default: + return nil } - return nil } // StartValidators loads all persisted shares and setup the corresponding validators diff --git a/protocol/v2/ssv/validator/non_committee_validator.go b/protocol/v2/ssv/validator/non_committee_validator.go index 6902e87fd5..ddecd96d74 100644 --- 
a/protocol/v2/ssv/validator/non_committee_validator.go +++ b/protocol/v2/ssv/validator/non_committee_validator.go @@ -79,10 +79,10 @@ func NewCommitteeObserver(msgID spectypes.MessageID, opts CommitteeObserverOptio return co } -func (ncv *CommitteeObserver) ProcessMessage(msg *queue.SSVMessage) error { +func (o *CommitteeObserver) ProcessMessage(msg *queue.SSVMessage) error { role := msg.MsgID.GetRoleType() - logger := ncv.logger.With(fields.Role(role)) + logger := o.logger.With(fields.Role(role)) if role == spectypes.RoleCommittee { cid := spectypes.CommitteeID(msg.GetID().GetDutyExecutorID()[16:]) logger = logger.With(fields.CommitteeID(cid)) @@ -106,7 +106,7 @@ func (ncv *CommitteeObserver) ProcessMessage(msg *queue.SSVMessage) error { return fmt.Errorf("got invalid message %w", err) } - quorums, err := ncv.processMessage(partialSigMessages) + quorums, err := o.processMessage(partialSigMessages) if err != nil { return fmt.Errorf("could not process SignedPartialSignatureMessage %w", err) } @@ -121,24 +121,24 @@ func (ncv *CommitteeObserver) ProcessMessage(msg *queue.SSVMessage) error { operatorIDs = append(operatorIDs, strconv.FormatUint(share, 10)) } - validator, exists := ncv.ValidatorStore.ValidatorByIndex(key.ValidatorIndex) + validator, exists := o.ValidatorStore.ValidatorByIndex(key.ValidatorIndex) if !exists { return fmt.Errorf("could not find share for validator with index %d", key.ValidatorIndex) } - beaconRoles := ncv.getBeaconRoles(msg, key.Root) + beaconRoles := o.getBeaconRoles(msg, key.Root) if len(beaconRoles) == 0 { logger.Warn("no roles found for quorum root", zap.Uint64("validator_index", uint64(key.ValidatorIndex)), fields.Validator(validator.ValidatorPubKey[:]), zap.String("signers", strings.Join(operatorIDs, ", ")), fields.BlockRoot(key.Root), - zap.String("qbft_ctrl_identifier", hex.EncodeToString(ncv.msgID[:])), + zap.String("qbft_ctrl_identifier", hex.EncodeToString(o.msgID[:])), ) } for _, beaconRole := range beaconRoles { - roleStorage 
:= ncv.Storage.Get(beaconRole) + roleStorage := o.Storage.Get(beaconRole) if roleStorage == nil { return fmt.Errorf("role storage doesn't exist: %v", beaconRole) } @@ -160,7 +160,7 @@ func (ncv *CommitteeObserver) ProcessMessage(msg *queue.SSVMessage) error { fields.BlockRoot(key.Root), ) - if ncv.newDecidedHandler != nil { + if o.newDecidedHandler != nil { p := qbftstorage.Participation{ ParticipantsRangeEntry: qbftstorage.ParticipantsRangeEntry{ Slot: slot, @@ -170,7 +170,7 @@ func (ncv *CommitteeObserver) ProcessMessage(msg *queue.SSVMessage) error { PubKey: validator.ValidatorPubKey, } - ncv.newDecidedHandler(p) + o.newDecidedHandler(p) } } } @@ -178,11 +178,11 @@ func (ncv *CommitteeObserver) ProcessMessage(msg *queue.SSVMessage) error { return nil } -func (ncv *CommitteeObserver) getBeaconRoles(msg *queue.SSVMessage, root phase0.Root) []spectypes.BeaconRole { +func (o *CommitteeObserver) getBeaconRoles(msg *queue.SSVMessage, root phase0.Root) []spectypes.BeaconRole { switch msg.MsgID.GetRoleType() { case spectypes.RoleCommittee: - attester := ncv.attesterRoots.Get(root) - syncCommittee := ncv.syncCommRoots.Get(root) + attester := o.attesterRoots.Get(root) + syncCommittee := o.syncCommRoots.Get(root) switch { case attester != nil && syncCommittee != nil: @@ -205,29 +205,37 @@ func (ncv *CommitteeObserver) getBeaconRoles(msg *queue.SSVMessage, root phase0. 
case spectypes.RoleVoluntaryExit: return []spectypes.BeaconRole{spectypes.BNRoleVoluntaryExit} } - return nil } +// nonCommitteeInstanceContainerCapacity returns the capacity of InstanceContainer for non-committee validators +func nonCommitteeInstanceContainerCapacity(fullNode bool) int { + if fullNode { + // Helps full nodes reduce + return 2 + } + return 1 +} + type validatorIndexAndRoot struct { ValidatorIndex phase0.ValidatorIndex Root phase0.Root } -func (ncv *CommitteeObserver) processMessage( +func (o *CommitteeObserver) processMessage( signedMsg *spectypes.PartialSignatureMessages, ) (map[validatorIndexAndRoot][]spectypes.OperatorID, error) { quorums := make(map[validatorIndexAndRoot][]spectypes.OperatorID) currentSlot := signedMsg.Slot - slotValidators, exist := ncv.postConsensusContainer[currentSlot] + slotValidators, exist := o.postConsensusContainer[currentSlot] if !exist { slotValidators = make(map[phase0.ValidatorIndex]*ssv.PartialSigContainer) - ncv.postConsensusContainer[signedMsg.Slot] = slotValidators + o.postConsensusContainer[signedMsg.Slot] = slotValidators } for _, msg := range signedMsg.Messages { - validator, exists := ncv.ValidatorStore.ValidatorByIndex(msg.ValidatorIndex) + validator, exists := o.ValidatorStore.ValidatorByIndex(msg.ValidatorIndex) if !exists { return nil, fmt.Errorf("could not find share for validator with index %d", msg.ValidatorIndex) } @@ -237,7 +245,7 @@ func (ncv *CommitteeObserver) processMessage( slotValidators[msg.ValidatorIndex] = container } if container.HasSignature(msg.ValidatorIndex, msg.Signer, msg.SigningRoot) { - ncv.resolveDuplicateSignature(container, msg, validator) + o.resolveDuplicateSignature(container, msg, validator) } else { container.AddSignature(msg) } @@ -258,12 +266,12 @@ func (ncv *CommitteeObserver) processMessage( } // Remove older slots container - if len(ncv.postConsensusContainer) >= ncv.postConsensusContainerCapacity() { + if len(o.postConsensusContainer) >= 
o.postConsensusContainerCapacity() { // #nosec G115 -- capacity must be low epoch not to cause overflow - thresholdSlot := currentSlot - phase0.Slot(ncv.postConsensusContainerCapacity()) - for slot := range ncv.postConsensusContainer { + thresholdSlot := currentSlot - phase0.Slot(o.postConsensusContainerCapacity()) + for slot := range o.postConsensusContainer { if slot < thresholdSlot { - delete(ncv.postConsensusContainer, slot) + delete(o.postConsensusContainer, slot) } } } @@ -273,11 +281,11 @@ func (ncv *CommitteeObserver) processMessage( // Stores the container's existing signature or the new one, depending on their validity. If both are invalid, remove the existing one // copied from BaseRunner -func (ncv *CommitteeObserver) resolveDuplicateSignature(container *ssv.PartialSigContainer, msg *spectypes.PartialSignatureMessage, share *ssvtypes.SSVShare) { +func (o *CommitteeObserver) resolveDuplicateSignature(container *ssv.PartialSigContainer, msg *spectypes.PartialSignatureMessage, share *ssvtypes.SSVShare) { // Check previous signature validity previousSignature, err := container.GetSignature(msg.ValidatorIndex, msg.Signer, msg.SigningRoot) if err == nil { - err = ncv.verifyBeaconPartialSignature(msg.Signer, previousSignature, msg.SigningRoot, share) + err = o.verifyBeaconPartialSignature(msg.Signer, previousSignature, msg.SigningRoot, share) if err == nil { // Keep the previous sigature since it's correct return @@ -288,14 +296,14 @@ func (ncv *CommitteeObserver) resolveDuplicateSignature(container *ssv.PartialSi container.Remove(msg.ValidatorIndex, msg.Signer, msg.SigningRoot) // Hold the new signature, if correct - err = ncv.verifyBeaconPartialSignature(msg.Signer, msg.PartialSignature, msg.SigningRoot, share) + err = o.verifyBeaconPartialSignature(msg.Signer, msg.PartialSignature, msg.SigningRoot, share) if err == nil { container.AddSignature(msg) } } // copied from BaseRunner -func (ncv *CommitteeObserver) verifyBeaconPartialSignature(signer uint64, 
signature spectypes.Signature, root phase0.Root, share *ssvtypes.SSVShare) error { +func (o *CommitteeObserver) verifyBeaconPartialSignature(signer uint64, signature spectypes.Signature, root phase0.Root, share *ssvtypes.SSVShare) error { for _, n := range share.Committee { if n.Signer == signer { pk, err := ssvtypes.DeserializeBLSPublicKey(n.SharePubKey) @@ -317,33 +325,33 @@ func (ncv *CommitteeObserver) verifyBeaconPartialSignature(signer uint64, signat return fmt.Errorf("unknown signer") } -func (ncv *CommitteeObserver) OnProposalMsg(msg *queue.SSVMessage) error { +func (o *CommitteeObserver) OnProposalMsg(msg *queue.SSVMessage) error { beaconVote := &spectypes.BeaconVote{} if err := beaconVote.Decode(msg.SignedSSVMessage.FullData); err != nil { - ncv.logger.Debug("❗ failed to get beacon vote data", zap.Error(err)) + o.logger.Debug("❗ failed to get beacon vote data", zap.Error(err)) return err } qbftMsg, ok := msg.Body.(*specqbft.Message) if !ok { - ncv.logger.Fatal("unreachable: OnProposalMsg must be called only on qbft messages") + o.logger.Fatal("unreachable: OnProposalMsg must be called only on qbft messages") } - epoch := ncv.beaconNetwork.EstimatedEpochAtSlot(phase0.Slot(qbftMsg.Height)) + epoch := o.beaconNetwork.EstimatedEpochAtSlot(phase0.Slot(qbftMsg.Height)) - if err := ncv.saveAttesterRoots(epoch, beaconVote, qbftMsg); err != nil { + if err := o.saveAttesterRoots(epoch, beaconVote, qbftMsg); err != nil { return err } - if err := ncv.saveSyncCommRoots(epoch, beaconVote); err != nil { + if err := o.saveSyncCommRoots(epoch, beaconVote); err != nil { return err } return nil } -func (ncv *CommitteeObserver) saveAttesterRoots(epoch phase0.Epoch, beaconVote *spectypes.BeaconVote, qbftMsg *specqbft.Message) error { - attesterDomain, err := ncv.domainCache.Get(epoch, spectypes.DomainAttester) +func (o *CommitteeObserver) saveAttesterRoots(epoch phase0.Epoch, beaconVote *spectypes.BeaconVote, qbftMsg *specqbft.Message) error { + attesterDomain, err := 
o.domainCache.Get(epoch, spectypes.DomainAttester) if err != nil { return err } @@ -355,14 +363,14 @@ func (ncv *CommitteeObserver) saveAttesterRoots(epoch phase0.Epoch, beaconVote * return err } - ncv.attesterRoots.Set(attesterRoot, struct{}{}, ttlcache.DefaultTTL) + o.attesterRoots.Set(attesterRoot, struct{}{}, ttlcache.DefaultTTL) } return nil } -func (ncv *CommitteeObserver) saveSyncCommRoots(epoch phase0.Epoch, beaconVote *spectypes.BeaconVote) error { - syncCommDomain, err := ncv.domainCache.Get(epoch, spectypes.DomainSyncCommittee) +func (o *CommitteeObserver) saveSyncCommRoots(epoch phase0.Epoch, beaconVote *spectypes.BeaconVote) error { + syncCommDomain, err := o.domainCache.Get(epoch, spectypes.DomainSyncCommittee) if err != nil { return err } @@ -373,14 +381,14 @@ func (ncv *CommitteeObserver) saveSyncCommRoots(epoch phase0.Epoch, beaconVote * return err } - ncv.syncCommRoots.Set(syncCommitteeRoot, struct{}{}, ttlcache.DefaultTTL) + o.syncCommRoots.Set(syncCommitteeRoot, struct{}{}, ttlcache.DefaultTTL) return nil } -func (ncv *CommitteeObserver) postConsensusContainerCapacity() int { +func (o *CommitteeObserver) postConsensusContainerCapacity() int { // #nosec G115 -- slots per epoch must be low epoch not to cause overflow - return int(ncv.networkConfig.SlotsPerEpoch()) + validation.LateSlotAllowance + return int(o.networkConfig.SlotsPerEpoch()) + validation.LateSlotAllowance } func constructAttestationData(vote *spectypes.BeaconVote, slot phase0.Slot, committeeIndex phase0.CommitteeIndex) *phase0.AttestationData {