Skip to content

Commit

Permalink
tx retry with higher gas price
Browse files Browse the repository at this point in the history
  • Loading branch information
0g-peterzhb committed Mar 10, 2025
1 parent e9f618e commit 2362362
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 65 deletions.
8 changes: 7 additions & 1 deletion cmd/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type uploadArgument struct {

fragmentSize int64
maxGasPrice uint
nRetries int
step int64

timeout time.Duration
}
Expand All @@ -81,6 +83,8 @@ func bindUploadFlags(cmd *cobra.Command, args *uploadArgument) {

cmd.Flags().IntVar(&args.routines, "routines", runtime.GOMAXPROCS(0), "number of go routines for uploading simutanously")
cmd.Flags().UintVar(&args.maxGasPrice, "max-gas-price", 0, "max gas price to send transaction")
cmd.Flags().IntVar(&args.nRetries, "n-retries", 0, "number of retries for uploading when it's not gas price issue")
cmd.Flags().Int64Var(&args.step, "step", 15, "step of gas price increasing, step / 10 (for 15, the new gas price is 1.5 * last gas price)")

cmd.Flags().DurationVar(&args.timeout, "timeout", 0, "cli task timeout, 0 for no timeout")
}
Expand Down Expand Up @@ -139,7 +143,9 @@ func upload(*cobra.Command, []string) {
SkipTx: uploadArgs.skipTx,
Fee: fee,
Nonce: nonce,
MaxGasPrice: maxGasPrice,
MaxGasPrice: maxGasPrice,
NRetries: uploadArgs.nRetries,
Step: uploadArgs.step,
}

file, err := core.Open(uploadArgs.file)
Expand Down
11 changes: 0 additions & 11 deletions common/blockchain/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,13 @@ func WaitForReceipt(ctx context.Context, client *web3go.Client, txHash common.Ha
opt = opts[0]
} else {
opt.Interval = time.Second * 3
opt.NRetries = 20
}

if opt.Interval == 0 {
opt.Interval = time.Second * 3
}

if opt.NRetries == 0 {
opt.NRetries = 20
}

reminder := util.NewReminder(opt.logger, time.Minute)
nRetries := 0
for receipt == nil {
if receipt, err = client.WithContext(ctx).Eth.TransactionReceipt(txHash); err != nil {
return nil, err
Expand All @@ -94,11 +88,6 @@ func WaitForReceipt(ctx context.Context, client *web3go.Client, txHash common.Ha
reminder.RemindWith("Transaction not executed yet", "hash", txHash)
}

nRetries += 1
if nRetries >= opt.NRetries {
return nil, errors.Errorf("Transaction not executed after %v retries, timeout", opt.NRetries)
}

time.Sleep(opt.Interval)
}

Expand Down
166 changes: 126 additions & 40 deletions contract/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/openweb3/web3go"
"github.com/openweb3/web3go/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

Expand All @@ -26,11 +27,13 @@ type TxRetryOption struct {
Timeout time.Duration
MaxNonGasRetries int
MaxGasPrice *big.Int
Step int64
}

var SpecifiedBlockError = "Specified block header does not exist"
var DefaultTimeout = 15 * time.Second
var DefaultTimeout = 15 * time.Minute
var DefaultMaxNonGasRetries = 20
var DefaultStep = int64(15)

func IsRetriableSubmitLogEntryError(msg string) bool {
return strings.Contains(msg, SpecifiedBlockError) || strings.Contains(msg, "mempool") || strings.Contains(msg, "timeout")
Expand All @@ -52,6 +55,22 @@ func NewFlowContract(flowAddress common.Address, clientWithSigner *web3go.Client
return &FlowContract{contract, flow, clientWithSigner}, nil
}

func (f *FlowContract) GetNonce(ctx context.Context) (*big.Int, error) {
sm, err := f.clientWithSigner.GetSignerManager()
if err != nil {
return nil, err
}

addr := sm.List()[0].Address()

nonce, err := f.clientWithSigner.Eth.TransactionCount(addr, nil)
if err != nil {
return nil, err
}

return nonce, nil
}

func (f *FlowContract) GetGasPrice() (*big.Int, error) {
gasPrice, err := f.clientWithSigner.Eth.GasPrice()
if err != nil {
Expand Down Expand Up @@ -127,70 +146,137 @@ func TransactWithGasAdjustment(
retryOpts.Timeout = DefaultTimeout
}

if retryOpts.Step == 0 {
retryOpts.Step = DefaultStep
}

logrus.WithField("timeout", retryOpts.Timeout).WithField("maxNonGasRetries", retryOpts.MaxNonGasRetries).Debug("Set retry options")

if opts.Nonce == nil {
// Get the current nonce if not set.
nonce, err := contract.GetNonce(opts.Context)
if err != nil {
return nil, err
}
// add one to the nonce
opts.Nonce = nonce
}

logrus.WithField("nonce", opts.Nonce).Info("Set nonce")

if opts.GasPrice == nil {
// Get the current gas price if not set.
gasPrice, err := contract.GetGasPrice()
if err != nil {
return nil, fmt.Errorf("failed to get gas price: %w", err)
return nil, errors.WithMessage(err, "failed to get gas price")
}
opts.GasPrice = gasPrice
logrus.WithField("gasPrice", opts.GasPrice).Debug("Receive current gas price from chain node")
}

logrus.WithField("gasPrice", opts.GasPrice).Info("Set gas price")
// Instead of calling WaitForReceipt directly, do it in a goroutine
receiptCh := make(chan *types.Receipt, 1)
errCh := make(chan error, 1)
failCh := make(chan error, 1)
// Create a fresh context per iteration.
ctx, cancel := context.WithTimeout(context.Background(), retryOpts.Timeout)
opts.Context = ctx

// calculate number of gas retry by dividing max gas price by current gas price and the ration
nGasRetry := 0
if retryOpts.MaxGasPrice != nil {
gasPrice := opts.GasPrice
for gasPrice.Cmp(retryOpts.MaxGasPrice) < 0 {
gasPrice = new(big.Int).Mul(gasPrice, big.NewInt(retryOpts.Step))
gasPrice.Div(gasPrice, big.NewInt(10))
nGasRetry++
}
}

nRetries := 0
for {
// Create a fresh context per iteration.
ctx, cancel := context.WithTimeout(context.Background(), retryOpts.Timeout)
opts.Context = ctx
tx, err := contract.FlowTransactor.contract.Transact(opts, method, params...)

var receipt *types.Receipt
if err == nil {
// Wait for successful execution
receipt, err = contract.WaitForReceipt(ctx, tx.Hash(), true, blockchain.RetryOption{NRetries: retryOpts.MaxNonGasRetries})
go func() {
nRetries := 0
for {
select {
case <- ctx.Done():
// main or another goroutine canceled the context
logrus.Info("Context canceled; stopping outer loop")
return
default:
}
tx, err := contract.FlowTransactor.contract.Transact(opts, method, params...)

var receipt *types.Receipt
if err == nil {
cancel() // cancel this iteration's context
return receipt, nil
// Wait for successful execution
go func() {
receipt, err = contract.WaitForReceipt(ctx, tx.Hash(), true, blockchain.RetryOption{NRetries: retryOpts.MaxNonGasRetries})
if err == nil {
receiptCh <- receipt
return
}
errCh <- err
}()
time.Sleep(15 * time.Second)
err = fmt.Errorf("timeout")
}
}
cancel() // cancel this iteration's context

logrus.WithError(err).Error("Failed to send transaction")
logrus.WithError(err).Error("Failed to send transaction")

errStr := strings.ToLower(err.Error())
errStr := strings.ToLower(err.Error())

if !IsRetriableSubmitLogEntryError(errStr) {
return nil, fmt.Errorf("failed to send transaction: %w", err)
}
if !IsRetriableSubmitLogEntryError(errStr) {
failCh <- errors.WithMessage(err, "failed to send transaction")
return
}

if strings.Contains(errStr, "mempool") || strings.Contains(errStr, "timeout") {
if retryOpts.MaxGasPrice == nil {
return nil, fmt.Errorf("mempool full and no max gas price is set, failed to send transaction: %w", err)
} else {
newGasPrice := new(big.Int).Mul(opts.GasPrice, big.NewInt(15))
newGasPrice.Div(newGasPrice, big.NewInt(10))
if newGasPrice.Cmp(retryOpts.MaxGasPrice) > 0 {
opts.GasPrice = new(big.Int).Set(retryOpts.MaxGasPrice)
if strings.Contains(errStr, "mempool") || strings.Contains(errStr, "timeout") {
if retryOpts.MaxGasPrice == nil {
failCh <- errors.WithMessage(err, "mempool full and no max gas price is set, failed to send transaction")
return
} else if opts.GasPrice.Cmp(retryOpts.MaxGasPrice) >= 0 {
return
} else {
opts.GasPrice = newGasPrice
newGasPrice := new(big.Int).Mul(opts.GasPrice, big.NewInt(retryOpts.Step))
newGasPrice.Div(newGasPrice, big.NewInt(10))
if newGasPrice.Cmp(retryOpts.MaxGasPrice) > 0 {
opts.GasPrice = new(big.Int).Set(retryOpts.MaxGasPrice)
} else {
opts.GasPrice = newGasPrice
}
logrus.WithError(err).Infof("Increasing gas price to %v due to mempool/timeout error", opts.GasPrice)
}
logrus.WithError(err).Infof("Increasing gas price to %v due to mempool/timeout error", opts.GasPrice)
}
} else {
nRetries++
if nRetries >= retryOpts.MaxNonGasRetries {
return nil, fmt.Errorf("failed to send transaction after %d retries: %w", nRetries, err)
} else {
nRetries++
if nRetries >= retryOpts.MaxNonGasRetries {
failCh <- errors.WithMessage(err, "failed to send transaction")
return
}
logrus.WithError(err).Infof("Retrying with same gas price %v, attempt %d", opts.GasPrice, nRetries)
}
logrus.WithError(err).Infof("Retrying with same gas price %v, attempt %d", opts.GasPrice, nRetries)

time.Sleep(10 * time.Second)
}
}()

time.Sleep(10 * time.Second)
nErr := 0
for {
select {
case receipt := <-receiptCh:
cancel()
return receipt, nil
case err := <-errCh:
nErr++
if nErr >= nGasRetry {
failCh <- errors.WithMessage(err, "All gas price retries failed")
cancel()
return nil, err
}
case err := <-failCh:
cancel()
return nil, err
}
}

}

func (submission Submission) Fee(pricePerSector *big.Int) *big.Int {
Expand Down
5 changes: 4 additions & 1 deletion tests/go_tests/segment_upload_test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ func runTest() error {
if err != nil {
return errors.WithMessage(err, "failed to initialize uploader")
}
_, _, err = uploader.SubmitLogEntry(ctx, []core.IterableData{data}, make([][]byte, 1), nil, nil, nil)
_, _, err = uploader.SubmitLogEntry(ctx, []core.IterableData{data}, make([][]byte, 1), transfer.SubmitLogEntryOption{
NRetries: 5,
Step: 15,
})
if err != nil {
return errors.WithMessage(err, "failed to sub log entry")
}
Expand Down
Loading

0 comments on commit 2362362

Please sign in to comment.