From c572a6466315d83c9289ec59090448ab26cd1f7d Mon Sep 17 00:00:00 2001
From: wjrjerome <wjrjerome@babylonchain.io>
Date: Sat, 18 Jan 2025 00:26:53 +1100
Subject: [PATCH 1/3] fix: resubscribe to ws if not responding

---
 internal/clients/bbnclient/bbnclient.go | 98 +++++++++++++++++++++++--
 internal/clients/bbnclient/interface.go |  8 +-
 internal/services/service.go            |  8 +-
 internal/services/subscription.go       | 74 ++++++++++++-------
 4 files changed, 153 insertions(+), 35 deletions(-)

diff --git a/internal/clients/bbnclient/bbnclient.go b/internal/clients/bbnclient/bbnclient.go
index d089485f..1b66657c 100644
--- a/internal/clients/bbnclient/bbnclient.go
+++ b/internal/clients/bbnclient/bbnclient.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"time"
 
 	"github.com/avast/retry-go/v4"
 	"github.com/babylonlabs-io/babylon-staking-indexer/internal/config"
@@ -16,8 +17,9 @@ import (
 )
 
 type BBNClient struct {
-	queryClient *query.QueryClient
-	cfg         *config.BBNConfig
+	queryClient         *query.QueryClient
+	cfg                 *config.BBNConfig
+	subscriptionChanMap map[string]chan ctypes.ResultEvent
 }
 
 func NewBBNClient(cfg *config.BBNConfig) BbnInterface {
@@ -30,7 +32,11 @@ func NewBBNClient(cfg *config.BBNConfig) BbnInterface {
 	if err != nil {
 		log.Fatal().Err(err).Msg("error while creating BBN query client")
 	}
-	return &BBNClient{queryClient, cfg}
+	return &BBNClient{
+		queryClient:         queryClient,
+		cfg:                 cfg,
+		subscriptionChanMap: make(map[string]chan ctypes.ResultEvent),
+	}
 }
 
 func (c *BBNClient) GetLatestBlockNumber(ctx context.Context) (int64, error) {
@@ -140,11 +146,93 @@ func (c *BBNClient) GetBlock(ctx context.Context, blockHeight *int64) (*ctypes.R
 	return block, nil
 }
 
-func (c *BBNClient) Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
-	return c.queryClient.RPCClient.Subscribe(context.Background(), subscriber, query, outCapacity...)
+func (c *BBNClient) Subscribe(
+	subscriber, query string,
+	healthCheckInterval time.Duration,
+	maxEventWaitInterval time.Duration,
+	outCapacity ...int,
+) (out <-chan ctypes.ResultEvent, err error) {
+	// Create a new channel for this subscriber if it doesn't exist
+	if _, exists := c.subscriptionChanMap[subscriber]; !exists {
+		c.subscriptionChanMap[subscriber] = make(chan ctypes.ResultEvent)
+	}
+
+	var rawEventChan <-chan ctypes.ResultEvent
+	subscribe := func() error {
+		rawEventChan, err = c.queryClient.RPCClient.Subscribe(
+			context.Background(),
+			subscriber,
+			query,
+			outCapacity...,
+		)
+		if err != nil {
+			return fmt.Errorf("failed to subscribe babylon events for query %s: %w", query, err)
+		}
+		return nil
+	}
+
+	if err := subscribe(); err != nil {
+		return nil, err
+	}
+
+	go func() {
+		timeoutTicker := time.NewTicker(healthCheckInterval)
+		defer timeoutTicker.Stop()
+		lastEventTime := time.Now()
+
+		for {
+			select {
+			case event, ok := <-rawEventChan:
+				if !ok {
+					log.Error().
+						Str("subscriber", subscriber).
+						Str("query", query).
+						Msg("Subscription channel closed")
+					return
+				}
+				lastEventTime = time.Now()
+				c.subscriptionChanMap[subscriber] <- event
+
+			case <-timeoutTicker.C:
+				if time.Since(lastEventTime) > maxEventWaitInterval {
+					log.Error().
+						Str("subscriber", subscriber).
+						Str("query", query).
+						Dur("healthCheckInterval", healthCheckInterval).
+						Dur("maxEventWaitInterval", maxEventWaitInterval).
+						Msg("No events received, attempting to resubscribe")
+
+					if err := c.queryClient.RPCClient.Unsubscribe(
+						context.Background(),
+						subscriber,
+						query,
+					); err != nil {
+						log.Error().Err(err).Msg("Failed to unsubscribe babylon events")
+					}
+
+					if err := subscribe(); err != nil {
+						log.Error().Err(err).Msg("Failed to resubscribe babylon events")
+					} else {
+						log.Info().
+							Str("subscriber", subscriber).
+							Str("query", query).
+							Msg("Successfully resubscribed babylon events")
+						// reset last event time
+						lastEventTime = time.Now()
+					}
+				}
+			}
+		}
+	}()
+
+	return c.subscriptionChanMap[subscriber], nil
 }
 
 func (c *BBNClient) UnsubscribeAll(subscriber string) error {
+	if ch, exists := c.subscriptionChanMap[subscriber]; exists {
+		close(ch)
+		delete(c.subscriptionChanMap, subscriber)
+	}
 	return c.queryClient.RPCClient.UnsubscribeAll(context.Background(), subscriber)
 }
 
diff --git a/internal/clients/bbnclient/interface.go b/internal/clients/bbnclient/interface.go
index c035dc15..249e0777 100644
--- a/internal/clients/bbnclient/interface.go
+++ b/internal/clients/bbnclient/interface.go
@@ -2,6 +2,7 @@ package bbnclient
 
 import (
 	"context"
+	"time"
 
 	ctypes "github.com/cometbft/cometbft/rpc/core/types"
 )
@@ -13,7 +14,12 @@ type BbnInterface interface {
 	GetLatestBlockNumber(ctx context.Context) (int64, error)
 	GetBlock(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlock, error)
 	GetBlockResults(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlockResults, error)
-	Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error)
+	Subscribe(
+		subscriber, query string,
+		healthCheckInterval time.Duration,
+		maxEventWaitInterval time.Duration,
+		outCapacity ...int,
+	) (out <-chan ctypes.ResultEvent, err error)
 	UnsubscribeAll(subscriber string) error
 	IsRunning() bool
 	Start() error
diff --git a/internal/services/service.go b/internal/services/service.go
index 39263fbe..35192ecf 100644
--- a/internal/services/service.go
+++ b/internal/services/service.go
@@ -65,15 +65,15 @@ func (s *Service) StartIndexerSync(ctx context.Context) {
 	}
 
 	// Sync global parameters
-	s.SyncGlobalParams(ctx)
+	// s.SyncGlobalParams(ctx)
 	// Resubscribe to missed BTC notifications
-	s.ResubscribeToMissedBtcNotifications(ctx)
+	// s.ResubscribeToMissedBtcNotifications(ctx)
 	// Start the expiry checker
-	s.StartExpiryChecker(ctx)
+	// s.StartExpiryChecker(ctx)
 	// Start the websocket event subscription process
 	s.SubscribeToBbnEvents(ctx)
 	// Keep processing BBN blocks in the main thread
-	s.StartBbnBlockProcessor(ctx)
+	// s.StartBbnBlockProcessor(ctx)
 }
 
 func (s *Service) quitContext() (context.Context, func()) {
diff --git a/internal/services/subscription.go b/internal/services/subscription.go
index 902b324b..2330da45 100644
--- a/internal/services/subscription.go
+++ b/internal/services/subscription.go
@@ -2,6 +2,8 @@ package services
 
 import (
 	"context"
+	"fmt"
+	"time"
 
 	"github.com/babylonlabs-io/babylon-staking-indexer/internal/types"
 	ctypes "github.com/cometbft/cometbft/types"
@@ -9,46 +11,68 @@ import (
 )
 
 const (
-	subscriberName = "babylon-staking-indexer"
-	newBlockQuery  = "tm.event='NewBlock'"
+	subscriberName                  = "babylon-staking-indexer"
+	newBlockQuery                   = "tm.event='NewBlock'"
+	outCapacity                     = 100
+	subscriptionHealthCheckInterval = 5 * time.Second
+	maxEventWaitInterval            = 2 * time.Second
 )
 
 func (s *Service) SubscribeToBbnEvents(ctx context.Context) {
 	if !s.bbn.IsRunning() {
 		log.Fatal().Msg("BBN client is not running")
 	}
-
-	eventChan, err := s.bbn.Subscribe(subscriberName, newBlockQuery)
+	// Subscribe to new block events but only wait for 5 minutes for events
+	// if nothing come through within 5 minutes, the underlying subscription will
+	// be resubscribed.
+	// This is a workaround for the fact that cometbft ws_client does not have
+	// proper ping pong configuration setup to detect if the connection is dead.
+	// Refer to https://github.com/cometbft/cometbft/commit/2fd8496bc109d010c6c2e415604131b500550e37#r151452099
+	eventChan, err := s.bbn.Subscribe(
+		subscriberName,
+		newBlockQuery,
+		subscriptionHealthCheckInterval,
+		maxEventWaitInterval,
+		outCapacity,
+	)
 	if err != nil {
 		log.Fatal().Msgf("Failed to subscribe to events: %v", err)
 	}
 
-	go func() {
-		for {
-			select {
-			case event := <-eventChan:
-				newBlockEvent, ok := event.Data.(ctypes.EventDataNewBlock)
-				if !ok {
-					log.Fatal().Msg("Event is not a NewBlock event")
-				}
+	// go func() {
+	for {
+		select {
+		case event := <-eventChan:
+			newBlockEvent, ok := event.Data.(ctypes.EventDataNewBlock)
+			if !ok {
+				log.Fatal().
+					Str("event", fmt.Sprintf("%+v", event)).
+					Msg("Event is not a NewBlock event")
+			}
 
-				latestHeight := newBlockEvent.Block.Height
-				if latestHeight == 0 {
-					log.Fatal().Msg("Event doesn't contain block height information")
-				}
+			latestHeight := newBlockEvent.Block.Height
+			if latestHeight == 0 {
+				log.Fatal().Msg("Event doesn't contain block height information")
+			}
+			log.Info().
+				Int64("height", latestHeight).
+				Msg("received new block event from babylon subscription")
 
-				// Send the latest height to the BBN block processor
-				s.latestHeightChan <- latestHeight
+			// Send the latest height to the BBN block processor
+			// s.latestHeightChan <- latestHeight
+			log.Info().
+				Int64("latest_height", latestHeight).
+				Msg("latest height")
 
-			case <-ctx.Done():
-				err := s.bbn.UnsubscribeAll(subscriberName)
-				if err != nil {
-					log.Error().Msgf("Failed to unsubscribe from events: %v", err)
-				}
-				return
+		case <-ctx.Done():
+			err := s.bbn.UnsubscribeAll(subscriberName)
+			if err != nil {
+				log.Error().Msgf("Failed to unsubscribe from events: %v", err)
 			}
+			return
 		}
-	}()
+	}
+	// }()
 }
 
 // Resubscribe to missed BTC notifications

From 3bf22466b55b51f7d4801da92195dc8718656ead Mon Sep 17 00:00:00 2001
From: wjrjerome <wjrjerome@babylonchain.io>
Date: Sat, 18 Jan 2025 01:34:42 +1100
Subject: [PATCH 2/3] fix: resubscribe to ws if not responding

---
 internal/clients/bbnclient/bbnclient.go | 65 +++++++++++--------------
 internal/services/service.go            |  8 +--
 internal/services/subscription.go       | 59 ++++++++++------------
 3 files changed, 60 insertions(+), 72 deletions(-)

diff --git a/internal/clients/bbnclient/bbnclient.go b/internal/clients/bbnclient/bbnclient.go
index 1b66657c..c3191fd3 100644
--- a/internal/clients/bbnclient/bbnclient.go
+++ b/internal/clients/bbnclient/bbnclient.go
@@ -17,9 +17,8 @@ import (
 )
 
 type BBNClient struct {
-	queryClient         *query.QueryClient
-	cfg                 *config.BBNConfig
-	subscriptionChanMap map[string]chan ctypes.ResultEvent
+	queryClient *query.QueryClient
+	cfg         *config.BBNConfig
 }
 
 func NewBBNClient(cfg *config.BBNConfig) BbnInterface {
@@ -33,9 +32,8 @@ func NewBBNClient(cfg *config.BBNConfig) BbnInterface {
 		log.Fatal().Err(err).Msg("error while creating BBN query client")
 	}
 	return &BBNClient{
-		queryClient:         queryClient,
-		cfg:                 cfg,
-		subscriptionChanMap: make(map[string]chan ctypes.ResultEvent),
+		queryClient: queryClient,
+		cfg:         cfg,
 	}
 }
 
@@ -152,30 +150,32 @@ func (c *BBNClient) Subscribe(
 	maxEventWaitInterval time.Duration,
 	outCapacity ...int,
 ) (out <-chan ctypes.ResultEvent, err error) {
-	// Create a new channel for this subscriber if it doesn't exist
-	if _, exists := c.subscriptionChanMap[subscriber]; !exists {
-		c.subscriptionChanMap[subscriber] = make(chan ctypes.ResultEvent)
-	}
+	eventChan := make(chan ctypes.ResultEvent)
 
-	var rawEventChan <-chan ctypes.ResultEvent
-	subscribe := func() error {
-		rawEventChan, err = c.queryClient.RPCClient.Subscribe(
+	subscribe := func() (<-chan ctypes.ResultEvent, error) {
+		newChan, err := c.queryClient.RPCClient.Subscribe(
 			context.Background(),
 			subscriber,
 			query,
 			outCapacity...,
 		)
 		if err != nil {
-			return fmt.Errorf("failed to subscribe babylon events for query %s: %w", query, err)
+			return nil, fmt.Errorf(
+				"failed to subscribe babylon events for query %s: %w", query, err,
+			)
 		}
-		return nil
+		return newChan, nil
 	}
 
-	if err := subscribe(); err != nil {
+	// Initial subscription
+	rawEventChan, err := subscribe()
+	if err != nil {
+		close(eventChan)
 		return nil, err
 	}
 
 	go func() {
+		defer close(eventChan)
 		timeoutTicker := time.NewTicker(healthCheckInterval)
 		defer timeoutTicker.Stop()
 		lastEventTime := time.Now()
@@ -184,22 +184,18 @@ func (c *BBNClient) Subscribe(
 			select {
 			case event, ok := <-rawEventChan:
 				if !ok {
-					log.Error().
+					log.Fatal().
 						Str("subscriber", subscriber).
 						Str("query", query).
-						Msg("Subscription channel closed")
-					return
+						Msg("Subscription channel closed, this shall not happen")
 				}
 				lastEventTime = time.Now()
-				c.subscriptionChanMap[subscriber] <- event
-
+				eventChan <- event
 			case <-timeoutTicker.C:
 				if time.Since(lastEventTime) > maxEventWaitInterval {
 					log.Error().
 						Str("subscriber", subscriber).
 						Str("query", query).
-						Dur("healthCheckInterval", healthCheckInterval).
-						Dur("maxEventWaitInterval", maxEventWaitInterval).
 						Msg("No events received, attempting to resubscribe")
 
 					if err := c.queryClient.RPCClient.Unsubscribe(
@@ -210,29 +206,26 @@ func (c *BBNClient) Subscribe(
 						log.Error().Err(err).Msg("Failed to unsubscribe babylon events")
 					}
 
-					if err := subscribe(); err != nil {
+					// Create new subscription
+					newEventChan, err := subscribe()
+					if err != nil {
 						log.Error().Err(err).Msg("Failed to resubscribe babylon events")
-					} else {
-						log.Info().
-							Str("subscriber", subscriber).
-							Str("query", query).
-							Msg("Successfully resubscribed babylon events")
-						// reset last event time
-						lastEventTime = time.Now()
+						continue
 					}
+
+					// Replace the old channel with the new one
+					rawEventChan = newEventChan
+					// reset last event time
+					lastEventTime = time.Now()
 				}
 			}
 		}
 	}()
 
-	return c.subscriptionChanMap[subscriber], nil
+	return eventChan, nil
 }
 
 func (c *BBNClient) UnsubscribeAll(subscriber string) error {
-	if ch, exists := c.subscriptionChanMap[subscriber]; exists {
-		close(ch)
-		delete(c.subscriptionChanMap, subscriber)
-	}
 	return c.queryClient.RPCClient.UnsubscribeAll(context.Background(), subscriber)
 }
 
diff --git a/internal/services/service.go b/internal/services/service.go
index 35192ecf..39263fbe 100644
--- a/internal/services/service.go
+++ b/internal/services/service.go
@@ -65,15 +65,15 @@ func (s *Service) StartIndexerSync(ctx context.Context) {
 	}
 
 	// Sync global parameters
-	// s.SyncGlobalParams(ctx)
+	s.SyncGlobalParams(ctx)
 	// Resubscribe to missed BTC notifications
-	// s.ResubscribeToMissedBtcNotifications(ctx)
+	s.ResubscribeToMissedBtcNotifications(ctx)
 	// Start the expiry checker
-	// s.StartExpiryChecker(ctx)
+	s.StartExpiryChecker(ctx)
 	// Start the websocket event subscription process
 	s.SubscribeToBbnEvents(ctx)
 	// Keep processing BBN blocks in the main thread
-	// s.StartBbnBlockProcessor(ctx)
+	s.StartBbnBlockProcessor(ctx)
 }
 
 func (s *Service) quitContext() (context.Context, func()) {
diff --git a/internal/services/subscription.go b/internal/services/subscription.go
index 2330da45..3263a8a4 100644
--- a/internal/services/subscription.go
+++ b/internal/services/subscription.go
@@ -2,7 +2,6 @@ package services
 
 import (
 	"context"
-	"fmt"
 	"time"
 
 	"github.com/babylonlabs-io/babylon-staking-indexer/internal/types"
@@ -14,8 +13,8 @@ const (
 	subscriberName                  = "babylon-staking-indexer"
 	newBlockQuery                   = "tm.event='NewBlock'"
 	outCapacity                     = 100
-	subscriptionHealthCheckInterval = 5 * time.Second
-	maxEventWaitInterval            = 2 * time.Second
+	subscriptionHealthCheckInterval = 1 * time.Minute
+	maxEventWaitInterval            = 1 * time.Minute
 )
 
 func (s *Service) SubscribeToBbnEvents(ctx context.Context) {
@@ -39,40 +38,36 @@ func (s *Service) SubscribeToBbnEvents(ctx context.Context) {
 		log.Fatal().Msgf("Failed to subscribe to events: %v", err)
 	}
 
-	// go func() {
-	for {
-		select {
-		case event := <-eventChan:
-			newBlockEvent, ok := event.Data.(ctypes.EventDataNewBlock)
-			if !ok {
-				log.Fatal().
-					Str("event", fmt.Sprintf("%+v", event)).
-					Msg("Event is not a NewBlock event")
-			}
+	go func() {
+		for {
+			select {
+			case event := <-eventChan:
+				newBlockEvent, ok := event.Data.(ctypes.EventDataNewBlock)
+				if !ok {
+					log.Fatal().Msg("Event is not a NewBlock event")
+				}
 
-			latestHeight := newBlockEvent.Block.Height
-			if latestHeight == 0 {
-				log.Fatal().Msg("Event doesn't contain block height information")
-			}
-			log.Info().
-				Int64("height", latestHeight).
-				Msg("received new block event from babylon subscription")
+				latestHeight := newBlockEvent.Block.Height
+				if latestHeight == 0 {
+					log.Fatal().Msg("Event doesn't contain block height information")
+				}
+				log.Debug().
+					Int64("height", latestHeight).
+					Msg("received new block event from babylon subscription")
 
-			// Send the latest height to the BBN block processor
-			// s.latestHeightChan <- latestHeight
-			log.Info().
-				Int64("latest_height", latestHeight).
-				Msg("latest height")
+				// Send the latest height to the BBN block processor
+				s.latestHeightChan <- latestHeight
 
-		case <-ctx.Done():
-			err := s.bbn.UnsubscribeAll(subscriberName)
-			if err != nil {
-				log.Error().Msgf("Failed to unsubscribe from events: %v", err)
+			case <-ctx.Done():
+				log.Info().Msg("context done, unsubscribing all babylon events")
+				err := s.bbn.UnsubscribeAll(subscriberName)
+				if err != nil {
+					log.Error().Msgf("Failed to unsubscribe from events: %v", err)
+				}
+				return
 			}
-			return
 		}
-	}
-	// }()
+	}()
 }
 
 // Resubscribe to missed BTC notifications

From 568786e16f646ec9c967ed34eb7bdc01b78249f1 Mon Sep 17 00:00:00 2001
From: wjrjerome <wjrjerome@babylonchain.io>
Date: Tue, 21 Jan 2025 22:21:54 +1100
Subject: [PATCH 3/3] add wg

---
 internal/clients/bbnclient/bbnclient.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/internal/clients/bbnclient/bbnclient.go b/internal/clients/bbnclient/bbnclient.go
index c3191fd3..1d038701 100644
--- a/internal/clients/bbnclient/bbnclient.go
+++ b/internal/clients/bbnclient/bbnclient.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/avast/retry-go/v4"
@@ -17,6 +18,7 @@ import (
 )
 
 type BBNClient struct {
+	wg          sync.WaitGroup
 	queryClient *query.QueryClient
 	cfg         *config.BBNConfig
 }
@@ -173,8 +175,9 @@ func (c *BBNClient) Subscribe(
 		close(eventChan)
 		return nil, err
 	}
-
+	c.wg.Add(1)
 	go func() {
+		defer c.wg.Done()
 		defer close(eventChan)
 		timeoutTicker := time.NewTicker(healthCheckInterval)
 		defer timeoutTicker.Stop()