From 31468cba11ddb3f747c561037ea4c42e774f59c9 Mon Sep 17 00:00:00 2001 From: Lazar <12626340+Lazar955@users.noreply.github.com> Date: Tue, 14 Jan 2025 16:11:14 +0100 Subject: [PATCH] fix: delegation iter (#171) Debug logs for testnet --- CHANGELOG.md | 4 ++ .../stakingeventwatcher.go | 13 +++--- .../tracked_delegations.go | 45 +++++++++++-------- 3 files changed, 38 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c2f750..b118d2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ## Unreleased +### Bug Fixes + +* [#171](https://github.com/babylonlabs-io/vigilante/pull/171) fix: delegation iter + ## v0.19.3 ### Bug Fixes diff --git a/btcstaking-tracker/stakingeventwatcher/stakingeventwatcher.go b/btcstaking-tracker/stakingeventwatcher/stakingeventwatcher.go index 87b6375..6117fe4 100644 --- a/btcstaking-tracker/stakingeventwatcher/stakingeventwatcher.go +++ b/btcstaking-tracker/stakingeventwatcher/stakingeventwatcher.go @@ -29,7 +29,7 @@ import ( var ( fixedDelyTypeWithJitter = retry.DelayType(retry.CombineDelay(retry.FixedDelay, retry.RandomDelay)) retryForever = retry.Attempts(0) - maxConcurrentActivations = int64(1000) + maxConcurrentActivations = int64(1500) ) func (sew *StakingEventWatcher) quitContext() (context.Context, func()) { @@ -610,7 +610,9 @@ func (sew *StakingEventWatcher) checkBtcForStakingTx() { } for del := range sew.pendingTracker.DelegationsIter(1000) { - if inProgDel := sew.inProgressTracker.GetDelegation(del.StakingTx.TxHash()); inProgDel != nil && inProgDel.ActivationInProgress { + if inProgDel := sew.inProgressTracker.GetDelegation(del.StakingTx.TxHash()); inProgDel != nil { + sew.logger.Debugf("skipping tx %s, already in progress", inProgDel.StakingTx.TxHash().String()) + continue } @@ -649,7 +651,7 @@ func (sew *StakingEventWatcher) checkBtcForStakingTx() { del.DelegationStartHeight, false, ); err != nil { - sew.logger.Warnf("add del: %s", err) + sew.logger.Debugf("add del: %s", err) continue } @@ -678,7 +680,7 @@ func (sew *StakingEventWatcher) activateBtcDelegation( defer sew.inProgressTracker.RemoveDelegation(stakingTxHash) if err := sew.waitForRequiredDepth(ctx, stakingTxHash, &inclusionBlockHash, requiredDepth); err != nil { - sew.logger.Warnf("exceeded waiting for required depth, will try later: err %v", err) + sew.logger.Warnf("exceeded waiting for required depth for tx: %s, will try later: err %v", stakingTxHash.String(), err) return } @@ -704,7 +706,6 @@ func (sew *StakingEventWatcher) activateBtcDelegation( } sew.metrics.ReportedActivateDelegationsCounter.Inc() - sew.pendingTracker.RemoveDelegation(stakingTxHash) sew.metrics.NumberOfVerifiedDelegations.Dec() @@ -754,7 +755,7 @@ func (sew *StakingEventWatcher) waitForRequiredDepth( return nil }, retry.Context(ctx), - retry.Attempts(10), + retry.Attempts(20), fixedDelyTypeWithJitter, retry.MaxDelay(sew.cfg.RetrySubmitUnbondingTxInterval), retry.MaxJitter(sew.cfg.RetryJitter), diff --git a/btcstaking-tracker/stakingeventwatcher/tracked_delegations.go b/btcstaking-tracker/stakingeventwatcher/tracked_delegations.go index caa8ba1..894f2eb 100644 --- a/btcstaking-tracker/stakingeventwatcher/tracked_delegations.go +++ b/btcstaking-tracker/stakingeventwatcher/tracked_delegations.go @@ -88,42 +88,51 @@ func (td *TrackedDelegation) Clone() *TrackedDelegation { } } -// DelegationsIter returns an iterator that yields copies of delegations in chunks +// DelegationsIter returns an iterator that yields copies of delegations in chunks. +// If chunkSize <= 0, it defaults to 100. func (td *TrackedDelegations) DelegationsIter(chunkSize int) iter.Seq[*TrackedDelegation] { if chunkSize <= 0 { - chunkSize = 100 // Default chunk size + chunkSize = 100 } return func(yield func(*TrackedDelegation) bool) { - offset := 0 + // Pre-allocate the processed map with expected capacity + td.mu.RLock() + processedCap := len(td.mapping) + td.mu.RUnlock() + processed := make(map[chainhash.Hash]struct{}, processedCap) + + // Create a buffer for batch processing + buffer := make([]*TrackedDelegation, 0, chunkSize) + for { + buffer = buffer[:0] // Reset buffer without reallocating + td.mu.RLock() - chunk := make([]*TrackedDelegation, 0, chunkSize) - i := 0 - for _, v := range td.mapping { - if i >= offset && len(chunk) < chunkSize { - chunk = append(chunk, v.Clone()) - } - i++ - if len(chunk) >= chunkSize { - break + // Collect a batch of unprocessed items directly + for k, v := range td.mapping { + if _, ok := processed[k]; !ok { + buffer = append(buffer, v.Clone()) + processed[k] = struct{}{} + if len(buffer) >= chunkSize { + break + } } } - remainingItems := len(td.mapping) - offset + mapSize := len(td.mapping) td.mu.RUnlock() - // Process the chunk - for _, delegation := range chunk { + // Process the batch + for _, delegation := range buffer { if !yield(delegation) { return } } - // Check if we've processed all items - if remainingItems <= chunkSize { + // Check if we've processed everything + if len(processed) >= mapSize { break } - offset += chunkSize } } }