Skip to content

Commit

Permalink
add writer timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Feb 8, 2024
1 parent 3f16c13 commit 83221e4
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 15 deletions.
21 changes: 12 additions & 9 deletions avssync.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ type AvsSync struct {
fetchQuorumsDynamically bool
retrySyncNTimes int

rpcTimeoutDuration time.Duration
readerTimeoutDuration time.Duration
writerTimeoutDuration time.Duration
}

// NewAvsSync creates a new AvsSync object
Expand All @@ -36,7 +37,8 @@ func NewAvsSync(
avsReader avsregistry.AvsRegistryReader, avsWriter avsregistry.AvsRegistryWriter,
sleepBeforeFirstSyncDuration time.Duration, syncInterval time.Duration, operators []common.Address,
quorums []byte, fetchQuorumsDynamically bool, retrySyncNTimes int,
rpcTimeoutDuration time.Duration,
readerTimeoutDuration time.Duration,
writerTimeoutDuration time.Duration,
) *AvsSync {
return &AvsSync{
logger: logger,
Expand All @@ -48,13 +50,14 @@ func NewAvsSync(
quorums: quorums,
fetchQuorumsDynamically: fetchQuorumsDynamically,
retrySyncNTimes: retrySyncNTimes,
rpcTimeoutDuration: rpcTimeoutDuration,
readerTimeoutDuration: readerTimeoutDuration,
writerTimeoutDuration: writerTimeoutDuration,
}
}

func (a *AvsSync) Start() {
a.logger.Infof("Starting avs sync with sleepBeforeFirstSyncDuration=%s, syncInterval=%s, operators=%v, quorums=%v, fetchQuorumsDynamically=%v, rpcTimeoutDuration=%s",
a.sleepBeforeFirstSyncDuration, a.syncInterval, a.operators, a.quorums, a.fetchQuorumsDynamically, a.rpcTimeoutDuration)
a.logger.Infof("Starting avs sync with sleepBeforeFirstSyncDuration=%s, syncInterval=%s, operators=%v, quorums=%v, fetchQuorumsDynamically=%v, readerTimeoutDuration=%s, writerTimeoutDuration=%s",
a.sleepBeforeFirstSyncDuration, a.syncInterval, a.operators, a.quorums, a.fetchQuorumsDynamically, a.readerTimeoutDuration, a.writerTimeoutDuration)

// run something every syncInterval
ticker := time.NewTicker(a.syncInterval)
Expand Down Expand Up @@ -94,7 +97,7 @@ func (a *AvsSync) updateStakes() error {
return nil
} else {
a.logger.Infof("Updating stakes of operators: %v", a.operators)
timeoutCtx, cancel := context.WithTimeout(context.Background(), a.rpcTimeoutDuration)
timeoutCtx, cancel := context.WithTimeout(context.Background(), a.readerTimeoutDuration)
defer cancel()
// this one we update all quorums at once, since we're only updating a subset of operators (which should be a small number)
_, err := a.avsWriter.UpdateStakesOfOperatorSubsetForAllQuorums(timeoutCtx, a.operators)
Expand All @@ -110,7 +113,7 @@ func (a *AvsSync) maybeUpdateQuorumSet() {
return
}
a.logger.Info("Fetching quorum set dynamically")
timeoutCtx, cancel := context.WithTimeout(context.Background(), a.rpcTimeoutDuration)
timeoutCtx, cancel := context.WithTimeout(context.Background(), a.readerTimeoutDuration)
defer cancel()
quorumCount, err := a.avsReader.GetQuorumCount(&bind.CallOpts{Context: timeoutCtx})
if err != nil {
Expand All @@ -129,7 +132,7 @@ func (a *AvsSync) tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum byte,
for i := 0; i < retryNTimes; i++ {
a.logger.Debug("tryNTimesUpdateStakesOfEntireOperatorSetForQuorum", "quorum", quorum, "retryNTimes", retryNTimes, "try", i+1)

timeoutCtx, cancel := context.WithTimeout(context.Background(), a.rpcTimeoutDuration)
timeoutCtx, cancel := context.WithTimeout(context.Background(), a.readerTimeoutDuration)
defer cancel()
operatorAddrsPerQuorum, err := a.avsReader.GetOperatorAddrsInQuorumsAtCurrentBlock(&bind.CallOpts{Context: timeoutCtx}, []byte{quorum})
if err != nil {
Expand All @@ -142,7 +145,7 @@ func (a *AvsSync) tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum byte,
return operators[i].Big().Cmp(operators[j].Big()) < 0
})
a.logger.Infof("Updating stakes of operators in quorum %d: %v", quorum, operators)
timeoutCtx, cancel = context.WithTimeout(context.Background(), a.rpcTimeoutDuration)
timeoutCtx, cancel = context.WithTimeout(context.Background(), a.writerTimeoutDuration)
defer cancel()
_, err = a.avsWriter.UpdateStakesOfEntireOperatorSetForQuorums(timeoutCtx, [][]common.Address{operators}, []byte{quorum})
if err != nil {
Expand Down
17 changes: 12 additions & 5 deletions flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,17 @@ var (
Usage: "If set to true, will fetch the list of quorums registered in the contract and update all of them",
EnvVar: envVarPrefix + "FETCH_QUORUMS_DYNAMICALLY",
}
RpcTimeoutDurationFlag = cli.DurationFlag{
Name: "rpc-timeout-duration",
Usage: "Timeout duration for rpc calls in `SECONDS`",
ReaderTimeoutDurationFlag = cli.DurationFlag{
Name: "reader-timeout-duration",
Usage: "Timeout duration for rpc calls to read from chain in `SECONDS`",
Value: 5 * time.Second,
EnvVar: envVarPrefix + "RPC_TIMEOUT_DURATION",
EnvVar: envVarPrefix + "READER_TIMEOUT_DURATION",
}
WriterTimeoutDurationFlag = cli.DurationFlag{
Name: "transaction-timeout-duration",
Usage: "Timeout duration for transactions to be confirmed in `SECONDS`",
Value: 90 * time.Second,
EnvVar: envVarPrefix + "WRITER_TIMEOUT_DURATION",
}
retrySyncNTimes = cli.IntFlag{
Name: "retry-sync-n-times",
Expand All @@ -91,7 +97,8 @@ var OptionalFlags = []cli.Flag{
OperatorListFlag,
QuorumListFlag,
FetchQuorumDynamicallyFlag,
RpcTimeoutDurationFlag,
ReaderTimeoutDurationFlag,
WriterTimeoutDurationFlag,
retrySyncNTimes,
}

Expand Down
1 change: 1 addition & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func NewTestAvsSync(anvilHttpEndpoint string, contractAddresses ContractAddresse
false,
1, // 1 retry
5*time.Second,
5*time.Second,
)
return avsSync
}
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func avsSyncMain(cliCtx *cli.Context) error {
quorums,
cliCtx.Bool(FetchQuorumDynamicallyFlag.Name),
cliCtx.Int(retrySyncNTimes.Name),
cliCtx.Duration(RpcTimeoutDurationFlag.Name),
cliCtx.Duration(ReaderTimeoutDurationFlag.Name),
cliCtx.Duration(WriterTimeoutDurationFlag.Name),
)
avsSync.Start()

Expand Down

0 comments on commit 83221e4

Please sign in to comment.