From c572a6466315d83c9289ec59090448ab26cd1f7d Mon Sep 17 00:00:00 2001 From: wjrjerome <wjrjerome@babylonchain.io> Date: Sat, 18 Jan 2025 00:26:53 +1100 Subject: [PATCH 1/3] fix: resubscribe to ws if not responding --- internal/clients/bbnclient/bbnclient.go | 98 +++++++++++++++++++++++-- internal/clients/bbnclient/interface.go | 8 +- internal/services/service.go | 8 +- internal/services/subscription.go | 74 ++++++++++++------- 4 files changed, 153 insertions(+), 35 deletions(-) diff --git a/internal/clients/bbnclient/bbnclient.go b/internal/clients/bbnclient/bbnclient.go index d089485f..1b66657c 100644 --- a/internal/clients/bbnclient/bbnclient.go +++ b/internal/clients/bbnclient/bbnclient.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/avast/retry-go/v4" "github.com/babylonlabs-io/babylon-staking-indexer/internal/config" @@ -16,8 +17,9 @@ import ( ) type BBNClient struct { - queryClient *query.QueryClient - cfg *config.BBNConfig + queryClient *query.QueryClient + cfg *config.BBNConfig + subscriptionChanMap map[string]chan ctypes.ResultEvent } func NewBBNClient(cfg *config.BBNConfig) BbnInterface { @@ -30,7 +32,11 @@ func NewBBNClient(cfg *config.BBNConfig) BbnInterface { if err != nil { log.Fatal().Err(err).Msg("error while creating BBN query client") } - return &BBNClient{queryClient, cfg} + return &BBNClient{ + queryClient: queryClient, + cfg: cfg, + subscriptionChanMap: make(map[string]chan ctypes.ResultEvent), + } } func (c *BBNClient) GetLatestBlockNumber(ctx context.Context) (int64, error) { @@ -140,11 +146,93 @@ func (c *BBNClient) GetBlock(ctx context.Context, blockHeight *int64) (*ctypes.R return block, nil } -func (c *BBNClient) Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { - return c.queryClient.RPCClient.Subscribe(context.Background(), subscriber, query, outCapacity...) +func (c *BBNClient) Subscribe( + subscriber, query string, + healthCheckInterval time.Duration, + maxEventWaitInterval time.Duration, + outCapacity ...int, +) (out <-chan ctypes.ResultEvent, err error) { + // Create a new channel for this subscriber if it doesn't exist + if _, exists := c.subscriptionChanMap[subscriber]; !exists { + c.subscriptionChanMap[subscriber] = make(chan ctypes.ResultEvent) + } + + var rawEventChan <-chan ctypes.ResultEvent + subscribe := func() error { + rawEventChan, err = c.queryClient.RPCClient.Subscribe( + context.Background(), + subscriber, + query, + outCapacity..., + ) + if err != nil { + return fmt.Errorf("failed to subscribe babylon events for query %s: %w", query, err) + } + return nil + } + + if err := subscribe(); err != nil { + return nil, err + } + + go func() { + timeoutTicker := time.NewTicker(healthCheckInterval) + defer timeoutTicker.Stop() + lastEventTime := time.Now() + + for { + select { + case event, ok := <-rawEventChan: + if !ok { + log.Error(). + Str("subscriber", subscriber). + Str("query", query). + Msg("Subscription channel closed") + return + } + lastEventTime = time.Now() + c.subscriptionChanMap[subscriber] <- event + + case <-timeoutTicker.C: + if time.Since(lastEventTime) > maxEventWaitInterval { + log.Error(). + Str("subscriber", subscriber). + Str("query", query). + Dur("healthCheckInterval", healthCheckInterval). + Dur("maxEventWaitInterval", maxEventWaitInterval). + Msg("No events received, attempting to resubscribe") + + if err := c.queryClient.RPCClient.Unsubscribe( + context.Background(), + subscriber, + query, + ); err != nil { + log.Error().Err(err).Msg("Failed to unsubscribe babylon events") + } + + if err := subscribe(); err != nil { + log.Error().Err(err).Msg("Failed to resubscribe babylon events") + } else { + log.Info(). + Str("subscriber", subscriber). + Str("query", query). + Msg("Successfully resubscribed babylon events") + // reset last event time + lastEventTime = time.Now() + } + } + } + } + }() + + return c.subscriptionChanMap[subscriber], nil } func (c *BBNClient) UnsubscribeAll(subscriber string) error { + if ch, exists := c.subscriptionChanMap[subscriber]; exists { + close(ch) + delete(c.subscriptionChanMap, subscriber) + } return c.queryClient.RPCClient.UnsubscribeAll(context.Background(), subscriber) } diff --git a/internal/clients/bbnclient/interface.go b/internal/clients/bbnclient/interface.go index c035dc15..249e0777 100644 --- a/internal/clients/bbnclient/interface.go +++ b/internal/clients/bbnclient/interface.go @@ -2,6 +2,7 @@ package bbnclient import ( "context" + "time" ctypes "github.com/cometbft/cometbft/rpc/core/types" ) @@ -13,7 +14,12 @@ type BbnInterface interface { GetLatestBlockNumber(ctx context.Context) (int64, error) GetBlock(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlock, error) GetBlockResults(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlockResults, error) - Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) + Subscribe( + subscriber, query string, + healthCheckInterval time.Duration, + maxEventWaitInterval time.Duration, + outCapacity ...int, + ) (out <-chan ctypes.ResultEvent, err error) UnsubscribeAll(subscriber string) error IsRunning() bool Start() error diff --git a/internal/services/service.go b/internal/services/service.go index 39263fbe..35192ecf 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -65,15 +65,15 @@ func (s *Service) StartIndexerSync(ctx context.Context) { } // Sync global parameters - s.SyncGlobalParams(ctx) + // s.SyncGlobalParams(ctx) // Resubscribe to missed BTC notifications - s.ResubscribeToMissedBtcNotifications(ctx) + // s.ResubscribeToMissedBtcNotifications(ctx) // Start the expiry checker - s.StartExpiryChecker(ctx) + // s.StartExpiryChecker(ctx) // Start the websocket event subscription process s.SubscribeToBbnEvents(ctx) // Keep processing BBN blocks in the main thread - s.StartBbnBlockProcessor(ctx) + // s.StartBbnBlockProcessor(ctx) } func (s *Service) quitContext() (context.Context, func()) { diff --git a/internal/services/subscription.go b/internal/services/subscription.go index 902b324b..2330da45 100644 --- a/internal/services/subscription.go +++ b/internal/services/subscription.go @@ -2,6 +2,8 @@ package services import ( "context" + "fmt" + "time" "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" ctypes "github.com/cometbft/cometbft/types" @@ -9,46 +11,68 @@ import ( ) const ( - subscriberName = "babylon-staking-indexer" - newBlockQuery = "tm.event='NewBlock'" + subscriberName = "babylon-staking-indexer" + newBlockQuery = "tm.event='NewBlock'" + outCapacity = 100 + subscriptionHealthCheckInterval = 5 * time.Second + maxEventWaitInterval = 2 * time.Second ) func (s *Service) SubscribeToBbnEvents(ctx context.Context) { if !s.bbn.IsRunning() { log.Fatal().Msg("BBN client is not running") } - - eventChan, err := s.bbn.Subscribe(subscriberName, newBlockQuery) + // Subscribe to new block events but only wait for 5 minutes for events + // if nothing come through within 5 minutes, the underlying subscription will + // be resubscribed. + // This is a workaround for the fact that cometbft ws_client does not have + // proper ping pong configuration setup to detect if the connection is dead. + // Refer to https://github.com/cometbft/cometbft/commit/2fd8496bc109d010c6c2e415604131b500550e37#r151452099 + eventChan, err := s.bbn.Subscribe( + subscriberName, + newBlockQuery, + subscriptionHealthCheckInterval, + maxEventWaitInterval, + outCapacity, + ) if err != nil { log.Fatal().Msgf("Failed to subscribe to events: %v", err) } - go func() { - for { - select { - case event := <-eventChan: - newBlockEvent, ok := event.Data.(ctypes.EventDataNewBlock) - if !ok { - log.Fatal().Msg("Event is not a NewBlock event") - } + // go func() { + for { + select { + case event := <-eventChan: + newBlockEvent, ok := event.Data.(ctypes.EventDataNewBlock) + if !ok { + log.Fatal(). + Str("event", fmt.Sprintf("%+v", event)). + Msg("Event is not a NewBlock event") + } - latestHeight := newBlockEvent.Block.Height - if latestHeight == 0 { - log.Fatal().Msg("Event doesn't contain block height information") - } + latestHeight := newBlockEvent.Block.Height + if latestHeight == 0 { + log.Fatal().Msg("Event doesn't contain block height information") + } + log.Info(). + Int64("height", latestHeight). + Msg("received new block event from babylon subscription") - // Send the latest height to the BBN block processor - s.latestHeightChan <- latestHeight + // Send the latest height to the BBN block processor + // s.latestHeightChan <- latestHeight + log.Info(). + Int64("latest_height", latestHeight). + Msg("latest height") - case <-ctx.Done(): - err := s.bbn.UnsubscribeAll(subscriberName) - if err != nil { - log.Error().Msgf("Failed to unsubscribe from events: %v", err) - } - return + case <-ctx.Done(): + err := s.bbn.UnsubscribeAll(subscriberName) + if err != nil { + log.Error().Msgf("Failed to unsubscribe from events: %v", err) } + return } - }() + } + // }() } // Resubscribe to missed BTC notifications From 3bf22466b55b51f7d4801da92195dc8718656ead Mon Sep 17 00:00:00 2001 From: wjrjerome <wjrjerome@babylonchain.io> Date: Sat, 18 Jan 2025 01:34:42 +1100 Subject: [PATCH 2/3] fix: resubscribe to ws if not responding --- internal/clients/bbnclient/bbnclient.go | 65 +++++++++++-------------- internal/services/service.go | 8 +-- internal/services/subscription.go | 59 ++++++++++------------ 3 files changed, 60 insertions(+), 72 deletions(-) diff --git a/internal/clients/bbnclient/bbnclient.go b/internal/clients/bbnclient/bbnclient.go index 1b66657c..c3191fd3 100644 --- a/internal/clients/bbnclient/bbnclient.go +++ b/internal/clients/bbnclient/bbnclient.go @@ -17,9 +17,8 @@ import ( ) type BBNClient struct { - queryClient *query.QueryClient - cfg *config.BBNConfig - subscriptionChanMap map[string]chan ctypes.ResultEvent + queryClient *query.QueryClient + cfg *config.BBNConfig } func NewBBNClient(cfg *config.BBNConfig) BbnInterface { @@ -33,9 +32,8 @@ func NewBBNClient(cfg *config.BBNConfig) BbnInterface { log.Fatal().Err(err).Msg("error while creating BBN query client") } return &BBNClient{ - queryClient: queryClient, - cfg: cfg, - subscriptionChanMap: make(map[string]chan ctypes.ResultEvent), + queryClient: queryClient, + cfg: cfg, } } @@ -152,30 +150,32 @@ func (c *BBNClient) Subscribe( maxEventWaitInterval time.Duration, outCapacity ...int, ) (out <-chan ctypes.ResultEvent, err error) { - // Create a new channel for this subscriber if it doesn't exist - if _, exists := c.subscriptionChanMap[subscriber]; !exists { - c.subscriptionChanMap[subscriber] = make(chan ctypes.ResultEvent) - } + eventChan := make(chan ctypes.ResultEvent) - var rawEventChan <-chan ctypes.ResultEvent - subscribe := func() error { - rawEventChan, err = c.queryClient.RPCClient.Subscribe( + subscribe := func() (<-chan ctypes.ResultEvent, error) { + newChan, err := c.queryClient.RPCClient.Subscribe( context.Background(), subscriber, query, outCapacity..., ) if err != nil { - return fmt.Errorf("failed to subscribe babylon events for query %s: %w", query, err) + return nil, fmt.Errorf( + "failed to subscribe babylon events for query %s: %w", query, err, + ) } - return nil + return newChan, nil } - if err := subscribe(); err != nil { + // Initial subscription + rawEventChan, err := subscribe() + if err != nil { + close(eventChan) return nil, err } go func() { + defer close(eventChan) timeoutTicker := time.NewTicker(healthCheckInterval) defer timeoutTicker.Stop() lastEventTime := time.Now() @@ -184,22 +184,18 @@ func (c *BBNClient) Subscribe( select { case event, ok := <-rawEventChan: if !ok { - log.Error(). + log.Fatal(). Str("subscriber", subscriber). Str("query", query). - Msg("Subscription channel closed") - return + Msg("Subscription channel closed, this shall not happen") } lastEventTime = time.Now() - c.subscriptionChanMap[subscriber] <- event - + eventChan <- event case <-timeoutTicker.C: if time.Since(lastEventTime) > maxEventWaitInterval { log.Error(). Str("subscriber", subscriber). Str("query", query). - Dur("healthCheckInterval", healthCheckInterval). - Dur("maxEventWaitInterval", maxEventWaitInterval). Msg("No events received, attempting to resubscribe") if err := c.queryClient.RPCClient.Unsubscribe( @@ -210,29 +206,26 @@ func (c *BBNClient) Subscribe( log.Error().Err(err).Msg("Failed to unsubscribe babylon events") } - if err := subscribe(); err != nil { + // Create new subscription + newEventChan, err := subscribe() + if err != nil { log.Error().Err(err).Msg("Failed to resubscribe babylon events") - } else { - log.Info(). - Str("subscriber", subscriber). - Str("query", query). - Msg("Successfully resubscribed babylon events") - // reset last event time - lastEventTime = time.Now() + continue } + + // Replace the old channel with the new one + rawEventChan = newEventChan + // reset last event time + lastEventTime = time.Now() } } } }() - return c.subscriptionChanMap[subscriber], nil + return eventChan, nil } func (c *BBNClient) UnsubscribeAll(subscriber string) error { - if ch, exists := c.subscriptionChanMap[subscriber]; exists { - close(ch) - delete(c.subscriptionChanMap, subscriber) - } return c.queryClient.RPCClient.UnsubscribeAll(context.Background(), subscriber) } diff --git a/internal/services/service.go b/internal/services/service.go index 35192ecf..39263fbe 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -65,15 +65,15 @@ func (s *Service) StartIndexerSync(ctx context.Context) { } // Sync global parameters - // s.SyncGlobalParams(ctx) + s.SyncGlobalParams(ctx) // Resubscribe to missed BTC notifications - // s.ResubscribeToMissedBtcNotifications(ctx) + s.ResubscribeToMissedBtcNotifications(ctx) // Start the expiry checker - // s.StartExpiryChecker(ctx) + s.StartExpiryChecker(ctx) // Start the websocket event subscription process s.SubscribeToBbnEvents(ctx) // Keep processing BBN blocks in the main thread - // s.StartBbnBlockProcessor(ctx) + s.StartBbnBlockProcessor(ctx) } func (s *Service) quitContext() (context.Context, func()) { diff --git a/internal/services/subscription.go b/internal/services/subscription.go index 2330da45..3263a8a4 100644 --- a/internal/services/subscription.go +++ b/internal/services/subscription.go @@ -2,7 +2,6 @@ package services import ( "context" - "fmt" "time" "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" @@ -14,8 +13,8 @@ const ( subscriberName = "babylon-staking-indexer" newBlockQuery = "tm.event='NewBlock'" outCapacity = 100 - subscriptionHealthCheckInterval = 5 * time.Second - maxEventWaitInterval = 2 * time.Second + subscriptionHealthCheckInterval = 1 * time.Minute + maxEventWaitInterval = 1 * time.Minute ) func (s *Service) SubscribeToBbnEvents(ctx context.Context) { @@ -39,40 +38,36 @@ func (s *Service) SubscribeToBbnEvents(ctx context.Context) { log.Fatal().Msgf("Failed to subscribe to events: %v", err) } - // go func() { - for { - select { - case event := <-eventChan: - newBlockEvent, ok := event.Data.(ctypes.EventDataNewBlock) - if !ok { - log.Fatal(). - Str("event", fmt.Sprintf("%+v", event)). - Msg("Event is not a NewBlock event") - } + go func() { + for { + select { + case event := <-eventChan: + newBlockEvent, ok := event.Data.(ctypes.EventDataNewBlock) + if !ok { + log.Fatal().Msg("Event is not a NewBlock event") + } - latestHeight := newBlockEvent.Block.Height - if latestHeight == 0 { - log.Fatal().Msg("Event doesn't contain block height information") - } - log.Info(). - Int64("height", latestHeight). - Msg("received new block event from babylon subscription") + latestHeight := newBlockEvent.Block.Height + if latestHeight == 0 { + log.Fatal().Msg("Event doesn't contain block height information") + } + log.Debug(). + Int64("height", latestHeight). + Msg("received new block event from babylon subscription") - // Send the latest height to the BBN block processor - // s.latestHeightChan <- latestHeight - log.Info(). - Int64("latest_height", latestHeight). - Msg("latest height") + // Send the latest height to the BBN block processor + s.latestHeightChan <- latestHeight - case <-ctx.Done(): - err := s.bbn.UnsubscribeAll(subscriberName) - if err != nil { - log.Error().Msgf("Failed to unsubscribe from events: %v", err) + case <-ctx.Done(): + log.Info().Msg("context done, unsubscribing all babylon events") + err := s.bbn.UnsubscribeAll(subscriberName) + if err != nil { + log.Error().Msgf("Failed to unsubscribe from events: %v", err) + } + return } - return } - } - // }() + }() } // Resubscribe to missed BTC notifications From 568786e16f646ec9c967ed34eb7bdc01b78249f1 Mon Sep 17 00:00:00 2001 From: wjrjerome <wjrjerome@babylonchain.io> Date: Tue, 21 Jan 2025 22:21:54 +1100 Subject: [PATCH 3/3] add wg --- internal/clients/bbnclient/bbnclient.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/clients/bbnclient/bbnclient.go b/internal/clients/bbnclient/bbnclient.go index c3191fd3..1d038701 100644 --- a/internal/clients/bbnclient/bbnclient.go +++ b/internal/clients/bbnclient/bbnclient.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "sync" "time" "github.com/avast/retry-go/v4" @@ -17,6 +18,7 @@ import ( ) type BBNClient struct { + wg sync.WaitGroup queryClient *query.QueryClient cfg *config.BBNConfig } @@ -173,8 +175,9 @@ func (c *BBNClient) Subscribe( close(eventChan) return nil, err } - + c.wg.Add(1) go func() { + defer c.wg.Done() defer close(eventChan) timeoutTicker := time.NewTicker(healthCheckInterval) defer timeoutTicker.Stop()