Skip to content

Commit

Permalink
Make initial/max retry duration configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
ThiefMaster committed Jan 12, 2019
1 parent 3ed64d2 commit b1ba234
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ type Stream struct {
c *http.Client
req *http.Request
lastEventId string
retry time.Duration
// InitialRetryDelay indicates how long to wait before attempting to reconnect.
// This may be overridden by the server.
InitialRetryDelay time.Duration
// MaxRetryDelay specifies the maximum time to wait between consecutive
// reconnect attempts.
MaxRetryDelay time.Duration
// Events emits the events received by the stream
Events chan Event
// Errors emits any errors encountered while reading events from the stream.
Expand Down Expand Up @@ -63,12 +68,13 @@ func SubscribeWithRequest(lastEventId string, request *http.Request) (*Stream, e
// control over the http client settings (timeouts, tls, etc)
func SubscribeWith(lastEventId string, client *http.Client, request *http.Request) (*Stream, error) {
stream := &Stream{
c: client,
req: request,
lastEventId: lastEventId,
retry: time.Millisecond * 3000,
Events: make(chan Event),
Errors: make(chan error),
c: client,
req: request,
lastEventId: lastEventId,
InitialRetryDelay: time.Millisecond * 3000,
MaxRetryDelay: 0,
Events: make(chan Event),
Errors: make(chan error),
}
stream.c.CheckRedirect = checkRedirect

Expand Down Expand Up @@ -163,7 +169,7 @@ func (stream *Stream) receiveEvents(r io.ReadCloser) {

pub := ev.(*publication)
if pub.Retry() > 0 {
stream.retry = time.Duration(pub.Retry()) * time.Millisecond
stream.InitialRetryDelay = time.Duration(pub.Retry()) * time.Millisecond
}
if len(pub.Id()) > 0 {
stream.lastEventId = pub.Id()
Expand All @@ -173,7 +179,7 @@ func (stream *Stream) receiveEvents(r io.ReadCloser) {
}

func (stream *Stream) retryRestartStream() {
backoff := stream.retry
backoff := stream.InitialRetryDelay
for {
if stream.Logger != nil {
stream.Logger.Printf("Reconnecting in %0.4f secs\n", backoff.Seconds())
Expand All @@ -192,5 +198,8 @@ func (stream *Stream) retryRestartStream() {
}
stream.Errors <- err
backoff *= 2
if stream.MaxRetryDelay > 0 && backoff > stream.MaxRetryDelay {
backoff = stream.MaxRetryDelay
}
}
}

0 comments on commit b1ba234

Please sign in to comment.