From f4a23f6baca2f0eca8935825627da82bebc3ccd9 Mon Sep 17 00:00:00 2001 From: amir-blox <83904651+amir-blox@users.noreply.github.com> Date: Wed, 24 Nov 2021 13:37:54 +0200 Subject: [PATCH 1/4] Exporter - fix decided sequence 0 (#447) * remove the check for sequence 0 otherwise the decided message with sequence 0 won't be saved * add migration for decided 0, 1 * count number of saved decided * add test for sync range --- exporter/ibft/decided_genesis_migrator.go | 48 ++++++++++++ exporter/ibft/decided_reader.go | 35 +++++---- exporter/ibft/migrator.go | 42 ++++++++++ ibft/controller/controller_sync.go | 2 +- ibft/sync/history/fetch_decided.go | 9 ++- ibft/sync/history/fetch_decided_test.go | 4 +- ibft/sync/history/fetch_highest.go | 7 +- ibft/sync/history/fetch_highest_test.go | 2 +- ibft/sync/history/history.go | 40 +++++++++- ibft/sync/history/history_test.go | 93 ++++++++++++++++++++++- 10 files changed, 252 insertions(+), 30 deletions(-) create mode 100644 exporter/ibft/decided_genesis_migrator.go create mode 100644 exporter/ibft/migrator.go diff --git a/exporter/ibft/decided_genesis_migrator.go b/exporter/ibft/decided_genesis_migrator.go new file mode 100644 index 0000000000..b1b00a77d6 --- /dev/null +++ b/exporter/ibft/decided_genesis_migrator.go @@ -0,0 +1,48 @@ +package ibft + +import ( + "github.com/pkg/errors" + "go.uber.org/zap" +) + +// decidedGenesisMigrator +type decidedGenesisMigrator struct { + logger *zap.Logger +} + +// Migrate take care of decided messages migration +func (m *decidedGenesisMigrator) Migrate(r Reader) error { + dr, ok := r.(*decidedReader) + if !ok { + return nil + } + n, err := m.migrate(dr) + if err != nil { + return errors.Wrap(err, "could not migrate decided 0") + } + m.logger.Debug("managed to migrate decided 0", + zap.String("identifier", string(dr.identifier)), zap.Int("items", n)) + return nil +} + +// migrate performing migration for decided messages +func (m *decidedGenesisMigrator) migrate(dr *decidedReader) (int, error) { + if migrateDecided0, err := m.check(dr); err != nil { + return 0, err + } else if migrateDecided0 { + return dr.newHistorySync().StartRange(uint64(0), uint64(1)) + } + return 0, nil +} + +// check determines if the given reader should migrate +func (m *decidedGenesisMigrator) check(dr *decidedReader) (bool, error) { + _, found, err := dr.storage.GetDecided(dr.identifier, uint64(0)) + if err != nil { + return false, err + } + if found { + return false, nil + } + return true, nil +} diff --git a/exporter/ibft/decided_reader.go b/exporter/ibft/decided_reader.go index d7f66af328..7fe3f1a127 100644 --- a/exporter/ibft/decided_reader.go +++ b/exporter/ibft/decided_reader.go @@ -65,17 +65,10 @@ func newDecidedReader(opts DecidedReaderOptions) Reader { return &r } -// sync starts to fetch best known decided message (highest sequence) from the network and sync to it. -func (r *decidedReader) sync() error { - r.logger.Debug("syncing ibft data") - // creating HistorySync and starts it - hs := history.New(r.logger, r.validatorShare.PublicKey.Serialize(), r.identifier, r.network, - r.storage, r.validateDecidedMsg) - err := hs.Start() - if err != nil { - r.logger.Error("could not sync validator's data", zap.Error(err)) - } - return err +// newHistorySync creates a new instance of history sync +func (r *decidedReader) newHistorySync() history.Syncer { + return history.New(r.logger, r.validatorShare.PublicKey.Serialize(), r.validatorShare.CommitteeSize(), r.identifier, + r.network, r.storage, r.validateDecidedMsg) } // Share returns the reader's share @@ -88,6 +81,10 @@ func (r *decidedReader) Start() error { if err := r.network.SubscribeToValidatorNetwork(r.validatorShare.PublicKey); err != nil { return errors.Wrap(err, "failed to subscribe topic") } + // call migration before starting the other stuff + if err := GetMainMigrator().Migrate(r); err != nil { + r.logger.Error("could not run migration", zap.Error(err)) + } if err := tasks.Retry(func() error { if err := r.sync(); err != nil { r.logger.Error("could not sync validator", zap.Error(err)) @@ -113,6 +110,18 @@ func (r *decidedReader) Start() error { return nil } +// sync starts to fetch best known decided message (highest sequence) from the network and sync to it. +func (r *decidedReader) sync() error { + r.logger.Debug("syncing ibft data") + // creating HistorySync and starts it + hs := r.newHistorySync() + err := hs.Start() + if err != nil { + r.logger.Error("could not sync validator's data", zap.Error(err)) + } + return err +} + func (r *decidedReader) listenToNetwork(cn <-chan *proto.SignedMessage) { r.logger.Debug("listening to decided messages") for msg := range cn { @@ -124,10 +133,6 @@ func (r *decidedReader) listenToNetwork(cn <-chan *proto.SignedMessage) { logger.Debug("received invalid decided message") continue } - if msg.Message.SeqNumber == 0 { - logger.Debug("received invalid sequence") - continue - } go func(msg *proto.SignedMessage) { defer logger.Debug("done with decided msg") if saved, err := r.handleNewDecidedMessage(msg); err != nil { diff --git a/exporter/ibft/migrator.go b/exporter/ibft/migrator.go new file mode 100644 index 0000000000..5630bae2e1 --- /dev/null +++ b/exporter/ibft/migrator.go @@ -0,0 +1,42 @@ +package ibft + +import ( + "github.com/bloxapp/ssv/utils/logex" + "go.uber.org/zap" + "sync" +) + +// Migrator is an interface for migrating messages +type Migrator interface { + Migrate(r Reader) error +} + +type mainMigrator struct { + migrators []Migrator +} + +var migrator mainMigrator + +var migratorOnce sync.Once + +// GetMainMigrator returns the instance of migrator +func GetMainMigrator() Migrator { + migratorOnce.Do(func() { + logger := logex.GetLogger(zap.String("who", "migrateManager")) + migrators := []Migrator{&decidedGenesisMigrator{logger}} + migrator = mainMigrator{ + migrators: migrators, + } + }) + return &migrator +} + +// Migrate applies the existing migrators on the given reader +func (mm *mainMigrator) Migrate(r Reader) error { + for _, migrator := range mm.migrators { + if err := migrator.Migrate(r); err != nil { + return err + } + } + return nil +} diff --git a/ibft/controller/controller_sync.go b/ibft/controller/controller_sync.go index 8ffe51c6b1..03cc04af15 100644 --- a/ibft/controller/controller_sync.go +++ b/ibft/controller/controller_sync.go @@ -62,7 +62,7 @@ func (i *Controller) SyncIBFT() error { func (i *Controller) syncIBFT() error { // TODO: use controller context once added return tasks.RetryWithContext(context.Background(), func() error { - s := history.New(i.logger, i.ValidatorShare.PublicKey.Serialize(), i.GetIdentifier(), i.network, i.ibftStorage, i.ValidateDecidedMsg) + s := history.New(i.logger, i.ValidatorShare.PublicKey.Serialize(), i.ValidatorShare.CommitteeSize(), i.GetIdentifier(), i.network, i.ibftStorage, i.ValidateDecidedMsg) err := s.Start() if err != nil { return errors.Wrap(err, "history sync failed") diff --git a/ibft/sync/history/fetch_decided.go b/ibft/sync/history/fetch_decided.go index c52364d0cc..b23a2e0c49 100644 --- a/ibft/sync/history/fetch_decided.go +++ b/ibft/sync/history/fetch_decided.go @@ -10,17 +10,17 @@ import ( // FetchValidateAndSaveInstances fetches, validates and saves decided messages from the P2P network. // Range is start to end seq including -func (s *Sync) fetchValidateAndSaveInstances(fromPeer string, startSeq uint64, endSeq uint64) (highestSaved *proto.SignedMessage, err error) { +func (s *Sync) fetchValidateAndSaveInstances(fromPeer string, startSeq uint64, endSeq uint64) (highestSaved *proto.SignedMessage, n int, err error) { failCount := 0 start := startSeq done := false var latestError error for { if failCount == 5 { - return highestSaved, latestError + return highestSaved, n, latestError } if done { - return highestSaved, nil + return highestSaved, n, nil } // conform to max batch @@ -70,8 +70,9 @@ func (s *Sync) fetchValidateAndSaveInstances(fromPeer string, startSeq uint64, e // save if err := s.ibftStorage.SaveDecided(msg); err != nil { - return highestSaved, err + return highestSaved, n, err } + n++ // set highest if highestSaved == nil || highestSaved.Message.SeqNumber < msg.Message.SeqNumber { diff --git a/ibft/sync/history/fetch_decided_test.go b/ibft/sync/history/fetch_decided_test.go index 746a6c220b..d17de0e4fd 100644 --- a/ibft/sync/history/fetch_decided_test.go +++ b/ibft/sync/history/fetch_decided_test.go @@ -117,10 +117,10 @@ func TestFetchDecided(t *testing.T) { require.NoError(t, err) storage := collections.NewIbft(db, logger, "attestation") network := sync.NewTestNetwork(t, test.peers, int(test.rangeParams[2]), nil, nil, test.decidedArr, nil, nil) - s := New(logger, test.validatorPk, test.identifier, network, &storage, func(msg *proto.SignedMessage) error { + s := New(logger, test.validatorPk, 4, test.identifier, network, &storage, func(msg *proto.SignedMessage) error { return nil }) - res, err := s.fetchValidateAndSaveInstances(test.fromPeer, test.rangeParams[0], test.rangeParams[1]) + res, _, err := s.fetchValidateAndSaveInstances(test.fromPeer, test.rangeParams[0], test.rangeParams[1]) if len(test.expectedError) > 0 { require.EqualError(t, err, test.expectedError) diff --git a/ibft/sync/history/fetch_highest.go b/ibft/sync/history/fetch_highest.go index 469f74e490..4d495f6f5d 100644 --- a/ibft/sync/history/fetch_highest.go +++ b/ibft/sync/history/fetch_highest.go @@ -3,7 +3,7 @@ package history import ( "encoding/hex" "github.com/bloxapp/ssv/ibft/proto" - sync2 "github.com/bloxapp/ssv/ibft/sync" + ibftsync "github.com/bloxapp/ssv/ibft/sync" "github.com/bloxapp/ssv/network" "github.com/bloxapp/ssv/storage/kv" "github.com/pkg/errors" @@ -13,9 +13,8 @@ import ( // findHighestInstance returns the highest found decided signed message and the peer it was received from func (s *Sync) findHighestInstance() (*proto.SignedMessage, string, error) { - // pick up to 4 peers - // TODO - why 4? should be set as param? - usedPeers, err := sync2.GetPeers(s.network, s.publicKey, 4) + // pick up to committee peers + usedPeers, err := ibftsync.GetPeers(s.network, s.publicKey, s.committeeSize) if err != nil { return nil, "", err } diff --git a/ibft/sync/history/fetch_highest_test.go b/ibft/sync/history/fetch_highest_test.go index 60613fb331..afba6ceb6b 100644 --- a/ibft/sync/history/fetch_highest_test.go +++ b/ibft/sync/history/fetch_highest_test.go @@ -186,7 +186,7 @@ func TestFindHighest(t *testing.T) { return nil } } - s := New(zap.L(), test.valdiatorPK, test.identifier, sync.NewTestNetwork(t, test.peers, 100, + s := New(zap.L(), test.valdiatorPK, 4, test.identifier, sync.NewTestNetwork(t, test.peers, 100, test.highestMap, test.errorMap, nil, nil, nil), nil, test.validateMsg) res, _, err := s.findHighestInstance() diff --git a/ibft/sync/history/history.go b/ibft/sync/history/history.go index 4daff6a665..6179fdeaad 100644 --- a/ibft/sync/history/history.go +++ b/ibft/sync/history/history.go @@ -9,6 +9,12 @@ import ( "time" ) +// Syncer is the interface for history sync +type Syncer interface { + Start() error + StartRange(from, to uint64) (int, error) +} + // Sync is responsible for syncing and iBFT instance when needed by // fetching decided messages from the network type Sync struct { @@ -20,10 +26,11 @@ type Sync struct { identifier []byte // paginationMaxSize is the max number of returned elements in a single response paginationMaxSize uint64 + committeeSize int } // New returns a new instance of Sync -func New(logger *zap.Logger, publicKey []byte, identifier []byte, network network.Network, ibftStorage collections.Iibft, validateDecidedMsgF func(msg *proto.SignedMessage) error) *Sync { +func New(logger *zap.Logger, publicKey []byte, committeeSize int, identifier []byte, network network.Network, ibftStorage collections.Iibft, validateDecidedMsgF func(msg *proto.SignedMessage) error) *Sync { return &Sync{ logger: logger.With(zap.String("sync", "history")), publicKey: publicKey, @@ -32,6 +39,7 @@ func New(logger *zap.Logger, publicKey []byte, identifier []byte, network networ validateDecidedMsgF: validateDecidedMsgF, ibftStorage: ibftStorage, paginationMaxSize: network.MaxBatch(), + committeeSize: committeeSize, } } @@ -69,7 +77,7 @@ func (s *Sync) Start() error { } // fetch, validate and save missing data - highestSaved, err := s.fetchValidateAndSaveInstances(fromPeer, syncStartSeqNumber, remoteHighest.Message.SeqNumber) + highestSaved, _, err := s.fetchValidateAndSaveInstances(fromPeer, syncStartSeqNumber, remoteHighest.Message.SeqNumber) if err != nil { return errors.Wrap(err, "could not fetch decided by range during sync") } @@ -84,3 +92,31 @@ func (s *Sync) Start() error { s.logger.Info("finished syncing", zap.Uint64("highest seq", highestSaved.Message.SeqNumber), zap.String("duration", time.Since(start).String())) return nil } + +// StartRange starts to sync old messages in a specific range +// first it tries to find a synced peer and then ask a specific range of decided messages +func (s *Sync) StartRange(from, to uint64) (int, error) { + var n int + start := time.Now() + // fetch remote highest + remoteHighest, fromPeer, err := s.findHighestInstance() + if err != nil { + return n, errors.Wrap(err, "could not fetch highest instance during sync") + } + if remoteHighest == nil { // could not find highest, there isn't one + s.logger.Info("node is synced: could not find any peer with highest decided, assuming sequence number is 0", + zap.String("duration", time.Since(start).String())) + return n, nil + } + if remoteHighest.Message.SeqNumber < from { + return n, errors.New("range is out of decided sequence boundaries") + } + // fetch, validate and save missing data + _, n, err = s.fetchValidateAndSaveInstances(fromPeer, from, to) + if err != nil { + return n, errors.Wrap(err, "could not fetch decided by range during sync") + } + s.logger.Info("finished syncing in range", zap.Uint64("from", from), zap.Uint64("to", to), + zap.String("duration", time.Since(start).String()), zap.Int("items", n)) + return n, nil +} diff --git a/ibft/sync/history/history_test.go b/ibft/sync/history/history_test.go index b922221608..45e32dc8bd 100644 --- a/ibft/sync/history/history_test.go +++ b/ibft/sync/history/history_test.go @@ -182,7 +182,7 @@ func TestSync(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { storage := sync.TestingIbftStorage(t) - s := New(zap.L(), test.valdiatorPK, test.identifier, sync.NewTestNetwork(t, test.peers, 100, test.highestMap, test.errorMap, test.decidedArrMap, nil, nil), &storage, func(msg *proto.SignedMessage) error { + s := New(zap.L(), test.valdiatorPK, 4, test.identifier, sync.NewTestNetwork(t, test.peers, 100, test.highestMap, test.errorMap, test.decidedArrMap, nil, nil), &storage, func(msg *proto.SignedMessage) error { return nil }) err := s.Start() @@ -211,3 +211,94 @@ func TestSync(t *testing.T) { }) } } + +func TestSync_StartRange(t *testing.T) { + sks, _ := sync.GenerateNodes(4) + decided0 := sync.MultiSignMsg(t, []uint64{1, 2, 3}, sks, &proto.Message{ + Type: proto.RoundState_Decided, + Round: 1, + Lambda: []byte("lambda"), + SeqNumber: 0, + }) + decided1 := sync.MultiSignMsg(t, []uint64{1, 2, 3}, sks, &proto.Message{ + Type: proto.RoundState_Decided, + Round: 1, + Lambda: []byte("lambda"), + SeqNumber: 1, + }) + decided2 := sync.MultiSignMsg(t, []uint64{1, 2, 3}, sks, &proto.Message{ + Type: proto.RoundState_Decided, + Round: 1, + Lambda: []byte("lambda"), + SeqNumber: 2, + }) + + tests := []struct { + name string + valdiatorPK []byte + identifier []byte + peers []string + highestMap map[string]*proto.SignedMessage + decidedArrMap map[string][]*proto.SignedMessage + errorMap map[string]error + rangeFrom uint64 + rangeTo uint64 + expectedSaved int + expectedError string + }{ + { + "sync decided in range", + []byte{1, 2, 3, 4}, + []byte("lambda"), + []string{"2"}, + map[string]*proto.SignedMessage{ + "2": decided2, + "3": decided2, + }, + map[string][]*proto.SignedMessage{ + "2": {decided0, decided1, decided2}, + "3": {decided0, decided1, decided2}, + }, + nil, + uint64(0), + uint64(1), + 2, + "", + }, + { + "sync decided out of range", + []byte{1, 2, 3, 4}, + []byte("lambda"), + []string{"2"}, + map[string]*proto.SignedMessage{ + "2": decided2, + "3": decided2, + }, + map[string][]*proto.SignedMessage{ + "2": {decided0, decided1, decided2}, + "3": {decided0, decided1, decided2}, + }, + nil, + uint64(4), + uint64(8), + 0, + "range is out of decided sequence boundaries", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + storage := sync.TestingIbftStorage(t) + s := New(zap.L(), test.valdiatorPK, 4, test.identifier, sync.NewTestNetwork(t, test.peers, 100, test.highestMap, test.errorMap, test.decidedArrMap, nil, nil), &storage, func(msg *proto.SignedMessage) error { + return nil + }) + n, err := s.StartRange(test.rangeFrom, test.rangeTo) + if len(test.expectedError) > 0 { + require.EqualError(t, err, test.expectedError) + } else { + require.NoError(t, err) + } + require.Equal(t, test.expectedSaved, n) + }) + } +} From d8f386a18362313f8e89a6b114de82824ebcfc9b Mon Sep 17 00:00:00 2001 From: github-actions Date: Wed, 24 Nov 2021 11:44:45 +0000 Subject: [PATCH 2/4] update code coverage badge --- docs/resources/cov-badge.svg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/resources/cov-badge.svg b/docs/resources/cov-badge.svg index 86702abd15..2a91710a37 100644 --- a/docs/resources/cov-badge.svg +++ b/docs/resources/cov-badge.svg @@ -1 +1 @@ -coverage: 58.5%coverage58.5% \ No newline at end of file +coverage: 58.7%coverage58.7% \ No newline at end of file From 3cf57dcdd66d43c6f7e8b6df66aa17d9cd81a9ef Mon Sep 17 00:00:00 2001 From: nivBlox <47101796+nivBlox@users.noreply.github.com> Date: Wed, 24 Nov 2021 17:36:43 +0200 Subject: [PATCH 3/4] After Instance Panic Fix (#448) * add validation for res and error * ci points to instance-nil-pointer * ci prod points to instance-nil-pointer --- .gitlab-ci.yml | 4 ++-- ibft/controller/controller_running_instance.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 45c835e024..7487ea86a4 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -75,7 +75,7 @@ Build prod Docker image: - docker tag $IMAGE_NAME:$CI_BUILD_REF $DOCKER_REPO_INFRA_PROD:$CI_BUILD_REF - $DOCKER_LOGIN_TO_INFRA_PROD_REPO && docker push $DOCKER_REPO_INFRA_PROD:$CI_BUILD_REF only: - - main + - instance-nil-pointer Deploy ssv exporter to blox-infra-prod cluster: @@ -104,4 +104,4 @@ Deploy ssv-nodes to blox-infra-prod cluster: - mv ./kubectl /usr/bin/kubectl - .k8/scripts/deploy-ssv-nodes-yamls-on-k8s.sh $DOCKER_REPO_INFRA_PROD $CI_BUILD_REF ssv $APP_REPLICAS_INFRA_PROD blox-infra-prod kubernetes-admin@blox-infra-prod ssv.network $K8S_API_VERSION $PROD_HEALTH_CHECK_IMAGE only: - - main + - instance-nil-pointer diff --git a/ibft/controller/controller_running_instance.go b/ibft/controller/controller_running_instance.go index 992ed03c7a..6bcf0088e1 100644 --- a/ibft/controller/controller_running_instance.go +++ b/ibft/controller/controller_running_instance.go @@ -77,15 +77,15 @@ instanceLoop: i.currentInstance = nil i.logger.Debug("iBFT instance result loop stopped") - i.afterInstance(seq, retRes) + i.afterInstance(seq, retRes, err) return retRes, err } // afterInstance is triggered after the instance was finished -func (i *Controller) afterInstance(seq uint64, res *ibft.InstanceResult) { +func (i *Controller) afterInstance(seq uint64, res *ibft.InstanceResult, err error) { // if instance was decided -> wait for late commit messages - if res.Decided { + if err != nil && res != nil && res.Decided { go i.listenToLateCommitMsgs(i.Identifier[:], seq) } else { i.msgQueue.PurgeIndexedMessages(msgqueue.IBFTMessageIndexKey(i.Identifier[:], seq)) From 8829e672a632b2533f405b229732a4ebd3e7acc5 Mon Sep 17 00:00:00 2001 From: nivBlox Date: Wed, 24 Nov 2021 17:38:21 +0200 Subject: [PATCH 4/4] ci fix --- .gitlab-ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 7487ea86a4..45c835e024 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -75,7 +75,7 @@ Build prod Docker image: - docker tag $IMAGE_NAME:$CI_BUILD_REF $DOCKER_REPO_INFRA_PROD:$CI_BUILD_REF - $DOCKER_LOGIN_TO_INFRA_PROD_REPO && docker push $DOCKER_REPO_INFRA_PROD:$CI_BUILD_REF only: - - instance-nil-pointer + - main Deploy ssv exporter to blox-infra-prod cluster: @@ -104,4 +104,4 @@ Deploy ssv-nodes to blox-infra-prod cluster: - mv ./kubectl /usr/bin/kubectl - .k8/scripts/deploy-ssv-nodes-yamls-on-k8s.sh $DOCKER_REPO_INFRA_PROD $CI_BUILD_REF ssv $APP_REPLICAS_INFRA_PROD blox-infra-prod kubernetes-admin@blox-infra-prod ssv.network $K8S_API_VERSION $PROD_HEALTH_CHECK_IMAGE only: - - instance-nil-pointer + - main