diff --git a/chains/manager.go b/chains/manager.go
index 97f80e1a79c5..cab073fc4b59 100644
--- a/chains/manager.go
+++ b/chains/manager.go
@@ -870,6 +870,10 @@ func (m *manager) createAvalancheChain(
 		ConnectedValidators: connectedValidators,
 		Params:              consensusParams,
 		Consensus:           snowmanConsensus,
+
+		// Block backfilling configuration
+		AncestorsMaxContainersSent:     m.BootstrapAncestorsMaxContainersSent,
+		AncestorsMaxContainersReceived: m.BootstrapAncestorsMaxContainersReceived,
 	}
 	snowmanEngine, err := smeng.New(snowmanEngineConfig)
 	if err != nil {
@@ -1217,6 +1221,10 @@ func (m *manager) createSnowmanChain(
 		Params:      consensusParams,
 		Consensus:   consensus,
 		PartialSync: m.PartialSyncPrimaryNetwork && ctx.ChainID == constants.PlatformChainID,
+
+		// Block backfilling configuration
+		AncestorsMaxContainersSent:     m.BootstrapAncestorsMaxContainersSent,
+		AncestorsMaxContainersReceived: m.BootstrapAncestorsMaxContainersReceived,
 	}
 	engine, err := smeng.New(engineConfig)
 	if err != nil {
diff --git a/snow/engine/snowman/config.go b/snow/engine/snowman/config.go
index 3162471a2476..a75ff957aedb 100644
--- a/snow/engine/snowman/config.go
+++ b/snow/engine/snowman/config.go
@@ -25,4 +25,8 @@ type Config struct {
 	Params      snowball.Parameters
 	Consensus   snowman.Consensus
 	PartialSync bool
+
+	// Used for block backfilling
+	AncestorsMaxContainersSent     int
+	AncestorsMaxContainersReceived int
 }
diff --git a/snow/engine/snowman/config_test.go b/snow/engine/snowman/config_test.go
index fe66256c68db..99847c8bc319 100644
--- a/snow/engine/snowman/config_test.go
+++ b/snow/engine/snowman/config_test.go
@@ -36,5 +36,8 @@ func DefaultConfig(t testing.TB) Config {
 			MaxItemProcessingTime: 1,
 		},
 		Consensus: &snowman.Topological{},
+
+		AncestorsMaxContainersSent:     2000,
+		AncestorsMaxContainersReceived: 2000,
 	}
 }
diff --git a/snow/engine/snowman/syncer/blocks_backfiller.go b/snow/engine/snowman/syncer/blocks_backfiller.go
new file mode 100644
index 000000000000..cfbb093a8c45
--- /dev/null
+++ b/snow/engine/snowman/syncer/blocks_backfiller.go
@@ -0,0 +1,224 @@
+// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package syncer
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"go.uber.org/zap"
+
+	"github.com/ava-labs/avalanchego/ids"
+	"github.com/ava-labs/avalanchego/snow"
+	"github.com/ava-labs/avalanchego/snow/engine/common"
+	"github.com/ava-labs/avalanchego/snow/engine/common/tracker"
+	"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
+	"github.com/ava-labs/avalanchego/snow/validators"
+	"github.com/ava-labs/avalanchego/utils/bimap"
+	"github.com/ava-labs/avalanchego/utils/set"
+)
+
+var (
+	_ common.AncestorsHandler = (*BlockBackfiller)(nil)
+
+	ErrNoPeersToDownloadBlocksFrom = errors.New("no connected peers to download blocks from")
+)
+
+type BlockBackfillerConfig struct {
+	Ctx                            *snow.ConsensusContext
+	VM                             block.ChainVM
+	Sender                         common.Sender
+	Validators                     validators.Manager
+	Peers                          tracker.Peers
+	AncestorsMaxContainersSent     int
+	AncestorsMaxContainersReceived int
+
+	// BlockBackfiller is meant to be embedded into the engine, so the
+	// request ID is shared between BlockBackfiller and the engine to
+	// avoid issuing duplicate request IDs.
+	SharedRequestID *uint32
+}
+
+type BlockBackfiller struct {
+	BlockBackfillerConfig
+
+	fetchFrom           set.Set[ids.NodeID]                  // picked from bootstrapper
+	outstandingRequests *bimap.BiMap[common.Request, ids.ID] // tracks which validators were asked for which block in which requests
+	interrupted         bool                                 // flag to allow backfilling to restart after recovering from validator disconnections
+}
+
+func NewBlockBackfiller(cfg BlockBackfillerConfig) *BlockBackfiller {
+	return &BlockBackfiller{
+		BlockBackfillerConfig: cfg,
+
+		fetchFrom:           set.Of[ids.NodeID](cfg.Validators.GetValidatorIDs(cfg.Ctx.SubnetID)...),
+		outstandingRequests: bimap.New[common.Request, ids.ID](),
+		interrupted:         len(cfg.Peers.PreferredPeers()) == 0,
+	}
+}
+
+func (bb *BlockBackfiller) Start(ctx context.Context) error {
+	ssVM, ok := bb.VM.(block.StateSyncableVM)
+	if !ok {
+		bb.Ctx.StateSyncing.Set(false)
+		return nil // nothing to do
+	}
+
+	switch wantedBlk, _, err := ssVM.BackfillBlocksEnabled(ctx); {
+	case errors.Is(err, block.ErrBlockBackfillingNotEnabled):
+		bb.Ctx.Log.Info("block backfilling not enabled")
+		bb.Ctx.StateSyncing.Set(false)
+		return nil
+	case err != nil:
+		return fmt.Errorf("failed checking if state sync block backfilling is enabled: %w", err)
+	default:
+		return bb.fetch(ctx, wantedBlk)
+	}
+}
+
+// Ancestors handles the receipt of multiple containers. Should be received in
+// response to a GetAncestors message to [nodeID] with request ID [requestID]
+func (bb *BlockBackfiller) Ancestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, blks [][]byte) error {
+	// Make sure this is in response to a request we made
+	wantedBlkID, ok := bb.outstandingRequests.DeleteKey(common.Request{
+		NodeID:    nodeID,
+		RequestID: requestID,
+	})
+	if !ok { // this message isn't in response to a request we made
+		bb.Ctx.Log.Debug("received unexpected Ancestors",
+			zap.Stringer("nodeID", nodeID),
+			zap.Uint32("requestID", requestID),
+		)
+		return nil
+	}
+
+	lenBlks := len(blks)
+	if lenBlks == 0 {
+		bb.Ctx.Log.Debug("received Ancestors with no block",
+			zap.Stringer("nodeID", nodeID),
+			zap.Uint32("requestID", requestID),
+		)
+
+		bb.markUnavailable(nodeID)
+
+		// Send another request for this
+		return bb.fetch(ctx, wantedBlkID)
+	}
+
+	// This node has responded - so add it back into the set
+	bb.fetchFrom.Add(nodeID)
+
+	if lenBlks > bb.AncestorsMaxContainersReceived {
+		blks = blks[:bb.AncestorsMaxContainersReceived]
+		bb.Ctx.Log.Debug("ignoring containers in Ancestors",
+			zap.Int("numContainers", lenBlks-bb.AncestorsMaxContainersReceived),
+			zap.Stringer("nodeID", nodeID),
+			zap.Uint32("requestID", requestID),
+		)
+	}
+
+	ssVM, ok := bb.VM.(block.StateSyncableVM)
+	if !ok {
+		return nil // nothing to do
+	}
+
+	switch nextWantedBlkID, _, err := ssVM.BackfillBlocks(ctx, blks); {
+	case errors.Is(err, block.ErrStopBlockBackfilling):
+		bb.Ctx.Log.Info("block backfilling done")
+		bb.Ctx.StateSyncing.Set(false)
+		return nil
+	case errors.Is(err, block.ErrInternalBlockBackfilling):
+		bb.Ctx.Log.Debug("internal error while backfilling blocks",
+			zap.Stringer("nodeID", nodeID),
+			zap.Uint32("requestID", requestID),
+			zap.Error(err),
+		)
+		return err
+	case err != nil:
+		bb.Ctx.Log.Debug("failed to backfill blocks in Ancestors",
+			zap.Stringer("nodeID", nodeID),
+			zap.Uint32("requestID", requestID),
+			zap.Error(err),
+		)
+		return bb.fetch(ctx, wantedBlkID)
+	default:
+		return bb.fetch(ctx, nextWantedBlkID)
+	}
+}
+
+func (bb *BlockBackfiller) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) 
error { + blkID, ok := bb.outstandingRequests.DeleteKey(common.Request{ + NodeID: nodeID, + RequestID: requestID, + }) + if !ok { + bb.Ctx.Log.Debug("unexpectedly called GetAncestorsFailed", + zap.Stringer("nodeID", nodeID), + zap.Uint32("requestID", requestID), + ) + return nil + } + + // This node timed out their request, so we can add them back to [fetchFrom] + bb.fetchFrom.Add(nodeID) + + // Send another request for this + return bb.fetch(ctx, blkID) +} + +// Get block [blkID] and its ancestors from a peer +func (bb *BlockBackfiller) fetch(ctx context.Context, blkID ids.ID) error { + validatorID, ok := bb.fetchFrom.Peek() + if !ok { + return fmt.Errorf("dropping request for %s: %w", blkID, ErrNoPeersToDownloadBlocksFrom) + } + + // We only allow one outbound request at a time from a node + bb.markUnavailable(validatorID) + *bb.SharedRequestID++ + bb.outstandingRequests.Put( + common.Request{ + NodeID: validatorID, + RequestID: *bb.SharedRequestID, + }, + blkID, + ) + + bb.Sender.SendGetAncestors(ctx, validatorID, *bb.SharedRequestID, blkID) + return nil +} + +func (bb *BlockBackfiller) markUnavailable(nodeID ids.NodeID) { + bb.fetchFrom.Remove(nodeID) + + // if [fetchFrom] has become empty, reset it to the currently preferred + // peers + if bb.fetchFrom.Len() == 0 { + bb.fetchFrom = bb.Peers.PreferredPeers() + } +} + +func (bb *BlockBackfiller) Connected(ctx context.Context, nodeID ids.NodeID) error { + // Ensure fetchFrom reflects proper validator list + if _, ok := bb.Validators.GetValidator(bb.Ctx.SubnetID, nodeID); ok { + bb.fetchFrom.Add(nodeID) + } + + if !bb.interrupted { + return nil + } + + // first validator reconnected. Resume blocks backfilling if needed + bb.interrupted = false + return bb.Start(ctx) +} + +func (bb *BlockBackfiller) Disconnected(nodeID ids.NodeID) error { + bb.markUnavailable(nodeID) + + // if there is no validator left, flag that blocks backfilling is interrupted. 
+ bb.interrupted = len(bb.fetchFrom) == 0 + return nil +} diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index 34954885a66d..41e07fc65312 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -22,8 +22,8 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/common/tracker" "github.com/ava-labs/avalanchego/snow/engine/snowman/ancestor" + "github.com/ava-labs/avalanchego/snow/engine/snowman/syncer" "github.com/ava-labs/avalanchego/snow/event" - "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/bag" "github.com/ava-labs/avalanchego/utils/bimap" "github.com/ava-labs/avalanchego/utils/constants" @@ -32,6 +32,7 @@ import ( "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/utils/units" "github.com/ava-labs/avalanchego/utils/wrappers" + "github.com/ava-labs/avalanchego/version" ) const ( @@ -64,9 +65,9 @@ type Transitive struct { common.AcceptedStateSummaryHandler common.AcceptedFrontierHandler common.AcceptedHandler - common.AncestorsHandler common.AppHandler - validators.Connector + + *syncer.BlockBackfiller requestID uint32 @@ -145,9 +146,7 @@ func newTransitive(config Config) (*Transitive, error) { AcceptedStateSummaryHandler: common.NewNoOpAcceptedStateSummaryHandler(config.Ctx.Log), AcceptedFrontierHandler: common.NewNoOpAcceptedFrontierHandler(config.Ctx.Log), AcceptedHandler: common.NewNoOpAcceptedHandler(config.Ctx.Log), - AncestorsHandler: common.NewNoOpAncestorsHandler(config.Ctx.Log), AppHandler: config.VM, - Connector: config.VM, pending: make(map[ids.ID]snowman.Block), nonVerifieds: ancestor.NewTree(), nonVerifiedCache: nonVerifiedCache, @@ -156,6 +155,18 @@ func newTransitive(config Config) (*Transitive, error) { blkReqs: bimap.New[common.Request, ids.ID](), blkReqSourceMetric: make(map[common.Request]prometheus.Counter), } + t.BlockBackfiller = syncer.NewBlockBackfiller( + syncer.BlockBackfillerConfig{ + Ctx: config.Ctx, + VM: config.VM, + Sender: config.Sender, + Validators: config.Validators, + Peers: config.ConnectedValidators, + AncestorsMaxContainersSent: config.AncestorsMaxContainersSent, + AncestorsMaxContainersReceived: config.AncestorsMaxContainersReceived, + SharedRequestID: &t.requestID, + }, + ) return t, t.metrics.Initialize("", config.Ctx.Registerer) } @@ -476,8 +487,7 @@ func (t *Transitive) Notify(ctx context.Context, msg common.Message) error { t.pendingBuildBlocks++ return t.buildBlocks(ctx) case common.StateSyncDone: - t.Ctx.StateSyncing.Set(false) - return nil + return t.BlockBackfiller.Start(ctx) default: t.Ctx.Log.Warn("received an unexpected message from the VM", zap.Stringer("messageString", msg), @@ -549,7 +559,9 @@ func (t *Transitive) Start(ctx context.Context, startReqID uint32) error { return fmt.Errorf("failed to notify VM that consensus is starting: %w", err) } - return nil + + // Start Block backfilling if needed + return t.BlockBackfiller.Start(ctx) } func (t *Transitive) HealthCheck(ctx context.Context) (interface{}, error) { @@ -1141,3 +1153,17 @@ func (t *Transitive) addUnverifiedBlockToConsensus( tree: t.nonVerifieds, }) } + +func (t *Transitive) Connected(ctx context.Context, nodeID ids.NodeID, nodeVersion *version.Application) error { + if err := t.VM.Connected(ctx, nodeID, nodeVersion); err != nil { + return err + } + return t.BlockBackfiller.Connected(ctx, nodeID) +} + +func (t *Transitive) Disconnected(ctx context.Context, nodeID ids.NodeID) error { 
+ if err := t.VM.Disconnected(ctx, nodeID); err != nil { + return err + } + return t.BlockBackfiller.Disconnected(nodeID) +} diff --git a/snow/engine/snowman/transitive_block_backfilling_test.go b/snow/engine/snowman/transitive_block_backfilling_test.go new file mode 100644 index 000000000000..e4cd3bbea50a --- /dev/null +++ b/snow/engine/snowman/transitive_block_backfilling_test.go @@ -0,0 +1,539 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package snowman + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/choices" + "github.com/ava-labs/avalanchego/snow/consensus/snowman" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/snow/engine/snowman/block" + "github.com/ava-labs/avalanchego/snow/engine/snowman/syncer" + "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils/wrappers" + "github.com/ava-labs/avalanchego/version" +) + +func TestGetAncestorsRequestIssuedIfBlockBackfillingIsEnabled(t *testing.T) { + require := require.New(t) + + engCfg, vm, sender, err := setupBlockBackfillingTests(t) + require.NoError(err) + + // create the engine + te, err := newTransitive(engCfg) + require.NoError(err) + + // enable block backfilling and check blocks request starts with block provided by VM + reqBlk := ids.GenerateTestID() + dummyHeight := uint64(1492) + vm.BackfillBlocksEnabledF = func(ctx context.Context) (ids.ID, uint64, error) { + return reqBlk, dummyHeight, nil + } + + var issuedBlkID ids.ID + sender.SendGetAncestorsF = func(ctx context.Context, ni ids.NodeID, u uint32, blkID ids.ID) { + issuedBlkID = blkID + } + + dummyCtx := context.Background() + reqNum := uint32(0) + require.NoError(te.Start(dummyCtx, reqNum)) + require.Equal(reqBlk, issuedBlkID) +} + +func TestGetAncestorsRequestNotIssuedIfBlockBackfillingIsNotEnabled(t *testing.T) { + require := require.New(t) + + engCfg, vm, sender, err := setupBlockBackfillingTests(t) + require.NoError(err) + + // create the engine + te, err := newTransitive(engCfg) + require.NoError(err) + + // disable block backfilling + dummyHeight := uint64(1492) + vm.BackfillBlocksEnabledF = func(ctx context.Context) (ids.ID, uint64, error) { + return ids.Empty, dummyHeight, block.ErrBlockBackfillingNotEnabled + } + + // this will make engine Start fail if SendGetAncestor is attempted + sender.CantSendGetAncestors = true + + dummyCtx := context.Background() + reqNum := uint32(0) + require.NoError(te.Start(dummyCtx, reqNum)) +} + +func TestEngineErrsIfBlockBackfillingIsEnabledCheckErrs(t *testing.T) { + require := require.New(t) + + engCfg, vm, _, err := setupBlockBackfillingTests(t) + require.NoError(err) + + // create the engine + te, err := newTransitive(engCfg) + require.NoError(err) + + // let BackfillBlocksEnabled err with non-flag error + customErr := errors.New("a custom error") + dummyHeight := uint64(1492) + vm.BackfillBlocksEnabledF = func(ctx context.Context) (ids.ID, uint64, error) { + return ids.Empty, dummyHeight, customErr + } + + dummyCtx := context.Background() + reqNum := uint32(0) + err = te.Start(dummyCtx, reqNum) + require.ErrorIs(err, customErr) +} + +func TestEngineErrsIfThereAreNoPeersToDownloadBlocksFrom(t *testing.T) { + require := require.New(t) + + engCfg, vm, sender, err := setupBlockBackfillingTests(t) + require.NoError(err) + + // create the engine + te, err := 
newTransitive(engCfg)
+	require.NoError(err)
+
+	// enable block backfilling
+	reqBlk := ids.GenerateTestID()
+	dummyHeight := uint64(1492)
+	vm.BackfillBlocksEnabledF = func(ctx context.Context) (ids.ID, uint64, error) {
+		return reqBlk, dummyHeight, nil
+	}
+
+	var (
+		issuedBlkRequest = false
+		issuedBlkID      ids.ID
+	)
+	sender.SendGetAncestorsF = func(ctx context.Context, ni ids.NodeID, u uint32, blkID ids.ID) {
+		issuedBlkRequest = true
+		issuedBlkID = blkID
+	}
+
+	// disconnect all validators, so that there are no peers to download blocks from
+	dummyCtx := context.Background()
+	for _, valID := range engCfg.Validators.GetValidatorIDs(engCfg.Ctx.SubnetID) {
+		require.NoError(te.Disconnected(dummyCtx, valID))
+	}
+
+	reqNum := uint32(0)
+	err = te.Start(dummyCtx, reqNum)
+	require.ErrorIs(err, syncer.ErrNoPeersToDownloadBlocksFrom)
+
+	// reconnect at least one validator and show that GetAncestors requests are issued
+	for _, valID := range engCfg.Validators.GetValidatorIDs(engCfg.Ctx.SubnetID) {
+		require.NoError(te.Connected(dummyCtx, valID, version.CurrentApp))
+	}
+
+	// check that GetAncestors request is issued once a validator has reconnected
+	require.True(issuedBlkRequest)
+	require.Equal(reqBlk, issuedBlkID)
+}
+
+func TestAncestorsProcessing(t *testing.T) {
+	require := require.New(t)
+
+	engCfg, vm, sender, err := setupBlockBackfillingTests(t)
+	require.NoError(err)
+
+	// create the engine
+	te, err := newTransitive(engCfg)
+	require.NoError(err)
+
+	// for the current test we need a single validator. Disconnect the other
+	dummyCtx := context.Background()
+	require.NoError(te.Disconnected(dummyCtx, engCfg.Validators.GetValidatorIDs(engCfg.Ctx.SubnetID)[1]))
+
+	// enable block backfilling
+	reqBlkFirst := ids.GenerateTestID()
+	dummyHeight := uint64(1492)
+	vm.BackfillBlocksEnabledF = func(ctx context.Context) (ids.ID, uint64, error) {
+		return reqBlkFirst, dummyHeight, nil
+	}
+	issuedBlk := ids.Empty
+	sender.SendGetAncestorsF = func(ctx context.Context, ni ids.NodeID, u uint32, blkID ids.ID) {
+		issuedBlk = blkID
+	}
+
+	// issue blocks request
+	startReqNum := uint32(0)
+	require.NoError(te.Start(dummyCtx, startReqNum))
+
+	// process GetAncestors response
+	var (
+		nodeID        = engCfg.Validators.GetValidatorIDs(engCfg.Ctx.SubnetID)[0]
+		responseReqID = startReqNum + 1
+		blkBytes      = [][]byte{{1}, {2}, {3}}
+		pushedBlks    [][]byte
+		reqBlkSecond  = ids.GenerateTestID()
+	)
+
+	vm.BackfillBlocksF = func(ctx context.Context, b [][]byte) (ids.ID, uint64, error) {
+		pushedBlks = b
+		return reqBlkSecond, dummyHeight, nil
+	}
+
+	{
+		// handle Ancestors response from unexpected nodeID
+		wrongNodeID := ids.GenerateTestNodeID()
+		require.NotEqual(nodeID, wrongNodeID)
+		require.NoError(te.Ancestors(dummyCtx, wrongNodeID, responseReqID, blkBytes))
+		require.Nil(pushedBlks) // blocks from wrong NodeID are not pushed to VM
+	}
+	{
+		// handle Ancestors response with wrong requestID
+		wrongReqID := uint32(2023)
+		require.NotEqual(responseReqID, wrongReqID)
+		require.NoError(te.Ancestors(dummyCtx, nodeID, wrongReqID, blkBytes))
+		require.Nil(pushedBlks) // blocks with a wrong requestID are not pushed to VM
+	}
+	{
+		// handle empty Ancestors response
+
+		// Connect the second validator so the block can be requested from it after the first validator's faulty response
+		require.NoError(te.Connected(dummyCtx, engCfg.Validators.GetValidatorIDs(engCfg.Ctx.SubnetID)[1], version.CurrentApp))
+
+		emptyBlkBytes := [][]byte{}
+		require.NoError(te.Ancestors(dummyCtx, nodeID, responseReqID, emptyBlkBytes))
+		require.Nil(pushedBlks)               // an empty response does not push blocks to VM
+		require.Equal(reqBlkFirst, issuedBlk) // check that the same block is requested again
+	}
+	{
+		// success
+		nodeID := engCfg.Validators.GetValidatorIDs(engCfg.Ctx.SubnetID)[1]
+		responseReqID++ // the previous requestID was consumed by the empty Ancestors response case
+
+		require.NoError(te.Ancestors(dummyCtx, nodeID, responseReqID, blkBytes))
+		require.Equal(blkBytes, pushedBlks)    // blocks are pushed to VM
+		require.Equal(reqBlkSecond, issuedBlk) // check that VM controls next block ID to be requested
+	}
+}
+
+func TestGetAncestorsFailedProcessing(t *testing.T) {
+	require := require.New(t)
+
+	engCfg, vm, sender, err := setupBlockBackfillingTests(t)
+	require.NoError(err)
+
+	// create the engine
+	te, err := newTransitive(engCfg)
+	require.NoError(err)
+
+	// enable block backfilling
+	reqBlkFirst := ids.GenerateTestID()
+	dummyHeight := uint64(1492)
+	vm.BackfillBlocksEnabledF = func(ctx context.Context) (ids.ID, uint64, error) {
+		return reqBlkFirst, dummyHeight, nil
+	}
+	issuedBlk := ids.Empty
+	sender.SendGetAncestorsF = func(ctx context.Context, ni ids.NodeID, u uint32, blkID ids.ID) {
+		issuedBlk = blkID
+	}
+
+	// issue blocks request
+	dummyCtx := context.Background()
+	startReqNum := uint32(0)
+	require.NoError(te.Start(dummyCtx, startReqNum))
+
+	// process GetAncestorsFailed message
+	var (
+		nodeID        = engCfg.Validators.GetValidatorIDs(engCfg.Ctx.SubnetID)[0]
+		responseReqID = startReqNum + 1
+		pushedBlks    [][]byte
+		reqBlkSecond  = ids.GenerateTestID()
+	)
+	vm.BackfillBlocksF = func(ctx context.Context, b [][]byte) (ids.ID, uint64, error) {
+		pushedBlks = b
+		return reqBlkSecond, dummyHeight, nil
+	}
+	{
+		// handle GetAncestorsFailed from an unexpected nodeID
+		wrongNodeID := ids.GenerateTestNodeID()
+		require.NotEqual(nodeID, wrongNodeID)
+		require.NoError(te.GetAncestorsFailed(dummyCtx, wrongNodeID, responseReqID))
+		require.Nil(pushedBlks) // no blocks are pushed to VM
+	}
+	{
+		// handle GetAncestorsFailed with a wrong requestID
+		wrongReqID := uint32(2023)
+		require.NotEqual(responseReqID, wrongReqID)
+		require.NoError(te.GetAncestorsFailed(dummyCtx, nodeID, wrongReqID))
+		require.Nil(pushedBlks) // no blocks are pushed to VM
+	}
+	{
+		// success
+		require.NoError(te.GetAncestorsFailed(dummyCtx, nodeID, responseReqID))
+		require.Nil(pushedBlks)               // no blocks are pushed to VM
+		require.Equal(reqBlkFirst, issuedBlk) // check that the same blk is requested again
+	}
+}
+
+func TestBackfillingTerminatedCleanlyByVM(t *testing.T) {
+	require := require.New(t)
+
+	engCfg, vm, sender, err := setupBlockBackfillingTests(t)
+	require.NoError(err)
+
+	// create the engine
+	te, err := newTransitive(engCfg)
+	require.NoError(err)
+
+	// for the current test we need a single validator. Disconnect the other
+	dummyCtx := context.Background()
+	require.NoError(te.Disconnected(dummyCtx, engCfg.Validators.GetValidatorIDs(engCfg.Ctx.SubnetID)[1]))
+
+	// enable block backfilling
+	reqBlkFirst := ids.GenerateTestID()
+	dummyHeight := uint64(1492)
+	vm.BackfillBlocksEnabledF = func(ctx context.Context) (ids.ID, uint64, error) {
+		return reqBlkFirst, dummyHeight, nil
+	}
+
+	// start the engine
+	startReqNum := uint32(0)
+	require.NoError(te.Start(dummyCtx, startReqNum))
+
+	var (
+		nodeID        = engCfg.Validators.GetValidatorIDs(engCfg.Ctx.SubnetID)[0]
+		responseReqID = startReqNum
+		blkBytes      = [][]byte{{1}} // content does not matter here. We just need it non-empty
+
+		pushedBlks       = false
+		nextRequestedBlk = ids.GenerateTestID()
+		issuedBlk        = ids.Empty
+	)
+
+	// 1. Successfully request and download some blocks
+	{
+		responseReqID++
+		vm.BackfillBlocksF = func(ctx context.Context, b [][]byte) (ids.ID, uint64, error) {
+			pushedBlks = true
+			return nextRequestedBlk, dummyHeight, nil // requestedBlkID does not really matter here
+		}
+		sender.SendGetAncestorsF = func(ctx context.Context, ni ids.NodeID, u uint32, blkID ids.ID) {
+			issuedBlk = blkID
+		}
+		require.NoError(te.Ancestors(dummyCtx, nodeID, responseReqID, blkBytes))
+		require.True(pushedBlks)
+		require.Equal(nextRequestedBlk, issuedBlk)
+	}
+
+	// 2. Successfully request and download some more blocks
+	{
+		pushedBlks = false
+		nextRequestedBlk = ids.GenerateTestID()
+		issuedBlk = ids.Empty
+		responseReqID++
+
+		vm.BackfillBlocksF = func(ctx context.Context, b [][]byte) (ids.ID, uint64, error) {
+			pushedBlks = true
+			return nextRequestedBlk, dummyHeight, nil // requestedBlkID does not really matter here
+		}
+		sender.SendGetAncestorsF = func(ctx context.Context, ni ids.NodeID, u uint32, blkID ids.ID) {
+			issuedBlk = blkID
+		}
+
+		require.NoError(te.Ancestors(dummyCtx, nodeID, responseReqID, blkBytes))
+		require.True(pushedBlks)
+		require.Equal(nextRequestedBlk, issuedBlk)
+	}
+
+	// 3. If block backfilling fails in the VM, the same block is requested again
+	{
+		pushedBlks = false
+		issuedBlk = ids.Empty
+		responseReqID++
+
+		vm.BackfillBlocksF = func(ctx context.Context, b [][]byte) (ids.ID, uint64, error) {
+			pushedBlks = true
+			return ids.Empty, dummyHeight, errors.New("custom error upon backfilling")
+		}
+		sender.SendGetAncestorsF = func(ctx context.Context, ni ids.NodeID, u uint32, blkID ids.ID) {
+			issuedBlk = blkID
+		}
+
+		require.NoError(te.Ancestors(dummyCtx, nodeID, responseReqID, blkBytes))
+		require.True(pushedBlks)
+		require.Equal(nextRequestedBlk, issuedBlk) // we expect the block requested at step 2 to be requested again
+	}
+
+	// 4. Let the VM stop block downloading (block backfilling complete)
+	{
+		issuedBlkRequest := false
+		pushedBlks = false
+		responseReqID++
+
+		vm.BackfillBlocksF = func(ctx context.Context, b [][]byte) (ids.ID, uint64, error) {
+			pushedBlks = true
+			return ids.Empty, dummyHeight, block.ErrStopBlockBackfilling
+		}
+		sender.SendGetAncestorsF = func(ctx context.Context, ni ids.NodeID, u uint32, blkID ids.ID) {
+			issuedBlkRequest = true
+		}
+
+		require.NoError(te.Ancestors(dummyCtx, nodeID, responseReqID, blkBytes))
+		require.True(pushedBlks)
+		require.False(issuedBlkRequest) // no more requests, block backfilling done
+	}
+}
+
+func TestBackfillingTerminatedWithErrorByVM(t *testing.T) {
+	require := require.New(t)
+
+	engCfg, vm, sender, err := setupBlockBackfillingTests(t)
+	require.NoError(err)
+
+	// create the engine
+	te, err := newTransitive(engCfg)
+	require.NoError(err)
+
+	// for the current test we need a single validator. 
Disconnect the other + dummyCtx := context.Background() + require.NoError(te.Disconnected(dummyCtx, engCfg.Validators.GetValidatorIDs(engCfg.Ctx.SubnetID)[1])) + + // enable block backfilling + reqBlkFirst := ids.GenerateTestID() + dummyHeight := uint64(1492) + vm.BackfillBlocksEnabledF = func(ctx context.Context) (ids.ID, uint64, error) { + return reqBlkFirst, dummyHeight, nil + } + + // start the engine + startReqNum := uint32(0) + require.NoError(te.Start(dummyCtx, startReqNum)) + + var ( + nodeID = engCfg.Validators.GetValidatorIDs(engCfg.Ctx.SubnetID)[0] + responseReqID = startReqNum + blkBytes = [][]byte{{1}} // content does not matter here. We just need it non-empty + + pushedBlks = false + nextRequestedBlk = ids.GenerateTestID() + issuedBlk = ids.Empty + ) + + // 1. Successfully request and download some blocks + { + responseReqID++ + vm.BackfillBlocksF = func(ctx context.Context, b [][]byte) (ids.ID, uint64, error) { + pushedBlks = true + return nextRequestedBlk, dummyHeight, nil // requestedBlkID does not really matter here + } + sender.SendGetAncestorsF = func(ctx context.Context, ni ids.NodeID, u uint32, blkID ids.ID) { + issuedBlk = blkID + } + require.NoError(te.Ancestors(dummyCtx, nodeID, responseReqID, blkBytes)) + require.True(pushedBlks) + require.Equal(nextRequestedBlk, issuedBlk) + } + + // 2. Successfully request and download some more blocks + { + pushedBlks = false + nextRequestedBlk = ids.GenerateTestID() + issuedBlk = ids.Empty + responseReqID++ + + vm.BackfillBlocksF = func(ctx context.Context, b [][]byte) (ids.ID, uint64, error) { + pushedBlks = true + return nextRequestedBlk, dummyHeight, nil // requestedBlkID does not really matter here + } + sender.SendGetAncestorsF = func(ctx context.Context, ni ids.NodeID, u uint32, blkID ids.ID) { + issuedBlk = blkID + } + + require.NoError(te.Ancestors(dummyCtx, nodeID, responseReqID, blkBytes)) + require.True(pushedBlks) + require.Equal(nextRequestedBlk, issuedBlk) + } + + // 3. Let the VM have an internal error while processing backfilled blocks. 
+	{
+		issuedBlkRequest := false
+		pushedBlks = false
+		responseReqID++
+
+		vm.BackfillBlocksF = func(ctx context.Context, b [][]byte) (ids.ID, uint64, error) {
+			pushedBlks = true
+			return ids.Empty, dummyHeight, block.ErrInternalBlockBackfilling
+		}
+		sender.SendGetAncestorsF = func(ctx context.Context, ni ids.NodeID, u uint32, blkID ids.ID) {
+			issuedBlkRequest = true
+		}
+
+		err := te.Ancestors(dummyCtx, nodeID, responseReqID, blkBytes)
+		require.ErrorIs(err, block.ErrInternalBlockBackfilling)
+		require.True(pushedBlks)
+		require.False(issuedBlkRequest) // no more requests after the internal error
+	}
+}
+
+type fullVM struct {
+	*block.TestVM
+	*block.TestStateSyncableVM
+}
+
+func setupBlockBackfillingTests(t *testing.T) (Config, *fullVM, *common.SenderTest, error) {
+	engCfg := DefaultConfig(t)
+
+	var (
+		vm = &fullVM{
+			TestVM: &block.TestVM{
+				TestVM: common.TestVM{
+					T: t,
+				},
+			},
+			TestStateSyncableVM: &block.TestStateSyncableVM{
+				T: t,
+			},
+		}
+		sender = &common.SenderTest{
+			T: t,
+		}
+	)
+	engCfg.VM = vm
+	engCfg.Sender = sender
+
+	lastAcceptedBlk := &snowman.TestBlock{TestDecidable: choices.TestDecidable{
+		IDV:     ids.GenerateTestID(),
+		StatusV: choices.Accepted,
+	}}
+
+	vm.LastAcceptedF = func(context.Context) (ids.ID, error) {
+		return lastAcceptedBlk.ID(), nil
+	}
+
+	vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) {
+		switch blkID {
+		case lastAcceptedBlk.ID():
+			return lastAcceptedBlk, nil
+		default:
+			return nil, errUnknownBlock
+		}
+	}
+
+	// add at least one peer to request blocks from
+	vals := validators.NewManager()
+	engCfg.Validators = vals
+	vdr1 := ids.NodeID{1}
+	vdr2 := ids.NodeID{2}
+	errs := wrappers.Errs{}
+	errs.Add(
+		vals.AddStaker(engCfg.Ctx.SubnetID, vdr1, nil, ids.Empty, 1),
+		vals.AddStaker(engCfg.Ctx.SubnetID, vdr2, nil, ids.Empty, 1),
+	)
+
+	return engCfg, vm, sender, errs.Err
+}
diff --git a/vms/platformvm/vm_test.go b/vms/platformvm/vm_test.go
index 23e88a646368..acf32505f6d5 100644
--- a/vms/platformvm/vm_test.go
+++ b/vms/platformvm/vm_test.go
@@ -1454,6 +1454,7 @@ func TestBootstrapPartiallyAccepted(t *testing.T) {
 	)
 	require.NoError(err)
 
+	peers = tracker.NewPeers()
 	h, err := handler.New(
 		bootstrapConfig.Ctx,
 		beacons,
@@ -1463,16 +1464,17 @@
 		cpuTracker,
 		vm,
 		subnets.New(ctx.NodeID, subnets.Config{}),
-		tracker.NewPeers(),
+		peers,
 	)
 	require.NoError(err)
 
 	engineConfig := smeng.Config{
-		Ctx:           bootstrapConfig.Ctx,
-		AllGetsServer: snowGetHandler,
-		VM:            bootstrapConfig.VM,
-		Sender:        bootstrapConfig.Sender,
-		Validators:    beacons,
+		Ctx:                 bootstrapConfig.Ctx,
+		AllGetsServer:       snowGetHandler,
+		VM:                  bootstrapConfig.VM,
+		Sender:              bootstrapConfig.Sender,
+		Validators:          beacons,
+		ConnectedValidators: peers,
 		Params: snowball.Parameters{
 			K:               1,
 			AlphaPreference: 1,
@@ -1485,6 +1487,9 @@
 			MaxItemProcessingTime: 1,
 		},
 		Consensus: &smcon.Topological{},
+
+		AncestorsMaxContainersSent:     2000,
+		AncestorsMaxContainersReceived: 2000,
 	}
 	engine, err := smeng.New(engineConfig)
 	require.NoError(err)