diff --git a/CHANGELOG.md b/CHANGELOG.md index d98cdce5..05eba75e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) * [#136](https://github.com/babylonlabs-io/vigilante/pull/136) rate limit activations * [#141](https://github.com/babylonlabs-io/vigilante/pull/141) decrement tracked delegations in atomic slasher * [#143](https://github.com/babylonlabs-io/vigilante/pull/143) adds nlreturn linter rule +* [#145](https://github.com/babylonlabs-io/vigilante/pull/145) fix: tracked delegation mutex ## v0.18.0 diff --git a/btcstaking-tracker/stakingeventwatcher/stakingeventwatcher.go b/btcstaking-tracker/stakingeventwatcher/stakingeventwatcher.go index c406228e..87b63757 100644 --- a/btcstaking-tracker/stakingeventwatcher/stakingeventwatcher.go +++ b/btcstaking-tracker/stakingeventwatcher/stakingeventwatcher.go @@ -609,7 +609,7 @@ func (sew *StakingEventWatcher) checkBtcForStakingTx() { return } - for del := range sew.pendingTracker.DelegationsIter() { + for del := range sew.pendingTracker.DelegationsIter(1000) { if inProgDel := sew.inProgressTracker.GetDelegation(del.StakingTx.TxHash()); inProgDel != nil && inProgDel.ActivationInProgress { continue } @@ -677,7 +677,11 @@ func (sew *StakingEventWatcher) activateBtcDelegation( defer sew.latency("activateBtcDelegation")() defer sew.inProgressTracker.RemoveDelegation(stakingTxHash) - sew.waitForRequiredDepth(ctx, stakingTxHash, &inclusionBlockHash, requiredDepth) + if err := sew.waitForRequiredDepth(ctx, stakingTxHash, &inclusionBlockHash, requiredDepth); err != nil { + sew.logger.Warnf("exceeded waiting for required depth, will try later: err %v", err) + + return + } defer sew.latency("activateDelegationRPC")() @@ -702,8 +706,8 @@ func (sew *StakingEventWatcher) activateBtcDelegation( sew.metrics.ReportedActivateDelegationsCounter.Inc() sew.pendingTracker.RemoveDelegation(stakingTxHash) - sew.metrics.NumberOfVerifiedDelegations.Dec() + sew.logger.Debugf("staking tx activated %s", stakingTxHash.String()) return nil @@ -724,11 +728,12 @@ func (sew *StakingEventWatcher) waitForRequiredDepth( stakingTxHash chainhash.Hash, inclusionBlockHash *chainhash.Hash, requiredDepth uint32, -) { +) error { defer sew.latency("waitForRequiredDepth")() var depth uint32 - _ = retry.Do(func() error { + + return retry.Do(func() error { var err error depth, err = sew.babylonNodeAdapter.QueryHeaderDepth(inclusionBlockHash) if err != nil { @@ -749,7 +754,7 @@ func (sew *StakingEventWatcher) waitForRequiredDepth( return nil }, retry.Context(ctx), - retryForever, + retry.Attempts(10), fixedDelyTypeWithJitter, retry.MaxDelay(sew.cfg.RetrySubmitUnbondingTxInterval), retry.MaxJitter(sew.cfg.RetryJitter), diff --git a/btcstaking-tracker/stakingeventwatcher/stakingeventwatcher_test.go b/btcstaking-tracker/stakingeventwatcher/stakingeventwatcher_test.go new file mode 100644 index 00000000..ee174bb4 --- /dev/null +++ b/btcstaking-tracker/stakingeventwatcher/stakingeventwatcher_test.go @@ -0,0 +1,99 @@ +package stakingeventwatcher + +import ( + "github.com/babylonlabs-io/babylon/testutil/datagen" + "github.com/babylonlabs-io/vigilante/btcclient" + "github.com/babylonlabs-io/vigilante/config" + "github.com/babylonlabs-io/vigilante/metrics" + "github.com/babylonlabs-io/vigilante/testutil/mocks" + "github.com/golang/mock/gomock" + "github.com/lightningnetwork/lnd/chainntnfs" + promtestutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "golang.org/x/sync/semaphore" + "math/rand" + "testing" + "time" +) + +func TestHandlingDelegations(t *testing.T) { + t.Parallel() + r := rand.New(rand.NewSource(time.Now().Unix())) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cfg := config.DefaultBTCStakingTrackerConfig() + cfg.CheckDelegationsInterval = 1 * time.Second + + mockBTCClient := mocks.NewMockBTCClient(ctrl) + mockBabylonNodeAdapter := NewMockBabylonNodeAdapter(ctrl) + + mockBabylonNodeAdapter.EXPECT().BtcClientTipHeight().Return(uint32(0), nil).AnyTimes() + bsMetrics := metrics.NewBTCStakingTrackerMetrics() + + sew := StakingEventWatcher{ + logger: zap.NewNop().Sugar(), + quit: make(chan struct{}), + cfg: &cfg, + babylonNodeAdapter: mockBabylonNodeAdapter, + btcClient: mockBTCClient, + unbondingTracker: NewTrackedDelegations(), + pendingTracker: NewTrackedDelegations(), + inProgressTracker: NewTrackedDelegations(), + unbondingDelegationChan: make(chan *newDelegation), + unbondingRemovalChan: make(chan *delegationInactive), + activationLimiter: semaphore.NewWeighted(10), + metrics: bsMetrics.UnbondingWatcherMetrics, + } + + defer close(sew.quit) + + expectedActivated := 1000 + delegations := make([]Delegation, 0, expectedActivated) + for i := 0; i < expectedActivated; i++ { + stk := datagen.GenRandomTx(r) + delegations = append(delegations, Delegation{ + StakingTx: stk, + StakingOutputIdx: 0, + DelegationStartHeight: 0, + UnbondingOutput: nil, + HasProof: false, + }) + } + + mockBabylonNodeAdapter.EXPECT(). + DelegationsByStatus(gomock.Any(), gomock.Any(), gomock.Any()). + Return(delegations, nil).AnyTimes() + + mockBabylonNodeAdapter.EXPECT(). + ActivateDelegation(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil).AnyTimes() + + mockBabylonNodeAdapter.EXPECT().QueryHeaderDepth(gomock.Any()).Return(uint32(2), nil).AnyTimes() + mockBabylonNodeAdapter.EXPECT().IsDelegationVerified(gomock.Any()).Return(true, nil).AnyTimes() + + params := BabylonParams{ConfirmationTimeBlocks: 1} + mockBabylonNodeAdapter.EXPECT().Params().Return(¶ms, nil).AnyTimes() + + block, _ := datagen.GenRandomBtcdBlock(r, 10, nil) + bh := block.BlockHash() + details := &chainntnfs.TxConfirmation{ + BlockHash: &bh, + BlockHeight: 100, + TxIndex: 1, + Tx: nil, + Block: block, + } + mockBTCClient.EXPECT().TxDetails(gomock.Any(), gomock.Any()).Return(details, btcclient.TxInChain, nil).AnyTimes() + + sew.wg.Add(2) + go func() { + go sew.fetchDelegations() + go sew.handlerVerifiedDelegations() + }() + + require.Eventually(t, func() bool { + return promtestutil.ToFloat64(sew.metrics.ReportedActivateDelegationsCounter) >= float64(expectedActivated) + }, 60*time.Second, 100*time.Millisecond) +} diff --git a/btcstaking-tracker/stakingeventwatcher/tracked_delegations.go b/btcstaking-tracker/stakingeventwatcher/tracked_delegations.go index de6692e8..caa8ba10 100644 --- a/btcstaking-tracker/stakingeventwatcher/tracked_delegations.go +++ b/btcstaking-tracker/stakingeventwatcher/tracked_delegations.go @@ -35,7 +35,6 @@ func (td *TrackedDelegations) GetDelegation(stakingTxHash chainhash.Hash) *Track defer td.mu.RUnlock() del, ok := td.mapping[stakingTxHash] - if !ok { return nil } @@ -59,16 +58,72 @@ func (td *TrackedDelegations) GetDelegations() []*TrackedDelegation { return delegations } -func (td *TrackedDelegations) DelegationsIter() iter.Seq[*TrackedDelegation] { +// Clone creates a deep copy of the TrackedDelegation +func (td *TrackedDelegation) Clone() *TrackedDelegation { + if td == nil { + return nil + } + + // Deep copy the StakingTx + var stakingTx *wire.MsgTx + if td.StakingTx != nil { + stakingTx = td.StakingTx.Copy() + } + + // Deep copy the UnbondingOutput + var unbondingOutput *wire.TxOut + if td.UnbondingOutput != nil { + unbondingOutput = &wire.TxOut{ + Value: td.UnbondingOutput.Value, + PkScript: append([]byte(nil), td.UnbondingOutput.PkScript...), + } + } + + return &TrackedDelegation{ + StakingTx: stakingTx, + StakingOutputIdx: td.StakingOutputIdx, + UnbondingOutput: unbondingOutput, + DelegationStartHeight: td.DelegationStartHeight, + ActivationInProgress: td.ActivationInProgress, + } +} + +// DelegationsIter returns an iterator that yields copies of delegations in chunks +func (td *TrackedDelegations) DelegationsIter(chunkSize int) iter.Seq[*TrackedDelegation] { + if chunkSize <= 0 { + chunkSize = 100 // Default chunk size + } + return func(yield func(*TrackedDelegation) bool) { - td.mu.RLock() - defer td.mu.RUnlock() + offset := 0 + for { + td.mu.RLock() + chunk := make([]*TrackedDelegation, 0, chunkSize) + i := 0 + for _, v := range td.mapping { + if i >= offset && len(chunk) < chunkSize { + chunk = append(chunk, v.Clone()) + } + i++ + if len(chunk) >= chunkSize { + break + } + } + remainingItems := len(td.mapping) - offset + td.mu.RUnlock() + + // Process the chunk + for _, delegation := range chunk { + if !yield(delegation) { + return + } + } - // we lock for the entirety of the iteration - for _, v := range td.mapping { - if !yield(v) { - return + // Check if we've processed all items + if remainingItems <= chunkSize { + break } + offset += chunkSize } } } @@ -119,8 +174,8 @@ func (td *TrackedDelegations) HasDelegationChanged( stakingTxHash chainhash.Hash, newDelegation *newDelegation, ) (bool, bool) { - td.mu.Lock() - defer td.mu.Unlock() + td.mu.RLock() + defer td.mu.RUnlock() // Check if the delegation exists in the map existingDelegation, exists := td.mapping[stakingTxHash] @@ -152,3 +207,10 @@ func (td *TrackedDelegations) UpdateActivation(tx chainhash.Hash, inProgress boo return nil } + +func (td *TrackedDelegations) Count() int { + td.mu.RLock() + defer td.mu.RUnlock() + + return len(td.mapping) +}