From 617a9e2c1be63916b306dcf0c453d369cd70cfbd Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Tue, 2 Apr 2024 11:01:46 -0400 Subject: [PATCH] Interval tree syncing integration (#2855) --- chains/manager.go | 17 +- snow/engine/snowman/bootstrap/acceptor.go | 53 + snow/engine/snowman/bootstrap/block_job.go | 113 -- snow/engine/snowman/bootstrap/bootstrapper.go | 269 ++-- .../snowman/bootstrap/bootstrapper_test.go | 1173 ++++------------- snow/engine/snowman/bootstrap/config.go | 11 +- snow/engine/snowman/bootstrap/metrics.go | 7 - snow/engine/snowman/bootstrap/storage.go | 252 ++++ snow/engine/snowman/bootstrap/storage_test.go | 311 +++++ vms/platformvm/vm_test.go | 5 +- 10 files changed, 957 insertions(+), 1254 deletions(-) create mode 100644 snow/engine/snowman/bootstrap/acceptor.go delete mode 100644 snow/engine/snowman/bootstrap/block_job.go create mode 100644 snow/engine/snowman/bootstrap/storage.go create mode 100644 snow/engine/snowman/bootstrap/storage_test.go diff --git a/chains/manager.go b/chains/manager.go index 6393f7ca58e8..c62519c757c9 100644 --- a/chains/manager.go +++ b/chains/manager.go @@ -85,10 +85,10 @@ var ( VertexDBPrefix = []byte("vertex") VertexBootstrappingDBPrefix = []byte("vertex_bs") TxBootstrappingDBPrefix = []byte("tx_bs") - BlockBootstrappingDBPrefix = []byte("block_bs") + BlockBootstrappingDBPrefix = []byte("interval_block_bs") // Bootstrapping prefixes for ChainVMs - ChainBootstrappingDBPrefix = []byte("bs") + ChainBootstrappingDBPrefix = []byte("interval_bs") errUnknownVMType = errors.New("the vm should have type avalanche.DAGVM or snowman.ChainVM") errCreatePlatformVM = errors.New("attempted to create a chain running the PlatformVM") @@ -579,10 +579,6 @@ func (m *manager) createAvalancheChain( if err != nil { return nil, err } - blockBlocker, err := queue.NewWithMissing(blockBootstrappingDB, "block", ctx.Registerer) - if err != nil { - return nil, err - } // Passes messages from the avalanche engines to the network avalancheMessageSender, err := sender.New( @@ -837,7 +833,7 @@ func (m *manager) createAvalancheChain( BootstrapTracker: sb, Timer: h, AncestorsMaxContainersReceived: m.BootstrapAncestorsMaxContainersReceived, - Blocked: blockBlocker, + DB: blockBootstrappingDB, VM: vmWrappingProposerVM, } var snowmanBootstrapper common.BootstrapableEngine @@ -952,11 +948,6 @@ func (m *manager) createSnowmanChain( vmDB := prefixdb.New(VMDBPrefix, prefixDB) bootstrappingDB := prefixdb.New(ChainBootstrappingDBPrefix, prefixDB) - blocked, err := queue.NewWithMissing(bootstrappingDB, "block", ctx.Registerer) - if err != nil { - return nil, err - } - // Passes messages from the consensus engine to the network messageSender, err := sender.New( ctx, @@ -1175,7 +1166,7 @@ func (m *manager) createSnowmanChain( BootstrapTracker: sb, Timer: h, AncestorsMaxContainersReceived: m.BootstrapAncestorsMaxContainersReceived, - Blocked: blocked, + DB: bootstrappingDB, VM: vm, Bootstrapped: bootstrapFunc, } diff --git a/snow/engine/snowman/bootstrap/acceptor.go b/snow/engine/snowman/bootstrap/acceptor.go new file mode 100644 index 000000000000..eae4be879afa --- /dev/null +++ b/snow/engine/snowman/bootstrap/acceptor.go @@ -0,0 +1,53 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package bootstrap + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/ava-labs/avalanchego/snow" + "github.com/ava-labs/avalanchego/snow/consensus/snowman" + "github.com/ava-labs/avalanchego/snow/engine/snowman/block" +) + +var ( + _ block.Parser = (*parseAcceptor)(nil) + _ snowman.Block = (*blockAcceptor)(nil) +) + +type parseAcceptor struct { + parser block.Parser + ctx *snow.ConsensusContext + numAccepted prometheus.Counter +} + +func (p *parseAcceptor) ParseBlock(ctx context.Context, bytes []byte) (snowman.Block, error) { + blk, err := p.parser.ParseBlock(ctx, bytes) + if err != nil { + return nil, err + } + return &blockAcceptor{ + Block: blk, + ctx: p.ctx, + numAccepted: p.numAccepted, + }, nil +} + +type blockAcceptor struct { + snowman.Block + + ctx *snow.ConsensusContext + numAccepted prometheus.Counter +} + +func (b *blockAcceptor) Accept(ctx context.Context) error { + if err := b.ctx.BlockAcceptor.Accept(b.ctx, b.ID(), b.Bytes()); err != nil { + return err + } + err := b.Block.Accept(ctx) + b.numAccepted.Inc() + return err +} diff --git a/snow/engine/snowman/bootstrap/block_job.go b/snow/engine/snowman/bootstrap/block_job.go deleted file mode 100644 index 403327006f85..000000000000 --- a/snow/engine/snowman/bootstrap/block_job.go +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package bootstrap - -import ( - "context" - "errors" - "fmt" - - "github.com/prometheus/client_golang/prometheus" - "go.uber.org/zap" - - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow/choices" - "github.com/ava-labs/avalanchego/snow/consensus/snowman" - "github.com/ava-labs/avalanchego/snow/engine/common/queue" - "github.com/ava-labs/avalanchego/snow/engine/snowman/block" - "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/utils/set" -) - -var errMissingDependenciesOnAccept = errors.New("attempting to accept a block with missing dependencies") - -type parser struct { - log logging.Logger - numAccepted prometheus.Counter - vm block.ChainVM -} - -func (p *parser) Parse(ctx context.Context, blkBytes []byte) (queue.Job, error) { - blk, err := p.vm.ParseBlock(ctx, blkBytes) - if err != nil { - return nil, err - } - return &blockJob{ - log: p.log, - numAccepted: p.numAccepted, - blk: blk, - vm: p.vm, - }, nil -} - -type blockJob struct { - log logging.Logger - numAccepted prometheus.Counter - blk snowman.Block - vm block.Getter -} - -func (b *blockJob) ID() ids.ID { - return b.blk.ID() -} - -func (b *blockJob) MissingDependencies(ctx context.Context) (set.Set[ids.ID], error) { - missing := set.Set[ids.ID]{} - parentID := b.blk.Parent() - if parent, err := b.vm.GetBlock(ctx, parentID); err != nil || parent.Status() != choices.Accepted { - missing.Add(parentID) - } - return missing, nil -} - -func (b *blockJob) HasMissingDependencies(ctx context.Context) (bool, error) { - parentID := b.blk.Parent() - if parent, err := b.vm.GetBlock(ctx, parentID); err != nil || parent.Status() != choices.Accepted { - return true, nil - } - return false, nil -} - -func (b *blockJob) Execute(ctx context.Context) error { - hasMissingDeps, err := b.HasMissingDependencies(ctx) - if err != nil { - return err - } - if hasMissingDeps { - return errMissingDependenciesOnAccept - } - status := b.blk.Status() - switch status { - case choices.Unknown, choices.Rejected: - return fmt.Errorf("attempting to execute block with status %s", status) - case choices.Processing: - blkID := b.blk.ID() - if err := b.blk.Verify(ctx); err != nil { - b.log.Error("block failed verification during bootstrapping", - zap.Stringer("blkID", blkID), - zap.Error(err), - ) - return fmt.Errorf("failed to verify block in bootstrapping: %w", err) - } - - b.numAccepted.Inc() - b.log.Trace("accepting block in bootstrapping", - zap.Stringer("blkID", blkID), - zap.Uint64("height", b.blk.Height()), - zap.Time("timestamp", b.blk.Timestamp()), - ) - if err := b.blk.Accept(ctx); err != nil { - b.log.Debug("failed to accept block during bootstrapping", - zap.Stringer("blkID", blkID), - zap.Error(err), - ) - return fmt.Errorf("failed to accept block in bootstrapping: %w", err) - } - } - return nil -} - -func (b *blockJob) Bytes() []byte { - return b.blk.Bytes() -} diff --git a/snow/engine/snowman/bootstrap/bootstrapper.go b/snow/engine/snowman/bootstrap/bootstrapper.go index ebbbd6f13ff7..ca0584128269 100644 --- a/snow/engine/snowman/bootstrap/bootstrapper.go +++ b/snow/engine/snowman/bootstrap/bootstrapper.go @@ -13,14 +13,15 @@ import ( "go.uber.org/zap" + "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/proto/pb/p2p" "github.com/ava-labs/avalanchego/snow" - "github.com/ava-labs/avalanchego/snow/choices" "github.com/ava-labs/avalanchego/snow/consensus/snowman" "github.com/ava-labs/avalanchego/snow/consensus/snowman/bootstrapper" "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" + "github.com/ava-labs/avalanchego/snow/engine/snowman/bootstrap/interval" "github.com/ava-labs/avalanchego/utils/bimap" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/utils/timer" @@ -95,13 +96,6 @@ type Bootstrapper struct { // tracks which validators were asked for which containers in which requests outstandingRequests *bimap.BiMap[common.Request, ids.ID] - // number of state transitions executed - executedStateTransitions int - - parser *parser - - awaitingTimeout bool - // fetchFrom is the set of nodes that we can fetch the next container from. // When a container is fetched, the nodeID is removed from [fetchFrom] to // attempt to limit a single request to a peer at any given time. When the @@ -111,6 +105,13 @@ type Bootstrapper struct { // again. fetchFrom set.Set[ids.NodeID] + // number of state transitions executed + executedStateTransitions uint64 + awaitingTimeout bool + + tree *interval.Tree + missingBlockIDs set.Set[ids.ID] + // bootstrappedOnce ensures that the [Bootstrapped] callback is only invoked // once, even if bootstrapping is retried. bootstrappedOnce sync.Once @@ -149,10 +150,7 @@ func (b *Bootstrapper) Clear(context.Context) error { b.Ctx.Lock.Lock() defer b.Ctx.Lock.Unlock() - if err := b.Config.Blocked.Clear(); err != nil { - return err - } - return b.Config.Blocked.Commit() + return database.AtomicClear(b.DB, b.DB) } func (b *Bootstrapper) Start(ctx context.Context, startReqID uint32) error { @@ -163,30 +161,26 @@ func (b *Bootstrapper) Start(ctx context.Context, startReqID uint32) error { State: snow.Bootstrapping, }) if err := b.VM.SetState(ctx, snow.Bootstrapping); err != nil { - return fmt.Errorf("failed to notify VM that bootstrapping has started: %w", - err) + return fmt.Errorf("failed to notify VM that bootstrapping has started: %w", err) } - b.parser = &parser{ - log: b.Ctx.Log, - numAccepted: b.numAccepted, - vm: b.VM, - } - if err := b.Blocked.SetParser(ctx, b.parser); err != nil { + // Set the starting height + lastAcceptedHeight, err := b.getLastAcceptedHeight(ctx) + if err != nil { return err } + b.startingHeight = lastAcceptedHeight + b.requestID = startReqID - // Set the starting height - lastAcceptedID, err := b.VM.LastAccepted(ctx) + b.tree, err = interval.NewTree(b.DB) if err != nil { - return fmt.Errorf("couldn't get last accepted ID: %w", err) + return fmt.Errorf("failed to initialize interval tree: %w", err) } - lastAccepted, err := b.VM.GetBlock(ctx, lastAcceptedID) + + b.missingBlockIDs, err = getMissingBlockIDs(ctx, b.DB, b.VM, b.tree, b.startingHeight) if err != nil { - return fmt.Errorf("couldn't get last accepted block: %w", err) + return fmt.Errorf("failed to initialize missing block IDs: %w", err) } - b.startingHeight = lastAccepted.Height() - b.requestID = startReqID return b.tryStartBootstrapping(ctx) } @@ -378,10 +372,8 @@ func (b *Bootstrapper) startSyncing(ctx context.Context, acceptedContainerIDs [] // Initialize the fetch from set to the currently preferred peers b.fetchFrom = b.StartupTracker.PreferredPeers() - pendingContainerIDs := b.Blocked.MissingIDs() - // Append the list of accepted container IDs to pendingContainerIDs to ensure - // we iterate over every container that must be traversed. - pendingContainerIDs = append(pendingContainerIDs, acceptedContainerIDs...) + b.missingBlockIDs.Add(acceptedContainerIDs...) + numMissingBlockIDs := b.missingBlockIDs.Len() log := b.Ctx.Log.Info if b.restarted { @@ -389,13 +381,11 @@ func (b *Bootstrapper) startSyncing(ctx context.Context, acceptedContainerIDs [] } log("starting to fetch blocks", zap.Int("numAcceptedBlocks", len(acceptedContainerIDs)), - zap.Int("numMissingBlocks", len(pendingContainerIDs)), + zap.Int("numMissingBlocks", numMissingBlockIDs), ) - toProcess := make([]snowman.Block, 0, len(pendingContainerIDs)) - for _, blkID := range pendingContainerIDs { - b.Blocked.AddMissingID(blkID) - + toProcess := make([]snowman.Block, 0, numMissingBlockIDs) + for blkID := range b.missingBlockIDs { // TODO: if `GetBlock` returns an error other than // `database.ErrNotFound`, then the error should be propagated. blk, err := b.VM.GetBlock(ctx, blkID) @@ -408,7 +398,7 @@ func (b *Bootstrapper) startSyncing(ctx context.Context, acceptedContainerIDs [] toProcess = append(toProcess, blk) } - b.initiallyFetched = b.Blocked.PendingJobs() + b.initiallyFetched = b.tree.Len() b.startTime = time.Now() // Process received blocks @@ -428,11 +418,6 @@ func (b *Bootstrapper) fetch(ctx context.Context, blkID ids.ID) error { return nil } - // Make sure we don't already have this block - if _, err := b.VM.GetBlock(ctx, blkID); err == nil { - return b.tryStartExecuting(ctx) - } - validatorID, ok := b.fetchFrom.Peek() if !ok { return fmt.Errorf("dropping request for %s as there are no validators", blkID) @@ -526,7 +511,11 @@ func (b *Bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, request for _, block := range blocks[1:] { blockSet[block.ID()] = block } - return b.process(ctx, requestedBlock, blockSet) + if err := b.process(ctx, requestedBlock, blockSet); err != nil { + return err + } + + return b.tryStartExecuting(ctx) } func (b *Bootstrapper) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { @@ -566,133 +555,80 @@ func (b *Bootstrapper) markUnavailable(nodeID ids.NodeID) { // // - blk is a block that is assumed to have been marked as acceptable by the // bootstrapping engine. -// - processingBlocks is a set of blocks that can be used to lookup blocks. -// This enables the engine to process multiple blocks without relying on the -// VM to have stored blocks during `ParseBlock`. -// -// If [blk]'s height is <= the last accepted height, then it will be removed -// from the missingIDs set. -func (b *Bootstrapper) process(ctx context.Context, blk snowman.Block, processingBlocks map[ids.ID]snowman.Block) error { - for { - blkID := blk.ID() - if b.Halted() { - // We must add in [blkID] to the set of missing IDs so that we are - // guaranteed to continue processing from this state when the - // bootstrapper is restarted. - b.Blocked.AddMissingID(blkID) - return b.Blocked.Commit() - } - - b.Blocked.RemoveMissingID(blkID) - - status := blk.Status() - // The status should never be rejected here - but we check to fail as - // quickly as possible - if status == choices.Rejected { - return fmt.Errorf("bootstrapping wants to accept %s, however it was previously rejected", blkID) - } - - blkHeight := blk.Height() - if status == choices.Accepted || blkHeight <= b.startingHeight { - // We can stop traversing, as we have reached the accepted frontier - if err := b.Blocked.Commit(); err != nil { - return err - } - return b.tryStartExecuting(ctx) - } +// - ancestors is a set of blocks that can be used to optimistically lookup +// parent blocks. This enables the engine to process multiple blocks without +// relying on the VM to have stored blocks during `ParseBlock`. +func (b *Bootstrapper) process( + ctx context.Context, + blk snowman.Block, + ancestors map[ids.ID]snowman.Block, +) error { + lastAcceptedHeight, err := b.getLastAcceptedHeight(ctx) + if err != nil { + return err + } - // If this block is going to be accepted, make sure to update the - // tipHeight for logging - if blkHeight > b.tipHeight { - b.tipHeight = blkHeight - } + numPreviouslyFetched := b.tree.Len() - pushed, err := b.Blocked.Push(ctx, &blockJob{ - log: b.Ctx.Log, - numAccepted: b.numAccepted, - blk: blk, - vm: b.VM, - }) - if err != nil { - return err - } + batch := b.DB.NewBatch() + missingBlockID, foundNewMissingID, err := process( + batch, + b.tree, + b.missingBlockIDs, + lastAcceptedHeight, + blk, + ancestors, + ) + if err != nil { + return err + } - if !pushed { - // We can stop traversing, as we have reached a block that we - // previously pushed onto the jobs queue - if err := b.Blocked.Commit(); err != nil { - return err - } - return b.tryStartExecuting(ctx) - } + // Update metrics and log statuses + { + numFetched := b.tree.Len() + b.numFetched.Add(float64(b.tree.Len() - numPreviouslyFetched)) - // We added a new block to the queue, so track that it was fetched - b.numFetched.Inc() + height := blk.Height() + b.tipHeight = max(b.tipHeight, height) - // Periodically log progress - blocksFetchedSoFar := b.Blocked.Jobs.PendingJobs() - if blocksFetchedSoFar%statusUpdateFrequency == 0 { + if numPreviouslyFetched/statusUpdateFrequency != numFetched/statusUpdateFrequency { totalBlocksToFetch := b.tipHeight - b.startingHeight eta := timer.EstimateETA( b.startTime, - blocksFetchedSoFar-b.initiallyFetched, // Number of blocks we have fetched during this run + numFetched-b.initiallyFetched, // Number of blocks we have fetched during this run totalBlocksToFetch-b.initiallyFetched, // Number of blocks we expect to fetch during this run ) - b.fetchETA.Set(float64(eta)) if !b.restarted { b.Ctx.Log.Info("fetching blocks", - zap.Uint64("numFetchedBlocks", blocksFetchedSoFar), + zap.Uint64("numFetchedBlocks", numFetched), zap.Uint64("numTotalBlocks", totalBlocksToFetch), zap.Duration("eta", eta), ) } else { b.Ctx.Log.Debug("fetching blocks", - zap.Uint64("numFetchedBlocks", blocksFetchedSoFar), + zap.Uint64("numFetchedBlocks", numFetched), zap.Uint64("numTotalBlocks", totalBlocksToFetch), zap.Duration("eta", eta), ) } } + } - // Attempt to traverse to the next block - parentID := blk.Parent() - - // First check if the parent is in the processing blocks set - parent, ok := processingBlocks[parentID] - if ok { - blk = parent - continue - } - - // If the parent is not available in processing blocks, attempt to get - // the block from the vm - parent, err = b.VM.GetBlock(ctx, parentID) - if err == nil { - blk = parent - continue - } - // TODO: report errors that aren't `database.ErrNotFound` - - // If the block wasn't able to be acquired immediately, attempt to fetch - // it - b.Blocked.AddMissingID(parentID) - if err := b.fetch(ctx, parentID); err != nil { - return err - } - - if err := b.Blocked.Commit(); err != nil { - return err - } - return b.tryStartExecuting(ctx) + if err := batch.Write(); err != nil || !foundNewMissingID { + return err } + + b.missingBlockIDs.Add(missingBlockID) + // Attempt to fetch the newly discovered block + return b.fetch(ctx, missingBlockID) } // tryStartExecuting executes all pending blocks if there are no more blocks // being fetched. After executing all pending blocks it will either restart // bootstrapping, or transition into normal operations. func (b *Bootstrapper) tryStartExecuting(ctx context.Context) error { - if numPending := b.Blocked.NumMissingIDs(); numPending != 0 { + if numMissingBlockIDs := b.missingBlockIDs.Len(); numMissingBlockIDs != 0 { return nil } @@ -700,34 +636,41 @@ func (b *Bootstrapper) tryStartExecuting(ctx context.Context) error { return nil } - if !b.restarted { - b.Ctx.Log.Info("executing blocks", - zap.Uint64("numPendingJobs", b.Blocked.PendingJobs()), - ) - } else { - b.Ctx.Log.Debug("executing blocks", - zap.Uint64("numPendingJobs", b.Blocked.PendingJobs()), - ) + lastAcceptedHeight, err := b.getLastAcceptedHeight(ctx) + if err != nil { + return err } - executedBlocks, err := b.Blocked.ExecuteAll( + log := b.Ctx.Log.Info + if b.restarted { + log = b.Ctx.Log.Debug + } + + numToExecute := b.tree.Len() + err = execute( ctx, - b.Config.Ctx, b, - b.restarted, - b.Ctx.BlockAcceptor, + log, + b.DB, + &parseAcceptor{ + parser: b.VM, + ctx: b.Ctx, + numAccepted: b.numAccepted, + }, + b.tree, + lastAcceptedHeight, ) if err != nil || b.Halted() { return err } previouslyExecuted := b.executedStateTransitions - b.executedStateTransitions = executedBlocks + b.executedStateTransitions = numToExecute // Note that executedBlocks < c*previouslyExecuted ( 0 <= c < 1 ) is enforced // so that the bootstrapping process will terminate even as new blocks are // being issued. - if executedBlocks > 0 && executedBlocks < previouslyExecuted/2 { + if numToExecute > 0 && numToExecute < previouslyExecuted/2 { return b.restartBootstrapping(ctx) } @@ -743,21 +686,28 @@ func (b *Bootstrapper) tryStartExecuting(ctx context.Context) error { // If the subnet hasn't finished bootstrapping, this chain should remain // syncing. if !b.Config.BootstrapTracker.IsBootstrapped() { - if !b.restarted { - b.Ctx.Log.Info("waiting for the remaining chains in this subnet to finish syncing") - } else { - b.Ctx.Log.Debug("waiting for the remaining chains in this subnet to finish syncing") - } + log("waiting for the remaining chains in this subnet to finish syncing") // Restart bootstrapping after [bootstrappingDelay] to keep up to date // on the latest tip. b.Config.Timer.RegisterTimeout(bootstrappingDelay) b.awaitingTimeout = true return nil } - b.fetchETA.Set(0) return b.onFinished(ctx, b.requestID) } +func (b *Bootstrapper) getLastAcceptedHeight(ctx context.Context) (uint64, error) { + lastAcceptedID, err := b.VM.LastAccepted(ctx) + if err != nil { + return 0, fmt.Errorf("couldn't get last accepted ID: %w", err) + } + lastAccepted, err := b.VM.GetBlock(ctx, lastAcceptedID) + if err != nil { + return 0, fmt.Errorf("couldn't get last accepted block: %w", err) + } + return lastAccepted.Height(), nil +} + func (b *Bootstrapper) Timeout(ctx context.Context) error { if !b.awaitingTimeout { return errUnexpectedTimeout @@ -767,7 +717,6 @@ func (b *Bootstrapper) Timeout(ctx context.Context) error { if !b.Config.BootstrapTracker.IsBootstrapped() { return b.restartBootstrapping(ctx) } - b.fetchETA.Set(0) return b.onFinished(ctx, b.requestID) } diff --git a/snow/engine/snowman/bootstrap/bootstrapper_test.go b/snow/engine/snowman/bootstrap/bootstrapper_test.go index 846b082ce8f0..d5db4be5ae93 100644 --- a/snow/engine/snowman/bootstrap/bootstrapper_test.go +++ b/snow/engine/snowman/bootstrap/bootstrapper_test.go @@ -6,11 +6,11 @@ package bootstrap import ( "bytes" "context" + "encoding/binary" "errors" "testing" "time" - "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/ava-labs/avalanchego/database" @@ -21,14 +21,13 @@ import ( "github.com/ava-labs/avalanchego/snow/choices" "github.com/ava-labs/avalanchego/snow/consensus/snowman" "github.com/ava-labs/avalanchego/snow/engine/common" - "github.com/ava-labs/avalanchego/snow/engine/common/queue" "github.com/ava-labs/avalanchego/snow/engine/common/tracker" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" + "github.com/ava-labs/avalanchego/snow/engine/snowman/bootstrap/interval" "github.com/ava-labs/avalanchego/snow/engine/snowman/getter" "github.com/ava-labs/avalanchego/snow/snowtest" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils" - "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/version" ) @@ -79,7 +78,6 @@ func newConfig(t *testing.T) (Config, ids.NodeID, *common.SenderTest, *block.Tes snowGetHandler, err := getter.New(vm, sender, ctx.Log, time.Second, 2000, ctx.Registerer) require.NoError(err) - blocker, _ := queue.NewWithMissing(memdb.New(), "", prometheus.NewRegistry()) return Config{ AllGetsServer: snowGetHandler, Ctx: ctx, @@ -90,7 +88,7 @@ func newConfig(t *testing.T) (Config, ids.NodeID, *common.SenderTest, *block.Tes BootstrapTracker: bootstrapTracker, Timer: &common.TimerTest{}, AncestorsMaxContainersReceived: 2000, - Blocked: blocker, + DB: memdb.New(), VM: vm, }, peer, sender, vm } @@ -117,7 +115,6 @@ func TestBootstrapperStartsOnlyIfEnoughStakeIsConnected(t *testing.T) { startupTracker := tracker.NewStartup(peerTracker, startupAlpha) peers.RegisterCallbackListener(ctx.SubnetID, startupTracker) - blocker, _ := queue.NewWithMissing(memdb.New(), "", prometheus.NewRegistry()) snowGetHandler, err := getter.New(vm, sender, ctx.Log, time.Second, 2000, ctx.Registerer) require.NoError(err) cfg := Config{ @@ -130,7 +127,7 @@ func TestBootstrapperStartsOnlyIfEnoughStakeIsConnected(t *testing.T) { BootstrapTracker: &common.BootstrapTrackerTest{}, Timer: &common.TimerTest{}, AncestorsMaxContainersReceived: 2000, - Blocked: blocker, + DB: memdb.New(), VM: vm, } @@ -201,38 +198,8 @@ func TestBootstrapperSingleFrontier(t *testing.T) { config, _, _, vm := newConfig(t) - blkID0 := ids.Empty.Prefix(0) - blkID1 := ids.Empty.Prefix(1) - - blkBytes0 := []byte{0} - blkBytes1 := []byte{1} - - blk0 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID0, - StatusV: choices.Accepted, - }, - HeightV: 0, - BytesV: blkBytes0, - } - blk1 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID1, - StatusV: choices.Processing, - }, - ParentV: blk0.IDV, - HeightV: 1, - BytesV: blkBytes1, - } - - vm.CantLastAccepted = false - vm.LastAcceptedF = func(context.Context) (ids.ID, error) { - return blk0.ID(), nil - } - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - require.Equal(blk0.ID(), blkID) - return blk0, nil - } + blks := generateBlockchain(1) + initializeVMWithBlockchain(vm, blks) bs, err := New( config, @@ -246,91 +213,21 @@ func TestBootstrapperSingleFrontier(t *testing.T) { ) require.NoError(err) - vm.CantSetState = false require.NoError(bs.Start(context.Background(), 0)) - acceptedIDs := []ids.ID{blkID1} - - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - switch blkID { - case blkID1: - return blk1, nil - case blkID0: - return blk0, nil - default: - require.FailNow(database.ErrNotFound.Error()) - return nil, database.ErrNotFound - } - } - vm.ParseBlockF = func(_ context.Context, blkBytes []byte) (snowman.Block, error) { - switch { - case bytes.Equal(blkBytes, blkBytes1): - return blk1, nil - case bytes.Equal(blkBytes, blkBytes0): - return blk0, nil - } - require.FailNow(errUnknownBlock.Error()) - return nil, errUnknownBlock - } - - require.NoError(bs.startSyncing(context.Background(), acceptedIDs)) + require.NoError(bs.startSyncing(context.Background(), blocksToIDs(blks[0:1]))) require.Equal(snow.NormalOp, config.Ctx.State.Get().State) - require.Equal(choices.Accepted, blk1.Status()) } -// Requests the unknown block and gets back a Ancestors with unexpected request ID. -// Requests again and gets response from unexpected peer. -// Requests again and gets an unexpected block. +// Requests the unknown block and gets back a Ancestors with unexpected block. // Requests again and gets the expected block. func TestBootstrapperUnknownByzantineResponse(t *testing.T) { require := require.New(t) config, peerID, sender, vm := newConfig(t) - blkID0 := ids.Empty.Prefix(0) - blkID1 := ids.Empty.Prefix(1) - blkID2 := ids.Empty.Prefix(2) - - blkBytes0 := []byte{0} - blkBytes1 := []byte{1} - blkBytes2 := []byte{2} - - blk0 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID0, - StatusV: choices.Accepted, - }, - HeightV: 0, - BytesV: blkBytes0, - } - blk1 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID1, - StatusV: choices.Unknown, - }, - ParentV: blk0.IDV, - HeightV: 1, - BytesV: blkBytes1, - } - blk2 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID2, - StatusV: choices.Processing, - }, - ParentV: blk1.IDV, - HeightV: 2, - BytesV: blkBytes2, - } - - vm.CantSetState = false - vm.CantLastAccepted = false - vm.LastAcceptedF = func(context.Context) (ids.ID, error) { - return blk0.ID(), nil - } - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - require.Equal(blk0.ID(), blkID) - return blk0, nil - } + blks := generateBlockchain(2) + initializeVMWithBlockchain(vm, blks) bs, err := New( config, @@ -346,123 +243,36 @@ func TestBootstrapperUnknownByzantineResponse(t *testing.T) { require.NoError(bs.Start(context.Background(), 0)) - parsedBlk1 := false - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - switch blkID { - case blkID0: - return blk0, nil - case blkID1: - if parsedBlk1 { - return blk1, nil - } - return nil, database.ErrNotFound - case blkID2: - return blk2, nil - default: - require.FailNow(database.ErrNotFound.Error()) - return nil, database.ErrNotFound - } - } - vm.ParseBlockF = func(_ context.Context, blkBytes []byte) (snowman.Block, error) { - switch { - case bytes.Equal(blkBytes, blkBytes0): - return blk0, nil - case bytes.Equal(blkBytes, blkBytes1): - blk1.StatusV = choices.Processing - parsedBlk1 = true - return blk1, nil - case bytes.Equal(blkBytes, blkBytes2): - return blk2, nil - } - require.FailNow(errUnknownBlock.Error()) - return nil, errUnknownBlock - } - var requestID uint32 - sender.SendGetAncestorsF = func(_ context.Context, vdr ids.NodeID, reqID uint32, blkID ids.ID) { - require.Equal(peerID, vdr) - require.Equal(blkID1, blkID) + sender.SendGetAncestorsF = func(_ context.Context, nodeID ids.NodeID, reqID uint32, blkID ids.ID) { + require.Equal(peerID, nodeID) + require.Equal(blks[1].ID(), blkID) requestID = reqID } - vm.CantSetState = false - require.NoError(bs.startSyncing(context.Background(), []ids.ID{blkID2})) // should request blk1 + require.NoError(bs.startSyncing(context.Background(), blocksToIDs(blks[1:2]))) // should request blk1 oldReqID := requestID - require.NoError(bs.Ancestors(context.Background(), peerID, requestID, [][]byte{blkBytes0})) // respond with wrong block + require.NoError(bs.Ancestors(context.Background(), peerID, requestID, blocksToBytes(blks[0:1]))) // respond with wrong block require.NotEqual(oldReqID, requestID) - require.NoError(bs.Ancestors(context.Background(), peerID, requestID, [][]byte{blkBytes1})) + require.NoError(bs.Ancestors(context.Background(), peerID, requestID, blocksToBytes(blks[1:2]))) require.Equal(snow.Bootstrapping, config.Ctx.State.Get().State) - require.Equal(choices.Accepted, blk0.Status()) - require.Equal(choices.Accepted, blk1.Status()) - require.Equal(choices.Accepted, blk2.Status()) + requireStatusIs(require, blks, choices.Accepted) - require.NoError(bs.startSyncing(context.Background(), []ids.ID{blkID2})) + require.NoError(bs.startSyncing(context.Background(), blocksToIDs(blks[1:2]))) require.Equal(snow.NormalOp, config.Ctx.State.Get().State) } -// There are multiple needed blocks and Ancestors returns one at a time +// There are multiple needed blocks and multiple Ancestors are required func TestBootstrapperPartialFetch(t *testing.T) { require := require.New(t) config, peerID, sender, vm := newConfig(t) - blkID0 := ids.Empty.Prefix(0) - blkID1 := ids.Empty.Prefix(1) - blkID2 := ids.Empty.Prefix(2) - blkID3 := ids.Empty.Prefix(3) - - blkBytes0 := []byte{0} - blkBytes1 := []byte{1} - blkBytes2 := []byte{2} - blkBytes3 := []byte{3} - - blk0 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID0, - StatusV: choices.Accepted, - }, - HeightV: 0, - BytesV: blkBytes0, - } - blk1 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID1, - StatusV: choices.Unknown, - }, - ParentV: blk0.IDV, - HeightV: 1, - BytesV: blkBytes1, - } - blk2 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID2, - StatusV: choices.Unknown, - }, - ParentV: blk1.IDV, - HeightV: 2, - BytesV: blkBytes2, - } - blk3 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID3, - StatusV: choices.Processing, - }, - ParentV: blk2.IDV, - HeightV: 3, - BytesV: blkBytes3, - } - - vm.CantLastAccepted = false - vm.LastAcceptedF = func(context.Context) (ids.ID, error) { - return blk0.ID(), nil - } - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - require.Equal(blk0.ID(), blkID) - return blk0, nil - } + blks := generateBlockchain(4) + initializeVMWithBlockchain(vm, blks) bs, err := New( config, @@ -476,140 +286,43 @@ func TestBootstrapperPartialFetch(t *testing.T) { ) require.NoError(err) - vm.CantSetState = false require.NoError(bs.Start(context.Background(), 0)) - acceptedIDs := []ids.ID{blkID3} - - parsedBlk1 := false - parsedBlk2 := false - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - switch blkID { - case blkID0: - return blk0, nil - case blkID1: - if parsedBlk1 { - return blk1, nil - } - return nil, database.ErrNotFound - case blkID2: - if parsedBlk2 { - return blk2, nil - } - return nil, database.ErrNotFound - case blkID3: - return blk3, nil - default: - require.FailNow(database.ErrNotFound.Error()) - return nil, database.ErrNotFound - } - } - vm.ParseBlockF = func(_ context.Context, blkBytes []byte) (snowman.Block, error) { - switch { - case bytes.Equal(blkBytes, blkBytes0): - return blk0, nil - case bytes.Equal(blkBytes, blkBytes1): - blk1.StatusV = choices.Processing - parsedBlk1 = true - return blk1, nil - case bytes.Equal(blkBytes, blkBytes2): - blk2.StatusV = choices.Processing - parsedBlk2 = true - return blk2, nil - case bytes.Equal(blkBytes, blkBytes3): - return blk3, nil - } - require.FailNow(errUnknownBlock.Error()) - return nil, errUnknownBlock - } - - requestID := new(uint32) - requested := ids.Empty - sender.SendGetAncestorsF = func(_ context.Context, vdr ids.NodeID, reqID uint32, blkID ids.ID) { - require.Equal(peerID, vdr) - require.Contains([]ids.ID{blkID1, blkID2}, blkID) - *requestID = reqID + var ( + requestID uint32 + requested ids.ID + ) + sender.SendGetAncestorsF = func(_ context.Context, nodeID ids.NodeID, reqID uint32, blkID ids.ID) { + require.Equal(peerID, nodeID) + require.Contains([]ids.ID{blks[1].ID(), blks[3].ID()}, blkID) + requestID = reqID requested = blkID } - require.NoError(bs.startSyncing(context.Background(), acceptedIDs)) // should request blk2 + require.NoError(bs.startSyncing(context.Background(), blocksToIDs(blks[3:4]))) // should request blk3 + require.Equal(blks[3].ID(), requested) - require.NoError(bs.Ancestors(context.Background(), peerID, *requestID, [][]byte{blkBytes2})) // respond with blk2 - require.Equal(blkID1, requested) + require.NoError(bs.Ancestors(context.Background(), peerID, requestID, blocksToBytes(blks[2:4]))) // respond with blk3 and blk2 + require.Equal(blks[1].ID(), requested) - require.NoError(bs.Ancestors(context.Background(), peerID, *requestID, [][]byte{blkBytes1})) // respond with blk1 - require.Equal(blkID1, requested) + require.NoError(bs.Ancestors(context.Background(), peerID, requestID, blocksToBytes(blks[1:2]))) // respond with blk1 require.Equal(snow.Bootstrapping, config.Ctx.State.Get().State) - require.Equal(choices.Accepted, blk0.Status()) - require.Equal(choices.Accepted, blk1.Status()) - require.Equal(choices.Accepted, blk2.Status()) + requireStatusIs(require, blks, choices.Accepted) - require.NoError(bs.startSyncing(context.Background(), acceptedIDs)) + require.NoError(bs.startSyncing(context.Background(), blocksToIDs(blks[3:4]))) require.Equal(snow.NormalOp, config.Ctx.State.Get().State) } -// There are multiple needed blocks and some validators do not have all the blocks -// This test was modeled after TestBootstrapperPartialFetch. +// There are multiple needed blocks and some validators do not have all the +// blocks. func TestBootstrapperEmptyResponse(t *testing.T) { require := require.New(t) config, peerID, sender, vm := newConfig(t) - blkID0 := ids.Empty.Prefix(0) - blkID1 := ids.Empty.Prefix(1) - blkID2 := ids.Empty.Prefix(2) - blkID3 := ids.Empty.Prefix(3) - - blkBytes0 := []byte{0} - blkBytes1 := []byte{1} - blkBytes2 := []byte{2} - blkBytes3 := []byte{3} - - blk0 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID0, - StatusV: choices.Accepted, - }, - HeightV: 0, - BytesV: blkBytes0, - } - blk1 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID1, - StatusV: choices.Unknown, - }, - ParentV: blk0.IDV, - HeightV: 1, - BytesV: blkBytes1, - } - blk2 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID2, - StatusV: choices.Unknown, - }, - ParentV: blk1.IDV, - HeightV: 2, - BytesV: blkBytes2, - } - blk3 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID3, - StatusV: choices.Processing, - }, - ParentV: blk2.IDV, - HeightV: 3, - BytesV: blkBytes3, - } - - vm.CantLastAccepted = false - vm.LastAcceptedF = func(context.Context) (ids.ID, error) { - return blk0.ID(), nil - } - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - require.Equal(blk0.ID(), blkID) - return blk0, nil - } + blks := generateBlockchain(2) + initializeVMWithBlockchain(vm, blks) bs, err := New( config, @@ -623,93 +336,34 @@ func TestBootstrapperEmptyResponse(t *testing.T) { ) require.NoError(err) - vm.CantSetState = false require.NoError(bs.Start(context.Background(), 0)) - acceptedIDs := []ids.ID{blkID3} - - parsedBlk1 := false - parsedBlk2 := false - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - switch blkID { - case blkID0: - return blk0, nil - case blkID1: - if parsedBlk1 { - return blk1, nil - } - return nil, database.ErrNotFound - case blkID2: - if parsedBlk2 { - return blk2, nil - } - return nil, database.ErrNotFound - case blkID3: - return blk3, nil - default: - require.FailNow(database.ErrNotFound.Error()) - return nil, database.ErrNotFound - } - } - vm.ParseBlockF = func(_ context.Context, blkBytes []byte) (snowman.Block, error) { - switch { - case bytes.Equal(blkBytes, blkBytes0): - return blk0, nil - case bytes.Equal(blkBytes, blkBytes1): - blk1.StatusV = choices.Processing - parsedBlk1 = true - return blk1, nil - case bytes.Equal(blkBytes, blkBytes2): - blk2.StatusV = choices.Processing - parsedBlk2 = true - return blk2, nil - case bytes.Equal(blkBytes, blkBytes3): - return blk3, nil - } - require.FailNow(errUnknownBlock.Error()) - return nil, errUnknownBlock - } - - requestedVdr := ids.EmptyNodeID - requestID := uint32(0) - requestedBlock := ids.Empty - sender.SendGetAncestorsF = func(_ context.Context, vdr ids.NodeID, reqID uint32, blkID ids.ID) { - requestedVdr = vdr + var ( + requestedNodeID ids.NodeID + requestID uint32 + ) + sender.SendGetAncestorsF = func(_ context.Context, nodeID ids.NodeID, reqID uint32, blkID ids.ID) { + require.Equal(blks[1].ID(), blkID) + requestedNodeID = nodeID requestID = reqID - requestedBlock = blkID } - // should request blk2 - require.NoError(bs.startSyncing(context.Background(), acceptedIDs)) - require.Equal(peerID, requestedVdr) - require.Equal(blkID2, requestedBlock) - - // add another two validators to the fetch set to test behavior on empty response - newPeerID := ids.GenerateTestNodeID() - bs.fetchFrom.Add(newPeerID) - - newPeerID = ids.GenerateTestNodeID() - bs.fetchFrom.Add(newPeerID) + require.NoError(bs.startSyncing(context.Background(), blocksToIDs(blks[1:2]))) + require.Equal(requestedNodeID, peerID) - require.NoError(bs.Ancestors(context.Background(), peerID, requestID, [][]byte{blkBytes2})) - require.Equal(blkID1, requestedBlock) + // add another 2 validators to the fetch set to test behavior on empty + // response + bs.fetchFrom.Add(ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) - peerToBlacklist := requestedVdr - - // respond with empty - require.NoError(bs.Ancestors(context.Background(), peerToBlacklist, requestID, nil)) - require.NotEqual(peerToBlacklist, requestedVdr) - require.Equal(blkID1, requestedBlock) - - require.NoError(bs.Ancestors(context.Background(), requestedVdr, requestID, [][]byte{blkBytes1})) // respond with blk1 + require.NoError(bs.Ancestors(context.Background(), requestedNodeID, requestID, nil)) // respond with empty + require.NotEqual(requestedNodeID, peerID) + require.NoError(bs.Ancestors(context.Background(), requestedNodeID, requestID, blocksToBytes(blks[1:2]))) require.Equal(snow.Bootstrapping, config.Ctx.State.Get().State) - require.Equal(choices.Accepted, blk0.Status()) - require.Equal(choices.Accepted, blk1.Status()) - require.Equal(choices.Accepted, blk2.Status()) + requireStatusIs(require, blks, choices.Accepted) - // check peerToBlacklist was removed from the fetch set - require.NotContains(bs.fetchFrom, peerToBlacklist) + // check that peerID was removed from the fetch set + require.NotContains(bs.fetchFrom, peerID) } // There are multiple needed blocks and Ancestors returns all at once @@ -718,61 +372,8 @@ func TestBootstrapperAncestors(t *testing.T) { config, peerID, sender, vm := newConfig(t) - blkID0 := ids.Empty.Prefix(0) - blkID1 := ids.Empty.Prefix(1) - blkID2 := ids.Empty.Prefix(2) - blkID3 := ids.Empty.Prefix(3) - - blkBytes0 := []byte{0} - blkBytes1 := []byte{1} - blkBytes2 := []byte{2} - blkBytes3 := []byte{3} - - blk0 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID0, - StatusV: choices.Accepted, - }, - HeightV: 0, - BytesV: blkBytes0, - } - blk1 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID1, - StatusV: choices.Unknown, - }, - ParentV: blk0.IDV, - HeightV: 1, - BytesV: blkBytes1, - } - blk2 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID2, - StatusV: choices.Unknown, - }, - ParentV: blk1.IDV, - HeightV: 2, - BytesV: blkBytes2, - } - blk3 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID3, - StatusV: choices.Processing, - }, - ParentV: blk2.IDV, - HeightV: 3, - BytesV: blkBytes3, - } - - vm.CantSetState = false - vm.CantLastAccepted = false - vm.LastAcceptedF = func(context.Context) (ids.ID, error) { - return blk0.ID(), nil - } - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - require.Equal(blk0.ID(), blkID) - return blk0, nil - } + blks := generateBlockchain(4) + initializeVMWithBlockchain(vm, blks) bs, err := New( config, @@ -788,69 +389,26 @@ func TestBootstrapperAncestors(t *testing.T) { require.NoError(bs.Start(context.Background(), 0)) - acceptedIDs := []ids.ID{blkID3} - - parsedBlk1 := false - parsedBlk2 := false - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - switch blkID { - case blkID0: - return blk0, nil - case blkID1: - if parsedBlk1 { - return blk1, nil - } - return nil, database.ErrNotFound - case blkID2: - if parsedBlk2 { - return blk2, nil - } - return nil, database.ErrNotFound - case blkID3: - return blk3, nil - default: - require.FailNow(database.ErrNotFound.Error()) - return nil, database.ErrNotFound - } - } - vm.ParseBlockF = func(_ context.Context, blkBytes []byte) (snowman.Block, error) { - switch { - case bytes.Equal(blkBytes, blkBytes0): - return blk0, nil - case bytes.Equal(blkBytes, blkBytes1): - blk1.StatusV = choices.Processing - parsedBlk1 = true - return blk1, nil - case bytes.Equal(blkBytes, blkBytes2): - blk2.StatusV = choices.Processing - parsedBlk2 = true - return blk2, nil - case bytes.Equal(blkBytes, blkBytes3): - return blk3, nil - } - require.FailNow(errUnknownBlock.Error()) - return nil, errUnknownBlock - } - - requestID := new(uint32) - requested := ids.Empty - sender.SendGetAncestorsF = func(_ context.Context, vdr ids.NodeID, reqID uint32, blkID ids.ID) { - require.Equal(peerID, vdr) - require.Contains([]ids.ID{blkID1, blkID2}, blkID) - *requestID = reqID + var ( + requestID uint32 + requested ids.ID + ) + sender.SendGetAncestorsF = func(_ context.Context, nodeID ids.NodeID, reqID uint32, blkID ids.ID) { + require.Equal(peerID, nodeID) + require.Equal(blks[3].ID(), blkID) + requestID = reqID requested = blkID } - require.NoError(bs.startSyncing(context.Background(), acceptedIDs)) // should request blk2 - require.NoError(bs.Ancestors(context.Background(), peerID, *requestID, [][]byte{blkBytes2, blkBytes1})) // respond with blk2 and blk1 - require.Equal(blkID2, requested) + require.NoError(bs.startSyncing(context.Background(), blocksToIDs(blks[3:4]))) // should request blk3 + require.Equal(blks[3].ID(), requested) + + require.NoError(bs.Ancestors(context.Background(), peerID, requestID, blocksToBytes(blks))) // respond with all the blocks require.Equal(snow.Bootstrapping, config.Ctx.State.Get().State) - require.Equal(choices.Accepted, blk0.Status()) - require.Equal(choices.Accepted, blk1.Status()) - require.Equal(choices.Accepted, blk2.Status()) + requireStatusIs(require, blks, choices.Accepted) - require.NoError(bs.startSyncing(context.Background(), acceptedIDs)) + require.NoError(bs.startSyncing(context.Background(), blocksToIDs(blks[3:4]))) require.Equal(snow.NormalOp, config.Ctx.State.Get().State) } @@ -859,49 +417,9 @@ func TestBootstrapperFinalized(t *testing.T) { config, peerID, sender, vm := newConfig(t) - blkID0 := ids.Empty.Prefix(0) - blkID1 := ids.Empty.Prefix(1) - blkID2 := ids.Empty.Prefix(2) - - blkBytes0 := []byte{0} - blkBytes1 := []byte{1} - blkBytes2 := []byte{2} - - blk0 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID0, - StatusV: choices.Accepted, - }, - HeightV: 0, - BytesV: blkBytes0, - } - blk1 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID1, - StatusV: choices.Unknown, - }, - ParentV: blk0.IDV, - HeightV: 1, - BytesV: blkBytes1, - } - blk2 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID2, - StatusV: choices.Unknown, - }, - ParentV: blk1.IDV, - HeightV: 2, - BytesV: blkBytes2, - } + blks := generateBlockchain(3) + initializeVMWithBlockchain(vm, blks) - vm.CantLastAccepted = false - vm.LastAcceptedF = func(context.Context) (ids.ID, error) { - return blk0.ID(), nil - } - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - require.Equal(blk0.ID(), blkID) - return blk0, nil - } bs, err := New( config, func(context.Context, uint32) error { @@ -914,66 +432,25 @@ func TestBootstrapperFinalized(t *testing.T) { ) require.NoError(err) - vm.CantSetState = false require.NoError(bs.Start(context.Background(), 0)) - parsedBlk1 := false - parsedBlk2 := false - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - switch blkID { - case blkID0: - return blk0, nil - case blkID1: - if parsedBlk1 { - return blk1, nil - } - return nil, database.ErrNotFound - case blkID2: - if parsedBlk2 { - return blk2, nil - } - return nil, database.ErrNotFound - default: - require.FailNow(database.ErrNotFound.Error()) - return nil, database.ErrNotFound - } - } - vm.ParseBlockF = func(_ context.Context, blkBytes []byte) (snowman.Block, error) { - switch { - case bytes.Equal(blkBytes, blkBytes0): - return blk0, nil - case bytes.Equal(blkBytes, blkBytes1): - blk1.StatusV = choices.Processing - parsedBlk1 = true - return blk1, nil - case bytes.Equal(blkBytes, blkBytes2): - blk2.StatusV = choices.Processing - parsedBlk2 = true - return blk2, nil - } - require.FailNow(errUnknownBlock.Error()) - return nil, errUnknownBlock - } - requestIDs := map[ids.ID]uint32{} - sender.SendGetAncestorsF = func(_ context.Context, vdr ids.NodeID, reqID uint32, blkID ids.ID) { - require.Equal(peerID, vdr) + sender.SendGetAncestorsF = func(_ context.Context, nodeID ids.NodeID, reqID uint32, blkID ids.ID) { + require.Equal(peerID, nodeID) requestIDs[blkID] = reqID } - require.NoError(bs.startSyncing(context.Background(), []ids.ID{blkID1, blkID2})) // should request blk2 and blk1 + require.NoError(bs.startSyncing(context.Background(), blocksToIDs(blks[1:3]))) // should request blk1 and blk2 - reqIDBlk2, ok := requestIDs[blkID2] + reqIDBlk2, ok := requestIDs[blks[2].ID()] require.True(ok) - require.NoError(bs.Ancestors(context.Background(), peerID, reqIDBlk2, [][]byte{blkBytes2, blkBytes1})) + require.NoError(bs.Ancestors(context.Background(), peerID, reqIDBlk2, blocksToBytes(blks[1:3]))) require.Equal(snow.Bootstrapping, config.Ctx.State.Get().State) - require.Equal(choices.Accepted, blk0.Status()) - require.Equal(choices.Accepted, blk1.Status()) - require.Equal(choices.Accepted, blk2.Status()) + requireStatusIs(require, blks, choices.Accepted) - require.NoError(bs.startSyncing(context.Background(), []ids.ID{blkID2})) + require.NoError(bs.startSyncing(context.Background(), blocksToIDs(blks[2:3]))) require.Equal(snow.NormalOp, config.Ctx.State.Get().State) } @@ -982,124 +459,8 @@ func TestRestartBootstrapping(t *testing.T) { config, peerID, sender, vm := newConfig(t) - blkID0 := ids.Empty.Prefix(0) - blkID1 := ids.Empty.Prefix(1) - blkID2 := ids.Empty.Prefix(2) - blkID3 := ids.Empty.Prefix(3) - blkID4 := ids.Empty.Prefix(4) - - blkBytes0 := []byte{0} - blkBytes1 := []byte{1} - blkBytes2 := []byte{2} - blkBytes3 := []byte{3} - blkBytes4 := []byte{4} - - blk0 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID0, - StatusV: choices.Accepted, - }, - HeightV: 0, - BytesV: blkBytes0, - } - blk1 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID1, - StatusV: choices.Unknown, - }, - ParentV: blk0.IDV, - HeightV: 1, - BytesV: blkBytes1, - } - blk2 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID2, - StatusV: choices.Unknown, - }, - ParentV: blk1.IDV, - HeightV: 2, - BytesV: blkBytes2, - } - blk3 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID3, - StatusV: choices.Unknown, - }, - ParentV: blk2.IDV, - HeightV: 3, - BytesV: blkBytes3, - } - blk4 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID4, - StatusV: choices.Unknown, - }, - ParentV: blk3.IDV, - HeightV: 4, - BytesV: blkBytes4, - } - - vm.CantLastAccepted = false - vm.LastAcceptedF = func(context.Context) (ids.ID, error) { - return blk0.ID(), nil - } - parsedBlk1 := false - parsedBlk2 := false - parsedBlk3 := false - parsedBlk4 := false - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - switch blkID { - case blkID0: - return blk0, nil - case blkID1: - if parsedBlk1 { - return blk1, nil - } - return nil, database.ErrNotFound - case blkID2: - if parsedBlk2 { - return blk2, nil - } - return nil, database.ErrNotFound - case blkID3: - if parsedBlk3 { - return blk3, nil - } - return nil, database.ErrNotFound - case blkID4: - if parsedBlk4 { - return blk4, nil - } - return nil, database.ErrNotFound - default: - require.FailNow(database.ErrNotFound.Error()) - return nil, database.ErrNotFound - } - } - vm.ParseBlockF = func(_ context.Context, blkBytes []byte) (snowman.Block, error) { - switch { - case bytes.Equal(blkBytes, blkBytes0): - return blk0, nil - case bytes.Equal(blkBytes, blkBytes1): - blk1.StatusV = choices.Processing - parsedBlk1 = true - return blk1, nil - case bytes.Equal(blkBytes, blkBytes2): - blk2.StatusV = choices.Processing - parsedBlk2 = true - return blk2, nil - case bytes.Equal(blkBytes, blkBytes3): - blk3.StatusV = choices.Processing - parsedBlk3 = true - return blk3, nil - case bytes.Equal(blkBytes, blkBytes4): - blk4.StatusV = choices.Processing - parsedBlk4 = true - return blk4, nil - } - require.FailNow(errUnknownBlock.Error()) - return nil, errUnknownBlock - } + blks := generateBlockchain(5) + initializeVMWithBlockchain(vm, blks) bs, err := New( config, @@ -1113,51 +474,44 @@ func TestRestartBootstrapping(t *testing.T) { ) require.NoError(err) - vm.CantSetState = false require.NoError(bs.Start(context.Background(), 0)) requestIDs := map[ids.ID]uint32{} - sender.SendGetAncestorsF = func(_ context.Context, vdr ids.NodeID, reqID uint32, blkID ids.ID) { - require.Equal(peerID, vdr) + sender.SendGetAncestorsF = func(_ context.Context, nodeID ids.NodeID, reqID uint32, blkID ids.ID) { + require.Equal(peerID, nodeID) requestIDs[blkID] = reqID } - // Force Accept blk3 - require.NoError(bs.startSyncing(context.Background(), []ids.ID{blkID3})) // should request blk3 + require.NoError(bs.startSyncing(context.Background(), blocksToIDs(blks[3:4]))) // should request blk3 - reqID, ok := requestIDs[blkID3] + reqID, ok := requestIDs[blks[3].ID()] require.True(ok) - require.NoError(bs.Ancestors(context.Background(), peerID, reqID, [][]byte{blkBytes3, blkBytes2})) - - require.Contains(requestIDs, blkID1) + require.NoError(bs.Ancestors(context.Background(), peerID, reqID, blocksToBytes(blks[2:4]))) + require.Contains(requestIDs, blks[1].ID()) // Remove request, so we can restart bootstrapping via startSyncing - _, removed := bs.outstandingRequests.DeleteValue(blkID1) + _, removed := bs.outstandingRequests.DeleteValue(blks[1].ID()) require.True(removed) - requestIDs = map[ids.ID]uint32{} + clear(requestIDs) - require.NoError(bs.startSyncing(context.Background(), []ids.ID{blkID4})) + require.NoError(bs.startSyncing(context.Background(), blocksToIDs(blks[4:5]))) - blk1RequestID, ok := requestIDs[blkID1] + blk1RequestID, ok := requestIDs[blks[1].ID()] require.True(ok) - blk4RequestID, ok := requestIDs[blkID4] + blk4RequestID, ok := requestIDs[blks[4].ID()] require.True(ok) - require.NoError(bs.Ancestors(context.Background(), peerID, blk1RequestID, [][]byte{blkBytes1})) - - require.NotEqual(snow.NormalOp, config.Ctx.State.Get().State) - - require.NoError(bs.Ancestors(context.Background(), peerID, blk4RequestID, [][]byte{blkBytes4})) + require.NoError(bs.Ancestors(context.Background(), peerID, blk1RequestID, blocksToBytes(blks[1:2]))) + require.Equal(snow.Bootstrapping, config.Ctx.State.Get().State) + require.Equal(choices.Accepted, blks[0].Status()) + requireStatusIs(require, blks[1:], choices.Processing) + require.NoError(bs.Ancestors(context.Background(), peerID, blk4RequestID, blocksToBytes(blks[4:5]))) require.Equal(snow.Bootstrapping, config.Ctx.State.Get().State) - require.Equal(choices.Accepted, blk0.Status()) - require.Equal(choices.Accepted, blk1.Status()) - require.Equal(choices.Accepted, blk2.Status()) - require.Equal(choices.Accepted, blk3.Status()) - require.Equal(choices.Accepted, blk4.Status()) + requireStatusIs(require, blks, choices.Accepted) - require.NoError(bs.startSyncing(context.Background(), []ids.ID{blkID4})) + require.NoError(bs.startSyncing(context.Background(), blocksToIDs(blks[4:5]))) require.Equal(snow.NormalOp, config.Ctx.State.Get().State) } @@ -1166,48 +520,11 @@ func TestBootstrapOldBlockAfterStateSync(t *testing.T) { config, peerID, sender, vm := newConfig(t) - blk0 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: ids.GenerateTestID(), - StatusV: choices.Processing, - }, - HeightV: 0, - BytesV: utils.RandomBytes(32), - } - blk1 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: ids.GenerateTestID(), - StatusV: choices.Accepted, - }, - ParentV: blk0.IDV, - HeightV: 1, - BytesV: utils.RandomBytes(32), - } + blks := generateBlockchain(2) + initializeVMWithBlockchain(vm, blks) - vm.LastAcceptedF = func(context.Context) (ids.ID, error) { - return blk1.ID(), nil - } - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - switch blkID { - case blk0.ID(): - return nil, database.ErrNotFound - case blk1.ID(): - return blk1, nil - default: - require.FailNow(database.ErrNotFound.Error()) - return nil, database.ErrNotFound - } - } - vm.ParseBlockF = func(_ context.Context, blkBytes []byte) (snowman.Block, error) { - switch { - case bytes.Equal(blkBytes, blk0.Bytes()): - return blk0, nil - case bytes.Equal(blkBytes, blk1.Bytes()): - return blk1, nil - } - require.FailNow(errUnknownBlock.Error()) - return nil, errUnknownBlock - } + blks[0].(*snowman.TestBlock).StatusV = choices.Processing + require.NoError(blks[1].Accept(context.Background())) bs, err := New( config, @@ -1221,25 +538,24 @@ func TestBootstrapOldBlockAfterStateSync(t *testing.T) { ) require.NoError(err) - vm.CantSetState = false require.NoError(bs.Start(context.Background(), 0)) requestIDs := map[ids.ID]uint32{} - sender.SendGetAncestorsF = func(_ context.Context, vdr ids.NodeID, reqID uint32, blkID ids.ID) { - require.Equal(peerID, vdr) + sender.SendGetAncestorsF = func(_ context.Context, nodeID ids.NodeID, reqID uint32, blkID ids.ID) { + require.Equal(peerID, nodeID) requestIDs[blkID] = reqID } // Force Accept, the already transitively accepted, blk0 - require.NoError(bs.startSyncing(context.Background(), []ids.ID{blk0.ID()})) // should request blk0 + require.NoError(bs.startSyncing(context.Background(), blocksToIDs(blks[0:1]))) // should request blk0 - reqID, ok := requestIDs[blk0.ID()] + reqID, ok := requestIDs[blks[0].ID()] require.True(ok) - require.NoError(bs.Ancestors(context.Background(), peerID, reqID, [][]byte{blk0.Bytes()})) + require.NoError(bs.Ancestors(context.Background(), peerID, reqID, blocksToBytes(blks[0:1]))) require.Equal(snow.NormalOp, config.Ctx.State.Get().State) - require.Equal(choices.Processing, blk0.Status()) - require.Equal(choices.Accepted, blk1.Status()) + require.Equal(choices.Processing, blks[0].Status()) + require.Equal(choices.Accepted, blks[1].Status()) } func TestBootstrapContinueAfterHalt(t *testing.T) { @@ -1247,36 +563,8 @@ func TestBootstrapContinueAfterHalt(t *testing.T) { config, _, _, vm := newConfig(t) - blk0 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: ids.GenerateTestID(), - StatusV: choices.Accepted, - }, - HeightV: 0, - BytesV: utils.RandomBytes(32), - } - blk1 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: ids.GenerateTestID(), - StatusV: choices.Processing, - }, - ParentV: blk0.IDV, - HeightV: 1, - BytesV: utils.RandomBytes(32), - } - blk2 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: ids.GenerateTestID(), - StatusV: choices.Processing, - }, - ParentV: blk1.IDV, - HeightV: 2, - BytesV: utils.RandomBytes(32), - } - - vm.LastAcceptedF = func(context.Context) (ids.ID, error) { - return blk0.ID(), nil - } + blks := generateBlockchain(2) + initializeVMWithBlockchain(vm, blks) bs, err := New( config, @@ -1290,27 +578,16 @@ func TestBootstrapContinueAfterHalt(t *testing.T) { ) require.NoError(err) - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - switch blkID { - case blk0.ID(): - return blk0, nil - case blk1.ID(): - bs.Halt(context.Background()) - return blk1, nil - case blk2.ID(): - return blk2, nil - default: - require.FailNow(database.ErrNotFound.Error()) - return nil, database.ErrNotFound - } + getBlockF := vm.GetBlockF + vm.GetBlockF = func(ctx context.Context, blkID ids.ID) (snowman.Block, error) { + bs.Halt(ctx) + return getBlockF(ctx, blkID) } - vm.CantSetState = false require.NoError(bs.Start(context.Background(), 0)) - require.NoError(bs.startSyncing(context.Background(), []ids.ID{blk2.ID()})) - - require.Equal(1, bs.Blocked.NumMissingIDs()) + require.NoError(bs.startSyncing(context.Background(), blocksToIDs(blks[1:2]))) + require.Equal(1, bs.missingBlockIDs.Len()) } func TestBootstrapNoParseOnNew(t *testing.T) { @@ -1355,10 +632,6 @@ func TestBootstrapNoParseOnNew(t *testing.T) { snowGetHandler, err := getter.New(vm, sender, ctx.Log, time.Second, 2000, ctx.Registerer) require.NoError(err) - queueDB := memdb.New() - blocker, err := queue.NewWithMissing(queueDB, "", prometheus.NewRegistry()) - require.NoError(err) - blk0 := &snowman.TestBlock{ TestDecidable: choices.TestDecidable{ IDV: ids.GenerateTestID(), @@ -1383,22 +656,14 @@ func TestBootstrapNoParseOnNew(t *testing.T) { return blk0, nil } - pushed, err := blocker.Push(context.Background(), &blockJob{ - log: logging.NoLog{}, - numAccepted: prometheus.NewCounter(prometheus.CounterOpts{}), - blk: blk1, - vm: vm, - }) + intervalDB := memdb.New() + tree, err := interval.NewTree(intervalDB) + require.NoError(err) + _, err = interval.Add(intervalDB, tree, 0, blk1.Height(), blk1.Bytes()) require.NoError(err) - require.True(pushed) - - require.NoError(blocker.Commit()) vm.GetBlockF = nil - blocker, err = queue.NewWithMissing(queueDB, "", prometheus.NewRegistry()) - require.NoError(err) - config := Config{ AllGetsServer: snowGetHandler, Ctx: ctx, @@ -1409,7 +674,7 @@ func TestBootstrapNoParseOnNew(t *testing.T) { BootstrapTracker: bootstrapTracker, Timer: &common.TimerTest{}, AncestorsMaxContainersReceived: 2000, - Blocked: blocker, + DB: intervalDB, VM: vm, } @@ -1431,50 +696,9 @@ func TestBootstrapperReceiveStaleAncestorsMessage(t *testing.T) { config, peerID, sender, vm := newConfig(t) - var ( - blkID0 = ids.GenerateTestID() - blkBytes0 = utils.RandomBytes(1024) - blk0 = &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID0, - StatusV: choices.Accepted, - }, - HeightV: 0, - BytesV: blkBytes0, - } - - blkID1 = ids.GenerateTestID() - blkBytes1 = utils.RandomBytes(1024) - blk1 = &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID1, - StatusV: choices.Processing, - }, - ParentV: blk0.IDV, - HeightV: blk0.HeightV + 1, - BytesV: blkBytes1, - } - - blkID2 = ids.GenerateTestID() - blkBytes2 = utils.RandomBytes(1024) - blk2 = &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: blkID2, - StatusV: choices.Processing, - }, - ParentV: blk1.IDV, - HeightV: blk1.HeightV + 1, - BytesV: blkBytes2, - } - ) + blks := generateBlockchain(3) + initializeVMWithBlockchain(vm, blks) - vm.LastAcceptedF = func(context.Context) (ids.ID, error) { - return blk0.ID(), nil - } - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - require.Equal(blkID0, blkID) - return blk0, nil - } bs, err := New( config, func(context.Context, uint32) error { @@ -1487,62 +711,111 @@ func TestBootstrapperReceiveStaleAncestorsMessage(t *testing.T) { ) require.NoError(err) - vm.CantSetState = false require.NoError(bs.Start(context.Background(), 0)) - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - switch blkID { - case blkID0: - return blk0, nil - case blkID1: - if blk1.StatusV == choices.Accepted { - return blk1, nil - } - return nil, database.ErrNotFound - case blkID2: - if blk2.StatusV == choices.Accepted { - return blk2, nil - } - return nil, database.ErrNotFound - default: - require.FailNow(database.ErrNotFound.Error()) - return nil, database.ErrNotFound - } - } - vm.ParseBlockF = func(_ context.Context, blkBytes []byte) (snowman.Block, error) { - switch { - case bytes.Equal(blkBytes, blkBytes0): - return blk0, nil - case bytes.Equal(blkBytes, blkBytes1): - return blk1, nil - case bytes.Equal(blkBytes, blkBytes2): - return blk2, nil - default: - require.FailNow(errUnknownBlock.Error()) - return nil, errUnknownBlock - } - } - requestIDs := map[ids.ID]uint32{} - sender.SendGetAncestorsF = func(_ context.Context, vdr ids.NodeID, reqID uint32, blkID ids.ID) { - require.Equal(peerID, vdr) + sender.SendGetAncestorsF = func(_ context.Context, nodeID ids.NodeID, reqID uint32, blkID ids.ID) { + require.Equal(peerID, nodeID) requestIDs[blkID] = reqID } - require.NoError(bs.startSyncing(context.Background(), []ids.ID{blkID1, blkID2})) // should request blk2 and blk1 + require.NoError(bs.startSyncing(context.Background(), blocksToIDs(blks[1:3]))) // should request blk1 and blk2 - reqIDBlk1, ok := requestIDs[blkID1] + reqIDBlk1, ok := requestIDs[blks[1].ID()] require.True(ok) - reqIDBlk2, ok := requestIDs[blkID2] + reqIDBlk2, ok := requestIDs[blks[2].ID()] require.True(ok) - require.NoError(bs.Ancestors(context.Background(), peerID, reqIDBlk2, [][]byte{blkBytes2, blkBytes1})) - + require.NoError(bs.Ancestors(context.Background(), peerID, reqIDBlk2, blocksToBytes(blks[1:3]))) require.Equal(snow.Bootstrapping, config.Ctx.State.Get().State) - require.Equal(choices.Accepted, blk0.Status()) - require.Equal(choices.Accepted, blk1.Status()) - require.Equal(choices.Accepted, blk2.Status()) + requireStatusIs(require, blks, choices.Accepted) - require.NoError(bs.Ancestors(context.Background(), peerID, reqIDBlk1, [][]byte{blkBytes1})) + require.NoError(bs.Ancestors(context.Background(), peerID, reqIDBlk1, blocksToBytes(blks[1:2]))) require.Equal(snow.Bootstrapping, config.Ctx.State.Get().State) } + +func generateBlockchain(length uint64) []snowman.Block { + if length == 0 { + return nil + } + + blocks := make([]snowman.Block, length) + blocks[0] = &snowman.TestBlock{ + TestDecidable: choices.TestDecidable{ + IDV: ids.GenerateTestID(), + StatusV: choices.Accepted, + }, + ParentV: ids.Empty, + HeightV: 0, + BytesV: binary.AppendUvarint(nil, 0), + } + for height := uint64(1); height < length; height++ { + blocks[height] = &snowman.TestBlock{ + TestDecidable: choices.TestDecidable{ + IDV: ids.GenerateTestID(), + StatusV: choices.Processing, + }, + ParentV: blocks[height-1].ID(), + HeightV: height, + BytesV: binary.AppendUvarint(nil, height), + } + } + return blocks +} + +func initializeVMWithBlockchain(vm *block.TestVM, blocks []snowman.Block) { + vm.CantSetState = false + vm.LastAcceptedF = func(context.Context) (ids.ID, error) { + var ( + lastAcceptedID ids.ID + lastAcceptedHeight uint64 + ) + for _, blk := range blocks { + height := blk.Height() + if blk.Status() == choices.Accepted && height >= lastAcceptedHeight { + lastAcceptedID = blk.ID() + lastAcceptedHeight = height + } + } + return lastAcceptedID, nil + } + vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { + for _, blk := range blocks { + if blk.Status() == choices.Accepted && blk.ID() == blkID { + return blk, nil + } + } + return nil, database.ErrNotFound + } + vm.ParseBlockF = func(_ context.Context, blkBytes []byte) (snowman.Block, error) { + for _, blk := range blocks { + if bytes.Equal(blk.Bytes(), blkBytes) { + return blk, nil + } + } + return nil, errUnknownBlock + } +} + +func requireStatusIs(require *require.Assertions, blocks []snowman.Block, status choices.Status) { + for i, blk := range blocks { + require.Equal(status, blk.Status(), i) + } +} + +func blocksToIDs(blocks []snowman.Block) []ids.ID { + blkIDs := make([]ids.ID, len(blocks)) + for i, blk := range blocks { + blkIDs[i] = blk.ID() + } + return blkIDs +} + +func blocksToBytes(blocks []snowman.Block) [][]byte { + numBlocks := len(blocks) + blkBytes := make([][]byte, numBlocks) + for i, blk := range blocks { + blkBytes[numBlocks-i-1] = blk.Bytes() + } + return blkBytes +} diff --git a/snow/engine/snowman/bootstrap/config.go b/snow/engine/snowman/bootstrap/config.go index 6fb8894db96f..5ddef07970d3 100644 --- a/snow/engine/snowman/bootstrap/config.go +++ b/snow/engine/snowman/bootstrap/config.go @@ -4,9 +4,9 @@ package bootstrap import ( + "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/engine/common" - "github.com/ava-labs/avalanchego/snow/engine/common/queue" "github.com/ava-labs/avalanchego/snow/engine/common/tracker" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" "github.com/ava-labs/avalanchego/snow/validators" @@ -28,12 +28,9 @@ type Config struct { // containers in an ancestors message it receives. AncestorsMaxContainersReceived int - // Blocked tracks operations that are blocked on blocks - // - // It should be guaranteed that `MissingIDs` should contain all IDs - // referenced by the `MissingDependencies` that have not already been added - // to the queue. - Blocked *queue.JobsWithMissing + // Database used to track the fetched, but not yet executed, blocks during + // bootstrapping. + DB database.Database VM block.ChainVM diff --git a/snow/engine/snowman/bootstrap/metrics.go b/snow/engine/snowman/bootstrap/metrics.go index aea46d2a93e8..311ed05f136d 100644 --- a/snow/engine/snowman/bootstrap/metrics.go +++ b/snow/engine/snowman/bootstrap/metrics.go @@ -11,7 +11,6 @@ import ( type metrics struct { numFetched, numAccepted prometheus.Counter - fetchETA prometheus.Gauge } func newMetrics(namespace string, registerer prometheus.Registerer) (*metrics, error) { @@ -26,17 +25,11 @@ func newMetrics(namespace string, registerer prometheus.Registerer) (*metrics, e Name: "accepted", Help: "Number of blocks accepted during bootstrapping", }), - fetchETA: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "eta_fetching_complete", - Help: "ETA in nanoseconds until fetching phase of bootstrapping finishes", - }), } err := utils.Err( registerer.Register(m.numFetched), registerer.Register(m.numAccepted), - registerer.Register(m.fetchETA), ) return m, err } diff --git a/snow/engine/snowman/bootstrap/storage.go b/snow/engine/snowman/bootstrap/storage.go new file mode 100644 index 000000000000..ee266e578692 --- /dev/null +++ b/snow/engine/snowman/bootstrap/storage.go @@ -0,0 +1,252 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package bootstrap + +import ( + "context" + "fmt" + "time" + + "go.uber.org/zap" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/consensus/snowman" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/snow/engine/snowman/block" + "github.com/ava-labs/avalanchego/snow/engine/snowman/bootstrap/interval" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/utils/timer" +) + +const ( + batchWritePeriod = 64 + iteratorReleasePeriod = 1024 + logPeriod = 5 * time.Second +) + +// getMissingBlockIDs returns the ID of the blocks that should be fetched to +// attempt to make a single continuous range from +// (lastAcceptedHeight, highestTrackedHeight]. +// +// For example, if the tree currently contains heights [1, 4, 6, 7] and the +// lastAcceptedHeight is 2, this function will return the IDs corresponding to +// blocks [3, 5]. +func getMissingBlockIDs( + ctx context.Context, + db database.KeyValueReader, + parser block.Parser, + tree *interval.Tree, + lastAcceptedHeight uint64, +) (set.Set[ids.ID], error) { + var ( + missingBlocks set.Set[ids.ID] + intervals = tree.Flatten() + lastHeightToFetch = lastAcceptedHeight + 1 + ) + for _, i := range intervals { + if i.LowerBound <= lastHeightToFetch { + continue + } + + blkBytes, err := interval.GetBlock(db, i.LowerBound) + if err != nil { + return nil, err + } + + blk, err := parser.ParseBlock(ctx, blkBytes) + if err != nil { + return nil, err + } + + parentID := blk.Parent() + missingBlocks.Add(parentID) + } + return missingBlocks, nil +} + +// process a series of consecutive blocks starting at [blk]. +// +// - blk is a block that is assumed to have been marked as acceptable by the +// bootstrapping engine. +// - ancestors is a set of blocks that can be used to lookup blocks. +// +// If [blk]'s height is <= the last accepted height, then it will be removed +// from the missingIDs set. +// +// Returns a newly discovered blockID that should be fetched. +func process( + db database.KeyValueWriterDeleter, + tree *interval.Tree, + missingBlockIDs set.Set[ids.ID], + lastAcceptedHeight uint64, + blk snowman.Block, + ancestors map[ids.ID]snowman.Block, +) (ids.ID, bool, error) { + for { + // It's possible that missingBlockIDs contain values contained inside of + // ancestors. So, it's important to remove IDs from the set for each + // iteration, not just the first block's ID. + blkID := blk.ID() + missingBlockIDs.Remove(blkID) + + height := blk.Height() + blkBytes := blk.Bytes() + wantsParent, err := interval.Add( + db, + tree, + lastAcceptedHeight, + height, + blkBytes, + ) + if err != nil || !wantsParent { + return ids.Empty, false, err + } + + // If the parent was provided in the ancestors set, we can immediately + // process it. + parentID := blk.Parent() + parent, ok := ancestors[parentID] + if !ok { + return parentID, true, nil + } + + blk = parent + } +} + +// execute all the blocks tracked by the tree. If a block is in the tree but is +// already accepted based on the lastAcceptedHeight, it will be removed from the +// tree but not executed. +// +// execute assumes that getMissingBlockIDs would return an empty set. +// +// TODO: Replace usage of haltable with context cancellation. +func execute( + ctx context.Context, + haltable common.Haltable, + log logging.Func, + db database.Database, + parser block.Parser, + tree *interval.Tree, + lastAcceptedHeight uint64, +) error { + var ( + batch = db.NewBatch() + processedSinceBatchWrite uint + writeBatch = func() error { + if processedSinceBatchWrite == 0 { + return nil + } + processedSinceBatchWrite = 0 + + if err := batch.Write(); err != nil { + return err + } + batch.Reset() + return nil + } + + iterator = interval.GetBlockIterator(db) + processedSinceIteratorRelease uint + + startTime = time.Now() + timeOfNextLog = startTime.Add(logPeriod) + totalNumberToProcess = tree.Len() + ) + defer func() { + iterator.Release() + }() + + log("executing blocks", + zap.Uint64("numToExecute", totalNumberToProcess), + ) + + for !haltable.Halted() && iterator.Next() { + blkBytes := iterator.Value() + blk, err := parser.ParseBlock(ctx, blkBytes) + if err != nil { + return err + } + + height := blk.Height() + if err := interval.Remove(batch, tree, height); err != nil { + return err + } + + // Periodically write the batch to disk to avoid memory pressure. + processedSinceBatchWrite++ + if processedSinceBatchWrite >= batchWritePeriod { + if err := writeBatch(); err != nil { + return err + } + } + + // Periodically release and re-grab the database iterator to avoid + // keeping a reference to an old database revision. + processedSinceIteratorRelease++ + if processedSinceIteratorRelease >= iteratorReleasePeriod { + if err := iterator.Error(); err != nil { + return err + } + + // The batch must be written here to avoid re-processing a block. + if err := writeBatch(); err != nil { + return err + } + + processedSinceIteratorRelease = 0 + iterator.Release() + iterator = interval.GetBlockIterator(db) + } + + if now := time.Now(); now.After(timeOfNextLog) { + var ( + numProcessed = totalNumberToProcess - tree.Len() + eta = timer.EstimateETA(startTime, numProcessed, totalNumberToProcess) + ) + log("executing blocks", + zap.Uint64("numExecuted", numProcessed), + zap.Uint64("numToExecute", totalNumberToProcess), + zap.Duration("eta", eta), + ) + timeOfNextLog = now.Add(logPeriod) + } + + if height <= lastAcceptedHeight { + continue + } + + if err := blk.Verify(ctx); err != nil { + return fmt.Errorf("failed to verify block %s (%d) in bootstrapping: %w", + blk.ID(), + height, + err, + ) + } + if err := blk.Accept(ctx); err != nil { + return fmt.Errorf("failed to accept block %s (%d) in bootstrapping: %w", + blk.ID(), + height, + err, + ) + } + } + if err := writeBatch(); err != nil { + return err + } + if err := iterator.Error(); err != nil { + return err + } + + numProcessed := totalNumberToProcess - tree.Len() + log("executed blocks", + zap.Uint64("numExecuted", numProcessed), + zap.Uint64("numToExecute", totalNumberToProcess), + zap.Bool("halted", haltable.Halted()), + zap.Duration("duration", time.Since(startTime)), + ) + return nil +} diff --git a/snow/engine/snowman/bootstrap/storage_test.go b/snow/engine/snowman/bootstrap/storage_test.go new file mode 100644 index 000000000000..6ac2761f8cd7 --- /dev/null +++ b/snow/engine/snowman/bootstrap/storage_test.go @@ -0,0 +1,311 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package bootstrap + +import ( + "bytes" + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/memdb" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/choices" + "github.com/ava-labs/avalanchego/snow/consensus/snowman" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/snow/engine/snowman/block" + "github.com/ava-labs/avalanchego/snow/engine/snowman/bootstrap/interval" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/set" +) + +var _ block.Parser = testParser(nil) + +func TestGetMissingBlockIDs(t *testing.T) { + blocks := generateBlockchain(7) + parser := makeParser(blocks) + + tests := []struct { + name string + blocks []snowman.Block + lastAcceptedHeight uint64 + expected set.Set[ids.ID] + }{ + { + name: "initially empty", + blocks: nil, + lastAcceptedHeight: 0, + expected: nil, + }, + { + name: "wants one block", + blocks: []snowman.Block{blocks[4]}, + lastAcceptedHeight: 0, + expected: set.Of(blocks[3].ID()), + }, + { + name: "wants multiple blocks", + blocks: []snowman.Block{blocks[2], blocks[4]}, + lastAcceptedHeight: 0, + expected: set.Of(blocks[1].ID(), blocks[3].ID()), + }, + { + name: "doesn't want last accepted block", + blocks: []snowman.Block{blocks[1]}, + lastAcceptedHeight: 0, + expected: nil, + }, + { + name: "doesn't want known block", + blocks: []snowman.Block{blocks[2], blocks[3]}, + lastAcceptedHeight: 0, + expected: set.Of(blocks[1].ID()), + }, + { + name: "doesn't want already accepted block", + blocks: []snowman.Block{blocks[1]}, + lastAcceptedHeight: 4, + expected: nil, + }, + { + name: "doesn't underflow", + blocks: []snowman.Block{blocks[0]}, + lastAcceptedHeight: 0, + expected: nil, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + db := memdb.New() + tree, err := interval.NewTree(db) + require.NoError(err) + for _, blk := range test.blocks { + _, err := interval.Add(db, tree, 0, blk.Height(), blk.Bytes()) + require.NoError(err) + } + + missingBlockIDs, err := getMissingBlockIDs( + context.Background(), + db, + parser, + tree, + test.lastAcceptedHeight, + ) + require.NoError(err) + require.Equal(test.expected, missingBlockIDs) + }) + } +} + +func TestProcess(t *testing.T) { + blocks := generateBlockchain(7) + + tests := []struct { + name string + initialBlocks []snowman.Block + lastAcceptedHeight uint64 + missingBlockIDs set.Set[ids.ID] + blk snowman.Block + ancestors map[ids.ID]snowman.Block + expectedParentID ids.ID + expectedShouldFetchParentID bool + expectedMissingBlockIDs set.Set[ids.ID] + expectedTrackedHeights []uint64 + }{ + { + name: "add single block", + initialBlocks: nil, + lastAcceptedHeight: 0, + missingBlockIDs: set.Of(blocks[5].ID()), + blk: blocks[5], + ancestors: nil, + expectedParentID: blocks[4].ID(), + expectedShouldFetchParentID: true, + expectedMissingBlockIDs: set.Set[ids.ID]{}, + expectedTrackedHeights: []uint64{5}, + }, + { + name: "add multiple blocks", + initialBlocks: nil, + lastAcceptedHeight: 0, + missingBlockIDs: set.Of(blocks[5].ID()), + blk: blocks[5], + ancestors: map[ids.ID]snowman.Block{ + blocks[4].ID(): blocks[4], + }, + expectedParentID: blocks[3].ID(), + expectedShouldFetchParentID: true, + expectedMissingBlockIDs: set.Set[ids.ID]{}, + expectedTrackedHeights: []uint64{4, 5}, + }, + { + name: "ignore non-consecutive blocks", + initialBlocks: nil, + lastAcceptedHeight: 0, + missingBlockIDs: set.Of(blocks[3].ID(), blocks[5].ID()), + blk: blocks[5], + ancestors: map[ids.ID]snowman.Block{ + blocks[3].ID(): blocks[3], + }, + expectedParentID: blocks[4].ID(), + expectedShouldFetchParentID: true, + expectedMissingBlockIDs: set.Of(blocks[3].ID()), + expectedTrackedHeights: []uint64{5}, + }, + { + name: "do not request the last accepted block", + initialBlocks: nil, + lastAcceptedHeight: 2, + missingBlockIDs: set.Of(blocks[3].ID()), + blk: blocks[3], + ancestors: nil, + expectedParentID: ids.Empty, + expectedShouldFetchParentID: false, + expectedMissingBlockIDs: set.Set[ids.ID]{}, + expectedTrackedHeights: []uint64{3}, + }, + { + name: "do not request already known block", + initialBlocks: []snowman.Block{blocks[2]}, + lastAcceptedHeight: 0, + missingBlockIDs: set.Of(blocks[1].ID(), blocks[3].ID()), + blk: blocks[3], + ancestors: nil, + expectedParentID: ids.Empty, + expectedShouldFetchParentID: false, + expectedMissingBlockIDs: set.Of(blocks[1].ID()), + expectedTrackedHeights: []uint64{2, 3}, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + db := memdb.New() + tree, err := interval.NewTree(db) + require.NoError(err) + for _, blk := range test.initialBlocks { + _, err := interval.Add(db, tree, 0, blk.Height(), blk.Bytes()) + require.NoError(err) + } + + parentID, shouldFetchParentID, err := process( + db, + tree, + test.missingBlockIDs, + test.lastAcceptedHeight, + test.blk, + test.ancestors, + ) + require.NoError(err) + require.Equal(test.expectedShouldFetchParentID, shouldFetchParentID) + require.Equal(test.expectedParentID, parentID) + require.Equal(test.expectedMissingBlockIDs, test.missingBlockIDs) + + require.Equal(uint64(len(test.expectedTrackedHeights)), tree.Len()) + for _, height := range test.expectedTrackedHeights { + require.True(tree.Contains(height)) + } + }) + } +} + +func TestExecute(t *testing.T) { + const numBlocks = 7 + + unhalted := &common.Halter{} + halted := &common.Halter{} + halted.Halt(context.Background()) + + tests := []struct { + name string + haltable common.Haltable + lastAcceptedHeight uint64 + expectedProcessingHeights []uint64 + expectedAcceptedHeights []uint64 + }{ + { + name: "execute everything", + haltable: unhalted, + lastAcceptedHeight: 0, + expectedProcessingHeights: nil, + expectedAcceptedHeights: []uint64{0, 1, 2, 3, 4, 5, 6}, + }, + { + name: "do not execute blocks accepted by height", + haltable: unhalted, + lastAcceptedHeight: 3, + expectedProcessingHeights: []uint64{1, 2, 3}, + expectedAcceptedHeights: []uint64{0, 4, 5, 6}, + }, + { + name: "do not execute blocks when halted", + haltable: halted, + lastAcceptedHeight: 0, + expectedProcessingHeights: []uint64{1, 2, 3, 4, 5, 6}, + expectedAcceptedHeights: []uint64{0}, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + db := memdb.New() + tree, err := interval.NewTree(db) + require.NoError(err) + + blocks := generateBlockchain(numBlocks) + parser := makeParser(blocks) + for _, blk := range blocks { + _, err := interval.Add(db, tree, 0, blk.Height(), blk.Bytes()) + require.NoError(err) + } + + require.NoError(execute( + context.Background(), + test.haltable, + logging.NoLog{}.Info, + db, + parser, + tree, + test.lastAcceptedHeight, + )) + for _, height := range test.expectedProcessingHeights { + require.Equal(choices.Processing, blocks[height].Status()) + } + for _, height := range test.expectedAcceptedHeights { + require.Equal(choices.Accepted, blocks[height].Status()) + } + + if test.haltable.Halted() { + return + } + + size, err := database.Count(db) + require.NoError(err) + require.Zero(size) + }) + } +} + +type testParser func(context.Context, []byte) (snowman.Block, error) + +func (f testParser) ParseBlock(ctx context.Context, bytes []byte) (snowman.Block, error) { + return f(ctx, bytes) +} + +func makeParser(blocks []snowman.Block) block.Parser { + return testParser(func(_ context.Context, b []byte) (snowman.Block, error) { + for _, block := range blocks { + if bytes.Equal(b, block.Bytes()) { + return block, nil + } + } + return nil, database.ErrNotFound + }) +} diff --git a/vms/platformvm/vm_test.go b/vms/platformvm/vm_test.go index 1eb98a115da6..7fd32ef8a21e 100644 --- a/vms/platformvm/vm_test.go +++ b/vms/platformvm/vm_test.go @@ -24,7 +24,6 @@ import ( "github.com/ava-labs/avalanchego/snow/choices" "github.com/ava-labs/avalanchego/snow/consensus/snowball" "github.com/ava-labs/avalanchego/snow/engine/common" - "github.com/ava-labs/avalanchego/snow/engine/common/queue" "github.com/ava-labs/avalanchego/snow/engine/common/tracker" "github.com/ava-labs/avalanchego/snow/engine/snowman/bootstrap" "github.com/ava-labs/avalanchego/snow/networking/benchlist" @@ -1307,8 +1306,6 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { baseDB := memdb.New() vmDB := prefixdb.New(chains.VMDBPrefix, baseDB) bootstrappingDB := prefixdb.New(chains.ChainBootstrappingDBPrefix, baseDB) - blocked, err := queue.NewWithMissing(bootstrappingDB, "", prometheus.NewRegistry()) - require.NoError(err) vm := &VM{Config: config.Config{ Chains: chains.TestManager, @@ -1484,7 +1481,7 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { Sender: sender, BootstrapTracker: bootstrapTracker, AncestorsMaxContainersReceived: 2000, - Blocked: blocked, + DB: bootstrappingDB, VM: vm, }