Skip to content

Commit

Permalink
Replace periodic push accepted gossip with pull preference gossip for…
Browse files Browse the repository at this point in the history
… block discovery (#2367)
  • Loading branch information
StephenButtolph authored Nov 29, 2023
1 parent 1dddf30 commit 56c2ad9
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 46 deletions.
8 changes: 4 additions & 4 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ type ManagerConfig struct {
MeterVMEnabled bool // Should each VM be wrapped with a MeterVM
Metrics metrics.MultiGatherer

AcceptedFrontierGossipFrequency time.Duration
ConsensusAppConcurrency int
FrontierPollFrequency time.Duration
ConsensusAppConcurrency int

// Max Time to spend fetching a container and its
// ancestors when responding to a GetAncestors
Expand Down Expand Up @@ -824,7 +824,7 @@ func (m *manager) createAvalancheChain(
ctx,
vdrs,
msgChan,
m.AcceptedFrontierGossipFrequency,
m.FrontierPollFrequency,
m.ConsensusAppConcurrency,
m.ResourceTracker,
validators.UnhandledSubnetConnector, // avalanche chains don't use subnet connector
Expand Down Expand Up @@ -1166,7 +1166,7 @@ func (m *manager) createSnowmanChain(
ctx,
vdrs,
msgChan,
m.AcceptedFrontierGossipFrequency,
m.FrontierPollFrequency,
m.ConsensusAppConcurrency,
m.ResourceTracker,
subnetConnector,
Expand Down
17 changes: 12 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ const (
subnetConfigFileExt = ".json"
ipResolutionTimeout = 30 * time.Second

ipcDeprecationMsg = "IPC API is deprecated"
keystoreDeprecationMsg = "keystore API is deprecated"
ipcDeprecationMsg = "IPC API is deprecated"
keystoreDeprecationMsg = "keystore API is deprecated"
acceptedFrontierGossipDeprecationMsg = "push-based accepted frontier gossip is deprecated"
)

var (
Expand All @@ -72,6 +73,12 @@ var (
IpcsChainIDsKey: ipcDeprecationMsg,
IpcsPathKey: ipcDeprecationMsg,
KeystoreAPIEnabledKey: keystoreDeprecationMsg,
ConsensusGossipAcceptedFrontierValidatorSizeKey: acceptedFrontierGossipDeprecationMsg,
ConsensusGossipAcceptedFrontierNonValidatorSizeKey: acceptedFrontierGossipDeprecationMsg,
ConsensusGossipAcceptedFrontierPeerSizeKey: acceptedFrontierGossipDeprecationMsg,
ConsensusGossipOnAcceptValidatorSizeKey: acceptedFrontierGossipDeprecationMsg,
ConsensusGossipOnAcceptNonValidatorSizeKey: acceptedFrontierGossipDeprecationMsg,
ConsensusGossipOnAcceptPeerSizeKey: acceptedFrontierGossipDeprecationMsg,
}

errSybilProtectionDisabledStakerWeights = errors.New("sybil protection disabled weights must be positive")
Expand Down Expand Up @@ -1320,9 +1327,9 @@ func GetNodeConfig(v *viper.Viper) (node.Config, error) {
}

// Gossiping
nodeConfig.AcceptedFrontierGossipFrequency = v.GetDuration(ConsensusAcceptedFrontierGossipFrequencyKey)
if nodeConfig.AcceptedFrontierGossipFrequency < 0 {
return node.Config{}, fmt.Errorf("%s must be >= 0", ConsensusAcceptedFrontierGossipFrequencyKey)
nodeConfig.FrontierPollFrequency = v.GetDuration(ConsensusFrontierPollFrequencyKey)
if nodeConfig.FrontierPollFrequency < 0 {
return node.Config{}, fmt.Errorf("%s must be >= 0", ConsensusFrontierPollFrequencyKey)
}

// App handling
Expand Down
2 changes: 1 addition & 1 deletion config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ func addNodeFlags(fs *pflag.FlagSet) {
fs.Duration(BenchlistMinFailingDurationKey, constants.DefaultBenchlistMinFailingDuration, "Minimum amount of time messages to a peer must be failing before the peer is benched")

// Router
fs.Duration(ConsensusAcceptedFrontierGossipFrequencyKey, constants.DefaultAcceptedFrontierGossipFrequency, "Frequency of gossiping accepted frontiers")
fs.Uint(ConsensusAppConcurrencyKey, constants.DefaultConsensusAppConcurrency, "Maximum number of goroutines to use when handling App messages on a chain")
fs.Duration(ConsensusShutdownTimeoutKey, constants.DefaultConsensusShutdownTimeout, "Timeout before killing an unresponsive chain")
fs.Duration(ConsensusFrontierPollFrequencyKey, constants.DefaultFrontierPollFrequency, "Frequency of polling for new consensus frontiers")
fs.Uint(ConsensusGossipAcceptedFrontierValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierValidatorSize, "Number of validators to gossip to when gossiping accepted frontier")
fs.Uint(ConsensusGossipAcceptedFrontierNonValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierNonValidatorSize, "Number of non-validators to gossip to when gossiping accepted frontier")
fs.Uint(ConsensusGossipAcceptedFrontierPeerSizeKey, constants.DefaultConsensusGossipAcceptedFrontierPeerSize, "Number of peers to gossip to when gossiping accepted frontier")
Expand Down
4 changes: 2 additions & 2 deletions config/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,9 @@ const (
IpcsChainIDsKey = "ipcs-chain-ids"
IpcsPathKey = "ipcs-path"
MeterVMsEnabledKey = "meter-vms-enabled"
ConsensusAcceptedFrontierGossipFrequencyKey = "consensus-accepted-frontier-gossip-frequency"
ConsensusAppConcurrencyKey = "consensus-app-concurrency"
ConsensusShutdownTimeoutKey = "consensus-shutdown-timeout"
ConsensusFrontierPollFrequencyKey = "consensus-frontier-poll-frequency"
ConsensusGossipAcceptedFrontierValidatorSizeKey = "consensus-accepted-frontier-gossip-validator-size"
ConsensusGossipAcceptedFrontierNonValidatorSizeKey = "consensus-accepted-frontier-gossip-non-validator-size"
ConsensusGossipAcceptedFrontierPeerSizeKey = "consensus-accepted-frontier-gossip-peer-size"
Expand All @@ -154,7 +155,6 @@ const (
AppGossipValidatorSizeKey = "consensus-app-gossip-validator-size"
AppGossipNonValidatorSizeKey = "consensus-app-gossip-non-validator-size"
AppGossipPeerSizeKey = "consensus-app-gossip-peer-size"
ConsensusShutdownTimeoutKey = "consensus-shutdown-timeout"
ProposerVMUseCurrentHeightKey = "proposervm-use-current-height"
FdLimitKey = "fd-limit"
IndexEnabledKey = "index-enabled"
Expand Down
4 changes: 2 additions & 2 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ type Config struct {
ConsensusRouter router.Router `json:"-"`
RouterHealthConfig router.HealthConfig `json:"routerHealthConfig"`
ConsensusShutdownTimeout time.Duration `json:"consensusShutdownTimeout"`
// Gossip a container in the accepted frontier every [AcceptedFrontierGossipFrequency]
AcceptedFrontierGossipFrequency time.Duration `json:"consensusGossipFreq"`
// Poll for new frontiers every [FrontierPollFrequency]
FrontierPollFrequency time.Duration `json:"consensusGossipFreq"`
// ConsensusAppConcurrency defines the maximum number of goroutines to
// handle App messages per chain.
ConsensusAppConcurrency int `json:"consensusAppConcurrency"`
Expand Down
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ func (n *Node) initChainManager(avaxAssetID ids.ID) error {
Metrics: n.MetricsGatherer,
SubnetConfigs: n.Config.SubnetConfigs,
ChainConfigs: n.Config.ChainConfigs,
AcceptedFrontierGossipFrequency: n.Config.AcceptedFrontierGossipFrequency,
FrontierPollFrequency: n.Config.FrontierPollFrequency,
ConsensusAppConcurrency: n.Config.ConsensusAppConcurrency,
BootstrapMaxTimeGetAncestors: n.Config.BootstrapMaxTimeGetAncestors,
BootstrapAncestorsMaxContainersSent: n.Config.BootstrapAncestorsMaxContainersSent,
Expand Down
4 changes: 4 additions & 0 deletions node/overridden_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func (o *overriddenManager) Sample(_ ids.ID, size int) ([]ids.NodeID, error) {
return o.manager.Sample(o.subnetID, size)
}

func (o *overriddenManager) UniformSample(_ ids.ID, size int) ([]ids.NodeID, error) {
return o.manager.UniformSample(o.subnetID, size)
}

func (o *overriddenManager) GetMap(ids.ID) map[ids.NodeID]*validators.GetValidatorOutput {
return o.manager.GetMap(o.subnetID)
}
Expand Down
97 changes: 74 additions & 23 deletions snow/engine/snowman/transitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,14 @@ import (
"github.com/ava-labs/avalanchego/utils/wrappers"
)

const nonVerifiedCacheSize = 64 * units.MiB
const (
nonVerifiedCacheSize = 64 * units.MiB

// putGossipPeriod specifies the number of times Gossip will be called per
// Put gossip. This is done to avoid splitting Gossip into multiple
// functions and to allow more frequent pull gossip than push gossip.
putGossipPeriod = 10
)

var _ Engine = (*Transitive)(nil)

Expand Down Expand Up @@ -63,6 +70,8 @@ type Transitive struct {

requestID uint32

gossipCounter int

// track outstanding preference requests
polls poll.Set

Expand Down Expand Up @@ -151,6 +160,69 @@ func newTransitive(config Config) (*Transitive, error) {
return t, t.metrics.Initialize("", config.Ctx.Registerer)
}

func (t *Transitive) Gossip(ctx context.Context) error {
lastAcceptedID, lastAcceptedHeight := t.Consensus.LastAccepted()
if numProcessing := t.Consensus.NumProcessing(); numProcessing == 0 {
t.Ctx.Log.Verbo("sampling from validators",
zap.Stringer("validators", t.Validators),
)

// Uniform sampling is used here to reduce bandwidth requirements of
// nodes with a large amount of stake weight.
vdrIDs, err := t.Validators.UniformSample(t.Ctx.SubnetID, 1)
if err != nil {
t.Ctx.Log.Error("skipping block gossip",
zap.String("reason", "no validators"),
zap.Error(err),
)
return nil
}

nextHeightToAccept, err := math.Add64(lastAcceptedHeight, 1)
if err != nil {
t.Ctx.Log.Error("skipping block gossip",
zap.String("reason", "block height overflow"),
zap.Stringer("blkID", lastAcceptedID),
zap.Uint64("lastAcceptedHeight", lastAcceptedHeight),
zap.Error(err),
)
return nil
}

t.requestID++
vdrSet := set.Of(vdrIDs...)
preferredID := t.Consensus.Preference()
t.Sender.SendPullQuery(ctx, vdrSet, t.requestID, preferredID, nextHeightToAccept)
} else {
t.Ctx.Log.Debug("skipping block gossip",
zap.String("reason", "blocks currently processing"),
zap.Int("numProcessing", numProcessing),
)
}

// TODO: Remove periodic push gossip after v1.11.x is activated
t.gossipCounter++
t.gossipCounter %= putGossipPeriod
if t.gossipCounter > 0 {
return nil
}

lastAccepted, err := t.GetBlock(ctx, lastAcceptedID)
if err != nil {
t.Ctx.Log.Warn("dropping gossip request",
zap.String("reason", "block couldn't be loaded"),
zap.Stringer("blkID", lastAcceptedID),
zap.Error(err),
)
return nil
}
t.Ctx.Log.Verbo("gossiping accepted block to the network",
zap.Stringer("blkID", lastAcceptedID),
)
t.Sender.SendGossip(ctx, lastAccepted.Bytes())
return nil
}

func (t *Transitive) Put(ctx context.Context, nodeID ids.NodeID, requestID uint32, blkBytes []byte) error {
blk, err := t.VM.ParseBlock(ctx, blkBytes)
if err != nil {
Expand Down Expand Up @@ -383,28 +455,6 @@ func (*Transitive) Timeout(context.Context) error {
return nil
}

func (t *Transitive) Gossip(ctx context.Context) error {
blkID, err := t.VM.LastAccepted(ctx)
if err != nil {
return err
}

blk, err := t.GetBlock(ctx, blkID)
if err != nil {
t.Ctx.Log.Warn("dropping gossip request",
zap.String("reason", "block couldn't be loaded"),
zap.Stringer("blkID", blkID),
zap.Error(err),
)
return nil
}
t.Ctx.Log.Verbo("gossiping accepted block to the network",
zap.Stringer("blkID", blkID),
)
t.Sender.SendGossip(ctx, blk.Bytes())
return nil
}

func (*Transitive) Halt(context.Context) {}

func (t *Transitive) Shutdown(ctx context.Context) error {
Expand Down Expand Up @@ -873,6 +923,7 @@ func (t *Transitive) sendQuery(
t.Ctx.Log.Error("dropped query for block",
zap.String("reason", "insufficient number of validators"),
zap.Stringer("blkID", blkID),
zap.Int("size", t.Params.K),
)
return
}
Expand Down
12 changes: 6 additions & 6 deletions snow/engine/snowman/transitive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,7 +1382,7 @@ func TestEngineUndeclaredDependencyDeadlock(t *testing.T) {
func TestEngineGossip(t *testing.T) {
require := require.New(t)

_, _, sender, vm, te, gBlk := setupDefaultConfig(t)
nodeID, _, sender, vm, te, gBlk := setupDefaultConfig(t)

vm.LastAcceptedF = func(context.Context) (ids.ID, error) {
return gBlk.ID(), nil
Expand All @@ -1392,15 +1392,15 @@ func TestEngineGossip(t *testing.T) {
return gBlk, nil
}

called := new(bool)
sender.SendGossipF = func(_ context.Context, blkBytes []byte) {
*called = true
require.Equal(gBlk.Bytes(), blkBytes)
var calledSendPullQuery bool
sender.SendPullQueryF = func(_ context.Context, nodeIDs set.Set[ids.NodeID], _ uint32, _ ids.ID, _ uint64) {
calledSendPullQuery = true
require.Equal(set.Of(nodeID), nodeIDs)
}

require.NoError(te.Gossip(context.Background()))

require.True(*called)
require.True(calledSendPullQuery)
}

func TestEngineInvalidBlockIgnoredFromUnexpectedPeer(t *testing.T) {
Expand Down
19 changes: 19 additions & 0 deletions snow/validators/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ type Manager interface {
// If sampling the requested size isn't possible, an error will be returned.
Sample(subnetID ids.ID, size int) ([]ids.NodeID, error)

// UniformSample returns a collection of validatorIDs in the subnet.
// If sampling the requested size isn't possible, an error will be returned.
UniformSample(subnetID ids.ID, size int) ([]ids.NodeID, error)

// Map of the validators in this subnet
GetMap(subnetID ids.ID) map[ids.NodeID]*GetValidatorOutput

Expand Down Expand Up @@ -253,6 +257,21 @@ func (m *manager) Sample(subnetID ids.ID, size int) ([]ids.NodeID, error) {
return set.Sample(size)
}

func (m *manager) UniformSample(subnetID ids.ID, size int) ([]ids.NodeID, error) {
if size == 0 {
return nil, nil
}

m.lock.RLock()
set, exists := m.subnetToVdrs[subnetID]
m.lock.RUnlock()
if !exists {
return nil, ErrMissingValidators
}

return set.UniformSample(size)
}

func (m *manager) GetMap(subnetID ids.ID) map[ids.NodeID]*GetValidatorOutput {
m.lock.RLock()
set, exists := m.subnetToVdrs[subnetID]
Expand Down
23 changes: 23 additions & 0 deletions snow/validators/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,13 @@ func (s *vdrSet) Sample(size int) ([]ids.NodeID, error) {
return s.sample(size)
}

func (s *vdrSet) UniformSample(size int) ([]ids.NodeID, error) {
s.lock.RLock()
defer s.lock.RUnlock()

return s.uniformSample(size)
}

func (s *vdrSet) sample(size int) ([]ids.NodeID, error) {
if !s.samplerInitialized {
if err := s.sampler.Initialize(s.weights); err != nil {
Expand All @@ -263,6 +270,22 @@ func (s *vdrSet) sample(size int) ([]ids.NodeID, error) {
return list, nil
}

func (s *vdrSet) uniformSample(size int) ([]ids.NodeID, error) {
uniform := sampler.NewUniform()
uniform.Initialize(uint64(len(s.vdrSlice)))

indices, err := uniform.Sample(size)
if err != nil {
return nil, err
}

list := make([]ids.NodeID, size)
for i, index := range indices {
list[i] = s.vdrSlice[index].NodeID
}
return list, nil
}

func (s *vdrSet) TotalWeight() (uint64, error) {
s.lock.RLock()
defer s.lock.RUnlock()
Expand Down
4 changes: 2 additions & 2 deletions utils/constants/networking.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ const (
DefaultBenchlistMinFailingDuration = 2*time.Minute + 30*time.Second

// Router
DefaultAcceptedFrontierGossipFrequency = 10 * time.Second
DefaultConsensusAppConcurrency = 2
DefaultConsensusShutdownTimeout = time.Minute
DefaultFrontierPollFrequency = 100 * time.Millisecond
DefaultConsensusGossipAcceptedFrontierValidatorSize = 0
DefaultConsensusGossipAcceptedFrontierNonValidatorSize = 0
DefaultConsensusGossipAcceptedFrontierPeerSize = 15
DefaultConsensusGossipAcceptedFrontierPeerSize = 1
DefaultConsensusGossipOnAcceptValidatorSize = 0
DefaultConsensusGossipOnAcceptNonValidatorSize = 0
DefaultConsensusGossipOnAcceptPeerSize = 10
Expand Down

0 comments on commit 56c2ad9

Please sign in to comment.