Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add round-robin sink #28

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@ var (
// closed. If encountered, the error should be considered terminal and
// retries will not be successful.
ErrSinkClosed = fmt.Errorf("events: sink closed")
// ErrNoSinks is returned if a write is issued to a round-robin sink without
// destiny. If encountered, destiny sinks should be added before calling
// Write again.
ErrNoSinks = fmt.Errorf("events: no destiny sink")
)
206 changes: 206 additions & 0 deletions roundrobin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package events

import (
"fmt"
"sync"

"github.com/sirupsen/logrus"
)

type eventRequest struct {
event Event
ch chan error
}

// RoundRobin sends events to one of the Sinks in order. The goal of this
// component is to dispatch events to configured endpoints. Reliability can be
// provided by wrapping ingoing sinks.
type RoundRobin struct {
sinks []Sink
cursor int
events chan eventRequest
adds chan configureRequest
removes chan configureRequest

shutdown chan struct{}
closed chan struct{}
once sync.Once
}

// NewRoundRobin appends one or more sinks to the list of sinks. The
// round-robin behavior will be affected by the properties of the sink.
// Generally, the sink should accept all messages and deal with reliability on
// its own. Use of EventQueue and RetryingSink should be used here.
func NewRoundRobin(sinks ...Sink) *RoundRobin {
b := RoundRobin{
sinks: sinks,
events: make(chan eventRequest),
adds: make(chan configureRequest),
removes: make(chan configureRequest),
shutdown: make(chan struct{}),
closed: make(chan struct{}),
}

// Start the round-robin
go b.run()

return &b
}

// Write accepts an event to be dispatched to a sink.
func (rr *RoundRobin) Write(event Event) error {
respChan := make(chan error, 1)
request := eventRequest{event: event, ch: respChan}

select {
case rr.events <- request:
case <-rr.closed:
return ErrSinkClosed
}

select {
case err := <-respChan:
return err
case <-rr.closed:
return ErrSinkClosed
}
}

// Add the sink to the broadcaster.
//
// The provided sink must be comparable with equality. Typically, this just
// works with a regular pointer type.
func (rr *RoundRobin) Add(sink Sink) error {
return rr.configure(rr.adds, sink)
}

// Remove the provided sink.
func (rr *RoundRobin) Remove(sink Sink) error {
return rr.configure(rr.removes, sink)
}

func (rr *RoundRobin) configure(ch chan configureRequest, sink Sink) error {
response := make(chan error, 1)

for {
select {
case ch <- configureRequest{
sink: sink,
response: response}:
ch = nil
case err := <-response:
return err
case <-rr.closed:
return ErrSinkClosed
}
}
}

// Close the round-robin, ensuring that all messages are flushed to the
// underlying sink before returning.
func (rr *RoundRobin) Close() error {
rr.once.Do(func() {
close(rr.shutdown)
})

<-rr.closed
return nil
}

// run is the main round-robin loop, started when the round-robin is created.
// Under normal conditions, it waits for events on the event channel. After
// Close is called, this goroutine will exit.
func (rr *RoundRobin) run() {
defer close(rr.closed)

for {
select {
case request := <-rr.events:
if len(rr.sinks) == 0 {
request.ch <- ErrNoSinks
break
}

rr.cursor++
rr.cursor %= len(rr.sinks)

for {
sink := rr.sinks[rr.cursor]

if err := sink.Write(request.event); err == ErrSinkClosed {
// remove closed sinks
rr.sinks = append(rr.sinks[:rr.cursor],
rr.sinks[rr.cursor+1:]...)
// check that it was not the only remaining sink
if len(rr.sinks) == 0 {
request.ch <- ErrNoSinks
break
}
// continue from the start if it was the last sink
rr.cursor %= len(rr.sinks)
} else {
request.ch <- err
break
}
}
case request := <-rr.adds:
// while we have to iterate for add/remove, common iteration for
// send is faster against slice.

var found bool
for _, sink := range rr.sinks {
if request.sink == sink {
found = true
break
}
}

if !found {
rr.sinks = append(rr.sinks, request.sink)
}
request.response <- nil
case request := <-rr.removes:
for i, sink := range rr.sinks {
if sink == request.sink {
rr.sinks = append(rr.sinks[:i], rr.sinks[i+1:]...)
if len(rr.sinks) == 0 {
rr.cursor = 0
} else if rr.cursor >= i {
// decrease the cursor if the remove sink was before
rr.cursor--
rr.cursor %= len(rr.sinks)
}
break
}
}
request.response <- nil
case <-rr.shutdown:
// close all the underlying sinks
for _, sink := range rr.sinks {
if err := sink.Close(); err != nil && err != ErrSinkClosed {
logrus.WithField("events.sink", sink).WithError(err).
Errorf("round-robin: closing sink failed")
}
}
return
}
}
}

func (rr *RoundRobin) String() string {
// Serialize copy of this round-robin without the sync.Once, to avoid
// a data race.

rr2 := map[string]interface{}{
"sinks": rr.sinks,
"cursor": rr.cursor,
"events": rr.events,
"adds": rr.adds,
"removes": rr.removes,

"shutdown": rr.shutdown,
"closed": rr.closed,
}

return fmt.Sprint(rr2)
}
89 changes: 89 additions & 0 deletions roundrobin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package events

import (
"sync"
"testing"
)

func TestRoundRobin(t *testing.T) {
const nEvents = 1000
const nSinks = 10
var sinks []Sink
rr := NewRoundRobin()
for i := 0; i < nSinks; i++ {
sinks = append(sinks, newTestSink(t, nEvents/nSinks))
rr.Add(sinks[i])
rr.Add(sinks[i]) // noop
}

var wg sync.WaitGroup
for i := 0; i < nEvents; i++ {
wg.Add(1)
go func(event Event) {
if err := rr.Write(event); err != nil {
t.Fatalf("error writing event %v: %v", event, err)
}
wg.Done()
}("event")
}

wg.Wait() // Wait until writes complete

for _, sink := range sinks {
rr.Remove(sink)
}

// sending one more should trigger no sink failure.
if err := rr.Write("one-more"); err != ErrNoSinks {
t.Fatalf("unexpected error: %v != %v", err, ErrNoSinks)
}

// add them back to test closing.
for i := range sinks {
rr.Add(sinks[i])
}

checkClose(t, rr)

// Iterate through the sinks and check that they all have the expected length.
for _, sink := range sinks {
ts := sink.(*testSink)
ts.mu.Lock()
defer ts.mu.Unlock()

if len(ts.events) != nEvents/nSinks {
t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents/nSinks)
}

if !ts.closed {
t.Fatalf("sink should have been closed")
}
}
}

func BenchmarkRoundRobin10(b *testing.B) {
benchmarkRoundRobin(b, 10)
}

func BenchmarkRoundRobin100(b *testing.B) {
benchmarkRoundRobin(b, 100)
}

func BenchmarkRoundRobin1000(b *testing.B) {
benchmarkRoundRobin(b, 1000)
}

func BenchmarkRoundRobin10000(b *testing.B) {
benchmarkRoundRobin(b, 10000)
}

func benchmarkRoundRobin(b *testing.B, nsinks int) {
b.StopTimer()
var sinks []Sink
for i := 0; i < nsinks; i++ {
sinks = append(sinks, newTestSink(b, b.N/nsinks + 1))
}
b.StartTimer()

benchmarkSink(b, NewRoundRobin(sinks...))
}