Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

performance: Speculative block assembly into master #4913

Draft
wants to merge 67 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
b827781
plumbing for spec block assembly
yossigi Jul 20, 2022
f44a5e8
with spec ledger code
yossigi Jul 21, 2022
1e1c3ad
block as LFE code
yossigi Jul 22, 2022
cf9eb60
build verified block + spec ledger
yossigi Jul 22, 2022
3021f03
checkpoint: making startevaluator compile
yossigi Jul 22, 2022
7c96473
add new LFE LedgerForEvaluator functionality
yossigi Jul 25, 2022
c54fa48
forward verified txns cache
yossigi Jul 25, 2022
2484101
checkpoint: use spec pool
yossigi Jul 25, 2022
251c021
fixing tests
yossigi Jul 25, 2022
c3d4caa
speculative ledger unittest updates
yossigi Jul 26, 2022
928e818
bug fix: subtract round only as fallback
yossigi Jul 26, 2022
d9403a8
add funcs to read create/read spec blocks
yossigi Jul 26, 2022
847e521
checkpoint: stop assembly at first new block
yossigi Jul 26, 2022
e24af8c
cancel spec asm on any new block (speculative or not)
yossigi Jul 26, 2022
ea87440
plumbing to use spec block
yossigi Jul 27, 2022
3e72c93
get branch from ledger
yossigi Jul 27, 2022
1900f78
release lock when copy fails
yossigi Jul 27, 2022
7513a83
don't flush changes from shallow pool copy to real pool
yossigi Jul 27, 2022
871cad3
Merge branch 'master' into yossi/specblockasm
yossigi Jul 27, 2022
1be9511
add BlockHdrCache method to interface
yossigi Jul 28, 2022
a7e289c
simpler error handling
yossigi Jul 28, 2022
e6dc3dc
checkpoint: plumbing through agreement
yossigi Jul 29, 2022
ee16415
checkpoint: MakeValidatedBlockAsLFE --> MakeBlockAsLFE
yossigi Aug 1, 2022
3687960
return statedelta
yossigi Aug 1, 2022
63c03b9
checkpoint: adding readlowest to agreement
yossigi Aug 2, 2022
633bf49
treat 0 timeout as no timeout
yossigi Aug 2, 2022
57edac8
x
yossigi Aug 2, 2022
7a0fdf7
zero spec timeout
yossigi Aug 2, 2022
2ca5107
txpool interface
yossigi Aug 2, 2022
ef67acf
Merge branch 'master' into yossi/specblockasm
yossigi Aug 2, 2022
28a19a9
demux event test
yossigi Aug 4, 2022
3d84135
fix todos
yossigi Aug 4, 2022
628a656
Merge branch 'master' into yossi/specblockasm
yossigi Aug 5, 2022
aea39ca
check casting
yossigi Aug 5, 2022
e883f2e
bug fix + unit test
yossigi Aug 8, 2022
68c3b77
speculative txn not in block
yossigi Aug 8, 2022
edb6e4d
asm round to next round
yossigi Aug 8, 2022
94b9292
better member name
yossigi Aug 8, 2022
50560d8
check good txns
yossigi Aug 9, 2022
beaf0c6
better test
yossigi Aug 9, 2022
735abb6
reset clears speculative results
yossigi Aug 10, 2022
6b580c6
change the way pendingTxids and pendingTxGroups are copied
cce Aug 17, 2022
ccda8c9
merge with current master
yossigi Aug 18, 2022
b40c0e0
Merge branch 'master' into yossi/specblockasm
yossigi Aug 29, 2022
679cbaf
Merge branch 'master' into yossi/specblockasm
yossigi Aug 29, 2022
4824d26
Merge branch 'master' into yossi/specblockasm
yossigi Aug 31, 2022
5f4a90e
Merge branch 'master' into yossi/specblockasm
yossigi Sep 2, 2022
fbc2718
add test
nicholasguo Sep 15, 2022
6d9bae1
bug fix: return correct round number
yossigi Sep 19, 2022
f34825a
preserve round number response in case of an error
yossigi Sep 19, 2022
3599437
use round from 2 rounds ago
nicholasguo Sep 19, 2022
1d81a56
setting clock for first period after boot
yossigi Sep 20, 2022
cc93338
Merge branch 'yossi/specblockasm' of github.com:algorand/go-algorand …
yossigi Sep 20, 2022
044a72a
optimize
nicholasguo Oct 4, 2022
408277d
Merge branch 'yossi/specblockasm' into nguo/spec-asm-race-test
nicholasguo Oct 6, 2022
40e171b
Merge pull request #4556 from nicholasguoalgorand/nguo/spec-asm-race-…
Oct 6, 2022
3cbfe53
lint
nicholasguo Oct 12, 2022
c5fcc28
Merge remote-tracking branch 'upstream/master' into feature/specblockasm
algorandskiy Dec 15, 2022
4bc1194
fix linter and msgp
algorandskiy Dec 15, 2022
a1d98fb
Merge pull master into feature/specblockasm
algorandskiy Dec 15, 2022
d42feb0
performance: Speculative block assembly (#4861)
brianolson Jan 26, 2023
8f9b54c
Merge remote-tracking branch 'origin/master' into specblockasm
brianolson Jan 26, 2023
657dc20
license fix
brianolson Jan 26, 2023
a8a408c
Merge pull request #5066 from brianolson/specblockasm-master-2ba3f4b704
cce Jan 26, 2023
94aea41
pull from master
yossigi Jul 21, 2023
2076563
versioning
yossigi Jul 27, 2023
7955027
spec asm params
yossigi Aug 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agreement/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ type BlockFactory interface {
// nodes on the network can assemble entries, the agreement protocol may
// lose liveness.
AssembleBlock(basics.Round) (ValidatedBlock, error)

StartSpeculativeBlockAssembly(context.Context, ValidatedBlock, crypto.Digest, bool)
}

// A Ledger represents the sequence of Entries agreed upon by the protocol.
Expand Down
20 changes: 14 additions & 6 deletions agreement/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ const (
attest
assemble
repropose
speculativeAssembly
speculativeAssemblyIfStarted

// disk
checkpoint
Expand Down Expand Up @@ -309,13 +311,14 @@ func (a rezeroAction) do(ctx context.Context, s *Service) {
}

type pseudonodeAction struct {
// assemble, repropose, attest
// assemble, repropose, attest, speculativeAssembly, speculativeAssemblyIfStarted
T actionType

Round round
Period period
Step step
Proposal proposalValue
Round round
Period period
Step step
Proposal proposalValue
ValidatedBlock ValidatedBlock
}

func (a pseudonodeAction) t() actionType {
Expand Down Expand Up @@ -346,6 +349,11 @@ func (a pseudonodeAction) do(ctx context.Context, s *Service) {
default:
s.log.Errorf("pseudonode.MakeProposals call failed %v", err)
}
case speculativeAssembly:
s.loopback.StartSpeculativeBlockAssembly(ctx, a.ValidatedBlock, a.Proposal.BlockDigest, false)
case speculativeAssemblyIfStarted:
s.loopback.StartSpeculativeBlockAssembly(ctx, a.ValidatedBlock, a.Proposal.BlockDigest, true)

case repropose:
logEvent := logspec.AgreementEvent{
Type: logspec.VoteAttest,
Expand Down Expand Up @@ -464,7 +472,7 @@ func zeroAction(t actionType) action {
return ensureAction{}
case rezero:
return rezeroAction{}
case attest, assemble, repropose:
case attest, assemble, repropose, speculativeAssembly, speculativeAssemblyIfStarted:
return pseudonodeAction{}
case checkpoint:
return checkpointAction{}
Expand Down
8 changes: 5 additions & 3 deletions agreement/actiontype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions agreement/agreementtest/simulate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ func (f testBlockFactory) AssembleBlock(r basics.Round) (agreement.ValidatedBloc
return testValidatedBlock{Inside: bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: r}}}, nil
}

func (f testBlockFactory) StartSpeculativeBlockAssembly(context.Context, agreement.ValidatedBlock, crypto.Digest, bool) {
}

// If we try to read from high rounds, we panic and do not emit an error to find bugs during testing.
type testLedger struct {
mu deadlock.Mutex
Expand Down
4 changes: 4 additions & 0 deletions agreement/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ func (f testBlockFactory) AssembleBlock(r basics.Round) (ValidatedBlock, error)
return testValidatedBlock{Inside: bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: r}}}, nil
}

func (f testBlockFactory) StartSpeculativeBlockAssembly(context.Context, ValidatedBlock, crypto.Digest, bool) {
return
}

// If we try to read from high rounds, we panic and do not emit an error to find bugs during testing.
type testLedger struct {
mu deadlock.Mutex
Expand Down
12 changes: 9 additions & 3 deletions agreement/coservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package agreement

import (
"github.com/algorand/go-deadlock"
"runtime"
"testing"

"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-deadlock"
)

//go:generate stringer -type=coserviceType
Expand All @@ -35,10 +36,12 @@ const (
//msgp:ignore coserviceType
type coserviceType int

// coserviceMonitor is unit test instrumentation
type coserviceMonitor struct {
deadlock.Mutex

id int
t *testing.T
c map[coserviceType]uint

coserviceListener
Expand Down Expand Up @@ -79,7 +82,10 @@ func (m *coserviceMonitor) dec(t coserviceType) {
m.c = make(map[coserviceType]uint)
}
if m.c[t] == 0 {
logging.Base().Panicf("%d: tried to decrement empty coservice queue %v", m.id, t)
var stack [1000]byte
sl := runtime.Stack(stack[:], false)
m.t.Log(string(stack[:sl]))
m.t.Fatalf("%d: tried to decrement empty coservice queue %v", m.id, t)
}
m.c[t]--

Expand Down
28 changes: 24 additions & 4 deletions agreement/demux.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,13 @@ type demux struct {

queue []<-chan externalEvent
processingMonitor EventsProcessingMonitor
monitor *coserviceMonitor
cancelTokenizers context.CancelFunc

log logging.Logger

// coserviceMonitor is unit test instrumentation.
// should be fast no-op if monitor == nil
monitor *coserviceMonitor
}

// demuxParams contains the parameters required to initliaze a new demux object
Expand All @@ -74,7 +77,10 @@ type demuxParams struct {
voteVerifier *AsyncVoteVerifier
processingMonitor EventsProcessingMonitor
log logging.Logger
monitor *coserviceMonitor

// coserviceMonitor is unit test instrumentation.
// should be fast no-op if monitor == nil
monitor *coserviceMonitor
}

// makeDemux initializes the goroutines needed to process external events, setting up the appropriate channels.
Expand Down Expand Up @@ -190,7 +196,7 @@ func (d *demux) verifyBundle(ctx context.Context, m message, r round, p period,
// next blocks until it observes an external input event of interest for the state machine.
//
// If ok is false, there are no more events so the agreement service should quit.
func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Duration, currentRound round) (e externalEvent, ok bool) {
func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Duration, speculationDeadline time.Duration, currentRound round) (e externalEvent, ok bool) {
defer func() {
if !ok {
return
Expand Down Expand Up @@ -253,6 +259,14 @@ func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Durat
deadlineCh := s.Clock.TimeoutAt(deadline)
fastDeadlineCh := s.Clock.TimeoutAt(fastDeadline)

var speculationDeadlineCh <-chan time.Time
// zero timeout means we don't have enough time to speculate on block assembly
if speculationDeadline != 0 {
speculationDeadlineCh = s.Clock.TimeoutAt(speculationDeadline)
}

//d.log.Infof("demux deadline %d, fastD %d, specD %d, d.monitor %v", deadline, fastDeadline, speculationDeadline, d.monitor) // not threadsafe in some tests

d.UpdateEventsQueue(eventQueueDemux, 0)
d.monitor.dec(demuxCoserviceType)

Expand All @@ -269,7 +283,7 @@ func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Durat
// the pseudonode channel got closed. remove it from the queue and try again.
d.queue = d.queue[1:]
d.UpdateEventsQueue(eventQueuePseudonode, 0)
return d.next(s, deadline, fastDeadline, currentRound)
return d.next(s, deadline, fastDeadline, speculationDeadline, currentRound)

// control
case <-s.quit:
Expand Down Expand Up @@ -303,6 +317,11 @@ func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Durat
d.UpdateEventsQueue(eventQueueDemux, 1)
d.monitor.inc(demuxCoserviceType)
d.monitor.dec(clockCoserviceType)
case <-speculationDeadlineCh:
e = timeoutEvent{T: speculationTimeout, RandomEntropy: s.RandomSource.Uint64(), Round: nextRound}
d.UpdateEventsQueue(eventQueueDemux, 1)
d.monitor.inc(demuxCoserviceType)
d.monitor.dec(clockCoserviceType)

// raw
case m, open := <-rawVotes:
Expand Down Expand Up @@ -358,6 +377,7 @@ func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Durat
}

// setupCompoundMessage processes compound messages: distinct messages which are delivered together
// TODO: does this ever really see something other than empty .Vote?
func setupCompoundMessage(l LedgerReader, m message) (res externalEvent) {
compound := m.CompoundMessage
if compound.Vote == (unauthenticatedVote{}) {
Expand Down
37 changes: 32 additions & 5 deletions agreement/demux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
)

const fastTimeoutChTime = 2
const speculativeBlockAsmTime = time.Duration(5 * time.Microsecond)

type demuxTester struct {
*testing.T
Expand Down Expand Up @@ -65,8 +66,10 @@ type demuxTestUsecase struct {
verifiedProposal testChanState
verifiedBundle testChanState
// expected output
e event
ok bool
e event
ok bool
speculativeAsmTime time.Duration
desc string
}

var demuxTestUsecases = []demuxTestUsecase{
Expand Down Expand Up @@ -195,6 +198,7 @@ var demuxTestUsecases = []demuxTestUsecase{
verifiedBundle: testChanState{eventCount: 0, closed: false},
e: messageEvent{T: votePresent},
ok: true,
desc: "one vote one prop",
},
{
queue: []testChanState{},
Expand Down Expand Up @@ -411,6 +415,26 @@ var demuxTestUsecases = []demuxTestUsecase{
verifiedBundle: testChanState{eventCount: 0, closed: false},
e: messageEvent{T: votePresent},
ok: true,
desc: "one prop",
},
{
queue: []testChanState{{eventCount: 0, closed: false}},
rawVotes: testChanState{eventCount: 0, closed: false},
rawProposals: testChanState{eventCount: 0, closed: false},
rawBundles: testChanState{eventCount: 0, closed: false},
compoundProposals: false,
quit: false,
voteChannelFull: false,
proposalChannelFull: false,
bundleChannelFull: false,
ledgerRoundReached: false,
deadlineReached: false,
verifiedVote: testChanState{eventCount: 0, closed: false},
verifiedProposal: testChanState{eventCount: 0, closed: false},
verifiedBundle: testChanState{eventCount: 0, closed: false},
e: timeoutEvent{T: speculationTimeout},
ok: true,
speculativeAsmTime: speculativeBlockAsmTime,
},
}

Expand All @@ -432,6 +456,9 @@ func (t *demuxTester) TimeoutAt(delta time.Duration) <-chan time.Time {
if delta == fastTimeoutChTime {
return nil
}
if delta == speculativeBlockAsmTime {
return time.After(delta)
}

c := make(chan time.Time, 2)
if t.currentUsecase.deadlineReached {
Expand Down Expand Up @@ -653,6 +680,7 @@ func (t *demuxTester) TestUsecase(testcase demuxTestUsecase) bool {

dmx := &demux{}

dmx.log = logging.TestingLog(t)
dmx.crypto = t
dmx.ledger = t
dmx.rawVotes = t.makeRawChannel(protocol.AgreementVoteTag, testcase.rawVotes, false)
Expand All @@ -674,14 +702,13 @@ func (t *demuxTester) TestUsecase(testcase demuxTestUsecase) bool {
if testcase.quit {
close(s.quit)
}

e, ok := dmx.next(s, time.Second, fastTimeoutChTime, 300)
e, ok := dmx.next(s, time.Second, fastTimeoutChTime, testcase.speculativeAsmTime, 300)

if !assert.Equal(t, testcase.ok, ok) {
return false
}

if !assert.Equalf(t, strings.Replace(testcase.e.String(), "{test_index}", fmt.Sprintf("%d", t.testIdx), 1), e.String(), "Test case %d failed.", t.testIdx+1) {
if !assert.Equalf(t, strings.Replace(testcase.e.String(), "{test_index}", fmt.Sprintf("%d", t.testIdx), 1), e.String(), "Test case %d (%s) failed.", t.testIdx+1, testcase.desc) {
return false
}

Expand Down
47 changes: 45 additions & 2 deletions agreement/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,11 @@ const (
// period.
//
// fastTimeout is like timeout but for fast partition recovery.
// speculation timeout marks when the player should start speculative
// block assembly.
timeout
fastTimeout
speculationTimeout

// Other events are delivered from one state machine to another to
// communicate some message or as a reply to some message. These events
Expand Down Expand Up @@ -197,6 +200,14 @@ const (
// readPinned is sent to the proposalStore to read the pinned value, if it exists.
readPinned

// readLowestValue is sent to the proposalPeriodMachine to read the
// proposal-vote with the lowest credential.
readLowestValue

// readLowestPayload is sent to the proposalStore to read the payload
// corresponding to the lowest-credential proposal-vote, if it exists.
readLowestPayload

/*
* The following are event types that replace queries, and may warrant
* a revision to make them more state-machine-esque.
Expand Down Expand Up @@ -363,7 +374,7 @@ func (e roundInterruptionEvent) AttachConsensusVersion(v ConsensusVersionView) e
}

type timeoutEvent struct {
// {timeout,fastTimeout}
// {timeout,fastTimeout,speculationTimeout}
T eventType

RandomEntropy uint64
Expand Down Expand Up @@ -407,6 +418,38 @@ func (e newRoundEvent) ComparableStr() string {
return e.String()
}

type readLowestEvent struct {
// T is either readLowestValue or readLowestPayload
T eventType

// Round and Period are the round and period for which to query
// the lowest-credential value and payload. This type of event
// is only sent for pipelining, which only makes sense for period
// 0, but the Period is here anyway to route to the appropriate
// proposalMachinePeriod.
Round round
Period period

// Proposal holds the lowest-credential value.
Proposal proposalValue
// Payload holds the payload, if one exists (which is the case if PayloadOK is set).
Payload proposal
// PayloadOK is set if and only if a payload was received for the lowest-credential value.
PayloadOK bool
}

func (e readLowestEvent) t() eventType {
return e.T
}

func (e readLowestEvent) String() string {
return fmt.Sprintf("%v: %.5v", e.t().String(), e.Proposal.BlockDigest.String())
}

func (e readLowestEvent) ComparableStr() string {
return e.String()
}

type newPeriodEvent struct {
// Period holds the latest period relevant to the proposalRoundMachine.
Period period
Expand Down Expand Up @@ -744,7 +787,7 @@ func zeroEvent(t eventType) event {
return messageEvent{}
case roundInterruption:
return roundInterruptionEvent{}
case timeout, fastTimeout:
case timeout, fastTimeout, speculationTimeout:
return timeoutEvent{}
case newRound:
return newRoundEvent{}
Expand Down
Loading
Loading