diff --git a/agreement/abstractions.go b/agreement/abstractions.go index 44aafa4fd8..6cb7256273 100644 --- a/agreement/abstractions.go +++ b/agreement/abstractions.go @@ -85,6 +85,8 @@ type BlockFactory interface { // nodes on the network can assemble entries, the agreement protocol may // lose liveness. AssembleBlock(basics.Round) (ValidatedBlock, error) + + StartSpeculativeBlockAssembly(context.Context, ValidatedBlock, crypto.Digest, bool) } // A Ledger represents the sequence of Entries agreed upon by the protocol. diff --git a/agreement/actions.go b/agreement/actions.go index 833dc8af36..fb93c86d1a 100644 --- a/agreement/actions.go +++ b/agreement/actions.go @@ -54,6 +54,8 @@ const ( attest assemble repropose + speculativeAssembly + speculativeAssemblyIfStarted // disk checkpoint @@ -309,13 +311,14 @@ func (a rezeroAction) do(ctx context.Context, s *Service) { } type pseudonodeAction struct { - // assemble, repropose, attest + // assemble, repropose, attest, speculativeAssembly, speculativeAssemblyIfStarted T actionType - Round round - Period period - Step step - Proposal proposalValue + Round round + Period period + Step step + Proposal proposalValue + ValidatedBlock ValidatedBlock } func (a pseudonodeAction) t() actionType { @@ -346,6 +349,11 @@ func (a pseudonodeAction) do(ctx context.Context, s *Service) { default: s.log.Errorf("pseudonode.MakeProposals call failed %v", err) } + case speculativeAssembly: + s.loopback.StartSpeculativeBlockAssembly(ctx, a.ValidatedBlock, a.Proposal.BlockDigest, false) + case speculativeAssemblyIfStarted: + s.loopback.StartSpeculativeBlockAssembly(ctx, a.ValidatedBlock, a.Proposal.BlockDigest, true) + case repropose: logEvent := logspec.AgreementEvent{ Type: logspec.VoteAttest, @@ -464,7 +472,7 @@ func zeroAction(t actionType) action { return ensureAction{} case rezero: return rezeroAction{} - case attest, assemble, repropose: + case attest, assemble, repropose, speculativeAssembly, speculativeAssemblyIfStarted: return pseudonodeAction{} case checkpoint: return checkpointAction{} diff --git a/agreement/actiontype_string.go b/agreement/actiontype_string.go index 9272ec2cf6..e6cee56ece 100644 --- a/agreement/actiontype_string.go +++ b/agreement/actiontype_string.go @@ -23,12 +23,14 @@ func _() { _ = x[attest-12] _ = x[assemble-13] _ = x[repropose-14] - _ = x[checkpoint-15] + _ = x[speculativeAssembly-15] + _ = x[speculativeAssemblyIfStarted-16] + _ = x[checkpoint-17] } -const _actionType_name = "noopignorebroadcastrelaydisconnectbroadcastVotesverifyVoteverifyPayloadverifyBundleensurestageDigestrezeroattestassemblereproposecheckpoint" +const _actionType_name = "noopignorebroadcastrelaydisconnectbroadcastVotesverifyVoteverifyPayloadverifyBundleensurestageDigestrezeroattestassemblereproposespeculativeAssemblyspeculativeAssemblyIfStartedcheckpoint" -var _actionType_index = [...]uint8{0, 4, 10, 19, 24, 34, 48, 58, 71, 83, 89, 100, 106, 112, 120, 129, 139} +var _actionType_index = [...]uint8{0, 4, 10, 19, 24, 34, 48, 58, 71, 83, 89, 100, 106, 112, 120, 129, 148, 176, 186} func (i actionType) String() string { if i >= actionType(len(_actionType_index)-1) { diff --git a/agreement/agreementtest/simulate_test.go b/agreement/agreementtest/simulate_test.go index 53c42411ed..7d8622e10b 100644 --- a/agreement/agreementtest/simulate_test.go +++ b/agreement/agreementtest/simulate_test.go @@ -98,6 +98,9 @@ func (f testBlockFactory) AssembleBlock(r basics.Round) (agreement.ValidatedBloc return testValidatedBlock{Inside: bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: r}}}, nil } +func (f testBlockFactory) StartSpeculativeBlockAssembly(context.Context, agreement.ValidatedBlock, crypto.Digest, bool) { +} + // If we try to read from high rounds, we panic and do not emit an error to find bugs during testing. type testLedger struct { mu deadlock.Mutex diff --git a/agreement/common_test.go b/agreement/common_test.go index 0c11d9553d..9b7cafb9fb 100644 --- a/agreement/common_test.go +++ b/agreement/common_test.go @@ -184,6 +184,10 @@ func (f testBlockFactory) AssembleBlock(r basics.Round) (ValidatedBlock, error) return testValidatedBlock{Inside: bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: r}}}, nil } +func (f testBlockFactory) StartSpeculativeBlockAssembly(context.Context, ValidatedBlock, crypto.Digest, bool) { + return +} + // If we try to read from high rounds, we panic and do not emit an error to find bugs during testing. type testLedger struct { mu deadlock.Mutex diff --git a/agreement/coservice.go b/agreement/coservice.go index 7fe4ab1f52..b8952c8142 100644 --- a/agreement/coservice.go +++ b/agreement/coservice.go @@ -17,9 +17,10 @@ package agreement import ( - "github.com/algorand/go-deadlock" + "runtime" + "testing" - "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-deadlock" ) //go:generate stringer -type=coserviceType @@ -35,10 +36,12 @@ const ( //msgp:ignore coserviceType type coserviceType int +// coserviceMonitor is unit test instrumentation type coserviceMonitor struct { deadlock.Mutex id int + t *testing.T c map[coserviceType]uint coserviceListener @@ -79,7 +82,10 @@ func (m *coserviceMonitor) dec(t coserviceType) { m.c = make(map[coserviceType]uint) } if m.c[t] == 0 { - logging.Base().Panicf("%d: tried to decrement empty coservice queue %v", m.id, t) + var stack [1000]byte + sl := runtime.Stack(stack[:], false) + m.t.Log(string(stack[:sl])) + m.t.Fatalf("%d: tried to decrement empty coservice queue %v", m.id, t) } m.c[t]-- diff --git a/agreement/demux.go b/agreement/demux.go index f31c4d075d..b9de002101 100644 --- a/agreement/demux.go +++ b/agreement/demux.go @@ -60,10 +60,13 @@ type demux struct { queue []<-chan externalEvent processingMonitor EventsProcessingMonitor - monitor *coserviceMonitor cancelTokenizers context.CancelFunc log logging.Logger + + // coserviceMonitor is unit test instrumentation. + // should be fast no-op if monitor == nil + monitor *coserviceMonitor } // demuxParams contains the parameters required to initliaze a new demux object @@ -74,7 +77,10 @@ type demuxParams struct { voteVerifier *AsyncVoteVerifier processingMonitor EventsProcessingMonitor log logging.Logger - monitor *coserviceMonitor + + // coserviceMonitor is unit test instrumentation. + // should be fast no-op if monitor == nil + monitor *coserviceMonitor } // makeDemux initializes the goroutines needed to process external events, setting up the appropriate channels. @@ -190,7 +196,7 @@ func (d *demux) verifyBundle(ctx context.Context, m message, r round, p period, // next blocks until it observes an external input event of interest for the state machine. // // If ok is false, there are no more events so the agreement service should quit. -func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Duration, currentRound round) (e externalEvent, ok bool) { +func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Duration, speculationDeadline time.Duration, currentRound round) (e externalEvent, ok bool) { defer func() { if !ok { return @@ -253,6 +259,14 @@ func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Durat deadlineCh := s.Clock.TimeoutAt(deadline) fastDeadlineCh := s.Clock.TimeoutAt(fastDeadline) + var speculationDeadlineCh <-chan time.Time + // zero timeout means we don't have enough time to speculate on block assembly + if speculationDeadline != 0 { + speculationDeadlineCh = s.Clock.TimeoutAt(speculationDeadline) + } + + //d.log.Infof("demux deadline %d, fastD %d, specD %d, d.monitor %v", deadline, fastDeadline, speculationDeadline, d.monitor) // not threadsafe in some tests + d.UpdateEventsQueue(eventQueueDemux, 0) d.monitor.dec(demuxCoserviceType) @@ -269,7 +283,7 @@ func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Durat // the pseudonode channel got closed. remove it from the queue and try again. d.queue = d.queue[1:] d.UpdateEventsQueue(eventQueuePseudonode, 0) - return d.next(s, deadline, fastDeadline, currentRound) + return d.next(s, deadline, fastDeadline, speculationDeadline, currentRound) // control case <-s.quit: @@ -303,6 +317,11 @@ func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Durat d.UpdateEventsQueue(eventQueueDemux, 1) d.monitor.inc(demuxCoserviceType) d.monitor.dec(clockCoserviceType) + case <-speculationDeadlineCh: + e = timeoutEvent{T: speculationTimeout, RandomEntropy: s.RandomSource.Uint64(), Round: nextRound} + d.UpdateEventsQueue(eventQueueDemux, 1) + d.monitor.inc(demuxCoserviceType) + d.monitor.dec(clockCoserviceType) // raw case m, open := <-rawVotes: @@ -358,6 +377,7 @@ func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Durat } // setupCompoundMessage processes compound messages: distinct messages which are delivered together +// TODO: does this ever really see something other than empty .Vote? func setupCompoundMessage(l LedgerReader, m message) (res externalEvent) { compound := m.CompoundMessage if compound.Vote == (unauthenticatedVote{}) { diff --git a/agreement/demux_test.go b/agreement/demux_test.go index 027dbc9e13..4b0d74f0ae 100644 --- a/agreement/demux_test.go +++ b/agreement/demux_test.go @@ -37,6 +37,7 @@ import ( ) const fastTimeoutChTime = 2 +const speculativeBlockAsmTime = time.Duration(5 * time.Microsecond) type demuxTester struct { *testing.T @@ -65,8 +66,10 @@ type demuxTestUsecase struct { verifiedProposal testChanState verifiedBundle testChanState // expected output - e event - ok bool + e event + ok bool + speculativeAsmTime time.Duration + desc string } var demuxTestUsecases = []demuxTestUsecase{ @@ -195,6 +198,7 @@ var demuxTestUsecases = []demuxTestUsecase{ verifiedBundle: testChanState{eventCount: 0, closed: false}, e: messageEvent{T: votePresent}, ok: true, + desc: "one vote one prop", }, { queue: []testChanState{}, @@ -411,6 +415,26 @@ var demuxTestUsecases = []demuxTestUsecase{ verifiedBundle: testChanState{eventCount: 0, closed: false}, e: messageEvent{T: votePresent}, ok: true, + desc: "one prop", + }, + { + queue: []testChanState{{eventCount: 0, closed: false}}, + rawVotes: testChanState{eventCount: 0, closed: false}, + rawProposals: testChanState{eventCount: 0, closed: false}, + rawBundles: testChanState{eventCount: 0, closed: false}, + compoundProposals: false, + quit: false, + voteChannelFull: false, + proposalChannelFull: false, + bundleChannelFull: false, + ledgerRoundReached: false, + deadlineReached: false, + verifiedVote: testChanState{eventCount: 0, closed: false}, + verifiedProposal: testChanState{eventCount: 0, closed: false}, + verifiedBundle: testChanState{eventCount: 0, closed: false}, + e: timeoutEvent{T: speculationTimeout}, + ok: true, + speculativeAsmTime: speculativeBlockAsmTime, }, } @@ -432,6 +456,9 @@ func (t *demuxTester) TimeoutAt(delta time.Duration) <-chan time.Time { if delta == fastTimeoutChTime { return nil } + if delta == speculativeBlockAsmTime { + return time.After(delta) + } c := make(chan time.Time, 2) if t.currentUsecase.deadlineReached { @@ -653,6 +680,7 @@ func (t *demuxTester) TestUsecase(testcase demuxTestUsecase) bool { dmx := &demux{} + dmx.log = logging.TestingLog(t) dmx.crypto = t dmx.ledger = t dmx.rawVotes = t.makeRawChannel(protocol.AgreementVoteTag, testcase.rawVotes, false) @@ -674,14 +702,13 @@ func (t *demuxTester) TestUsecase(testcase demuxTestUsecase) bool { if testcase.quit { close(s.quit) } - - e, ok := dmx.next(s, time.Second, fastTimeoutChTime, 300) + e, ok := dmx.next(s, time.Second, fastTimeoutChTime, testcase.speculativeAsmTime, 300) if !assert.Equal(t, testcase.ok, ok) { return false } - if !assert.Equalf(t, strings.Replace(testcase.e.String(), "{test_index}", fmt.Sprintf("%d", t.testIdx), 1), e.String(), "Test case %d failed.", t.testIdx+1) { + if !assert.Equalf(t, strings.Replace(testcase.e.String(), "{test_index}", fmt.Sprintf("%d", t.testIdx), 1), e.String(), "Test case %d (%s) failed.", t.testIdx+1, testcase.desc) { return false } diff --git a/agreement/events.go b/agreement/events.go index 52737e5f2c..88d7ec6a4f 100644 --- a/agreement/events.go +++ b/agreement/events.go @@ -122,8 +122,11 @@ const ( // period. // // fastTimeout is like timeout but for fast partition recovery. + // speculation timeout marks when the player should start speculative + // block assembly. timeout fastTimeout + speculationTimeout // Other events are delivered from one state machine to another to // communicate some message or as a reply to some message. These events @@ -197,6 +200,14 @@ const ( // readPinned is sent to the proposalStore to read the pinned value, if it exists. readPinned + // readLowestValue is sent to the proposalPeriodMachine to read the + // proposal-vote with the lowest credential. + readLowestValue + + // readLowestPayload is sent to the proposalStore to read the payload + // corresponding to the lowest-credential proposal-vote, if it exists. + readLowestPayload + /* * The following are event types that replace queries, and may warrant * a revision to make them more state-machine-esque. @@ -363,7 +374,7 @@ func (e roundInterruptionEvent) AttachConsensusVersion(v ConsensusVersionView) e } type timeoutEvent struct { - // {timeout,fastTimeout} + // {timeout,fastTimeout,speculationTimeout} T eventType RandomEntropy uint64 @@ -407,6 +418,38 @@ func (e newRoundEvent) ComparableStr() string { return e.String() } +type readLowestEvent struct { + // T is either readLowestValue or readLowestPayload + T eventType + + // Round and Period are the round and period for which to query + // the lowest-credential value and payload. This type of event + // is only sent for pipelining, which only makes sense for period + // 0, but the Period is here anyway to route to the appropriate + // proposalMachinePeriod. + Round round + Period period + + // Proposal holds the lowest-credential value. + Proposal proposalValue + // Payload holds the payload, if one exists (which is the case if PayloadOK is set). + Payload proposal + // PayloadOK is set if and only if a payload was received for the lowest-credential value. + PayloadOK bool +} + +func (e readLowestEvent) t() eventType { + return e.T +} + +func (e readLowestEvent) String() string { + return fmt.Sprintf("%v: %.5v", e.t().String(), e.Proposal.BlockDigest.String()) +} + +func (e readLowestEvent) ComparableStr() string { + return e.String() +} + type newPeriodEvent struct { // Period holds the latest period relevant to the proposalRoundMachine. Period period @@ -744,7 +787,7 @@ func zeroEvent(t eventType) event { return messageEvent{} case roundInterruption: return roundInterruptionEvent{} - case timeout, fastTimeout: + case timeout, fastTimeout, speculationTimeout: return timeoutEvent{} case newRound: return newRoundEvent{} diff --git a/agreement/eventtype_string.go b/agreement/eventtype_string.go index 9da84c1b98..2b1f26fd9c 100644 --- a/agreement/eventtype_string.go +++ b/agreement/eventtype_string.go @@ -18,40 +18,43 @@ func _() { _ = x[roundInterruption-7] _ = x[timeout-8] _ = x[fastTimeout-9] - _ = x[softThreshold-10] - _ = x[certThreshold-11] - _ = x[nextThreshold-12] - _ = x[proposalCommittable-13] - _ = x[proposalAccepted-14] - _ = x[voteFiltered-15] - _ = x[voteMalformed-16] - _ = x[bundleFiltered-17] - _ = x[bundleMalformed-18] - _ = x[payloadRejected-19] - _ = x[payloadMalformed-20] - _ = x[payloadPipelined-21] - _ = x[payloadAccepted-22] - _ = x[proposalFrozen-23] - _ = x[voteAccepted-24] - _ = x[newRound-25] - _ = x[newPeriod-26] - _ = x[readStaging-27] - _ = x[readPinned-28] - _ = x[voteFilterRequest-29] - _ = x[voteFilteredStep-30] - _ = x[nextThresholdStatusRequest-31] - _ = x[nextThresholdStatus-32] - _ = x[freshestBundleRequest-33] - _ = x[freshestBundle-34] - _ = x[dumpVotesRequest-35] - _ = x[dumpVotes-36] - _ = x[wrappedAction-37] - _ = x[checkpointReached-38] + _ = x[speculationTimeout-10] + _ = x[softThreshold-11] + _ = x[certThreshold-12] + _ = x[nextThreshold-13] + _ = x[proposalCommittable-14] + _ = x[proposalAccepted-15] + _ = x[voteFiltered-16] + _ = x[voteMalformed-17] + _ = x[bundleFiltered-18] + _ = x[bundleMalformed-19] + _ = x[payloadRejected-20] + _ = x[payloadMalformed-21] + _ = x[payloadPipelined-22] + _ = x[payloadAccepted-23] + _ = x[proposalFrozen-24] + _ = x[voteAccepted-25] + _ = x[newRound-26] + _ = x[newPeriod-27] + _ = x[readStaging-28] + _ = x[readPinned-29] + _ = x[readLowestValue-30] + _ = x[readLowestPayload-31] + _ = x[voteFilterRequest-32] + _ = x[voteFilteredStep-33] + _ = x[nextThresholdStatusRequest-34] + _ = x[nextThresholdStatus-35] + _ = x[freshestBundleRequest-36] + _ = x[freshestBundle-37] + _ = x[dumpVotesRequest-38] + _ = x[dumpVotes-39] + _ = x[wrappedAction-40] + _ = x[checkpointReached-41] } -const _eventType_name = "nonevotePresentpayloadPresentbundlePresentvoteVerifiedpayloadVerifiedbundleVerifiedroundInterruptiontimeoutfastTimeoutsoftThresholdcertThresholdnextThresholdproposalCommittableproposalAcceptedvoteFilteredvoteMalformedbundleFilteredbundleMalformedpayloadRejectedpayloadMalformedpayloadPipelinedpayloadAcceptedproposalFrozenvoteAcceptednewRoundnewPeriodreadStagingreadPinnedvoteFilterRequestvoteFilteredStepnextThresholdStatusRequestnextThresholdStatusfreshestBundleRequestfreshestBundledumpVotesRequestdumpVoteswrappedActioncheckpointReached" +const _eventType_name = "nonevotePresentpayloadPresentbundlePresentvoteVerifiedpayloadVerifiedbundleVerifiedroundInterruptiontimeoutfastTimeoutspeculationTimeoutsoftThresholdcertThresholdnextThresholdproposalCommittableproposalAcceptedvoteFilteredvoteMalformedbundleFilteredbundleMalformedpayloadRejectedpayloadMalformedpayloadPipelinedpayloadAcceptedproposalFrozenvoteAcceptednewRoundnewPeriodreadStagingreadPinnedreadLowestValuereadLowestPayloadvoteFilterRequestvoteFilteredStepnextThresholdStatusRequestnextThresholdStatusfreshestBundleRequestfreshestBundledumpVotesRequestdumpVoteswrappedActioncheckpointReached" -var _eventType_index = [...]uint16{0, 4, 15, 29, 42, 54, 69, 83, 100, 107, 118, 131, 144, 157, 176, 192, 204, 217, 231, 246, 261, 277, 293, 308, 322, 334, 342, 351, 362, 372, 389, 405, 431, 450, 471, 485, 501, 510, 523, 540} +var _eventType_index = [...]uint16{0, 4, 15, 29, 42, 54, 69, 83, 100, 107, 118, 136, 149, 162, 175, 194, 210, 222, 235, 249, 264, 279, 295, 311, 326, 340, 352, 360, 369, 380, 390, 405, 422, 439, 455, 481, 500, 521, 535, 551, 560, 573, 590} func (i eventType) String() string { if i >= eventType(len(_eventType_index)-1) { diff --git a/agreement/fuzzer/ledger_test.go b/agreement/fuzzer/ledger_test.go index a62caee4d9..6dd74f002c 100644 --- a/agreement/fuzzer/ledger_test.go +++ b/agreement/fuzzer/ledger_test.go @@ -112,6 +112,9 @@ func (f testBlockFactory) AssembleBlock(r basics.Round) (agreement.ValidatedBloc return testValidatedBlock{Inside: bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: r}}}, nil } +func (f testBlockFactory) StartSpeculativeBlockAssembly(context.Context, agreement.ValidatedBlock, crypto.Digest, bool) { +} + type testLedgerSyncFunc func(l *testLedger, r basics.Round, c agreement.Certificate) bool // If we try to read from high rounds, we panic and do not emit an error to find bugs during testing. diff --git a/agreement/msgp_gen.go b/agreement/msgp_gen.go index 19a803b759..f7bf8e64a9 100644 --- a/agreement/msgp_gen.go +++ b/agreement/msgp_gen.go @@ -3753,9 +3753,12 @@ func PeriodRouterMaxSize() (s int) { // MarshalMsg implements msgp.Marshaler func (z *player) MarshalMsg(b []byte) (o []byte) { o = msgp.Require(b, z.Msgsize()) - // map header, size 8 + // map header, size 9 + // string "ConsensusVersion" + o = append(o, 0x89, 0xb0, 0x43, 0x6f, 0x6e, 0x73, 0x65, 0x6e, 0x73, 0x75, 0x73, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e) + o = (*z).ConsensusVersion.MarshalMsg(o) // string "Deadline" - o = append(o, 0x88, 0xa8, 0x44, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65) + o = append(o, 0xa8, 0x44, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65) o = msgp.AppendDuration(o, (*z).Deadline) // string "FastRecoveryDeadline" o = append(o, 0xb4, 0x46, 0x61, 0x73, 0x74, 0x52, 0x65, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x44, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65) @@ -3875,6 +3878,14 @@ func (z *player) UnmarshalMsg(bts []byte) (o []byte, err error) { return } } + if zb0001 > 0 { + zb0001-- + bts, err = (*z).ConsensusVersion.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "struct-from-array", "ConsensusVersion") + return + } + } if zb0001 > 0 { err = msgp.ErrTooManyArrayFields(zb0001) if err != nil { @@ -3958,6 +3969,12 @@ func (z *player) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Pending") return } + case "ConsensusVersion": + bts, err = (*z).ConsensusVersion.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "ConsensusVersion") + return + } default: err = msgp.ErrNoField(string(field)) if err != nil { @@ -3978,13 +3995,13 @@ func (_ *player) CanUnmarshalMsg(z interface{}) bool { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *player) Msgsize() (s int) { - s = 1 + 6 + (*z).Round.Msgsize() + 7 + msgp.Uint64Size + 5 + msgp.Uint64Size + 15 + msgp.Uint64Size + 9 + msgp.DurationSize + 8 + msgp.BoolSize + 21 + msgp.DurationSize + 8 + (*z).Pending.Msgsize() + s = 1 + 6 + (*z).Round.Msgsize() + 7 + msgp.Uint64Size + 5 + msgp.Uint64Size + 15 + msgp.Uint64Size + 9 + msgp.DurationSize + 8 + msgp.BoolSize + 21 + msgp.DurationSize + 8 + (*z).Pending.Msgsize() + 17 + (*z).ConsensusVersion.Msgsize() return } // MsgIsZero returns whether this is a zero value func (z *player) MsgIsZero() bool { - return ((*z).Round.MsgIsZero()) && ((*z).Period == 0) && ((*z).Step == 0) && ((*z).LastConcluding == 0) && ((*z).Deadline == 0) && ((*z).Napping == false) && ((*z).FastRecoveryDeadline == 0) && ((*z).Pending.MsgIsZero()) + return ((*z).Round.MsgIsZero()) && ((*z).Period == 0) && ((*z).Step == 0) && ((*z).LastConcluding == 0) && ((*z).Deadline == 0) && ((*z).Napping == false) && ((*z).FastRecoveryDeadline == 0) && ((*z).Pending.MsgIsZero()) && ((*z).ConsensusVersion.MsgIsZero()) } // MaxSize returns a maximum valid message size for this message type diff --git a/agreement/persistence_test.go b/agreement/persistence_test.go index fbd9323b09..19a8ac94e2 100644 --- a/agreement/persistence_test.go +++ b/agreement/persistence_test.go @@ -183,15 +183,18 @@ func randomizeDiskState() (rr rootRouter, p player) { func TestRandomizedEncodingFullDiskState(t *testing.T) { partitiontest.PartitionTest(t) for i := 0; i < 5000; i++ { + if i%100 == 0 { + t.Logf("i=%d", i) + } router, player := randomizeDiskState() a := []action{} clock := timers.MakeMonotonicClock(time.Date(2015, 1, 2, 5, 6, 7, 8, time.UTC)) log := makeServiceLogger(logging.Base()) - e1 := encode(clock, router, player, a, true) - e2 := encode(clock, router, player, a, false) - require.Equalf(t, e1, e2, "msgp and go-codec encodings differ: len(msgp)=%v, len(reflect)=%v", len(e1), len(e2)) - _, rr1, p1, _, err1 := decode(e1, clock, log, true) - _, rr2, p2, _, err2 := decode(e1, clock, log, false) + eReflect := encode(clock, router, player, a, true) // reflect=true + eMsgp := encode(clock, router, player, a, false) + require.Equalf(t, eMsgp, eReflect, "msgp and go-codec encodings differ: len(msgp)=%v, len(reflect)=%v", len(eMsgp), len(eReflect)) + _, rr1, p1, _, err1 := decode(eReflect, clock, log, true) + _, rr2, p2, _, err2 := decode(eReflect, clock, log, false) require.NoErrorf(t, err1, "reflect decoding failed") require.NoErrorf(t, err2, "msgp decoding failed") require.Equalf(t, rr1, rr2, "rootRouters decoded differently") diff --git a/agreement/player.go b/agreement/player.go index 1ae552b0b1..9b7005cde1 100644 --- a/agreement/player.go +++ b/agreement/player.go @@ -52,6 +52,9 @@ type player struct { // Pending holds the player's proposalTable, which stores proposals that // must be verified after some vote has been verified. Pending proposalTable + + // the current consensus version + ConsensusVersion protocol.ConsensusVersion } func (p *player) T() stateMachineTag { @@ -85,6 +88,10 @@ func (p *player) handle(r routerHandle, e event) []action { r.t.logTimeout(*p) } + if e.T == speculationTimeout { + return p.handleSpeculationTimeout(r, e) + } + switch p.Step { case soft: // precondition: nap = false @@ -123,6 +130,16 @@ func (p *player) handle(r routerHandle, e event) []action { } } +// handleSpeculationTimeout TODO: rename this 'timeout' is the START of speculative assembly. +func (p *player) handleSpeculationTimeout(r routerHandle, e timeoutEvent) []action { + if e.Proto.Err != nil { + r.t.log.Errorf("failed to read protocol version for speculationTimeout event (proto %v): %v", e.Proto.Version, e.Proto.Err) + return nil + } + + return p.startSpeculativeBlockAsm(r, nil, false) +} + func (p *player) handleFastTimeout(r routerHandle, e timeoutEvent) []action { if e.Proto.Err != nil { r.t.log.Errorf("failed to read protocol version for fastTimeout event (proto %v): %v", e.Proto.Version, e.Proto.Err) @@ -161,6 +178,7 @@ func (p *player) issueSoftVote(r routerHandle) (actions []action) { // If we arrive due to fast-forward/soft threshold; then answer.Bottom = false and answer.Proposal = bottom // and we should soft-vote normally (not based on the starting value) a.Proposal = nextStatus.Proposal + // TODO: how do we speculative block assemble based on nextStatus.Proposal? return append(actions, a) } @@ -177,8 +195,10 @@ func (p *player) issueSoftVote(r routerHandle) (actions []action) { return nil } - // original proposal: vote for it - return append(actions, a) + // original proposal: vote for it, maybe build + actions = append(actions, a) + actions = p.startSpeculativeBlockAsm(r, actions, false) + return actions } // A committableEvent is the trigger for issuing a cert vote. @@ -217,6 +237,27 @@ func (p *player) issueNextVote(r routerHandle) []action { return actions } +func (p *player) startSpeculativeBlockAsm(r routerHandle, actions []action, onlyIfStarted bool) []action { + if p.Period != 0 { + // If not period 0, cautiously do a simpler protocol. + return actions + } + // get the best proposal we have + re := readLowestEvent{T: readLowestPayload, Round: p.Round, Period: p.Period} + re = r.dispatch(*p, re, proposalMachineRound, p.Round, p.Period, 0).(readLowestEvent) + + // if we have its payload and its been validated already, start speculating on top of it + if re.PayloadOK && re.Payload.ve != nil { + a := pseudonodeAction{T: speculativeAssembly, Round: p.Round, Period: p.Period, ValidatedBlock: re.Payload.ve, Proposal: re.Proposal} + if onlyIfStarted { + // only re-start speculation if we had already started it but got a better block + a.T = speculativeAssemblyIfStarted + } + return append(actions, a) + } + return actions +} + func (p *player) issueFastVote(r routerHandle) (actions []action) { actions = p.partitionPolicy(r) @@ -585,6 +626,13 @@ func (p *player) handleMessageEvent(r routerHandle, e messageEvent) (actions []a actions = append(actions, a) } + // StartSpeculativeBlockAssembly every time we validate a proposal + // TODO: maybe only do this if after speculation has started; interrupt speculation on a block when we get a better block + // TODO: maybe don't do this at all and just delete it? + if ef.t() == payloadAccepted { + actions = p.startSpeculativeBlockAsm(r, actions, true) + } + // If the payload is valid, check it against any received cert threshold. // Of course, this should only trigger for payloadVerified case. // This allows us to handle late payloads (relative to cert-bundles, i.e., certificates) without resorting to catchup. diff --git a/agreement/player_test.go b/agreement/player_test.go index 75987c2ed0..441ebd1469 100644 --- a/agreement/player_test.go +++ b/agreement/player_test.go @@ -114,13 +114,28 @@ func simulateProposalPayloads(t *testing.T, router *rootRouter, player *player, } } +// most player code returns a fixed number of actions, but some player code sometimes adds an exctra speculative block assembly action that we filter out to make tests simpler +func ignoreSpeculativeAssembly(a []action) []action { + var out []action + for _, ia := range a { + switch ia.t() { + case speculativeAssembly, speculativeAssemblyIfStarted: + // do not copy out + default: + out = append(out, ia) + } + } + return out +} + func simulateProposals(t *testing.T, router *rootRouter, player *player, voteBatch []event, payloadBatch []event) { for i, e := range voteBatch { var res []action *player, res = router.submitTop(&playerTracer, *player, e) - earlier := res + earlier := ignoreSpeculativeAssembly(res) *player, res = router.submitTop(&playerTracer, *player, payloadBatch[i]) + res = ignoreSpeculativeAssembly(res) if len(res) != len(earlier) { panic("proposal action mismatch") } @@ -136,13 +151,25 @@ func simulateTimeoutExpectSoft(t *testing.T, router *rootRouter, player *player, var res []action e := makeTimeoutEvent() *player, res = router.submitTop(&playerTracer, *player, e) - if len(res) != 1 { - panic("wrong number of actions") + // one or two actions, one must be attest, other _may_ be speculativeAssembly + require.True(t, 1 <= len(res) && len(res) <= 2, "wrong number of actions") + + hasAttest := false + var a1x action + + for _, ax := range res { + switch ax.t() { + case attest: + hasAttest = true + a1x = ax + case speculativeAssembly: + // ok + default: + t.Fatalf("bad action type: %v", ax.t()) + } } - - a1x := res[0] - if a1x.t() != attest { - panic("action 1 is not attest") + if !hasAttest { + panic("missing attest") } a1 := a1x.(pseudonodeAction) diff --git a/agreement/proposal.go b/agreement/proposal.go index bf021f2cfe..48a72b7c15 100644 --- a/agreement/proposal.go +++ b/agreement/proposal.go @@ -84,7 +84,7 @@ func (p unauthenticatedProposal) value() proposalValue { } } -// A proposal is an Block along with everything needed to validate it. +// A proposal is a Block along with everything needed to validate it. type proposal struct { unauthenticatedProposal diff --git a/agreement/proposalStore.go b/agreement/proposalStore.go index 080609de50..3feea81d76 100644 --- a/agreement/proposalStore.go +++ b/agreement/proposalStore.go @@ -352,6 +352,15 @@ func (store *proposalStore) handle(r routerHandle, p player, e event) event { se.Committable = ea.Assembled se.Payload = ea.Payload return se + case readLowestPayload: + re := e.(readLowestEvent) + re.T = readLowestValue + re = r.dispatch(p, re, proposalMachinePeriod, re.Round, re.Period, 0).(readLowestEvent) + re.T = readLowestPayload + ea := store.Assemblers[re.Proposal] + re.PayloadOK = ea.Assembled + re.Payload = ea.Payload + return re case readPinned: se := e.(pinnedValueEvent) ea := store.Assemblers[store.Pinned] // If pinned is bottom, assembled/payloadOK = false, payload = bottom diff --git a/agreement/proposalTracker.go b/agreement/proposalTracker.go index 59ffb77a28..7627cdd28d 100644 --- a/agreement/proposalTracker.go +++ b/agreement/proposalTracker.go @@ -165,6 +165,11 @@ func (t *proposalTracker) handle(r routerHandle, p player, e event) event { t.Freezer = t.Freezer.freeze() return e + case readLowestValue: + e := e.(readLowestEvent) + e.Proposal = t.Freezer.Lowest.R.Proposal + return e + case softThreshold, certThreshold: e := e.(thresholdEvent) t.Staging = e.Proposal diff --git a/agreement/proposalTrackerContract.go b/agreement/proposalTrackerContract.go index 2b995dfcac..5451d8152d 100644 --- a/agreement/proposalTrackerContract.go +++ b/agreement/proposalTrackerContract.go @@ -32,7 +32,7 @@ type proposalTrackerContract struct { // TODO check concrete types of events func (c *proposalTrackerContract) pre(p player, in event) (pre []error) { switch in.t() { - case voteVerified, proposalFrozen, softThreshold, certThreshold, voteFilterRequest, readStaging: + case voteVerified, proposalFrozen, softThreshold, certThreshold, voteFilterRequest, readStaging, readLowestValue, readLowestPayload: default: pre = append(pre, fmt.Errorf("incoming event has invalid type: %v", in.t())) } diff --git a/agreement/pseudonode.go b/agreement/pseudonode.go index 78f6674d7b..3bd55416f2 100644 --- a/agreement/pseudonode.go +++ b/agreement/pseudonode.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/account" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/logging" @@ -60,6 +61,9 @@ type pseudonode interface { // It returns an error if the pseudonode is unable to perform this. MakeProposals(ctx context.Context, r round, p period) (<-chan externalEvent, error) + // StartSpeculativeBlockAssembly calls the BlockFactory's StartSpeculativeBlockAssembly on a new go routine. + StartSpeculativeBlockAssembly(ctx context.Context, ve ValidatedBlock, blockHash crypto.Digest, onlyIfStarted bool) + // MakeVotes returns a vote for a given proposal in some round, period, and step. // // The passed-in context may be used to cancel vote creation and close the channel immediately. @@ -185,6 +189,11 @@ func (n asyncPseudonode) MakeProposals(ctx context.Context, r round, p period) ( } } +func (n asyncPseudonode) StartSpeculativeBlockAssembly(ctx context.Context, ve ValidatedBlock, blockHash crypto.Digest, onlyIfStarted bool) { + // TODO: make this synchronous instead of thread? (thread creation likely moving to inside transactionPool) + go n.factory.StartSpeculativeBlockAssembly(ctx, ve, blockHash, onlyIfStarted) +} + func (n asyncPseudonode) MakeVotes(ctx context.Context, r round, p period, s step, prop proposalValue, persistStateDone chan error) (chan externalEvent, error) { proposalTask := n.makeVotesTask(ctx, r, p, s, prop, persistStateDone) if len(proposalTask.participation) == 0 { diff --git a/agreement/service.go b/agreement/service.go index 3cac126193..708b37c7fa 100644 --- a/agreement/service.go +++ b/agreement/service.go @@ -80,9 +80,10 @@ type parameters Parameters // externalDemuxSignals used to syncronize the external signals that goes to the demux with the main loop. type externalDemuxSignals struct { - Deadline time.Duration - FastRecoveryDeadline time.Duration - CurrentRound round + Deadline time.Duration + FastRecoveryDeadline time.Duration + SpeculativeBlockAsmDeadline time.Duration + CurrentRound round } // MakeService creates a new Agreement Service instance given a set of Parameters. @@ -147,6 +148,7 @@ func (s *Service) Start() { input := make(chan externalEvent) output := make(chan []action) ready := make(chan externalDemuxSignals) + // TODO: use demuxOne() inside of mainLoop() instead of this nonsense pair of threads go s.demuxLoop(ctx, input, output, ready) go s.mainLoop(input, output, ready) } @@ -166,7 +168,7 @@ func (s *Service) demuxLoop(ctx context.Context, input chan<- externalEvent, out for a := range output { s.do(ctx, a) extSignals := <-ready - e, ok := s.demux.next(s, extSignals.Deadline, extSignals.FastRecoveryDeadline, extSignals.CurrentRound) + e, ok := s.demux.next(s, extSignals.Deadline, extSignals.FastRecoveryDeadline, extSignals.SpeculativeBlockAsmDeadline, extSignals.CurrentRound) if !ok { close(input) break @@ -179,6 +181,12 @@ func (s *Service) demuxLoop(ctx context.Context, input chan<- externalEvent, out close(s.done) } +// TODO: use demuxOne() inside mainLoop() instead of having a pair of synchronous go threads trading off via chan +func (s *Service) demuxOne(ctx context.Context, a []action, extSignals externalDemuxSignals) (e externalEvent, ok bool) { + s.do(ctx, a) + return s.demux.next(s, extSignals.Deadline, extSignals.FastRecoveryDeadline, extSignals.SpeculativeBlockAsmDeadline, extSignals.CurrentRound) +} + // mainLoop drives the state machine. // // After possibly restoring from disk and then initializing, it does the following in a loop: @@ -225,8 +233,17 @@ func (s *Service) mainLoop(input <-chan externalEvent, output chan<- []action, r } for { + status.ConsensusVersion, err = s.Ledger.ConsensusVersion(status.Round.SubSaturate(2)) + if err != nil { + s.Panicf("cannot read latest consensus version, round %d: %v", status.Round.SubSaturate(2), err) + } + + // set speculative block assembly based on the current local configuration + specClock := SpeculativeBlockAsmTime(status.Period, status.ConsensusVersion, s.parameters.Local.SpeculativeAsmTimeOffset) + + // TODO: e, ok := s.demuxOne(ctx, a, externalDemuxSignals{Deadline: status.Deadline, FastRecoveryDeadline: status.FastRecoveryDeadline, SpeculativeBlockAsmDeadline: specClock, CurrentRound: status.Round}) output <- a - ready <- externalDemuxSignals{Deadline: status.Deadline, FastRecoveryDeadline: status.FastRecoveryDeadline, CurrentRound: status.Round} + ready <- externalDemuxSignals{Deadline: status.Deadline, FastRecoveryDeadline: status.FastRecoveryDeadline, SpeculativeBlockAsmDeadline: specClock, CurrentRound: status.Round} e, ok := <-input if !ok { break diff --git a/agreement/service_test.go b/agreement/service_test.go index e24f81890c..1af0f4e1e5 100644 --- a/agreement/service_test.go +++ b/agreement/service_test.go @@ -150,7 +150,7 @@ type testingNetworkEndpoint struct { type nodeID int // bufferCapacity is per channel -func makeTestingNetwork(nodes int, bufferCapacity int, validator BlockValidator) *testingNetwork { +func makeTestingNetwork(t *testing.T, nodes int, bufferCapacity int, validator BlockValidator) *testingNetwork { n := new(testingNetwork) n.validator = validator @@ -168,6 +168,7 @@ func makeTestingNetwork(nodes int, bufferCapacity int, validator BlockValidator) m := new(coserviceMonitor) m.id = i + m.t = t n.monitors[nodeID(i)] = m } @@ -507,10 +508,13 @@ type activityMonitor struct { quiet chan struct{} cb func(nodeID, map[coserviceType]uint) + + t *testing.T } -func makeActivityMonitor() (m *activityMonitor) { +func makeActivityMonitor(t *testing.T) (m *activityMonitor) { m = new(activityMonitor) + m.t = t m.sums = make(map[nodeID]uint) m.listeners = make(map[nodeID]coserviceListener) m.activity = make(chan struct{}, 1000) @@ -549,17 +553,29 @@ func (m *activityMonitor) waitForActivity() { } func (m *activityMonitor) waitForQuiet() { + if m.t.Failed() { + return + } + dt := time.NewTicker(100 * time.Millisecond) + defer dt.Stop() + ticksRemaining := 100 select { case <-m.quiet: - case <-time.After(10 * time.Second): - m.dump() + case <-dt.C: + if m.t.Failed() { + return + } + if ticksRemaining <= 0 { + m.dump() - var buf [1000000]byte - n := runtime.Stack(buf[:], true) - fmt.Println("Printing goroutine dump of size", n) - fmt.Println(string(buf[:n])) + var buf [1000000]byte + n := runtime.Stack(buf[:], true) + fmt.Println("Printing goroutine dump of size", n) + fmt.Println(string(buf[:n])) - panic("timed out waiting for quiet...") + panic("timed out waiting for quiet...") + } + ticksRemaining-- } } @@ -708,11 +724,7 @@ func setupAgreementWithValidator(t *testing.T, numNodes int, traceLevel traceLev accounts, balances := createTestAccountsAndBalances(t, numNodes, (&[32]byte{})[:]) baseLedger := ledgerFactory(balances) - // logging - log := logging.Base() - f, _ := os.Create(t.Name() + ".log") - log.SetJSONFormatter() - log.SetOutput(f) + log := logging.TestingLog(t) log.SetLevel(logging.Debug) // node setup @@ -720,8 +732,8 @@ func setupAgreementWithValidator(t *testing.T, numNodes int, traceLevel traceLev ledgers := make([]Ledger, numNodes) dbAccessors := make([]db.Accessor, numNodes) services := make([]*Service, numNodes) - baseNetwork := makeTestingNetwork(numNodes, bufCap, validator) - am := makeActivityMonitor() + baseNetwork := makeTestingNetwork(t, numNodes, bufCap, validator) + am := makeActivityMonitor(t) for i := 0; i < numNodes; i++ { accessor, err := db.MakeAccessor(t.Name()+"_"+strconv.Itoa(i)+"_crash.db", false, true) @@ -747,7 +759,7 @@ func setupAgreementWithValidator(t *testing.T, numNodes int, traceLevel traceLev BlockFactory: testBlockFactory{Owner: i}, Clock: clocks[i], Accessor: accessor, - Local: config.Local{CadaverSizeTarget: 10000000}, + Local: config.Local{CadaverSizeTarget: 10000000, SpeculativeAsmTimeOffset: 9999999999}, RandomSource: &testingRand{}, } @@ -809,12 +821,11 @@ func (m *coserviceMonitor) clearClock() { } } -func expectNewPeriod(clocks []timers.Clock, zeroes uint) (newzeroes uint) { +func expectNewPeriod(t *testing.T, clocks []timers.Clock, zeroes uint) (newzeroes uint) { zeroes++ for i := range clocks { if clocks[i].(*testingClock).zeroes != zeroes { - errstr := fmt.Sprintf("unexpected number of zeroes: %v != %v", clocks[i].(*testingClock).zeroes, zeroes) - panic(errstr) + require.Equal(t, zeroes, clocks[i].(*testingClock).zeroes, "clocks[%d] unexpected number of zeroes %v != %v", i, zeroes, clocks[i].(*testingClock).zeroes) } } return zeroes @@ -841,9 +852,10 @@ func triggerGlobalTimeout(d time.Duration, clocks []timers.Clock, activityMonito activityMonitor.waitForQuiet() } -func runRound(clocks []timers.Clock, activityMonitor *activityMonitor, zeroes uint, filterTimeout time.Duration) (newzeroes uint) { +func runRound(t *testing.T, clocks []timers.Clock, activityMonitor *activityMonitor, zeroes uint, filterTimeout time.Duration) (newzeroes uint) { triggerGlobalTimeout(filterTimeout, clocks, activityMonitor) - return expectNewPeriod(clocks, zeroes) + t.Log("runRound a") + return expectNewPeriod(t, clocks, zeroes) } func sanityCheck(startRound round, numRounds round, ledgers []Ledger) { @@ -882,16 +894,21 @@ func simulateAgreementWithLedgerFactory(t *testing.T, numNodes int, numRounds in for i := 0; i < numNodes; i++ { services[i].Start() } + t.Logf("%d nodes started", numNodes) activityMonitor.waitForActivity() activityMonitor.waitForQuiet() - zeroes := expectNewPeriod(clocks, 0) + t.Log("all quiet") + zeroes := expectNewPeriod(t, clocks, 0) + t.Logf("runRound [%d]clocks", len(clocks)) // run round with round-specific consensus version first (since fix in #1896) version, _ := baseLedger.ConsensusVersion(ParamsRound(startRound)) - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + t.Logf("zeroes %d", zeroes) for j := 1; j < numRounds; j++ { + t.Logf("test round j=%d", j) version, _ := baseLedger.ConsensusVersion(ParamsRound(baseLedger.NextRound() + basics.Round(j-1))) - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } for i := 0; i < numNodes; i++ { @@ -1027,11 +1044,11 @@ func TestAgreementFastRecoveryDownEarly(t *testing.T) { } activityMonitor.waitForActivity() activityMonitor.waitForQuiet() - zeroes := expectNewPeriod(clocks, 0) + zeroes := expectNewPeriod(t, clocks, 0) // run two rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } // force fast partition recovery into bottom @@ -1049,19 +1066,19 @@ func TestAgreementFastRecoveryDownEarly(t *testing.T) { zeroes = expectNoNewPeriod(clocks, zeroes) triggerGlobalTimeout(firstFPR, clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // terminate on period 1 { baseNetwork.repairAll() triggerGlobalTimeout(FilterTimeout(1, version), clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // run two more rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } for i := 0; i < numNodes; i++ { @@ -1085,11 +1102,11 @@ func TestAgreementFastRecoveryDownMiss(t *testing.T) { } activityMonitor.waitForActivity() activityMonitor.waitForQuiet() - zeroes := expectNewPeriod(clocks, 0) + zeroes := expectNewPeriod(t, clocks, 0) // run two rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } // force fast partition recovery into bottom @@ -1130,19 +1147,19 @@ func TestAgreementFastRecoveryDownMiss(t *testing.T) { zeroes = expectNoNewPeriod(clocks, zeroes) triggerGlobalTimeout(secondFPR, clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // terminate on period 1 { baseNetwork.repairAll() triggerGlobalTimeout(FilterTimeout(1, version), clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // run two more rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } for i := 0; i < numNodes; i++ { @@ -1166,11 +1183,11 @@ func TestAgreementFastRecoveryLate(t *testing.T) { } activityMonitor.waitForActivity() activityMonitor.waitForQuiet() - zeroes := expectNewPeriod(clocks, 0) + zeroes := expectNewPeriod(t, clocks, 0) // run two rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } // force fast partition recovery into value @@ -1232,14 +1249,14 @@ func TestAgreementFastRecoveryLate(t *testing.T) { zeroes = expectNoNewPeriod(clocks, zeroes) triggerGlobalTimeout(secondFPR, clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // terminate on period 1 { baseNetwork.repairAll() triggerGlobalTimeout(FilterTimeout(1, version), clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } for _, l := range ledgers { @@ -1255,7 +1272,7 @@ func TestAgreementFastRecoveryLate(t *testing.T) { // run two more rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } for i := 0; i < numNodes; i++ { @@ -1279,11 +1296,11 @@ func TestAgreementFastRecoveryRedo(t *testing.T) { } activityMonitor.waitForActivity() activityMonitor.waitForQuiet() - zeroes := expectNewPeriod(clocks, 0) + zeroes := expectNewPeriod(t, clocks, 0) // run two rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } // force fast partition recovery into value @@ -1345,7 +1362,7 @@ func TestAgreementFastRecoveryRedo(t *testing.T) { zeroes = expectNoNewPeriod(clocks, zeroes) triggerGlobalTimeout(secondFPR, clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // fail period 1 with value again @@ -1386,14 +1403,14 @@ func TestAgreementFastRecoveryRedo(t *testing.T) { zeroes = expectNoNewPeriod(clocks, zeroes) triggerGlobalTimeout(secondFPR, clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // terminate on period 2 { baseNetwork.repairAll() triggerGlobalTimeout(FilterTimeout(2, version), clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } for _, l := range ledgers { @@ -1409,7 +1426,7 @@ func TestAgreementFastRecoveryRedo(t *testing.T) { // run two more rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } for i := 0; i < numNodes; i++ { @@ -1433,11 +1450,11 @@ func TestAgreementBlockReplayBug_b29ea57(t *testing.T) { } activityMonitor.waitForActivity() activityMonitor.waitForQuiet() - zeroes := expectNewPeriod(clocks, 0) + zeroes := expectNewPeriod(t, clocks, 0) // run two rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } // fail period 0 @@ -1447,7 +1464,7 @@ func TestAgreementBlockReplayBug_b29ea57(t *testing.T) { zeroes = expectNoNewPeriod(clocks, zeroes) triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // fail period 1 on bottom with block @@ -1456,19 +1473,19 @@ func TestAgreementBlockReplayBug_b29ea57(t *testing.T) { zeroes = expectNoNewPeriod(clocks, zeroes) triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // terminate on period 2 { baseNetwork.repairAll() triggerGlobalTimeout(FilterTimeout(2, version), clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // run two more rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } for i := 0; i < numNodes; i++ { @@ -1492,11 +1509,11 @@ func TestAgreementLateCertBug(t *testing.T) { } activityMonitor.waitForActivity() activityMonitor.waitForQuiet() - zeroes := expectNewPeriod(clocks, 0) + zeroes := expectNewPeriod(t, clocks, 0) // run two rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } // delay minority cert votes to force period 1 @@ -1509,7 +1526,7 @@ func TestAgreementLateCertBug(t *testing.T) { baseNetwork.repairAll() triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // terminate on period 0 in period 1 @@ -1521,12 +1538,12 @@ func TestAgreementLateCertBug(t *testing.T) { baseNetwork.finishAllMulticast() activityMonitor.waitForActivity() activityMonitor.waitForQuiet() - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // run two more rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } for i := 0; i < numNodes; i++ { @@ -1550,11 +1567,11 @@ func TestAgreementRecoverGlobalStartingValue(t *testing.T) { } activityMonitor.waitForActivity() activityMonitor.waitForQuiet() - zeroes := expectNewPeriod(clocks, 0) + zeroes := expectNewPeriod(t, clocks, 0) // run two rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } // force partition recovery into value @@ -1585,7 +1602,7 @@ func TestAgreementRecoverGlobalStartingValue(t *testing.T) { } triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) require.Equal(t, 4, int(zeroes)) } @@ -1612,7 +1629,7 @@ func TestAgreementRecoverGlobalStartingValue(t *testing.T) { } triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) require.Equal(t, 5, int(zeroes)) } @@ -1621,13 +1638,13 @@ func TestAgreementRecoverGlobalStartingValue(t *testing.T) { { baseNetwork.repairAll() triggerGlobalTimeout(FilterTimeout(2, version), clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) require.Equal(t, 6, int(zeroes)) } // run two more rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } for i := 0; i < numNodes; i++ { services[i].Shutdown() @@ -1650,11 +1667,11 @@ func TestAgreementRecoverGlobalStartingValueBadProposal(t *testing.T) { } activityMonitor.waitForActivity() activityMonitor.waitForQuiet() - zeroes := expectNewPeriod(clocks, 0) + zeroes := expectNewPeriod(t, clocks, 0) // run two rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } // force partition recovery into value. @@ -1690,7 +1707,7 @@ func TestAgreementRecoverGlobalStartingValueBadProposal(t *testing.T) { return params }) triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) require.Equal(t, 4, int(zeroes)) } @@ -1716,7 +1733,7 @@ func TestAgreementRecoverGlobalStartingValueBadProposal(t *testing.T) { } } triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } @@ -1724,13 +1741,13 @@ func TestAgreementRecoverGlobalStartingValueBadProposal(t *testing.T) { { baseNetwork.repairAll() triggerGlobalTimeout(FilterTimeout(2, version), clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) require.Equal(t, 6, int(zeroes)) } // run two more rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } for i := 0; i < numNodes; i++ { services[i].Shutdown() @@ -1753,11 +1770,11 @@ func TestAgreementRecoverBothVAndBotQuorums(t *testing.T) { } activityMonitor.waitForActivity() activityMonitor.waitForQuiet() - zeroes := expectNewPeriod(clocks, 0) + zeroes := expectNewPeriod(t, clocks, 0) // run two rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } // force partition recovery into both bottom and value. one node enters bottom, the rest enter value @@ -1815,7 +1832,7 @@ func TestAgreementRecoverBothVAndBotQuorums(t *testing.T) { lower, upper := (next + 1).nextVoteRanges() delta := time.Duration(testingRand{}.Uint64() % uint64(upper-lower)) triggerGlobalTimeout(lower+delta, clocks[1:], activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) require.Equal(t, 4, int(zeroes)) } @@ -1842,20 +1859,20 @@ func TestAgreementRecoverBothVAndBotQuorums(t *testing.T) { } triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // Finish in period 2 { baseNetwork.repairAll() triggerGlobalTimeout(FilterTimeout(2, version), clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) require.Equal(t, 6, int(zeroes)) } // run two more rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } for i := 0; i < numNodes; i++ { services[i].Shutdown() @@ -1878,11 +1895,11 @@ func TestAgreementSlowPayloadsPreDeadline(t *testing.T) { } activityMonitor.waitForActivity() activityMonitor.waitForQuiet() - zeroes := expectNewPeriod(clocks, 0) + zeroes := expectNewPeriod(t, clocks, 0) // run two rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } // run round and then start pocketing payloads @@ -1890,7 +1907,7 @@ func TestAgreementSlowPayloadsPreDeadline(t *testing.T) { closeFn := baseNetwork.pocketAllCompound(pocket) // (takes effect next round) { triggerGlobalTimeout(FilterTimeout(0, version), clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // run round with late payload @@ -1908,12 +1925,12 @@ func TestAgreementSlowPayloadsPreDeadline(t *testing.T) { baseNetwork.finishAllMulticast() activityMonitor.waitForActivity() activityMonitor.waitForQuiet() - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // run two more rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } for i := 0; i < numNodes; i++ { services[i].Shutdown() @@ -1936,11 +1953,11 @@ func TestAgreementSlowPayloadsPostDeadline(t *testing.T) { } activityMonitor.waitForActivity() activityMonitor.waitForQuiet() - zeroes := expectNewPeriod(clocks, 0) + zeroes := expectNewPeriod(t, clocks, 0) // run two rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } // run round and then start pocketing payloads @@ -1948,7 +1965,7 @@ func TestAgreementSlowPayloadsPostDeadline(t *testing.T) { closeFn := baseNetwork.pocketAllCompound(pocket) // (takes effect next round) { triggerGlobalTimeout(FilterTimeout(0, version), clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // force network into period 1 by delaying proposals @@ -1956,7 +1973,7 @@ func TestAgreementSlowPayloadsPostDeadline(t *testing.T) { triggerGlobalTimeout(FilterTimeout(0, version), clocks, activityMonitor) zeroes = expectNoNewPeriod(clocks, zeroes) triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // recover in period 1 @@ -1973,12 +1990,12 @@ func TestAgreementSlowPayloadsPostDeadline(t *testing.T) { zeroes = expectNoNewPeriod(clocks, zeroes) triggerGlobalTimeout(FilterTimeout(1, version), clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // run two more rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } for i := 0; i < numNodes; i++ { services[i].Shutdown() @@ -2001,11 +2018,11 @@ func TestAgreementLargePeriods(t *testing.T) { activityMonitor.waitForActivity() activityMonitor.waitForQuiet() - zeroes := expectNewPeriod(clocks, 0) + zeroes := expectNewPeriod(t, clocks, 0) // run two rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } // partition the network, run until period 60 @@ -2017,7 +2034,7 @@ func TestAgreementLargePeriods(t *testing.T) { baseNetwork.repairAll() triggerGlobalTimeout(deadlineTimeout, clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) require.Equal(t, 4+p, int(zeroes)) } } @@ -2025,12 +2042,12 @@ func TestAgreementLargePeriods(t *testing.T) { // terminate { triggerGlobalTimeout(FilterTimeout(60, version), clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // run two more rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } for i := 0; i < numNodes; i++ { services[i].Shutdown() @@ -2099,11 +2116,11 @@ func TestAgreementRegression_WrongPeriodPayloadVerificationCancellation_8ba23942 } activityMonitor.waitForActivity() activityMonitor.waitForQuiet() - zeroes := expectNewPeriod(clocks, 0) + zeroes := expectNewPeriod(t, clocks, 0) // run two rounds for j := 0; j < 2; j++ { - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) } // run round and then start pocketing payloads, suspending validation @@ -2112,7 +2129,7 @@ func TestAgreementRegression_WrongPeriodPayloadVerificationCancellation_8ba23942 closeFn := baseNetwork.pocketAllCompound(pocket0) // (takes effect next round) { triggerGlobalTimeout(FilterTimeout(0, version), clocks, activityMonitor) - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) } // force network into period 1 by failing period 0, entering with bottom and no soft threshold (to prevent proposal value pinning) @@ -2205,15 +2222,15 @@ func TestAgreementRegression_WrongPeriodPayloadVerificationCancellation_8ba23942 } baseNetwork.finishAllMulticast() - zeroes = expectNewPeriod(clocks, zeroes) + zeroes = expectNewPeriod(t, clocks, zeroes) activityMonitor.waitForQuiet() // run two more rounds //for j := 0; j < 2; j++ { - // zeroes = runRound(clocks, activityMonitor, zeroes, period(1-j)) + // zeroes = runRound(t, clocks, activityMonitor, zeroes, period(1-j)) //} - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(1, version)) - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(1, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) for i := 0; i < numNodes; i++ { services[i].Shutdown() @@ -2256,9 +2273,9 @@ func TestAgreementCertificateDoesNotStallSingleRelay(t *testing.T) { } activityMonitor.waitForActivity() activityMonitor.waitForQuiet() - zeroes := expectNewPeriod(clocks, 0) + zeroes := expectNewPeriod(t, clocks, 0) // run two rounds - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) // make sure relay does not see block proposal for round 3 baseNetwork.intercept(func(params multicastParams) multicastParams { if params.tag == protocol.ProposalPayloadTag { @@ -2289,7 +2306,7 @@ func TestAgreementCertificateDoesNotStallSingleRelay(t *testing.T) { return params }) - zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version)) + zeroes = runRound(t, clocks, activityMonitor, zeroes, FilterTimeout(0, version)) // Round 3: // First partition the relay to prevent it from seeing certificate or block @@ -2313,7 +2330,7 @@ func TestAgreementCertificateDoesNotStallSingleRelay(t *testing.T) { }) // And with some hypothetical second relay the network achieves consensus on a certificate and block. triggerGlobalTimeout(FilterTimeout(0, version), clocks, activityMonitor) - zeroes = expectNewPeriod(clocks[1:], zeroes) + zeroes = expectNewPeriod(t, clocks[1:], zeroes) require.Equal(t, uint(3), clocks[0].(*testingClock).zeroes) close(pocketCert) @@ -2332,7 +2349,7 @@ func TestAgreementCertificateDoesNotStallSingleRelay(t *testing.T) { // this relay must still relay initial messages. Note that payloads were already relayed with // the previous global timeout. triggerGlobalTimeout(FilterTimeout(0, version), clocks[1:], activityMonitor) - zeroes = expectNewPeriod(clocks[1:], zeroes) + zeroes = expectNewPeriod(t, clocks[1:], zeroes) require.Equal(t, uint(3), clocks[0].(*testingClock).zeroes) for i := 0; i < numNodes; i++ { diff --git a/agreement/types.go b/agreement/types.go index bc2decb7d0..eae8942b56 100644 --- a/agreement/types.go +++ b/agreement/types.go @@ -38,6 +38,15 @@ func FilterTimeout(p period, v protocol.ConsensusVersion) time.Duration { return config.Consensus[v].AgreementFilterTimeout } +// SpeculativeBlockAsmTime is the time at which we would like to begin speculative assembly +func SpeculativeBlockAsmTime(p period, v protocol.ConsensusVersion, speculativeAsmTimeOffset time.Duration) time.Duration { + hardwait := FilterTimeout(p, v) + if hardwait > speculativeAsmTimeOffset { + return hardwait - speculativeAsmTimeOffset + } + return time.Duration(0) +} + // DeadlineTimeout is the duration of the second agreement step. func DeadlineTimeout() time.Duration { return deadlineTimeout diff --git a/config/localTemplate.go b/config/localTemplate.go index f13fc97f1f..672a55abd4 100644 --- a/config/localTemplate.go +++ b/config/localTemplate.go @@ -517,6 +517,9 @@ type Local struct { // only relevant if TxIncomingFilteringFlags is non-zero TxIncomingFilterMaxSize uint64 `version[28]:"500000"` + SpeculativeAsmTimeOffset time.Duration `version[30]:"400000000"` + SpeculativeAssemblyDisable bool `version[30]:"false"` + // BlockServiceMemCap is the memory capacity in bytes which is allowed for the block service to use for HTTP block requests. // When it exceeds this capacity, it redirects the block requests to a different node BlockServiceMemCap uint64 `version[28]:"500000000"` diff --git a/config/local_defaults.go b/config/local_defaults.go index a3c55c1e1b..b92833daf7 100644 --- a/config/local_defaults.go +++ b/config/local_defaults.go @@ -122,6 +122,8 @@ var defaultLocal = Local{ RestReadTimeoutSeconds: 15, RestWriteTimeoutSeconds: 120, RunHosted: false, + SpeculativeAsmTimeOffset: 0, + SpeculativeAssemblyDisable: false, StorageEngine: "sqlite", SuggestedFeeBlockHistory: 3, SuggestedFeeSlidingWindowSize: 50, diff --git a/data/datatest/impls.go b/data/datatest/impls.go index fefcb054da..6b5ce99b06 100644 --- a/data/datatest/impls.go +++ b/data/datatest/impls.go @@ -64,6 +64,9 @@ func (i entryFactoryImpl) AssembleBlock(round basics.Round) (agreement.Validated return validatedBlock{blk: &b}, nil } +func (i entryFactoryImpl) StartSpeculativeBlockAssembly(context.Context, agreement.ValidatedBlock, crypto.Digest, bool) { +} + // WithSeed implements the agreement.ValidatedBlock interface. func (ve validatedBlock) WithSeed(s committee.Seed) agreement.ValidatedBlock { newblock := ve.blk.WithSeed(s) diff --git a/data/pools/transactionPool.go b/data/pools/transactionPool.go index a03baea4f0..b6d0188664 100644 --- a/data/pools/transactionPool.go +++ b/data/pools/transactionPool.go @@ -17,6 +17,7 @@ package pools import ( + "context" "errors" "fmt" "sync" @@ -26,6 +27,7 @@ import ( "github.com/algorand/go-deadlock" "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/data/transactions" @@ -35,8 +37,11 @@ import ( "github.com/algorand/go-algorand/logging/telemetryspec" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util/condvar" + "github.com/algorand/go-algorand/util/metrics" ) +var speculativeAssemblyDiscarded = metrics.NewTagCounter("algod_speculative_assembly_discarded", "started speculative block assembly but a different block won later") + // A TransactionPool prepares valid blocks for proposal and caches // validated transaction groups. // @@ -49,8 +54,9 @@ import ( // TransactionPool.AssembleBlock constructs a valid block for // proposal given a deadline. type TransactionPool struct { - // feePerByte is stored at the beginning of this struct to ensure it has a 64 bit aligned address. This is needed as it's being used - // with atomic operations which require 64 bit alignment on arm. + // feePerByte is stored at the beginning of this struct to ensure it has a + // 64 bit aligned address. This is needed as it's being used with atomic + // operations which require 64 bit alignment on arm. feePerByte uint64 // const @@ -58,7 +64,7 @@ type TransactionPool struct { logAssembleStats bool expFeeFactor uint64 txPoolMaxSize int - ledger *ledger.Ledger + ledger ledger.LedgerForEvaluator mu deadlock.Mutex cond sync.Cond @@ -93,6 +99,22 @@ type TransactionPool struct { // proposalAssemblyTime is the ProposalAssemblyTime configured for this node. proposalAssemblyTime time.Duration + ctx context.Context + + // specBlockMu protects speculative block assembly vars below: + specBlockMu deadlock.Mutex + specActive bool + // specBlockDigest ValidatedBlock.Block().Digest() + specBlockDigest crypto.Digest + cancelSpeculativeAssembly context.CancelFunc + speculativePool *TransactionPool + // specBlockCh has an assembled speculative block + //specBlockCh chan *ledgercore.ValidatedBlock + // specAsmDone channel is closed when there is no speculative assembly + //specAsmDone <-chan struct{} + // TODO: feed into assemblyMu & assemblyCond above! + + cfg config.Local // stateproofOverflowed indicates that a stateproof transaction was allowed to // exceed the txPoolMaxSize. This flag is reset to false OnNewBlock stateproofOverflowed bool @@ -114,6 +136,7 @@ func MakeTransactionPool(ledger *ledger.Ledger, cfg config.Local, log logging.Lo if cfg.TxPoolExponentialIncreaseFactor < 1 { cfg.TxPoolExponentialIncreaseFactor = 1 } + pool := TransactionPool{ pendingTxids: make(map[transactions.Txid]transactions.SignedTxn), rememberedTxids: make(map[transactions.Txid]transactions.SignedTxn), @@ -126,15 +149,57 @@ func MakeTransactionPool(ledger *ledger.Ledger, cfg config.Local, log logging.Lo txPoolMaxSize: cfg.TxPoolSize, proposalAssemblyTime: cfg.ProposalAssemblyTime, log: log, + cfg: cfg, + ctx: context.Background(), } pool.cond.L = &pool.mu pool.assemblyCond.L = &pool.assemblyMu - pool.recomputeBlockEvaluator(nil, 0) + pool.recomputeBlockEvaluator(nil, 0, false) return &pool } -// poolAsmResults is used to syncronize the state of the block assembly process. The structure reading/writing is syncronized -// via the pool.assemblyMu lock. +// TODO: this needs a careful read for lock/shallow-copy issues +func (pool *TransactionPool) copyTransactionPoolOverSpecLedger(ctx context.Context, vb *ledgercore.ValidatedBlock) (*TransactionPool, context.CancelFunc, error) { + specLedger, err := ledger.MakeValidatedBlockAsLFE(vb, pool.ledger) + if err != nil { + return nil, nil, err + } + + copyPoolctx, cancel := context.WithCancel(ctx) + + copy := TransactionPool{ + pendingTxids: make(map[transactions.Txid]transactions.SignedTxn), // pendingTxIds is only used for stats and hints + pendingTxGroups: pool.pendingTxGroups[:], + rememberedTxids: make(map[transactions.Txid]transactions.SignedTxn), + expiredTxCount: make(map[basics.Round]int), + ledger: specLedger, + statusCache: makeStatusCache(pool.cfg.TxPoolSize), + logProcessBlockStats: pool.cfg.EnableProcessBlockStats, + logAssembleStats: pool.cfg.EnableAssembleStats, + expFeeFactor: pool.cfg.TxPoolExponentialIncreaseFactor, + txPoolMaxSize: pool.cfg.TxPoolSize, + proposalAssemblyTime: pool.cfg.ProposalAssemblyTime, + assemblyRound: specLedger.Latest() + 1, + log: pool.log, + cfg: pool.cfg, + ctx: copyPoolctx, + } + // TODO: make an 'assembly context struct' with a subset of TransactionPool fields? + copy.cond.L = ©.mu + copy.assemblyCond.L = ©.assemblyMu + + //pool.cancelSpeculativeAssembly = cancel + + // specBlockCh := make(chan *ledgercore.ValidatedBlock, 1) + // pool.specBlockMu.Lock() + // pool.specBlockCh = specBlockCh + // pool.specBlockMu.Unlock() + + return ©, cancel, nil +} + +// poolAsmResults is used to syncronize the state of the block assembly process. +// The structure reading/writing is syncronized via the pool.assemblyMu lock. type poolAsmResults struct { // the ok variable indicates whether the assembly for the block roundStartedEvaluating was complete ( i.e. ok == true ) or // whether it's still in-progress. @@ -142,11 +207,13 @@ type poolAsmResults struct { blk *ledgercore.ValidatedBlock stats telemetryspec.AssembleBlockMetrics err error - // roundStartedEvaluating is the round which we were attempted to evaluate last. It's a good measure for - // which round we started evaluating, but not a measure to whether the evaluation is complete. + // roundStartedEvaluating is the round which we were attempted to evaluate + // last. It's a good measure for which round we started evaluating, but not + // a measure to whether the evaluation is complete. roundStartedEvaluating basics.Round - // assemblyCompletedOrAbandoned is *not* protected via the pool.assemblyMu lock and should be accessed only from the OnNewBlock goroutine. - // it's equivalent to the "ok" variable, and used for avoiding taking the lock. + // assemblyCompletedOrAbandoned is *not* protected via the pool.assemblyMu + // lock and should be accessed only from the OnNewBlock goroutine. it's + // equivalent to the "ok" variable, and used for avoiding taking the lock. assemblyCompletedOrAbandoned bool } @@ -181,7 +248,14 @@ func (pool *TransactionPool) Reset() { pool.numPendingWholeBlocks = 0 pool.pendingBlockEvaluator = nil pool.statusCache.reset() - pool.recomputeBlockEvaluator(nil, 0) + pool.recomputeBlockEvaluator(nil, 0, false) + + // cancel speculative assembly and clear its result + pool.specBlockMu.Lock() + if pool.cancelSpeculativeAssembly != nil { + pool.cancelSpeculativeAssembly() + } + pool.specBlockMu.Unlock() } // NumExpired returns the number of transactions that expired at the @@ -337,7 +411,7 @@ func (pool *TransactionPool) computeFeePerByte() uint64 { return feePerByte } -// checkSufficientFee take a set of signed transactions and verifies that each transaction has +// checkSufficientFee takes a set of signed transactions and verifies that each transaction has // sufficient fee to get into the transaction pool func (pool *TransactionPool) checkSufficientFee(txgroup []transactions.SignedTxn) error { // Special case: the state proof transaction, if issued from the @@ -395,17 +469,18 @@ func (pool *TransactionPool) remember(txgroup []transactions.SignedTxn) error { params := poolIngestParams{ recomputing: false, } - return pool.ingest(txgroup, params) + _, err := pool.ingest(txgroup, params, false) + return err } // add tries to add the transaction group to the pool, bypassing the fee // priority checks. -func (pool *TransactionPool) add(txgroup []transactions.SignedTxn, stats *telemetryspec.AssembleBlockMetrics) error { +func (pool *TransactionPool) add(txgroup []transactions.SignedTxn, stats *telemetryspec.AssembleBlockMetrics, stopAtFirstFullBlock bool) (stoppedAtBlock bool, err error) { params := poolIngestParams{ recomputing: true, stats: stats, } - return pool.ingest(txgroup, params) + return pool.ingest(txgroup, params, stopAtFirstFullBlock) } // ingest checks whether a transaction group could be remembered in the pool, @@ -413,9 +488,9 @@ func (pool *TransactionPool) add(txgroup []transactions.SignedTxn, stats *teleme // // ingest assumes that pool.mu is locked. It might release the lock // while it waits for OnNewBlock() to be called. -func (pool *TransactionPool) ingest(txgroup []transactions.SignedTxn, params poolIngestParams) error { +func (pool *TransactionPool) ingest(txgroup []transactions.SignedTxn, params poolIngestParams, stopAtFirstFullBlock bool) (stoppedAtBlock bool, err error) { if pool.pendingBlockEvaluator == nil { - return ErrNoPendingBlockEvaluator + return false, ErrNoPendingBlockEvaluator } if !params.recomputing { @@ -427,26 +502,26 @@ func (pool *TransactionPool) ingest(txgroup []transactions.SignedTxn, params poo for pool.pendingBlockEvaluator.Round() <= latest && time.Now().Before(waitExpires) { condvar.TimedWait(&pool.cond, timeoutOnNewBlock) if pool.pendingBlockEvaluator == nil { - return ErrNoPendingBlockEvaluator + return false, ErrNoPendingBlockEvaluator } } - err := pool.checkSufficientFee(txgroup) + err = pool.checkSufficientFee(txgroup) if err != nil { - return err + return false, err } } - err := pool.addToPendingBlockEvaluator(txgroup, params.recomputing, params.stats) + stoppedAtBlock, err = pool.addToPendingBlockEvaluator(txgroup, params.recomputing, params.stats, stopAtFirstFullBlock) if err != nil { - return err + return false, err } pool.rememberedTxGroups = append(pool.rememberedTxGroups, txgroup) for _, t := range txgroup { pool.rememberedTxids[t.ID()] = t } - return nil + return stoppedAtBlock, nil } // RememberOne stores the provided transaction. @@ -496,8 +571,101 @@ func (pool *TransactionPool) Lookup(txid transactions.Txid) (tx transactions.Sig return pool.statusCache.check(txid) } -// OnNewBlock excises transactions from the pool that are included in the specified Block or if they've expired +// StartSpeculativeBlockAssembly handles creating a speculative block +func (pool *TransactionPool) StartSpeculativeBlockAssembly(ctx context.Context, vb *ledgercore.ValidatedBlock, blockHash crypto.Digest, onlyIfStarted bool) { + + if pool.cfg.SpeculativeAssemblyDisable { + return + } + + if blockHash.IsZero() { + // if we don't already have a block hash, calculate it now + blockHash = vb.Block().Digest() + } + + pool.specBlockMu.Lock() + defer pool.specBlockMu.Unlock() + if pool.specActive { + if blockHash == pool.specBlockDigest { + pool.log.Infof("StartSpeculativeBlockAssembly %s already running", blockHash.String()) + return + } + // cancel prior speculative block assembly based on different block + speculativeAssemblyDiscarded.Add("start", 1) + pool.cancelSpeculativeAssembly() + pool.specActive = false + } else if onlyIfStarted { + // not already started, don't start one + return + } + pool.log.Infof("StartSpeculativeBlockAssembly %s", blockHash.String()) + pool.specActive = true + pool.specBlockDigest = blockHash + + pool.mu.Lock() + + // move remembered txns to pending + pool.rememberCommit(false) + + // create shallow pool copy, close the done channel when we're done with + // speculative block assembly. + speculativePool, cancel, err := pool.copyTransactionPoolOverSpecLedger(ctx, vb) + pool.cancelSpeculativeAssembly = cancel + pool.speculativePool = speculativePool + + pool.mu.Unlock() + + if err != nil { + pool.specActive = false + pool.log.Warnf("StartSpeculativeBlockAssembly: %v", err) + return + } + + // process txns only until one block is full + // action on subordinate pool continues asynchronously, to be picked up in tryReadSpeculativeBlock + go speculativePool.onNewBlock(vb.Block(), vb.Delta(), true) +} + +func (pool *TransactionPool) tryReadSpeculativeBlock(branch bookkeeping.BlockHash, round basics.Round, deadline time.Time, stats *telemetryspec.AssembleBlockMetrics) (*ledgercore.ValidatedBlock, error) { + // assumes pool.assemblyMu is held + pool.specBlockMu.Lock() + + if pool.specActive { + if pool.specBlockDigest == crypto.Digest(branch) { + pool.log.Infof("update speculative deadline: %s", deadline.String()) + specPool := pool.speculativePool + pool.specBlockMu.Unlock() + // TODO: is continuing to hold outer pool.assemblyMu here a bad thing? + specPool.assemblyMu.Lock() + assembled, err := specPool.waitForBlockAssembly(round, deadline, stats) + specPool.assemblyMu.Unlock() + return assembled, err + } + speculativeAssemblyDiscarded.Add("read", 1) + pool.cancelSpeculativeAssembly() + pool.specActive = false + } + pool.specBlockMu.Unlock() + // nope, nothing + return nil, nil +} + +// OnNewBlock callback calls the internal implementation, onNewBlock with the “false” parameter to process all transactions. func (pool *TransactionPool) OnNewBlock(block bookkeeping.Block, delta ledgercore.StateDelta) { + pool.specBlockMu.Lock() + if pool.specActive && block.Digest() != pool.specBlockDigest { + speculativeAssemblyDiscarded.Add("ONB", 1) + pool.cancelSpeculativeAssembly() + pool.specActive = false + // cancel speculative assembly, fall through to starting normal assembly + } + // TODO: make core onNewBlock() _wait_ until speculative block is done, merge state from that result + pool.specBlockMu.Unlock() + pool.onNewBlock(block, delta, false) +} + +// onNewBlock excises transactions from the pool that are included in the specified Block or if they've expired +func (pool *TransactionPool) onNewBlock(block bookkeeping.Block, delta ledgercore.StateDelta, stopReprocessingAtFirstAsmBlock bool) { var stats telemetryspec.ProcessBlockMetrics var knownCommitted uint var unknownCommitted uint @@ -518,6 +686,7 @@ func (pool *TransactionPool) OnNewBlock(block bookkeeping.Block, delta ledgercor pool.mu.Lock() defer pool.mu.Unlock() defer pool.cond.Broadcast() + if pool.pendingBlockEvaluator == nil || block.Round() >= pool.pendingBlockEvaluator.Round() { // Adjust the pool fee threshold. The rules are: // - If there was less than one full block in the pool, reduce @@ -545,7 +714,7 @@ func (pool *TransactionPool) OnNewBlock(block bookkeeping.Block, delta ledgercor // Recompute the pool by starting from the new latest block. // This has the side-effect of discarding transactions that // have been committed (or that are otherwise no longer valid). - stats = pool.recomputeBlockEvaluator(committedTxids, knownCommitted) + stats = pool.recomputeBlockEvaluator(committedTxids, knownCommitted, stopReprocessingAtFirstAsmBlock) } stats.KnownCommittedCount = knownCommitted @@ -564,10 +733,13 @@ func (pool *TransactionPool) OnNewBlock(block bookkeeping.Block, delta ledgercor } } -// isAssemblyTimedOut determines if we should keep attempting complete the block assembly by adding more transactions to the pending evaluator, -// or whether we've ran out of time. It takes into consideration the assemblyDeadline that was set by the AssembleBlock function as well as the -// projected time it's going to take to call the GenerateBlock function before the block assembly would be ready. -// The function expects that the pool.assemblyMu lock would be taken before being called. +// isAssemblyTimedOut determines if we should keep attempting complete the block +// assembly by adding more transactions to the pending evaluator, or whether +// we've ran out of time. It takes into consideration the assemblyDeadline that +// was set by the AssembleBlock function as well as the projected time it's +// going to take to call the GenerateBlock function before the block assembly +// would be ready. The function expects that the pool.assemblyMu lock would be +// taken before being called. func (pool *TransactionPool) isAssemblyTimedOut() bool { if pool.assemblyDeadline.IsZero() { // we have no deadline, so no reason to timeout. @@ -605,9 +777,12 @@ func (pool *TransactionPool) addToPendingBlockEvaluatorOnce(txgroup []transactio pool.assemblyMu.Lock() defer pool.assemblyMu.Unlock() if pool.assemblyRound > pool.pendingBlockEvaluator.Round() { - // the block we're assembling now isn't the one the the AssembleBlock is waiting for. While it would be really cool - // to finish generating the block, it would also be pointless to spend time on it. - // we're going to set the ok and assemblyCompletedOrAbandoned to "true" so we can complete this loop asap + // the block we're assembling now isn't the one the the + // AssembleBlock is waiting for. While it would be really cool + // to finish generating the block, it would also be pointless to + // spend time on it. we're going to set the ok and + // assemblyCompletedOrAbandoned to "true" so we can complete + // this loop asap pool.assemblyResults.ok = true pool.assemblyResults.assemblyCompletedOrAbandoned = true stats.StopReason = telemetryspec.AssembleBlockAbandon @@ -643,20 +818,23 @@ func (pool *TransactionPool) addToPendingBlockEvaluatorOnce(txgroup []transactio return err } -func (pool *TransactionPool) addToPendingBlockEvaluator(txgroup []transactions.SignedTxn, recomputing bool, stats *telemetryspec.AssembleBlockMetrics) error { - err := pool.addToPendingBlockEvaluatorOnce(txgroup, recomputing, stats) +func (pool *TransactionPool) addToPendingBlockEvaluator(txgroup []transactions.SignedTxn, recomputing bool, stats *telemetryspec.AssembleBlockMetrics, addUntilFirstBlockFull bool) (shouldContinue bool, err error) { + err = pool.addToPendingBlockEvaluatorOnce(txgroup, recomputing, stats) if err == ledgercore.ErrNoSpace { pool.numPendingWholeBlocks++ + if addUntilFirstBlockFull { + return true, nil + } pool.pendingBlockEvaluator.ResetTxnBytes() err = pool.addToPendingBlockEvaluatorOnce(txgroup, recomputing, stats) } - return err + return false, err } // recomputeBlockEvaluator constructs a new BlockEvaluator and feeds all // in-pool transactions to it (removing any transactions that are rejected // by the BlockEvaluator). Expects that the pool.mu mutex would be already taken. -func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transactions.Txid]ledgercore.IncludedTransactions, knownCommitted uint) (stats telemetryspec.ProcessBlockMetrics) { +func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transactions.Txid]ledgercore.IncludedTransactions, knownCommitted uint, stopAtFirstBlock bool) (stats telemetryspec.ProcessBlockMetrics) { pool.pendingBlockEvaluator = nil latest := pool.ledger.Latest() @@ -700,6 +878,7 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact if hint < 0 || int(knownCommitted) < 0 { hint = 0 } + pool.pendingBlockEvaluator, err = pool.ledger.StartEvaluator(next.BlockHeader, hint, 0, nil) if err != nil { // The pendingBlockEvaluator is an interface, and in case of an evaluator error @@ -725,6 +904,12 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact // Feed the transactions in order for _, txgroup := range txgroups { + select { + case <-pool.ctx.Done(): + return + default: // continue processing transactions + } + if len(txgroup) == 0 { asmStats.InvalidCount++ continue @@ -733,7 +918,8 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact asmStats.EarlyCommittedCount++ continue } - err := pool.add(txgroup, &asmStats) + + hasBlock, err := pool.add(txgroup, &asmStats, stopAtFirstBlock) if err != nil { for _, tx := range txgroup { pool.statusCache.put(tx, err.Error()) @@ -753,16 +939,18 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact case *ledgercore.LeaseInLedgerError: asmStats.LeaseErrorCount++ stats.RemovedInvalidCount++ - pool.log.Infof("Cannot re-add pending transaction to pool: %v", err) + pool.log.Infof("Cannot re-add pending transaction to pool (lease): %v", err) case *transactions.MinFeeError: asmStats.MinFeeErrorCount++ stats.RemovedInvalidCount++ - pool.log.Infof("Cannot re-add pending transaction to pool: %v", err) + pool.log.Infof("Cannot re-add pending transaction to pool (fee): %v", err) default: asmStats.InvalidCount++ stats.RemovedInvalidCount++ pool.log.Warnf("Cannot re-add pending transaction to pool: %v", err) } + } else if hasBlock { + break } } @@ -791,7 +979,10 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact } pool.assemblyMu.Unlock() - pool.rememberCommit(true) + // remember the changes made to the tx pool if we're not in speculative assembly + if !stopAtFirstBlock { + pool.rememberCommit(true) + } return } @@ -888,14 +1079,33 @@ func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Tim defer pool.assemblyMu.Unlock() if pool.assemblyResults.roundStartedEvaluating > round { - // we've already assembled a round in the future. Since we're clearly won't go backward, it means - // that the agreement is far behind us, so we're going to return here with error code to let - // the agreement know about it. - // since the network is already ahead of us, there is no issue here in not generating a block ( since the block would get discarded anyway ) + // we've already assembled a round in the future. Since we're clearly + // won't go backward, it means that the agreement is far behind us, so + // we're going to return here with error code to let the agreement know + // about it. since the network is already ahead of us, there is no issue + // here in not generating a block ( since the block would get discarded + // anyway ) pool.log.Infof("AssembleBlock: requested round is behind transaction pool round %d < %d", round, pool.assemblyResults.roundStartedEvaluating) return nil, ErrStaleBlockAssemblyRequest } + // Maybe we have a block already assembled + // TODO: this needs to be changed if a speculative block is in flight and if it is valid. If it is not valid, cancel it, if it is valid, wait for it. + prev, err := pool.ledger.Block(round.SubSaturate(1)) + if err == nil { + specBlock, specErr := pool.tryReadSpeculativeBlock(prev.Hash(), round, deadline, &stats) + if specBlock != nil || specErr != nil { + pool.log.Infof("got spec block for %s, specErr %v", prev.Hash().String(), specErr) + return specBlock, specErr + } + } + + assembled, err = pool.waitForBlockAssembly(round, deadline, &stats) + return +} + +// should be called with assemblyMu held +func (pool *TransactionPool) waitForBlockAssembly(round basics.Round, deadline time.Time, stats *telemetryspec.AssembleBlockMetrics) (assembled *ledgercore.ValidatedBlock, err error) { pool.assemblyDeadline = deadline pool.assemblyRound = round for time.Now().Before(deadline) && (!pool.assemblyResults.ok || pool.assemblyResults.roundStartedEvaluating != round) { @@ -903,9 +1113,11 @@ func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Tim } if !pool.assemblyResults.ok { - // we've passed the deadline, so we're either going to have a partial block, or that we won't make it on time. - // start preparing an empty block in case we'll miss the extra time (assemblyWaitEps). - // the assembleEmptyBlock is using the database, so we want to unlock here and take the lock again later on. + // we've passed the deadline, so we're either going to have a partial + // block, or that we won't make it on time. start preparing an empty + // block in case we'll miss the extra time (assemblyWaitEps). the + // assembleEmptyBlock is using the database, so we want to unlock here + // and take the lock again later on. pool.assemblyMu.Unlock() emptyBlock, emptyBlockErr := pool.assembleEmptyBlock(round) pool.assemblyMu.Lock() @@ -954,12 +1166,13 @@ func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Tim pool.assemblyResults.roundStartedEvaluating, round) } - stats = pool.assemblyResults.stats + *stats = pool.assemblyResults.stats return pool.assemblyResults.blk, nil } -// assembleEmptyBlock construct a new block for the given round. Internally it's using the ledger database calls, so callers -// need to be aware that it might take a while before it would return. +// assembleEmptyBlock construct a new block for the given round. Internally it's +// using the ledger database calls, so callers need to be aware that it might +// take a while before it would return. func (pool *TransactionPool) assembleEmptyBlock(round basics.Round) (assembled *ledgercore.ValidatedBlock, err error) { prevRound := round - 1 prev, err := pool.ledger.BlockHdr(prevRound) @@ -973,8 +1186,9 @@ func (pool *TransactionPool) assembleEmptyBlock(round basics.Round) (assembled * var nonSeqBlockEval ledgercore.ErrNonSequentialBlockEval if errors.As(err, &nonSeqBlockEval) { if nonSeqBlockEval.EvaluatorRound <= nonSeqBlockEval.LatestRound { - // in the case that the ledger have already moved beyond that round, just let the agreement know that - // we don't generate a block and it's perfectly fine. + // in the case that the ledger have already moved beyond that + // round, just let the agreement know that we don't generate a + // block and it's perfectly fine. return nil, ErrStaleBlockAssemblyRequest } } @@ -990,7 +1204,7 @@ func (pool *TransactionPool) AssembleDevModeBlock() (assembled *ledgercore.Valid defer pool.mu.Unlock() // drop the current block evaluator and start with a new one. - pool.recomputeBlockEvaluator(nil, 0) + pool.recomputeBlockEvaluator(nil, 0, false) // The above was already pregenerating the entire block, // so there won't be any waiting on this call. diff --git a/data/pools/transactionPool_test.go b/data/pools/transactionPool_test.go index f03eadf7da..a90a4cfdd8 100644 --- a/data/pools/transactionPool_test.go +++ b/data/pools/transactionPool_test.go @@ -19,12 +19,14 @@ package pools import ( "bufio" "bytes" + "context" "fmt" "math/rand" "os" "runtime" "runtime/pprof" "strings" + "sync" "testing" "time" @@ -44,6 +46,7 @@ import ( "github.com/algorand/go-algorand/ledger" "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-algorand/logging/telemetryspec" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/stateproof" "github.com/algorand/go-algorand/stateproof/verify" @@ -1198,7 +1201,7 @@ func BenchmarkTransactionPoolRecompute(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - transactionPool[i].recomputeBlockEvaluator(committedTxIds[i], knownCommitted[i]) + transactionPool[i].recomputeBlockEvaluator(committedTxIds[i], knownCommitted[i], false) } b.StopTimer() if profF != nil { @@ -1528,7 +1531,7 @@ func TestStateProofLogging(t *testing.T) { err = transactionPool.RememberOne(stxn) require.NoError(t, err) - transactionPool.recomputeBlockEvaluator(nil, 0) + transactionPool.recomputeBlockEvaluator(nil, 0, false) _, err = transactionPool.AssembleBlock(514, time.Time{}) require.NoError(t, err) @@ -1612,3 +1615,309 @@ func generateProofForTesting( return proof } + +func TestSpeculativeBlockAssembly(t *testing.T) { + partitiontest.PartitionTest(t) + + numOfAccounts := 10 + // Generate accounts + secrets := make([]*crypto.SignatureSecrets, numOfAccounts) + addresses := make([]basics.Address, numOfAccounts) + + for i := 0; i < numOfAccounts; i++ { + secret := keypair() + addr := basics.Address(secret.SignatureVerifier) + secrets[i] = secret + addresses[i] = addr + } + + mockLedger := makeMockLedger(t, initAccFixed(addresses, 1<<32)) + cfg := config.GetDefaultLocal() + cfg.TxPoolSize = testPoolSize + cfg.EnableProcessBlockStats = false + log := logging.TestingLog(t) + transactionPool := MakeTransactionPool(mockLedger, cfg, log) + + savedTransactions := 0 + for i, sender := range addresses { + amount := uint64(0) + for _, receiver := range addresses { + if sender != receiver { + tx := transactions.Transaction{ + Type: protocol.PaymentTx, + Header: transactions.Header{ + Sender: sender, + Fee: basics.MicroAlgos{Raw: proto.MinTxnFee + amount}, + FirstValid: 0, + LastValid: 10, + Note: make([]byte, 0), + GenesisHash: mockLedger.GenesisHash(), + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: receiver, + Amount: basics.MicroAlgos{Raw: 0}, + }, + } + amount++ + + signedTx := tx.Sign(secrets[i]) + require.NoError(t, transactionPool.RememberOne(signedTx)) + savedTransactions++ + } + } + } + pending := transactionPool.PendingTxGroups() + require.Len(t, pending, savedTransactions) + + secret := keypair() + recv := basics.Address(secret.SignatureVerifier) + + tx := transactions.Transaction{ + Type: protocol.PaymentTx, + Header: transactions.Header{ + Sender: addresses[0], + Fee: basics.MicroAlgos{Raw: proto.MinTxnFee}, + FirstValid: 0, + LastValid: 10, + Note: []byte{1}, + GenesisHash: mockLedger.GenesisHash(), + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: recv, + Amount: basics.MicroAlgos{Raw: 0}, + }, + } + signedTx := tx.Sign(secrets[0]) + + blockEval := newBlockEvaluator(t, mockLedger) + err := blockEval.Transaction(signedTx, transactions.ApplyData{}) + require.NoError(t, err) + + // simulate this transaction was applied + block, err := blockEval.GenerateBlock() + require.NoError(t, err) + + t.Logf("prev block digest %s", block.Block().Digest().String()) + t.Logf("prev block hash %s", block.Block().Hash().String()) + + transactionPool.StartSpeculativeBlockAssembly(context.Background(), block, crypto.Digest{}, false) + // TODO(yg): shouldn't we wait for it to finish? + //<-transactionPool.specAsmDone + + // add the block + err = mockLedger.AddBlock(block.Block(), agreement.Certificate{}) + require.NoError(t, err) + + // empty tx pool + transactionPool.pendingTxids = make(map[transactions.Txid]transactions.SignedTxn) + transactionPool.pendingTxGroups = nil + + // check that we still assemble the block + specBlock, err := transactionPool.AssembleBlock(block.Block().Round()+1, time.Now().Add(time.Second)) + require.NoError(t, err) + require.Equal(t, specBlock.Block().Branch, block.Block().Hash(), "spec.Branch %s", specBlock.Block().Branch.String()) + require.NotNil(t, specBlock) + require.Len(t, specBlock.Block().Payset, savedTransactions) +} + +func TestSpeculativeBlockAssemblyWithOverlappingBlock(t *testing.T) { + partitiontest.PartitionTest(t) + + numOfAccounts := 10 + // Generate accounts + secrets := make([]*crypto.SignatureSecrets, numOfAccounts) + addresses := make([]basics.Address, numOfAccounts) + + for i := 0; i < numOfAccounts; i++ { + secret := keypair() + addr := basics.Address(secret.SignatureVerifier) + secrets[i] = secret + addresses[i] = addr + } + + mockLedger := makeMockLedger(t, initAccFixed(addresses, 1<<32)) + cfg := config.GetDefaultLocal() + cfg.TxPoolSize = testPoolSize + cfg.EnableProcessBlockStats = false + transactionPool := MakeTransactionPool(mockLedger, cfg, logging.Base()) + + savedTransactions := 0 + pendingTxn := transactions.SignedTxn{} + pendingTxIDSet := make(map[crypto.Signature]bool) + for i, sender := range addresses { + amount := uint64(0) + for _, receiver := range addresses { + if sender != receiver { + tx := transactions.Transaction{ + Type: protocol.PaymentTx, + Header: transactions.Header{ + Sender: sender, + Fee: basics.MicroAlgos{Raw: proto.MinTxnFee + amount}, + FirstValid: 0, + LastValid: 10, + Note: make([]byte, 0), + GenesisHash: mockLedger.GenesisHash(), + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: receiver, + Amount: basics.MicroAlgos{Raw: 0}, + }, + } + amount++ + + pendingTxn = tx.Sign(secrets[i]) + require.NoError(t, transactionPool.RememberOne(pendingTxn)) + pendingTxIDSet[pendingTxn.Sig] = true + savedTransactions++ + } + } + } + pending := transactionPool.PendingTxGroups() + require.Len(t, pending, savedTransactions) + require.Len(t, pendingTxIDSet, savedTransactions) + + blockEval := newBlockEvaluator(t, mockLedger) + err := blockEval.Transaction(pendingTxn, transactions.ApplyData{}) + require.NoError(t, err) + + // simulate this transaction was applied + block, err := blockEval.GenerateBlock() + require.NoError(t, err) + + transactionPool.StartSpeculativeBlockAssembly(context.Background(), block, crypto.Digest{}, false) + //<-transactionPool.specAsmDone + var stats telemetryspec.AssembleBlockMetrics + specBlock, specErr := transactionPool.tryReadSpeculativeBlock(block.Block().Hash(), block.Block().Round()+1, time.Now().Add(time.Second), &stats) + require.NoError(t, specErr) + require.NotNil(t, specBlock) + // assembled block doesn't have txn in the speculated block + require.Len(t, specBlock.Block().Payset, savedTransactions-1) + + // tx pool unaffected + require.Len(t, transactionPool.PendingTxIDs(), savedTransactions) + + for _, txn := range specBlock.Block().Payset { + require.NotEqual(t, txn.SignedTxn.Sig, pendingTxn.Sig) + require.True(t, pendingTxIDSet[txn.SignedTxn.Sig]) + } +} + +// This test runs the speculative block assembly and adds txns to the pool in another thread +func TestSpeculativeBlockAssemblyDataRace(t *testing.T) { + partitiontest.PartitionTest(t) + + numOfAccounts := 10 + // Generate accounts + secrets := make([]*crypto.SignatureSecrets, numOfAccounts) + addresses := make([]basics.Address, numOfAccounts) + + for i := 0; i < numOfAccounts; i++ { + secret := keypair() + addr := basics.Address(secret.SignatureVerifier) + secrets[i] = secret + addresses[i] = addr + } + + mockLedger := makeMockLedger(t, initAccFixed(addresses, 1<<32)) + cfg := config.GetDefaultLocal() + cfg.TxPoolSize = testPoolSize + cfg.EnableProcessBlockStats = false + transactionPool := MakeTransactionPool(mockLedger, cfg, logging.Base()) + + savedTransactions := 0 + pendingTxn := transactions.SignedTxn{} + pendingTxIDSet := make(map[crypto.Signature]bool) + for i, sender := range addresses { + amount := uint64(0) + for _, receiver := range addresses { + if sender != receiver { + tx := transactions.Transaction{ + Type: protocol.PaymentTx, + Header: transactions.Header{ + Sender: sender, + Fee: basics.MicroAlgos{Raw: proto.MinTxnFee + amount}, + FirstValid: 0, + LastValid: 10, + Note: make([]byte, 0), + GenesisHash: mockLedger.GenesisHash(), + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: receiver, + Amount: basics.MicroAlgos{Raw: 0}, + }, + } + amount++ + + pendingTxn = tx.Sign(secrets[i]) + require.NoError(t, transactionPool.RememberOne(pendingTxn)) + pendingTxIDSet[pendingTxn.Sig] = true + savedTransactions++ + } + } + } + t.Logf("savedTransactions=%d", savedTransactions) + pending := transactionPool.PendingTxGroups() + require.Len(t, pending, savedTransactions) + require.Len(t, pendingTxIDSet, savedTransactions) + + blockEval := newBlockEvaluator(t, mockLedger) + err := blockEval.Transaction(pendingTxn, transactions.ApplyData{}) + require.NoError(t, err) + + // simulate this transaction was applied + block, err := blockEval.GenerateBlock() + require.NoError(t, err) + + transactionPool.StartSpeculativeBlockAssembly(context.Background(), block, crypto.Digest{}, false) + newSavedTransactions := 0 + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for i, sender := range addresses { + amount := uint64(0) + for _, receiver := range addresses { + if sender != receiver { + tx := transactions.Transaction{ + Type: protocol.PaymentTx, + Header: transactions.Header{ + Sender: sender, + Fee: basics.MicroAlgos{Raw: proto.MinTxnFee + amount}, + FirstValid: 0, + LastValid: 11, + Note: make([]byte, 0), + GenesisHash: mockLedger.GenesisHash(), + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: receiver, + Amount: basics.MicroAlgos{Raw: 0}, + }, + } + amount++ + + pendingTxn = tx.Sign(secrets[i]) + require.NoError(t, transactionPool.RememberOne(pendingTxn)) + pendingTxIDSet[pendingTxn.Sig] = true + newSavedTransactions++ + } + } + } + }() + wg.Wait() + t.Logf("newSavedTransactions=%d", newSavedTransactions) + + // tx pool should have old txns and new txns + require.Len(t, transactionPool.PendingTxIDs(), savedTransactions+newSavedTransactions) + + var stats telemetryspec.AssembleBlockMetrics + specBlock, specErr := transactionPool.tryReadSpeculativeBlock(block.Block().Hash(), block.Block().Round()+1, time.Now().Add(time.Second), &stats) + require.NoError(t, specErr) + require.NotNil(t, specBlock) + // assembled block doesn't have txn in the speculated block + require.Len(t, specBlock.Block().Payset, savedTransactions-1, "len(Payset)=%d, savedTransactions=%d", len(specBlock.Block().Payset), savedTransactions) + + for _, txn := range specBlock.Block().Payset { + require.NotEqual(t, txn.SignedTxn.Sig, pendingTxn.Sig) + require.True(t, pendingTxIDSet[txn.SignedTxn.Sig]) + } +} diff --git a/installer/config.json.example b/installer/config.json.example index 8522011ce6..70bcc758b9 100644 --- a/installer/config.json.example +++ b/installer/config.json.example @@ -101,6 +101,8 @@ "RestReadTimeoutSeconds": 15, "RestWriteTimeoutSeconds": 120, "RunHosted": false, + "SpeculativeAsmTimeOffset": 0, + "SpeculativeAssemblyDisable": false, "StorageEngine": "sqlite", "SuggestedFeeBlockHistory": 3, "SuggestedFeeSlidingWindowSize": 50, diff --git a/ledger/ledger_test.go b/ledger/ledger_test.go index 5edb6d20e1..e99574487c 100644 --- a/ledger/ledger_test.go +++ b/ledger/ledger_test.go @@ -1102,7 +1102,7 @@ func testLedgerSingleTxApplyData(t *testing.T, version protocol.ConsensusVersion VoteLast: 10000, } - // depends on what the concensus is need to generate correct KeyregTxnFields. + // depends on what the consensus is need to generate correct KeyregTxnFields. if proto.EnableStateProofKeyregCheck { frst, lst := uint64(correctKeyregFields.VoteFirst), uint64(correctKeyregFields.VoteLast) store, err := db.MakeAccessor("test-DB", false, true) diff --git a/ledger/ledgercore/validatedBlock.go b/ledger/ledgercore/validatedBlock.go index 84d09e2a9a..0ace3a24f4 100644 --- a/ledger/ledgercore/validatedBlock.go +++ b/ledger/ledgercore/validatedBlock.go @@ -17,8 +17,11 @@ package ledgercore import ( + "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/data/committee" + "github.com/algorand/go-algorand/data/transactions" ) // ValidatedBlock represents the result of a block validation. It can @@ -50,6 +53,27 @@ func (vb ValidatedBlock) WithSeed(s committee.Seed) ValidatedBlock { } } +// CheckDup checks whether a txn is a duplicate +func (vb ValidatedBlock) CheckDup(currentProto config.ConsensusParams, firstValid, lastValid basics.Round, txid transactions.Txid, txl Txlease) error { + _, present := vb.delta.Txids[txid] + if present { + return &TransactionInLedgerError{Txid: txid} + } + + if currentProto.SupportTransactionLeases && (txl.Lease != [32]byte{}) { + expires, ok := vb.delta.Txleases[txl] + if ok && vb.blk.Round() <= expires { + return MakeLeaseInLedgerError(txid, txl, false) + } + } + return nil +} + +// Hash returns the hash of the block +func (vb ValidatedBlock) Hash() bookkeeping.BlockHash { + return vb.blk.Hash() +} + // MakeValidatedBlock creates a validated block. func MakeValidatedBlock(blk bookkeeping.Block, delta StateDelta) ValidatedBlock { return ValidatedBlock{ diff --git a/ledger/speculative.go b/ledger/speculative.go new file mode 100644 index 0000000000..d6b251f9c8 --- /dev/null +++ b/ledger/speculative.go @@ -0,0 +1,305 @@ +// Copyright (C) 2019-2023 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package ledger + +import ( + "errors" + "fmt" + + "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/crypto" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/data/transactions" + "github.com/algorand/go-algorand/data/transactions/logic" + "github.com/algorand/go-algorand/data/transactions/verify" + "github.com/algorand/go-algorand/ledger/eval" + "github.com/algorand/go-algorand/ledger/ledgercore" + "github.com/algorand/go-algorand/logging" +) + +// LedgerForEvaluator defines the ledger interface needed by the evaluator. +type LedgerForEvaluator interface { //nolint:revive //LedgerForEvaluator is a long established but newly leaking-out name, and there really isn't a better name for it despite how lint dislikes ledger.LedgerForEvaluator + eval.LedgerForEvaluator + LookupKv(rnd basics.Round, key string) ([]byte, error) + FlushCaches() + + // and a few more Ledger functions + Block(basics.Round) (bookkeeping.Block, error) + Latest() basics.Round + VerifiedTransactionCache() verify.VerifiedTransactionCache + StartEvaluator(hdr bookkeeping.BlockHeader, paysetHint, maxTxnBytesPerBlock int, tracer logic.EvalTracer) (*eval.BlockEvaluator, error) + GetStateProofVerificationContext(stateProofLastAttestedRound basics.Round) (*ledgercore.StateProofVerificationContext, error) +} + +// ErrRoundTooHigh "round too high" try to get ledger record in the future +var ErrRoundTooHigh = errors.New("round too high") + +// validatedBlockAsLFE presents a LedgerForEvaluator interface on top of +// a ValidatedBlock. This makes it possible to construct a BlockEvaluator +// on top, which in turn allows speculatively constructing a subsequent +// block, before the ValidatedBlock is committed to the ledger. +// +// ledger ValidatedBlock -------> Block +// | ^ blk +// | | vb +// | l | +// \---------- validatedBlockAsLFE +// +// where ledger is the full ledger. +type validatedBlockAsLFE struct { + // l points to the underlying ledger; it might be another instance + // of validatedBlockAsLFE if we are speculating on a chain of many + // blocks. + l LedgerForEvaluator + + // vb points to the ValidatedBlock that logically extends the + // state of the ledger. + vb *ledgercore.ValidatedBlock +} + +// MakeValidatedBlockAsLFE constructs a new validatedBlockAsLFE from a ValidatedBlock. +func MakeValidatedBlockAsLFE(vb *ledgercore.ValidatedBlock, l LedgerForEvaluator) (*validatedBlockAsLFE, error) { + latestRound := l.Latest() + if vb.Block().Round().SubSaturate(1) != latestRound { + return nil, fmt.Errorf("MakeBlockAsLFE: Ledger round %d mismatches next block round %d", latestRound, vb.Block().Round()) + } + hdr, err := l.BlockHdr(latestRound) + if err != nil { + return nil, err + } + if vb.Block().Branch != hdr.Hash() { + return nil, fmt.Errorf("MakeBlockAsLFE: Ledger latest block hash %x mismatches block's prev hash %x", hdr.Hash(), vb.Block().Branch) + } + + return &validatedBlockAsLFE{ + l: l, + vb: vb, + }, nil +} + +// Block implements the ledgerForEvaluator interface. +func (v *validatedBlockAsLFE) Block(r basics.Round) (bookkeeping.Block, error) { + if r == v.vb.Block().Round() { + return v.vb.Block(), nil + } + + return v.l.Block(r) +} + +// BlockHdr implements the ledgerForEvaluator interface. +func (v *validatedBlockAsLFE) BlockHdr(r basics.Round) (bookkeeping.BlockHeader, error) { + if r == v.vb.Block().Round() { + return v.vb.Block().BlockHeader, nil + } + + return v.l.BlockHdr(r) +} + +// CheckDup implements the ledgerForEvaluator interface. +func (v *validatedBlockAsLFE) CheckDup(currentProto config.ConsensusParams, current basics.Round, firstValid basics.Round, lastValid basics.Round, txid transactions.Txid, txl ledgercore.Txlease) error { + if current == v.vb.Block().Round() { + return v.vb.CheckDup(currentProto, firstValid, lastValid, txid, txl) + } + + return v.l.CheckDup(currentProto, current, firstValid, lastValid, txid, txl) +} + +// GenesisHash implements the ledgerForEvaluator interface. +func (v *validatedBlockAsLFE) GenesisHash() crypto.Digest { + return v.l.GenesisHash() +} + +// Latest implements the ledgerForEvaluator interface. +func (v *validatedBlockAsLFE) Latest() basics.Round { + return v.vb.Block().Round() +} + +// LatestTotals returns the totals of all accounts for the most recent round, as well as the round number. +func (v *validatedBlockAsLFE) LatestTotals() (basics.Round, ledgercore.AccountTotals, error) { + return v.Latest(), v.vb.Delta().Totals, nil +} + +// VotersForStateProof implements the ledgerForEvaluator interface. +func (v *validatedBlockAsLFE) VotersForStateProof(r basics.Round) (*ledgercore.VotersForRound, error) { + if r >= v.vb.Block().Round() { + // We do not support computing the compact cert voters for rounds + // that have not been committed to the ledger yet. This should not + // be a problem as long as the speculation depth does not + // exceed CompactCertVotersLookback. + err := fmt.Errorf("validatedBlockAsLFE.CompactCertVoters(%d): validated block is for round %d, voters not available", r, v.vb.Block().Round()) + logging.Base().Warn(err.Error()) + return nil, err + } + + return v.l.VotersForStateProof(r) +} + +// GetCreatorForRound implements the ledgerForEvaluator interface. +func (v *validatedBlockAsLFE) GetCreatorForRound(rnd basics.Round, cidx basics.CreatableIndex, ctype basics.CreatableType) (basics.Address, bool, error) { + vbround := v.vb.Block().Round() + if rnd > vbround { + return basics.Address{}, false, ErrRoundTooHigh + } + if rnd == vbround { + delta, ok := v.vb.Delta().Creatables[cidx] + if ok { + if delta.Created && delta.Ctype == ctype { + return delta.Creator, true, nil + } + return basics.Address{}, false, nil + } + + // no change in this block, so lookup in previous + rnd-- + } + + return v.l.GetCreatorForRound(rnd, cidx, ctype) +} + +// GenesisProto returns the initial protocol for this ledger. +func (v *validatedBlockAsLFE) GenesisProto() config.ConsensusParams { + return v.l.GenesisProto() +} + +// LookupApplication loads an application resource that matches the request parameters from the ledger. +func (v *validatedBlockAsLFE) LookupApplication(rnd basics.Round, addr basics.Address, aidx basics.AppIndex) (ledgercore.AppResource, error) { + vbround := v.vb.Block().Round() + if rnd > vbround { + return ledgercore.AppResource{}, ErrRoundTooHigh + } + if rnd == vbround { + // Intentionally apply (pending) rewards up to rnd. + res, ok := v.vb.Delta().Accts.GetResource(addr, basics.CreatableIndex(aidx), basics.AppCreatable) + if ok { + return ledgercore.AppResource{AppParams: res.AppParams, AppLocalState: res.AppLocalState}, nil + } + + // fall back to looking up asset in ledger, until previous block + rnd-- + } + + return v.l.LookupApplication(rnd, addr, aidx) +} + +// LookupAsset loads an asset resource that matches the request parameters from the ledger. +func (v *validatedBlockAsLFE) LookupAsset(rnd basics.Round, addr basics.Address, aidx basics.AssetIndex) (ledgercore.AssetResource, error) { + vbround := v.vb.Block().Round() + if rnd > vbround { + return ledgercore.AssetResource{}, ErrRoundTooHigh + } + if rnd == vbround { + // Intentionally apply (pending) rewards up to rnd. + res, ok := v.vb.Delta().Accts.GetResource(addr, basics.CreatableIndex(aidx), basics.AssetCreatable) + if ok { + return ledgercore.AssetResource{AssetParams: res.AssetParams, AssetHolding: res.AssetHolding}, nil + } + // fall back to looking up asset in ledger, until previous block + rnd-- + } + + return v.l.LookupAsset(rnd, addr, aidx) +} + +// LookupWithoutRewards implements the ledgerForEvaluator interface. +func (v *validatedBlockAsLFE) LookupWithoutRewards(rnd basics.Round, a basics.Address) (ledgercore.AccountData, basics.Round, error) { + vbround := v.vb.Block().Round() + if rnd > vbround { + return ledgercore.AccountData{}, rnd, ErrRoundTooHigh + } + if rnd == vbround { + data, ok := v.vb.Delta().Accts.GetData(a) + if ok { + return data, rnd, nil + } + // fall back to looking up account in ledger, until previous block + rnd-- + } + + // account didn't change in last round. Subtract 1 so we can lookup the most recent change in the ledger + acctData, fallbackrnd, err := v.l.LookupWithoutRewards(rnd, a) + if err != nil { + return acctData, fallbackrnd, err + } + return acctData, rnd, err +} + +// VerifiedTransactionCache implements the ledgerForEvaluator interface. +func (v *validatedBlockAsLFE) VerifiedTransactionCache() verify.VerifiedTransactionCache { + return v.l.VerifiedTransactionCache() +} + +// VerifiedTransactionCache implements the ledgerForEvaluator interface. +func (v *validatedBlockAsLFE) BlockHdrCached(rnd basics.Round) (hdr bookkeeping.BlockHeader, err error) { + vbround := v.vb.Block().Round() + if rnd > vbround { + return bookkeeping.BlockHeader{}, ErrRoundTooHigh + } + if rnd == vbround { + return v.vb.Block().BlockHeader, nil + } + return v.l.BlockHdrCached(rnd) +} + +// StartEvaluator implements the ledgerForEvaluator interface. +func (v *validatedBlockAsLFE) StartEvaluator(hdr bookkeeping.BlockHeader, paysetHint, maxTxnBytesPerBlock int, tracer logic.EvalTracer) (*eval.BlockEvaluator, error) { + if hdr.Round.SubSaturate(1) != v.Latest() { + return nil, fmt.Errorf("StartEvaluator: LFE round %d mismatches next block round %d", v.Latest(), hdr.Round) + } + + return eval.StartEvaluator(v, hdr, + eval.EvaluatorOptions{ + PaysetHint: paysetHint, + Tracer: tracer, + Generate: true, + Validate: true, + MaxTxnBytesPerBlock: maxTxnBytesPerBlock, + }) +} + +// GetStateProofVerificationContext implements the ledgerForEvaluator interface. +func (v *validatedBlockAsLFE) GetStateProofVerificationContext(stateProofLastAttestedRound basics.Round) (*ledgercore.StateProofVerificationContext, error) { + //TODO(yg): is this behavior correct? what happens if round is the last block's round? + return v.l.GetStateProofVerificationContext(stateProofLastAttestedRound) +} + +// FlushCaches is part of ledger/eval.LedgerForEvaluator interface +func (v *validatedBlockAsLFE) FlushCaches() { +} + +// LookupKv implements LookupKv +func (v *validatedBlockAsLFE) LookupKv(rnd basics.Round, key string) ([]byte, error) { + vbround := v.vb.Block().Round() + if rnd > vbround { + return nil, ErrRoundTooHigh + } + if rnd == vbround { + data, ok := v.vb.Delta().KvMods[key] + if ok { + return data.Data, nil + } + // fall back to looking up account in ledger, until previous block + rnd-- + } + + // account didn't change in last round. Subtract 1 so we can lookup the most recent change in the ledger + data, err := v.l.LookupKv(rnd, key) + if err != nil { + return nil, err + } + return data, nil +} diff --git a/ledger/speculative_test.go b/ledger/speculative_test.go new file mode 100644 index 0000000000..bcbb693cb7 --- /dev/null +++ b/ledger/speculative_test.go @@ -0,0 +1,119 @@ +// Copyright (C) 2019-2023 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package ledger + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/ledger/eval" + + "github.com/algorand/go-algorand/data/transactions" + "github.com/algorand/go-algorand/ledger/ledgercore" + ledgertesting "github.com/algorand/go-algorand/ledger/testing" + "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-algorand/protocol" +) + +func TestSpeculative(t *testing.T) { + genesisInitState, _ := ledgertesting.GenerateInitState(t, protocol.ConsensusCurrentVersion, 1000) + const inMem = true + cfg := config.GetDefaultLocal() + log := logging.TestingLog(t) + l, err := OpenLedger(log, t.Name(), inMem, genesisInitState, cfg) + require.NoError(t, err, "could not open ledger") + defer l.Close() + + blk0, err := l.BlockHdr(l.Latest()) + require.NoError(t, err) + + var blk1 bookkeeping.Block + blk1.CurrentProtocol = protocol.ConsensusCurrentVersion + blk1.Branch = blk0.Hash() + blk1.RewardsPool = testPoolAddr + blk1.FeeSink = testSinkAddr + blk1.BlockHeader.GenesisHash = genesisInitState.GenesisHash + blk1.BlockHeader.Round = l.Latest() + 1 + + //sl, err := MakeSpeculativeLedger(l) + //require.NoError(t, err) + + require.NoError(t, err) + + state, err := eval.Eval(context.Background(), l, blk1, false, l.VerifiedTransactionCache(), nil, nil) + require.NoError(t, err) + vblk1 := ledgercore.MakeValidatedBlock(blk1, state) + + blk1aslfe, err := MakeValidatedBlockAsLFE(&vblk1, l) + require.NoError(t, err) + + blk2 := blk1 + blk2.BlockHeader.Round++ + blk2.Branch = blk1.Hash() + + // Pick some accounts at random + var addr1, addr2 basics.Address + for a := range genesisInitState.Accounts { + if addr1 == (basics.Address{}) { + addr1 = a + } else if addr2 == (basics.Address{}) { + addr2 = a + } else { + break + } + } + + var tx21 transactions.Transaction + tx21.Type = protocol.PaymentTx + tx21.Sender = addr1 + tx21.Receiver = addr2 + tx21.FirstValid = blk2.BlockHeader.Round + tx21.LastValid = blk2.BlockHeader.Round + tx21.Amount.Raw = 1000000 + blk2.Payset = append(blk2.Payset, transactions.SignedTxnInBlock{ + SignedTxnWithAD: transactions.SignedTxnWithAD{ + SignedTxn: transactions.SignedTxn{ + Txn: tx21, + }, + }, + HasGenesisID: true, + }) + + state, err = eval.Eval(context.Background(), blk1aslfe, blk2, false, blk1aslfe.VerifiedTransactionCache(), nil, nil) + require.NoError(t, err) + vblk2 := ledgercore.MakeValidatedBlock(blk2, state) + + blk2aslfe, err := MakeValidatedBlockAsLFE(&vblk2, blk1aslfe) + require.NoError(t, err) + + ad11, rnd, err := blk2aslfe.LookupWithoutRewards(blk1.Round(), addr1) + require.NoError(t, err) + // account was never changed + require.Equal(t, rnd, blk1.Round()) + + ad22, rnd, err := blk2aslfe.LookupWithoutRewards(blk2.Round(), addr1) + require.NoError(t, err) + // account changed at blk2 + require.Equal(t, rnd, blk2.Round()) + + require.Equal(t, ad22.MicroAlgos.Raw, ad11.MicroAlgos.Raw-1000000) +} diff --git a/logging/logspec/agreement.go b/logging/logspec/agreement.go index c5df1948bd..526c8301d2 100644 --- a/logging/logspec/agreement.go +++ b/logging/logspec/agreement.go @@ -58,6 +58,7 @@ const ( // has been reached for a given value during some (round, period, step). ThresholdReached // BlockAssembled is emitted when the source receives all parts of a block. + // BlockAssembled is not when the local node has assembled a block, but when a block has been _received_ and verified from the network, or possibly when a locally assembled block has been added to the proposal cache of blocks validaded and available for voting for the next round. TODO: rename? BlockAssembled // BlockCommittable is emitted when the source observes a // block B and a threshold of soft-votes for H(B). It is diff --git a/node/node.go b/node/node.go index 87ffd3e2a4..1203e3a17b 100644 --- a/node/node.go +++ b/node/node.go @@ -1293,6 +1293,16 @@ func (node *AlgorandFullNode) AssembleBlock(round basics.Round) (agreement.Valid return validatedBlock{vb: lvb}, nil } +// StartSpeculativeBlockAssembly handles creating a speculative block +func (node *AlgorandFullNode) StartSpeculativeBlockAssembly(ctx context.Context, avb agreement.ValidatedBlock, blockHash crypto.Digest, onlyIfStarted bool) { + vb, ok := avb.(validatedBlock) + if ok { + node.transactionPool.StartSpeculativeBlockAssembly(ctx, vb.vb, blockHash, onlyIfStarted) + } else { + node.log.Panicf("cannot convert agreement ValidatedBlock to ValidateBlock, got %T", avb) + } +} + // getOfflineClosedStatus will return an int with the appropriate bit(s) set if it is offline and/or online func getOfflineClosedStatus(acctData basics.OnlineAccountData) int { rval := 0 diff --git a/test/scripts/e2e_client_runner.py b/test/scripts/e2e_client_runner.py index 7b425ef925..233e8244a4 100755 --- a/test/scripts/e2e_client_runner.py +++ b/test/scripts/e2e_client_runner.py @@ -446,6 +446,19 @@ def main(): retcode = 0 capv = args.version.capitalize() xrun(['goal', 'network', 'create', '-r', netdir, '-n', 'tbd', '-t', os.path.join(repodir, f'test/testdata/nettemplates/TwoNodes50Each{capv}.json')], timeout=90) +<<<<<<< HEAD + env['ALGORAND_DATA'] = os.path.join(netdir, 'Node') + env['ALGORAND_DATA2'] = os.path.join(netdir, 'Primary') + cfgpath = os.path.join(netdir, 'Node', 'config.json') + with open(cfgpath, 'rt') as fin: + ncfg = json.load(fin) + ncfg['EnableDeveloperAPI'] = True + with open(cfgpath, 'wt') as fout: + json.dump(ncfg, fout) + xrun(['goal', 'network', 'start', '-r', netdir], timeout=90) + atexit.register(goal_network_stop, netdir, env) + +======= nodeDataDir = os.path.join(netdir, 'Node') primaryDataDir = os.path.join(netdir, 'Primary') @@ -465,6 +478,7 @@ def main(): env['ALGORAND_DATA'] = nodeDataDir env['ALGORAND_DATA2'] = primaryDataDir +>>>>>>> master if args.unsafe_scrypt: create_kmd_config_with_unsafe_scrypt(env['ALGORAND_DATA']) diff --git a/test/scripts/e2e_subs/e2e-app-real-assets-round.sh b/test/scripts/e2e_subs/e2e-app-real-assets-round.sh index 25a543300f..905f236c29 100755 --- a/test/scripts/e2e_subs/e2e-app-real-assets-round.sh +++ b/test/scripts/e2e_subs/e2e-app-real-assets-round.sh @@ -20,10 +20,29 @@ ACCOUNT=$(${gcmd} account list|awk '{ print $3 }') ${gcmd} asset create --creator ${ACCOUNT} --name bogocoin --unitname bogo --total 1337 ASSET_ID=$(${gcmd} asset info --creator $ACCOUNT --unitname bogo|grep 'Asset ID'|awk '{ print $3 }') +#${gcmd} account info -a ${ACCOUNT} + # Create app that reads asset balance and checks asset details and checks round ROUND=$(goal node status | grep 'Last committed' | awk '{ print $4 }') -TIMESTAMP=$(goal ledger block --strict ${ROUND} | jq .block.ts) -APP_ID=$(${gcmd} app create --creator ${ACCOUNT} --foreign-asset $ASSET_ID --app-arg "int:$ASSET_ID" --app-arg "int:1337" --app-arg "int:0" --app-arg "int:0" --app-arg "int:1337" --app-arg "str:bogo" --app-arg "int:$ROUND" --app-arg "int:$TIMESTAMP" --approval-prog ${DIR}/tealprogs/assetround.teal --global-byteslices 0 --global-ints 0 --local-byteslices 0 --local-ints 0 --clear-prog <(printf "#pragma version 2\nint 1") | grep Created | awk '{ print $6 }') +#TIMESTAMP=$(goal ledger block --strict ${ROUND} | jq .block.ts) +TIMESTAMP=$(date +%s) +#${gcmd} app create --dryrun-dump -o ${TEMPDIR}/ac.txn --creator ${ACCOUNT} --foreign-asset $ASSET_ID --app-arg "int:$ASSET_ID" --app-arg "int:1337" --app-arg "int:0" --app-arg "int:0" --app-arg "int:1337" --app-arg "str:bogo" --app-arg "int:$ROUND" --app-arg "int:$TIMESTAMP" --approval-prog ${DIR}/tealprogs/assetround.teal --global-byteslices 0 --global-ints 0 --local-byteslices 0 --local-ints 0 --clear-prog ${DIR}/tealprogs/approve-all.teal +#${gcmd} clerk dryrun-remote -v -D ${TEMPDIR}/ac.txn + +set +e +date '+before %s' +APP_ID=$(${gcmd} app create --creator ${ACCOUNT} --foreign-asset $ASSET_ID --app-arg "int:$ASSET_ID" --app-arg "int:1337" --app-arg "int:0" --app-arg "int:0" --app-arg "int:1337" --app-arg "str:bogo" --app-arg "int:$ROUND" --app-arg "int:$TIMESTAMP" --approval-prog ${DIR}/tealprogs/assetround.teal --global-byteslices 0 --global-ints 0 --local-byteslices 0 --local-ints 0 --clear-prog ${DIR}/tealprogs/approve-all.teal | grep Created | awk '{ print $6 }') +date '+after %s' + +# if [ "x${APP_ID}x" = "xx" ]; then +# set -e +# sleep 10 +# date '+before %s' +# ROUND=$(goal node status | grep 'Last committed' | awk '{ print $4 }') +# TIMESTAMP=$(date +%s) +# APP_ID=$(${gcmd} app create --creator ${ACCOUNT} --foreign-asset $ASSET_ID --app-arg "int:$ASSET_ID" --app-arg "int:1337" --app-arg "int:0" --app-arg "int:0" --app-arg "int:1337" --app-arg "str:bogo" --app-arg "int:$ROUND" --app-arg "int:$TIMESTAMP" --approval-prog ${DIR}/tealprogs/assetround.teal --global-byteslices 0 --global-ints 0 --local-byteslices 0 --local-ints 0 --clear-prog ${DIR}/tealprogs/approve-all.teal | grep Created | awk '{ print $6 }') +# date '+after %s' +# fi # Create another account, fund it, send it some asset ACCOUNTB=$(${gcmd} account new|awk '{ print $6 }') diff --git a/test/scripts/e2e_subs/tealprogs/assetround.teal b/test/scripts/e2e_subs/tealprogs/assetround.teal index e6b3c6144f..f147a8ac0c 100644 --- a/test/scripts/e2e_subs/tealprogs/assetround.teal +++ b/test/scripts/e2e_subs/tealprogs/assetround.teal @@ -69,39 +69,47 @@ bz fail round: -// Check round against arg 6 (arg < global Round, arg + 4 > global Round) +// Check round against arg 6 // arg <= Round <= arg + 4 +// (arg <= global Round, arg + 4 >= global Round) txna ApplicationArgs 6 btoi +dup global Round -< +<= bz fail -txna ApplicationArgs 6 -btoi +//txna ApplicationArgs 6 // from dup +//btoi int 4 + -// Check timestamp against arg 7 (arg < global LatestTimestamp + 60, arg + 60 > global LatestTimestamp) +global Round +>= +bz fail + +// Check timestamp against arg 7 (arg-60 <= timestamp <= arg+60) +// (arg <= global LatestTimestamp + 60, arg + 60 >= global LatestTimestamp) txna ApplicationArgs 7 btoi +dup global LatestTimestamp int 60 + -< +<= bz fail -txna ApplicationArgs 7 -btoi +//txna ApplicationArgs 7 // from dup +//btoi int 60 + global LatestTimestamp -> +>= bz fail success: diff --git a/test/testdata/configs/config-v28.json b/test/testdata/configs/config-v28.json index 7b6ceb5326..b885aa68a7 100644 --- a/test/testdata/configs/config-v28.json +++ b/test/testdata/configs/config-v28.json @@ -98,6 +98,8 @@ "RestReadTimeoutSeconds": 15, "RestWriteTimeoutSeconds": 120, "RunHosted": false, + "SpeculativeAsmTimeOffset": 0, + "SpeculativeAssemblyDisable": false, "StorageEngine": "sqlite", "SuggestedFeeBlockHistory": 3, "SuggestedFeeSlidingWindowSize": 50, diff --git a/test/testdata/configs/config-v29.json b/test/testdata/configs/config-v29.json index 8522011ce6..70bcc758b9 100644 --- a/test/testdata/configs/config-v29.json +++ b/test/testdata/configs/config-v29.json @@ -101,6 +101,8 @@ "RestReadTimeoutSeconds": 15, "RestWriteTimeoutSeconds": 120, "RunHosted": false, + "SpeculativeAsmTimeOffset": 0, + "SpeculativeAssemblyDisable": false, "StorageEngine": "sqlite", "SuggestedFeeBlockHistory": 3, "SuggestedFeeSlidingWindowSize": 50, diff --git a/test/testdata/configs/config-v30.json b/test/testdata/configs/config-v30.json new file mode 100644 index 0000000000..e73de9a943 --- /dev/null +++ b/test/testdata/configs/config-v30.json @@ -0,0 +1,126 @@ +{ + "Version": 30, + "AccountUpdatesStatsInterval": 5000000000, + "AccountsRebuildSynchronousMode": 1, + "AgreementIncomingBundlesQueueLength": 15, + "AgreementIncomingProposalsQueueLength": 50, + "AgreementIncomingVotesQueueLength": 20000, + "AnnounceParticipationKey": true, + "Archival": false, + "BaseLoggerDebugLevel": 4, + "BlockServiceCustomFallbackEndpoints": "", + "BlockServiceMemCap": 500000000, + "BroadcastConnectionsLimit": -1, + "CadaverDirectory": "", + "CadaverSizeTarget": 0, + "CatchpointFileHistoryLength": 365, + "CatchpointInterval": 10000, + "CatchpointTracking": 0, + "CatchupBlockDownloadRetryAttempts": 1000, + "CatchupBlockValidateMode": 0, + "CatchupFailurePeerRefreshRate": 10, + "CatchupGossipBlockFetchTimeoutSec": 4, + "CatchupHTTPBlockFetchTimeoutSec": 4, + "CatchupLedgerDownloadRetryAttempts": 50, + "CatchupParallelBlocks": 16, + "ConnectionsRateLimitingCount": 60, + "ConnectionsRateLimitingWindowSeconds": 1, + "DNSBootstrapID": ".algorand.network?backup=.algorand.net&dedup=.algorand-.(network|net)", + "DNSSecurityFlags": 1, + "DeadlockDetection": 0, + "DeadlockDetectionThreshold": 30, + "DisableLedgerLRUCache": false, + "DisableLocalhostConnectionRateLimit": true, + "DisableNetworking": false, + "DisableOutgoingConnectionThrottling": false, + "EnableAccountUpdatesStats": false, + "EnableAgreementReporting": false, + "EnableAgreementTimeMetrics": false, + "EnableAssembleStats": false, + "EnableBlockService": false, + "EnableBlockServiceFallbackToArchiver": true, + "EnableCatchupFromArchiveServers": false, + "EnableDeveloperAPI": false, + "EnableExperimentalAPI": false, + "EnableFollowMode": false, + "EnableGossipBlockService": true, + "EnableIncomingMessageFilter": false, + "EnableLedgerService": false, + "EnableMetricReporting": false, + "EnableOutgoingNetworkMessageFiltering": true, + "EnablePingHandler": true, + "EnableProcessBlockStats": false, + "EnableProfiler": false, + "EnableRequestLogger": false, + "EnableRuntimeMetrics": false, + "EnableTopAccountsReporting": false, + "EnableTxBacklogRateLimiting": false, + "EnableTxnEvalTracer": false, + "EnableUsageLog": false, + "EnableVerbosedTransactionSyncLogging": false, + "EndpointAddress": "127.0.0.1:0", + "FallbackDNSResolverAddress": "", + "ForceFetchTransactions": false, + "ForceRelayMessages": false, + "GossipFanout": 4, + "HeartbeatUpdateInterval": 600, + "IncomingConnectionsLimit": 2400, + "IncomingMessageFilterBucketCount": 5, + "IncomingMessageFilterBucketSize": 512, + "LedgerSynchronousMode": 2, + "LogArchiveMaxAge": "", + "LogArchiveName": "node.archive.log", + "LogSizeLimit": 1073741824, + "MaxAPIBoxPerApplication": 100000, + "MaxAPIResourcesPerAccount": 100000, + "MaxAcctLookback": 4, + "MaxCatchpointDownloadDuration": 43200000000000, + "MaxConnectionsPerIP": 15, + "MinCatchpointFileDownloadBytesPerSecond": 20480, + "NetAddress": "", + "NetworkMessageTraceServer": "", + "NetworkProtocolVersion": "", + "NodeExporterListenAddress": ":9100", + "NodeExporterPath": "./node_exporter", + "OptimizeAccountsDatabaseOnStartup": false, + "OutgoingMessageFilterBucketCount": 3, + "OutgoingMessageFilterBucketSize": 128, + "P2PEnable": false, + "P2PPersistPeerID": true, + "P2PPrivateKeyLocation": "", + "ParticipationKeysRefreshInterval": 60000000000, + "PeerConnectionsUpdateInterval": 3600, + "PeerPingPeriodSeconds": 0, + "PriorityPeers": {}, + "ProposalAssemblyTime": 500000000, + "PublicAddress": "", + "ReconnectTime": 60000000000, + "ReservedFDs": 256, + "RestConnectionsHardLimit": 2048, + "RestConnectionsSoftLimit": 1024, + "RestReadTimeoutSeconds": 15, + "RestWriteTimeoutSeconds": 120, + "RunHosted": false, + "StorageEngine": "sqlite", + "SpeculativeAsmTimeOffset": 400000000, + "SpeculativeAssemblyDisable": false, + "SuggestedFeeBlockHistory": 3, + "SuggestedFeeSlidingWindowSize": 50, + "TLSCertFile": "", + "TLSKeyFile": "", + "TelemetryToLog": true, + "TransactionSyncDataExchangeRate": 0, + "TransactionSyncSignificantMessageThreshold": 0, + "TxBacklogReservedCapacityPerPeer": 20, + "TxBacklogServiceRateWindowSeconds": 10, + "TxBacklogSize": 26000, + "TxIncomingFilterMaxSize": 500000, + "TxIncomingFilteringFlags": 1, + "TxPoolExponentialIncreaseFactor": 2, + "TxPoolSize": 75000, + "TxSyncIntervalSeconds": 60, + "TxSyncServeResponseSize": 1000000, + "TxSyncTimeoutSeconds": 30, + "UseXForwardedForAddressField": "", + "VerifiedTranscationsCacheSize": 150000 +}