Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: delegation iter #171

Merged
merged 7 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)

## Unreleased

### Bug Fixes

* [#171](https://github.com/babylonlabs-io/vigilante/pull/171) fix: delegation iter

## v0.19.3

### Bug Fixes
Expand Down
13 changes: 7 additions & 6 deletions btcstaking-tracker/stakingeventwatcher/stakingeventwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
var (
fixedDelyTypeWithJitter = retry.DelayType(retry.CombineDelay(retry.FixedDelay, retry.RandomDelay))
retryForever = retry.Attempts(0)
maxConcurrentActivations = int64(1000)
maxConcurrentActivations = int64(1500)
)

func (sew *StakingEventWatcher) quitContext() (context.Context, func()) {
Expand Down Expand Up @@ -610,7 +610,9 @@ func (sew *StakingEventWatcher) checkBtcForStakingTx() {
}

for del := range sew.pendingTracker.DelegationsIter(1000) {
if inProgDel := sew.inProgressTracker.GetDelegation(del.StakingTx.TxHash()); inProgDel != nil && inProgDel.ActivationInProgress {
if inProgDel := sew.inProgressTracker.GetDelegation(del.StakingTx.TxHash()); inProgDel != nil {
sew.logger.Debugf("skipping tx %s, already in progress", inProgDel.StakingTx.TxHash().String())

continue
}

Expand Down Expand Up @@ -649,7 +651,7 @@ func (sew *StakingEventWatcher) checkBtcForStakingTx() {
del.DelegationStartHeight,
false,
); err != nil {
sew.logger.Warnf("add del: %s", err)
sew.logger.Debugf("add del: %s", err)

continue
}
Expand Down Expand Up @@ -678,7 +680,7 @@ func (sew *StakingEventWatcher) activateBtcDelegation(
defer sew.inProgressTracker.RemoveDelegation(stakingTxHash)

if err := sew.waitForRequiredDepth(ctx, stakingTxHash, &inclusionBlockHash, requiredDepth); err != nil {
sew.logger.Warnf("exceeded waiting for required depth, will try later: err %v", err)
sew.logger.Warnf("exceeded waiting for required depth for tx: %s, will try later: err %v", stakingTxHash.String(), err)

return
}
Expand All @@ -704,7 +706,6 @@ func (sew *StakingEventWatcher) activateBtcDelegation(
}

sew.metrics.ReportedActivateDelegationsCounter.Inc()

sew.pendingTracker.RemoveDelegation(stakingTxHash)
sew.metrics.NumberOfVerifiedDelegations.Dec()

Expand Down Expand Up @@ -754,7 +755,7 @@ func (sew *StakingEventWatcher) waitForRequiredDepth(
return nil
},
retry.Context(ctx),
retry.Attempts(10),
retry.Attempts(20),
fixedDelyTypeWithJitter,
retry.MaxDelay(sew.cfg.RetrySubmitUnbondingTxInterval),
retry.MaxJitter(sew.cfg.RetryJitter),
Expand Down
45 changes: 27 additions & 18 deletions btcstaking-tracker/stakingeventwatcher/tracked_delegations.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,42 +88,51 @@ func (td *TrackedDelegation) Clone() *TrackedDelegation {
}
}

// DelegationsIter returns an iterator that yields copies of delegations in chunks
// DelegationsIter returns an iterator that yields copies of delegations in chunks.
// If chunkSize <= 0, it defaults to 100.
func (td *TrackedDelegations) DelegationsIter(chunkSize int) iter.Seq[*TrackedDelegation] {
if chunkSize <= 0 {
chunkSize = 100 // Default chunk size
chunkSize = 100
}

return func(yield func(*TrackedDelegation) bool) {
offset := 0
// Pre-allocate the processed map with expected capacity
td.mu.RLock()
processedCap := len(td.mapping)
td.mu.RUnlock()
processed := make(map[chainhash.Hash]struct{}, processedCap)

// Create a buffer for batch processing
buffer := make([]*TrackedDelegation, 0, chunkSize)

for {
buffer = buffer[:0] // Reset buffer without reallocating

td.mu.RLock()
chunk := make([]*TrackedDelegation, 0, chunkSize)
i := 0
for _, v := range td.mapping {
if i >= offset && len(chunk) < chunkSize {
chunk = append(chunk, v.Clone())
}
i++
if len(chunk) >= chunkSize {
break
// Collect a batch of unprocessed items directly
for k, v := range td.mapping {
if _, ok := processed[k]; !ok {
buffer = append(buffer, v.Clone())
processed[k] = struct{}{}
if len(buffer) >= chunkSize {
break
}
}
}
remainingItems := len(td.mapping) - offset
mapSize := len(td.mapping)
td.mu.RUnlock()

// Process the chunk
for _, delegation := range chunk {
// Process the batch
for _, delegation := range buffer {
if !yield(delegation) {
return
}
}

// Check if we've processed all items
if remainingItems <= chunkSize {
// Check if we've processed everything
if len(processed) >= mapSize {
break
}
offset += chunkSize
}
}
}
Expand Down
Loading