diff --git a/avssync.go b/avssync.go index f61b4fe..f3ea4f2 100644 --- a/avssync.go +++ b/avssync.go @@ -2,7 +2,6 @@ package main import ( "context" - "errors" "sort" "strconv" "time" @@ -87,10 +86,7 @@ func (a *AvsSync) Start() { // we first sleep some amount of time before the first sync, which allows the syncs to happen at some preferred time // for eg midnight every night, without needing to schedule the start of avssync outside of this program time.Sleep(a.sleepBeforeFirstSyncDuration) - err := a.updateStakes() - if err != nil { - a.logger.Error("Error updating stakes", err) - } + a.updateStakes() if a.syncInterval == 0 { a.logger.Infof("Sync interval is 0, running updateStakes once and exiting") @@ -102,15 +98,12 @@ func (a *AvsSync) Start() { defer ticker.Stop() for range ticker.C { - err := a.updateStakes() - if err != nil { - a.logger.Error("Error updating stakes", err) - } + a.updateStakes() a.logger.Infof("Sleeping for %s", a.syncInterval) } } -func (a *AvsSync) updateStakes() error { +func (a *AvsSync) updateStakes() { if len(a.operators) == 0 { a.logger.Info("Updating stakes of entire operator set") a.maybeUpdateQuorumSet() @@ -122,7 +115,6 @@ func (a *AvsSync) updateStakes() error { a.tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum, a.retrySyncNTimes) } a.logger.Info("Completed stake update. Check logs to make sure every quorum update succeeded successfully.") - return nil } else { a.logger.Infof("Updating stakes of operators: %v", a.operators) timeoutCtx, cancel := context.WithTimeout(context.Background(), a.writerTimeoutDuration) @@ -130,14 +122,16 @@ func (a *AvsSync) updateStakes() error { // this one we update all quorums at once, since we're only updating a subset of operators (which should be a small number) receipt, err := a.avsWriter.UpdateStakesOfOperatorSubsetForAllQuorums(timeoutCtx, a.operators) if err != nil { - erroredTxs.Inc() - return err + updateStakeAttempt.With(prometheus.Labels{"status": string(UpdateStakeStatusError)}).Inc() + a.logger.Error("Error updating stakes of operator subset for all quorums", err) + return } else if receipt.Status == gethtypes.ReceiptStatusFailed { - revertedTxs.Inc() - return errors.New("Update stakes of operator subset for all quorums reverted") + txRevertedTotal.Inc() + a.logger.Error("Update stakes of operator subset for all quorums reverted") + return } a.logger.Info("Completed stake update successfully") - return nil + return } } @@ -151,6 +145,7 @@ func (a *AvsSync) maybeUpdateQuorumSet() { quorumCount, err := a.avsReader.GetQuorumCount(&bind.CallOpts{Context: timeoutCtx}) if err != nil { a.logger.Error("Error fetching quorum set dynamically", err) + return } // quorums are numbered from 0 to quorumCount-1, // so we just create a list of bytes from 0 to quorumCount-1 @@ -171,7 +166,7 @@ func (a *AvsSync) tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum byte, // in between us fetching it and trying to update it (the contract makes sure the entire operator set is updated and reverts if not) operatorAddrsPerQuorum, err := a.avsReader.GetOperatorAddrsInQuorumsAtCurrentBlock(&bind.CallOpts{Context: timeoutCtx}, types.QuorumNums{types.QuorumNum(quorum)}) if err != nil { - a.logger.Error("Error fetching operator addresses in quorums", "err", err, "quorum", quorum, "retryNTimes", retryNTimes, "try", i+1) + a.logger.Warn("Error fetching operator addresses in quorums", "err", err, "quorum", quorum, "retryNTimes", retryNTimes, "try", i+1) continue } var operators []common.Address @@ -185,17 +180,18 @@ func (a *AvsSync) tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum byte, defer cancel() receipt, err := a.avsWriter.UpdateStakesOfEntireOperatorSetForQuorums(timeoutCtx, [][]common.Address{operators}, types.QuorumNums{types.QuorumNum(quorum)}) if err != nil { - erroredTxs.Inc() - a.logger.Error("Error updating stakes of entire operator set for quorum", "err", err, "quorum", int(quorum)) + a.logger.Warn("Error updating stakes of entire operator set for quorum", "err", err, "quorum", int(quorum), "retryNTimes", retryNTimes, "try", i+1) continue } if receipt.Status == gethtypes.ReceiptStatusFailed { - revertedTxs.Inc() - a.logger.Infof("Successfully updated stakes of operators in quorum %d", int(quorum)) + txRevertedTotal.Inc() + a.logger.Error("Update stakes of entire operator set for quorum reverted", "quorum", int(quorum)) continue } + updateStakeAttempt.With(prometheus.Labels{"status": string(UpdateStakeStatusSucceed)}).Inc() return } + updateStakeAttempt.With(prometheus.Labels{"status": string(UpdateStakeStatusError)}).Inc() a.logger.Error("Giving up after retrying", "retryNTimes", retryNTimes) } diff --git a/flags.go b/flags.go index 0939885..03afea9 100644 --- a/flags.go +++ b/flags.go @@ -36,16 +36,16 @@ var ( EnvVar: envVarPrefix + "SYNC_INTERVAL", } /* Optional Flags */ - PrometheusServerAddrFlag = cli.StringFlag{ - Name: "prometheus-server-addr", - Usage: "Prometheus server address", + MetricsAddrFlag = cli.StringFlag{ + Name: "metrics-addr", + Usage: "Prometheus server address (ip:port)", Value: ":9090", EnvVar: envVarPrefix + "PROMETHEUS_SERVER_ADDR", } FirstSyncTimeFlag = cli.StringFlag{ Name: "first-sync-time", Required: false, - Usage: "Set the HH:MI:SS time at which to run the first sync update", + Usage: "Set the HH:MI:SS time at which to run the first sync update (in UTC)", EnvVar: envVarPrefix + "FIRST_SYNC_TIME", } OperatorListFlag = cli.StringSliceFlag{ @@ -118,7 +118,7 @@ var RequiredFlags = []cli.Flag{ } var OptionalFlags = []cli.Flag{ - PrometheusServerAddrFlag, + MetricsAddrFlag, FirstSyncTimeFlag, OperatorListFlag, QuorumListFlag, diff --git a/main.go b/main.go index 2d94707..28beaf2 100644 --- a/main.go +++ b/main.go @@ -181,7 +181,7 @@ func avsSyncMain(cliCtx *cli.Context) error { cliCtx.Int(retrySyncNTimes.Name), readerTimeout, writerTimeout, - cliCtx.String(PrometheusServerAddrFlag.Name), + cliCtx.String(MetricsAddrFlag.Name), ) avsSync.Start() diff --git a/metrics.go b/metrics.go index 76d69d0..0fe5988 100644 --- a/metrics.go +++ b/metrics.go @@ -8,24 +8,36 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) +const metricsNamespace = "avssync" + +type UpdateStakeStatus string + +const ( + UpdateStakeStatusError UpdateStakeStatus = "error" + UpdateStakeStatusSucceed UpdateStakeStatus = "succeed" +) + var ( - erroredTxs = promauto.NewCounter(prometheus.CounterOpts{ - Name: "errored_txs_total", - Help: "The total number of transactions that errored (failed to get processed by chain)", - }) - revertedTxs = promauto.NewCounter(prometheus.CounterOpts{ - Name: "reverted_txs_total", - Help: "The total number of transactions that reverted (processed by chain but reverted)", + updateStakeAttempt = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Name: "update_stake_attempt", + Help: "Result from an update stake attempt. Either succeed or error (either tx was mined but reverted, or failed to get processed by chain).", + }, []string{"status"}) + txRevertedTotal = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Name: "tx_reverted_total", + Help: "The total number of transactions that made it onchain but reverted (most likely because out of gas)", }) operatorsUpdated = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "operators_updated", - Help: "The total number of operators updated (during the last quorum sync)", + Namespace: metricsNamespace, + Name: "operators_updated", + Help: "The total number of operators updated (during the last quorum sync)", }, []string{"quorum"}) ) func StartMetricsServer(metricsAddr string) { registry := prometheus.NewRegistry() - registry.MustRegister(erroredTxs, revertedTxs, operatorsUpdated) + registry.MustRegister(updateStakeAttempt, txRevertedTotal, operatorsUpdated) http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) // not sure if we need to handle this error, since if metric server errors, then we will get alerts from grafana go http.ListenAndServe(metricsAddr, nil)