diff --git a/docs/resources/cov-badge.svg b/docs/resources/cov-badge.svg
index 86702abd15..2a91710a37 100644
--- a/docs/resources/cov-badge.svg
+++ b/docs/resources/cov-badge.svg
@@ -1 +1 @@
-
\ No newline at end of file
+
\ No newline at end of file
diff --git a/exporter/ibft/decided_genesis_migrator.go b/exporter/ibft/decided_genesis_migrator.go
new file mode 100644
index 0000000000..b1b00a77d6
--- /dev/null
+++ b/exporter/ibft/decided_genesis_migrator.go
@@ -0,0 +1,48 @@
+package ibft
+
+import (
+ "github.com/pkg/errors"
+ "go.uber.org/zap"
+)
+
+// decidedGenesisMigrator
+type decidedGenesisMigrator struct {
+ logger *zap.Logger
+}
+
+// Migrate take care of decided messages migration
+func (m *decidedGenesisMigrator) Migrate(r Reader) error {
+ dr, ok := r.(*decidedReader)
+ if !ok {
+ return nil
+ }
+ n, err := m.migrate(dr)
+ if err != nil {
+ return errors.Wrap(err, "could not migrate decided 0")
+ }
+ m.logger.Debug("managed to migrate decided 0",
+ zap.String("identifier", string(dr.identifier)), zap.Int("items", n))
+ return nil
+}
+
+// migrate performing migration for decided messages
+func (m *decidedGenesisMigrator) migrate(dr *decidedReader) (int, error) {
+ if migrateDecided0, err := m.check(dr); err != nil {
+ return 0, err
+ } else if migrateDecided0 {
+ return dr.newHistorySync().StartRange(uint64(0), uint64(1))
+ }
+ return 0, nil
+}
+
+// check determines if the given reader should migrate
+func (m *decidedGenesisMigrator) check(dr *decidedReader) (bool, error) {
+ _, found, err := dr.storage.GetDecided(dr.identifier, uint64(0))
+ if err != nil {
+ return false, err
+ }
+ if found {
+ return false, nil
+ }
+ return true, nil
+}
diff --git a/exporter/ibft/decided_reader.go b/exporter/ibft/decided_reader.go
index d7f66af328..7fe3f1a127 100644
--- a/exporter/ibft/decided_reader.go
+++ b/exporter/ibft/decided_reader.go
@@ -65,17 +65,10 @@ func newDecidedReader(opts DecidedReaderOptions) Reader {
return &r
}
-// sync starts to fetch best known decided message (highest sequence) from the network and sync to it.
-func (r *decidedReader) sync() error {
- r.logger.Debug("syncing ibft data")
- // creating HistorySync and starts it
- hs := history.New(r.logger, r.validatorShare.PublicKey.Serialize(), r.identifier, r.network,
- r.storage, r.validateDecidedMsg)
- err := hs.Start()
- if err != nil {
- r.logger.Error("could not sync validator's data", zap.Error(err))
- }
- return err
+// newHistorySync creates a new instance of history sync
+func (r *decidedReader) newHistorySync() history.Syncer {
+ return history.New(r.logger, r.validatorShare.PublicKey.Serialize(), r.validatorShare.CommitteeSize(), r.identifier,
+ r.network, r.storage, r.validateDecidedMsg)
}
// Share returns the reader's share
@@ -88,6 +81,10 @@ func (r *decidedReader) Start() error {
if err := r.network.SubscribeToValidatorNetwork(r.validatorShare.PublicKey); err != nil {
return errors.Wrap(err, "failed to subscribe topic")
}
+ // call migration before starting the other stuff
+ if err := GetMainMigrator().Migrate(r); err != nil {
+ r.logger.Error("could not run migration", zap.Error(err))
+ }
if err := tasks.Retry(func() error {
if err := r.sync(); err != nil {
r.logger.Error("could not sync validator", zap.Error(err))
@@ -113,6 +110,18 @@ func (r *decidedReader) Start() error {
return nil
}
+// sync starts to fetch best known decided message (highest sequence) from the network and sync to it.
+func (r *decidedReader) sync() error {
+ r.logger.Debug("syncing ibft data")
+ // creating HistorySync and starts it
+ hs := r.newHistorySync()
+ err := hs.Start()
+ if err != nil {
+ r.logger.Error("could not sync validator's data", zap.Error(err))
+ }
+ return err
+}
+
func (r *decidedReader) listenToNetwork(cn <-chan *proto.SignedMessage) {
r.logger.Debug("listening to decided messages")
for msg := range cn {
@@ -124,10 +133,6 @@ func (r *decidedReader) listenToNetwork(cn <-chan *proto.SignedMessage) {
logger.Debug("received invalid decided message")
continue
}
- if msg.Message.SeqNumber == 0 {
- logger.Debug("received invalid sequence")
- continue
- }
go func(msg *proto.SignedMessage) {
defer logger.Debug("done with decided msg")
if saved, err := r.handleNewDecidedMessage(msg); err != nil {
diff --git a/exporter/ibft/migrator.go b/exporter/ibft/migrator.go
new file mode 100644
index 0000000000..5630bae2e1
--- /dev/null
+++ b/exporter/ibft/migrator.go
@@ -0,0 +1,42 @@
+package ibft
+
+import (
+ "github.com/bloxapp/ssv/utils/logex"
+ "go.uber.org/zap"
+ "sync"
+)
+
+// Migrator is an interface for migrating messages
+type Migrator interface {
+ Migrate(r Reader) error
+}
+
+type mainMigrator struct {
+ migrators []Migrator
+}
+
+var migrator mainMigrator
+
+var migratorOnce sync.Once
+
+// GetMainMigrator returns the instance of migrator
+func GetMainMigrator() Migrator {
+ migratorOnce.Do(func() {
+ logger := logex.GetLogger(zap.String("who", "migrateManager"))
+ migrators := []Migrator{&decidedGenesisMigrator{logger}}
+ migrator = mainMigrator{
+ migrators: migrators,
+ }
+ })
+ return &migrator
+}
+
+// Migrate applies the existing migrators on the given reader
+func (mm *mainMigrator) Migrate(r Reader) error {
+ for _, migrator := range mm.migrators {
+ if err := migrator.Migrate(r); err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/ibft/controller/controller_running_instance.go b/ibft/controller/controller_running_instance.go
index 992ed03c7a..6bcf0088e1 100644
--- a/ibft/controller/controller_running_instance.go
+++ b/ibft/controller/controller_running_instance.go
@@ -77,15 +77,15 @@ instanceLoop:
i.currentInstance = nil
i.logger.Debug("iBFT instance result loop stopped")
- i.afterInstance(seq, retRes)
+ i.afterInstance(seq, retRes, err)
return retRes, err
}
// afterInstance is triggered after the instance was finished
-func (i *Controller) afterInstance(seq uint64, res *ibft.InstanceResult) {
+func (i *Controller) afterInstance(seq uint64, res *ibft.InstanceResult, err error) {
// if instance was decided -> wait for late commit messages
- if res.Decided {
+ if err != nil && res != nil && res.Decided {
go i.listenToLateCommitMsgs(i.Identifier[:], seq)
} else {
i.msgQueue.PurgeIndexedMessages(msgqueue.IBFTMessageIndexKey(i.Identifier[:], seq))
diff --git a/ibft/controller/controller_sync.go b/ibft/controller/controller_sync.go
index 8ffe51c6b1..03cc04af15 100644
--- a/ibft/controller/controller_sync.go
+++ b/ibft/controller/controller_sync.go
@@ -62,7 +62,7 @@ func (i *Controller) SyncIBFT() error {
func (i *Controller) syncIBFT() error {
// TODO: use controller context once added
return tasks.RetryWithContext(context.Background(), func() error {
- s := history.New(i.logger, i.ValidatorShare.PublicKey.Serialize(), i.GetIdentifier(), i.network, i.ibftStorage, i.ValidateDecidedMsg)
+ s := history.New(i.logger, i.ValidatorShare.PublicKey.Serialize(), i.ValidatorShare.CommitteeSize(), i.GetIdentifier(), i.network, i.ibftStorage, i.ValidateDecidedMsg)
err := s.Start()
if err != nil {
return errors.Wrap(err, "history sync failed")
diff --git a/ibft/sync/history/fetch_decided.go b/ibft/sync/history/fetch_decided.go
index c52364d0cc..b23a2e0c49 100644
--- a/ibft/sync/history/fetch_decided.go
+++ b/ibft/sync/history/fetch_decided.go
@@ -10,17 +10,17 @@ import (
// FetchValidateAndSaveInstances fetches, validates and saves decided messages from the P2P network.
// Range is start to end seq including
-func (s *Sync) fetchValidateAndSaveInstances(fromPeer string, startSeq uint64, endSeq uint64) (highestSaved *proto.SignedMessage, err error) {
+func (s *Sync) fetchValidateAndSaveInstances(fromPeer string, startSeq uint64, endSeq uint64) (highestSaved *proto.SignedMessage, n int, err error) {
failCount := 0
start := startSeq
done := false
var latestError error
for {
if failCount == 5 {
- return highestSaved, latestError
+ return highestSaved, n, latestError
}
if done {
- return highestSaved, nil
+ return highestSaved, n, nil
}
// conform to max batch
@@ -70,8 +70,9 @@ func (s *Sync) fetchValidateAndSaveInstances(fromPeer string, startSeq uint64, e
// save
if err := s.ibftStorage.SaveDecided(msg); err != nil {
- return highestSaved, err
+ return highestSaved, n, err
}
+ n++
// set highest
if highestSaved == nil || highestSaved.Message.SeqNumber < msg.Message.SeqNumber {
diff --git a/ibft/sync/history/fetch_decided_test.go b/ibft/sync/history/fetch_decided_test.go
index 746a6c220b..d17de0e4fd 100644
--- a/ibft/sync/history/fetch_decided_test.go
+++ b/ibft/sync/history/fetch_decided_test.go
@@ -117,10 +117,10 @@ func TestFetchDecided(t *testing.T) {
require.NoError(t, err)
storage := collections.NewIbft(db, logger, "attestation")
network := sync.NewTestNetwork(t, test.peers, int(test.rangeParams[2]), nil, nil, test.decidedArr, nil, nil)
- s := New(logger, test.validatorPk, test.identifier, network, &storage, func(msg *proto.SignedMessage) error {
+ s := New(logger, test.validatorPk, 4, test.identifier, network, &storage, func(msg *proto.SignedMessage) error {
return nil
})
- res, err := s.fetchValidateAndSaveInstances(test.fromPeer, test.rangeParams[0], test.rangeParams[1])
+ res, _, err := s.fetchValidateAndSaveInstances(test.fromPeer, test.rangeParams[0], test.rangeParams[1])
if len(test.expectedError) > 0 {
require.EqualError(t, err, test.expectedError)
diff --git a/ibft/sync/history/fetch_highest.go b/ibft/sync/history/fetch_highest.go
index 469f74e490..4d495f6f5d 100644
--- a/ibft/sync/history/fetch_highest.go
+++ b/ibft/sync/history/fetch_highest.go
@@ -3,7 +3,7 @@ package history
import (
"encoding/hex"
"github.com/bloxapp/ssv/ibft/proto"
- sync2 "github.com/bloxapp/ssv/ibft/sync"
+ ibftsync "github.com/bloxapp/ssv/ibft/sync"
"github.com/bloxapp/ssv/network"
"github.com/bloxapp/ssv/storage/kv"
"github.com/pkg/errors"
@@ -13,9 +13,8 @@ import (
// findHighestInstance returns the highest found decided signed message and the peer it was received from
func (s *Sync) findHighestInstance() (*proto.SignedMessage, string, error) {
- // pick up to 4 peers
- // TODO - why 4? should be set as param?
- usedPeers, err := sync2.GetPeers(s.network, s.publicKey, 4)
+ // pick up to committee peers
+ usedPeers, err := ibftsync.GetPeers(s.network, s.publicKey, s.committeeSize)
if err != nil {
return nil, "", err
}
diff --git a/ibft/sync/history/fetch_highest_test.go b/ibft/sync/history/fetch_highest_test.go
index 60613fb331..afba6ceb6b 100644
--- a/ibft/sync/history/fetch_highest_test.go
+++ b/ibft/sync/history/fetch_highest_test.go
@@ -186,7 +186,7 @@ func TestFindHighest(t *testing.T) {
return nil
}
}
- s := New(zap.L(), test.valdiatorPK, test.identifier, sync.NewTestNetwork(t, test.peers, 100,
+ s := New(zap.L(), test.valdiatorPK, 4, test.identifier, sync.NewTestNetwork(t, test.peers, 100,
test.highestMap, test.errorMap, nil, nil, nil), nil, test.validateMsg)
res, _, err := s.findHighestInstance()
diff --git a/ibft/sync/history/history.go b/ibft/sync/history/history.go
index 4daff6a665..6179fdeaad 100644
--- a/ibft/sync/history/history.go
+++ b/ibft/sync/history/history.go
@@ -9,6 +9,12 @@ import (
"time"
)
+// Syncer is the interface for history sync
+type Syncer interface {
+ Start() error
+ StartRange(from, to uint64) (int, error)
+}
+
// Sync is responsible for syncing and iBFT instance when needed by
// fetching decided messages from the network
type Sync struct {
@@ -20,10 +26,11 @@ type Sync struct {
identifier []byte
// paginationMaxSize is the max number of returned elements in a single response
paginationMaxSize uint64
+ committeeSize int
}
// New returns a new instance of Sync
-func New(logger *zap.Logger, publicKey []byte, identifier []byte, network network.Network, ibftStorage collections.Iibft, validateDecidedMsgF func(msg *proto.SignedMessage) error) *Sync {
+func New(logger *zap.Logger, publicKey []byte, committeeSize int, identifier []byte, network network.Network, ibftStorage collections.Iibft, validateDecidedMsgF func(msg *proto.SignedMessage) error) *Sync {
return &Sync{
logger: logger.With(zap.String("sync", "history")),
publicKey: publicKey,
@@ -32,6 +39,7 @@ func New(logger *zap.Logger, publicKey []byte, identifier []byte, network networ
validateDecidedMsgF: validateDecidedMsgF,
ibftStorage: ibftStorage,
paginationMaxSize: network.MaxBatch(),
+ committeeSize: committeeSize,
}
}
@@ -69,7 +77,7 @@ func (s *Sync) Start() error {
}
// fetch, validate and save missing data
- highestSaved, err := s.fetchValidateAndSaveInstances(fromPeer, syncStartSeqNumber, remoteHighest.Message.SeqNumber)
+ highestSaved, _, err := s.fetchValidateAndSaveInstances(fromPeer, syncStartSeqNumber, remoteHighest.Message.SeqNumber)
if err != nil {
return errors.Wrap(err, "could not fetch decided by range during sync")
}
@@ -84,3 +92,31 @@ func (s *Sync) Start() error {
s.logger.Info("finished syncing", zap.Uint64("highest seq", highestSaved.Message.SeqNumber), zap.String("duration", time.Since(start).String()))
return nil
}
+
+// StartRange starts to sync old messages in a specific range
+// first it tries to find a synced peer and then ask a specific range of decided messages
+func (s *Sync) StartRange(from, to uint64) (int, error) {
+ var n int
+ start := time.Now()
+ // fetch remote highest
+ remoteHighest, fromPeer, err := s.findHighestInstance()
+ if err != nil {
+ return n, errors.Wrap(err, "could not fetch highest instance during sync")
+ }
+ if remoteHighest == nil { // could not find highest, there isn't one
+ s.logger.Info("node is synced: could not find any peer with highest decided, assuming sequence number is 0",
+ zap.String("duration", time.Since(start).String()))
+ return n, nil
+ }
+ if remoteHighest.Message.SeqNumber < from {
+ return n, errors.New("range is out of decided sequence boundaries")
+ }
+ // fetch, validate and save missing data
+ _, n, err = s.fetchValidateAndSaveInstances(fromPeer, from, to)
+ if err != nil {
+ return n, errors.Wrap(err, "could not fetch decided by range during sync")
+ }
+ s.logger.Info("finished syncing in range", zap.Uint64("from", from), zap.Uint64("to", to),
+ zap.String("duration", time.Since(start).String()), zap.Int("items", n))
+ return n, nil
+}
diff --git a/ibft/sync/history/history_test.go b/ibft/sync/history/history_test.go
index b922221608..45e32dc8bd 100644
--- a/ibft/sync/history/history_test.go
+++ b/ibft/sync/history/history_test.go
@@ -182,7 +182,7 @@ func TestSync(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
storage := sync.TestingIbftStorage(t)
- s := New(zap.L(), test.valdiatorPK, test.identifier, sync.NewTestNetwork(t, test.peers, 100, test.highestMap, test.errorMap, test.decidedArrMap, nil, nil), &storage, func(msg *proto.SignedMessage) error {
+ s := New(zap.L(), test.valdiatorPK, 4, test.identifier, sync.NewTestNetwork(t, test.peers, 100, test.highestMap, test.errorMap, test.decidedArrMap, nil, nil), &storage, func(msg *proto.SignedMessage) error {
return nil
})
err := s.Start()
@@ -211,3 +211,94 @@ func TestSync(t *testing.T) {
})
}
}
+
+func TestSync_StartRange(t *testing.T) {
+ sks, _ := sync.GenerateNodes(4)
+ decided0 := sync.MultiSignMsg(t, []uint64{1, 2, 3}, sks, &proto.Message{
+ Type: proto.RoundState_Decided,
+ Round: 1,
+ Lambda: []byte("lambda"),
+ SeqNumber: 0,
+ })
+ decided1 := sync.MultiSignMsg(t, []uint64{1, 2, 3}, sks, &proto.Message{
+ Type: proto.RoundState_Decided,
+ Round: 1,
+ Lambda: []byte("lambda"),
+ SeqNumber: 1,
+ })
+ decided2 := sync.MultiSignMsg(t, []uint64{1, 2, 3}, sks, &proto.Message{
+ Type: proto.RoundState_Decided,
+ Round: 1,
+ Lambda: []byte("lambda"),
+ SeqNumber: 2,
+ })
+
+ tests := []struct {
+ name string
+ valdiatorPK []byte
+ identifier []byte
+ peers []string
+ highestMap map[string]*proto.SignedMessage
+ decidedArrMap map[string][]*proto.SignedMessage
+ errorMap map[string]error
+ rangeFrom uint64
+ rangeTo uint64
+ expectedSaved int
+ expectedError string
+ }{
+ {
+ "sync decided in range",
+ []byte{1, 2, 3, 4},
+ []byte("lambda"),
+ []string{"2"},
+ map[string]*proto.SignedMessage{
+ "2": decided2,
+ "3": decided2,
+ },
+ map[string][]*proto.SignedMessage{
+ "2": {decided0, decided1, decided2},
+ "3": {decided0, decided1, decided2},
+ },
+ nil,
+ uint64(0),
+ uint64(1),
+ 2,
+ "",
+ },
+ {
+ "sync decided out of range",
+ []byte{1, 2, 3, 4},
+ []byte("lambda"),
+ []string{"2"},
+ map[string]*proto.SignedMessage{
+ "2": decided2,
+ "3": decided2,
+ },
+ map[string][]*proto.SignedMessage{
+ "2": {decided0, decided1, decided2},
+ "3": {decided0, decided1, decided2},
+ },
+ nil,
+ uint64(4),
+ uint64(8),
+ 0,
+ "range is out of decided sequence boundaries",
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ storage := sync.TestingIbftStorage(t)
+ s := New(zap.L(), test.valdiatorPK, 4, test.identifier, sync.NewTestNetwork(t, test.peers, 100, test.highestMap, test.errorMap, test.decidedArrMap, nil, nil), &storage, func(msg *proto.SignedMessage) error {
+ return nil
+ })
+ n, err := s.StartRange(test.rangeFrom, test.rangeTo)
+ if len(test.expectedError) > 0 {
+ require.EqualError(t, err, test.expectedError)
+ } else {
+ require.NoError(t, err)
+ }
+ require.Equal(t, test.expectedSaved, n)
+ })
+ }
+}