Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start poller after fast sync is completed #54

Open
wants to merge 7 commits into
base: base/consumer-chain-support
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion finality-provider/service/chain_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func NewChainPoller(

func (cp *ChainPoller) Start(startHeight uint64) error {
if cp.isStarted.Swap(true) {
return fmt.Errorf("the poller is already started")
cp.logger.Info("the poller is already started")
return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this to return nil? Doing so hides the fact that the poller's Start is being called twice, which we do not expect.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, my mistake: this update was meant for the Stop function, to fix the failing e2e TestFastSync.

}

cp.logger.Info("starting the chain poller")
Expand Down
209 changes: 130 additions & 79 deletions finality-provider/service/fp_instance.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package service

import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -116,12 +115,22 @@ func (fp *FinalityProviderInstance) Start() error {
zap.String("pk", fp.GetBtcPkHex()), zap.Uint64("height", startHeight))

poller := NewChainPoller(fp.logger, fp.cfg.PollerConfig, fp.cc, fp.consumerCon, fp.metrics)
fp.poller = poller

if err := poller.Start(startHeight); err != nil {
return fmt.Errorf("failed to start the poller: %w", err)
// get the last finalized height
lastFinalizedBlock, err := fp.latestFinalizedBlockWithRetry()
if err != nil {
return err
}

fp.poller = poller
// Start the poller if fast sync is disabled or there's no finalized block
if (fp.cfg.FastSyncInterval == 0 || lastFinalizedBlock == nil) && !fp.poller.IsRunning() {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without the lastFinalizedBlock == nil check, the Babylon e2e TestFastSync will fail because tm.WaitForFpVoteCast(t, fpIns) will time out.

This is because, in the code below, tryFastSync will not do anything.

image

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move !fp.poller.IsRunning() out into a separate if condition? IMO, if !fp.poller.IsRunning(), shouldn't we return an error? Also, if this if statement is false, when will the poller start?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this if statement is false, we start the poller after fast sync is finished.

// start poller after fast sync is finished
if !fp.poller.IsRunning() {
   err := fp.poller.Start(res.LastProcessedHeight + 1)
   if err != nil {
   	fp.logger.Error("failed to start the poller", zap.Error(err))
   	fp.reportCriticalErr(err)
   }
   continue
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we should add this check, because by this point the fast sync is already finished and it should be safe to start the poller. Can we have a workaround in the e2e test instead?

if err := fp.poller.Start(startHeight); err != nil {
fp.logger.Error("failed to start the poller", zap.Error(err))
fp.reportCriticalErr(err)
return err
}
}

fp.laggingTargetChan = make(chan uint64, 1)

Expand All @@ -143,7 +152,7 @@ func (fp *FinalityProviderInstance) bootstrap() (uint64, error) {
return 0, err
}

if fp.checkLagging(latestBlockHeight) {
if fp.cfg.FastSyncInterval != 0 && fp.checkLagging(latestBlockHeight) {
_, err := fp.tryFastSync(latestBlockHeight)
if err != nil && !fpcc.IsExpected(err) {
return 0, err
Expand Down Expand Up @@ -186,71 +195,6 @@ func (fp *FinalityProviderInstance) finalitySigSubmissionLoop() {

for {
select {
case b := <-fp.poller.GetBlockInfoChan():
fp.logger.Debug(
"the finality-provider received a new block, start processing",
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("height", b.Height),
zap.String("block_hash", hex.EncodeToString(b.Hash)),
)

// check whether the block has been processed before
if fp.hasProcessed(b.Height) {
continue
}
// check whether the finality provider has voting power
hasVp, err := fp.hasVotingPower(b.Height)
if err != nil {
fp.reportCriticalErr(err)
continue
}
if !hasVp {
// the finality provider does not have voting power
// and it will never will at this block
fp.MustSetLastProcessedHeight(b.Height)
fp.metrics.IncrementFpTotalBlocksWithoutVotingPower(fp.GetBtcPkHex())
continue
}
// check whether the randomness has been committed
// the retry will end if max retry times is reached
// or the target block is finalized
isFinalized, err := fp.retryCheckRandomnessUntilBlockFinalized(b)
if err != nil {
if !errors.Is(err, ErrFinalityProviderShutDown) {
fp.reportCriticalErr(err)
}
break
}
// the block is finalized, no need to submit finality signature
if isFinalized {
fp.MustSetLastProcessedHeight(b.Height)
continue
}

// use the copy of the block to avoid the impact to other receivers
nextBlock := *b
res, err := fp.retrySubmitFinalitySignatureUntilBlockFinalized(&nextBlock)
if err != nil {
fp.metrics.IncrementFpTotalFailedVotes(fp.GetBtcPkHex())
if !errors.Is(err, ErrFinalityProviderShutDown) {
fp.reportCriticalErr(err)
}
continue
}
if res == nil {
// this can happen when a finality signature is not needed
// either if the block is already submitted or the signature
// is already submitted
continue
}
fp.logger.Info(
"successfully submitted a finality signature to the consumer chain",
zap.String("consumer_id", string(fp.GetChainID())),
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("height", b.Height),
zap.String("tx_hash", res.TxHash),
)

case targetBlock := <-fp.laggingTargetChan:
res, err := fp.tryFastSync(targetBlock)
fp.isLagging.Store(false)
Expand All @@ -275,6 +219,16 @@ func (fp *FinalityProviderInstance) finalitySigSubmissionLoop() {
zap.Uint64("last_processed_height", res.LastProcessedHeight),
)

// start poller after fast sync is finished
if !fp.poller.IsRunning() {
err := fp.poller.Start(res.LastProcessedHeight + 1)
if err != nil {
fp.logger.Error("failed to start the poller", zap.Error(err))
fp.reportCriticalErr(err)
}
continue
}
Comment on lines +223 to +230
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think starting the poller should be controlled from within the sig submission loop. The poller should already be started by the time the loop is running.


// inform the poller to skip to the next block of the last
// processed one
err := fp.poller.SkipToHeight(fp.GetLastProcessedHeight() + 1)
Expand All @@ -288,10 +242,96 @@ func (fp *FinalityProviderInstance) finalitySigSubmissionLoop() {
case <-fp.quit:
fp.logger.Info("the finality signature submission loop is closing")
return
default:
pollerBlocks := fp.getAllBlocksFromChan()
if len(pollerBlocks) == 0 {
continue
}
targetHeight := pollerBlocks[len(pollerBlocks)-1].Height
fp.logger.Debug("the finality-provider received new block(s), start processing",
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("start_height", pollerBlocks[0].Height),
zap.Uint64("end_height", targetHeight),
)
res, err := fp.retrySubmitFinalitySignatureUntilBlocksFinalized(pollerBlocks)
if err != nil {
fp.metrics.IncrementFpTotalFailedVotes(fp.GetBtcPkHex())
if !errors.Is(err, ErrFinalityProviderShutDown) {
fp.reportCriticalErr(err)
}
continue
}
if res == nil {
// this can happen when a finality signature is not needed
// either if the block is already submitted or the signature
// is already submitted
continue
}
fp.logger.Info(
"successfully submitted the finality signature to the consumer chain",
zap.String("consumer_id", string(fp.GetChainID())),
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("start_height", pollerBlocks[0].Height),
zap.Uint64("end_height", targetHeight),
zap.String("tx_hash", res.TxHash),
)
}
}
}

// getAllBlocksFromChan drains the poller's block channel without blocking and
// returns, in arrival order, every block for which shouldProcessBlock reported
// true. It returns as soon as the channel has no block immediately available,
// so the result is nil when nothing was queued or nothing needs processing.
//
// A block whose check fails is dropped and draining continues with the next
// block. Failures other than ErrFinalityProviderShutDown are forwarded to
// reportCriticalErr.
func (fp *FinalityProviderInstance) getAllBlocksFromChan() []*types.BlockInfo {
	var pending []*types.BlockInfo
	for {
		select {
		case blk := <-fp.poller.GetBlockInfoChan():
			ok, err := fp.shouldProcessBlock(blk)
			switch {
			case err == nil && ok:
				pending = append(pending, blk)
			case err != nil && !errors.Is(err, ErrFinalityProviderShutDown):
				// the failed block is skipped; keep draining the channel
				fp.reportCriticalErr(err)
			}
		default:
			// nothing more is queued right now
			return pending
		}
	}
}

// shouldProcessBlock reports whether a finality signature still needs to be
// submitted for block b.
//
// It returns (false, nil) when the block was already processed, when the
// finality provider has no voting power at that height, or when the block is
// already finalized; in the latter two cases the height is also passed to
// MustSetLastProcessedHeight. It returns (true, nil) only when none of those
// conditions hold. Any error from the voting-power query or the randomness
// check is returned unchanged together with false.
func (fp *FinalityProviderInstance) shouldProcessBlock(b *types.BlockInfo) (bool, error) {
	// skip blocks that have been handled before
	if fp.hasProcessed(b.Height) {
		return false, nil
	}

	votingPower, err := fp.hasVotingPower(b.Height)
	switch {
	case err != nil:
		return false, err
	case !votingPower:
		// the finality provider has no voting power at this block and
		// never will, so the height is marked as processed
		fp.MustSetLastProcessedHeight(b.Height)
		fp.metrics.IncrementFpTotalBlocksWithoutVotingPower(fp.GetBtcPkHex())
		return false, nil
	}

	// check whether the randomness has been committed; the retry ends once
	// the max retry count is reached or the target block is finalized
	finalized, err := fp.retryCheckRandomnessUntilBlockFinalized(b)
	switch {
	case err != nil:
		return false, err
	case finalized:
		// an already finalized block needs no finality signature
		fp.MustSetLastProcessedHeight(b.Height)
		return false, nil
	default:
		return true, nil
	}
}

func (fp *FinalityProviderInstance) randomnessCommitmentLoop(startHeight uint64) {
defer fp.wg.Done()

Expand Down Expand Up @@ -399,6 +439,11 @@ func (fp *FinalityProviderInstance) checkLaggingLoop() {
}

func (fp *FinalityProviderInstance) tryFastSync(targetBlockHeight uint64) (*FastSyncResult, error) {
fp.logger.Debug(
"trying fast sync",
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("target_block_height", targetBlockHeight))

if fp.inSync.Load() {
return nil, fmt.Errorf("the finality-provider %s is already in sync", fp.GetBtcPkHex())
}
Expand Down Expand Up @@ -576,23 +621,29 @@ func (fp *FinalityProviderInstance) retryCheckRandomnessUntilBlockFinalized(targ
}
}

// retrySubmitFinalitySignatureUntilBlockFinalized periodically tries to submit finality signature until success or the block is finalized
// retrySubmitFinalitySignatureUntilBlocksFinalized periodically tries to submit finality signature until success or the block is finalized
// error will be returned if maximum retries have been reached or the query to the consumer chain fails
func (fp *FinalityProviderInstance) retrySubmitFinalitySignatureUntilBlockFinalized(targetBlock *types.BlockInfo) (*types.TxResponse, error) {
func (fp *FinalityProviderInstance) retrySubmitFinalitySignatureUntilBlocksFinalized(targetBlocks []*types.BlockInfo) (*types.TxResponse, error) {
var failedCycles uint32

targetHeight := targetBlocks[len(targetBlocks)-1].Height
// we break the for loop if the block is finalized or the signature is successfully submitted
// error will be returned if maximum retries have been reached or the query to the consumer chain fails
for {
// error will be returned if max retries have been reached
res, err := fp.SubmitFinalitySignature(targetBlock)
var res *types.TxResponse
var err error
if len(targetBlocks) == 1 {
res, err = fp.SubmitFinalitySignature(targetBlocks[0])
} else {
res, err = fp.SubmitBatchFinalitySignatures(targetBlocks)
}
if err != nil {

fp.logger.Debug(
"failed to submit finality signature to the consumer chain",
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint32("current_failures", failedCycles),
zap.Uint64("target_block_height", targetBlock.Height),
zap.Uint64("target_start_height", targetBlocks[0].Height),
zap.Uint64("target_end_height", targetHeight),
zap.Error(err),
)

Expand All @@ -615,15 +666,15 @@ func (fp *FinalityProviderInstance) retrySubmitFinalitySignatureUntilBlockFinali
select {
case <-time.After(fp.cfg.SubmissionRetryInterval):
// periodically query the index block to be later checked whether it is Finalized
finalized, err := fp.consumerCon.QueryIsBlockFinalized(targetBlock.Height)
finalized, err := fp.consumerCon.QueryIsBlockFinalized(targetHeight)
if err != nil {
return nil, fmt.Errorf("failed to query block finalization at height %v: %w", targetBlock.Height, err)
return nil, fmt.Errorf("failed to query block finalization at height %v: %w", targetHeight, err)
}
if finalized {
fp.logger.Debug(
"the block is already finalized, skip submission",
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("target_height", targetBlock.Height),
zap.Uint64("target_height", targetHeight),
)
// TODO: returning nil here is to safely break the loop
// the error still exists
Expand Down
Loading
Loading