From 043644f969ff1a4af0d302fd25cce84506aaa99e Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Thu, 16 Nov 2023 12:13:09 -0500 Subject: [PATCH] Refactor bootstrapper implementation into consensus (#2300) Co-authored-by: Dan Laine --- .../snowman/bootstrapper/majority.go | 110 +++++ .../snowman/bootstrapper/majority_test.go | 396 ++++++++++++++++++ .../snowman/bootstrapper/minority.go | 77 ++++ .../snowman/bootstrapper/minority_test.go | 242 +++++++++++ snow/consensus/snowman/bootstrapper/noop.go | 27 ++ .../snowman/bootstrapper/noop_test.go | 23 + snow/consensus/snowman/bootstrapper/poll.go | 23 + .../snowman/bootstrapper/poll_test.go | 15 + .../snowman/bootstrapper/requests.go | 48 +++ .../consensus/snowman/bootstrapper/sampler.go | 49 +++ .../snowman/bootstrapper/sampler_test.go | 75 ++++ snow/engine/common/bootstrapper.go | 332 ++++----------- 12 files changed, 1166 insertions(+), 251 deletions(-) create mode 100644 snow/consensus/snowman/bootstrapper/majority.go create mode 100644 snow/consensus/snowman/bootstrapper/majority_test.go create mode 100644 snow/consensus/snowman/bootstrapper/minority.go create mode 100644 snow/consensus/snowman/bootstrapper/minority_test.go create mode 100644 snow/consensus/snowman/bootstrapper/noop.go create mode 100644 snow/consensus/snowman/bootstrapper/noop_test.go create mode 100644 snow/consensus/snowman/bootstrapper/poll.go create mode 100644 snow/consensus/snowman/bootstrapper/poll_test.go create mode 100644 snow/consensus/snowman/bootstrapper/requests.go create mode 100644 snow/consensus/snowman/bootstrapper/sampler.go create mode 100644 snow/consensus/snowman/bootstrapper/sampler_test.go diff --git a/snow/consensus/snowman/bootstrapper/majority.go b/snow/consensus/snowman/bootstrapper/majority.go new file mode 100644 index 000000000000..1decb837ef40 --- /dev/null +++ b/snow/consensus/snowman/bootstrapper/majority.go @@ -0,0 +1,110 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package bootstrapper + +import ( + "context" + + "go.uber.org/zap" + + "golang.org/x/exp/maps" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/math" + "github.com/ava-labs/avalanchego/utils/set" +) + +var _ Poll = (*Majority)(nil) + +// Majority implements the bootstrapping poll to filter the initial set of +// potentially accaptable blocks into a set of accepted blocks to sync to. +// +// Once the last accepted blocks have been fetched from the initial set of +// peers, the set of blocks are sent to all peers. Each peer is expected to +// filter the provided blocks and report which of them they consider accepted. +// If a majority of the peers report that a block is accepted, then the node +// will consider that block to be accepted by the network. This assumes that a +// majority of the network is correct. If a majority of the network is +// malicious, the node may accept an incorrect block. +type Majority struct { + requests + + log logging.Logger + nodeWeights map[ids.NodeID]uint64 + + // received maps the blockID to the total sum of weight that has reported + // that block as accepted. + received map[ids.ID]uint64 + accepted []ids.ID +} + +func NewMajority( + log logging.Logger, + nodeWeights map[ids.NodeID]uint64, + maxOutstanding int, +) *Majority { + return &Majority{ + requests: requests{ + maxOutstanding: maxOutstanding, + pendingSend: set.Of(maps.Keys(nodeWeights)...), + }, + log: log, + nodeWeights: nodeWeights, + received: make(map[ids.ID]uint64), + } +} + +func (m *Majority) RecordOpinion(_ context.Context, nodeID ids.NodeID, blkIDs set.Set[ids.ID]) error { + if !m.recordResponse(nodeID) { + // The chain router should have already dropped unexpected messages. + m.log.Error("received unexpected opinion", + zap.String("pollType", "majority"), + zap.Stringer("nodeID", nodeID), + zap.Reflect("blkIDs", blkIDs), + ) + return nil + } + + weight := m.nodeWeights[nodeID] + for blkID := range blkIDs { + newWeight, err := math.Add64(m.received[blkID], weight) + if err != nil { + return err + } + m.received[blkID] = newWeight + } + + if !m.finished() { + return nil + } + + var ( + totalWeight uint64 + err error + ) + for _, weight := range m.nodeWeights { + totalWeight, err = math.Add64(totalWeight, weight) + if err != nil { + return err + } + } + + requiredWeight := totalWeight/2 + 1 + for blkID, weight := range m.received { + if weight >= requiredWeight { + m.accepted = append(m.accepted, blkID) + } + } + + m.log.Debug("finalized bootstrapping poll", + zap.String("pollType", "majority"), + zap.Stringers("accepted", m.accepted), + ) + return nil +} + +func (m *Majority) Result(context.Context) ([]ids.ID, bool) { + return m.accepted, m.finished() +} diff --git a/snow/consensus/snowman/bootstrapper/majority_test.go b/snow/consensus/snowman/bootstrapper/majority_test.go new file mode 100644 index 000000000000..d276566fb910 --- /dev/null +++ b/snow/consensus/snowman/bootstrapper/majority_test.go @@ -0,0 +1,396 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package bootstrapper + +import ( + "context" + "math" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/set" + + safemath "github.com/ava-labs/avalanchego/utils/math" +) + +func TestNewMajority(t *testing.T) { + majority := NewMajority( + logging.NoLog{}, // log + map[ids.NodeID]uint64{ + nodeID0: 1, + nodeID1: 1, + }, // nodeWeights + 2, // maxOutstanding + ) + + expectedMajority := &Majority{ + requests: requests{ + maxOutstanding: 2, + pendingSend: set.Of(nodeID0, nodeID1), + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 1, + nodeID1: 1, + }, + received: make(map[ids.ID]uint64), + } + require.Equal(t, expectedMajority, majority) +} + +func TestMajorityGetPeers(t *testing.T) { + tests := []struct { + name string + majority Poll + expectedState Poll + expectedPeers set.Set[ids.NodeID] + }{ + { + name: "max outstanding", + majority: &Majority{ + requests: requests{ + maxOutstanding: 1, + pendingSend: set.Of(nodeID0), + outstanding: set.Of(nodeID1), + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 1, + nodeID1: 1, + }, + received: make(map[ids.ID]uint64), + }, + expectedState: &Majority{ + requests: requests{ + maxOutstanding: 1, + pendingSend: set.Of(nodeID0), + outstanding: set.Of(nodeID1), + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 1, + nodeID1: 1, + }, + received: make(map[ids.ID]uint64), + }, + expectedPeers: nil, + }, + { + name: "send until max outstanding", + majority: &Majority{ + requests: requests{ + maxOutstanding: 2, + pendingSend: set.Of(nodeID0, nodeID1), + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 1, + nodeID1: 1, + }, + received: make(map[ids.ID]uint64), + }, + expectedState: &Majority{ + requests: requests{ + maxOutstanding: 2, + pendingSend: set.Set[ids.NodeID]{}, + outstanding: set.Of(nodeID0, nodeID1), + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 1, + nodeID1: 1, + }, + received: make(map[ids.ID]uint64), + }, + expectedPeers: set.Of(nodeID0, nodeID1), + }, + { + name: "send until no more to send", + majority: &Majority{ + requests: requests{ + maxOutstanding: 2, + pendingSend: set.Of(nodeID0), + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 1, + }, + received: make(map[ids.ID]uint64), + }, + expectedState: &Majority{ + requests: requests{ + maxOutstanding: 2, + pendingSend: set.Set[ids.NodeID]{}, + outstanding: set.Of(nodeID0), + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 1, + }, + received: make(map[ids.ID]uint64), + }, + expectedPeers: set.Of(nodeID0), + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + peers := test.majority.GetPeers(context.Background()) + require.Equal(test.expectedState, test.majority) + require.Equal(test.expectedPeers, peers) + }) + } +} + +func TestMajorityRecordOpinion(t *testing.T) { + tests := []struct { + name string + majority Poll + nodeID ids.NodeID + blkIDs set.Set[ids.ID] + expectedState Poll + expectedErr error + }{ + { + name: "unexpected response", + majority: &Majority{ + requests: requests{ + maxOutstanding: 1, + pendingSend: set.Of(nodeID0), + outstanding: set.Of(nodeID1), + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 1, + nodeID1: 1, + }, + received: make(map[ids.ID]uint64), + }, + nodeID: nodeID0, + blkIDs: nil, + expectedState: &Majority{ + requests: requests{ + maxOutstanding: 1, + pendingSend: set.Of(nodeID0), + outstanding: set.Of(nodeID1), + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 1, + nodeID1: 1, + }, + received: make(map[ids.ID]uint64), + }, + expectedErr: nil, + }, + { + name: "unfinished after response", + majority: &Majority{ + requests: requests{ + maxOutstanding: 1, + pendingSend: set.Of(nodeID0), + outstanding: set.Of(nodeID1), + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 2, + nodeID1: 3, + }, + received: make(map[ids.ID]uint64), + }, + nodeID: nodeID1, + blkIDs: set.Of(blkID0), + expectedState: &Majority{ + requests: requests{ + maxOutstanding: 1, + pendingSend: set.Of(nodeID0), + outstanding: set.Set[ids.NodeID]{}, + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 2, + nodeID1: 3, + }, + received: map[ids.ID]uint64{ + blkID0: 3, + }, + }, + expectedErr: nil, + }, + { + name: "overflow during response", + majority: &Majority{ + requests: requests{ + maxOutstanding: 1, + outstanding: set.Of(nodeID1), + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 1, + nodeID1: math.MaxUint64, + }, + received: map[ids.ID]uint64{ + blkID0: 1, + }, + }, + nodeID: nodeID1, + blkIDs: set.Of(blkID0), + expectedState: &Majority{ + requests: requests{ + maxOutstanding: 1, + outstanding: set.Set[ids.NodeID]{}, + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 1, + nodeID1: math.MaxUint64, + }, + received: map[ids.ID]uint64{ + blkID0: 1, + }, + }, + expectedErr: safemath.ErrOverflow, + }, + { + name: "overflow during final response", + majority: &Majority{ + requests: requests{ + maxOutstanding: 1, + outstanding: set.Of(nodeID1), + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 1, + nodeID1: math.MaxUint64, + }, + received: make(map[ids.ID]uint64), + }, + nodeID: nodeID1, + blkIDs: set.Of(blkID0), + expectedState: &Majority{ + requests: requests{ + maxOutstanding: 1, + outstanding: set.Set[ids.NodeID]{}, + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 1, + nodeID1: math.MaxUint64, + }, + received: map[ids.ID]uint64{ + blkID0: math.MaxUint64, + }, + }, + expectedErr: safemath.ErrOverflow, + }, + { + name: "finished after response", + majority: &Majority{ + requests: requests{ + maxOutstanding: 1, + outstanding: set.Of(nodeID2), + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 1, + nodeID1: 1, + nodeID2: 1, + }, + received: map[ids.ID]uint64{ + blkID0: 1, + blkID1: 1, + }, + }, + nodeID: nodeID2, + blkIDs: set.Of(blkID1), + expectedState: &Majority{ + requests: requests{ + maxOutstanding: 1, + outstanding: set.Set[ids.NodeID]{}, + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 1, + nodeID1: 1, + nodeID2: 1, + }, + received: map[ids.ID]uint64{ + blkID0: 1, + blkID1: 2, + }, + accepted: []ids.ID{blkID1}, + }, + expectedErr: nil, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + err := test.majority.RecordOpinion(context.Background(), test.nodeID, test.blkIDs) + require.Equal(test.expectedState, test.majority) + require.ErrorIs(err, test.expectedErr) + }) + } +} + +func TestMajorityResult(t *testing.T) { + tests := []struct { + name string + majority Poll + expectedAccepted []ids.ID + expectedFinalized bool + }{ + { + name: "not finalized", + majority: &Majority{ + requests: requests{ + maxOutstanding: 1, + outstanding: set.Of(nodeID1), + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 1, + nodeID1: 1, + }, + received: make(map[ids.ID]uint64), + accepted: nil, + }, + expectedAccepted: nil, + expectedFinalized: false, + }, + { + name: "finalized", + majority: &Majority{ + requests: requests{ + maxOutstanding: 1, + }, + log: logging.NoLog{}, + nodeWeights: map[ids.NodeID]uint64{ + nodeID0: 1, + nodeID1: 1, + }, + received: map[ids.ID]uint64{ + blkID0: 2, + }, + accepted: []ids.ID{blkID0}, + }, + expectedAccepted: []ids.ID{blkID0}, + expectedFinalized: true, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + accepted, finalized := test.majority.Result(context.Background()) + require.Equal(test.expectedAccepted, accepted) + require.Equal(test.expectedFinalized, finalized) + }) + } +} diff --git a/snow/consensus/snowman/bootstrapper/minority.go b/snow/consensus/snowman/bootstrapper/minority.go new file mode 100644 index 000000000000..52b45c4407ba --- /dev/null +++ b/snow/consensus/snowman/bootstrapper/minority.go @@ -0,0 +1,77 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package bootstrapper + +import ( + "context" + + "go.uber.org/zap" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/set" +) + +var _ Poll = (*Minority)(nil) + +// Minority implements the bootstrapping poll to determine the initial set of +// potentially accaptable blocks. +// +// This poll fetches the last accepted block from an initial set of peers. In +// order for the protocol to find a recently accepted block, there must be at +// least one correct node in this set of peers. If there is not a correct node +// in the set of peers, the node will not accept an incorrect block. However, +// the node may be unable to find an acceptable block. +type Minority struct { + requests + + log logging.Logger + + receivedSet set.Set[ids.ID] + received []ids.ID +} + +func NewMinority( + log logging.Logger, + frontierNodes set.Set[ids.NodeID], + maxOutstanding int, +) *Minority { + return &Minority{ + requests: requests{ + maxOutstanding: maxOutstanding, + pendingSend: frontierNodes, + }, + log: log, + } +} + +func (m *Minority) RecordOpinion(_ context.Context, nodeID ids.NodeID, blkIDs set.Set[ids.ID]) error { + if !m.recordResponse(nodeID) { + // The chain router should have already dropped unexpected messages. + m.log.Error("received unexpected opinion", + zap.String("pollType", "minority"), + zap.Stringer("nodeID", nodeID), + zap.Reflect("blkIDs", blkIDs), + ) + return nil + } + + m.receivedSet.Union(blkIDs) + + if !m.finished() { + return nil + } + + m.received = m.receivedSet.List() + + m.log.Debug("finalized bootstrapping poll", + zap.String("pollType", "minority"), + zap.Stringers("frontier", m.received), + ) + return nil +} + +func (m *Minority) Result(context.Context) ([]ids.ID, bool) { + return m.received, m.finished() +} diff --git a/snow/consensus/snowman/bootstrapper/minority_test.go b/snow/consensus/snowman/bootstrapper/minority_test.go new file mode 100644 index 000000000000..f720ee18025a --- /dev/null +++ b/snow/consensus/snowman/bootstrapper/minority_test.go @@ -0,0 +1,242 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package bootstrapper + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/set" +) + +func TestNewMinority(t *testing.T) { + minority := NewMinority( + logging.NoLog{}, // log + set.Of(nodeID0), // frontierNodes + 2, // maxOutstanding + ) + + expectedMinority := &Minority{ + requests: requests{ + maxOutstanding: 2, + pendingSend: set.Of(nodeID0), + }, + log: logging.NoLog{}, + } + require.Equal(t, expectedMinority, minority) +} + +func TestMinorityGetPeers(t *testing.T) { + tests := []struct { + name string + minority Poll + expectedState Poll + expectedPeers set.Set[ids.NodeID] + }{ + { + name: "max outstanding", + minority: &Minority{ + requests: requests{ + maxOutstanding: 1, + pendingSend: set.Of(nodeID0), + outstanding: set.Of(nodeID1), + }, + log: logging.NoLog{}, + }, + expectedState: &Minority{ + requests: requests{ + maxOutstanding: 1, + pendingSend: set.Of(nodeID0), + outstanding: set.Of(nodeID1), + }, + log: logging.NoLog{}, + }, + expectedPeers: nil, + }, + { + name: "send until max outstanding", + minority: &Minority{ + requests: requests{ + maxOutstanding: 2, + pendingSend: set.Of(nodeID0, nodeID1), + }, + log: logging.NoLog{}, + }, + expectedState: &Minority{ + requests: requests{ + maxOutstanding: 2, + pendingSend: set.Set[ids.NodeID]{}, + outstanding: set.Of(nodeID0, nodeID1), + }, + log: logging.NoLog{}, + }, + expectedPeers: set.Of(nodeID0, nodeID1), + }, + { + name: "send until no more to send", + minority: &Minority{ + requests: requests{ + maxOutstanding: 2, + pendingSend: set.Of(nodeID0), + }, + log: logging.NoLog{}, + }, + expectedState: &Minority{ + requests: requests{ + maxOutstanding: 2, + pendingSend: set.Set[ids.NodeID]{}, + outstanding: set.Of(nodeID0), + }, + log: logging.NoLog{}, + }, + expectedPeers: set.Of(nodeID0), + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + peers := test.minority.GetPeers(context.Background()) + require.Equal(test.expectedState, test.minority) + require.Equal(test.expectedPeers, peers) + }) + } +} + +func TestMinorityRecordOpinion(t *testing.T) { + tests := []struct { + name string + minority Poll + nodeID ids.NodeID + blkIDs set.Set[ids.ID] + expectedState Poll + expectedErr error + }{ + { + name: "unexpected response", + minority: &Minority{ + requests: requests{ + maxOutstanding: 1, + pendingSend: set.Of(nodeID0), + outstanding: set.Of(nodeID1), + }, + log: logging.NoLog{}, + }, + nodeID: nodeID0, + blkIDs: nil, + expectedState: &Minority{ + requests: requests{ + maxOutstanding: 1, + pendingSend: set.Of(nodeID0), + outstanding: set.Of(nodeID1), + }, + log: logging.NoLog{}, + }, + expectedErr: nil, + }, + { + name: "unfinished after response", + minority: &Minority{ + requests: requests{ + maxOutstanding: 1, + pendingSend: set.Of(nodeID0), + outstanding: set.Of(nodeID1), + }, + log: logging.NoLog{}, + }, + nodeID: nodeID1, + blkIDs: set.Of(blkID0), + expectedState: &Minority{ + requests: requests{ + maxOutstanding: 1, + pendingSend: set.Of(nodeID0), + outstanding: set.Set[ids.NodeID]{}, + }, + log: logging.NoLog{}, + receivedSet: set.Of(blkID0), + }, + expectedErr: nil, + }, + { + name: "finished after response", + minority: &Minority{ + requests: requests{ + maxOutstanding: 1, + outstanding: set.Of(nodeID2), + }, + log: logging.NoLog{}, + }, + nodeID: nodeID2, + blkIDs: set.Of(blkID1), + expectedState: &Minority{ + requests: requests{ + maxOutstanding: 1, + outstanding: set.Set[ids.NodeID]{}, + }, + log: logging.NoLog{}, + receivedSet: set.Of(blkID1), + received: []ids.ID{blkID1}, + }, + expectedErr: nil, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + err := test.minority.RecordOpinion(context.Background(), test.nodeID, test.blkIDs) + require.Equal(test.expectedState, test.minority) + require.ErrorIs(err, test.expectedErr) + }) + } +} + +func TestMinorityResult(t *testing.T) { + tests := []struct { + name string + minority Poll + expectedAccepted []ids.ID + expectedFinalized bool + }{ + { + name: "not finalized", + minority: &Minority{ + requests: requests{ + maxOutstanding: 1, + outstanding: set.Of(nodeID1), + }, + log: logging.NoLog{}, + received: nil, + }, + expectedAccepted: nil, + expectedFinalized: false, + }, + { + name: "finalized", + minority: &Minority{ + requests: requests{ + maxOutstanding: 1, + }, + log: logging.NoLog{}, + receivedSet: set.Of(blkID0), + received: []ids.ID{blkID0}, + }, + expectedAccepted: []ids.ID{blkID0}, + expectedFinalized: true, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + accepted, finalized := test.minority.Result(context.Background()) + require.Equal(test.expectedAccepted, accepted) + require.Equal(test.expectedFinalized, finalized) + }) + } +} diff --git a/snow/consensus/snowman/bootstrapper/noop.go b/snow/consensus/snowman/bootstrapper/noop.go new file mode 100644 index 000000000000..1cd3bffd58b7 --- /dev/null +++ b/snow/consensus/snowman/bootstrapper/noop.go @@ -0,0 +1,27 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package bootstrapper + +import ( + "context" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/set" +) + +var Noop Poll = noop{} + +type noop struct{} + +func (noop) GetPeers(context.Context) set.Set[ids.NodeID] { + return nil +} + +func (noop) RecordOpinion(context.Context, ids.NodeID, set.Set[ids.ID]) error { + return nil +} + +func (noop) Result(context.Context) ([]ids.ID, bool) { + return nil, false +} diff --git a/snow/consensus/snowman/bootstrapper/noop_test.go b/snow/consensus/snowman/bootstrapper/noop_test.go new file mode 100644 index 000000000000..0a485a8fae76 --- /dev/null +++ b/snow/consensus/snowman/bootstrapper/noop_test.go @@ -0,0 +1,23 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package bootstrapper + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNoop(t *testing.T) { + require := require.New(t) + + require.Empty(Noop.GetPeers(context.Background())) + + require.NoError(Noop.RecordOpinion(context.Background(), nodeID0, nil)) + + blkIDs, finalized := Noop.Result(context.Background()) + require.Empty(blkIDs) + require.False(finalized) +} diff --git a/snow/consensus/snowman/bootstrapper/poll.go b/snow/consensus/snowman/bootstrapper/poll.go new file mode 100644 index 000000000000..450341d9d64d --- /dev/null +++ b/snow/consensus/snowman/bootstrapper/poll.go @@ -0,0 +1,23 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package bootstrapper + +import ( + "context" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/set" +) + +type Poll interface { + // GetPeers returns the set of peers whose opinion should be requested. It + // is expected to repeatedly call this function along with [RecordOpinion] + // until [Result] returns finalized. + GetPeers(ctx context.Context) (peers set.Set[ids.NodeID]) + // RecordOpinion of a node whose opinion was requested. + RecordOpinion(ctx context.Context, nodeID ids.NodeID, blkIDs set.Set[ids.ID]) error + // Result returns the evaluation of all the peer's opinions along with a + // flag to identify that the result has finished being calculated. + Result(ctx context.Context) (blkIDs []ids.ID, finalized bool) +} diff --git a/snow/consensus/snowman/bootstrapper/poll_test.go b/snow/consensus/snowman/bootstrapper/poll_test.go new file mode 100644 index 000000000000..134867ae1822 --- /dev/null +++ b/snow/consensus/snowman/bootstrapper/poll_test.go @@ -0,0 +1,15 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package bootstrapper + +import "github.com/ava-labs/avalanchego/ids" + +var ( + nodeID0 = ids.GenerateTestNodeID() + nodeID1 = ids.GenerateTestNodeID() + nodeID2 = ids.GenerateTestNodeID() + + blkID0 = ids.GenerateTestID() + blkID1 = ids.GenerateTestID() +) diff --git a/snow/consensus/snowman/bootstrapper/requests.go b/snow/consensus/snowman/bootstrapper/requests.go new file mode 100644 index 000000000000..28fc25ce1643 --- /dev/null +++ b/snow/consensus/snowman/bootstrapper/requests.go @@ -0,0 +1,48 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package bootstrapper + +import ( + "context" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/math" + "github.com/ava-labs/avalanchego/utils/set" +) + +type requests struct { + maxOutstanding int + + pendingSend set.Set[ids.NodeID] + outstanding set.Set[ids.NodeID] +} + +func (r *requests) GetPeers(context.Context) set.Set[ids.NodeID] { + numPending := r.outstanding.Len() + if numPending >= r.maxOutstanding { + return nil + } + + numToSend := math.Min( + r.maxOutstanding-numPending, + r.pendingSend.Len(), + ) + nodeIDs := set.NewSet[ids.NodeID](numToSend) + for i := 0; i < numToSend; i++ { + nodeID, _ := r.pendingSend.Pop() + nodeIDs.Add(nodeID) + } + r.outstanding.Union(nodeIDs) + return nodeIDs +} + +func (r *requests) recordResponse(nodeID ids.NodeID) bool { + wasOutstanding := r.outstanding.Contains(nodeID) + r.outstanding.Remove(nodeID) + return wasOutstanding +} + +func (r *requests) finished() bool { + return r.pendingSend.Len() == 0 && r.outstanding.Len() == 0 +} diff --git a/snow/consensus/snowman/bootstrapper/sampler.go b/snow/consensus/snowman/bootstrapper/sampler.go new file mode 100644 index 000000000000..9511a1e4243f --- /dev/null +++ b/snow/consensus/snowman/bootstrapper/sampler.go @@ -0,0 +1,49 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package bootstrapper + +import ( + "github.com/ava-labs/avalanchego/utils/math" + "github.com/ava-labs/avalanchego/utils/sampler" + "github.com/ava-labs/avalanchego/utils/set" +) + +// Sample keys from [elements] uniformly by weight without replacement. The +// returned set will have size less than or equal to [maxSize]. This function +// will error if the sum of all weights overflows. +func Sample[T comparable](elements map[T]uint64, maxSize int) (set.Set[T], error) { + var ( + keys = make([]T, len(elements)) + weights = make([]uint64, len(elements)) + totalWeight uint64 + err error + ) + i := 0 + for key, weight := range elements { + keys[i] = key + weights[i] = weight + totalWeight, err = math.Add64(totalWeight, weight) + if err != nil { + return nil, err + } + i++ + } + + sampler := sampler.NewWeightedWithoutReplacement() + if err := sampler.Initialize(weights); err != nil { + return nil, err + } + + maxSize = int(math.Min(uint64(maxSize), totalWeight)) + indices, err := sampler.Sample(maxSize) + if err != nil { + return nil, err + } + + sampledElements := set.NewSet[T](maxSize) + for _, index := range indices { + sampledElements.Add(keys[index]) + } + return sampledElements, nil +} diff --git a/snow/consensus/snowman/bootstrapper/sampler_test.go b/snow/consensus/snowman/bootstrapper/sampler_test.go new file mode 100644 index 000000000000..1b9e366decc7 --- /dev/null +++ b/snow/consensus/snowman/bootstrapper/sampler_test.go @@ -0,0 +1,75 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package bootstrapper + +import ( + "math" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/set" + + safemath "github.com/ava-labs/avalanchego/utils/math" +) + +func TestSample(t *testing.T) { + tests := []struct { + name string + elements map[ids.NodeID]uint64 + maxSize int + expectedSampled set.Set[ids.NodeID] + expectedErr error + }{ + { + name: "sample everything", + elements: map[ids.NodeID]uint64{ + nodeID0: 1, + nodeID1: 1, + }, + maxSize: 2, + expectedSampled: set.Of(nodeID0, nodeID1), + expectedErr: nil, + }, + { + name: "limit sample due to too few elements", + elements: map[ids.NodeID]uint64{ + nodeID0: 1, + }, + maxSize: 2, + expectedSampled: set.Of(nodeID0), + expectedErr: nil, + }, + { + name: "limit sample", + elements: map[ids.NodeID]uint64{ + nodeID0: math.MaxUint64 - 1, + nodeID1: 1, + }, + maxSize: 1, + expectedSampled: set.Of(nodeID0), + expectedErr: nil, + }, + { + name: "overflow", + elements: map[ids.NodeID]uint64{ + nodeID0: math.MaxUint64, + nodeID1: 1, + }, + maxSize: 1, + expectedSampled: nil, + expectedErr: safemath.ErrOverflow, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + sampled, err := Sample(test.elements, test.maxSize) + require.ErrorIs(err, test.expectedErr) + require.Equal(test.expectedSampled, sampled) + }) + } +} diff --git a/snow/engine/common/bootstrapper.go b/snow/engine/common/bootstrapper.go index c567db5ee2d0..0455a46cc203 100644 --- a/snow/engine/common/bootstrapper.go +++ b/snow/engine/common/bootstrapper.go @@ -5,16 +5,13 @@ package common import ( "context" - "fmt" - "math" "go.uber.org/zap" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/set" - safemath "github.com/ava-labs/avalanchego/utils/math" + smbootstrapper "github.com/ava-labs/avalanchego/snow/consensus/snowman/bootstrapper" ) const ( @@ -46,30 +43,8 @@ type bootstrapper struct { Config Halter - // Holds the beacons that were sampled for the accepted frontier - sampledBeacons validators.Manager - // IDs of validators we should request an accepted frontier from - pendingSendAcceptedFrontier set.Set[ids.NodeID] - // IDs of validators we requested an accepted frontier from but haven't - // received a reply yet - pendingReceiveAcceptedFrontier set.Set[ids.NodeID] - // IDs of validators that failed to respond with their accepted frontier - failedAcceptedFrontier set.Set[ids.NodeID] - // IDs of all the returned accepted frontiers - acceptedFrontierSet set.Set[ids.ID] - - // IDs of validators we should request filtering the accepted frontier from - pendingSendAccepted set.Set[ids.NodeID] - // IDs of validators we requested filtering the accepted frontier from but - // haven't received a reply yet - pendingReceiveAccepted set.Set[ids.NodeID] - // IDs of validators that failed to respond with their filtered accepted - // frontier - failedAccepted set.Set[ids.NodeID] - // IDs of the returned accepted containers and the stake weight that has - // marked them as accepted - acceptedVotes map[ids.ID]uint64 - acceptedFrontier []ids.ID + minority smbootstrapper.Poll + majority smbootstrapper.Poll // number of times the bootstrap has been attempted bootstrapAttempts int @@ -77,12 +52,13 @@ type bootstrapper struct { func NewCommonBootstrapper(config Config) Bootstrapper { return &bootstrapper{ - Config: config, + Config: config, + minority: smbootstrapper.Noop, + majority: smbootstrapper.Noop, } } func (b *bootstrapper) AcceptedFrontier(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerID ids.ID) error { - // ignores any late responses if requestID != b.Config.SharedCfg.RequestID { b.Ctx.Log.Debug("received out-of-sync AcceptedFrontier message", zap.Stringer("nodeID", nodeID), @@ -92,21 +68,13 @@ func (b *bootstrapper) AcceptedFrontier(ctx context.Context, nodeID ids.NodeID, return nil } - if !b.pendingReceiveAcceptedFrontier.Contains(nodeID) { - b.Ctx.Log.Debug("received unexpected AcceptedFrontier message", - zap.Stringer("nodeID", nodeID), - ) - return nil + if err := b.minority.RecordOpinion(ctx, nodeID, set.Of(containerID)); err != nil { + return err } - - // Union the reported accepted frontier from [nodeID] with the accepted - // frontier we got from others - b.acceptedFrontierSet.Add(containerID) - return b.markAcceptedFrontierReceived(ctx, nodeID) + return b.sendMessagesOrFinish(ctx) } func (b *bootstrapper) GetAcceptedFrontierFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { - // ignores any late responses if requestID != b.Config.SharedCfg.RequestID { b.Ctx.Log.Debug("received out-of-sync GetAcceptedFrontierFailed message", zap.Stringer("nodeID", nodeID), @@ -116,76 +84,13 @@ func (b *bootstrapper) GetAcceptedFrontierFailed(ctx context.Context, nodeID ids return nil } - if !b.pendingReceiveAcceptedFrontier.Contains(nodeID) { - b.Ctx.Log.Debug("received unexpected GetAcceptedFrontierFailed message", - zap.Stringer("nodeID", nodeID), - ) - return nil - } - - // If we can't get a response from [nodeID], act as though they said their - // accepted frontier is empty and we add the validator to the failed list - b.failedAcceptedFrontier.Add(nodeID) - return b.markAcceptedFrontierReceived(ctx, nodeID) -} - -func (b *bootstrapper) markAcceptedFrontierReceived(ctx context.Context, nodeID ids.NodeID) error { - // Mark that we received a response from [nodeID] - b.pendingReceiveAcceptedFrontier.Remove(nodeID) - - b.sendGetAcceptedFrontiers(ctx) - - // still waiting on requests - if b.pendingReceiveAcceptedFrontier.Len() != 0 { - return nil - } - - // We've received the accepted frontier from every bootstrap validator - // Ask each bootstrap validator to filter the list of containers that we were - // told are on the accepted frontier such that the list only contains containers - // they think are accepted. - totalSampledWeight, err := b.sampledBeacons.TotalWeight(b.Ctx.SubnetID) - if err != nil { - return fmt.Errorf("failed to get total weight of sampled beacons for subnet %s: %w", b.Ctx.SubnetID, err) - } - beaconsTotalWeight, err := b.Beacons.TotalWeight(b.Ctx.SubnetID) - if err != nil { - return fmt.Errorf("failed to get total weight of beacons for subnet %s: %w", b.Ctx.SubnetID, err) - } - newAlpha := float64(totalSampledWeight*b.Alpha) / float64(beaconsTotalWeight) - - failedBeaconWeight, err := b.Beacons.SubsetWeight(b.Ctx.SubnetID, b.failedAcceptedFrontier) - if err != nil { - return fmt.Errorf("failed to get total weight of failed beacons: %w", err) - } - - // fail the bootstrap if the weight is not enough to bootstrap - if float64(totalSampledWeight)-newAlpha < float64(failedBeaconWeight) { - if b.Config.RetryBootstrap { - b.Ctx.Log.Debug("restarting bootstrap", - zap.String("reason", "not enough frontiers received"), - zap.Int("numBeacons", b.Beacons.Count(b.Ctx.SubnetID)), - zap.Int("numFailedBootstrappers", b.failedAcceptedFrontier.Len()), - zap.Int("numBootstrapAttemps", b.bootstrapAttempts), - ) - return b.Restart(ctx, false) - } - - b.Ctx.Log.Debug("didn't receive enough frontiers", - zap.Int("numFailedValidators", b.failedAcceptedFrontier.Len()), - zap.Int("numBootstrapAttempts", b.bootstrapAttempts), - ) + if err := b.minority.RecordOpinion(ctx, nodeID, nil); err != nil { + return err } - - b.Config.SharedCfg.RequestID++ - b.acceptedFrontier = b.acceptedFrontierSet.List() - - b.sendGetAccepted(ctx) - return nil + return b.sendMessagesOrFinish(ctx) } func (b *bootstrapper) Accepted(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerIDs set.Set[ids.ID]) error { - // ignores any late responses if requestID != b.Config.SharedCfg.RequestID { b.Ctx.Log.Debug("received out-of-sync Accepted message", zap.Stringer("nodeID", nodeID), @@ -195,90 +100,13 @@ func (b *bootstrapper) Accepted(ctx context.Context, nodeID ids.NodeID, requestI return nil } - if !b.pendingReceiveAccepted.Contains(nodeID) { - b.Ctx.Log.Debug("received unexpected Accepted message", - zap.Stringer("nodeID", nodeID), - ) - return nil - } - // Mark that we received a response from [nodeID] - b.pendingReceiveAccepted.Remove(nodeID) - - weight := b.Beacons.GetWeight(b.Ctx.SubnetID, nodeID) - for containerID := range containerIDs { - previousWeight := b.acceptedVotes[containerID] - newWeight, err := safemath.Add64(weight, previousWeight) - if err != nil { - b.Ctx.Log.Error("failed calculating the Accepted votes", - zap.Uint64("weight", weight), - zap.Uint64("previousWeight", previousWeight), - zap.Error(err), - ) - newWeight = math.MaxUint64 - } - b.acceptedVotes[containerID] = newWeight - } - - b.sendGetAccepted(ctx) - - // wait on pending responses - if b.pendingReceiveAccepted.Len() != 0 { - return nil - } - - // We've received the filtered accepted frontier from every bootstrap validator - // Accept all containers that have a sufficient weight behind them - accepted := make([]ids.ID, 0, len(b.acceptedVotes)) - for containerID, weight := range b.acceptedVotes { - if weight >= b.Alpha { - accepted = append(accepted, containerID) - } - } - - // if we don't have enough weight for the bootstrap to be accepted then - // retry or fail the bootstrap - size := len(accepted) - if size == 0 && b.Beacons.Count(b.Ctx.SubnetID) > 0 { - // if we had too many timeouts when asking for validator votes, we - // should restart bootstrap hoping for the network problems to go away; - // otherwise, we received enough (>= b.Alpha) responses, but no frontier - // was supported by a majority of validators (i.e. votes are split - // between minorities supporting different frontiers). - beaconTotalWeight, err := b.Beacons.TotalWeight(b.Ctx.SubnetID) - if err != nil { - return fmt.Errorf("failed to get total weight of beacons for subnet %s: %w", b.Ctx.SubnetID, err) - } - failedBeaconWeight, err := b.Beacons.SubsetWeight(b.Ctx.SubnetID, b.failedAccepted) - if err != nil { - return fmt.Errorf("failed to get total weight of failed beacons for subnet %s: %w", b.Ctx.SubnetID, err) - } - votingStakes := beaconTotalWeight - failedBeaconWeight - if b.Config.RetryBootstrap && votingStakes < b.Alpha { - b.Ctx.Log.Debug("restarting bootstrap", - zap.String("reason", "not enough votes received"), - zap.Int("numBeacons", b.Beacons.Count(b.Ctx.SubnetID)), - zap.Int("numFailedBootstrappers", b.failedAccepted.Len()), - zap.Int("numBootstrapAttempts", b.bootstrapAttempts), - ) - return b.Restart(ctx, false) - } - } - - if !b.Config.SharedCfg.Restarted { - b.Ctx.Log.Info("bootstrapping started syncing", - zap.Int("numVerticesInFrontier", size), - ) - } else { - b.Ctx.Log.Debug("bootstrapping started syncing", - zap.Int("numVerticesInFrontier", size), - ) + if err := b.majority.RecordOpinion(ctx, nodeID, containerIDs); err != nil { + return err } - - return b.Bootstrapable.ForceAccepted(ctx, accepted) + return b.sendMessagesOrFinish(ctx) } func (b *bootstrapper) GetAcceptedFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { - // ignores any late responses if requestID != b.Config.SharedCfg.RequestID { b.Ctx.Log.Debug("received out-of-sync GetAcceptedFailed message", zap.Stringer("nodeID", nodeID), @@ -288,58 +116,50 @@ func (b *bootstrapper) GetAcceptedFailed(ctx context.Context, nodeID ids.NodeID, return nil } - // If we can't get a response from [nodeID], act as though they said that - // they think none of the containers we sent them in GetAccepted are - // accepted - b.failedAccepted.Add(nodeID) - return b.Accepted(ctx, nodeID, requestID, nil) + if err := b.majority.RecordOpinion(ctx, nodeID, nil); err != nil { + return err + } + return b.sendMessagesOrFinish(ctx) } func (b *bootstrapper) Startup(ctx context.Context) error { - beaconIDs, err := b.Beacons.Sample(b.Ctx.SubnetID, b.Config.SampleK) - if err != nil { - return err + currentBeacons := b.Beacons.GetMap(b.Ctx.SubnetID) + nodeWeights := make(map[ids.NodeID]uint64, len(currentBeacons)) + for nodeID, beacon := range currentBeacons { + nodeWeights[nodeID] = beacon.Weight } - b.sampledBeacons = validators.NewManager() - b.pendingSendAcceptedFrontier.Clear() - for _, nodeID := range beaconIDs { - if _, ok := b.sampledBeacons.GetValidator(b.Ctx.SubnetID, nodeID); !ok { - // Invariant: We never use the TxID or BLS keys populated here. - err = b.sampledBeacons.AddStaker(b.Ctx.SubnetID, nodeID, nil, ids.Empty, 1) - } else { - err = b.sampledBeacons.AddWeight(b.Ctx.SubnetID, nodeID, 1) - } - if err != nil { - return err - } - b.pendingSendAcceptedFrontier.Add(nodeID) + frontierNodes, err := smbootstrapper.Sample(nodeWeights, b.SampleK) + if err != nil { + return err } - b.pendingReceiveAcceptedFrontier.Clear() - b.failedAcceptedFrontier.Clear() - b.acceptedFrontierSet.Clear() + b.Ctx.Log.Debug("sampled nodes to seed bootstrapping frontier", + zap.Reflect("sampledNodes", frontierNodes), + zap.Int("numNodes", len(nodeWeights)), + ) - b.pendingSendAccepted.Clear() - for _, nodeID := range b.Beacons.GetValidatorIDs(b.Ctx.SubnetID) { - b.pendingSendAccepted.Add(nodeID) - } - - b.pendingReceiveAccepted.Clear() - b.failedAccepted.Clear() - b.acceptedVotes = make(map[ids.ID]uint64) + b.minority = smbootstrapper.NewMinority( + b.Ctx.Log, + frontierNodes, + MaxOutstandingBroadcastRequests, + ) + b.majority = smbootstrapper.NewMajority( + b.Ctx.Log, + nodeWeights, + MaxOutstandingBroadcastRequests, + ) b.bootstrapAttempts++ - if b.pendingSendAcceptedFrontier.Len() == 0 { + if accepted, finalized := b.majority.Result(ctx); finalized { b.Ctx.Log.Info("bootstrapping skipped", zap.String("reason", "no provided bootstraps"), ) - return b.Bootstrapable.ForceAccepted(ctx, nil) + return b.Bootstrapable.ForceAccepted(ctx, accepted) } b.Config.SharedCfg.RequestID++ - b.sendGetAcceptedFrontiers(ctx) - return nil + return b.sendMessagesOrFinish(ctx) } func (b *bootstrapper) Restart(ctx context.Context, reset bool) error { @@ -361,40 +181,50 @@ func (b *bootstrapper) Restart(ctx context.Context, reset bool) error { return b.Startup(ctx) } -// Ask up to [MaxOutstandingBroadcastRequests] bootstrap validators to send -// their accepted frontier with the current accepted frontier -func (b *bootstrapper) sendGetAcceptedFrontiers(ctx context.Context) { - vdrs := set.NewSet[ids.NodeID](1) - for b.pendingSendAcceptedFrontier.Len() > 0 && b.pendingReceiveAcceptedFrontier.Len() < MaxOutstandingBroadcastRequests { - vdr, _ := b.pendingSendAcceptedFrontier.Pop() - // Add the validator to the set to send the messages to - vdrs.Add(vdr) - // Add the validator to send pending receipt set - b.pendingReceiveAcceptedFrontier.Add(vdr) +func (b *bootstrapper) sendMessagesOrFinish(ctx context.Context) error { + if peers := b.minority.GetPeers(ctx); peers.Len() > 0 { + b.Sender.SendGetAcceptedFrontier(ctx, peers, b.Config.SharedCfg.RequestID) + return nil } - if vdrs.Len() > 0 { - b.Sender.SendGetAcceptedFrontier(ctx, vdrs, b.Config.SharedCfg.RequestID) + potentialAccepted, finalized := b.minority.Result(ctx) + if !finalized { + // We haven't finalized the accepted frontier, so we should wait for the + // outstanding requests. + return nil + } + + if peers := b.majority.GetPeers(ctx); peers.Len() > 0 { + b.Sender.SendGetAccepted(ctx, peers, b.Config.SharedCfg.RequestID, potentialAccepted) + return nil + } + + accepted, finalized := b.majority.Result(ctx) + if !finalized { + // We haven't finalized the accepted set, so we should wait for the + // outstanding requests. + return nil } -} -// Ask up to [MaxOutstandingBroadcastRequests] bootstrap validators to send -// their filtered accepted frontier -func (b *bootstrapper) sendGetAccepted(ctx context.Context) { - vdrs := set.NewSet[ids.NodeID](1) - for b.pendingSendAccepted.Len() > 0 && b.pendingReceiveAccepted.Len() < MaxOutstandingBroadcastRequests { - vdr, _ := b.pendingSendAccepted.Pop() - // Add the validator to the set to send the messages to - vdrs.Add(vdr) - // Add the validator to send pending receipt set - b.pendingReceiveAccepted.Add(vdr) + numAccepted := len(accepted) + if numAccepted == 0 { + b.Ctx.Log.Debug("restarting bootstrap", + zap.String("reason", "no blocks accepted"), + zap.Int("numBeacons", b.Beacons.Count(b.Ctx.SubnetID)), + zap.Int("numBootstrapAttempts", b.bootstrapAttempts), + ) + return b.Restart(ctx, false /*=reset*/) } - if vdrs.Len() > 0 { - b.Ctx.Log.Debug("sent GetAccepted messages", - zap.Int("numSent", vdrs.Len()), - zap.Int("numPending", b.pendingSendAccepted.Len()), + if !b.Config.SharedCfg.Restarted { + b.Ctx.Log.Info("bootstrapping started syncing", + zap.Int("numAccepted", numAccepted), + ) + } else { + b.Ctx.Log.Debug("bootstrapping started syncing", + zap.Int("numAccepted", numAccepted), ) - b.Sender.SendGetAccepted(ctx, vdrs, b.Config.SharedCfg.RequestID, b.acceptedFrontier) } + + return b.Bootstrapable.ForceAccepted(ctx, accepted) }