Skip to content

Commit

Permalink
core/txpool/legacypool: implement transaction tracking for handling l…
Browse files Browse the repository at this point in the history
…ocal txs
  • Loading branch information
holiman committed Mar 20, 2024
1 parent de08f3d commit 28c2f3e
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 3 deletions.
182 changes: 182 additions & 0 deletions core/txpool/legacypool/tx_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package legacypool implements the normal EVM execution transaction pool.
package legacypool

import (
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"golang.org/x/exp/slices"
)

var recheckInterval = time.Minute

// TxTracker is a struct used to track priority transactions; it will check from
// time to time if the main pool has forgotten about any of the transaction
// it is tracking, and if so, submit it again.
// This is used to track 'locals'.
// This struct does not care about transaction validity, price-bumps or account limits,
// but optimistically accepts transactions.
type TxTracker struct {
all map[common.Hash]*types.Transaction // All tracked transactions
byAddr map[common.Address]*sortedMap // Transactions by address

journal *journal // Journal of local transaction to back up to disk
rejournal time.Duration // How often to rotate journal
pool txpool.SubPool // The 'main' subpool to interact with
signer types.Signer

shutdownCh chan struct{}
mu sync.Mutex
wg sync.WaitGroup
}

func NewTxTracker(journalPath string, journalTime time.Duration, chainConfig *params.ChainConfig, next txpool.SubPool) *TxTracker {
signer := types.LatestSigner(chainConfig)
pool := &TxTracker{
all: make(map[common.Hash]*types.Transaction),
byAddr: make(map[common.Address]*sortedMap),
signer: signer,
shutdownCh: make(chan struct{}),
pool: next,
}
if journalPath != "" {
pool.journal = newTxJournal(journalPath)
pool.rejournal = journalTime
}
return pool
}

// Track adds a transaction to the tracked set.
func (tracker *TxTracker) Track(tx *types.Transaction) {
tracker.TrackAll([]*types.Transaction{tx})
}

// TrackAll adds a list of transactions to the tracked set.
func (tracker *TxTracker) TrackAll(txs []*types.Transaction) {
tracker.mu.Lock()
defer tracker.mu.Unlock()
for _, tx := range txs {
// If we're already tracking it, it's a no-op
if _, ok := tracker.all[tx.Hash()]; ok {
continue
}
tracker.all[tx.Hash()] = tx
addr, _ := types.Sender(tracker.signer, tx)
if tracker.byAddr[addr] == nil {
tracker.byAddr[addr] = newSortedMap()
}
tracker.byAddr[addr].Put(tx)
}
}

// recheck checks and returns any transactions that needs to be resubmitted.
func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transaction, rejournal map[common.Address]types.Transactions) {
tracker.mu.Lock()
defer tracker.mu.Unlock()
var (
nStales = 0
nOk = 0
)
for sender, txs := range tracker.byAddr {
stales := txs.Forward(tracker.pool.Nonce(sender))
// Wipe the stales
for _, tx := range stales {
delete(tracker.all, tx.Hash())
}
nStales += len(stales)
// Check the non-stale
for _, tx := range txs.Flatten() {
if tracker.pool.Has(tx.Hash()) {
nOk++
continue
}
resubmits = append(resubmits, tx)
}
}

if journalCheck { // rejournal
rejournal = make(map[common.Address]types.Transactions)
for _, tx := range tracker.all {
addr, _ := types.Sender(tracker.signer, tx)
rejournal[addr] = append(rejournal[addr], tx)
}
// Sort them
for _, list := range rejournal {
// cmp(a, b) should return a negative number when a < b,
slices.SortFunc(list, func(a, b *types.Transaction) int {
return int(a.Nonce() - b.Nonce())
})
}
}
log.Debug("Tx tracker status", "need-resubmit", len(resubmits), "stale", nStales, "ok", nOk)
return resubmits, rejournal
}

// Start implements node.Lifecycle interface
// Start is called after all services have been constructed and the networking
// layer was also initialized to spawn any goroutines required by the service.
func (tracker *TxTracker) Start() error {
tracker.wg.Add(1)
go tracker.loop()
return nil
}

// Start implements node.Lifecycle interface
// Stop terminates all goroutines belonging to the service, blocking until they
// are all terminated.
func (tracker *TxTracker) Stop() error {
close(tracker.shutdownCh)
tracker.wg.Wait()
return nil
}

func (tracker *TxTracker) loop() {
defer tracker.wg.Done()
tracker.journal.load(func(transactions []*types.Transaction) []error {
tracker.TrackAll(transactions)
return nil
})
var lastJournal = time.Now()
// Do initial check after 10 seconds, do rechecks more seldom.
t := time.NewTimer(10 * time.Second)
for {
select {
case <-tracker.shutdownCh:
return
case <-t.C:
checkJournal := time.Since(lastJournal) > tracker.rejournal
resubmits, rejournal := tracker.recheck(checkJournal)
if len(resubmits) > 0 {
tracker.pool.Add(resubmits, false, false)
}
if checkJournal {
lastJournal = time.Now()
if err := tracker.journal.rotate(rejournal); err != nil {
log.Warn("Transaction journal rotation failed", "err", err)
}
}
t.Reset(recheckInterval)
}
}
}
3 changes: 2 additions & 1 deletion core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,8 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
// back the errors into the original sort order.
errsets := make([][]error, len(p.subpools))
for i := 0; i < len(p.subpools); i++ {
errsets[i] = p.subpools[i].Add(txsets[i], local, sync)
// Note: local is explicitly set to false here.
errsets[i] = p.subpools[i].Add(txsets[i], false, sync)
}
errs := make([]error, len(txs))
for i, split := range splits {
Expand Down
3 changes: 3 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri
}

func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
if locals := b.eth.localTxTracker; locals != nil {
locals.Track(signedTx)
}
return b.eth.txPool.Add([]*types.Transaction{signedTx}, true, false)[0]
}

Expand Down
25 changes: 23 additions & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ type Ethereum struct {
config *ethconfig.Config

// Handlers
txPool *txpool.TxPool
txPool *txpool.TxPool
localTxTracker *legacypool.TxTracker

blockchain *core.BlockChain
handler *handler
Expand Down Expand Up @@ -225,7 +226,27 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
if config.TxPool.Journal != "" {
config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal)
}
legacyPool := legacypool.New(config.TxPool, eth.blockchain)
// Init the legacypool without locals, to avoid collisions on the
// locals-journal.
legacyPool := legacypool.New(legacypool.Config{
Locals: nil,
NoLocals: true,
Journal: "",
Rejournal: 0,
PriceLimit: config.TxPool.PriceLimit,
PriceBump: config.TxPool.PriceBump,
AccountSlots: config.TxPool.AccountSlots,
GlobalSlots: config.TxPool.GlobalSlots,
AccountQueue: config.TxPool.AccountQueue,
GlobalQueue: config.TxPool.GlobalQueue,
Lifetime: config.TxPool.Lifetime,
}, eth.blockchain)
if !config.TxPool.NoLocals {
eth.localTxTracker = legacypool.NewTxTracker(config.TxPool.Journal,
config.TxPool.Rejournal,
eth.blockchain.Config(), legacyPool)
stack.RegisterLifecycle(eth.localTxTracker)
}

eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, blobPool})
if err != nil {
Expand Down

0 comments on commit 28c2f3e

Please sign in to comment.