Skip to content

Commit

Permalink
Merge pull request #38 from Layr-Labs/refactor-metrics
Browse files Browse the repository at this point in the history
Refactor metrics on AVS sync
  • Loading branch information
samlaf authored May 15, 2024
2 parents 7b17542 + 0130407 commit 7388343
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 28 deletions.
26 changes: 19 additions & 7 deletions avssync/avssync.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type AvsSync struct {
readerTimeoutDuration time.Duration
writerTimeoutDuration time.Duration
prometheusServerAddr string
Metrics *Metrics
}

// NewAvsSync creates a new AvsSync object
Expand All @@ -44,7 +45,10 @@ func NewAvsSync(
quorums []byte, fetchQuorumsDynamically bool, retrySyncNTimes int,
readerTimeoutDuration time.Duration, writerTimeoutDuration time.Duration,
prometheusServerAddr string,
prometheusRegistry *prometheus.Registry,
) *AvsSync {
metrics := NewMetrics(prometheusRegistry)

return &AvsSync{
AvsReader: avsReader,
AvsWriter: avsWriter,
Expand All @@ -58,6 +62,7 @@ func NewAvsSync(
readerTimeoutDuration: readerTimeoutDuration,
writerTimeoutDuration: writerTimeoutDuration,
prometheusServerAddr: prometheusServerAddr,
Metrics: metrics,
}
}

Expand All @@ -76,7 +81,7 @@ func (a *AvsSync) Start(ctx context.Context) {
)

if a.prometheusServerAddr != "" {
StartMetricsServer(a.prometheusServerAddr)
a.Metrics.Start(a.prometheusServerAddr)
} else {
a.logger.Info("Prometheus server address not set, not starting metrics server")
}
Expand Down Expand Up @@ -129,11 +134,13 @@ func (a *AvsSync) updateStakes() {
receipt, err := a.AvsWriter.UpdateStakesOfOperatorSubsetForAllQuorums(timeoutCtx, a.operators)
if err != nil {
// no quorum label means we are updating all quorums
updateStakeAttempt.With(prometheus.Labels{"status": string(UpdateStakeStatusError), "quorum": ""}).Inc()
for _, quorum := range a.quorums {
a.Metrics.UpdateStakeAttemptInc(UpdateStakeStatusError, strconv.Itoa(int(quorum)))
}
a.logger.Error("Error updating stakes of operator subset for all quorums", err)
return
} else if receipt.Status == gethtypes.ReceiptStatusFailed {
txRevertedTotal.Inc()
a.Metrics.TxRevertedTotalInc()
a.logger.Error("Update stakes of operator subset for all quorums reverted")
return
}
Expand Down Expand Up @@ -178,7 +185,6 @@ func (a *AvsSync) tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum byte,
}
var operators []common.Address
operators = append(operators, operatorAddrsPerQuorum[0]...)
operatorsUpdated.With(prometheus.Labels{"quorum": strconv.Itoa(int(quorum))}).Set(float64(len(operators)))
sort.Slice(operators, func(i, j int) bool {
return operators[i].Big().Cmp(operators[j].Big()) < 0
})
Expand All @@ -191,14 +197,20 @@ func (a *AvsSync) tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum byte,
continue
}
if receipt.Status == gethtypes.ReceiptStatusFailed {
txRevertedTotal.Inc()
a.Metrics.TxRevertedTotalInc()
a.logger.Error("Update stakes of entire operator set for quorum reverted", "quorum", int(quorum))
continue
}
updateStakeAttempt.With(prometheus.Labels{"status": string(UpdateStakeStatusSucceed), "quorum": strconv.Itoa(int(quorum))}).Inc()

// Update metrics on success
a.Metrics.UpdateStakeAttemptInc(UpdateStakeStatusSucceed, strconv.Itoa(int(quorum)))
a.Metrics.OperatorsUpdatedSet(strconv.Itoa(int(quorum)), len(operators))

return
}
updateStakeAttempt.With(prometheus.Labels{"status": string(UpdateStakeStatusError), "quorum": strconv.Itoa(int(quorum))}).Inc()

// Update metrics on failure
a.Metrics.UpdateStakeAttemptInc(UpdateStakeStatusError, strconv.Itoa(int(quorum)))
a.logger.Error("Giving up after retrying", "retryNTimes", retryNTimes)
}

Expand Down
68 changes: 47 additions & 21 deletions avssync/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,54 @@ const (
UpdateStakeStatusSucceed UpdateStakeStatus = "succeed"
)

var (
updateStakeAttempt = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Name: "update_stake_attempt",
Help: "Result from an update stake attempt. Either succeed or error (either tx was mined but reverted, or failed to get processed by chain).",
}, []string{"status", "quorum"})
txRevertedTotal = promauto.NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Name: "tx_reverted_total",
Help: "The total number of transactions that made it onchain but reverted (most likely because out of gas)",
})
operatorsUpdated = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "operators_updated",
Help: "The total number of operators updated (during the last quorum sync)",
}, []string{"quorum"})
)
type Metrics struct {
updateStakeAttempts *prometheus.CounterVec
txRevertedTotal prometheus.Counter
operatorsUpdated *prometheus.GaugeVec

registry *prometheus.Registry
}

func NewMetrics(reg *prometheus.Registry) *Metrics {
metrics := &Metrics{
updateStakeAttempts: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Name: "update_stake_attempt",
Help: "Result from an update stake attempt. Either succeed or error (either tx was mined but reverted, or failed to get processed by chain).",
}, []string{"status", "quorum"}),

txRevertedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Name: "tx_reverted_total",
Help: "The total number of transactions that made it onchain but reverted (most likely because out of gas)",
}),

operatorsUpdated: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "operators_updated",
Help: "The total number of operators updated (during the last quorum sync)",
}, []string{"quorum"}),

registry: reg,
}

return metrics
}

func (g *Metrics) UpdateStakeAttemptInc(status UpdateStakeStatus, quorum string) {
g.updateStakeAttempts.WithLabelValues(string(status), quorum).Inc()
}

func (g *Metrics) TxRevertedTotalInc() {
g.txRevertedTotal.Inc()
}

func (g *Metrics) OperatorsUpdatedSet(quorum string, operators int) {
g.operatorsUpdated.WithLabelValues(quorum).Set(float64(operators))
}

func StartMetricsServer(metricsAddr string) {
registry := prometheus.NewRegistry()
registry.MustRegister(updateStakeAttempt, txRevertedTotal, operatorsUpdated)
http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
func (g *Metrics) Start(metricsAddr string) {
http.Handle("/metrics", promhttp.HandlerFor(g.registry, promhttp.HandlerOpts{}))
// not sure if we need to handle this error, since if metric server errors, then we will get alerts from grafana
go func() {
_ = http.ListenAndServe(metricsAddr, nil)
Expand Down
2 changes: 2 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/common"
gethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/prometheus/client_golang/prometheus"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"go.uber.org/mock/gomock"
Expand Down Expand Up @@ -274,6 +275,7 @@ func NewAvsSyncComponents(t *testing.T, anvilHttpEndpoint string, contractAddres
time.Second,
time.Second,
"", // no metrics server (otherwise parallel tests all try to start server at same endpoint and error out)
prometheus.NewRegistry(),
)
return &AvsSyncComponents{
avsSync: avsSync,
Expand Down
6 changes: 6 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/Layr-Labs/eigensdk-go/signerv2"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/prometheus/client_golang/prometheus"
"github.com/urfave/cli"
)

Expand Down Expand Up @@ -208,6 +209,10 @@ func avsSyncMain(cliCtx *cli.Context) error {
sleepBeforeFirstSyncDuration = firstSyncTime.Sub(now)
}
logger.Infof("Sleeping for %v before first sync, so that it happens at %v", sleepBeforeFirstSyncDuration, time.Now().Add(sleepBeforeFirstSyncDuration))

// Create new prometheus registry
reg := prometheus.NewRegistry()

avsSync := avssync.NewAvsSync(
logger,
avsReader,
Expand All @@ -221,6 +226,7 @@ func avsSyncMain(cliCtx *cli.Context) error {
readerTimeout,
writerTimeout,
cliCtx.String(MetricsAddrFlag.Name),
reg,
)

avsSync.Start(context.Background())
Expand Down

0 comments on commit 7388343

Please sign in to comment.