Skip to content

Commit

Permalink
fix: Add check for backoff stop to stop websocket connects
Browse files Browse the repository at this point in the history
When backoff indicates we should stop, actually stop the reconnect
logic.
  • Loading branch information
snorremd committed Nov 19, 2024
1 parent 172a63a commit 9742da3
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions firehose/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

// Subscribe to the firehose using the Firehose struct as a receiver
func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Ticker, seq int64) {
address := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
address := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepo"
headers := http.Header{}
headers.Set("User-Agent", "NorSky: https://github.com/snorremd/norsky")

Expand All @@ -37,9 +37,9 @@ func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Tick

dialer := websocket.DefaultDialer
backoff := backoff.NewExponentialBackOff()
backoff.InitialInterval = 3 * time.Second
backoff.InitialInterval = 5 * time.Second
backoff.MaxInterval = 30 * time.Second
backoff.Multiplier = 1.5
backoff.Multiplier = 2
backoff.MaxElapsedTime = 120 * time.Second

// Check if context is cancelled, if so exit the connection loop
Expand All @@ -52,7 +52,16 @@ func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Tick
conn, _, err := dialer.Dial(address, nil)
if err != nil {
log.Errorf("Error connecting to firehose: %s", err)
time.Sleep(backoff.NextBackOff())

// Get the next backoff duration
duration := backoff.NextBackOff()

if duration == backoff.Stop {
log.Warn("MaxElapsedTime reached. Stopping reconnect attempts.")
return // Exit the loop
}

time.Sleep(duration)
// Increase backoff by factor of 1.3, rounded to nearest whole number
continue
}
Expand Down

0 comments on commit 9742da3

Please sign in to comment.