Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…into refactor_v3.2.2
  • Loading branch information
nitronit committed Dec 18, 2023
2 parents 3ee2bd1 + 8e01923 commit e7db1ea
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 90 deletions.
104 changes: 45 additions & 59 deletions signer/cosigner_nonce_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/strangelove-ventures/horcrux/pkg/metrics"
Expand All @@ -24,7 +25,7 @@ type CosignerNonceCache struct {

leader Leader

lastReconcileNonces lastCount
lastReconcileNonces atomic.Uint64
lastReconcileTime time.Time

getNoncesInterval time.Duration
Expand All @@ -33,7 +34,7 @@ type CosignerNonceCache struct {

threshold uint8

cache NonceCache
cache *NonceCache

pruner NonceCachePruner

Expand Down Expand Up @@ -88,29 +89,6 @@ func (m *movingAverage) average() float64 {
return weightedSum / duration
}

type lastCount struct {
count int
mu sync.RWMutex
}

func (lc *lastCount) Set(n int) {
lc.mu.Lock()
defer lc.mu.Unlock()
lc.count = n
}

func (lc *lastCount) Inc() {
lc.mu.Lock()
defer lc.mu.Unlock()
lc.count++
}

func (lc *lastCount) Get() int {
lc.mu.RLock()
defer lc.mu.RUnlock()
return lc.count
}

type NonceCachePruner interface {
PruneNonces() int
}
Expand All @@ -136,6 +114,30 @@ func (nc *NonceCache) Delete(index int) {
nc.cache = append(nc.cache[:index], nc.cache[index+1:]...)
}

func (nc *NonceCache) PruneNonces() int {
nc.mu.Lock()
defer nc.mu.Unlock()
nonExpiredIndex := -1
for i := 0; i < len(nc.cache); i++ {
if time.Now().Before(nc.cache[i].Expiration) {
nonExpiredIndex = i
break
}
}

var deleteCount int
if nonExpiredIndex == -1 {
// No non-expired nonces, delete everything
deleteCount = len(nc.cache)
nc.cache = nil
} else {
// Prune everything up to the non-expired nonce
deleteCount = nonExpiredIndex
nc.cache = nc.cache[nonExpiredIndex:]
}
return deleteCount
}

type CosignerNoncesRel struct {
Cosigner Cosigner
Nonces CosignerNonces
Expand Down Expand Up @@ -176,12 +178,14 @@ func NewCosignerNonceCache(
nonceExpiration: nonceExpiration,
threshold: threshold,
pruner: pruner,
empty: make(chan struct{}, 1),
movingAverage: newMovingAverage(4 * getNoncesInterval), // weighted average over 4 intervals
cache: new(NonceCache),
// buffer up to 1000 empty events so that we don't ever block
empty: make(chan struct{}, 1000),
movingAverage: newMovingAverage(4 * getNoncesInterval), // weighted average over 4 intervals
}
// the only time pruner is expected to be non-nil is during tests, otherwise we use the cache logic.
if pruner == nil {
cnc.pruner = cnc
cnc.pruner = cnc.cache
}

return cnc
Expand Down Expand Up @@ -213,9 +217,9 @@ func (cnc *CosignerNonceCache) reconcile(ctx context.Context) {
remainingNonces := cnc.cache.Size()
timeSinceLastReconcile := time.Since(cnc.lastReconcileTime)

lastReconcileNonces := cnc.lastReconcileNonces.Get()
lastReconcileNonces := cnc.lastReconcileNonces.Load()
// calculate nonces per minute
noncesPerMin := float64(lastReconcileNonces-remainingNonces-pruned) / timeSinceLastReconcile.Minutes()
noncesPerMin := float64(int(lastReconcileNonces)-remainingNonces-pruned) / timeSinceLastReconcile.Minutes()
if noncesPerMin < 0 {
noncesPerMin = 0
}
Expand All @@ -232,7 +236,7 @@ func (cnc *CosignerNonceCache) reconcile(ctx context.Context) {
additional := t - remainingNonces

defer func() {
cnc.lastReconcileNonces.Set(remainingNonces + additional)
cnc.lastReconcileNonces.Store(uint64(remainingNonces + additional))
cnc.lastReconcileTime = time.Now()
}()

Expand Down Expand Up @@ -327,19 +331,23 @@ func (cnc *CosignerNonceCache) LoadN(ctx context.Context, n int) {
}

func (cnc *CosignerNonceCache) Start(ctx context.Context) {
cnc.lastReconcileNonces.Set(cnc.cache.Size())
cnc.lastReconcileNonces.Store(uint64(cnc.cache.Size()))
cnc.lastReconcileTime = time.Now()

ticker := time.NewTicker(cnc.getNoncesInterval)
ticker := time.NewTimer(cnc.getNoncesInterval)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
cnc.reconcile(ctx)
case <-cnc.empty:
cnc.reconcile(ctx)
// clear out channel
for len(cnc.empty) > 0 {
<-cnc.empty
}
}
cnc.reconcile(ctx)
ticker.Reset(cnc.getNoncesInterval)
}
}

Expand Down Expand Up @@ -367,7 +375,7 @@ CheckNoncesLoop:
// remove this set of nonces from the cache
cnc.cache.Delete(i)

if len(cnc.cache.cache) == 0 {
if len(cnc.cache.cache) == 0 && len(cnc.empty) == 0 {
cnc.logger.Debug("Nonce cache is empty, triggering reload")
cnc.empty <- struct{}{}
}
Expand All @@ -380,7 +388,7 @@ CheckNoncesLoop:
}

// increment so it's taken into account in the nonce burn rate in the next reconciliation
cnc.lastReconcileNonces.Inc()
cnc.lastReconcileNonces.Add(1)

// no nonces found
cosignerInts := make([]int, len(fastestPeers))
Expand All @@ -390,28 +398,6 @@ CheckNoncesLoop:
return nil, fmt.Errorf("no nonces found involving cosigners %+v", cosignerInts)
}

func (cnc *CosignerNonceCache) PruneNonces() int {
cnc.cache.mu.Lock()
defer cnc.cache.mu.Unlock()
nonExpiredIndex := len(cnc.cache.cache) - 1
for i := len(cnc.cache.cache) - 1; i >= 0; i-- {
if time.Now().Before(cnc.cache.cache[i].Expiration) {
nonExpiredIndex = i
break
}
if i == 0 {
deleteCount := len(cnc.cache.cache)
cnc.cache.cache = nil
return deleteCount
}
}
deleteCount := len(cnc.cache.cache) - nonExpiredIndex - 1
if nonExpiredIndex != len(cnc.cache.cache)-1 {
cnc.cache.cache = cnc.cache.cache[:nonExpiredIndex+1]
}
return deleteCount
}

func (cnc *CosignerNonceCache) ClearNonces(cosigner Cosigner) {
cnc.cache.mu.Lock()
defer cnc.cache.mu.Unlock()
Expand Down
Loading

0 comments on commit e7db1ea

Please sign in to comment.