Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: more precise mempool metrics #1158

Merged
merged 12 commits into from
Jan 3, 2024
9 changes: 9 additions & 0 deletions mempool/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ const (
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
// package.
MetricsSubsystem = "mempool"

FailedPrecheck = "precheck"
FailedAdding = "adding"
FailedRecheck = "recheck"

EvictedNewTxFullMempool = "full-removed-incoming"
EvictedExistingTxFullMempool = "full-removed-existing"
EvictedTxExpiredBlocks = "expired-ttl-blocks"
EvictedTxExpiredTime = "expired-ttl-time"
)

// Metrics contains metrics exposed by this package.
Expand Down
37 changes: 30 additions & 7 deletions mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/pkg/trace"
"github.com/tendermint/tendermint/pkg/trace/schema"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)
Expand Down Expand Up @@ -56,6 +58,8 @@ type TxMempool struct {
txs *clist.CList // valid transactions (passed CheckTx)
txByKey map[types.TxKey]*clist.CElement
txBySender map[string]*clist.CElement // for sender != ""

traceClient *trace.Client
}

// NewTxMempool constructs a new, empty priority mempool at the specified
Expand All @@ -79,6 +83,7 @@ func NewTxMempool(
height: height,
txByKey: make(map[types.TxKey]*clist.CElement),
txBySender: make(map[string]*clist.CElement),
traceClient: &trace.Client{},
}
if cfg.CacheSize > 0 {
txmp.cache = mempool.NewLRUTxCache(cfg.CacheSize)
Expand Down Expand Up @@ -110,6 +115,12 @@ func WithMetrics(metrics *mempool.Metrics) TxMempoolOption {
return func(txmp *TxMempool) { txmp.metrics = metrics }
}

func WithTraceClient(tc *trace.Client) TxMempoolOption {
return func(txmp *TxMempool) {
txmp.traceClient = tc
}
}

// Lock obtains a write-lock on the mempool. A caller must be sure to explicitly
// release the lock when finished.
func (txmp *TxMempool) Lock() { txmp.mtx.Lock() }
Expand Down Expand Up @@ -192,7 +203,8 @@ func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo memp
// If a precheck hook is defined, call it before invoking the application.
if txmp.preCheck != nil {
if err := txmp.preCheck(tx); err != nil {
txmp.metrics.FailedTxs.Add(1)
txmp.metrics.FailedTxs.With(mempool.FailedPrecheck).Add(1)
schema.WriteMempoolRejected(txmp.traceClient, err.Error())
return 0, mempool.ErrPreCheck{Reason: err}
}
}
Expand Down Expand Up @@ -469,7 +481,15 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon
"post_check_err", err,
)

txmp.metrics.FailedTxs.Add(1)
txmp.metrics.FailedTxs.With(mempool.FailedAdding).Add(1)
reason := fmt.Sprintf(
"code: %d codespace: %s logs: %s postCheck error: %v",
checkTxRes.Code,
checkTxRes.Codespace,
checkTxRes.Log,
err,
)
schema.WriteMempoolRejected(txmp.traceClient, reason)

// Remove the invalid transaction from the cache, unless the operator has
// instructed us to keep invalid transactions.
Expand Down Expand Up @@ -537,7 +557,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon
checkTxRes.MempoolError =
fmt.Sprintf("rejected valid incoming transaction; mempool is full (%X)",
wtx.tx.Hash())
txmp.metrics.EvictedTxs.Add(1)
txmp.metrics.EvictedTxs.With(mempool.EvictedNewTxFullMempool).Add(1)
return
}

Expand Down Expand Up @@ -569,7 +589,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon
)
txmp.removeTxByElement(vic)
txmp.cache.Remove(w.tx)
txmp.metrics.EvictedTxs.Add(1)
txmp.metrics.EvictedTxs.With(mempool.EvictedExistingTxFullMempool).Add(1)

// We may not need to evict all the eligible transactions. Bail out
// early if we have made enough room.
Expand Down Expand Up @@ -646,9 +666,12 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.Respons
"code", checkTxRes.Code,
)
txmp.removeTxByElement(elt)
txmp.metrics.FailedTxs.Add(1)
txmp.metrics.FailedTxs.With(mempool.FailedRecheck).Add(1)
evan-forbes marked this conversation as resolved.
Show resolved Hide resolved
if !txmp.config.KeepInvalidTxsInCache {
txmp.cache.Remove(wtx.tx)
if err != nil {
schema.WriteMempoolRejected(txmp.traceClient, err.Error())
}
}
txmp.metrics.Size.Set(float64(txmp.Size()))
}
Expand Down Expand Up @@ -758,11 +781,11 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
if txmp.config.TTLNumBlocks > 0 && (blockHeight-w.height) > txmp.config.TTLNumBlocks {
txmp.removeTxByElement(cur)
txmp.cache.Remove(w.tx)
txmp.metrics.EvictedTxs.Add(1)
txmp.metrics.EvictedTxs.With(mempool.EvictedTxExpiredBlocks).Add(1)
} else if txmp.config.TTLDuration > 0 && now.Sub(w.timestamp) > txmp.config.TTLDuration {
txmp.removeTxByElement(cur)
txmp.cache.Remove(w.tx)
txmp.metrics.EvictedTxs.Add(1)
txmp.metrics.EvictedTxs.With(mempool.EvictedTxExpiredTime).Add(1)
}
cur = next
}
Expand Down
1 change: 1 addition & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ func createMempoolAndMempoolReactor(
mempoolv1.WithMetrics(memplMetrics),
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
mempoolv1.WithTraceClient(traceClient),
)

reactor := mempoolv1.NewReactor(
Expand Down
23 changes: 17 additions & 6 deletions pkg/trace/schema/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ func MempoolTables() []string {
return []string{
MempoolTxTable,
MempoolPeerStateTable,
MempoolRejectedTable,
}
}

Expand Down Expand Up @@ -103,10 +104,10 @@ const (
func WriteMempoolPeerState(client *trace.Client, peer p2p.ID, stateUpdate, transferType, version string) {
// this check is redundant to what is checked during WritePoint, although it
// is an optimization to avoid allocations from creating the map of fields.
if !client.IsCollecting(RoundStateTable) {
if !client.IsCollecting(MempoolPeerStateTable) {
return
}
client.WritePoint(RoundStateTable, map[string]interface{}{
client.WritePoint(MempoolPeerStateTable, map[string]interface{}{
PeerFieldKey: peer,
TransferTypeFieldKey: transferType,
StateUpdateFieldKey: stateUpdate,
Expand All @@ -115,8 +116,18 @@ func WriteMempoolPeerState(client *trace.Client, peer p2p.ID, stateUpdate, trans
}

const (
// LocalTable is the tracing "measurement" (aka table) for the local mempool
// updates, such as when a tx is added or removed.
// TODO: actually implement the local mempool tracing
// LocalTable = "mempool_local"
MempoolRejectedTable = "mempool_rejected"
ReasonFieldKey = "reason"
)

// WriteMempoolRejected records why a transaction was rejected.
func WriteMempoolRejected(client *trace.Client, reason string) {
// this check is redundant to what is checked during WritePoint, although it
// is an optimization to avoid allocations from creating the map of fields.
if !client.IsCollecting(MempoolRejectedTable) {
return
}
client.WritePoint(MempoolRejectedTable, map[string]interface{}{
ReasonFieldKey: reason,
})
}
Loading