Skip to content

Commit

Permalink
Merge pull request #5 from snorremd/fix/firehose-restart-on-err-backoff
Browse files Browse the repository at this point in the history
fix: Websocket reconnect with backoff
  • Loading branch information
snorremd authored Oct 3, 2023
2 parents 3bd17ee + 1197736 commit c6e0aa5
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 19 deletions.
6 changes: 1 addition & 5 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,7 @@ func serveCmd() *cli.Command {

go func() {
fmt.Println("Subscribing to firehose...")
if err := fh.Subscribe(); err != nil {
// Use signal to shutdown all the goroutines
log.Error(err)
c <- os.Interrupt
}
fh.Subscribe()
}()

go func() {
Expand Down
4 changes: 1 addition & 3 deletions cmd/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ Prints all other log messages to stderr.`,

go func() {
fmt.Println("Subscribing to firehose...")
if err := fh.Subscribe(); err != nil {
log.Panic(err)
}
fh.Subscribe()
}()

go func() {
Expand Down
36 changes: 25 additions & 11 deletions firehose/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ import (
lexutil "github.com/bluesky-social/indigo/lex/util"
"github.com/bluesky-social/indigo/repo"
"github.com/bluesky-social/indigo/repomgr"
"github.com/cenkalti/backoff/v4"
"github.com/gorilla/websocket"
"github.com/samber/lo"
log "github.com/sirupsen/logrus"
)

// Add a firehose model to use as a receiver pattern for the firehose
Expand Down Expand Up @@ -46,18 +48,30 @@ func New(postChan chan interface{}, context context.Context) *Firehose {
}

// Subscribe to the firehose using the Firehose struct as a receiver
func (firehose *Firehose) Subscribe() error {
conn, _, err := firehose.dialer.Dial(firehose.address, nil)
if err != nil {
fmt.Printf("Error connecting to firehose: %s\n", err)
return err
func (firehose *Firehose) Subscribe() {

backoff := backoff.NewExponentialBackOff()

for {
conn, _, err := firehose.dialer.Dial(firehose.address, nil)
if err != nil {
log.Errorf("Error connecting to firehose: %s", err)
time.Sleep(backoff.NextBackOff())
// Increase backoff by factor of 1.3, rounded to nearest whole number
continue
}

firehose.conn = conn
firehose.scheduler = sequential.NewScheduler(conn.RemoteAddr().String(), eventProcessor(firehose.postChan, firehose.context).EventHandler)
err = events.HandleRepoStream(context.Background(), conn, firehose.scheduler)

// If error sleep
if err != nil {
log.Errorf("Error handling repo stream: %s", err)
time.Sleep(backoff.NextBackOff())
continue
}
}

firehose.conn = conn
firehose.scheduler = sequential.NewScheduler(conn.RemoteAddr().String(), eventProcessor(firehose.postChan, firehose.context).EventHandler)
events.HandleRepoStream(context.Background(), conn, firehose.scheduler)

return nil
}

func (firehose *Firehose) Shutdown() {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/atotto/clipboard v0.1.4 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/charmbracelet/bubbles v0.16.1 // indirect
github.com/charmbracelet/bubbletea v0.24.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bluesky-social/indigo v0.0.0-20230919180850-251fff6498dc h1:zgMSIxrsR7ZASv3uge+N2wY6EV8gojrP9G7hAorz6d0=
github.com/bluesky-social/indigo v0.0.0-20230919180850-251fff6498dc/go.mod h1:xeZ7rqlwFUpv5iuzYwOXDo4PgNzYPl6J/DytBvQgVuE=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/charmbracelet/bubbles v0.16.1 h1:6uzpAAaT9ZqKssntbvZMlksWHruQLNxg49H5WdeuYSY=
Expand Down

0 comments on commit c6e0aa5

Please sign in to comment.