diff --git a/agreement/abstractions.go b/agreement/abstractions.go
index 44aafa4fd8..6cb7256273 100644
--- a/agreement/abstractions.go
+++ b/agreement/abstractions.go
@@ -85,6 +85,12 @@ type BlockFactory interface {
// nodes on the network can assemble entries, the agreement protocol may
// lose liveness.
AssembleBlock(basics.Round) (ValidatedBlock, error)
+
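+	// StartSpeculativeBlockAssembly optionally begins assembling a block on
+	// top of the given validated block, keyed by that proposal's digest.
+	// When onlyIfStarted is true, it only re-targets a speculative assembly
+	// that is already in flight.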
+ StartSpeculativeBlockAssembly(context.Context, ValidatedBlock, crypto.Digest, bool)
}
// A Ledger represents the sequence of Entries agreed upon by the protocol.
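Reviewer note: a minimal sketch of how a `BlockFactory` implementation might satisfy the new method (only the new method is shown). The in-flight tracking (`inFlight`) and the worker (`assembleOnTopOf`) are hypothetical names, not part of this diff; the `onlyIfStarted` semantics follow the player-side comments further down.

```go
package example

import (
	"context"
	"sync"

	"github.com/algorand/go-algorand/agreement"
	"github.com/algorand/go-algorand/crypto"
)

// inFlight tracks one hypothetical speculative assembly.
type inFlight struct {
	leader crypto.Digest
	cancel context.CancelFunc
}

// specFactory is illustrative only; a real factory would also implement
// AssembleBlock and consult the transaction pool.
type specFactory struct {
	mu  sync.Mutex
	cur *inFlight
}

func (f *specFactory) StartSpeculativeBlockAssembly(ctx context.Context, ve agreement.ValidatedBlock, leader crypto.Digest, onlyIfStarted bool) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.cur != nil && f.cur.leader == leader {
		return // already speculating on top of this proposal
	}
	if f.cur == nil && onlyIfStarted {
		return // caller only wants to re-target an assembly already in flight
	}
	if f.cur != nil {
		f.cur.cancel() // a better proposal arrived; abandon the old speculation
	}
	cctx, cancel := context.WithCancel(ctx)
	f.cur = &inFlight{leader: leader, cancel: cancel}
	go assembleOnTopOf(cctx, ve)
}

// assembleOnTopOf stands in for evaluating pooled transactions against ve's
// ledger state until the context is cancelled.
func assembleOnTopOf(ctx context.Context, ve agreement.ValidatedBlock) {
	<-ctx.Done()
}
```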
diff --git a/agreement/actions.go b/agreement/actions.go
index 833dc8af36..fb93c86d1a 100644
--- a/agreement/actions.go
+++ b/agreement/actions.go
@@ -54,6 +54,8 @@ const (
attest
assemble
repropose
+ speculativeAssembly
+ speculativeAssemblyIfStarted
// disk
checkpoint
@@ -309,13 +311,14 @@ func (a rezeroAction) do(ctx context.Context, s *Service) {
}
type pseudonodeAction struct {
- // assemble, repropose, attest
+ // assemble, repropose, attest, speculativeAssembly, speculativeAssemblyIfStarted
T actionType
- Round round
- Period period
- Step step
- Proposal proposalValue
+ Round round
+ Period period
+ Step step
+ Proposal proposalValue
+ ValidatedBlock ValidatedBlock
}
func (a pseudonodeAction) t() actionType {
@@ -346,6 +349,11 @@ func (a pseudonodeAction) do(ctx context.Context, s *Service) {
default:
s.log.Errorf("pseudonode.MakeProposals call failed %v", err)
}
+ case speculativeAssembly:
+ s.loopback.StartSpeculativeBlockAssembly(ctx, a.ValidatedBlock, a.Proposal.BlockDigest, false)
+ case speculativeAssemblyIfStarted:
+ s.loopback.StartSpeculativeBlockAssembly(ctx, a.ValidatedBlock, a.Proposal.BlockDigest, true)
+
case repropose:
logEvent := logspec.AgreementEvent{
Type: logspec.VoteAttest,
@@ -464,7 +472,7 @@ func zeroAction(t actionType) action {
return ensureAction{}
case rezero:
return rezeroAction{}
- case attest, assemble, repropose:
+ case attest, assemble, repropose, speculativeAssembly, speculativeAssemblyIfStarted:
return pseudonodeAction{}
case checkpoint:
return checkpointAction{}
diff --git a/agreement/actiontype_string.go b/agreement/actiontype_string.go
index 9272ec2cf6..e6cee56ece 100644
--- a/agreement/actiontype_string.go
+++ b/agreement/actiontype_string.go
@@ -23,12 +23,14 @@ func _() {
_ = x[attest-12]
_ = x[assemble-13]
_ = x[repropose-14]
- _ = x[checkpoint-15]
+ _ = x[speculativeAssembly-15]
+ _ = x[speculativeAssemblyIfStarted-16]
+ _ = x[checkpoint-17]
}
-const _actionType_name = "noopignorebroadcastrelaydisconnectbroadcastVotesverifyVoteverifyPayloadverifyBundleensurestageDigestrezeroattestassemblereproposecheckpoint"
+const _actionType_name = "noopignorebroadcastrelaydisconnectbroadcastVotesverifyVoteverifyPayloadverifyBundleensurestageDigestrezeroattestassemblereproposespeculativeAssemblyspeculativeAssemblyIfStartedcheckpoint"
-var _actionType_index = [...]uint8{0, 4, 10, 19, 24, 34, 48, 58, 71, 83, 89, 100, 106, 112, 120, 129, 139}
+var _actionType_index = [...]uint8{0, 4, 10, 19, 24, 34, 48, 58, 71, 83, 89, 100, 106, 112, 120, 129, 148, 176, 186}
func (i actionType) String() string {
if i >= actionType(len(_actionType_index)-1) {
diff --git a/agreement/agreementtest/simulate_test.go b/agreement/agreementtest/simulate_test.go
index 53c42411ed..7d8622e10b 100644
--- a/agreement/agreementtest/simulate_test.go
+++ b/agreement/agreementtest/simulate_test.go
@@ -98,6 +98,9 @@ func (f testBlockFactory) AssembleBlock(r basics.Round) (agreement.ValidatedBloc
return testValidatedBlock{Inside: bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: r}}}, nil
}
+func (f testBlockFactory) StartSpeculativeBlockAssembly(context.Context, agreement.ValidatedBlock, crypto.Digest, bool) {
+}
+
// If we try to read from high rounds, we panic and do not emit an error to find bugs during testing.
type testLedger struct {
mu deadlock.Mutex
diff --git a/agreement/common_test.go b/agreement/common_test.go
index 0c11d9553d..9b7cafb9fb 100644
--- a/agreement/common_test.go
+++ b/agreement/common_test.go
@@ -184,6 +184,9 @@ func (f testBlockFactory) AssembleBlock(r basics.Round) (ValidatedBlock, error)
return testValidatedBlock{Inside: bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: r}}}, nil
}
+func (f testBlockFactory) StartSpeculativeBlockAssembly(context.Context, ValidatedBlock, crypto.Digest, bool) {
+}
+
+
// If we try to read from high rounds, we panic and do not emit an error to find bugs during testing.
type testLedger struct {
mu deadlock.Mutex
diff --git a/agreement/coservice.go b/agreement/coservice.go
index 7fe4ab1f52..b8952c8142 100644
--- a/agreement/coservice.go
+++ b/agreement/coservice.go
@@ -17,9 +17,10 @@
package agreement
import (
- "github.com/algorand/go-deadlock"
+ "runtime"
+ "testing"
- "github.com/algorand/go-algorand/logging"
+ "github.com/algorand/go-deadlock"
)
//go:generate stringer -type=coserviceType
@@ -35,10 +36,12 @@ const (
//msgp:ignore coserviceType
type coserviceType int
+// coserviceMonitor is unit-test instrumentation.
type coserviceMonitor struct {
deadlock.Mutex
id int
+ t *testing.T
c map[coserviceType]uint
coserviceListener
@@ -79,7 +82,10 @@ func (m *coserviceMonitor) dec(t coserviceType) {
m.c = make(map[coserviceType]uint)
}
if m.c[t] == 0 {
- logging.Base().Panicf("%d: tried to decrement empty coservice queue %v", m.id, t)
+ var stack [1000]byte
+ sl := runtime.Stack(stack[:], false)
+ m.t.Log(string(stack[:sl]))
+ m.t.Fatalf("%d: tried to decrement empty coservice queue %v", m.id, t)
}
m.c[t]--
diff --git a/agreement/demux.go b/agreement/demux.go
index f31c4d075d..b9de002101 100644
--- a/agreement/demux.go
+++ b/agreement/demux.go
@@ -60,10 +60,13 @@ type demux struct {
queue []<-chan externalEvent
processingMonitor EventsProcessingMonitor
- monitor *coserviceMonitor
cancelTokenizers context.CancelFunc
log logging.Logger
+
+	// monitor is unit-test instrumentation; methods on a nil
+	// *coserviceMonitor must be fast no-ops.
+ monitor *coserviceMonitor
}
// demuxParams contains the parameters required to initialize a new demux object
@@ -74,7 +77,10 @@ type demuxParams struct {
voteVerifier *AsyncVoteVerifier
processingMonitor EventsProcessingMonitor
log logging.Logger
- monitor *coserviceMonitor
+
+	// monitor is unit-test instrumentation; methods on a nil
+	// *coserviceMonitor must be fast no-ops.
+ monitor *coserviceMonitor
}
// makeDemux initializes the goroutines needed to process external events, setting up the appropriate channels.
@@ -190,7 +196,7 @@ func (d *demux) verifyBundle(ctx context.Context, m message, r round, p period,
// next blocks until it observes an external input event of interest for the state machine.
//
// If ok is false, there are no more events so the agreement service should quit.
-func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Duration, currentRound round) (e externalEvent, ok bool) {
+func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Duration, speculationDeadline time.Duration, currentRound round) (e externalEvent, ok bool) {
defer func() {
if !ok {
return
@@ -253,6 +259,12 @@ func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Durat
deadlineCh := s.Clock.TimeoutAt(deadline)
fastDeadlineCh := s.Clock.TimeoutAt(fastDeadline)
+ var speculationDeadlineCh <-chan time.Time
+	// a zero timeout means there is not enough time left to speculate on block assembly; leaving the channel nil disables its select case
+ if speculationDeadline != 0 {
+ speculationDeadlineCh = s.Clock.TimeoutAt(speculationDeadline)
+ }
+
d.UpdateEventsQueue(eventQueueDemux, 0)
d.monitor.dec(demuxCoserviceType)
@@ -269,7 +283,7 @@ func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Durat
// the pseudonode channel got closed. remove it from the queue and try again.
d.queue = d.queue[1:]
d.UpdateEventsQueue(eventQueuePseudonode, 0)
- return d.next(s, deadline, fastDeadline, currentRound)
+ return d.next(s, deadline, fastDeadline, speculationDeadline, currentRound)
// control
case <-s.quit:
@@ -303,6 +317,11 @@ func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Durat
d.UpdateEventsQueue(eventQueueDemux, 1)
d.monitor.inc(demuxCoserviceType)
d.monitor.dec(clockCoserviceType)
+ case <-speculationDeadlineCh:
+ e = timeoutEvent{T: speculationTimeout, RandomEntropy: s.RandomSource.Uint64(), Round: nextRound}
+ d.UpdateEventsQueue(eventQueueDemux, 1)
+ d.monitor.inc(demuxCoserviceType)
+ d.monitor.dec(clockCoserviceType)
// raw
case m, open := <-rawVotes:
@@ -358,6 +377,7 @@ func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Durat
}
// setupCompoundMessage processes compound messages: distinct messages which are delivered together
+// TODO: does this ever actually see a non-empty .Vote?
func setupCompoundMessage(l LedgerReader, m message) (res externalEvent) {
compound := m.CompoundMessage
if compound.Vote == (unauthenticatedVote{}) {
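Reviewer note: the zero-deadline guard above relies on a standard Go idiom: a receive from a nil channel blocks forever, so leaving `speculationDeadlineCh` nil silently disables that `select` case. A standalone illustration:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	var speculationCh <-chan time.Time // nil: this select case can never fire
	deadlineCh := time.After(50 * time.Millisecond)

	speculationDeadline := time.Duration(0) // zero: no time left to speculate
	if speculationDeadline != 0 {
		speculationCh = time.After(speculationDeadline)
	}

	select {
	case <-speculationCh:
		fmt.Println("start speculative assembly")
	case <-deadlineCh:
		fmt.Println("ordinary deadline") // always taken here: the nil channel blocks forever
	}
}
```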
diff --git a/agreement/demux_test.go b/agreement/demux_test.go
index 027dbc9e13..4b0d74f0ae 100644
--- a/agreement/demux_test.go
+++ b/agreement/demux_test.go
@@ -37,6 +37,7 @@ import (
)
const fastTimeoutChTime = 2
+const speculativeBlockAsmTime = 5 * time.Microsecond
type demuxTester struct {
*testing.T
@@ -65,8 +66,10 @@ type demuxTestUsecase struct {
verifiedProposal testChanState
verifiedBundle testChanState
// expected output
- e event
- ok bool
+ e event
+ ok bool
+ speculativeAsmTime time.Duration
+ desc string
}
var demuxTestUsecases = []demuxTestUsecase{
@@ -195,6 +198,7 @@ var demuxTestUsecases = []demuxTestUsecase{
verifiedBundle: testChanState{eventCount: 0, closed: false},
e: messageEvent{T: votePresent},
ok: true,
+ desc: "one vote one prop",
},
{
queue: []testChanState{},
@@ -411,6 +415,27 @@
verifiedBundle: testChanState{eventCount: 0, closed: false},
e: messageEvent{T: votePresent},
ok: true,
+ desc: "one prop",
+ },
+ {
+ queue: []testChanState{{eventCount: 0, closed: false}},
+ rawVotes: testChanState{eventCount: 0, closed: false},
+ rawProposals: testChanState{eventCount: 0, closed: false},
+ rawBundles: testChanState{eventCount: 0, closed: false},
+ compoundProposals: false,
+ quit: false,
+ voteChannelFull: false,
+ proposalChannelFull: false,
+ bundleChannelFull: false,
+ ledgerRoundReached: false,
+ deadlineReached: false,
+ verifiedVote: testChanState{eventCount: 0, closed: false},
+ verifiedProposal: testChanState{eventCount: 0, closed: false},
+ verifiedBundle: testChanState{eventCount: 0, closed: false},
+ e: timeoutEvent{T: speculationTimeout},
+ ok: true,
+		speculativeAsmTime: speculativeBlockAsmTime,
+		desc:               "speculation timeout",
},
}
@@ -432,6 +456,9 @@ func (t *demuxTester) TimeoutAt(delta time.Duration) <-chan time.Time {
if delta == fastTimeoutChTime {
return nil
}
+ if delta == speculativeBlockAsmTime {
+ return time.After(delta)
+ }
c := make(chan time.Time, 2)
if t.currentUsecase.deadlineReached {
@@ -653,6 +680,7 @@ func (t *demuxTester) TestUsecase(testcase demuxTestUsecase) bool {
dmx := &demux{}
+ dmx.log = logging.TestingLog(t)
dmx.crypto = t
dmx.ledger = t
dmx.rawVotes = t.makeRawChannel(protocol.AgreementVoteTag, testcase.rawVotes, false)
@@ -674,14 +702,13 @@ func (t *demuxTester) TestUsecase(testcase demuxTestUsecase) bool {
if testcase.quit {
close(s.quit)
}
-
- e, ok := dmx.next(s, time.Second, fastTimeoutChTime, 300)
+ e, ok := dmx.next(s, time.Second, fastTimeoutChTime, testcase.speculativeAsmTime, 300)
if !assert.Equal(t, testcase.ok, ok) {
return false
}
- if !assert.Equalf(t, strings.Replace(testcase.e.String(), "{test_index}", fmt.Sprintf("%d", t.testIdx), 1), e.String(), "Test case %d failed.", t.testIdx+1) {
+ if !assert.Equalf(t, strings.Replace(testcase.e.String(), "{test_index}", fmt.Sprintf("%d", t.testIdx), 1), e.String(), "Test case %d (%s) failed.", t.testIdx+1, testcase.desc) {
return false
}
diff --git a/agreement/events.go b/agreement/events.go
index 52737e5f2c..88d7ec6a4f 100644
--- a/agreement/events.go
+++ b/agreement/events.go
@@ -122,8 +122,11 @@ const (
// period.
//
// fastTimeout is like timeout but for fast partition recovery.
+	// speculationTimeout marks when the player should start speculative
+	// block assembly.
timeout
fastTimeout
+ speculationTimeout
// Other events are delivered from one state machine to another to
// communicate some message or as a reply to some message. These events
@@ -197,6 +200,14 @@ const (
// readPinned is sent to the proposalStore to read the pinned value, if it exists.
readPinned
+	// readLowestValue is sent to the proposalMachinePeriod to read the
+	// proposal-vote with the lowest credential.
+ readLowestValue
+
+ // readLowestPayload is sent to the proposalStore to read the payload
+ // corresponding to the lowest-credential proposal-vote, if it exists.
+ readLowestPayload
+
/*
* The following are event types that replace queries, and may warrant
* a revision to make them more state-machine-esque.
@@ -363,7 +374,7 @@ func (e roundInterruptionEvent) AttachConsensusVersion(v ConsensusVersionView) e
}
type timeoutEvent struct {
- // {timeout,fastTimeout}
+ // {timeout,fastTimeout,speculationTimeout}
T eventType
RandomEntropy uint64
@@ -407,6 +418,38 @@ func (e newRoundEvent) ComparableStr() string {
return e.String()
}
+type readLowestEvent struct {
+ // T is either readLowestValue or readLowestPayload
+ T eventType
+
+ // Round and Period are the round and period for which to query
+ // the lowest-credential value and payload. This type of event
+ // is only sent for pipelining, which only makes sense for period
+ // 0, but the Period is here anyway to route to the appropriate
+ // proposalMachinePeriod.
+ Round round
+ Period period
+
+ // Proposal holds the lowest-credential value.
+ Proposal proposalValue
+ // Payload holds the payload, if one exists (which is the case if PayloadOK is set).
+ Payload proposal
+ // PayloadOK is set if and only if a payload was received for the lowest-credential value.
+ PayloadOK bool
+}
+
+func (e readLowestEvent) t() eventType {
+ return e.T
+}
+
+func (e readLowestEvent) String() string {
+ return fmt.Sprintf("%v: %.5v", e.t().String(), e.Proposal.BlockDigest.String())
+}
+
+func (e readLowestEvent) ComparableStr() string {
+ return e.String()
+}
+
type newPeriodEvent struct {
// Period holds the latest period relevant to the proposalRoundMachine.
Period period
@@ -744,7 +787,7 @@ func zeroEvent(t eventType) event {
return messageEvent{}
case roundInterruption:
return roundInterruptionEvent{}
- case timeout, fastTimeout:
+ case timeout, fastTimeout, speculationTimeout:
return timeoutEvent{}
case newRound:
return newRoundEvent{}
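Reviewer note: for orientation, here is the round-trip the player performs with these two event types. It mirrors `startSpeculativeBlockAsm` in player.go below, so it restates the diff rather than adding API; it is an in-package sketch using the package's unexported types.

```go
// lowestPayloadSketch dispatches a readLowestPayload query: the proposalStore
// converts it to a readLowestValue query against the period machine to learn
// the lowest-credential proposal, then fills in the payload if that proposal
// has already been assembled and validated.
func lowestPayloadSketch(p *player, r routerHandle) (proposalValue, bool) {
	re := readLowestEvent{T: readLowestPayload, Round: p.Round, Period: 0}
	re = r.dispatch(*p, re, proposalMachineRound, p.Round, 0, 0).(readLowestEvent)
	return re.Proposal, re.PayloadOK && re.Payload.ve != nil
}
```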
diff --git a/agreement/eventtype_string.go b/agreement/eventtype_string.go
index 9da84c1b98..2b1f26fd9c 100644
--- a/agreement/eventtype_string.go
+++ b/agreement/eventtype_string.go
@@ -18,40 +18,43 @@ func _() {
_ = x[roundInterruption-7]
_ = x[timeout-8]
_ = x[fastTimeout-9]
- _ = x[softThreshold-10]
- _ = x[certThreshold-11]
- _ = x[nextThreshold-12]
- _ = x[proposalCommittable-13]
- _ = x[proposalAccepted-14]
- _ = x[voteFiltered-15]
- _ = x[voteMalformed-16]
- _ = x[bundleFiltered-17]
- _ = x[bundleMalformed-18]
- _ = x[payloadRejected-19]
- _ = x[payloadMalformed-20]
- _ = x[payloadPipelined-21]
- _ = x[payloadAccepted-22]
- _ = x[proposalFrozen-23]
- _ = x[voteAccepted-24]
- _ = x[newRound-25]
- _ = x[newPeriod-26]
- _ = x[readStaging-27]
- _ = x[readPinned-28]
- _ = x[voteFilterRequest-29]
- _ = x[voteFilteredStep-30]
- _ = x[nextThresholdStatusRequest-31]
- _ = x[nextThresholdStatus-32]
- _ = x[freshestBundleRequest-33]
- _ = x[freshestBundle-34]
- _ = x[dumpVotesRequest-35]
- _ = x[dumpVotes-36]
- _ = x[wrappedAction-37]
- _ = x[checkpointReached-38]
+ _ = x[speculationTimeout-10]
+ _ = x[softThreshold-11]
+ _ = x[certThreshold-12]
+ _ = x[nextThreshold-13]
+ _ = x[proposalCommittable-14]
+ _ = x[proposalAccepted-15]
+ _ = x[voteFiltered-16]
+ _ = x[voteMalformed-17]
+ _ = x[bundleFiltered-18]
+ _ = x[bundleMalformed-19]
+ _ = x[payloadRejected-20]
+ _ = x[payloadMalformed-21]
+ _ = x[payloadPipelined-22]
+ _ = x[payloadAccepted-23]
+ _ = x[proposalFrozen-24]
+ _ = x[voteAccepted-25]
+ _ = x[newRound-26]
+ _ = x[newPeriod-27]
+ _ = x[readStaging-28]
+ _ = x[readPinned-29]
+ _ = x[readLowestValue-30]
+ _ = x[readLowestPayload-31]
+ _ = x[voteFilterRequest-32]
+ _ = x[voteFilteredStep-33]
+ _ = x[nextThresholdStatusRequest-34]
+ _ = x[nextThresholdStatus-35]
+ _ = x[freshestBundleRequest-36]
+ _ = x[freshestBundle-37]
+ _ = x[dumpVotesRequest-38]
+ _ = x[dumpVotes-39]
+ _ = x[wrappedAction-40]
+ _ = x[checkpointReached-41]
}
-const _eventType_name = "nonevotePresentpayloadPresentbundlePresentvoteVerifiedpayloadVerifiedbundleVerifiedroundInterruptiontimeoutfastTimeoutsoftThresholdcertThresholdnextThresholdproposalCommittableproposalAcceptedvoteFilteredvoteMalformedbundleFilteredbundleMalformedpayloadRejectedpayloadMalformedpayloadPipelinedpayloadAcceptedproposalFrozenvoteAcceptednewRoundnewPeriodreadStagingreadPinnedvoteFilterRequestvoteFilteredStepnextThresholdStatusRequestnextThresholdStatusfreshestBundleRequestfreshestBundledumpVotesRequestdumpVoteswrappedActioncheckpointReached"
+const _eventType_name = "nonevotePresentpayloadPresentbundlePresentvoteVerifiedpayloadVerifiedbundleVerifiedroundInterruptiontimeoutfastTimeoutspeculationTimeoutsoftThresholdcertThresholdnextThresholdproposalCommittableproposalAcceptedvoteFilteredvoteMalformedbundleFilteredbundleMalformedpayloadRejectedpayloadMalformedpayloadPipelinedpayloadAcceptedproposalFrozenvoteAcceptednewRoundnewPeriodreadStagingreadPinnedreadLowestValuereadLowestPayloadvoteFilterRequestvoteFilteredStepnextThresholdStatusRequestnextThresholdStatusfreshestBundleRequestfreshestBundledumpVotesRequestdumpVoteswrappedActioncheckpointReached"
-var _eventType_index = [...]uint16{0, 4, 15, 29, 42, 54, 69, 83, 100, 107, 118, 131, 144, 157, 176, 192, 204, 217, 231, 246, 261, 277, 293, 308, 322, 334, 342, 351, 362, 372, 389, 405, 431, 450, 471, 485, 501, 510, 523, 540}
+var _eventType_index = [...]uint16{0, 4, 15, 29, 42, 54, 69, 83, 100, 107, 118, 136, 149, 162, 175, 194, 210, 222, 235, 249, 264, 279, 295, 311, 326, 340, 352, 360, 369, 380, 390, 405, 422, 439, 455, 481, 500, 521, 535, 551, 560, 573, 590}
func (i eventType) String() string {
if i >= eventType(len(_eventType_index)-1) {
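Reviewer note: the regenerated tables follow the usual `stringer` layout: one concatenated name string plus a table of cumulative end offsets, so `String()` is a slice of the name string. A standalone demo of the lookup for the relocated entries, with offsets taken from the table above:

```go
package main

import "fmt"

func main() {
	// readPinned (10 chars) + readLowestValue (15) + readLowestPayload (17),
	// with cumulative end offsets, as in _eventType_name/_eventType_index.
	const names = "readPinnedreadLowestValuereadLowestPayload"
	index := [...]uint16{0, 10, 25, 42}

	for i := 0; i+1 < len(index); i++ {
		fmt.Println(names[index[i]:index[i+1]])
	}
	// Output:
	// readPinned
	// readLowestValue
	// readLowestPayload
}
```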
diff --git a/agreement/fuzzer/ledger_test.go b/agreement/fuzzer/ledger_test.go
index a62caee4d9..6dd74f002c 100644
--- a/agreement/fuzzer/ledger_test.go
+++ b/agreement/fuzzer/ledger_test.go
@@ -112,6 +112,9 @@ func (f testBlockFactory) AssembleBlock(r basics.Round) (agreement.ValidatedBloc
return testValidatedBlock{Inside: bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: r}}}, nil
}
+func (f testBlockFactory) StartSpeculativeBlockAssembly(context.Context, agreement.ValidatedBlock, crypto.Digest, bool) {
+}
+
type testLedgerSyncFunc func(l *testLedger, r basics.Round, c agreement.Certificate) bool
// If we try to read from high rounds, we panic and do not emit an error to find bugs during testing.
diff --git a/agreement/msgp_gen.go b/agreement/msgp_gen.go
index 19a803b759..f7bf8e64a9 100644
--- a/agreement/msgp_gen.go
+++ b/agreement/msgp_gen.go
@@ -3753,9 +3753,12 @@ func PeriodRouterMaxSize() (s int) {
// MarshalMsg implements msgp.Marshaler
func (z *player) MarshalMsg(b []byte) (o []byte) {
o = msgp.Require(b, z.Msgsize())
- // map header, size 8
+ // map header, size 9
+ // string "ConsensusVersion"
+ o = append(o, 0x89, 0xb0, 0x43, 0x6f, 0x6e, 0x73, 0x65, 0x6e, 0x73, 0x75, 0x73, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e)
+ o = (*z).ConsensusVersion.MarshalMsg(o)
// string "Deadline"
- o = append(o, 0x88, 0xa8, 0x44, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65)
+ o = append(o, 0xa8, 0x44, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65)
o = msgp.AppendDuration(o, (*z).Deadline)
// string "FastRecoveryDeadline"
o = append(o, 0xb4, 0x46, 0x61, 0x73, 0x74, 0x52, 0x65, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x44, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65)
@@ -3875,6 +3878,14 @@ func (z *player) UnmarshalMsg(bts []byte) (o []byte, err error) {
return
}
}
+ if zb0001 > 0 {
+ zb0001--
+ bts, err = (*z).ConsensusVersion.UnmarshalMsg(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "struct-from-array", "ConsensusVersion")
+ return
+ }
+ }
if zb0001 > 0 {
err = msgp.ErrTooManyArrayFields(zb0001)
if err != nil {
@@ -3958,6 +3969,12 @@ func (z *player) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "Pending")
return
}
+ case "ConsensusVersion":
+ bts, err = (*z).ConsensusVersion.UnmarshalMsg(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "ConsensusVersion")
+ return
+ }
default:
err = msgp.ErrNoField(string(field))
if err != nil {
@@ -3978,13 +3995,13 @@ func (_ *player) CanUnmarshalMsg(z interface{}) bool {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *player) Msgsize() (s int) {
- s = 1 + 6 + (*z).Round.Msgsize() + 7 + msgp.Uint64Size + 5 + msgp.Uint64Size + 15 + msgp.Uint64Size + 9 + msgp.DurationSize + 8 + msgp.BoolSize + 21 + msgp.DurationSize + 8 + (*z).Pending.Msgsize()
+ s = 1 + 6 + (*z).Round.Msgsize() + 7 + msgp.Uint64Size + 5 + msgp.Uint64Size + 15 + msgp.Uint64Size + 9 + msgp.DurationSize + 8 + msgp.BoolSize + 21 + msgp.DurationSize + 8 + (*z).Pending.Msgsize() + 17 + (*z).ConsensusVersion.Msgsize()
return
}
// MsgIsZero returns whether this is a zero value
func (z *player) MsgIsZero() bool {
- return ((*z).Round.MsgIsZero()) && ((*z).Period == 0) && ((*z).Step == 0) && ((*z).LastConcluding == 0) && ((*z).Deadline == 0) && ((*z).Napping == false) && ((*z).FastRecoveryDeadline == 0) && ((*z).Pending.MsgIsZero())
+ return ((*z).Round.MsgIsZero()) && ((*z).Period == 0) && ((*z).Step == 0) && ((*z).LastConcluding == 0) && ((*z).Deadline == 0) && ((*z).Napping == false) && ((*z).FastRecoveryDeadline == 0) && ((*z).Pending.MsgIsZero()) && ((*z).ConsensusVersion.MsgIsZero())
}
// MaxSize returns a maximum valid message size for this message type
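Reviewer note: a small in-package sketch of what the regenerated code guarantees for the new field; `protocol.ConsensusCurrentVersion` is used only as a convenient non-zero value.

```go
// consensusVersionEncodingSketch illustrates that ConsensusVersion now
// participates in zero-value detection and that the encoded form stays
// within the Msgsize upper bound.
func consensusVersionEncodingSketch() {
	var p player
	fmt.Println(p.MsgIsZero()) // true: every field, including ConsensusVersion, is zero

	p.ConsensusVersion = protocol.ConsensusCurrentVersion
	fmt.Println(p.MsgIsZero())                         // false
	fmt.Println(len(p.MarshalMsg(nil)) <= p.Msgsize()) // true: Msgsize is an upper bound
}
```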
diff --git a/agreement/persistence_test.go b/agreement/persistence_test.go
index fbd9323b09..19a8ac94e2 100644
--- a/agreement/persistence_test.go
+++ b/agreement/persistence_test.go
@@ -183,15 +183,18 @@ func randomizeDiskState() (rr rootRouter, p player) {
func TestRandomizedEncodingFullDiskState(t *testing.T) {
partitiontest.PartitionTest(t)
for i := 0; i < 5000; i++ {
+ if i%100 == 0 {
+ t.Logf("i=%d", i)
+ }
router, player := randomizeDiskState()
a := []action{}
clock := timers.MakeMonotonicClock(time.Date(2015, 1, 2, 5, 6, 7, 8, time.UTC))
log := makeServiceLogger(logging.Base())
- e1 := encode(clock, router, player, a, true)
- e2 := encode(clock, router, player, a, false)
- require.Equalf(t, e1, e2, "msgp and go-codec encodings differ: len(msgp)=%v, len(reflect)=%v", len(e1), len(e2))
- _, rr1, p1, _, err1 := decode(e1, clock, log, true)
- _, rr2, p2, _, err2 := decode(e1, clock, log, false)
+ eReflect := encode(clock, router, player, a, true) // reflect=true
+ eMsgp := encode(clock, router, player, a, false)
+ require.Equalf(t, eMsgp, eReflect, "msgp and go-codec encodings differ: len(msgp)=%v, len(reflect)=%v", len(eMsgp), len(eReflect))
+ _, rr1, p1, _, err1 := decode(eReflect, clock, log, true)
+ _, rr2, p2, _, err2 := decode(eReflect, clock, log, false)
require.NoErrorf(t, err1, "reflect decoding failed")
require.NoErrorf(t, err2, "msgp decoding failed")
require.Equalf(t, rr1, rr2, "rootRouters decoded differently")
diff --git a/agreement/player.go b/agreement/player.go
index 1ae552b0b1..9b7005cde1 100644
--- a/agreement/player.go
+++ b/agreement/player.go
@@ -52,6 +52,9 @@ type player struct {
// Pending holds the player's proposalTable, which stores proposals that
// must be verified after some vote has been verified.
Pending proposalTable
+
+	// ConsensusVersion is the current consensus version, refreshed from the ledger each round.
+ ConsensusVersion protocol.ConsensusVersion
}
func (p *player) T() stateMachineTag {
@@ -85,6 +88,10 @@ func (p *player) handle(r routerHandle, e event) []action {
r.t.logTimeout(*p)
}
+ if e.T == speculationTimeout {
+ return p.handleSpeculationTimeout(r, e)
+ }
+
switch p.Step {
case soft:
// precondition: nap = false
@@ -123,6 +130,16 @@ func (p *player) handle(r routerHandle, e event) []action {
}
}
+// handleSpeculationTimeout starts speculative block assembly. TODO: rename; this 'timeout' marks the START of speculative assembly, not a deadline.
+func (p *player) handleSpeculationTimeout(r routerHandle, e timeoutEvent) []action {
+ if e.Proto.Err != nil {
+ r.t.log.Errorf("failed to read protocol version for speculationTimeout event (proto %v): %v", e.Proto.Version, e.Proto.Err)
+ return nil
+ }
+
+ return p.startSpeculativeBlockAsm(r, nil, false)
+}
+
func (p *player) handleFastTimeout(r routerHandle, e timeoutEvent) []action {
if e.Proto.Err != nil {
r.t.log.Errorf("failed to read protocol version for fastTimeout event (proto %v): %v", e.Proto.Version, e.Proto.Err)
@@ -161,6 +178,7 @@ func (p *player) issueSoftVote(r routerHandle) (actions []action) {
// If we arrive due to fast-forward/soft threshold; then answer.Bottom = false and answer.Proposal = bottom
// and we should soft-vote normally (not based on the starting value)
a.Proposal = nextStatus.Proposal
+	// TODO: how do we speculatively assemble a block based on nextStatus.Proposal?
return append(actions, a)
}
@@ -177,8 +195,10 @@ func (p *player) issueSoftVote(r routerHandle) (actions []action) {
return nil
}
- // original proposal: vote for it
- return append(actions, a)
+	// original proposal: vote for it and maybe start speculative block assembly
+ actions = append(actions, a)
+ actions = p.startSpeculativeBlockAsm(r, actions, false)
+ return actions
}
// A committableEvent is the trigger for issuing a cert vote.
@@ -217,6 +237,27 @@ func (p *player) issueNextVote(r routerHandle) []action {
return actions
}
+func (p *player) startSpeculativeBlockAsm(r routerHandle, actions []action, onlyIfStarted bool) []action {
+ if p.Period != 0 {
+		// Speculate only in period 0; in later periods, conservatively fall back to the simpler protocol.
+ return actions
+ }
+ // get the best proposal we have
+ re := readLowestEvent{T: readLowestPayload, Round: p.Round, Period: p.Period}
+ re = r.dispatch(*p, re, proposalMachineRound, p.Round, p.Period, 0).(readLowestEvent)
+
+	// if we have its payload and it has already been validated, start speculating on top of it
+ if re.PayloadOK && re.Payload.ve != nil {
+ a := pseudonodeAction{T: speculativeAssembly, Round: p.Round, Period: p.Period, ValidatedBlock: re.Payload.ve, Proposal: re.Proposal}
+ if onlyIfStarted {
+ // only re-start speculation if we had already started it but got a better block
+ a.T = speculativeAssemblyIfStarted
+ }
+ return append(actions, a)
+ }
+ return actions
+}
+
func (p *player) issueFastVote(r routerHandle) (actions []action) {
actions = p.partitionPolicy(r)
@@ -585,6 +626,13 @@ func (p *player) handleMessageEvent(r routerHandle, e messageEvent) (actions []a
actions = append(actions, a)
}
+	// Start speculative block assembly every time we validate a proposal.
+	// TODO: maybe only do this after speculation has started, interrupting an in-flight speculation when a better block arrives
+	// TODO: or maybe don't do this at all and just delete it?
+ if ef.t() == payloadAccepted {
+ actions = p.startSpeculativeBlockAsm(r, actions, true)
+ }
+
// If the payload is valid, check it against any received cert threshold.
// Of course, this should only trigger for payloadVerified case.
// This allows us to handle late payloads (relative to cert-bundles, i.e., certificates) without resorting to catchup.
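Reviewer note: an in-package sketch of the invariant provided by the period guard in `startSpeculativeBlockAsm`: outside period 0, the action list passes through unchanged. The `routerHandle` would come from the existing test harness; it is not consulted on this path.

```go
// speculationGuardSketch returns true when the player, outside period 0,
// emits no speculative assembly actions.
func speculationGuardSketch(p *player, r routerHandle) bool {
	p.Period = 1
	actions := p.startSpeculativeBlockAsm(r, nil, false)
	return len(actions) == 0
}
```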
diff --git a/agreement/player_test.go b/agreement/player_test.go
index 75987c2ed0..441ebd1469 100644
--- a/agreement/player_test.go
+++ b/agreement/player_test.go
@@ -114,13 +114,28 @@ func simulateProposalPayloads(t *testing.T, router *rootRouter, player *player,
}
}
+// Most player code returns a fixed number of actions, but some paths add an extra speculative block assembly action; filter it out to keep these tests simple.
+func ignoreSpeculativeAssembly(a []action) []action {
+ var out []action
+ for _, ia := range a {
+ switch ia.t() {
+ case speculativeAssembly, speculativeAssemblyIfStarted:
+ // do not copy out
+ default:
+ out = append(out, ia)
+ }
+ }
+ return out
+}
+
func simulateProposals(t *testing.T, router *rootRouter, player *player, voteBatch []event, payloadBatch []event) {
for i, e := range voteBatch {
var res []action
*player, res = router.submitTop(&playerTracer, *player, e)
- earlier := res
+ earlier := ignoreSpeculativeAssembly(res)
*player, res = router.submitTop(&playerTracer, *player, payloadBatch[i])
+ res = ignoreSpeculativeAssembly(res)
if len(res) != len(earlier) {
panic("proposal action mismatch")
}
@@ -136,13 +151,25 @@ func simulateTimeoutExpectSoft(t *testing.T, router *rootRouter, player *player,
var res []action
e := makeTimeoutEvent()
*player, res = router.submitTop(&playerTracer, *player, e)
- if len(res) != 1 {
- panic("wrong number of actions")
+	// one or two actions: one must be attest; the other, if present, must be speculativeAssembly
+ require.True(t, 1 <= len(res) && len(res) <= 2, "wrong number of actions")
+
+ hasAttest := false
+ var a1x action
+
+ for _, ax := range res {
+ switch ax.t() {
+ case attest:
+ hasAttest = true
+ a1x = ax
+ case speculativeAssembly:
+ // ok
+ default:
+ t.Fatalf("bad action type: %v", ax.t())
+ }
}
-
- a1x := res[0]
- if a1x.t() != attest {
- panic("action 1 is not attest")
+ if !hasAttest {
+ panic("missing attest")
}
a1 := a1x.(pseudonodeAction)
diff --git a/agreement/proposal.go b/agreement/proposal.go
index bf021f2cfe..48a72b7c15 100644
--- a/agreement/proposal.go
+++ b/agreement/proposal.go
@@ -84,7 +84,7 @@ func (p unauthenticatedProposal) value() proposalValue {
}
}
-// A proposal is an Block along with everything needed to validate it.
+// A proposal is a Block along with everything needed to validate it.
type proposal struct {
unauthenticatedProposal
diff --git a/agreement/proposalStore.go b/agreement/proposalStore.go
index 080609de50..3feea81d76 100644
--- a/agreement/proposalStore.go
+++ b/agreement/proposalStore.go
@@ -352,6 +352,15 @@ func (store *proposalStore) handle(r routerHandle, p player, e event) event {
se.Committable = ea.Assembled
se.Payload = ea.Payload
return se
+ case readLowestPayload:
+ re := e.(readLowestEvent)
+ re.T = readLowestValue
+ re = r.dispatch(p, re, proposalMachinePeriod, re.Round, re.Period, 0).(readLowestEvent)
+ re.T = readLowestPayload
+ ea := store.Assemblers[re.Proposal]
+ re.PayloadOK = ea.Assembled
+ re.Payload = ea.Payload
+ return re
case readPinned:
se := e.(pinnedValueEvent)
ea := store.Assemblers[store.Pinned] // If pinned is bottom, assembled/payloadOK = false, payload = bottom
diff --git a/agreement/proposalTracker.go b/agreement/proposalTracker.go
index 59ffb77a28..7627cdd28d 100644
--- a/agreement/proposalTracker.go
+++ b/agreement/proposalTracker.go
@@ -165,6 +165,11 @@ func (t *proposalTracker) handle(r routerHandle, p player, e event) event {
t.Freezer = t.Freezer.freeze()
return e
+ case readLowestValue:
+ e := e.(readLowestEvent)
+ e.Proposal = t.Freezer.Lowest.R.Proposal
+ return e
+
case softThreshold, certThreshold:
e := e.(thresholdEvent)
t.Staging = e.Proposal
diff --git a/agreement/proposalTrackerContract.go b/agreement/proposalTrackerContract.go
index 2b995dfcac..5451d8152d 100644
--- a/agreement/proposalTrackerContract.go
+++ b/agreement/proposalTrackerContract.go
@@ -32,7 +32,7 @@ type proposalTrackerContract struct {
// TODO check concrete types of events
func (c *proposalTrackerContract) pre(p player, in event) (pre []error) {
switch in.t() {
- case voteVerified, proposalFrozen, softThreshold, certThreshold, voteFilterRequest, readStaging:
+ case voteVerified, proposalFrozen, softThreshold, certThreshold, voteFilterRequest, readStaging, readLowestValue, readLowestPayload:
default:
pre = append(pre, fmt.Errorf("incoming event has invalid type: %v", in.t()))
}
diff --git a/agreement/pseudonode.go b/agreement/pseudonode.go
index 78f6674d7b..3bd55416f2 100644
--- a/agreement/pseudonode.go
+++ b/agreement/pseudonode.go
@@ -23,6 +23,7 @@ import (
"sync"
"time"
+ "github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/account"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/logging"
@@ -60,6 +61,9 @@ type pseudonode interface {
// It returns an error if the pseudonode is unable to perform this.
MakeProposals(ctx context.Context, r round, p period) (<-chan externalEvent, error)
+	// StartSpeculativeBlockAssembly calls the BlockFactory's StartSpeculativeBlockAssembly on a new goroutine.
+ StartSpeculativeBlockAssembly(ctx context.Context, ve ValidatedBlock, blockHash crypto.Digest, onlyIfStarted bool)
+
// MakeVotes returns a vote for a given proposal in some round, period, and step.
//
// The passed-in context may be used to cancel vote creation and close the channel immediately.
@@ -185,6 +189,11 @@ func (n asyncPseudonode) MakeProposals(ctx context.Context, r round, p period) (
}
}
+func (n asyncPseudonode) StartSpeculativeBlockAssembly(ctx context.Context, ve ValidatedBlock, blockHash crypto.Digest, onlyIfStarted bool) {
+	// TODO: make this synchronous instead of spawning a goroutine? (goroutine creation will likely move inside the transactionPool)
+ go n.factory.StartSpeculativeBlockAssembly(ctx, ve, blockHash, onlyIfStarted)
+}
+
func (n asyncPseudonode) MakeVotes(ctx context.Context, r round, p period, s step, prop proposalValue, persistStateDone chan error) (chan externalEvent, error) {
proposalTask := n.makeVotesTask(ctx, r, p, s, prop, persistStateDone)
if len(proposalTask.participation) == 0 {
diff --git a/agreement/service.go b/agreement/service.go
index 3cac126193..708b37c7fa 100644
--- a/agreement/service.go
+++ b/agreement/service.go
@@ -80,9 +80,10 @@ type parameters Parameters
// externalDemuxSignals is used to synchronize the external signals that go to the demux with the main loop.
type externalDemuxSignals struct {
- Deadline time.Duration
- FastRecoveryDeadline time.Duration
- CurrentRound round
+ Deadline time.Duration
+ FastRecoveryDeadline time.Duration
+ SpeculativeBlockAsmDeadline time.Duration
+ CurrentRound round
}
// MakeService creates a new Agreement Service instance given a set of Parameters.
@@ -147,6 +148,7 @@ func (s *Service) Start() {
input := make(chan externalEvent)
output := make(chan []action)
ready := make(chan externalDemuxSignals)
+	// TODO: use demuxOne() inside mainLoop() instead of this pair of goroutines running in lock-step
go s.demuxLoop(ctx, input, output, ready)
go s.mainLoop(input, output, ready)
}
@@ -166,7 +168,7 @@ func (s *Service) demuxLoop(ctx context.Context, input chan<- externalEvent, out
for a := range output {
s.do(ctx, a)
extSignals := <-ready
- e, ok := s.demux.next(s, extSignals.Deadline, extSignals.FastRecoveryDeadline, extSignals.CurrentRound)
+ e, ok := s.demux.next(s, extSignals.Deadline, extSignals.FastRecoveryDeadline, extSignals.SpeculativeBlockAsmDeadline, extSignals.CurrentRound)
if !ok {
close(input)
break
@@ -179,6 +181,12 @@ func (s *Service) demuxLoop(ctx context.Context, input chan<- externalEvent, out
close(s.done)
}
+// TODO: use demuxOne() inside mainLoop() instead of a pair of goroutines synchronously trading off via channels
+func (s *Service) demuxOne(ctx context.Context, a []action, extSignals externalDemuxSignals) (e externalEvent, ok bool) {
+ s.do(ctx, a)
+ return s.demux.next(s, extSignals.Deadline, extSignals.FastRecoveryDeadline, extSignals.SpeculativeBlockAsmDeadline, extSignals.CurrentRound)
+}
+
// mainLoop drives the state machine.
//
// After possibly restoring from disk and then initializing, it does the following in a loop:
@@ -225,8 +233,17 @@ func (s *Service) mainLoop(input <-chan externalEvent, output chan<- []action, r
}
for {
+ status.ConsensusVersion, err = s.Ledger.ConsensusVersion(status.Round.SubSaturate(2))
+ if err != nil {
+ s.Panicf("cannot read latest consensus version, round %d: %v", status.Round.SubSaturate(2), err)
+ }
+
+	// compute the speculative block assembly deadline from the current period, consensus version, and local configuration
+ specClock := SpeculativeBlockAsmTime(status.Period, status.ConsensusVersion, s.parameters.Local.SpeculativeAsmTimeOffset)
+
+ // TODO: e, ok := s.demuxOne(ctx, a, externalDemuxSignals{Deadline: status.Deadline, FastRecoveryDeadline: status.FastRecoveryDeadline, SpeculativeBlockAsmDeadline: specClock, CurrentRound: status.Round})
output <- a
- ready <- externalDemuxSignals{Deadline: status.Deadline, FastRecoveryDeadline: status.FastRecoveryDeadline, CurrentRound: status.Round}
+ ready <- externalDemuxSignals{Deadline: status.Deadline, FastRecoveryDeadline: status.FastRecoveryDeadline, SpeculativeBlockAsmDeadline: specClock, CurrentRound: status.Round}
e, ok := <-input
if !ok {
break
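Reviewer note: `SpeculativeBlockAsmTime` is referenced here but not defined in this diff. The sketch below is an assumed reconstruction, consistent only with what the diff shows: demux treats a zero deadline as "no time to speculate", the player speculates only in period 0, and the updated service tests set a very large `SpeculativeAsmTimeOffset` to keep the timer from firing.

```go
// speculativeBlockAsmTimeSketch is a guess at the helper's contract: return 0
// (timer disabled) unless we are in period 0 with a configured offset;
// otherwise schedule speculation at the configured offset into the step.
func speculativeBlockAsmTimeSketch(p period, ver protocol.ConsensusVersion, offset time.Duration) time.Duration {
	if p != 0 || offset == 0 {
		return 0
	}
	_ = ver // a real implementation might scale the offset by protocol timing
	return offset
}
```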
diff --git a/agreement/service_test.go b/agreement/service_test.go
index e24f81890c..1af0f4e1e5 100644
--- a/agreement/service_test.go
+++ b/agreement/service_test.go
@@ -150,7 +150,7 @@ type testingNetworkEndpoint struct {
type nodeID int
// bufferCapacity is per channel
-func makeTestingNetwork(nodes int, bufferCapacity int, validator BlockValidator) *testingNetwork {
+func makeTestingNetwork(t *testing.T, nodes int, bufferCapacity int, validator BlockValidator) *testingNetwork {
n := new(testingNetwork)
n.validator = validator
@@ -168,6 +168,7 @@ func makeTestingNetwork(nodes int, bufferCapacity int, validator BlockValidator)
m := new(coserviceMonitor)
m.id = i
+ m.t = t
n.monitors[nodeID(i)] = m
}
@@ -507,10 +508,13 @@ type activityMonitor struct {
quiet chan struct{}
cb func(nodeID, map[coserviceType]uint)
+
+ t *testing.T
}
-func makeActivityMonitor() (m *activityMonitor) {
+func makeActivityMonitor(t *testing.T) (m *activityMonitor) {
m = new(activityMonitor)
+ m.t = t
m.sums = make(map[nodeID]uint)
m.listeners = make(map[nodeID]coserviceListener)
m.activity = make(chan struct{}, 1000)
@@ -549,17 +553,32 @@
}
func (m *activityMonitor) waitForQuiet() {
+ if m.t.Failed() {
+ return
+ }
+ dt := time.NewTicker(100 * time.Millisecond)
+ defer dt.Stop()
+ ticksRemaining := 100
-	select {
-	case <-m.quiet:
-	case <-time.After(10 * time.Second):
-		m.dump()
+	for {
+		select {
+		case <-m.quiet:
+			return
+		case <-dt.C:
+			if m.t.Failed() {
+				return
+			}
+			if ticksRemaining <= 0 {
+				m.dump()
-		var buf [1000000]byte
-		n := runtime.Stack(buf[:], true)
-		fmt.Println("Printing goroutine dump of size", n)
-		fmt.Println(string(buf[:n]))
+				var buf [1000000]byte
+				n := runtime.Stack(buf[:], true)
+				fmt.Println("Printing goroutine dump of size", n)
+				fmt.Println(string(buf[:n]))
-		panic("timed out waiting for quiet...")
-	}
+				panic("timed out waiting for quiet...")
+			}
+			ticksRemaining--
+		}
+	}
}
@@ -708,11 +724,7 @@ func setupAgreementWithValidator(t *testing.T, numNodes int, traceLevel traceLev
accounts, balances := createTestAccountsAndBalances(t, numNodes, (&[32]byte{})[:])
baseLedger := ledgerFactory(balances)
- // logging
- log := logging.Base()
- f, _ := os.Create(t.Name() + ".log")
- log.SetJSONFormatter()
- log.SetOutput(f)
+ log := logging.TestingLog(t)
log.SetLevel(logging.Debug)
// node setup
@@ -720,8 +732,8 @@ func setupAgreementWithValidator(t *testing.T, numNodes int, traceLevel traceLev
ledgers := make([]Ledger, numNodes)
dbAccessors := make([]db.Accessor, numNodes)
services := make([]*Service, numNodes)
- baseNetwork := makeTestingNetwork(numNodes, bufCap, validator)
- am := makeActivityMonitor()
+ baseNetwork := makeTestingNetwork(t, numNodes, bufCap, validator)
+ am := makeActivityMonitor(t)
for i := 0; i < numNodes; i++ {
accessor, err := db.MakeAccessor(t.Name()+"_"+strconv.Itoa(i)+"_crash.db", false, true)
@@ -747,7 +759,7 @@ func setupAgreementWithValidator(t *testing.T, numNodes int, traceLevel traceLev
BlockFactory: testBlockFactory{Owner: i},
Clock: clocks[i],
Accessor: accessor,
- Local: config.Local{CadaverSizeTarget: 10000000},
+ Local: config.Local{CadaverSizeTarget: 10000000, SpeculativeAsmTimeOffset: 9999999999},
RandomSource: &testingRand{},
}
@@ -809,12 +821,11 @@ func (m *coserviceMonitor) clearClock() {
}
}
-func expectNewPeriod(clocks []timers.Clock, zeroes uint) (newzeroes uint) {
+func expectNewPeriod(t *testing.T, clocks []timers.Clock, zeroes uint) (newzeroes uint) {
zeroes++
for i := range clocks {
if clocks[i].(*testingClock).zeroes != zeroes {
- errstr := fmt.Sprintf("unexpected number of zeroes: %v != %v", clocks[i].(*testingClock).zeroes, zeroes)
- panic(errstr)
+ require.Equal(t, zeroes, clocks[i].(*testingClock).zeroes, "clocks[%d] unexpected number of zeroes %v != %v", i, zeroes, clocks[i].(*testingClock).zeroes)
}
}
return zeroes
@@ -841,9 +852,10 @@ func triggerGlobalTimeout(d time.Duration, clocks []timers.Clock, activityMonito
activityMonitor.waitForQuiet()
}
-func runRound(clocks []timers.Clock, activityMonitor *activityMonitor, zeroes uint, filterTimeout time.Duration) (newzeroes uint) {
+func runRound(t *testing.T, clocks []timers.Clock, activityMonitor *activityMonitor, zeroes uint, filterTimeout time.Duration) (newzeroes uint) {
triggerGlobalTimeout(filterTimeout, clocks, activityMonitor)
- return expectNewPeriod(clocks, zeroes)
+	t.Log("runRound: filter timeout fired")
+ return expectNewPeriod(t, clocks, zeroes)
}
func sanityCheck(startRound round, numRounds round, ledgers []Ledger) {
@@ -882,16 +894,21 @@ func simulateAgreementWithLedgerFactory(t *testing.T, numNodes int, numRounds in
for i := 0; i < numNodes; i++ {
services[i].Start()
}
+ t.Logf("%d nodes started", numNodes)
activityMonitor.waitForActivity()
activityMonitor.waitForQuiet()
- zeroes := expectNewPeriod(clocks, 0)
+ t.Log("all quiet")
+ zeroes := expectNewPeriod(t, clocks, 0)
+	t.Logf("runRound with %d clocks", len(clocks))
// run round with round-specific consensus version first (since fix in #1896)
version, _ := baseLedger.ConsensusVersion(ParamsRound(startRound))
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ t.Logf("zeroes %d", zeroes)
for j := 1; j < numRounds; j++ {
+ t.Logf("test round j=%d", j)
version, _ := baseLedger.ConsensusVersion(ParamsRound(baseLedger.NextRound() + basics.Round(j-1)))
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
for i := 0; i < numNodes; i++ {
@@ -1027,11 +1044,11 @@ func TestAgreementFastRecoveryDownEarly(t *testing.T) {
}
activityMonitor.waitForActivity()
activityMonitor.waitForQuiet()
- zeroes := expectNewPeriod(clocks, 0)
+ zeroes := expectNewPeriod(t, clocks, 0)
// run two rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
// force fast partition recovery into bottom
@@ -1049,19 +1066,19 @@ func TestAgreementFastRecoveryDownEarly(t *testing.T) {
zeroes = expectNoNewPeriod(clocks, zeroes)
triggerGlobalTimeout(firstFPR, clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// terminate on period 1
{
baseNetwork.repairAll()
triggerGlobalTimeout(FilterTimeout(1, version), clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// run two more rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
for i := 0; i < numNodes; i++ {
@@ -1085,11 +1102,11 @@ func TestAgreementFastRecoveryDownMiss(t *testing.T) {
}
activityMonitor.waitForActivity()
activityMonitor.waitForQuiet()
- zeroes := expectNewPeriod(clocks, 0)
+ zeroes := expectNewPeriod(t, clocks, 0)
// run two rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
// force fast partition recovery into bottom
@@ -1130,19 +1147,19 @@ func TestAgreementFastRecoveryDownMiss(t *testing.T) {
zeroes = expectNoNewPeriod(clocks, zeroes)
triggerGlobalTimeout(secondFPR, clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// terminate on period 1
{
baseNetwork.repairAll()
triggerGlobalTimeout(FilterTimeout(1, version), clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// run two more rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
for i := 0; i < numNodes; i++ {
@@ -1166,11 +1183,11 @@ func TestAgreementFastRecoveryLate(t *testing.T) {
}
activityMonitor.waitForActivity()
activityMonitor.waitForQuiet()
- zeroes := expectNewPeriod(clocks, 0)
+ zeroes := expectNewPeriod(t, clocks, 0)
// run two rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
// force fast partition recovery into value
@@ -1232,14 +1249,14 @@ func TestAgreementFastRecoveryLate(t *testing.T) {
zeroes = expectNoNewPeriod(clocks, zeroes)
triggerGlobalTimeout(secondFPR, clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// terminate on period 1
{
baseNetwork.repairAll()
triggerGlobalTimeout(FilterTimeout(1, version), clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
for _, l := range ledgers {
@@ -1255,7 +1272,7 @@ func TestAgreementFastRecoveryLate(t *testing.T) {
// run two more rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
for i := 0; i < numNodes; i++ {
@@ -1279,11 +1296,11 @@ func TestAgreementFastRecoveryRedo(t *testing.T) {
}
activityMonitor.waitForActivity()
activityMonitor.waitForQuiet()
- zeroes := expectNewPeriod(clocks, 0)
+ zeroes := expectNewPeriod(t, clocks, 0)
// run two rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
// force fast partition recovery into value
@@ -1345,7 +1362,7 @@ func TestAgreementFastRecoveryRedo(t *testing.T) {
zeroes = expectNoNewPeriod(clocks, zeroes)
triggerGlobalTimeout(secondFPR, clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// fail period 1 with value again
@@ -1386,14 +1403,14 @@ func TestAgreementFastRecoveryRedo(t *testing.T) {
zeroes = expectNoNewPeriod(clocks, zeroes)
triggerGlobalTimeout(secondFPR, clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// terminate on period 2
{
baseNetwork.repairAll()
triggerGlobalTimeout(FilterTimeout(2, version), clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
for _, l := range ledgers {
@@ -1409,7 +1426,7 @@ func TestAgreementFastRecoveryRedo(t *testing.T) {
// run two more rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
for i := 0; i < numNodes; i++ {
@@ -1433,11 +1450,11 @@ func TestAgreementBlockReplayBug_b29ea57(t *testing.T) {
}
activityMonitor.waitForActivity()
activityMonitor.waitForQuiet()
- zeroes := expectNewPeriod(clocks, 0)
+ zeroes := expectNewPeriod(t, clocks, 0)
// run two rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
// fail period 0
@@ -1447,7 +1464,7 @@ func TestAgreementBlockReplayBug_b29ea57(t *testing.T) {
zeroes = expectNoNewPeriod(clocks, zeroes)
triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// fail period 1 on bottom with block
@@ -1456,19 +1473,19 @@ func TestAgreementBlockReplayBug_b29ea57(t *testing.T) {
zeroes = expectNoNewPeriod(clocks, zeroes)
triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// terminate on period 2
{
baseNetwork.repairAll()
triggerGlobalTimeout(FilterTimeout(2, version), clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// run two more rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
for i := 0; i < numNodes; i++ {
@@ -1492,11 +1509,11 @@ func TestAgreementLateCertBug(t *testing.T) {
}
activityMonitor.waitForActivity()
activityMonitor.waitForQuiet()
- zeroes := expectNewPeriod(clocks, 0)
+ zeroes := expectNewPeriod(t, clocks, 0)
// run two rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
// delay minority cert votes to force period 1
@@ -1509,7 +1526,7 @@ func TestAgreementLateCertBug(t *testing.T) {
baseNetwork.repairAll()
triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// terminate on period 0 in period 1
@@ -1521,12 +1538,12 @@ func TestAgreementLateCertBug(t *testing.T) {
baseNetwork.finishAllMulticast()
activityMonitor.waitForActivity()
activityMonitor.waitForQuiet()
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// run two more rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
for i := 0; i < numNodes; i++ {
@@ -1550,11 +1567,11 @@ func TestAgreementRecoverGlobalStartingValue(t *testing.T) {
}
activityMonitor.waitForActivity()
activityMonitor.waitForQuiet()
- zeroes := expectNewPeriod(clocks, 0)
+ zeroes := expectNewPeriod(t, clocks, 0)
// run two rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
// force partition recovery into value
@@ -1585,7 +1602,7 @@ func TestAgreementRecoverGlobalStartingValue(t *testing.T) {
}
triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
require.Equal(t, 4, int(zeroes))
}
@@ -1612,7 +1629,7 @@ func TestAgreementRecoverGlobalStartingValue(t *testing.T) {
}
triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
require.Equal(t, 5, int(zeroes))
}
@@ -1621,13 +1638,13 @@ func TestAgreementRecoverGlobalStartingValue(t *testing.T) {
{
baseNetwork.repairAll()
triggerGlobalTimeout(FilterTimeout(2, version), clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
require.Equal(t, 6, int(zeroes))
}
// run two more rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
for i := 0; i < numNodes; i++ {
services[i].Shutdown()
@@ -1650,11 +1667,11 @@ func TestAgreementRecoverGlobalStartingValueBadProposal(t *testing.T) {
}
activityMonitor.waitForActivity()
activityMonitor.waitForQuiet()
- zeroes := expectNewPeriod(clocks, 0)
+ zeroes := expectNewPeriod(t, clocks, 0)
// run two rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
// force partition recovery into value.
@@ -1690,7 +1707,7 @@ func TestAgreementRecoverGlobalStartingValueBadProposal(t *testing.T) {
return params
})
triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
require.Equal(t, 4, int(zeroes))
}
@@ -1716,7 +1733,7 @@ func TestAgreementRecoverGlobalStartingValueBadProposal(t *testing.T) {
}
}
triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
@@ -1724,13 +1741,13 @@ func TestAgreementRecoverGlobalStartingValueBadProposal(t *testing.T) {
{
baseNetwork.repairAll()
triggerGlobalTimeout(FilterTimeout(2, version), clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
require.Equal(t, 6, int(zeroes))
}
// run two more rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
for i := 0; i < numNodes; i++ {
services[i].Shutdown()
@@ -1753,11 +1770,11 @@ func TestAgreementRecoverBothVAndBotQuorums(t *testing.T) {
}
activityMonitor.waitForActivity()
activityMonitor.waitForQuiet()
- zeroes := expectNewPeriod(clocks, 0)
+ zeroes := expectNewPeriod(t, clocks, 0)
// run two rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
// force partition recovery into both bottom and value. one node enters bottom, the rest enter value
@@ -1815,7 +1832,7 @@ func TestAgreementRecoverBothVAndBotQuorums(t *testing.T) {
lower, upper := (next + 1).nextVoteRanges()
delta := time.Duration(testingRand{}.Uint64() % uint64(upper-lower))
triggerGlobalTimeout(lower+delta, clocks[1:], activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
require.Equal(t, 4, int(zeroes))
}
@@ -1842,20 +1859,20 @@ func TestAgreementRecoverBothVAndBotQuorums(t *testing.T) {
}
triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// Finish in period 2
{
baseNetwork.repairAll()
triggerGlobalTimeout(FilterTimeout(2, version), clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
require.Equal(t, 6, int(zeroes))
}
// run two more rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
for i := 0; i < numNodes; i++ {
services[i].Shutdown()
@@ -1878,11 +1895,11 @@ func TestAgreementSlowPayloadsPreDeadline(t *testing.T) {
}
activityMonitor.waitForActivity()
activityMonitor.waitForQuiet()
- zeroes := expectNewPeriod(clocks, 0)
+ zeroes := expectNewPeriod(t, clocks, 0)
// run two rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
// run round and then start pocketing payloads
@@ -1890,7 +1907,7 @@ func TestAgreementSlowPayloadsPreDeadline(t *testing.T) {
closeFn := baseNetwork.pocketAllCompound(pocket) // (takes effect next round)
{
triggerGlobalTimeout(FilterTimeout(0, version), clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// run round with late payload
@@ -1908,12 +1925,12 @@ func TestAgreementSlowPayloadsPreDeadline(t *testing.T) {
baseNetwork.finishAllMulticast()
activityMonitor.waitForActivity()
activityMonitor.waitForQuiet()
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// run two more rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
for i := 0; i < numNodes; i++ {
services[i].Shutdown()
@@ -1936,11 +1953,11 @@ func TestAgreementSlowPayloadsPostDeadline(t *testing.T) {
}
activityMonitor.waitForActivity()
activityMonitor.waitForQuiet()
- zeroes := expectNewPeriod(clocks, 0)
+ zeroes := expectNewPeriod(t, clocks, 0)
// run two rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
// run round and then start pocketing payloads
@@ -1948,7 +1965,7 @@ func TestAgreementSlowPayloadsPostDeadline(t *testing.T) {
closeFn := baseNetwork.pocketAllCompound(pocket) // (takes effect next round)
{
triggerGlobalTimeout(FilterTimeout(0, version), clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// force network into period 1 by delaying proposals
@@ -1956,7 +1973,7 @@ func TestAgreementSlowPayloadsPostDeadline(t *testing.T) {
triggerGlobalTimeout(FilterTimeout(0, version), clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)
triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// recover in period 1
@@ -1973,12 +1990,12 @@ func TestAgreementSlowPayloadsPostDeadline(t *testing.T) {
zeroes = expectNoNewPeriod(clocks, zeroes)
triggerGlobalTimeout(FilterTimeout(1, version), clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// run two more rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
for i := 0; i < numNodes; i++ {
services[i].Shutdown()
@@ -2001,11 +2018,11 @@ func TestAgreementLargePeriods(t *testing.T) {
activityMonitor.waitForActivity()
activityMonitor.waitForQuiet()
- zeroes := expectNewPeriod(clocks, 0)
+ zeroes := expectNewPeriod(t, clocks, 0)
// run two rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
// partition the network, run until period 60
@@ -2017,7 +2034,7 @@ func TestAgreementLargePeriods(t *testing.T) {
baseNetwork.repairAll()
triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
require.Equal(t, 4+p, int(zeroes))
}
}
@@ -2025,12 +2042,12 @@ func TestAgreementLargePeriods(t *testing.T) {
// terminate
{
triggerGlobalTimeout(FilterTimeout(60, version), clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// run two more rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
for i := 0; i < numNodes; i++ {
services[i].Shutdown()
@@ -2099,11 +2116,11 @@ func TestAgreementRegression_WrongPeriodPayloadVerificationCancellation_8ba23942
}
activityMonitor.waitForActivity()
activityMonitor.waitForQuiet()
- zeroes := expectNewPeriod(clocks, 0)
+ zeroes := expectNewPeriod(t, clocks, 0)
// run two rounds
for j := 0; j < 2; j++ {
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
}
// run round and then start pocketing payloads, suspending validation
@@ -2112,7 +2129,7 @@ func TestAgreementRegression_WrongPeriodPayloadVerificationCancellation_8ba23942
closeFn := baseNetwork.pocketAllCompound(pocket0) // (takes effect next round)
{
triggerGlobalTimeout(FilterTimeout(0, version), clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
}
// force network into period 1 by failing period 0, entering with bottom and no soft threshold (to prevent proposal value pinning)
@@ -2205,15 +2222,15 @@ func TestAgreementRegression_WrongPeriodPayloadVerificationCancellation_8ba23942
}
baseNetwork.finishAllMulticast()
- zeroes = expectNewPeriod(clocks, zeroes)
+ zeroes = expectNewPeriod(t, clocks, zeroes)
activityMonitor.waitForQuiet()
// run two more rounds
//for j := 0; j < 2; j++ {
- // zeroes = runRound(clocks, activityMonitor, zeroes, period(1-j))
+ // zeroes = runRound(t, clocks, activityMonitor, zeroes, period(1-j))
//}
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(1, version))
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(1, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
for i := 0; i < numNodes; i++ {
services[i].Shutdown()
@@ -2256,9 +2273,9 @@ func TestAgreementCertificateDoesNotStallSingleRelay(t *testing.T) {
}
activityMonitor.waitForActivity()
activityMonitor.waitForQuiet()
- zeroes := expectNewPeriod(clocks, 0)
+ zeroes := expectNewPeriod(t, clocks, 0)
// run two rounds
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
// make sure relay does not see block proposal for round 3
baseNetwork.intercept(func(params multicastParams) multicastParams {
if params.tag == protocol.ProposalPayloadTag {
@@ -2289,7 +2306,7 @@ func TestAgreementCertificateDoesNotStallSingleRelay(t *testing.T) {
return params
})
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version))
// Round 3:
// First partition the relay to prevent it from seeing certificate or block
@@ -2313,7 +2330,7 @@ func TestAgreementCertificateDoesNotStallSingleRelay(t *testing.T) {
})
// And with some hypothetical second relay the network achieves consensus on a certificate and block.
triggerGlobalTimeout(FilterTimeout(0, version), clocks, activityMonitor)
- zeroes = expectNewPeriod(clocks[1:], zeroes)
+ zeroes = expectNewPeriod(t, clocks[1:], zeroes)
require.Equal(t, uint(3), clocks[0].(*testingClock).zeroes)
close(pocketCert)
@@ -2332,7 +2349,7 @@ func TestAgreementCertificateDoesNotStallSingleRelay(t *testing.T) {
// this relay must still relay initial messages. Note that payloads were already relayed with
// the previous global timeout.
triggerGlobalTimeout(FilterTimeout(0, version), clocks[1:], activityMonitor)
- zeroes = expectNewPeriod(clocks[1:], zeroes)
+ zeroes = expectNewPeriod(t, clocks[1:], zeroes)
require.Equal(t, uint(3), clocks[0].(*testingClock).zeroes)
for i := 0; i < numNodes; i++ {
diff --git a/agreement/types.go b/agreement/types.go
index bc2decb7d0..eae8942b56 100644
--- a/agreement/types.go
+++ b/agreement/types.go
@@ -38,6 +38,15 @@ func FilterTimeout(p period, v protocol.ConsensusVersion) time.Duration {
return config.Consensus[v].AgreementFilterTimeout
}
+// SpeculativeBlockAsmTime returns the point within a round at which we would
+// like to begin speculative assembly: the filter timeout for period p minus
+// speculativeAsmTimeOffset, floored at zero.
+func SpeculativeBlockAsmTime(p period, v protocol.ConsensusVersion, speculativeAsmTimeOffset time.Duration) time.Duration {
+ hardwait := FilterTimeout(p, v)
+ if hardwait > speculativeAsmTimeOffset {
+ return hardwait - speculativeAsmTimeOffset
+ }
+ return time.Duration(0)
+}
+
// DeadlineTimeout is the duration of the second agreement step.
func DeadlineTimeout() time.Duration {
return deadlineTimeout
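
For review context, a minimal sketch of the clamping behavior (compiles against the patched agreement package; the 400ms offset is illustrative, not a value from this patch):

package main

import (
	"fmt"
	"time"

	"github.com/algorand/go-algorand/agreement"
	"github.com/algorand/go-algorand/protocol"
)

func main() {
	v := protocol.ConsensusCurrentVersion
	// Typically FilterTimeout(0, v) minus the offset: speculation starts
	// shortly before the filter timeout fires.
	fmt.Println(agreement.SpeculativeBlockAsmTime(0, v, 400*time.Millisecond))
	// An offset larger than the filter timeout clamps to zero, i.e.
	// speculative assembly could begin immediately.
	fmt.Println(agreement.SpeculativeBlockAsmTime(0, v, time.Hour))
}
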
diff --git a/config/localTemplate.go b/config/localTemplate.go
index f13fc97f1f..672a55abd4 100644
--- a/config/localTemplate.go
+++ b/config/localTemplate.go
@@ -517,6 +517,9 @@ type Local struct {
// only relevant if TxIncomingFilteringFlags is non-zero
TxIncomingFilterMaxSize uint64 `version[28]:"500000"`
+ // SpeculativeAsmTimeOffset is how long before the agreement filter timeout
+ // speculative block assembly begins (400000000ns = 400ms by default).
+ SpeculativeAsmTimeOffset time.Duration `version[30]:"400000000"`
+
+ // SpeculativeAssemblyDisable disables speculative block assembly.
+ SpeculativeAssemblyDisable bool `version[30]:"false"`
+
// BlockServiceMemCap is the memory capacity in bytes which is allowed for the block service to use for HTTP block requests.
// When it exceeds this capacity, it redirects the block requests to a different node
BlockServiceMemCap uint64 `version[28]:"500000000"`
diff --git a/config/local_defaults.go b/config/local_defaults.go
index a3c55c1e1b..b92833daf7 100644
--- a/config/local_defaults.go
+++ b/config/local_defaults.go
@@ -122,6 +122,8 @@ var defaultLocal = Local{
RestReadTimeoutSeconds: 15,
RestWriteTimeoutSeconds: 120,
RunHosted: false,
+ SpeculativeAsmTimeOffset: 400000000,
+ SpeculativeAssemblyDisable: false,
StorageEngine: "sqlite",
SuggestedFeeBlockHistory: 3,
SuggestedFeeSlidingWindowSize: 50,
diff --git a/data/datatest/impls.go b/data/datatest/impls.go
index fefcb054da..6b5ce99b06 100644
--- a/data/datatest/impls.go
+++ b/data/datatest/impls.go
@@ -64,6 +64,9 @@ func (i entryFactoryImpl) AssembleBlock(round basics.Round) (agreement.Validated
return validatedBlock{blk: &b}, nil
}
+func (i entryFactoryImpl) StartSpeculativeBlockAssembly(context.Context, agreement.ValidatedBlock, crypto.Digest, bool) {
+}
+
// WithSeed implements the agreement.ValidatedBlock interface.
func (ve validatedBlock) WithSeed(s committee.Seed) agreement.ValidatedBlock {
newblock := ve.blk.WithSeed(s)
diff --git a/data/pools/transactionPool.go b/data/pools/transactionPool.go
index a03baea4f0..b6d0188664 100644
--- a/data/pools/transactionPool.go
+++ b/data/pools/transactionPool.go
@@ -17,6 +17,7 @@
package pools
import (
+ "context"
"errors"
"fmt"
"sync"
@@ -26,6 +27,7 @@ import (
"github.com/algorand/go-deadlock"
"github.com/algorand/go-algorand/config"
+ "github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/data/transactions"
@@ -35,8 +37,11 @@ import (
"github.com/algorand/go-algorand/logging/telemetryspec"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util/condvar"
+ "github.com/algorand/go-algorand/util/metrics"
)
+var speculativeAssemblyDiscarded = metrics.NewTagCounter("algod_speculative_assembly_discarded", "started speculative block assembly but a different block won later")
+
// A TransactionPool prepares valid blocks for proposal and caches
// validated transaction groups.
//
@@ -49,8 +54,9 @@ import (
// TransactionPool.AssembleBlock constructs a valid block for
// proposal given a deadline.
type TransactionPool struct {
- // feePerByte is stored at the beginning of this struct to ensure it has a 64 bit aligned address. This is needed as it's being used
- // with atomic operations which require 64 bit alignment on arm.
+ // feePerByte is stored at the beginning of this struct to ensure it has a
+ // 64 bit aligned address. This is needed as it's being used with atomic
+ // operations which require 64 bit alignment on arm.
feePerByte uint64
// const
@@ -58,7 +64,7 @@ type TransactionPool struct {
logAssembleStats bool
expFeeFactor uint64
txPoolMaxSize int
- ledger *ledger.Ledger
+ ledger ledger.LedgerForEvaluator
mu deadlock.Mutex
cond sync.Cond
@@ -93,6 +99,22 @@ type TransactionPool struct {
// proposalAssemblyTime is the ProposalAssemblyTime configured for this node.
proposalAssemblyTime time.Duration
+ ctx context.Context
+
+ // specBlockMu protects speculative block assembly vars below:
+ specBlockMu deadlock.Mutex
+ specActive bool
+ // specBlockDigest is the ValidatedBlock.Block().Digest() of the block being speculated on
+ specBlockDigest crypto.Digest
+ cancelSpeculativeAssembly context.CancelFunc
+ speculativePool *TransactionPool
+ // specBlockCh has an assembled speculative block
+ //specBlockCh chan *ledgercore.ValidatedBlock
+ // specAsmDone channel is closed when there is no speculative assembly
+ //specAsmDone <-chan struct{}
+ // TODO: feed into assemblyMu & assemblyCond above!
+
+ cfg config.Local
// stateproofOverflowed indicates that a stateproof transaction was allowed to
// exceed the txPoolMaxSize. This flag is reset to false OnNewBlock
stateproofOverflowed bool
@@ -114,6 +136,7 @@ func MakeTransactionPool(ledger *ledger.Ledger, cfg config.Local, log logging.Lo
if cfg.TxPoolExponentialIncreaseFactor < 1 {
cfg.TxPoolExponentialIncreaseFactor = 1
}
+
pool := TransactionPool{
pendingTxids: make(map[transactions.Txid]transactions.SignedTxn),
rememberedTxids: make(map[transactions.Txid]transactions.SignedTxn),
@@ -126,15 +149,57 @@ func MakeTransactionPool(ledger *ledger.Ledger, cfg config.Local, log logging.Lo
txPoolMaxSize: cfg.TxPoolSize,
proposalAssemblyTime: cfg.ProposalAssemblyTime,
log: log,
+ cfg: cfg,
+ ctx: context.Background(),
}
pool.cond.L = &pool.mu
pool.assemblyCond.L = &pool.assemblyMu
- pool.recomputeBlockEvaluator(nil, 0)
+ pool.recomputeBlockEvaluator(nil, 0, false)
return &pool
}
-// poolAsmResults is used to syncronize the state of the block assembly process. The structure reading/writing is syncronized
-// via the pool.assemblyMu lock.
+// TODO: this needs a careful read for lock/shallow-copy issues
+func (pool *TransactionPool) copyTransactionPoolOverSpecLedger(ctx context.Context, vb *ledgercore.ValidatedBlock) (*TransactionPool, context.CancelFunc, error) {
+ specLedger, err := ledger.MakeValidatedBlockAsLFE(vb, pool.ledger)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ copyPoolctx, cancel := context.WithCancel(ctx)
+
+ copy := TransactionPool{
+ pendingTxids: make(map[transactions.Txid]transactions.SignedTxn), // pendingTxids is only used for stats and hints
+ pendingTxGroups: pool.pendingTxGroups[:],
+ rememberedTxids: make(map[transactions.Txid]transactions.SignedTxn),
+ expiredTxCount: make(map[basics.Round]int),
+ ledger: specLedger,
+ statusCache: makeStatusCache(pool.cfg.TxPoolSize),
+ logProcessBlockStats: pool.cfg.EnableProcessBlockStats,
+ logAssembleStats: pool.cfg.EnableAssembleStats,
+ expFeeFactor: pool.cfg.TxPoolExponentialIncreaseFactor,
+ txPoolMaxSize: pool.cfg.TxPoolSize,
+ proposalAssemblyTime: pool.cfg.ProposalAssemblyTime,
+ assemblyRound: specLedger.Latest() + 1,
+ log: pool.log,
+ cfg: pool.cfg,
+ ctx: copyPoolctx,
+ }
+ // TODO: make an 'assembly context struct' with a subset of TransactionPool fields?
+ copy.cond.L = &copy.mu
+ copy.assemblyCond.L = &copy.assemblyMu
+
+ //pool.cancelSpeculativeAssembly = cancel
+
+ // specBlockCh := make(chan *ledgercore.ValidatedBlock, 1)
+ // pool.specBlockMu.Lock()
+ // pool.specBlockCh = specBlockCh
+ // pool.specBlockMu.Unlock()
+
+ return &copy, cancel, nil
+}
+
+// poolAsmResults is used to synchronize the state of the block assembly process.
+// The structure's reading/writing is synchronized via the pool.assemblyMu lock.
type poolAsmResults struct {
// the ok variable indicates whether the assembly for the block roundStartedEvaluating was complete ( i.e. ok == true ) or
// whether it's still in-progress.
@@ -142,11 +207,13 @@ type poolAsmResults struct {
blk *ledgercore.ValidatedBlock
stats telemetryspec.AssembleBlockMetrics
err error
- // roundStartedEvaluating is the round which we were attempted to evaluate last. It's a good measure for
- // which round we started evaluating, but not a measure to whether the evaluation is complete.
+ // roundStartedEvaluating is the round we last attempted to evaluate. It's
+ // a good measure of which round we started evaluating, but not of whether
+ // the evaluation is complete.
roundStartedEvaluating basics.Round
- // assemblyCompletedOrAbandoned is *not* protected via the pool.assemblyMu lock and should be accessed only from the OnNewBlock goroutine.
- // it's equivalent to the "ok" variable, and used for avoiding taking the lock.
+ // assemblyCompletedOrAbandoned is *not* protected via the pool.assemblyMu
+ // lock and should be accessed only from the OnNewBlock goroutine. It's
+ // equivalent to the "ok" variable, and is used to avoid taking the lock.
assemblyCompletedOrAbandoned bool
}
@@ -181,7 +248,14 @@ func (pool *TransactionPool) Reset() {
pool.numPendingWholeBlocks = 0
pool.pendingBlockEvaluator = nil
pool.statusCache.reset()
- pool.recomputeBlockEvaluator(nil, 0)
+ pool.recomputeBlockEvaluator(nil, 0, false)
+
+ // cancel speculative assembly and clear its result
+ pool.specBlockMu.Lock()
+ if pool.cancelSpeculativeAssembly != nil {
+ pool.cancelSpeculativeAssembly()
+ }
+ pool.specBlockMu.Unlock()
}
// NumExpired returns the number of transactions that expired at the
@@ -337,7 +411,7 @@ func (pool *TransactionPool) computeFeePerByte() uint64 {
return feePerByte
}
-// checkSufficientFee take a set of signed transactions and verifies that each transaction has
+// checkSufficientFee takes a set of signed transactions and verifies that each transaction has
// sufficient fee to get into the transaction pool
func (pool *TransactionPool) checkSufficientFee(txgroup []transactions.SignedTxn) error {
// Special case: the state proof transaction, if issued from the
@@ -395,17 +469,18 @@ func (pool *TransactionPool) remember(txgroup []transactions.SignedTxn) error {
params := poolIngestParams{
recomputing: false,
}
- return pool.ingest(txgroup, params)
+ _, err := pool.ingest(txgroup, params, false)
+ return err
}
// add tries to add the transaction group to the pool, bypassing the fee
// priority checks.
-func (pool *TransactionPool) add(txgroup []transactions.SignedTxn, stats *telemetryspec.AssembleBlockMetrics) error {
+func (pool *TransactionPool) add(txgroup []transactions.SignedTxn, stats *telemetryspec.AssembleBlockMetrics, stopAtFirstFullBlock bool) (stoppedAtBlock bool, err error) {
params := poolIngestParams{
recomputing: true,
stats: stats,
}
- return pool.ingest(txgroup, params)
+ return pool.ingest(txgroup, params, stopAtFirstFullBlock)
}
// ingest checks whether a transaction group could be remembered in the pool,
@@ -413,9 +488,9 @@ func (pool *TransactionPool) add(txgroup []transactions.SignedTxn, stats *teleme
//
// ingest assumes that pool.mu is locked. It might release the lock
// while it waits for OnNewBlock() to be called.
-func (pool *TransactionPool) ingest(txgroup []transactions.SignedTxn, params poolIngestParams) error {
+func (pool *TransactionPool) ingest(txgroup []transactions.SignedTxn, params poolIngestParams, stopAtFirstFullBlock bool) (stoppedAtBlock bool, err error) {
if pool.pendingBlockEvaluator == nil {
- return ErrNoPendingBlockEvaluator
+ return false, ErrNoPendingBlockEvaluator
}
if !params.recomputing {
@@ -427,26 +502,26 @@ func (pool *TransactionPool) ingest(txgroup []transactions.SignedTxn, params poo
for pool.pendingBlockEvaluator.Round() <= latest && time.Now().Before(waitExpires) {
condvar.TimedWait(&pool.cond, timeoutOnNewBlock)
if pool.pendingBlockEvaluator == nil {
- return ErrNoPendingBlockEvaluator
+ return false, ErrNoPendingBlockEvaluator
}
}
- err := pool.checkSufficientFee(txgroup)
+ err = pool.checkSufficientFee(txgroup)
if err != nil {
- return err
+ return false, err
}
}
- err := pool.addToPendingBlockEvaluator(txgroup, params.recomputing, params.stats)
+ stoppedAtBlock, err = pool.addToPendingBlockEvaluator(txgroup, params.recomputing, params.stats, stopAtFirstFullBlock)
if err != nil {
- return err
+ return false, err
}
pool.rememberedTxGroups = append(pool.rememberedTxGroups, txgroup)
for _, t := range txgroup {
pool.rememberedTxids[t.ID()] = t
}
- return nil
+ return stoppedAtBlock, nil
}
// RememberOne stores the provided transaction.
@@ -496,8 +571,101 @@ func (pool *TransactionPool) Lookup(txid transactions.Txid) (tx transactions.Sig
return pool.statusCache.check(txid)
}
-// OnNewBlock excises transactions from the pool that are included in the specified Block or if they've expired
+// StartSpeculativeBlockAssembly begins assembling a speculative block on top
+// of the given validated block before that block has been committed to the
+// ledger. If onlyIfStarted is true, a new assembly is started only when one
+// is already in flight (replacing an assembly for a different block).
+func (pool *TransactionPool) StartSpeculativeBlockAssembly(ctx context.Context, vb *ledgercore.ValidatedBlock, blockHash crypto.Digest, onlyIfStarted bool) {
+ if pool.cfg.SpeculativeAssemblyDisable {
+ return
+ }
+
+ if blockHash.IsZero() {
+ // if we don't already have a block hash, calculate it now
+ blockHash = vb.Block().Digest()
+ }
+
+ pool.specBlockMu.Lock()
+ defer pool.specBlockMu.Unlock()
+ if pool.specActive {
+ if blockHash == pool.specBlockDigest {
+ pool.log.Infof("StartSpeculativeBlockAssembly %s already running", blockHash.String())
+ return
+ }
+ // cancel prior speculative block assembly based on different block
+ speculativeAssemblyDiscarded.Add("start", 1)
+ pool.cancelSpeculativeAssembly()
+ pool.specActive = false
+ } else if onlyIfStarted {
+ // not already started, don't start one
+ return
+ }
+ pool.log.Infof("StartSpeculativeBlockAssembly %s", blockHash.String())
+ pool.specActive = true
+ pool.specBlockDigest = blockHash
+
+ pool.mu.Lock()
+
+ // move remembered txns to pending
+ pool.rememberCommit(false)
+
+ // create a shallow copy of the pool that evaluates on top of the
+ // speculated block.
+ speculativePool, cancel, err := pool.copyTransactionPoolOverSpecLedger(ctx, vb)
+ if err != nil {
+ pool.mu.Unlock()
+ pool.specActive = false
+ pool.log.Warnf("StartSpeculativeBlockAssembly: %v", err)
+ return
+ }
+ pool.cancelSpeculativeAssembly = cancel
+ pool.speculativePool = speculativePool
+
+ pool.mu.Unlock()
+
+ // process txns only until one block is full
+ // action on subordinate pool continues asynchronously, to be picked up in tryReadSpeculativeBlock
+ go speculativePool.onNewBlock(vb.Block(), vb.Delta(), true)
+}
+
+// tryReadSpeculativeBlock returns the speculatively assembled block if one is
+// in flight for the given branch, and cancels any speculation on a different
+// branch. It assumes pool.assemblyMu is held.
+func (pool *TransactionPool) tryReadSpeculativeBlock(branch bookkeeping.BlockHash, round basics.Round, deadline time.Time, stats *telemetryspec.AssembleBlockMetrics) (*ledgercore.ValidatedBlock, error) {
+ pool.specBlockMu.Lock()
+
+ if pool.specActive {
+ if pool.specBlockDigest == crypto.Digest(branch) {
+ pool.log.Infof("update speculative deadline: %s", deadline.String())
+ specPool := pool.speculativePool
+ pool.specBlockMu.Unlock()
+ // TODO: is continuing to hold outer pool.assemblyMu here a bad thing?
+ specPool.assemblyMu.Lock()
+ assembled, err := specPool.waitForBlockAssembly(round, deadline, stats)
+ specPool.assemblyMu.Unlock()
+ return assembled, err
+ }
+ speculativeAssemblyDiscarded.Add("read", 1)
+ pool.cancelSpeculativeAssembly()
+ pool.specActive = false
+ }
+ pool.specBlockMu.Unlock()
+ // no speculative block is available for this branch
+ return nil, nil
+}
+
+// OnNewBlock calls the internal implementation, onNewBlock, with a false
+// stopReprocessingAtFirstAsmBlock so that all transactions are processed.
func (pool *TransactionPool) OnNewBlock(block bookkeeping.Block, delta ledgercore.StateDelta) {
+ pool.specBlockMu.Lock()
+ if pool.specActive && block.Digest() != pool.specBlockDigest {
+ speculativeAssemblyDiscarded.Add("ONB", 1)
+ pool.cancelSpeculativeAssembly()
+ pool.specActive = false
+ // cancel speculative assembly, fall through to starting normal assembly
+ }
+ // TODO: make core onNewBlock() _wait_ until speculative block is done, merge state from that result
+ pool.specBlockMu.Unlock()
+ pool.onNewBlock(block, delta, false)
+}
+
+// onNewBlock excises transactions from the pool that are included in the specified Block or that have expired
+func (pool *TransactionPool) onNewBlock(block bookkeeping.Block, delta ledgercore.StateDelta, stopReprocessingAtFirstAsmBlock bool) {
var stats telemetryspec.ProcessBlockMetrics
var knownCommitted uint
var unknownCommitted uint
@@ -518,6 +686,7 @@ func (pool *TransactionPool) OnNewBlock(block bookkeeping.Block, delta ledgercor
pool.mu.Lock()
defer pool.mu.Unlock()
defer pool.cond.Broadcast()
+
if pool.pendingBlockEvaluator == nil || block.Round() >= pool.pendingBlockEvaluator.Round() {
// Adjust the pool fee threshold. The rules are:
// - If there was less than one full block in the pool, reduce
@@ -545,7 +714,7 @@ func (pool *TransactionPool) OnNewBlock(block bookkeeping.Block, delta ledgercor
// Recompute the pool by starting from the new latest block.
// This has the side-effect of discarding transactions that
// have been committed (or that are otherwise no longer valid).
- stats = pool.recomputeBlockEvaluator(committedTxids, knownCommitted)
+ stats = pool.recomputeBlockEvaluator(committedTxids, knownCommitted, stopReprocessingAtFirstAsmBlock)
}
stats.KnownCommittedCount = knownCommitted
@@ -564,10 +733,13 @@ func (pool *TransactionPool) OnNewBlock(block bookkeeping.Block, delta ledgercor
}
}
-// isAssemblyTimedOut determines if we should keep attempting complete the block assembly by adding more transactions to the pending evaluator,
-// or whether we've ran out of time. It takes into consideration the assemblyDeadline that was set by the AssembleBlock function as well as the
-// projected time it's going to take to call the GenerateBlock function before the block assembly would be ready.
-// The function expects that the pool.assemblyMu lock would be taken before being called.
+// isAssemblyTimedOut determines if we should keep attempting to complete the
+// block assembly by adding more transactions to the pending evaluator, or
+// whether we've run out of time. It takes into consideration the
+// assemblyDeadline that was set by the AssembleBlock function as well as the
+// projected time it's going to take to call the GenerateBlock function before
+// the block assembly would be ready. The function expects that the
+// pool.assemblyMu lock is taken before being called.
func (pool *TransactionPool) isAssemblyTimedOut() bool {
if pool.assemblyDeadline.IsZero() {
// we have no deadline, so no reason to timeout.
@@ -605,9 +777,12 @@ func (pool *TransactionPool) addToPendingBlockEvaluatorOnce(txgroup []transactio
pool.assemblyMu.Lock()
defer pool.assemblyMu.Unlock()
if pool.assemblyRound > pool.pendingBlockEvaluator.Round() {
- // the block we're assembling now isn't the one the the AssembleBlock is waiting for. While it would be really cool
- // to finish generating the block, it would also be pointless to spend time on it.
- // we're going to set the ok and assemblyCompletedOrAbandoned to "true" so we can complete this loop asap
+ // the block we're assembling now isn't the one AssembleBlock is
+ // waiting for. While it would be really cool to finish generating
+ // the block, it would also be pointless to spend time on it.
+ // we're going to set ok and assemblyCompletedOrAbandoned to
+ // "true" so we can complete this loop ASAP
pool.assemblyResults.ok = true
pool.assemblyResults.assemblyCompletedOrAbandoned = true
stats.StopReason = telemetryspec.AssembleBlockAbandon
@@ -643,20 +818,23 @@ func (pool *TransactionPool) addToPendingBlockEvaluatorOnce(txgroup []transactio
return err
}
-func (pool *TransactionPool) addToPendingBlockEvaluator(txgroup []transactions.SignedTxn, recomputing bool, stats *telemetryspec.AssembleBlockMetrics) error {
- err := pool.addToPendingBlockEvaluatorOnce(txgroup, recomputing, stats)
+func (pool *TransactionPool) addToPendingBlockEvaluator(txgroup []transactions.SignedTxn, recomputing bool, stats *telemetryspec.AssembleBlockMetrics, addUntilFirstBlockFull bool) (stoppedAtBlock bool, err error) {
+ err = pool.addToPendingBlockEvaluatorOnce(txgroup, recomputing, stats)
if err == ledgercore.ErrNoSpace {
pool.numPendingWholeBlocks++
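+ // for speculative assembly, stop at the first full block instead of
+ // resetting txn bytes and spilling into the next pending block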
+ if addUntilFirstBlockFull {
+ return true, nil
+ }
pool.pendingBlockEvaluator.ResetTxnBytes()
err = pool.addToPendingBlockEvaluatorOnce(txgroup, recomputing, stats)
}
- return err
+ return false, err
}
// recomputeBlockEvaluator constructs a new BlockEvaluator and feeds all
// in-pool transactions to it (removing any transactions that are rejected
// by the BlockEvaluator). Expects that the pool.mu mutex would be already taken.
-func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transactions.Txid]ledgercore.IncludedTransactions, knownCommitted uint) (stats telemetryspec.ProcessBlockMetrics) {
+func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transactions.Txid]ledgercore.IncludedTransactions, knownCommitted uint, stopAtFirstBlock bool) (stats telemetryspec.ProcessBlockMetrics) {
pool.pendingBlockEvaluator = nil
latest := pool.ledger.Latest()
@@ -700,6 +878,7 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact
if hint < 0 || int(knownCommitted) < 0 {
hint = 0
}
+
pool.pendingBlockEvaluator, err = pool.ledger.StartEvaluator(next.BlockHeader, hint, 0, nil)
if err != nil {
// The pendingBlockEvaluator is an interface, and in case of an evaluator error
@@ -725,6 +904,12 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact
// Feed the transactions in order
for _, txgroup := range txgroups {
+ select {
+ case <-pool.ctx.Done():
+ return
+ default: // continue processing transactions
+ }
+
if len(txgroup) == 0 {
asmStats.InvalidCount++
continue
@@ -733,7 +918,8 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact
asmStats.EarlyCommittedCount++
continue
}
- err := pool.add(txgroup, &asmStats)
+
+ hasBlock, err := pool.add(txgroup, &asmStats, stopAtFirstBlock)
if err != nil {
for _, tx := range txgroup {
pool.statusCache.put(tx, err.Error())
@@ -753,16 +939,18 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact
case *ledgercore.LeaseInLedgerError:
asmStats.LeaseErrorCount++
stats.RemovedInvalidCount++
- pool.log.Infof("Cannot re-add pending transaction to pool: %v", err)
+ pool.log.Infof("Cannot re-add pending transaction to pool (lease): %v", err)
case *transactions.MinFeeError:
asmStats.MinFeeErrorCount++
stats.RemovedInvalidCount++
- pool.log.Infof("Cannot re-add pending transaction to pool: %v", err)
+ pool.log.Infof("Cannot re-add pending transaction to pool (fee): %v", err)
default:
asmStats.InvalidCount++
stats.RemovedInvalidCount++
pool.log.Warnf("Cannot re-add pending transaction to pool: %v", err)
}
+ } else if hasBlock {
+ break
}
}
@@ -791,7 +979,10 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact
}
pool.assemblyMu.Unlock()
- pool.rememberCommit(true)
+ // remember the changes made to the tx pool if we're not in speculative assembly
+ if !stopAtFirstBlock {
+ pool.rememberCommit(true)
+ }
return
}
@@ -888,14 +1079,33 @@ func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Tim
defer pool.assemblyMu.Unlock()
if pool.assemblyResults.roundStartedEvaluating > round {
- // we've already assembled a round in the future. Since we're clearly won't go backward, it means
- // that the agreement is far behind us, so we're going to return here with error code to let
- // the agreement know about it.
- // since the network is already ahead of us, there is no issue here in not generating a block ( since the block would get discarded anyway )
+ // we've already assembled a round in the future. Since we clearly
+ // won't go backward, it means that agreement is far behind us, so
+ // we're going to return an error code here to let agreement know
+ // about it. Since the network is already ahead of us, there is no
+ // issue in not generating a block (the block would get discarded
+ // anyway).
pool.log.Infof("AssembleBlock: requested round is behind transaction pool round %d < %d", round, pool.assemblyResults.roundStartedEvaluating)
return nil, ErrStaleBlockAssemblyRequest
}
+ // Maybe we have a block already assembled
+ // TODO: this needs to be changed if a speculative block is in flight and if it is valid. If it is not valid, cancel it, if it is valid, wait for it.
+ prev, err := pool.ledger.Block(round.SubSaturate(1))
+ if err == nil {
+ specBlock, specErr := pool.tryReadSpeculativeBlock(prev.Hash(), round, deadline, &stats)
+ if specBlock != nil || specErr != nil {
+ pool.log.Infof("got spec block for %s, specErr %v", prev.Hash().String(), specErr)
+ return specBlock, specErr
+ }
+ }
+
+ assembled, err = pool.waitForBlockAssembly(round, deadline, &stats)
+ return
+}
+
+// waitForBlockAssembly waits until the pending evaluator finishes assembling
+// a block for the given round, or the deadline passes. It must be called with
+// pool.assemblyMu held.
+func (pool *TransactionPool) waitForBlockAssembly(round basics.Round, deadline time.Time, stats *telemetryspec.AssembleBlockMetrics) (assembled *ledgercore.ValidatedBlock, err error) {
pool.assemblyDeadline = deadline
pool.assemblyRound = round
for time.Now().Before(deadline) && (!pool.assemblyResults.ok || pool.assemblyResults.roundStartedEvaluating != round) {
@@ -903,9 +1113,11 @@ func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Tim
}
if !pool.assemblyResults.ok {
- // we've passed the deadline, so we're either going to have a partial block, or that we won't make it on time.
- // start preparing an empty block in case we'll miss the extra time (assemblyWaitEps).
- // the assembleEmptyBlock is using the database, so we want to unlock here and take the lock again later on.
+ // we've passed the deadline, so we're either going to have a partial
+ // block, or we won't make it on time. start preparing an empty block
+ // in case we'll miss the extra time (assemblyWaitEps). the
+ // assembleEmptyBlock is using the database, so we want to unlock here
+ // and take the lock again later on.
pool.assemblyMu.Unlock()
emptyBlock, emptyBlockErr := pool.assembleEmptyBlock(round)
pool.assemblyMu.Lock()
@@ -954,12 +1166,13 @@ func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Tim
pool.assemblyResults.roundStartedEvaluating, round)
}
- stats = pool.assemblyResults.stats
+ *stats = pool.assemblyResults.stats
return pool.assemblyResults.blk, nil
}
-// assembleEmptyBlock construct a new block for the given round. Internally it's using the ledger database calls, so callers
-// need to be aware that it might take a while before it would return.
+// assembleEmptyBlock constructs a new block for the given round. Internally
+// it uses ledger database calls, so callers need to be aware that it might
+// take a while before it returns.
func (pool *TransactionPool) assembleEmptyBlock(round basics.Round) (assembled *ledgercore.ValidatedBlock, err error) {
prevRound := round - 1
prev, err := pool.ledger.BlockHdr(prevRound)
@@ -973,8 +1186,9 @@ func (pool *TransactionPool) assembleEmptyBlock(round basics.Round) (assembled *
var nonSeqBlockEval ledgercore.ErrNonSequentialBlockEval
if errors.As(err, &nonSeqBlockEval) {
if nonSeqBlockEval.EvaluatorRound <= nonSeqBlockEval.LatestRound {
- // in the case that the ledger have already moved beyond that round, just let the agreement know that
- // we don't generate a block and it's perfectly fine.
+ // in the case that the ledger has already moved beyond that
+ // round, just let the agreement know that we don't generate a
+ // block and it's perfectly fine.
return nil, ErrStaleBlockAssemblyRequest
}
}
@@ -990,7 +1204,7 @@ func (pool *TransactionPool) AssembleDevModeBlock() (assembled *ledgercore.Valid
defer pool.mu.Unlock()
// drop the current block evaluator and start with a new one.
- pool.recomputeBlockEvaluator(nil, 0)
+ pool.recomputeBlockEvaluator(nil, 0, false)
// The above was already pregenerating the entire block,
// so there won't be any waiting on this call.
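
Taken together, the pool changes in this file are meant to be driven in roughly the following sequence. A hedged sketch of the intended lifecycle (assumed usage, not code from this patch; error handling elided):

package sketch

import (
	"context"
	"time"

	"github.com/algorand/go-algorand/crypto"
	"github.com/algorand/go-algorand/data/basics"
	"github.com/algorand/go-algorand/data/pools"
	"github.com/algorand/go-algorand/ledger/ledgercore"
)

// speculateThenAssemble sketches the intended lifecycle: start speculating
// on the expected winner vb, then ask for the next round's block.
func speculateThenAssemble(pool *pools.TransactionPool, vb *ledgercore.ValidatedBlock, next basics.Round) (*ledgercore.ValidatedBlock, error) {
	// A zero digest asks the pool to compute vb's digest itself; false means
	// a new speculative assembly may be started even if none is running.
	pool.StartSpeculativeBlockAssembly(context.Background(), vb, crypto.Digest{}, false)

	// ... the round concludes; the ledger notifier invokes OnNewBlock, which
	// cancels the speculation only if a different block won ...

	// AssembleBlock consults tryReadSpeculativeBlock first: if the parent
	// hash of round next-1 matches the speculated digest, the pre-assembled
	// block is returned; otherwise assembly proceeds normally.
	return pool.AssembleBlock(next, time.Now().Add(250*time.Millisecond))
}
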
diff --git a/data/pools/transactionPool_test.go b/data/pools/transactionPool_test.go
index f03eadf7da..a90a4cfdd8 100644
--- a/data/pools/transactionPool_test.go
+++ b/data/pools/transactionPool_test.go
@@ -19,12 +19,14 @@ package pools
import (
"bufio"
"bytes"
+ "context"
"fmt"
"math/rand"
"os"
"runtime"
"runtime/pprof"
"strings"
+ "sync"
"testing"
"time"
@@ -44,6 +46,7 @@ import (
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/logging"
+ "github.com/algorand/go-algorand/logging/telemetryspec"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/stateproof"
"github.com/algorand/go-algorand/stateproof/verify"
@@ -1198,7 +1201,7 @@ func BenchmarkTransactionPoolRecompute(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
- transactionPool[i].recomputeBlockEvaluator(committedTxIds[i], knownCommitted[i])
+ transactionPool[i].recomputeBlockEvaluator(committedTxIds[i], knownCommitted[i], false)
}
b.StopTimer()
if profF != nil {
@@ -1528,7 +1531,7 @@ func TestStateProofLogging(t *testing.T) {
err = transactionPool.RememberOne(stxn)
require.NoError(t, err)
- transactionPool.recomputeBlockEvaluator(nil, 0)
+ transactionPool.recomputeBlockEvaluator(nil, 0, false)
_, err = transactionPool.AssembleBlock(514, time.Time{})
require.NoError(t, err)
@@ -1612,3 +1615,309 @@ func generateProofForTesting(
return proof
}
+
+func TestSpeculativeBlockAssembly(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ numOfAccounts := 10
+ // Generate accounts
+ secrets := make([]*crypto.SignatureSecrets, numOfAccounts)
+ addresses := make([]basics.Address, numOfAccounts)
+
+ for i := 0; i < numOfAccounts; i++ {
+ secret := keypair()
+ addr := basics.Address(secret.SignatureVerifier)
+ secrets[i] = secret
+ addresses[i] = addr
+ }
+
+ mockLedger := makeMockLedger(t, initAccFixed(addresses, 1<<32))
+ cfg := config.GetDefaultLocal()
+ cfg.TxPoolSize = testPoolSize
+ cfg.EnableProcessBlockStats = false
+ log := logging.TestingLog(t)
+ transactionPool := MakeTransactionPool(mockLedger, cfg, log)
+
+ savedTransactions := 0
+ for i, sender := range addresses {
+ amount := uint64(0)
+ for _, receiver := range addresses {
+ if sender != receiver {
+ tx := transactions.Transaction{
+ Type: protocol.PaymentTx,
+ Header: transactions.Header{
+ Sender: sender,
+ Fee: basics.MicroAlgos{Raw: proto.MinTxnFee + amount},
+ FirstValid: 0,
+ LastValid: 10,
+ Note: make([]byte, 0),
+ GenesisHash: mockLedger.GenesisHash(),
+ },
+ PaymentTxnFields: transactions.PaymentTxnFields{
+ Receiver: receiver,
+ Amount: basics.MicroAlgos{Raw: 0},
+ },
+ }
+ amount++
+
+ signedTx := tx.Sign(secrets[i])
+ require.NoError(t, transactionPool.RememberOne(signedTx))
+ savedTransactions++
+ }
+ }
+ }
+ pending := transactionPool.PendingTxGroups()
+ require.Len(t, pending, savedTransactions)
+
+ secret := keypair()
+ recv := basics.Address(secret.SignatureVerifier)
+
+ tx := transactions.Transaction{
+ Type: protocol.PaymentTx,
+ Header: transactions.Header{
+ Sender: addresses[0],
+ Fee: basics.MicroAlgos{Raw: proto.MinTxnFee},
+ FirstValid: 0,
+ LastValid: 10,
+ Note: []byte{1},
+ GenesisHash: mockLedger.GenesisHash(),
+ },
+ PaymentTxnFields: transactions.PaymentTxnFields{
+ Receiver: recv,
+ Amount: basics.MicroAlgos{Raw: 0},
+ },
+ }
+ signedTx := tx.Sign(secrets[0])
+
+ blockEval := newBlockEvaluator(t, mockLedger)
+ err := blockEval.Transaction(signedTx, transactions.ApplyData{})
+ require.NoError(t, err)
+
+ // simulate this transaction was applied
+ block, err := blockEval.GenerateBlock()
+ require.NoError(t, err)
+
+ t.Logf("prev block digest %s", block.Block().Digest().String())
+ t.Logf("prev block hash %s", block.Block().Hash().String())
+
+ transactionPool.StartSpeculativeBlockAssembly(context.Background(), block, crypto.Digest{}, false)
+ // TODO(yg): shouldn't we wait for it to finish?
+ //<-transactionPool.specAsmDone
+
+ // add the block
+ err = mockLedger.AddBlock(block.Block(), agreement.Certificate{})
+ require.NoError(t, err)
+
+ // empty tx pool
+ transactionPool.pendingTxids = make(map[transactions.Txid]transactions.SignedTxn)
+ transactionPool.pendingTxGroups = nil
+
+ // check that we still assemble the block
+ specBlock, err := transactionPool.AssembleBlock(block.Block().Round()+1, time.Now().Add(time.Second))
+ require.NoError(t, err)
+ require.NotNil(t, specBlock)
+ require.Equal(t, block.Block().Hash(), specBlock.Block().Branch, "spec.Branch %s", specBlock.Block().Branch.String())
+ require.Len(t, specBlock.Block().Payset, savedTransactions)
+}
+
+func TestSpeculativeBlockAssemblyWithOverlappingBlock(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ numOfAccounts := 10
+ // Generate accounts
+ secrets := make([]*crypto.SignatureSecrets, numOfAccounts)
+ addresses := make([]basics.Address, numOfAccounts)
+
+ for i := 0; i < numOfAccounts; i++ {
+ secret := keypair()
+ addr := basics.Address(secret.SignatureVerifier)
+ secrets[i] = secret
+ addresses[i] = addr
+ }
+
+ mockLedger := makeMockLedger(t, initAccFixed(addresses, 1<<32))
+ cfg := config.GetDefaultLocal()
+ cfg.TxPoolSize = testPoolSize
+ cfg.EnableProcessBlockStats = false
+ transactionPool := MakeTransactionPool(mockLedger, cfg, logging.Base())
+
+ savedTransactions := 0
+ pendingTxn := transactions.SignedTxn{}
+ pendingTxIDSet := make(map[crypto.Signature]bool)
+ for i, sender := range addresses {
+ amount := uint64(0)
+ for _, receiver := range addresses {
+ if sender != receiver {
+ tx := transactions.Transaction{
+ Type: protocol.PaymentTx,
+ Header: transactions.Header{
+ Sender: sender,
+ Fee: basics.MicroAlgos{Raw: proto.MinTxnFee + amount},
+ FirstValid: 0,
+ LastValid: 10,
+ Note: make([]byte, 0),
+ GenesisHash: mockLedger.GenesisHash(),
+ },
+ PaymentTxnFields: transactions.PaymentTxnFields{
+ Receiver: receiver,
+ Amount: basics.MicroAlgos{Raw: 0},
+ },
+ }
+ amount++
+
+ pendingTxn = tx.Sign(secrets[i])
+ require.NoError(t, transactionPool.RememberOne(pendingTxn))
+ pendingTxIDSet[pendingTxn.Sig] = true
+ savedTransactions++
+ }
+ }
+ }
+ pending := transactionPool.PendingTxGroups()
+ require.Len(t, pending, savedTransactions)
+ require.Len(t, pendingTxIDSet, savedTransactions)
+
+ blockEval := newBlockEvaluator(t, mockLedger)
+ err := blockEval.Transaction(pendingTxn, transactions.ApplyData{})
+ require.NoError(t, err)
+
+ // simulate this transaction was applied
+ block, err := blockEval.GenerateBlock()
+ require.NoError(t, err)
+
+ transactionPool.StartSpeculativeBlockAssembly(context.Background(), block, crypto.Digest{}, false)
+ //<-transactionPool.specAsmDone
+ var stats telemetryspec.AssembleBlockMetrics
+ specBlock, specErr := transactionPool.tryReadSpeculativeBlock(block.Block().Hash(), block.Block().Round()+1, time.Now().Add(time.Second), &stats)
+ require.NoError(t, specErr)
+ require.NotNil(t, specBlock)
+ // assembled block doesn't have txn in the speculated block
+ require.Len(t, specBlock.Block().Payset, savedTransactions-1)
+
+ // tx pool unaffected
+ require.Len(t, transactionPool.PendingTxIDs(), savedTransactions)
+
+ for _, txn := range specBlock.Block().Payset {
+ require.NotEqual(t, txn.SignedTxn.Sig, pendingTxn.Sig)
+ require.True(t, pendingTxIDSet[txn.SignedTxn.Sig])
+ }
+}
+
+// This test runs the speculative block assembly and adds txns to the pool in another thread
+func TestSpeculativeBlockAssemblyDataRace(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ numOfAccounts := 10
+ // Generate accounts
+ secrets := make([]*crypto.SignatureSecrets, numOfAccounts)
+ addresses := make([]basics.Address, numOfAccounts)
+
+ for i := 0; i < numOfAccounts; i++ {
+ secret := keypair()
+ addr := basics.Address(secret.SignatureVerifier)
+ secrets[i] = secret
+ addresses[i] = addr
+ }
+
+ mockLedger := makeMockLedger(t, initAccFixed(addresses, 1<<32))
+ cfg := config.GetDefaultLocal()
+ cfg.TxPoolSize = testPoolSize
+ cfg.EnableProcessBlockStats = false
+ transactionPool := MakeTransactionPool(mockLedger, cfg, logging.Base())
+
+ savedTransactions := 0
+ pendingTxn := transactions.SignedTxn{}
+ pendingTxIDSet := make(map[crypto.Signature]bool)
+ for i, sender := range addresses {
+ amount := uint64(0)
+ for _, receiver := range addresses {
+ if sender != receiver {
+ tx := transactions.Transaction{
+ Type: protocol.PaymentTx,
+ Header: transactions.Header{
+ Sender: sender,
+ Fee: basics.MicroAlgos{Raw: proto.MinTxnFee + amount},
+ FirstValid: 0,
+ LastValid: 10,
+ Note: make([]byte, 0),
+ GenesisHash: mockLedger.GenesisHash(),
+ },
+ PaymentTxnFields: transactions.PaymentTxnFields{
+ Receiver: receiver,
+ Amount: basics.MicroAlgos{Raw: 0},
+ },
+ }
+ amount++
+
+ pendingTxn = tx.Sign(secrets[i])
+ require.NoError(t, transactionPool.RememberOne(pendingTxn))
+ pendingTxIDSet[pendingTxn.Sig] = true
+ savedTransactions++
+ }
+ }
+ }
+ t.Logf("savedTransactions=%d", savedTransactions)
+ pending := transactionPool.PendingTxGroups()
+ require.Len(t, pending, savedTransactions)
+ require.Len(t, pendingTxIDSet, savedTransactions)
+
+ blockEval := newBlockEvaluator(t, mockLedger)
+ err := blockEval.Transaction(pendingTxn, transactions.ApplyData{})
+ require.NoError(t, err)
+
+ // simulate this transaction was applied
+ block, err := blockEval.GenerateBlock()
+ require.NoError(t, err)
+
+ transactionPool.StartSpeculativeBlockAssembly(context.Background(), block, crypto.Digest{}, false)
+ newSavedTransactions := 0
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for i, sender := range addresses {
+ amount := uint64(0)
+ for _, receiver := range addresses {
+ if sender != receiver {
+ tx := transactions.Transaction{
+ Type: protocol.PaymentTx,
+ Header: transactions.Header{
+ Sender: sender,
+ Fee: basics.MicroAlgos{Raw: proto.MinTxnFee + amount},
+ FirstValid: 0,
+ LastValid: 11,
+ Note: make([]byte, 0),
+ GenesisHash: mockLedger.GenesisHash(),
+ },
+ PaymentTxnFields: transactions.PaymentTxnFields{
+ Receiver: receiver,
+ Amount: basics.MicroAlgos{Raw: 0},
+ },
+ }
+ amount++
+
+ pendingTxn = tx.Sign(secrets[i])
+ require.NoError(t, transactionPool.RememberOne(pendingTxn))
+ pendingTxIDSet[pendingTxn.Sig] = true
+ newSavedTransactions++
+ }
+ }
+ }
+ }()
+ wg.Wait()
+ t.Logf("newSavedTransactions=%d", newSavedTransactions)
+
+ // tx pool should have old txns and new txns
+ require.Len(t, transactionPool.PendingTxIDs(), savedTransactions+newSavedTransactions)
+
+ var stats telemetryspec.AssembleBlockMetrics
+ specBlock, specErr := transactionPool.tryReadSpeculativeBlock(block.Block().Hash(), block.Block().Round()+1, time.Now().Add(time.Second), &stats)
+ require.NoError(t, specErr)
+ require.NotNil(t, specBlock)
+ // assembled block doesn't have txn in the speculated block
+ require.Len(t, specBlock.Block().Payset, savedTransactions-1, "len(Payset)=%d, savedTransactions=%d", len(specBlock.Block().Payset), savedTransactions)
+
+ for _, txn := range specBlock.Block().Payset {
+ require.NotEqual(t, txn.SignedTxn.Sig, pendingTxn.Sig)
+ require.True(t, pendingTxIDSet[txn.SignedTxn.Sig])
+ }
+}
diff --git a/installer/config.json.example b/installer/config.json.example
index 8522011ce6..70bcc758b9 100644
--- a/installer/config.json.example
+++ b/installer/config.json.example
@@ -101,6 +101,8 @@
"RestReadTimeoutSeconds": 15,
"RestWriteTimeoutSeconds": 120,
"RunHosted": false,
+ "SpeculativeAsmTimeOffset": 0,
+ "SpeculativeAssemblyDisable": false,
"StorageEngine": "sqlite",
"SuggestedFeeBlockHistory": 3,
"SuggestedFeeSlidingWindowSize": 50,
diff --git a/ledger/ledger_test.go b/ledger/ledger_test.go
index 5edb6d20e1..e99574487c 100644
--- a/ledger/ledger_test.go
+++ b/ledger/ledger_test.go
@@ -1102,7 +1102,7 @@ func testLedgerSingleTxApplyData(t *testing.T, version protocol.ConsensusVersion
VoteLast: 10000,
}
- // depends on what the concensus is need to generate correct KeyregTxnFields.
+ // depending on the consensus version, we need to generate the correct KeyregTxnFields.
if proto.EnableStateProofKeyregCheck {
frst, lst := uint64(correctKeyregFields.VoteFirst), uint64(correctKeyregFields.VoteLast)
store, err := db.MakeAccessor("test-DB", false, true)
diff --git a/ledger/ledgercore/validatedBlock.go b/ledger/ledgercore/validatedBlock.go
index 84d09e2a9a..0ace3a24f4 100644
--- a/ledger/ledgercore/validatedBlock.go
+++ b/ledger/ledgercore/validatedBlock.go
@@ -17,8 +17,11 @@
package ledgercore
import (
+ "github.com/algorand/go-algorand/config"
+ "github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/data/committee"
+ "github.com/algorand/go-algorand/data/transactions"
)
// ValidatedBlock represents the result of a block validation. It can
@@ -50,6 +53,27 @@ func (vb ValidatedBlock) WithSeed(s committee.Seed) ValidatedBlock {
}
}
+// CheckDup checks whether a txn is a duplicate within this validated block's
+// delta, either by txid or by an unexpired transaction lease.
+func (vb ValidatedBlock) CheckDup(currentProto config.ConsensusParams, firstValid, lastValid basics.Round, txid transactions.Txid, txl Txlease) error {
+ _, present := vb.delta.Txids[txid]
+ if present {
+ return &TransactionInLedgerError{Txid: txid}
+ }
+
+ if currentProto.SupportTransactionLeases && (txl.Lease != [32]byte{}) {
+ expires, ok := vb.delta.Txleases[txl]
+ if ok && vb.blk.Round() <= expires {
+ return MakeLeaseInLedgerError(txid, txl, false)
+ }
+ }
+ return nil
+}
+
+// Hash returns the hash of the block
+func (vb ValidatedBlock) Hash() bookkeeping.BlockHash {
+ return vb.blk.Hash()
+}
+
// MakeValidatedBlock creates a validated block.
func MakeValidatedBlock(blk bookkeeping.Block, delta StateDelta) ValidatedBlock {
return ValidatedBlock{
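
A short sketch of what the new CheckDup answers for the speculated round (the wrapper function and its name are illustrative, not part of this patch):

package sketch

import (
	"github.com/algorand/go-algorand/config"
	"github.com/algorand/go-algorand/data/transactions"
	"github.com/algorand/go-algorand/ledger/ledgercore"
)

// wouldBeDup reports whether stxn would be rejected as a duplicate by the
// speculated block vb, either by txid or by an unexpired transaction lease.
func wouldBeDup(vb ledgercore.ValidatedBlock, proto config.ConsensusParams, stxn transactions.SignedTxn) bool {
	txl := ledgercore.Txlease{Sender: stxn.Txn.Sender, Lease: stxn.Txn.Lease}
	return vb.CheckDup(proto, stxn.Txn.FirstValid, stxn.Txn.LastValid, stxn.Txn.ID(), txl) != nil
}
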
diff --git a/ledger/speculative.go b/ledger/speculative.go
new file mode 100644
index 0000000000..d6b251f9c8
--- /dev/null
+++ b/ledger/speculative.go
@@ -0,0 +1,305 @@
+// Copyright (C) 2019-2023 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package ledger
+
+import (
+ "errors"
+ "fmt"
+
+ "github.com/algorand/go-algorand/config"
+ "github.com/algorand/go-algorand/crypto"
+ "github.com/algorand/go-algorand/data/basics"
+ "github.com/algorand/go-algorand/data/bookkeeping"
+ "github.com/algorand/go-algorand/data/transactions"
+ "github.com/algorand/go-algorand/data/transactions/logic"
+ "github.com/algorand/go-algorand/data/transactions/verify"
+ "github.com/algorand/go-algorand/ledger/eval"
+ "github.com/algorand/go-algorand/ledger/ledgercore"
+ "github.com/algorand/go-algorand/logging"
+)
+
+// LedgerForEvaluator defines the ledger interface needed by the evaluator.
+type LedgerForEvaluator interface { //nolint:revive //LedgerForEvaluator is a long established but newly leaking-out name, and there really isn't a better name for it despite how lint dislikes ledger.LedgerForEvaluator
+ eval.LedgerForEvaluator
+ LookupKv(rnd basics.Round, key string) ([]byte, error)
+ FlushCaches()
+
+ // and a few more Ledger functions
+ Block(basics.Round) (bookkeeping.Block, error)
+ Latest() basics.Round
+ VerifiedTransactionCache() verify.VerifiedTransactionCache
+ StartEvaluator(hdr bookkeeping.BlockHeader, paysetHint, maxTxnBytesPerBlock int, tracer logic.EvalTracer) (*eval.BlockEvaluator, error)
+ GetStateProofVerificationContext(stateProofLastAttestedRound basics.Round) (*ledgercore.StateProofVerificationContext, error)
+}
+
+// ErrRoundTooHigh is returned when a ledger record is requested for a round in the future
+var ErrRoundTooHigh = errors.New("round too high")
+
+// validatedBlockAsLFE presents a LedgerForEvaluator interface on top of
+// a ValidatedBlock. This makes it possible to construct a BlockEvaluator
+// on top, which in turn allows speculatively constructing a subsequent
+// block, before the ValidatedBlock is committed to the ledger.
+//
+//	ledger       ValidatedBlock -------> Block
+//	   ^               ^           blk
+//	   |               | vb
+//	   | l             |
+//	   \-------- validatedBlockAsLFE
+//
+// where ledger is the full ledger.
+type validatedBlockAsLFE struct {
+ // l points to the underlying ledger; it might be another instance
+ // of validatedBlockAsLFE if we are speculating on a chain of many
+ // blocks.
+ l LedgerForEvaluator
+
+ // vb points to the ValidatedBlock that logically extends the
+ // state of the ledger.
+ vb *ledgercore.ValidatedBlock
+}
+
+// MakeValidatedBlockAsLFE constructs a new validatedBlockAsLFE from a ValidatedBlock.
+func MakeValidatedBlockAsLFE(vb *ledgercore.ValidatedBlock, l LedgerForEvaluator) (*validatedBlockAsLFE, error) {
+ latestRound := l.Latest()
+ if vb.Block().Round().SubSaturate(1) != latestRound {
+ return nil, fmt.Errorf("MakeBlockAsLFE: Ledger round %d mismatches next block round %d", latestRound, vb.Block().Round())
+ }
+ hdr, err := l.BlockHdr(latestRound)
+ if err != nil {
+ return nil, err
+ }
+ if vb.Block().Branch != hdr.Hash() {
+ return nil, fmt.Errorf("MakeBlockAsLFE: Ledger latest block hash %x mismatches block's prev hash %x", hdr.Hash(), vb.Block().Branch)
+ }
+
+ return &validatedBlockAsLFE{
+ l: l,
+ vb: vb,
+ }, nil
+}
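
Because l can itself be another validatedBlockAsLFE (see the comment on the l field above), speculation can stack several uncommitted blocks. A hedged sketch, assuming vb2 builds on vb1 (MakeValidatedBlockAsLFE verifies the round and parent-hash linkage):

package sketch

import (
	"github.com/algorand/go-algorand/ledger"
	"github.com/algorand/go-algorand/ledger/ledgercore"
)

// chainSpeculation layers two uncommitted blocks over a ledger: vb2 extends
// vb1, which extends l. Each layer answers lookups from its own block's
// delta and falls back to the layer below.
func chainSpeculation(l ledger.LedgerForEvaluator, vb1, vb2 *ledgercore.ValidatedBlock) (ledger.LedgerForEvaluator, error) {
	lfe1, err := ledger.MakeValidatedBlockAsLFE(vb1, l)
	if err != nil {
		return nil, err
	}
	return ledger.MakeValidatedBlockAsLFE(vb2, lfe1)
}
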
+
+// Block implements the ledgerForEvaluator interface.
+func (v *validatedBlockAsLFE) Block(r basics.Round) (bookkeeping.Block, error) {
+ if r == v.vb.Block().Round() {
+ return v.vb.Block(), nil
+ }
+
+ return v.l.Block(r)
+}
+
+// BlockHdr implements the ledgerForEvaluator interface.
+func (v *validatedBlockAsLFE) BlockHdr(r basics.Round) (bookkeeping.BlockHeader, error) {
+ if r == v.vb.Block().Round() {
+ return v.vb.Block().BlockHeader, nil
+ }
+
+ return v.l.BlockHdr(r)
+}
+
+// CheckDup implements the ledgerForEvaluator interface.
+func (v *validatedBlockAsLFE) CheckDup(currentProto config.ConsensusParams, current basics.Round, firstValid basics.Round, lastValid basics.Round, txid transactions.Txid, txl ledgercore.Txlease) error {
+ if current == v.vb.Block().Round() {
+ return v.vb.CheckDup(currentProto, firstValid, lastValid, txid, txl)
+ }
+
+ return v.l.CheckDup(currentProto, current, firstValid, lastValid, txid, txl)
+}
+
+// GenesisHash implements the ledgerForEvaluator interface.
+func (v *validatedBlockAsLFE) GenesisHash() crypto.Digest {
+ return v.l.GenesisHash()
+}
+
+// Latest implements the ledgerForEvaluator interface.
+func (v *validatedBlockAsLFE) Latest() basics.Round {
+ return v.vb.Block().Round()
+}
+
+// LatestTotals returns the totals of all accounts for the most recent round, as well as the round number.
+func (v *validatedBlockAsLFE) LatestTotals() (basics.Round, ledgercore.AccountTotals, error) {
+ return v.Latest(), v.vb.Delta().Totals, nil
+}
+
+// VotersForStateProof implements the ledgerForEvaluator interface.
+func (v *validatedBlockAsLFE) VotersForStateProof(r basics.Round) (*ledgercore.VotersForRound, error) {
+ if r >= v.vb.Block().Round() {
+ // We do not support computing the state proof voters for rounds
+ // that have not been committed to the ledger yet. This should not
+ // be a problem as long as the speculation depth does not
+ // exceed StateProofVotersLookback.
+ err := fmt.Errorf("validatedBlockAsLFE.VotersForStateProof(%d): validated block is for round %d, voters not available", r, v.vb.Block().Round())
+ logging.Base().Warn(err.Error())
+ return nil, err
+ }
+
+ return v.l.VotersForStateProof(r)
+}
+
+// GetCreatorForRound implements the ledgerForEvaluator interface.
+func (v *validatedBlockAsLFE) GetCreatorForRound(rnd basics.Round, cidx basics.CreatableIndex, ctype basics.CreatableType) (basics.Address, bool, error) {
+ vbround := v.vb.Block().Round()
+ if rnd > vbround {
+ return basics.Address{}, false, ErrRoundTooHigh
+ }
+ if rnd == vbround {
+ delta, ok := v.vb.Delta().Creatables[cidx]
+ if ok {
+ if delta.Created && delta.Ctype == ctype {
+ return delta.Creator, true, nil
+ }
+ return basics.Address{}, false, nil
+ }
+
+ // no change in this block, so lookup in previous
+ rnd--
+ }
+
+ return v.l.GetCreatorForRound(rnd, cidx, ctype)
+}
+
+// GenesisProto returns the initial protocol for this ledger.
+func (v *validatedBlockAsLFE) GenesisProto() config.ConsensusParams {
+ return v.l.GenesisProto()
+}
+
+// LookupApplication loads an application resource that matches the request parameters from the ledger.
+func (v *validatedBlockAsLFE) LookupApplication(rnd basics.Round, addr basics.Address, aidx basics.AppIndex) (ledgercore.AppResource, error) {
+ vbround := v.vb.Block().Round()
+ if rnd > vbround {
+ return ledgercore.AppResource{}, ErrRoundTooHigh
+ }
+ if rnd == vbround {
+ // Intentionally apply (pending) rewards up to rnd.
+ res, ok := v.vb.Delta().Accts.GetResource(addr, basics.CreatableIndex(aidx), basics.AppCreatable)
+ if ok {
+ return ledgercore.AppResource{AppParams: res.AppParams, AppLocalState: res.AppLocalState}, nil
+ }
+
+ // fall back to looking up the application in the ledger, as of the previous block
+ rnd--
+ }
+
+ return v.l.LookupApplication(rnd, addr, aidx)
+}
+
+// LookupAsset loads an asset resource that matches the request parameters from the ledger.
+func (v *validatedBlockAsLFE) LookupAsset(rnd basics.Round, addr basics.Address, aidx basics.AssetIndex) (ledgercore.AssetResource, error) {
+ vbround := v.vb.Block().Round()
+ if rnd > vbround {
+ return ledgercore.AssetResource{}, ErrRoundTooHigh
+ }
+ if rnd == vbround {
+ // Intentionally apply (pending) rewards up to rnd.
+ res, ok := v.vb.Delta().Accts.GetResource(addr, basics.CreatableIndex(aidx), basics.AssetCreatable)
+ if ok {
+ return ledgercore.AssetResource{AssetParams: res.AssetParams, AssetHolding: res.AssetHolding}, nil
+ }
+ // the asset did not change in this block, so look it up in the ledger as of the previous round
+ rnd--
+ }
+
+ return v.l.LookupAsset(rnd, addr, aidx)
+}
+
+// LookupWithoutRewards implements the ledgerForEvaluator interface.
+func (v *validatedBlockAsLFE) LookupWithoutRewards(rnd basics.Round, a basics.Address) (ledgercore.AccountData, basics.Round, error) {
+ vbround := v.vb.Block().Round()
+ if rnd > vbround {
+ return ledgercore.AccountData{}, rnd, ErrRoundTooHigh
+ }
+ if rnd == vbround {
+ data, ok := v.vb.Delta().Accts.GetData(a)
+ if ok {
+ return data, rnd, nil
+ }
+ // the account did not change in this block, so look it up in the ledger as of the previous round
+ rnd--
+ }
+
+ // look up in the backing view, reporting rnd (already decremented above if needed) as the round of validity rather than the backing ledger's own resolution round
+ acctData, fallbackrnd, err := v.l.LookupWithoutRewards(rnd, a)
+ if err != nil {
+ return acctData, fallbackrnd, err
+ }
+ return acctData, rnd, err
+}
+
+// VerifiedTransactionCache implements the ledgerForEvaluator interface.
+func (v *validatedBlockAsLFE) VerifiedTransactionCache() verify.VerifiedTransactionCache {
+ return v.l.VerifiedTransactionCache()
+}
+
+// BlockHdrCached implements the ledgerForEvaluator interface.
+func (v *validatedBlockAsLFE) BlockHdrCached(rnd basics.Round) (hdr bookkeeping.BlockHeader, err error) {
+ vbround := v.vb.Block().Round()
+ if rnd > vbround {
+ return bookkeeping.BlockHeader{}, ErrRoundTooHigh
+ }
+ if rnd == vbround {
+ return v.vb.Block().BlockHeader, nil
+ }
+ return v.l.BlockHdrCached(rnd)
+}
+
+// StartEvaluator implements the ledgerForEvaluator interface.
+func (v *validatedBlockAsLFE) StartEvaluator(hdr bookkeeping.BlockHeader, paysetHint, maxTxnBytesPerBlock int, tracer logic.EvalTracer) (*eval.BlockEvaluator, error) {
+ if hdr.Round.SubSaturate(1) != v.Latest() {
+ return nil, fmt.Errorf("StartEvaluator: this view is at round %d; cannot evaluate a block for round %d", v.Latest(), hdr.Round)
+ }
+
+ return eval.StartEvaluator(v, hdr,
+ eval.EvaluatorOptions{
+ PaysetHint: paysetHint,
+ Tracer: tracer,
+ Generate: true,
+ Validate: true,
+ MaxTxnBytesPerBlock: maxTxnBytesPerBlock,
+ })
+}
+
+// GetStateProofVerificationContext implements the ledgerForEvaluator interface.
+func (v *validatedBlockAsLFE) GetStateProofVerificationContext(stateProofLastAttestedRound basics.Round) (*ledgercore.StateProofVerificationContext, error) {
+ // TODO(yg): Is this behavior correct? What happens if the requested round is the speculative block's own round?
+ return v.l.GetStateProofVerificationContext(stateProofLastAttestedRound)
+}
+
+// FlushCaches implements the ledger/eval.LedgerForEvaluator interface.
+// A speculative view holds no caches of its own, so it is a no-op.
+func (v *validatedBlockAsLFE) FlushCaches() {
+}
+
+// LookupKv implements the ledgerForEvaluator interface.
+func (v *validatedBlockAsLFE) LookupKv(rnd basics.Round, key string) ([]byte, error) {
+ vbround := v.vb.Block().Round()
+ if rnd > vbround {
+ return nil, ErrRoundTooHigh
+ }
+ if rnd == vbround {
+ data, ok := v.vb.Delta().KvMods[key]
+ if ok {
+ return data.Data, nil
+ }
+ // the key did not change in this block, so look it up in the ledger as of the previous round
+ rnd--
+ }
+
+ // look up in the backing view at rnd (already decremented above if needed)
+ data, err := v.l.LookupKv(rnd, key)
+ if err != nil {
+ return nil, err
+ }
+ return data, nil
+}
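
Taken together, these methods let a not-yet-committed block masquerade as a ledger: each accessor answers queries at the block's own round from the block's in-memory delta and delegates everything older to the backing view, so views can be stacked arbitrarily deep before terminating at the committed ledger. A minimal sketch of that stacking, assuming MakeValidatedBlockAsLFE accepts the same ledgerForEvaluator view it produces (the test below chains two levels exactly this way); this is illustrative, not part of the patch:

```go
package ledger

import (
	"github.com/algorand/go-algorand/data/bookkeeping"
	"github.com/algorand/go-algorand/ledger/eval"
	"github.com/algorand/go-algorand/ledger/ledgercore"
)

// speculateNext is a sketch, not part of the patch: it starts an evaluator for
// the round after vb, using vb's in-memory delta plus the backing view l
// (the committed ledger, or another speculative view one level down).
func speculateNext(l ledgerForEvaluator, vb *ledgercore.ValidatedBlock, hdr bookkeeping.BlockHeader) (*eval.BlockEvaluator, error) {
	lfe, err := MakeValidatedBlockAsLFE(vb, l)
	if err != nil {
		return nil, err
	}
	// StartEvaluator insists that hdr.Round == lfe.Latest()+1, i.e. the new
	// header must directly extend the speculative block.
	return lfe.StartEvaluator(hdr, 0, 0, nil)
}
```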
diff --git a/ledger/speculative_test.go b/ledger/speculative_test.go
new file mode 100644
index 0000000000..bcbb693cb7
--- /dev/null
+++ b/ledger/speculative_test.go
@@ -0,0 +1,119 @@
+// Copyright (C) 2019-2023 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand.  If not, see <https://www.gnu.org/licenses/>.
+
+package ledger
+
+import (
+ "context"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/algorand/go-algorand/config"
+ "github.com/algorand/go-algorand/data/basics"
+ "github.com/algorand/go-algorand/data/bookkeeping"
+ "github.com/algorand/go-algorand/ledger/eval"
+
+ "github.com/algorand/go-algorand/data/transactions"
+ "github.com/algorand/go-algorand/ledger/ledgercore"
+ ledgertesting "github.com/algorand/go-algorand/ledger/testing"
+ "github.com/algorand/go-algorand/logging"
+ "github.com/algorand/go-algorand/protocol"
+)
+
+func TestSpeculative(t *testing.T) {
+ genesisInitState, _ := ledgertesting.GenerateInitState(t, protocol.ConsensusCurrentVersion, 1000)
+ const inMem = true
+ cfg := config.GetDefaultLocal()
+ log := logging.TestingLog(t)
+ l, err := OpenLedger(log, t.Name(), inMem, genesisInitState, cfg)
+ require.NoError(t, err, "could not open ledger")
+ defer l.Close()
+
+ blk0, err := l.BlockHdr(l.Latest())
+ require.NoError(t, err)
+
+ var blk1 bookkeeping.Block
+ blk1.CurrentProtocol = protocol.ConsensusCurrentVersion
+ blk1.Branch = blk0.Hash()
+ blk1.RewardsPool = testPoolAddr
+ blk1.FeeSink = testSinkAddr
+ blk1.BlockHeader.GenesisHash = genesisInitState.GenesisHash
+ blk1.BlockHeader.Round = l.Latest() + 1
+
+ state, err := eval.Eval(context.Background(), l, blk1, false, l.VerifiedTransactionCache(), nil, nil)
+ require.NoError(t, err)
+ vblk1 := ledgercore.MakeValidatedBlock(blk1, state)
+
+ blk1aslfe, err := MakeValidatedBlockAsLFE(&vblk1, l)
+ require.NoError(t, err)
+
+ blk2 := blk1
+ blk2.BlockHeader.Round++
+ blk2.Branch = blk1.Hash()
+
+ // Pick some accounts at random
+ var addr1, addr2 basics.Address
+ for a := range genesisInitState.Accounts {
+ if addr1 == (basics.Address{}) {
+ addr1 = a
+ } else if addr2 == (basics.Address{}) {
+ addr2 = a
+ } else {
+ break
+ }
+ }
+
+ var tx21 transactions.Transaction
+ tx21.Type = protocol.PaymentTx
+ tx21.Sender = addr1
+ tx21.Receiver = addr2
+ tx21.FirstValid = blk2.BlockHeader.Round
+ tx21.LastValid = blk2.BlockHeader.Round
+ tx21.Amount.Raw = 1000000
+ blk2.Payset = append(blk2.Payset, transactions.SignedTxnInBlock{
+ SignedTxnWithAD: transactions.SignedTxnWithAD{
+ SignedTxn: transactions.SignedTxn{
+ Txn: tx21,
+ },
+ },
+ HasGenesisID: true,
+ })
+
+ state, err = eval.Eval(context.Background(), blk1aslfe, blk2, false, blk1aslfe.VerifiedTransactionCache(), nil, nil)
+ require.NoError(t, err)
+ vblk2 := ledgercore.MakeValidatedBlock(blk2, state)
+
+ blk2aslfe, err := MakeValidatedBlockAsLFE(&vblk2, blk1aslfe)
+ require.NoError(t, err)
+
+ ad1, rnd, err := blk2aslfe.LookupWithoutRewards(blk1.Round(), addr1)
+ require.NoError(t, err)
+ // addr1 did not change in blk1; the view reports the data as of the requested round
+ require.Equal(t, blk1.Round(), rnd)
+
+ ad2, rnd, err := blk2aslfe.LookupWithoutRewards(blk2.Round(), addr1)
+ require.NoError(t, err)
+ // addr1 was debited by tx21 in blk2, so the data resolves at blk2's round
+ require.Equal(t, blk2.Round(), rnd)
+
+ // the payment (with zero fee) moved exactly 1000000 microalgos out of addr1
+ require.Equal(t, ad1.MicroAlgos.Raw-1000000, ad2.MicroAlgos.Raw)
+}
diff --git a/logging/logspec/agreement.go b/logging/logspec/agreement.go
index c5df1948bd..526c8301d2 100644
--- a/logging/logspec/agreement.go
+++ b/logging/logspec/agreement.go
@@ -58,6 +58,7 @@ const (
// has been reached for a given value during some (round, period, step).
ThresholdReached
// BlockAssembled is emitted when the source receives all parts of a block.
+ // Note that BlockAssembled does not mean the local node assembled a block:
+ // it is emitted when a block has been received from the network and verified,
+ // or when a locally assembled block has been added to the proposal cache of
+ // blocks that are validated and available to vote on in the next round.
+ // TODO: rename?
BlockAssembled
// BlockCommittable is emitted when the source observes a
// block B and a threshold of soft-votes for H(B). It is
diff --git a/node/node.go b/node/node.go
index 87ffd3e2a4..1203e3a17b 100644
--- a/node/node.go
+++ b/node/node.go
@@ -1293,6 +1293,16 @@ func (node *AlgorandFullNode) AssembleBlock(round basics.Round) (agreement.Valid
return validatedBlock{vb: lvb}, nil
}
+// StartSpeculativeBlockAssembly asks the transaction pool to start assembling
+// the next block on top of the given (not yet committed) validated block.
+func (node *AlgorandFullNode) StartSpeculativeBlockAssembly(ctx context.Context, avb agreement.ValidatedBlock, blockHash crypto.Digest, onlyIfStarted bool) {
+ vb, ok := avb.(validatedBlock)
+ if !ok {
+ node.log.Panicf("cannot convert agreement.ValidatedBlock to node validatedBlock, got %T", avb)
+ }
+ node.transactionPool.StartSpeculativeBlockAssembly(ctx, vb.vb, blockHash, onlyIfStarted)
+}
+
// getOfflineClosedStatus will return an int with the appropriate bit(s) set if it is offline and/or online
func getOfflineClosedStatus(acctData basics.OnlineAccountData) int {
rval := 0
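
The onlyIfStarted flag is passed straight through to the transaction pool. The pool side of this change is not shown in this hunk, so the following is a toy sketch only, with a hypothetical type and fields, of how a pool could honor that flag: unconditional calls (re)target speculation at the given block, while onlyIfStarted calls refresh an assembly that is already in flight and are otherwise a no-op:

```go
package pools

import (
	"context"
	"sync"

	"github.com/algorand/go-algorand/crypto"
	"github.com/algorand/go-algorand/ledger/ledgercore"
)

// speculationState is a hypothetical sketch of the bookkeeping a pool could
// keep to honor onlyIfStarted; it is not the patch's actual implementation.
type speculationState struct {
	mu     sync.Mutex
	branch crypto.Digest // hash of the block speculation builds on; zero when idle
}

func (s *speculationState) StartSpeculativeBlockAssembly(ctx context.Context, vb *ledgercore.ValidatedBlock, blockHash crypto.Digest, onlyIfStarted bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	started := s.branch != (crypto.Digest{})
	if onlyIfStarted && !started {
		return // caller only wants to refresh an assembly already in flight
	}
	if started && s.branch == blockHash {
		return // already speculating on top of this block
	}
	s.branch = blockHash
	_ = ctx // a real pool would launch assembly of the next block on top of vb here
	_ = vb
}
```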
diff --git a/test/scripts/e2e_client_runner.py b/test/scripts/e2e_client_runner.py
index 7b425ef925..233e8244a4 100755
--- a/test/scripts/e2e_client_runner.py
+++ b/test/scripts/e2e_client_runner.py
@@ -446,6 +446,19 @@ def main():
retcode = 0
capv = args.version.capitalize()
xrun(['goal', 'network', 'create', '-r', netdir, '-n', 'tbd', '-t', os.path.join(repodir, f'test/testdata/nettemplates/TwoNodes50Each{capv}.json')], timeout=90)
nodeDataDir = os.path.join(netdir, 'Node')
primaryDataDir = os.path.join(netdir, 'Primary')
+cfgpath = os.path.join(nodeDataDir, 'config.json')
+with open(cfgpath, 'rt') as fin:
+    ncfg = json.load(fin)
+ncfg['EnableDeveloperAPI'] = True
+with open(cfgpath, 'wt') as fout:
+    json.dump(ncfg, fout)
diff --git a/test/scripts/e2e_subs/e2e-app-real-assets-round.sh b/test/scripts/e2e_subs/e2e-app-real-assets-round.sh
index 25a543300f..905f236c29 100755
--- a/test/scripts/e2e_subs/e2e-app-real-assets-round.sh
+++ b/test/scripts/e2e_subs/e2e-app-real-assets-round.sh
@@ -20,10 +20,29 @@ ACCOUNT=$(${gcmd} account list|awk '{ print $3 }')
${gcmd} asset create --creator ${ACCOUNT} --name bogocoin --unitname bogo --total 1337
ASSET_ID=$(${gcmd} asset info --creator $ACCOUNT --unitname bogo|grep 'Asset ID'|awk '{ print $3 }')
# Create app that reads asset balance and checks asset details and checks round
ROUND=$(goal node status | grep 'Last committed' | awk '{ print $4 }')
-TIMESTAMP=$(goal ledger block --strict ${ROUND} | jq .block.ts)
-APP_ID=$(${gcmd} app create --creator ${ACCOUNT} --foreign-asset $ASSET_ID --app-arg "int:$ASSET_ID" --app-arg "int:1337" --app-arg "int:0" --app-arg "int:0" --app-arg "int:1337" --app-arg "str:bogo" --app-arg "int:$ROUND" --app-arg "int:$TIMESTAMP" --approval-prog ${DIR}/tealprogs/assetround.teal --global-byteslices 0 --global-ints 0 --local-byteslices 0 --local-ints 0 --clear-prog <(printf "#pragma version 2\nint 1") | grep Created | awk '{ print $6 }')
+# use wall-clock time instead of the last block's timestamp; the app checks a +/-60s window
+TIMESTAMP=$(date +%s)
+
+APP_ID=$(${gcmd} app create --creator ${ACCOUNT} --foreign-asset $ASSET_ID --app-arg "int:$ASSET_ID" --app-arg "int:1337" --app-arg "int:0" --app-arg "int:0" --app-arg "int:1337" --app-arg "str:bogo" --app-arg "int:$ROUND" --app-arg "int:$TIMESTAMP" --approval-prog ${DIR}/tealprogs/assetround.teal --global-byteslices 0 --global-ints 0 --local-byteslices 0 --local-ints 0 --clear-prog ${DIR}/tealprogs/approve-all.teal | grep Created | awk '{ print $6 }')
# Create another account, fund it, send it some asset
ACCOUNTB=$(${gcmd} account new|awk '{ print $6 }')
diff --git a/test/scripts/e2e_subs/tealprogs/assetround.teal b/test/scripts/e2e_subs/tealprogs/assetround.teal
index e6b3c6144f..f147a8ac0c 100644
--- a/test/scripts/e2e_subs/tealprogs/assetround.teal
+++ b/test/scripts/e2e_subs/tealprogs/assetround.teal
@@ -69,39 +69,47 @@ bz fail
round:
-// Check round against arg 6 (arg < global Round, arg + 4 > global Round)
+// Check round against arg 6: arg <= Round <= arg+4
+// (i.e. arg <= global Round, and arg + 4 >= global Round)
txna ApplicationArgs 6
btoi
+dup
global Round
-<
+<=
bz fail
-txna ApplicationArgs 6
-btoi
+//txna ApplicationArgs 6 // from dup
+//btoi
int 4
+
-// Check timestamp against arg 7 (arg < global LatestTimestamp + 60, arg + 60 > global LatestTimestamp)
+global Round
+>=
+bz fail
+
+// Check timestamp against arg 7 (arg-60 <= timestamp <= arg+60)
+// (arg <= global LatestTimestamp + 60, arg + 60 >= global LatestTimestamp)
txna ApplicationArgs 7
btoi
+dup
global LatestTimestamp
int 60
+
-<
+<=
bz fail
-txna ApplicationArgs 7
-btoi
+//txna ApplicationArgs 7 // from dup
+//btoi
int 60
+
global LatestTimestamp
->
+>=
bz fail
success:
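
Restated, the rewritten program enforces two closed-interval window checks, with dup keeping each decoded argument on the stack so it is only read once. As I read the updated opcodes, the predicates are the following; this small Go rendering is an illustration only, not part of the patch:

```go
package main

import "fmt"

// roundOK mirrors the round check: arg <= Round <= arg+4.
func roundOK(arg, round uint64) bool {
	return arg <= round && arg+4 >= round
}

// timestampOK mirrors the timestamp check: |arg - ts| <= 60,
// written without unsigned underflow, as the TEAL does.
func timestampOK(arg, ts uint64) bool {
	return arg <= ts+60 && arg+60 >= ts
}

func main() {
	fmt.Println(roundOK(100, 103), timestampOK(1000, 1059)) // true true
}
```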
diff --git a/test/testdata/configs/config-v28.json b/test/testdata/configs/config-v28.json
index 7b6ceb5326..b885aa68a7 100644
--- a/test/testdata/configs/config-v28.json
+++ b/test/testdata/configs/config-v28.json
@@ -98,6 +98,8 @@
"RestReadTimeoutSeconds": 15,
"RestWriteTimeoutSeconds": 120,
"RunHosted": false,
+ "SpeculativeAsmTimeOffset": 0,
+ "SpeculativeAssemblyDisable": false,
"StorageEngine": "sqlite",
"SuggestedFeeBlockHistory": 3,
"SuggestedFeeSlidingWindowSize": 50,
diff --git a/test/testdata/configs/config-v29.json b/test/testdata/configs/config-v29.json
index 8522011ce6..70bcc758b9 100644
--- a/test/testdata/configs/config-v29.json
+++ b/test/testdata/configs/config-v29.json
@@ -101,6 +101,8 @@
"RestReadTimeoutSeconds": 15,
"RestWriteTimeoutSeconds": 120,
"RunHosted": false,
+ "SpeculativeAsmTimeOffset": 0,
+ "SpeculativeAssemblyDisable": false,
"StorageEngine": "sqlite",
"SuggestedFeeBlockHistory": 3,
"SuggestedFeeSlidingWindowSize": 50,
diff --git a/test/testdata/configs/config-v30.json b/test/testdata/configs/config-v30.json
new file mode 100644
index 0000000000..e73de9a943
--- /dev/null
+++ b/test/testdata/configs/config-v30.json
@@ -0,0 +1,126 @@
+{
+ "Version": 30,
+ "AccountUpdatesStatsInterval": 5000000000,
+ "AccountsRebuildSynchronousMode": 1,
+ "AgreementIncomingBundlesQueueLength": 15,
+ "AgreementIncomingProposalsQueueLength": 50,
+ "AgreementIncomingVotesQueueLength": 20000,
+ "AnnounceParticipationKey": true,
+ "Archival": false,
+ "BaseLoggerDebugLevel": 4,
+ "BlockServiceCustomFallbackEndpoints": "",
+ "BlockServiceMemCap": 500000000,
+ "BroadcastConnectionsLimit": -1,
+ "CadaverDirectory": "",
+ "CadaverSizeTarget": 0,
+ "CatchpointFileHistoryLength": 365,
+ "CatchpointInterval": 10000,
+ "CatchpointTracking": 0,
+ "CatchupBlockDownloadRetryAttempts": 1000,
+ "CatchupBlockValidateMode": 0,
+ "CatchupFailurePeerRefreshRate": 10,
+ "CatchupGossipBlockFetchTimeoutSec": 4,
+ "CatchupHTTPBlockFetchTimeoutSec": 4,
+ "CatchupLedgerDownloadRetryAttempts": 50,
+ "CatchupParallelBlocks": 16,
+ "ConnectionsRateLimitingCount": 60,
+ "ConnectionsRateLimitingWindowSeconds": 1,
+ "DNSBootstrapID": ".algorand.network?backup=.algorand.net&dedup=.algorand-.(network|net)",
+ "DNSSecurityFlags": 1,
+ "DeadlockDetection": 0,
+ "DeadlockDetectionThreshold": 30,
+ "DisableLedgerLRUCache": false,
+ "DisableLocalhostConnectionRateLimit": true,
+ "DisableNetworking": false,
+ "DisableOutgoingConnectionThrottling": false,
+ "EnableAccountUpdatesStats": false,
+ "EnableAgreementReporting": false,
+ "EnableAgreementTimeMetrics": false,
+ "EnableAssembleStats": false,
+ "EnableBlockService": false,
+ "EnableBlockServiceFallbackToArchiver": true,
+ "EnableCatchupFromArchiveServers": false,
+ "EnableDeveloperAPI": false,
+ "EnableExperimentalAPI": false,
+ "EnableFollowMode": false,
+ "EnableGossipBlockService": true,
+ "EnableIncomingMessageFilter": false,
+ "EnableLedgerService": false,
+ "EnableMetricReporting": false,
+ "EnableOutgoingNetworkMessageFiltering": true,
+ "EnablePingHandler": true,
+ "EnableProcessBlockStats": false,
+ "EnableProfiler": false,
+ "EnableRequestLogger": false,
+ "EnableRuntimeMetrics": false,
+ "EnableTopAccountsReporting": false,
+ "EnableTxBacklogRateLimiting": false,
+ "EnableTxnEvalTracer": false,
+ "EnableUsageLog": false,
+ "EnableVerbosedTransactionSyncLogging": false,
+ "EndpointAddress": "127.0.0.1:0",
+ "FallbackDNSResolverAddress": "",
+ "ForceFetchTransactions": false,
+ "ForceRelayMessages": false,
+ "GossipFanout": 4,
+ "HeartbeatUpdateInterval": 600,
+ "IncomingConnectionsLimit": 2400,
+ "IncomingMessageFilterBucketCount": 5,
+ "IncomingMessageFilterBucketSize": 512,
+ "LedgerSynchronousMode": 2,
+ "LogArchiveMaxAge": "",
+ "LogArchiveName": "node.archive.log",
+ "LogSizeLimit": 1073741824,
+ "MaxAPIBoxPerApplication": 100000,
+ "MaxAPIResourcesPerAccount": 100000,
+ "MaxAcctLookback": 4,
+ "MaxCatchpointDownloadDuration": 43200000000000,
+ "MaxConnectionsPerIP": 15,
+ "MinCatchpointFileDownloadBytesPerSecond": 20480,
+ "NetAddress": "",
+ "NetworkMessageTraceServer": "",
+ "NetworkProtocolVersion": "",
+ "NodeExporterListenAddress": ":9100",
+ "NodeExporterPath": "./node_exporter",
+ "OptimizeAccountsDatabaseOnStartup": false,
+ "OutgoingMessageFilterBucketCount": 3,
+ "OutgoingMessageFilterBucketSize": 128,
+ "P2PEnable": false,
+ "P2PPersistPeerID": true,
+ "P2PPrivateKeyLocation": "",
+ "ParticipationKeysRefreshInterval": 60000000000,
+ "PeerConnectionsUpdateInterval": 3600,
+ "PeerPingPeriodSeconds": 0,
+ "PriorityPeers": {},
+ "ProposalAssemblyTime": 500000000,
+ "PublicAddress": "",
+ "ReconnectTime": 60000000000,
+ "ReservedFDs": 256,
+ "RestConnectionsHardLimit": 2048,
+ "RestConnectionsSoftLimit": 1024,
+ "RestReadTimeoutSeconds": 15,
+ "RestWriteTimeoutSeconds": 120,
+ "RunHosted": false,
+ "StorageEngine": "sqlite",
+ "SpeculativeAsmTimeOffset": 400000000,
+ "SpeculativeAssemblyDisable": false,
+ "SuggestedFeeBlockHistory": 3,
+ "SuggestedFeeSlidingWindowSize": 50,
+ "TLSCertFile": "",
+ "TLSKeyFile": "",
+ "TelemetryToLog": true,
+ "TransactionSyncDataExchangeRate": 0,
+ "TransactionSyncSignificantMessageThreshold": 0,
+ "TxBacklogReservedCapacityPerPeer": 20,
+ "TxBacklogServiceRateWindowSeconds": 10,
+ "TxBacklogSize": 26000,
+ "TxIncomingFilterMaxSize": 500000,
+ "TxIncomingFilteringFlags": 1,
+ "TxPoolExponentialIncreaseFactor": 2,
+ "TxPoolSize": 75000,
+ "TxSyncIntervalSeconds": 60,
+ "TxSyncServeResponseSize": 1000000,
+ "TxSyncTimeoutSeconds": 30,
+ "UseXForwardedForAddressField": "",
+ "VerifiedTranscationsCacheSize": 150000
+}
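
The two new keys correspond to the config.Local settings this patch introduces: the migrated v28/v29 files carry SpeculativeAsmTimeOffset of 0, while the v30 defaults set it to 400000000ns (400ms), alongside a SpeculativeAssemblyDisable kill switch. A hedged sketch of consuming them, assuming the Go field names mirror the JSON keys (the accessor logic here is illustrative, not the node's actual scheduling code):

```go
package main

import (
	"fmt"
	"time"

	"github.com/algorand/go-algorand/config"
)

func main() {
	cfg := config.GetDefaultLocal()
	// Field names are assumed to mirror the JSON keys introduced above.
	if cfg.SpeculativeAssemblyDisable {
		fmt.Println("speculative block assembly is disabled")
		return
	}
	// Config durations are in nanoseconds: the v30 default of 400000000 is 400ms.
	offset := time.Duration(cfg.SpeculativeAsmTimeOffset)
	fmt.Printf("begin speculative assembly %v into the round\n", offset)
}
```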