diff --git a/internal/db/delegation.go b/internal/db/delegation.go index 330d1d4..bd1fa67 100644 --- a/internal/db/delegation.go +++ b/internal/db/delegation.go @@ -259,3 +259,38 @@ func (db *Database) SaveBTCDelegationUnbondingSlashingTxHex( return nil } + +func (db *Database) GetBTCDelegationsByStates( + ctx context.Context, + states []types.DelegationState, +) ([]*model.BTCDelegationDetails, error) { + // Convert states to a slice of strings + stateStrings := make([]string, len(states)) + for i, state := range states { + stateStrings[i] = state.String() + } + + filter := bson.M{"state": bson.M{"$in": stateStrings}} + + cursor, err := db.client.Database(db.dbName). + Collection(model.BTCDelegationDetailsCollection). + Find(ctx, filter) + if err != nil { + return nil, err + } + defer cursor.Close(ctx) + + var delegations []*model.BTCDelegationDetails + if err := cursor.All(ctx, &delegations); err != nil { + return nil, err + } + + if len(delegations) == 0 { + return nil, &NotFoundError{ + Key: "specified states", + Message: "No BTC delegations found for the specified states", + } + } + + return delegations, nil +} diff --git a/internal/db/interface.go b/internal/db/interface.go index b8ec414..c2c9d8e 100644 --- a/internal/db/interface.go +++ b/internal/db/interface.go @@ -212,4 +212,11 @@ type DbInterface interface { * @return An error if the operation failed */ SaveBTCDelegationUnbondingSlashingTxHex(ctx context.Context, stakingTxHashHex string, unbondingSlashingTxHex string) error + /** + * GetBTCDelegationsByStates retrieves the BTC delegations by the states. + * @param ctx The context + * @param states The states + * @return The BTC delegations or an error + */ + GetBTCDelegationsByStates(ctx context.Context, states []types.DelegationState) ([]*model.BTCDelegationDetails, error) } diff --git a/internal/services/service.go b/internal/services/service.go index 0bddfea..39263fb 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -66,6 +66,8 @@ func (s *Service) StartIndexerSync(ctx context.Context) { // Sync global parameters s.SyncGlobalParams(ctx) + // Resubscribe to missed BTC notifications + s.ResubscribeToMissedBtcNotifications(ctx) // Start the expiry checker s.StartExpiryChecker(ctx) // Start the websocket event subscription process diff --git a/internal/services/subscription.go b/internal/services/subscription.go index f9b6685..bee2f49 100644 --- a/internal/services/subscription.go +++ b/internal/services/subscription.go @@ -3,7 +3,8 @@ package services import ( "context" - "github.com/cometbft/cometbft/types" + "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" + ctypes "github.com/cometbft/cometbft/types" "github.com/rs/zerolog/log" ) @@ -26,7 +27,7 @@ func (s *Service) SubscribeToBbnEvents(ctx context.Context) { for { select { case event := <-eventChan: - newBlockEvent, ok := event.Data.(types.EventDataNewBlock) + newBlockEvent, ok := event.Data.(ctypes.EventDataNewBlock) if !ok { log.Fatal().Msg("Event is not a NewBlock event") } @@ -49,3 +50,21 @@ func (s *Service) SubscribeToBbnEvents(ctx context.Context) { } }() } + +// Resubscribe to missed BTC notifications +func (s *Service) ResubscribeToMissedBtcNotifications(ctx context.Context) { + go func() { + defer s.wg.Done() + delegations, err := s.db.GetBTCDelegationsByStates(ctx, []types.DelegationState{types.StateUnbonding, types.StateSlashed}) + if err != nil { + log.Fatal().Msgf("Failed to get BTC delegations: %v", err) + } + + for _, delegation := range delegations { + // Register spend notification + if err := s.registerStakingSpendNotification(ctx, delegation); err != nil { + log.Fatal().Msgf("Failed to register spend notification: %v", err) + } + } + }() +} diff --git a/tests/mocks/mock_db_client.go b/tests/mocks/mock_db_client.go index fbcf319..0f656dd 100644 --- a/tests/mocks/mock_db_client.go +++ b/tests/mocks/mock_db_client.go @@ -127,6 +127,36 @@ func (_m *DbInterface) GetBTCDelegationState(ctx context.Context, stakingTxHash return r0, r1 } +// GetBTCDelegationsByState provides a mock function with given fields: ctx, state +func (_m *DbInterface) GetBTCDelegationsByState(ctx context.Context, state types.DelegationState) ([]*model.BTCDelegationDetails, error) { + ret := _m.Called(ctx, state) + + if len(ret) == 0 { + panic("no return value specified for GetBTCDelegationsByState") + } + + var r0 []*model.BTCDelegationDetails + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, types.DelegationState) ([]*model.BTCDelegationDetails, error)); ok { + return rf(ctx, state) + } + if rf, ok := ret.Get(0).(func(context.Context, types.DelegationState) []*model.BTCDelegationDetails); ok { + r0 = rf(ctx, state) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*model.BTCDelegationDetails) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, types.DelegationState) error); ok { + r1 = rf(ctx, state) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetFinalityProviderByBtcPk provides a mock function with given fields: ctx, btcPk func (_m *DbInterface) GetFinalityProviderByBtcPk(ctx context.Context, btcPk string) (*model.FinalityProviderDetails, error) { ret := _m.Called(ctx, btcPk)