diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 49b0290..59189d7 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -4,6 +4,7 @@ on: push: branches: - 'main' + - '188-add-staked-withdrawable-tvl' tags: - '*' diff --git a/consumer/event_consumer.go b/consumer/event_consumer.go index 9f0c3f7..4e88b0d 100644 --- a/consumer/event_consumer.go +++ b/consumer/event_consumer.go @@ -8,5 +8,7 @@ type EventConsumer interface { Start() error PushActiveStakingEvent(ev *client.StakingEvent) error PushUnbondingStakingEvent(ev *client.StakingEvent) error + PushWithdrawableStakingEvent(ev *client.StakingEvent) error + PushWithdrawnStakingEvent(ev *client.StakingEvent) error Stop() error } diff --git a/go.mod b/go.mod index 0270c06..9bb28bf 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( cosmossdk.io/math v1.4.0 github.com/avast/retry-go/v4 v4.5.1 github.com/babylonlabs-io/babylon v1.0.0-rc.2 - github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241212112557-9ac7de686075 + github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20250114073544-2875aea182ab github.com/btcsuite/btcd v0.24.3-0.20241011125836-24eb815168f4 github.com/btcsuite/btcd/btcec/v2 v2.3.4 github.com/btcsuite/btcd/btcutil v1.1.6 diff --git a/go.sum b/go.sum index 93aaac3..86e658f 100644 --- a/go.sum +++ b/go.sum @@ -1429,8 +1429,8 @@ github.com/aws/aws-sdk-go v1.44.312/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8 github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/babylonlabs-io/babylon v1.0.0-rc.2 h1:H7OpEDNNOXyC+9TUo4vVYLlHNhOQ8m9KqWP1qzjEt0c= github.com/babylonlabs-io/babylon v1.0.0-rc.2/go.mod h1:B8ma8IjGUEKhmoRfwv60Qa7DtUXssCgtmD89huQ4+5I= -github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241212112557-9ac7de686075 h1:gB+jslBkK5/ror4sn9NHldKjLu4nE88jgD43d2L3osc= -github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241212112557-9ac7de686075/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= +github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20250114073544-2875aea182ab h1:APSYaAU89zceHIzE+QK3vfg+qQSS8fwMPGn0rMLvQN4= +github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20250114073544-2875aea182ab/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= diff --git a/internal/db/delegation.go b/internal/db/delegation.go index d80164b..d30ca96 100644 --- a/internal/db/delegation.go +++ b/internal/db/delegation.go @@ -4,7 +4,8 @@ import ( "context" "errors" "fmt" - "log" + + "github.com/rs/zerolog/log" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" @@ -182,10 +183,23 @@ func (db *Database) GetBTCDelegationByStakingTxHash( func (db *Database) UpdateDelegationsStateByFinalityProvider( ctx context.Context, fpBTCPKHex string, + qualifiedPreviousStates []types.DelegationState, newState types.DelegationState, ) error { + if len(qualifiedPreviousStates) == 0 { + return fmt.Errorf("qualified previous states array cannot be empty") + } + + // Convert states to strings + qualifiedStateStrs := make([]string, len(qualifiedPreviousStates)) + for i, state := range qualifiedPreviousStates { + qualifiedStateStrs[i] = state.String() + } + + // Build filter with both FP and qualified states filter := bson.M{ "finality_provider_btc_pks_hex": fpBTCPKHex, + "state": bson.M{"$in": qualifiedStateStrs}, } update := bson.M{ @@ -201,11 +215,13 @@ func (db *Database) UpdateDelegationsStateByFinalityProvider( return fmt.Errorf("failed to update delegations: %w", err) } - log.Printf("Updated %d delegations for finality provider %s to state %s", - result.ModifiedCount, - fpBTCPKHex, - newState.String(), - ) + log.Debug(). + Str("finality_provider", fpBTCPKHex). + Strs("qualified_states", qualifiedStateStrs). + Str("new_state", newState.String()). + Int64("modified_count", result.ModifiedCount). + Msg("Updated delegations for finality provider") + return nil } diff --git a/internal/db/interface.go b/internal/db/interface.go index 3d24efe..a0ec944 100644 --- a/internal/db/interface.go +++ b/internal/db/interface.go @@ -147,12 +147,15 @@ type DbInterface interface { * UpdateDelegationsStateByFinalityProvider updates the BTC delegation state by the finality provider public key. * @param ctx The context * @param fpBtcPkHex The finality provider public key + * @param qualifiedPreviousStates The qualified previous states * @param newState The new state - * @param qualifiedStates The qualified states * @return An error if the operation failed */ UpdateDelegationsStateByFinalityProvider( - ctx context.Context, fpBtcPkHex string, newState types.DelegationState, + ctx context.Context, + fpBtcPkHex string, + qualifiedPreviousStates []types.DelegationState, + newState types.DelegationState, ) error /** * GetDelegationsByFinalityProvider retrieves the BTC delegations by the finality provider public key. diff --git a/internal/db/timelock.go b/internal/db/timelock.go index 5d7c2ac..1420b8b 100644 --- a/internal/db/timelock.go +++ b/internal/db/timelock.go @@ -44,7 +44,7 @@ func (db *Database) FindExpiredDelegations(ctx context.Context, btcTipHeight, li func (db *Database) DeleteExpiredDelegation(ctx context.Context, stakingTxHashHex string) error { client := db.client.Database(db.dbName).Collection(model.TimeLockCollection) - filter := bson.M{"_id": stakingTxHashHex} + filter := bson.M{"staking_tx_hash_hex": stakingTxHashHex} result, err := client.DeleteOne(ctx, filter) if err != nil { diff --git a/internal/services/consumer_events.go b/internal/services/consumer_events.go index 84790b1..286471a 100644 --- a/internal/services/consumer_events.go +++ b/internal/services/consumer_events.go @@ -41,3 +41,29 @@ func (s *Service) emitUnbondingDelegationEvent(ctx context.Context, delegation * } return nil } + +func (s *Service) emitWithdrawableDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { + ev := queuecli.NewWithdrawableStakingEvent( + delegation.StakingTxHashHex, + delegation.StakerBtcPkHex, + delegation.FinalityProviderBtcPksHex, + delegation.StakingAmount, + ) + if err := s.queueManager.PushWithdrawableStakingEvent(&ev); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to push the withdrawable event to the queue: %w", err)) + } + return nil +} + +func (s *Service) emitWithdrawnDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { + ev := queuecli.NewWithdrawnStakingEvent( + delegation.StakingTxHashHex, + delegation.StakerBtcPkHex, + delegation.FinalityProviderBtcPksHex, + delegation.StakingAmount, + ) + if err := s.queueManager.PushWithdrawnStakingEvent(&ev); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to push the withdrawn event to the queue: %w", err)) + } + return nil +} diff --git a/internal/services/delegation.go b/internal/services/delegation.go index 05f52f5..6e90135 100644 --- a/internal/services/delegation.go +++ b/internal/services/delegation.go @@ -69,8 +69,6 @@ func (s *Service) processNewBTCDelegationEvent( ) } - // TODO: start watching for BTC confirmation if we need PendingBTCConfirmation state - return nil } @@ -289,7 +287,7 @@ func (s *Service) processBTCDelegationUnbondedEarlyEvent( return err } if !shouldProcess { - // Event is valid but should be skipped + // Ignore the event silently return nil } @@ -377,7 +375,7 @@ func (s *Service) processBTCDelegationExpiredEvent( return err } if !shouldProcess { - // Event is valid but should be skipped + // Ignore the event silently return nil } @@ -440,20 +438,16 @@ func (s *Service) processSlashedFinalityProviderEvent( return err } - shouldProcess, err := s.validateSlashedFinalityProviderEvent(ctx, slashedFinalityProviderEvent) + err = s.validateSlashedFinalityProviderEvent(ctx, slashedFinalityProviderEvent) if err != nil { return err } - if !shouldProcess { - // Event is valid but should be skipped - return nil - } evidence := slashedFinalityProviderEvent.Evidence fpBTCPKHex := evidence.FpBtcPk.MarshalHex() if dbErr := s.db.UpdateDelegationsStateByFinalityProvider( - ctx, fpBTCPKHex, types.StateSlashed, + ctx, fpBTCPKHex, types.QualifiedStatesForSlashedDelegation(), types.StateSlashed, ); dbErr != nil { return types.NewError( http.StatusInternalServerError, @@ -471,10 +465,17 @@ func (s *Service) processSlashedFinalityProviderEvent( ) } + // TODO: ideally indexer should simply emit the slashed FP + // queue handlers should handle the rest + for _, delegation := range delegations { if !delegation.HasInclusionProof() { + // If the delegation was never active/has no inclusion proof + // no need to emit the event, as it doesn't contribute to stats log.Debug(). Str("staking_tx", delegation.StakingTxHashHex). + Str("event_type", EventSlashedFinalityProvider.String()). + Str("current_state", delegation.State.String()). Str("reason", "missing_inclusion_proof"). Msg("skipping slashed delegation event") continue diff --git a/internal/services/events.go b/internal/services/events.go index 031834f..5275a06 100644 --- a/internal/services/events.go +++ b/internal/services/events.go @@ -330,7 +330,8 @@ func (s *Service) validateBTCDelegationUnbondedEarlyEvent(ctx context.Context, e log.Debug(). Str("stakingTxHashHex", event.StakingTxHash). Str("currentState", delegation.State.String()). - Msg("Ignoring EventBTCDelgationUnbondedEarly because current state is not qualified for transition") + Str("event_type", "EventBTCDelgationUnbondedEarly"). + Msg("current state is not qualified for transition") return false, nil } @@ -349,8 +350,10 @@ func (s *Service) validateBTCDelegationExpiredEvent(ctx context.Context, event * // Validate the event state if event.NewState != bstypes.BTCDelegationStatus_EXPIRED.String() { - return false, types.NewValidationFailedError( - fmt.Errorf("invalid delegation state from Babylon when processing EventBTCDelegationExpired: expected EXPIRED, got %s", event.NewState), + return false, types.NewErrorWithMsg( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Sprintf("invalid delegation state from Babylon when processing EventBTCDelegationExpired: expected EXPIRED, got %s", event.NewState), ) } @@ -369,16 +372,17 @@ func (s *Service) validateBTCDelegationExpiredEvent(ctx context.Context, event * log.Debug(). Str("stakingTxHashHex", event.StakingTxHash). Str("currentState", delegation.State.String()). - Msg("Ignoring EventBTCDelegationExpired because current state is not qualified for transition") + Str("event_type", "EventBTCDelegationExpired"). + Msg("current state is not qualified for transition") return false, nil } return true, nil } -func (s *Service) validateSlashedFinalityProviderEvent(ctx context.Context, event *ftypes.EventSlashedFinalityProvider) (bool, *types.Error) { +func (s *Service) validateSlashedFinalityProviderEvent(ctx context.Context, event *ftypes.EventSlashedFinalityProvider) *types.Error { if event.Evidence == nil { - return false, types.NewErrorWithMsg( + return types.NewErrorWithMsg( http.StatusInternalServerError, types.InternalServiceError, "slashed finality provider event missing evidence", @@ -387,14 +391,14 @@ func (s *Service) validateSlashedFinalityProviderEvent(ctx context.Context, even _, err := event.Evidence.ExtractBTCSK() if err != nil { - return false, types.NewError( + return types.NewError( http.StatusInternalServerError, types.InternalServiceError, fmt.Errorf("failed to extract BTC SK of the slashed finality provider: %w", err), ) } - return true, nil + return nil } func sanitizeEvent(event abcitypes.Event) abcitypes.Event { diff --git a/internal/services/expiry_checker.go b/internal/services/expiry_checker.go index 954fac8..6f29685 100644 --- a/internal/services/expiry_checker.go +++ b/internal/services/expiry_checker.go @@ -50,7 +50,7 @@ func (s *Service) checkExpiry(ctx context.Context) *types.Error { Str("current_state", delegation.State.String()). Str("new_sub_state", tlDoc.DelegationSubState.String()). Str("expire_height", strconv.FormatUint(uint64(tlDoc.ExpireHeight), 10)). - Msg("checking if delegation is expired") + Msg("checking if delegation is withdrawable") // Check if the delegation is in a qualified state to transition to Withdrawable if !utils.Contains(types.QualifiedStatesForWithdrawable(), delegation.State) { @@ -58,6 +58,7 @@ func (s *Service) checkExpiry(ctx context.Context) *types.Error { Str("staking_tx", delegation.StakingTxHashHex). Str("current_state", delegation.State.String()). Msg("current state is not qualified for withdrawable") + continue } @@ -76,6 +77,13 @@ func (s *Service) checkExpiry(ctx context.Context) *types.Error { ) } + if err := s.emitWithdrawableDelegationEvent(ctx, delegation); err != nil { + log.Error(). + Str("staking_tx", delegation.StakingTxHashHex). + Msg("failed to emit withdrawable delegation event") + return err + } + if err := s.db.DeleteExpiredDelegation(ctx, delegation.StakingTxHashHex); err != nil { log.Error(). Str("staking_tx", delegation.StakingTxHashHex). diff --git a/internal/services/watch_btc_events.go b/internal/services/watch_btc_events.go index 7760b5b..4abc203 100644 --- a/internal/services/watch_btc_events.go +++ b/internal/services/watch_btc_events.go @@ -146,6 +146,14 @@ func (s *Service) watchForSpendSlashingChange( return } + if err := s.emitWithdrawnDelegationEvent(quitCtx, delegation); err != nil { + log.Error(). + Err(err). + Str("staking_tx", delegation.StakingTxHashHex). + Msg("failed to emit withdrawn delegation event") + return + } + case <-s.quit: return case <-quitCtx.Done(): @@ -321,13 +329,25 @@ func (s *Service) handleWithdrawal( Str("state", types.StateWithdrawn.String()). Str("sub_state", subState.String()). Msg("updating delegation state to withdrawn") - return s.db.UpdateBTCDelegationState( + if err := s.db.UpdateBTCDelegationState( ctx, delegation.StakingTxHashHex, types.QualifiedStatesForWithdrawn(), types.StateWithdrawn, &subState, - ) + ); err != nil { + return fmt.Errorf("failed to update delegation state to withdrawn: %w", err) + } + + if err := s.emitWithdrawnDelegationEvent(ctx, delegation); err != nil { + log.Error(). + Err(err). + Str("staking_tx", delegation.StakingTxHashHex). + Msg("failed to emit withdrawn delegation event") + return err + } + + return nil } func (s *Service) startWatchingSlashingChange( diff --git a/internal/types/state.go b/internal/types/state.go index 91b1631..a487094 100644 --- a/internal/types/state.go +++ b/internal/types/state.go @@ -51,6 +51,11 @@ func QualifiedStatesForExpired() []DelegationState { return []DelegationState{StateActive} } +// QualifiedStatesForSlashedDelegation returns the qualified current states when a delegation is slashed +func QualifiedStatesForSlashedDelegation() []DelegationState { + return []DelegationState{StatePending, StateVerified, StateActive, StateUnbonding, StateWithdrawable} +} + // QualifiedStatesForWithdrawn returns the qualified current states for Withdrawn event func QualifiedStatesForWithdrawn() []DelegationState { // StateActive/StateUnbonding/StateSlashed is included b/c its possible that expiry checker diff --git a/tests/mocks/mock_db_client.go b/tests/mocks/mock_db_client.go index 40a3532..3c5cb96 100644 --- a/tests/mocks/mock_db_client.go +++ b/tests/mocks/mock_db_client.go @@ -473,17 +473,17 @@ func (_m *DbInterface) UpdateBTCDelegationState(ctx context.Context, stakingTxHa return r0 } -// UpdateDelegationsStateByFinalityProvider provides a mock function with given fields: ctx, fpBtcPkHex, newState -func (_m *DbInterface) UpdateDelegationsStateByFinalityProvider(ctx context.Context, fpBtcPkHex string, newState types.DelegationState) error { - ret := _m.Called(ctx, fpBtcPkHex, newState) +// UpdateDelegationsStateByFinalityProvider provides a mock function with given fields: ctx, fpBtcPkHex, qualifiedPreviousStates, newState +func (_m *DbInterface) UpdateDelegationsStateByFinalityProvider(ctx context.Context, fpBtcPkHex string, qualifiedPreviousStates []types.DelegationState, newState types.DelegationState) error { + ret := _m.Called(ctx, fpBtcPkHex, qualifiedPreviousStates, newState) if len(ret) == 0 { panic("no return value specified for UpdateDelegationsStateByFinalityProvider") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, types.DelegationState) error); ok { - r0 = rf(ctx, fpBtcPkHex, newState) + if rf, ok := ret.Get(0).(func(context.Context, string, []types.DelegationState, types.DelegationState) error); ok { + r0 = rf(ctx, fpBtcPkHex, qualifiedPreviousStates, newState) } else { r0 = ret.Error(0) }