Skip to content

Commit

Permalink
fix: delegation iter (#171)
Browse files Browse the repository at this point in the history
Debug logs for testnet
  • Loading branch information
Lazar955 authored Jan 14, 2025
1 parent 1459a84 commit 31468cb
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 24 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)

## Unreleased

### Bug Fixes

* [#171](https://github.com/babylonlabs-io/vigilante/pull/171) fix: delegation iter

## v0.19.3

### Bug Fixes
Expand Down
13 changes: 7 additions & 6 deletions btcstaking-tracker/stakingeventwatcher/stakingeventwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
var (
fixedDelyTypeWithJitter = retry.DelayType(retry.CombineDelay(retry.FixedDelay, retry.RandomDelay))
retryForever = retry.Attempts(0)
maxConcurrentActivations = int64(1000)
maxConcurrentActivations = int64(1500)
)

func (sew *StakingEventWatcher) quitContext() (context.Context, func()) {
Expand Down Expand Up @@ -610,7 +610,9 @@ func (sew *StakingEventWatcher) checkBtcForStakingTx() {
}

for del := range sew.pendingTracker.DelegationsIter(1000) {
if inProgDel := sew.inProgressTracker.GetDelegation(del.StakingTx.TxHash()); inProgDel != nil && inProgDel.ActivationInProgress {
if inProgDel := sew.inProgressTracker.GetDelegation(del.StakingTx.TxHash()); inProgDel != nil {
sew.logger.Debugf("skipping tx %s, already in progress", inProgDel.StakingTx.TxHash().String())

continue
}

Expand Down Expand Up @@ -649,7 +651,7 @@ func (sew *StakingEventWatcher) checkBtcForStakingTx() {
del.DelegationStartHeight,
false,
); err != nil {
sew.logger.Warnf("add del: %s", err)
sew.logger.Debugf("add del: %s", err)

continue
}
Expand Down Expand Up @@ -678,7 +680,7 @@ func (sew *StakingEventWatcher) activateBtcDelegation(
defer sew.inProgressTracker.RemoveDelegation(stakingTxHash)

if err := sew.waitForRequiredDepth(ctx, stakingTxHash, &inclusionBlockHash, requiredDepth); err != nil {
sew.logger.Warnf("exceeded waiting for required depth, will try later: err %v", err)
sew.logger.Warnf("exceeded waiting for required depth for tx: %s, will try later: err %v", stakingTxHash.String(), err)

return
}
Expand All @@ -704,7 +706,6 @@ func (sew *StakingEventWatcher) activateBtcDelegation(
}

sew.metrics.ReportedActivateDelegationsCounter.Inc()

sew.pendingTracker.RemoveDelegation(stakingTxHash)
sew.metrics.NumberOfVerifiedDelegations.Dec()

Expand Down Expand Up @@ -754,7 +755,7 @@ func (sew *StakingEventWatcher) waitForRequiredDepth(
return nil
},
retry.Context(ctx),
retry.Attempts(10),
retry.Attempts(20),
fixedDelyTypeWithJitter,
retry.MaxDelay(sew.cfg.RetrySubmitUnbondingTxInterval),
retry.MaxJitter(sew.cfg.RetryJitter),
Expand Down
45 changes: 27 additions & 18 deletions btcstaking-tracker/stakingeventwatcher/tracked_delegations.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,42 +88,51 @@ func (td *TrackedDelegation) Clone() *TrackedDelegation {
}
}

// DelegationsIter returns an iterator that yields copies of delegations in chunks
// DelegationsIter returns an iterator that yields copies of delegations in chunks.
// If chunkSize <= 0, it defaults to 100.
func (td *TrackedDelegations) DelegationsIter(chunkSize int) iter.Seq[*TrackedDelegation] {
if chunkSize <= 0 {
chunkSize = 100 // Default chunk size
chunkSize = 100
}

return func(yield func(*TrackedDelegation) bool) {
offset := 0
// Pre-allocate the processed map with expected capacity
td.mu.RLock()
processedCap := len(td.mapping)
td.mu.RUnlock()
processed := make(map[chainhash.Hash]struct{}, processedCap)

// Create a buffer for batch processing
buffer := make([]*TrackedDelegation, 0, chunkSize)

for {
buffer = buffer[:0] // Reset buffer without reallocating

td.mu.RLock()
chunk := make([]*TrackedDelegation, 0, chunkSize)
i := 0
for _, v := range td.mapping {
if i >= offset && len(chunk) < chunkSize {
chunk = append(chunk, v.Clone())
}
i++
if len(chunk) >= chunkSize {
break
// Collect a batch of unprocessed items directly
for k, v := range td.mapping {
if _, ok := processed[k]; !ok {
buffer = append(buffer, v.Clone())
processed[k] = struct{}{}
if len(buffer) >= chunkSize {
break
}
}
}
remainingItems := len(td.mapping) - offset
mapSize := len(td.mapping)
td.mu.RUnlock()

// Process the chunk
for _, delegation := range chunk {
// Process the batch
for _, delegation := range buffer {
if !yield(delegation) {
return
}
}

// Check if we've processed all items
if remainingItems <= chunkSize {
// Check if we've processed everything
if len(processed) >= mapSize {
break
}
offset += chunkSize
}
}
}
Expand Down

0 comments on commit 31468cb

Please sign in to comment.