Skip to content

Commit

Permalink
Integrating drand interfaces into drand-cli codebase
Browse files Browse the repository at this point in the history
  • Loading branch information
AnomalRoil committed Jul 31, 2024
1 parent da4b07e commit 38ce42c
Show file tree
Hide file tree
Showing 35 changed files with 573 additions and 515 deletions.
26 changes: 13 additions & 13 deletions client/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"sync"
"time"

"github.com/drand/drand/v2/common/client"
"github.com/drand/drand/v2/common/log"
"github.com/drand/go-clients/drand"
)

const (
Expand All @@ -24,7 +24,7 @@ const (
// is passed, a `watch` will be run on the watch client in the absence of external watchers,
// which will swap watching over to the main client. If no watch client is set and autowatch is off
// then a single watch will only run when an external watch is requested.
func newWatchAggregator(l log.Logger, c, wc client.Client, autoWatch bool, autoWatchRetry time.Duration) *watchAggregator {
func newWatchAggregator(l log.Logger, c, wc drand.Client, autoWatch bool, autoWatchRetry time.Duration) *watchAggregator {
if autoWatchRetry == 0 {
autoWatchRetry = defaultAutoWatchRetry
}
Expand All @@ -41,12 +41,12 @@ func newWatchAggregator(l log.Logger, c, wc client.Client, autoWatch bool, autoW

type subscriber struct {
ctx context.Context
c chan client.Result
c chan drand.Result
}

type watchAggregator struct {
client.Client
passiveClient client.Client
drand.Client
passiveClient drand.Client
autoWatch bool
autoWatchRetry time.Duration
log log.Logger
Expand Down Expand Up @@ -82,7 +82,7 @@ func (c *watchAggregator) startAutoWatch(full bool) {
c.cancelAutoWatch = cancel
go func() {
for {
var results <-chan client.Result
var results <-chan drand.Result
if full {
results = c.Watch(ctx)
} else if c.passiveClient != nil {
Expand Down Expand Up @@ -118,7 +118,7 @@ func (c *watchAggregator) startAutoWatch(full bool) {

// passiveWatch is a degraded form of watch, where watch only hits the 'passive client'
// unless distribution is actually needed.
func (c *watchAggregator) passiveWatch(ctx context.Context) <-chan client.Result {
func (c *watchAggregator) passiveWatch(ctx context.Context) <-chan drand.Result {
c.subscriberLock.Lock()
defer c.subscriberLock.Unlock()

Expand All @@ -127,7 +127,7 @@ func (c *watchAggregator) passiveWatch(ctx context.Context) <-chan client.Result
return nil
}

wc := make(chan client.Result)
wc := make(chan drand.Result)
if len(c.subscribers) == 0 {
ctx, cancel := context.WithCancel(ctx)
c.cancelPassive = cancel
Expand All @@ -139,11 +139,11 @@ func (c *watchAggregator) passiveWatch(ctx context.Context) <-chan client.Result
return wc
}

func (c *watchAggregator) Watch(ctx context.Context) <-chan client.Result {
func (c *watchAggregator) Watch(ctx context.Context) <-chan drand.Result {
c.subscriberLock.Lock()
defer c.subscriberLock.Unlock()

sub := subscriber{ctx, make(chan client.Result, aggregatorWatchBuffer)}
sub := subscriber{ctx, make(chan drand.Result, aggregatorWatchBuffer)}
c.subscribers = append(c.subscribers, sub)

if len(c.subscribers) == 1 {
Expand All @@ -157,14 +157,14 @@ func (c *watchAggregator) Watch(ctx context.Context) <-chan client.Result {
return sub.c
}

func (c *watchAggregator) sink(in <-chan client.Result, out chan client.Result) {
func (c *watchAggregator) sink(in <-chan drand.Result, out chan drand.Result) {
defer close(out)
for range in {
continue
}
}

func (c *watchAggregator) distribute(in <-chan client.Result, cancel context.CancelFunc) {
func (c *watchAggregator) distribute(in <-chan drand.Result, cancel context.CancelFunc) {
defer cancel()
for {
c.subscriberLock.Lock()
Expand All @@ -176,7 +176,7 @@ func (c *watchAggregator) distribute(in <-chan client.Result, cancel context.Can
aCtx := c.subscribers[0].ctx
c.subscriberLock.Unlock()

var m client.Result
var m drand.Result
var ok bool

select {
Expand Down
8 changes: 4 additions & 4 deletions client/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@ import (
"testing"
"time"

"github.com/drand/drand/v2/common/client"
"github.com/drand/drand/v2/common/log"
clientMock "github.com/drand/go-clients/client/mock"
"github.com/drand/go-clients/client/test/result/mock"
"github.com/drand/go-clients/drand"
)

func TestAggregatorClose(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)

c := &clientMock.Client{
WatchCh: make(chan client.Result),
WatchCh: make(chan drand.Result),
CloseF: func() error {
wg.Done()
return nil
Expand All @@ -38,15 +38,15 @@ func TestAggregatorPassive(t *testing.T) {
wg.Add(1)

c := &clientMock.Client{
WatchCh: make(chan client.Result, 1),
WatchCh: make(chan drand.Result, 1),
CloseF: func() error {
wg.Done()
return nil
},
}

wc := &clientMock.Client{
WatchCh: make(chan client.Result, 1),
WatchCh: make(chan drand.Result, 1),
CloseF: func() error {
return nil
},
Expand Down
27 changes: 14 additions & 13 deletions client/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ import (

lru "github.com/hashicorp/golang-lru"

"github.com/drand/drand/v2/common/client"
"github.com/drand/go-clients/drand"

"github.com/drand/drand/v2/common/log"
)

// Cache provides a mechanism to check for rounds in the cache.
type Cache interface {
// TryGet provides a round beacon or nil if it is not cached.
TryGet(round uint64) client.Result
TryGet(round uint64) drand.Result
// Add adds an item to the cache
Add(uint64, client.Result)
Add(uint64, drand.Result)
}

// makeCache creates a cache of a given size
Expand All @@ -36,14 +37,14 @@ type typedCache struct {
}

// Add a result to the cache
func (t *typedCache) Add(round uint64, result client.Result) {
func (t *typedCache) Add(round uint64, result drand.Result) {
t.ARCCache.Add(round, result)
}

// TryGet attempts to get a result from the cache
func (t *typedCache) TryGet(round uint64) client.Result {
func (t *typedCache) TryGet(round uint64) drand.Result {
if val, ok := t.ARCCache.Get(round); ok {
return val.(client.Result)
return val.(drand.Result)
}
return nil
}
Expand All @@ -52,17 +53,17 @@ func (t *typedCache) TryGet(round uint64) client.Result {
type nilCache struct{}

// Add a result to the cache
func (*nilCache) Add(_ uint64, _ client.Result) {
func (*nilCache) Add(_ uint64, _ drand.Result) {
}

// TryGet attempts to get ar esult from the cache
func (*nilCache) TryGet(_ uint64) client.Result {
func (*nilCache) TryGet(_ uint64) drand.Result {
return nil
}

// NewCachingClient is a meta client that stores an LRU cache of
// recently fetched random values.
func NewCachingClient(l log.Logger, c client.Client, cache Cache) (client.Client, error) {
func NewCachingClient(l log.Logger, c drand.Client, cache Cache) (drand.Client, error) {
return &cachingClient{
Client: c,
cache: cache,
Expand All @@ -71,7 +72,7 @@ func NewCachingClient(l log.Logger, c client.Client, cache Cache) (client.Client
}

type cachingClient struct {
client.Client
drand.Client

cache Cache
log log.Logger
Expand All @@ -91,7 +92,7 @@ func (c *cachingClient) String() string {
}

// Get returns the randomness at `round` or an error.
func (c *cachingClient) Get(ctx context.Context, round uint64) (res client.Result, err error) {
func (c *cachingClient) Get(ctx context.Context, round uint64) (res drand.Result, err error) {
if val := c.cache.TryGet(round); val != nil {
return val, nil
}
Expand All @@ -102,9 +103,9 @@ func (c *cachingClient) Get(ctx context.Context, round uint64) (res client.Resul
return val, err
}

func (c *cachingClient) Watch(ctx context.Context) <-chan client.Result {
func (c *cachingClient) Watch(ctx context.Context) <-chan drand.Result {
in := c.Client.Watch(ctx)
out := make(chan client.Result)
out := make(chan drand.Result)
go func() {
for result := range in {
if ctx.Err() != nil {
Expand Down
6 changes: 3 additions & 3 deletions client/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"sync"
"testing"

"github.com/drand/drand/v2/common/client"
"github.com/drand/drand/v2/common/log"
clientMock "github.com/drand/go-clients/client/mock"
"github.com/drand/go-clients/client/test/result/mock"
"github.com/drand/go-clients/drand"
)

func TestCacheGet(t *testing.T) {
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestCacheGetLatest(t *testing.T) {

func TestCacheWatch(t *testing.T) {
m := clientMock.ClientWithResults(2, 6)
rc := make(chan client.Result, 1)
rc := make(chan drand.Result, 1)
m.WatchCh = rc
arcCache, err := makeCache(3)
if err != nil {
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestCacheClose(t *testing.T) {
wg.Add(1)

c := &clientMock.Client{
WatchCh: make(chan client.Result),
WatchCh: make(chan drand.Result),
CloseF: func() error {
wg.Done()
return nil
Expand Down
Loading

0 comments on commit 38ce42c

Please sign in to comment.