diff --git a/errors.go b/errors.go index 56db7c2..e6eeec6 100644 --- a/errors.go +++ b/errors.go @@ -7,4 +7,9 @@ var ( // closed. If encountered, the error should be considered terminal and // retries will not be successful. ErrSinkClosed = fmt.Errorf("events: sink closed") + + // ErrSinkTimeout is returned if a write is issued to a sink and it does + // not return in the specified time. If encountered, the error may mean + // that the sink is overloaded and retries may be successful. + ErrSinkTimeout = fmt.Errorf("events: sink timeout") ) diff --git a/timeout.go b/timeout.go new file mode 100644 index 0000000..7e49120 --- /dev/null +++ b/timeout.go @@ -0,0 +1,48 @@ +package events + +import "time" + +// Timeout provides an event sink that requires sent events to return in a +// specified amount of time or considers them to have failed. +type Timeout struct { + dst Sink + timeout time.Duration + closed bool +} + +// NewTimeout returns a new timeout to the provided dst sink. +func NewTimeout(dst Sink, timeout time.Duration) Sink { + return &Timeout{dst: dst, timeout: timeout} +} + +// Write an event to the timeout. +func (t *Timeout) Write(event Event) error { + if t.closed { + return ErrSinkClosed + } + + errChan := make(chan error) + go func(c chan<- error) { + c <- t.dst.Write(event) + }(errChan) + + timer := time.NewTimer(t.timeout) + select { + case err := <-errChan: + timer.Stop() + return err + case <-timer.C: + return ErrSinkTimeout + } +} + +// Close the timeout and allow no more events to pass through. +func (t *Timeout) Close() error { + // TODO(stevvooe): Not all sinks should have Close. + if t.closed { + return nil + } + + t.closed = true + return t.dst.Close() +} \ No newline at end of file diff --git a/timeout_test.go b/timeout_test.go new file mode 100644 index 0000000..1a9b1a8 --- /dev/null +++ b/timeout_test.go @@ -0,0 +1,40 @@ +package events + +import ( + "testing" + "time" +) + +func TestTimeout(t *testing.T) { + const nevents = 100 + sink := newTestSink(t, nevents*2) + + ts := NewTimeout( + sink, + time.Millisecond, + ) + for i := 0; i < nevents; i++ { + if err := ts.Write(i); err != nil { + t.Fatalf("error writting event: %v", err) + } + } + + ts = NewTimeout( + // delayed sink simulates destination slower than timeout + &delayedSink{ + sink, + time.Millisecond * 2, + }, + time.Millisecond, + ) + for i := 0; i < nevents; i++ { + if err := ts.Write(i); err != ErrSinkTimeout { + t.Fatalf("unexpected error: %v != %v", err, ErrSinkTimeout) + } + } + + // Wait for all the events + time.Sleep(time.Millisecond * 5) + + checkClose(t, ts) +}