Skip to content

Commit

Permalink
Merge pull request #24 from Layr-Labs/metric-updates
Browse files Browse the repository at this point in the history
Metric updates
  • Loading branch information
samlaf authored Mar 30, 2024
2 parents 3cc4743 + 5162684 commit 58d372a
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 37 deletions.
38 changes: 17 additions & 21 deletions avssync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"errors"
"sort"
"strconv"
"time"
Expand Down Expand Up @@ -87,10 +86,7 @@ func (a *AvsSync) Start() {
// we first sleep some amount of time before the first sync, which allows the syncs to happen at some preferred time
// for eg midnight every night, without needing to schedule the start of avssync outside of this program
time.Sleep(a.sleepBeforeFirstSyncDuration)
err := a.updateStakes()
if err != nil {
a.logger.Error("Error updating stakes", err)
}
a.updateStakes()

if a.syncInterval == 0 {
a.logger.Infof("Sync interval is 0, running updateStakes once and exiting")
Expand All @@ -102,15 +98,12 @@ func (a *AvsSync) Start() {
defer ticker.Stop()

for range ticker.C {
err := a.updateStakes()
if err != nil {
a.logger.Error("Error updating stakes", err)
}
a.updateStakes()
a.logger.Infof("Sleeping for %s", a.syncInterval)
}
}

func (a *AvsSync) updateStakes() error {
func (a *AvsSync) updateStakes() {
if len(a.operators) == 0 {
a.logger.Info("Updating stakes of entire operator set")
a.maybeUpdateQuorumSet()
Expand All @@ -122,22 +115,23 @@ func (a *AvsSync) updateStakes() error {
a.tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum, a.retrySyncNTimes)
}
a.logger.Info("Completed stake update. Check logs to make sure every quorum update succeeded successfully.")
return nil
} else {
a.logger.Infof("Updating stakes of operators: %v", a.operators)
timeoutCtx, cancel := context.WithTimeout(context.Background(), a.writerTimeoutDuration)
defer cancel()
// this one we update all quorums at once, since we're only updating a subset of operators (which should be a small number)
receipt, err := a.avsWriter.UpdateStakesOfOperatorSubsetForAllQuorums(timeoutCtx, a.operators)
if err != nil {
erroredTxs.Inc()
return err
updateStakeAttempt.With(prometheus.Labels{"status": string(UpdateStakeStatusError)}).Inc()
a.logger.Error("Error updating stakes of operator subset for all quorums", err)
return
} else if receipt.Status == gethtypes.ReceiptStatusFailed {
revertedTxs.Inc()
return errors.New("Update stakes of operator subset for all quorums reverted")
txRevertedTotal.Inc()
a.logger.Error("Update stakes of operator subset for all quorums reverted")
return
}
a.logger.Info("Completed stake update successfully")
return nil
return
}
}

Expand All @@ -151,6 +145,7 @@ func (a *AvsSync) maybeUpdateQuorumSet() {
quorumCount, err := a.avsReader.GetQuorumCount(&bind.CallOpts{Context: timeoutCtx})
if err != nil {
a.logger.Error("Error fetching quorum set dynamically", err)
return
}
// quorums are numbered from 0 to quorumCount-1,
// so we just create a list of bytes from 0 to quorumCount-1
Expand All @@ -171,7 +166,7 @@ func (a *AvsSync) tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum byte,
// in between us fetching it and trying to update it (the contract makes sure the entire operator set is updated and reverts if not)
operatorAddrsPerQuorum, err := a.avsReader.GetOperatorAddrsInQuorumsAtCurrentBlock(&bind.CallOpts{Context: timeoutCtx}, types.QuorumNums{types.QuorumNum(quorum)})
if err != nil {
a.logger.Error("Error fetching operator addresses in quorums", "err", err, "quorum", quorum, "retryNTimes", retryNTimes, "try", i+1)
a.logger.Warn("Error fetching operator addresses in quorums", "err", err, "quorum", quorum, "retryNTimes", retryNTimes, "try", i+1)
continue
}
var operators []common.Address
Expand All @@ -185,17 +180,18 @@ func (a *AvsSync) tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum byte,
defer cancel()
receipt, err := a.avsWriter.UpdateStakesOfEntireOperatorSetForQuorums(timeoutCtx, [][]common.Address{operators}, types.QuorumNums{types.QuorumNum(quorum)})
if err != nil {
erroredTxs.Inc()
a.logger.Error("Error updating stakes of entire operator set for quorum", "err", err, "quorum", int(quorum))
a.logger.Warn("Error updating stakes of entire operator set for quorum", "err", err, "quorum", int(quorum), "retryNTimes", retryNTimes, "try", i+1)
continue
}
if receipt.Status == gethtypes.ReceiptStatusFailed {
revertedTxs.Inc()
a.logger.Infof("Successfully updated stakes of operators in quorum %d", int(quorum))
txRevertedTotal.Inc()
a.logger.Error("Update stakes of entire operator set for quorum reverted", "quorum", int(quorum))
continue
}
updateStakeAttempt.With(prometheus.Labels{"status": string(UpdateStakeStatusSucceed)}).Inc()
return
}
updateStakeAttempt.With(prometheus.Labels{"status": string(UpdateStakeStatusError)}).Inc()
a.logger.Error("Giving up after retrying", "retryNTimes", retryNTimes)
}

Expand Down
10 changes: 5 additions & 5 deletions flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ var (
EnvVar: envVarPrefix + "SYNC_INTERVAL",
}
/* Optional Flags */
PrometheusServerAddrFlag = cli.StringFlag{
Name: "prometheus-server-addr",
Usage: "Prometheus server address",
MetricsAddrFlag = cli.StringFlag{
Name: "metrics-addr",
Usage: "Prometheus server address (ip:port)",
Value: ":9090",
EnvVar: envVarPrefix + "PROMETHEUS_SERVER_ADDR",
}
FirstSyncTimeFlag = cli.StringFlag{
Name: "first-sync-time",
Required: false,
Usage: "Set the HH:MI:SS time at which to run the first sync update",
Usage: "Set the HH:MI:SS time at which to run the first sync update (in UTC)",
EnvVar: envVarPrefix + "FIRST_SYNC_TIME",
}
OperatorListFlag = cli.StringSliceFlag{
Expand Down Expand Up @@ -118,7 +118,7 @@ var RequiredFlags = []cli.Flag{
}

var OptionalFlags = []cli.Flag{
PrometheusServerAddrFlag,
MetricsAddrFlag,
FirstSyncTimeFlag,
OperatorListFlag,
QuorumListFlag,
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func avsSyncMain(cliCtx *cli.Context) error {
cliCtx.Int(retrySyncNTimes.Name),
readerTimeout,
writerTimeout,
cliCtx.String(PrometheusServerAddrFlag.Name),
cliCtx.String(MetricsAddrFlag.Name),
)

avsSync.Start()
Expand Down
32 changes: 22 additions & 10 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,36 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

const metricsNamespace = "avssync"

type UpdateStakeStatus string

const (
UpdateStakeStatusError UpdateStakeStatus = "error"
UpdateStakeStatusSucceed UpdateStakeStatus = "succeed"
)

var (
erroredTxs = promauto.NewCounter(prometheus.CounterOpts{
Name: "errored_txs_total",
Help: "The total number of transactions that errored (failed to get processed by chain)",
})
revertedTxs = promauto.NewCounter(prometheus.CounterOpts{
Name: "reverted_txs_total",
Help: "The total number of transactions that reverted (processed by chain but reverted)",
updateStakeAttempt = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Name: "update_stake_attempt",
Help: "Result from an update stake attempt. Either succeed or error (either tx was mined but reverted, or failed to get processed by chain).",
}, []string{"status"})
txRevertedTotal = promauto.NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Name: "tx_reverted_total",
Help: "The total number of transactions that made it onchain but reverted (most likely because out of gas)",
})
operatorsUpdated = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "operators_updated",
Help: "The total number of operators updated (during the last quorum sync)",
Namespace: metricsNamespace,
Name: "operators_updated",
Help: "The total number of operators updated (during the last quorum sync)",
}, []string{"quorum"})
)

func StartMetricsServer(metricsAddr string) {
registry := prometheus.NewRegistry()
registry.MustRegister(erroredTxs, revertedTxs, operatorsUpdated)
registry.MustRegister(updateStakeAttempt, txRevertedTotal, operatorsUpdated)
http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
// not sure if we need to handle this error, since if metric server errors, then we will get alerts from grafana
go http.ListenAndServe(metricsAddr, nil)
Expand Down

0 comments on commit 58d372a

Please sign in to comment.