Skip to content

Commit

Permalink
chore: use constants for the incoming queue size
Browse files Browse the repository at this point in the history
  • Loading branch information
rach-id committed Jan 23, 2025
1 parent ff8ea40 commit 205babe
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 5 deletions.
5 changes: 4 additions & 1 deletion blockchain/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const (
statusUpdateIntervalSeconds = 10
// check if we should switch to consensus reactor
switchToConsensusIntervalSeconds = 1

// ReactorIncomingQueueSize the size of the reactor's message queue.
ReactorIncomingQueueSize = 10
)

type consensusReactor interface {
Expand Down Expand Up @@ -92,7 +95,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *st
requestsCh: requestsCh,
errorsCh: errorsCh,
}
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR, p2p.WithIncomingQueueSize(10))
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR, p2p.WithIncomingQueueSize(ReactorIncomingQueueSize))
return bcR
}

Expand Down
5 changes: 4 additions & 1 deletion blockchain/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ const (

// ask for best height every 10s
statusUpdateIntervalSeconds = 10

// ReactorIncomingQueueSize the size of the reactor's message queue.
ReactorIncomingQueueSize = 10
)

var (
Expand Down Expand Up @@ -100,7 +103,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *st
}
fsm := NewFSM(startHeight, bcR)
bcR.fsm = fsm
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR, p2p.WithIncomingQueueSize(10))
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR, p2p.WithIncomingQueueSize(ReactorIncomingQueueSize))
// bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)

return bcR
Expand Down
5 changes: 4 additions & 1 deletion consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const (

blocksToContributeToBecomeGoodPeer = 10000
votesToContributeToBecomeGoodPeer = 10000

// ReactorIncomingQueueSize the size of the reactor's message queue.
ReactorIncomingQueueSize = 1000
)

//-----------------------------------------------------------------------------
Expand Down Expand Up @@ -65,7 +68,7 @@ func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption)
Metrics: NopMetrics(),
traceClient: trace.NoOpTracer(),
}
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR, p2p.WithIncomingQueueSize(1000))
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR, p2p.WithIncomingQueueSize(ReactorIncomingQueueSize))

for _, option := range options {
option(conR)
Expand Down
5 changes: 4 additions & 1 deletion evidence/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ const (
broadcastEvidenceIntervalS = 10
// If a message fails wait this much before sending it again
peerRetryMessageIntervalMS = 100

// ReactorIncomingQueueSize the size of the reactor's message queue.
ReactorIncomingQueueSize = 1
)

// Reactor handles evpool evidence broadcasting amongst peers.
Expand All @@ -38,7 +41,7 @@ func NewReactor(evpool *Pool) *Reactor {
evR := &Reactor{
evpool: evpool,
}
evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR, p2p.WithIncomingQueueSize(1))
evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR, p2p.WithIncomingQueueSize(ReactorIncomingQueueSize))
return evR
}

Expand Down
5 changes: 4 additions & 1 deletion statesync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ const (
ChunkChannel = byte(0x61)
// recentSnapshots is the number of recent snapshots to send and receive per peer.
recentSnapshots = 10

// ReactorIncomingQueueSize the size of the reactor's message queue.
ReactorIncomingQueueSize = 100
)

// Reactor handles state sync, both restoring snapshots for the local node and serving snapshots
Expand Down Expand Up @@ -56,7 +59,7 @@ func NewReactor(
connQuery: connQuery,
}
// TODO set incoming queue sizes to consts per reactor
r.BaseReactor = *p2p.NewBaseReactor("StateSync", r, p2p.WithIncomingQueueSize(100))
r.BaseReactor = *p2p.NewBaseReactor("StateSync", r, p2p.WithIncomingQueueSize(ReactorIncomingQueueSize))

return r
}
Expand Down

0 comments on commit 205babe

Please sign in to comment.