Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry subscription if no new events #131

Merged
merged 3 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 87 additions & 3 deletions internal/clients/bbnclient/bbnclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/avast/retry-go/v4"
"github.com/babylonlabs-io/babylon-staking-indexer/internal/config"
Expand All @@ -16,6 +18,7 @@ import (
)

// BBNClient bundles the Babylon query client with the BBN configuration it
// was created from.
type BBNClient struct {
// wg counts the background goroutines started by Subscribe; each one calls
// Done when it exits.
wg sync.WaitGroup
// queryClient is the underlying query client; its RPCClient is what
// Subscribe uses to subscribe to and unsubscribe from events.
queryClient *query.QueryClient
// cfg is the BBN configuration handed to NewBBNClient.
cfg *config.BBNConfig
}
Expand All @@ -30,7 +33,10 @@ func NewBBNClient(cfg *config.BBNConfig) BbnInterface {
if err != nil {
log.Fatal().Err(err).Msg("error while creating BBN query client")
}
return &BBNClient{queryClient, cfg}
return &BBNClient{
queryClient: queryClient,
cfg: cfg,
}
}

func (c *BBNClient) GetLatestBlockNumber(ctx context.Context) (int64, error) {
Expand Down Expand Up @@ -140,8 +146,86 @@ func (c *BBNClient) GetBlock(ctx context.Context, blockHeight *int64) (*ctypes.R
return block, nil
}

func (c *BBNClient) Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
return c.queryClient.RPCClient.Subscribe(context.Background(), subscriber, query, outCapacity...)
func (c *BBNClient) Subscribe(
subscriber, query string,
healthCheckInterval time.Duration,
maxEventWaitInterval time.Duration,
outCapacity ...int,
) (out <-chan ctypes.ResultEvent, err error) {
eventChan := make(chan ctypes.ResultEvent)

subscribe := func() (<-chan ctypes.ResultEvent, error) {
newChan, err := c.queryClient.RPCClient.Subscribe(
context.Background(),
subscriber,
query,
outCapacity...,
jrwbabylonlab marked this conversation as resolved.
Show resolved Hide resolved
)
if err != nil {
return nil, fmt.Errorf(
"failed to subscribe babylon events for query %s: %w", query, err,
)
}
return newChan, nil
}

// Initial subscription
rawEventChan, err := subscribe()
if err != nil {
close(eventChan)
return nil, err
}
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer close(eventChan)
jrwbabylonlab marked this conversation as resolved.
Show resolved Hide resolved
timeoutTicker := time.NewTicker(healthCheckInterval)
defer timeoutTicker.Stop()
lastEventTime := time.Now()

for {
select {
case event, ok := <-rawEventChan:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering whether it would be simpler to unsubscribe and re-subscribe on every ticker tick instead of tracking lastEventTime 🤔

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm… this feels even hackier than my current hacky solution, haha.

In the worst-case scenario, if this turns out to be too complex, we could just revert the code to use polling with a 2–3 second interval. Let’s keep that as a last resort, though.

if !ok {
log.Fatal().
Str("subscriber", subscriber).
Str("query", query).
Msg("Subscription channel closed, this shall not happen")
}
lastEventTime = time.Now()
eventChan <- event
case <-timeoutTicker.C:
if time.Since(lastEventTime) > maxEventWaitInterval {
log.Error().
Str("subscriber", subscriber).
Str("query", query).
Msg("No events received, attempting to resubscribe")

if err := c.queryClient.RPCClient.Unsubscribe(
context.Background(),
subscriber,
query,
); err != nil {
log.Error().Err(err).Msg("Failed to unsubscribe babylon events")
}

// Create new subscription
newEventChan, err := subscribe()
if err != nil {
log.Error().Err(err).Msg("Failed to resubscribe babylon events")
continue
}

// Replace the old channel with the new one
rawEventChan = newEventChan
// reset last event time
lastEventTime = time.Now()
}
}
}
}()

return eventChan, nil
}

func (c *BBNClient) UnsubscribeAll(subscriber string) error {
Expand Down
8 changes: 7 additions & 1 deletion internal/clients/bbnclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bbnclient

import (
"context"
"time"

ctypes "github.com/cometbft/cometbft/rpc/core/types"
)
Expand All @@ -13,7 +14,12 @@ type BbnInterface interface {
GetLatestBlockNumber(ctx context.Context) (int64, error)
GetBlock(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlock, error)
GetBlockResults(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlockResults, error)
Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error)
Subscribe(
subscriber, query string,
healthCheckInterval time.Duration,
maxEventWaitInterval time.Duration,
outCapacity ...int,
) (out <-chan ctypes.ResultEvent, err error)
UnsubscribeAll(subscriber string) error
IsRunning() bool
Start() error
Expand Down
27 changes: 23 additions & 4 deletions internal/services/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,38 @@ package services

import (
"context"
"time"

"github.com/babylonlabs-io/babylon-staking-indexer/internal/types"
ctypes "github.com/cometbft/cometbft/types"
"github.com/rs/zerolog/log"
)

const (
subscriberName = "babylon-staking-indexer"
newBlockQuery = "tm.event='NewBlock'"
subscriberName = "babylon-staking-indexer"
newBlockQuery = "tm.event='NewBlock'"
outCapacity = 100
subscriptionHealthCheckInterval = 1 * time.Minute
maxEventWaitInterval = 1 * time.Minute
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we put the capacity and intervals in config?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, but do you see any value in adjusting these values once they are rolled out? I don't see a reason to change them, hence I left them here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep no blocker, we can make it configurable later if needed

Making the intervals configurable might be nice to have because, as I recall, before the phase 2 testnet launch the infra team turned on the backend services to check everything while the Babylon node was still not producing new blocks (as there were no validators running). In that case, since no blocks are being produced, the indexer would still unsubscribe/resubscribe every minute.

)

func (s *Service) SubscribeToBbnEvents(ctx context.Context) {
if !s.bbn.IsRunning() {
log.Fatal().Msg("BBN client is not running")
}

eventChan, err := s.bbn.Subscribe(subscriberName, newBlockQuery)
// Subscribe to new block events, but only wait up to maxEventWaitInterval
// (1 minute) for events. If nothing comes through within that interval, the
// underlying subscription will be resubscribed.
// This is a workaround for the fact that cometbft ws_client does not have
// proper ping pong configuration setup to detect if the connection is dead.
// Refer to https://github.com/cometbft/cometbft/commit/2fd8496bc109d010c6c2e415604131b500550e37#r151452099
eventChan, err := s.bbn.Subscribe(
subscriberName,
newBlockQuery,
subscriptionHealthCheckInterval,
maxEventWaitInterval,
outCapacity,
)
if err != nil {
log.Fatal().Msgf("Failed to subscribe to events: %v", err)
}
Expand All @@ -36,11 +51,15 @@ func (s *Service) SubscribeToBbnEvents(ctx context.Context) {
if latestHeight == 0 {
log.Fatal().Msg("Event doesn't contain block height information")
}
log.Debug().
Int64("height", latestHeight).
Msg("received new block event from babylon subscription")

// Send the latest height to the BBN block processor
s.latestHeightChan <- latestHeight

case <-ctx.Done():
log.Info().Msg("context done, unsubscribing all babylon events")
err := s.bbn.UnsubscribeAll(subscriberName)
if err != nil {
log.Error().Msgf("Failed to unsubscribe from events: %v", err)
Expand Down
Loading