diff --git a/services/leanhelixterm/leanhelix_term.go b/services/leanhelixterm/leanhelix_term.go index dfd452d..2c44fef 100644 --- a/services/leanhelixterm/leanhelix_term.go +++ b/services/leanhelixterm/leanhelix_term.go @@ -20,9 +20,13 @@ import ( "github.com/orbs-network/lean-helix-go/spec/types/go/protocol" "github.com/orbs-network/lean-helix-go/state" "github.com/orbs-network/scribe/log" + "github.com/pkg/errors" "math" + "time" ) +const CallCommitteeContractInterval = 200 * time.Millisecond + type LeanHelixTerm struct { *ConsensusMessagesFilter termInCommittee *termincommittee.TermInCommittee @@ -35,21 +39,21 @@ func NewLeanHelixTerm(ctx context.Context, logger logger.LHLogger, config *inter myMemberId := config.Membership.MyMemberId() messageFactory := messagesfactory.NewMessageFactory(config.InstanceId, config.KeyManager, myMemberId, randomSeed) - committeeMembers, err := requestOrderedCommittee(state, blockHeight, randomSeed, config) + committeeMembers, err := requestOrderedCommitteePersist(state, blockHeight, randomSeed, config, logger) if err != nil { - logger.Info("OUT OF COMMITTEE WITH ERROR RECEIVING COMMITTEE: H=%d, prevBlockProof=%s, randomSeed=%d, error=%s", blockHeight, printShortBlockProofBytes(prevBlockProofBytes), randomSeed, err) - return termNotInCommittee(randomSeed, config) + logger.Info("ERROR RECEIVING COMMITTEE: H=%d, error=%s", blockHeight, err) } - - isParticipating := isParticipatingInCommittee(myMemberId, committeeMembers) - logger.Debug("RECEIVED COMMITTEE: H=%d, prevBlockProof=%s, randomSeed=%d, members=%s, isParticipating=%t", blockHeight, printShortBlockProofBytes(prevBlockProofBytes), randomSeed, termincommittee.ToCommitteeMembersStr(committeeMembers), isParticipating) - logger.ConsensusTrace("got committee for the current consensus round", nil, log.StringableSlice("committee", committeeMembers)) + // on ctx terminated requestOrderedCommitteePersist returns nil committee + isParticipating := isParticipatingInTerm(myMemberId, committeeMembers) if !isParticipating { - logger.Info("OUT OF COMMITTEE: H=%d, prevBlockProof=%s, randomSeed=%d, members=%s, isParticipating=%t", blockHeight, printShortBlockProofBytes(prevBlockProofBytes), randomSeed, termincommittee.ToCommitteeMembersStr(committeeMembers), isParticipating) + logger.Debug("OUT OF COMMITTEE: H=%d, prevBlockProof=%s, randomSeed=%d, members=%s, isParticipating=%t", blockHeight, printShortBlockProofBytes(prevBlockProofBytes), randomSeed, termincommittee.ToCommitteeMembersStr(committeeMembers), isParticipating) return termNotInCommittee(randomSeed, config) } + logger.Debug("RECEIVED COMMITTEE: H=%d, prevBlockProof=%s, randomSeed=%d, members=%s, isParticipating=%t", blockHeight, printShortBlockProofBytes(prevBlockProofBytes), randomSeed, termincommittee.ToCommitteeMembersStr(committeeMembers), isParticipating) + logger.ConsensusTrace("got committee for the current consensus round", nil, log.StringableSlice("committee", committeeMembers)) + termInCommittee := termincommittee.NewTermInCommittee(logger, config, state, messageFactory, electionTrigger, committeeMembers, prevBlock, canBeFirstLeader, CommitsToProof(logger, config.KeyManager, onCommit)) return &LeanHelixTerm{ ConsensusMessagesFilter: NewConsensusMessagesFilter(termInCommittee, config.KeyManager, randomSeed), @@ -57,19 +61,44 @@ func NewLeanHelixTerm(ctx context.Context, logger logger.LHLogger, config *inter } } -func requestOrderedCommittee(s *state.State, blockHeight primitives.BlockHeight, randomSeed uint64, config *interfaces.Config) ([]primitives.MemberId, error) { +func requestOrderedCommitteePersist(s *state.State, blockHeight primitives.BlockHeight, randomSeed uint64, config *interfaces.Config, logger logger.LHLogger) ([]primitives.MemberId, error) { const maxView = primitives.View(math.MaxUint64) ctx, err := s.Contexts.For(state.NewHeightView(blockHeight, maxView)) // term-level context if err != nil { return nil, err } - committeeMembers, err := config.Membership.RequestOrderedCommittee(ctx, blockHeight, randomSeed) - if err != nil { - return nil, err + logger.Debug("Polling RequestOrderedCommittee: H=%d, interval-between-attempts=%d", blockHeight, CallCommitteeContractInterval) + + attempts := 1 + for { + + // exit on term update (node sync) or system shutdown + if ctx.Err() != nil { + return nil, errors.Wrap(ctx.Err(), "requestOrderedCommitteePersist: context terminated") + } + + committeeMembers, err := config.Membership.RequestOrderedCommittee(ctx, blockHeight, randomSeed) + if err == nil { + return committeeMembers, nil + } + + // log every 500 failures + if attempts%500 == 1 { + if ctx.Err() == nil { // this may fail rightfully on graceful shutdown (ctx.Done), we don't want to report an error in this case + logger.Info("requestOrderedCommitteePersist: cannot get ordered committee #attempts=%d, error=%s", attempts, err) + } + } + + // sleep or wait for ctx done, whichever comes first + sleepOrShutdown, cancel := context.WithTimeout(ctx, CallCommitteeContractInterval) + <-sleepOrShutdown.Done() + cancel() + + attempts++ } - return committeeMembers, nil } + func termNotInCommittee(randomSeed uint64, config *interfaces.Config) *LeanHelixTerm { return &LeanHelixTerm{ ConsensusMessagesFilter: NewConsensusMessagesFilter(nil, config.KeyManager, randomSeed), @@ -84,7 +113,7 @@ func (lht *LeanHelixTerm) Dispose() { } } -func isParticipatingInCommittee(myMemberId primitives.MemberId, committeeMembers []primitives.MemberId) bool { +func isParticipatingInTerm(myMemberId primitives.MemberId, committeeMembers []primitives.MemberId) bool { for _, committeeMember := range committeeMembers { if myMemberId.Equal(committeeMember) { return true @@ -99,3 +128,4 @@ func printShortBlockProofBytes(b []byte) string { } return fmt.Sprintf("%s..%s", hex.EncodeToString(b[:6]), hex.EncodeToString(b[len(b)-6:])) } + diff --git a/services/leanhelixterm/leanhelix_term_participating_test.go b/services/leanhelixterm/leanhelix_term_participating_test.go index a1888b9..8969e37 100644 --- a/services/leanhelixterm/leanhelix_term_participating_test.go +++ b/services/leanhelixterm/leanhelix_term_participating_test.go @@ -18,7 +18,7 @@ func TestParticipating(t *testing.T) { memberId2 := primitives.MemberId("Member 2") memberId3 := primitives.MemberId("Member 3") committeeMembers := []primitives.MemberId{myMemberId, memberId1, memberId2, memberId3} - actual := isParticipatingInCommittee(myMemberId, committeeMembers) + actual := isParticipatingInTerm(myMemberId, committeeMembers) require.True(t, actual) } @@ -28,7 +28,7 @@ func TestParticipatingLastInList(t *testing.T) { memberId2 := primitives.MemberId("Member 2") memberId3 := primitives.MemberId("Member 3") committeeMembers := []primitives.MemberId{memberId1, memberId2, memberId3, myMemberId} - actual := isParticipatingInCommittee(myMemberId, committeeMembers) + actual := isParticipatingInTerm(myMemberId, committeeMembers) require.True(t, actual) } @@ -38,6 +38,6 @@ func TestNotParticipating(t *testing.T) { memberId2 := primitives.MemberId("Member 2") memberId3 := primitives.MemberId("Member 3") committeeMembers := []primitives.MemberId{memberId1, memberId2, memberId3} - actual := isParticipatingInCommittee(myMemberId, committeeMembers) + actual := isParticipatingInTerm(myMemberId, committeeMembers) require.False(t, actual) }