Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: emit withdrawable and withdrawn staking events #104

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
push:
branches:
- 'main'
- '188-add-staked-withdrawable-tvl'
tags:
- '*'

Expand Down
2 changes: 2 additions & 0 deletions consumer/event_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ type EventConsumer interface {
Start() error
PushActiveStakingEvent(ev *client.StakingEvent) error
PushUnbondingStakingEvent(ev *client.StakingEvent) error
PushWithdrawableStakingEvent(ev *client.StakingEvent) error
PushWithdrawnStakingEvent(ev *client.StakingEvent) error
Stop() error
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
cosmossdk.io/math v1.4.0
github.com/avast/retry-go/v4 v4.5.1
github.com/babylonlabs-io/babylon v1.0.0-rc.2
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241212112557-9ac7de686075
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20250114073544-2875aea182ab
github.com/btcsuite/btcd v0.24.3-0.20241011125836-24eb815168f4
github.com/btcsuite/btcd/btcec/v2 v2.3.4
github.com/btcsuite/btcd/btcutil v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1429,8 +1429,8 @@ github.com/aws/aws-sdk-go v1.44.312/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/babylonlabs-io/babylon v1.0.0-rc.2 h1:H7OpEDNNOXyC+9TUo4vVYLlHNhOQ8m9KqWP1qzjEt0c=
github.com/babylonlabs-io/babylon v1.0.0-rc.2/go.mod h1:B8ma8IjGUEKhmoRfwv60Qa7DtUXssCgtmD89huQ4+5I=
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241212112557-9ac7de686075 h1:gB+jslBkK5/ror4sn9NHldKjLu4nE88jgD43d2L3osc=
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241212112557-9ac7de686075/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q=
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20250114073544-2875aea182ab h1:APSYaAU89zceHIzE+QK3vfg+qQSS8fwMPGn0rMLvQN4=
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20250114073544-2875aea182ab/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down
28 changes: 22 additions & 6 deletions internal/db/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import (
"context"
"errors"
"fmt"
"log"

"github.com/rs/zerolog/log"

"github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model"
"github.com/babylonlabs-io/babylon-staking-indexer/internal/types"
Expand Down Expand Up @@ -182,10 +183,23 @@ func (db *Database) GetBTCDelegationByStakingTxHash(
func (db *Database) UpdateDelegationsStateByFinalityProvider(
ctx context.Context,
fpBTCPKHex string,
qualifiedPreviousStates []types.DelegationState,
newState types.DelegationState,
) error {
if len(qualifiedPreviousStates) == 0 {
return fmt.Errorf("qualified previous states array cannot be empty")
}

// Convert states to strings
qualifiedStateStrs := make([]string, len(qualifiedPreviousStates))
for i, state := range qualifiedPreviousStates {
qualifiedStateStrs[i] = state.String()
}

// Build filter with both FP and qualified states
filter := bson.M{
"finality_provider_btc_pks_hex": fpBTCPKHex,
"state": bson.M{"$in": qualifiedStateStrs},
}

update := bson.M{
Expand All @@ -201,11 +215,13 @@ func (db *Database) UpdateDelegationsStateByFinalityProvider(
return fmt.Errorf("failed to update delegations: %w", err)
}

log.Printf("Updated %d delegations for finality provider %s to state %s",
result.ModifiedCount,
fpBTCPKHex,
newState.String(),
)
log.Debug().
Str("finality_provider", fpBTCPKHex).
Strs("qualified_states", qualifiedStateStrs).
Str("new_state", newState.String()).
Int64("modified_count", result.ModifiedCount).
Msg("Updated delegations for finality provider")

return nil
}

Expand Down
7 changes: 5 additions & 2 deletions internal/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,15 @@ type DbInterface interface {
* UpdateDelegationsStateByFinalityProvider updates the BTC delegation state by the finality provider public key.
* @param ctx The context
* @param fpBtcPkHex The finality provider public key
* @param qualifiedPreviousStates The qualified previous states
* @param newState The new state
* @param qualifiedStates The qualified states
* @return An error if the operation failed
*/
UpdateDelegationsStateByFinalityProvider(
ctx context.Context, fpBtcPkHex string, newState types.DelegationState,
ctx context.Context,
fpBtcPkHex string,
qualifiedPreviousStates []types.DelegationState,
newState types.DelegationState,
) error
/**
* GetDelegationsByFinalityProvider retrieves the BTC delegations by the finality provider public key.
Expand Down
2 changes: 1 addition & 1 deletion internal/db/timelock.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (db *Database) FindExpiredDelegations(ctx context.Context, btcTipHeight, li

func (db *Database) DeleteExpiredDelegation(ctx context.Context, stakingTxHashHex string) error {
client := db.client.Database(db.dbName).Collection(model.TimeLockCollection)
filter := bson.M{"_id": stakingTxHashHex}
filter := bson.M{"staking_tx_hash_hex": stakingTxHashHex}

result, err := client.DeleteOne(ctx, filter)
if err != nil {
Expand Down
26 changes: 26 additions & 0 deletions internal/services/consumer_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
stakingTxHashHex,
stakerBtcPkHex,
finalityProviderBtcPksHex,
stakingAmount,

Check failure on line 23 in internal/services/consumer_events.go

View workflow job for this annotation

GitHub Actions / lint_test / integration-tests

not enough arguments in call to queuecli.NewActiveStakingEvent

Check failure on line 23 in internal/services/consumer_events.go

View workflow job for this annotation

GitHub Actions / lint_test / unit-tests

not enough arguments in call to queuecli.NewActiveStakingEvent
)

if err := s.queueManager.PushActiveStakingEvent(&stakingEvent); err != nil {
Expand All @@ -34,10 +34,36 @@
delegation.StakingTxHashHex,
delegation.StakerBtcPkHex,
delegation.FinalityProviderBtcPksHex,
delegation.StakingAmount,

Check failure on line 37 in internal/services/consumer_events.go

View workflow job for this annotation

GitHub Actions / lint_test / integration-tests

not enough arguments in call to queuecli.NewUnbondingStakingEvent

Check failure on line 37 in internal/services/consumer_events.go

View workflow job for this annotation

GitHub Actions / lint_test / unit-tests

not enough arguments in call to queuecli.NewUnbondingStakingEvent
)
if err := s.queueManager.PushUnbondingStakingEvent(&ev); err != nil {
return types.NewInternalServiceError(fmt.Errorf("failed to push the unbonding event to the queue: %w", err))
}
return nil
}

func (s *Service) emitWithdrawableDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error {
ev := queuecli.NewWithdrawableStakingEvent(
delegation.StakingTxHashHex,
delegation.StakerBtcPkHex,
delegation.FinalityProviderBtcPksHex,
delegation.StakingAmount,

Check failure on line 50 in internal/services/consumer_events.go

View workflow job for this annotation

GitHub Actions / lint_test / integration-tests

not enough arguments in call to queuecli.NewWithdrawableStakingEvent

Check failure on line 50 in internal/services/consumer_events.go

View workflow job for this annotation

GitHub Actions / lint_test / unit-tests

not enough arguments in call to queuecli.NewWithdrawableStakingEvent
)
if err := s.queueManager.PushWithdrawableStakingEvent(&ev); err != nil {
return types.NewInternalServiceError(fmt.Errorf("failed to push the withdrawable event to the queue: %w", err))
}
return nil
}

func (s *Service) emitWithdrawnDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error {
ev := queuecli.NewWithdrawnStakingEvent(
delegation.StakingTxHashHex,
delegation.StakerBtcPkHex,
delegation.FinalityProviderBtcPksHex,
delegation.StakingAmount,

Check failure on line 63 in internal/services/consumer_events.go

View workflow job for this annotation

GitHub Actions / lint_test / integration-tests

not enough arguments in call to queuecli.NewWithdrawnStakingEvent

Check failure on line 63 in internal/services/consumer_events.go

View workflow job for this annotation

GitHub Actions / lint_test / unit-tests

not enough arguments in call to queuecli.NewWithdrawnStakingEvent
)
if err := s.queueManager.PushWithdrawnStakingEvent(&ev); err != nil {
return types.NewInternalServiceError(fmt.Errorf("failed to push the withdrawn event to the queue: %w", err))
}
return nil
}
21 changes: 11 additions & 10 deletions internal/services/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ func (s *Service) processNewBTCDelegationEvent(
)
}

// TODO: start watching for BTC confirmation if we need PendingBTCConfirmation state

return nil
}

Expand Down Expand Up @@ -289,7 +287,7 @@ func (s *Service) processBTCDelegationUnbondedEarlyEvent(
return err
}
if !shouldProcess {
// Event is valid but should be skipped
// Ignore the event silently
return nil
}

Expand Down Expand Up @@ -377,7 +375,7 @@ func (s *Service) processBTCDelegationExpiredEvent(
return err
}
if !shouldProcess {
// Event is valid but should be skipped
// Ignore the event silently
return nil
}

Expand Down Expand Up @@ -440,20 +438,16 @@ func (s *Service) processSlashedFinalityProviderEvent(
return err
}

shouldProcess, err := s.validateSlashedFinalityProviderEvent(ctx, slashedFinalityProviderEvent)
err = s.validateSlashedFinalityProviderEvent(ctx, slashedFinalityProviderEvent)
if err != nil {
return err
}
if !shouldProcess {
// Event is valid but should be skipped
return nil
}

evidence := slashedFinalityProviderEvent.Evidence
fpBTCPKHex := evidence.FpBtcPk.MarshalHex()

if dbErr := s.db.UpdateDelegationsStateByFinalityProvider(
ctx, fpBTCPKHex, types.StateSlashed,
ctx, fpBTCPKHex, types.QualifiedStatesForSlashedDelegation(), types.StateSlashed,
); dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
Expand All @@ -471,10 +465,17 @@ func (s *Service) processSlashedFinalityProviderEvent(
)
}

// TODO: ideally indexer should simply emit the slashed FP
// queue handlers should handle the rest

for _, delegation := range delegations {
if !delegation.HasInclusionProof() {
// If the delegation was never active/has no inclusion proof
// no need to emit the event, as it doesn't contribute to stats
log.Debug().
Str("staking_tx", delegation.StakingTxHashHex).
Str("event_type", EventSlashedFinalityProvider.String()).
Str("current_state", delegation.State.String()).
Str("reason", "missing_inclusion_proof").
Msg("skipping slashed delegation event")
continue
Expand Down
20 changes: 12 additions & 8 deletions internal/services/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ func (s *Service) validateBTCDelegationUnbondedEarlyEvent(ctx context.Context, e
log.Debug().
Str("stakingTxHashHex", event.StakingTxHash).
Str("currentState", delegation.State.String()).
Msg("Ignoring EventBTCDelgationUnbondedEarly because current state is not qualified for transition")
Str("event_type", "EventBTCDelgationUnbondedEarly").
Msg("current state is not qualified for transition")
return false, nil
}

Expand All @@ -349,8 +350,10 @@ func (s *Service) validateBTCDelegationExpiredEvent(ctx context.Context, event *

// Validate the event state
if event.NewState != bstypes.BTCDelegationStatus_EXPIRED.String() {
return false, types.NewValidationFailedError(
fmt.Errorf("invalid delegation state from Babylon when processing EventBTCDelegationExpired: expected EXPIRED, got %s", event.NewState),
return false, types.NewErrorWithMsg(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Sprintf("invalid delegation state from Babylon when processing EventBTCDelegationExpired: expected EXPIRED, got %s", event.NewState),
)
}

Expand All @@ -369,16 +372,17 @@ func (s *Service) validateBTCDelegationExpiredEvent(ctx context.Context, event *
log.Debug().
Str("stakingTxHashHex", event.StakingTxHash).
Str("currentState", delegation.State.String()).
Msg("Ignoring EventBTCDelegationExpired because current state is not qualified for transition")
Str("event_type", "EventBTCDelegationExpired").
Msg("current state is not qualified for transition")
return false, nil
}

return true, nil
}

func (s *Service) validateSlashedFinalityProviderEvent(ctx context.Context, event *ftypes.EventSlashedFinalityProvider) (bool, *types.Error) {
func (s *Service) validateSlashedFinalityProviderEvent(ctx context.Context, event *ftypes.EventSlashedFinalityProvider) *types.Error {
if event.Evidence == nil {
return false, types.NewErrorWithMsg(
return types.NewErrorWithMsg(
http.StatusInternalServerError,
types.InternalServiceError,
"slashed finality provider event missing evidence",
Expand All @@ -387,14 +391,14 @@ func (s *Service) validateSlashedFinalityProviderEvent(ctx context.Context, even

_, err := event.Evidence.ExtractBTCSK()
if err != nil {
return false, types.NewError(
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to extract BTC SK of the slashed finality provider: %w", err),
)
}

return true, nil
return nil
}

func sanitizeEvent(event abcitypes.Event) abcitypes.Event {
Expand Down
10 changes: 9 additions & 1 deletion internal/services/expiry_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@ func (s *Service) checkExpiry(ctx context.Context) *types.Error {
Str("current_state", delegation.State.String()).
Str("new_sub_state", tlDoc.DelegationSubState.String()).
Str("expire_height", strconv.FormatUint(uint64(tlDoc.ExpireHeight), 10)).
Msg("checking if delegation is expired")
Msg("checking if delegation is withdrawable")

// Check if the delegation is in a qualified state to transition to Withdrawable
if !utils.Contains(types.QualifiedStatesForWithdrawable(), delegation.State) {
log.Debug().
Str("staking_tx", delegation.StakingTxHashHex).
Str("current_state", delegation.State.String()).
Msg("current state is not qualified for withdrawable")

continue
}

Expand All @@ -76,6 +77,13 @@ func (s *Service) checkExpiry(ctx context.Context) *types.Error {
)
}

if err := s.emitWithdrawableDelegationEvent(ctx, delegation); err != nil {
log.Error().
Str("staking_tx", delegation.StakingTxHashHex).
Msg("failed to emit withdrawable delegation event")
return err
}

if err := s.db.DeleteExpiredDelegation(ctx, delegation.StakingTxHashHex); err != nil {
log.Error().
Str("staking_tx", delegation.StakingTxHashHex).
Expand Down
24 changes: 22 additions & 2 deletions internal/services/watch_btc_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ func (s *Service) watchForSpendSlashingChange(
return
}

if err := s.emitWithdrawnDelegationEvent(quitCtx, delegation); err != nil {
log.Error().
Err(err).
Str("staking_tx", delegation.StakingTxHashHex).
Msg("failed to emit withdrawn delegation event")
return
}

case <-s.quit:
return
case <-quitCtx.Done():
Expand Down Expand Up @@ -321,13 +329,25 @@ func (s *Service) handleWithdrawal(
Str("state", types.StateWithdrawn.String()).
Str("sub_state", subState.String()).
Msg("updating delegation state to withdrawn")
return s.db.UpdateBTCDelegationState(
if err := s.db.UpdateBTCDelegationState(
ctx,
delegation.StakingTxHashHex,
types.QualifiedStatesForWithdrawn(),
types.StateWithdrawn,
&subState,
)
); err != nil {
return fmt.Errorf("failed to update delegation state to withdrawn: %w", err)
}

if err := s.emitWithdrawnDelegationEvent(ctx, delegation); err != nil {
log.Error().
Err(err).
Str("staking_tx", delegation.StakingTxHashHex).
Msg("failed to emit withdrawn delegation event")
return err
}

return nil
}

func (s *Service) startWatchingSlashingChange(
Expand Down
5 changes: 5 additions & 0 deletions internal/types/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func QualifiedStatesForExpired() []DelegationState {
return []DelegationState{StateActive}
}

// QualifiedStatesForSlashedDelegation returns the qualified current states when a delegation is slashed
func QualifiedStatesForSlashedDelegation() []DelegationState {
return []DelegationState{StatePending, StateVerified, StateActive, StateUnbonding, StateWithdrawable}
}

// QualifiedStatesForWithdrawn returns the qualified current states for Withdrawn event
func QualifiedStatesForWithdrawn() []DelegationState {
// StateActive/StateUnbonding/StateSlashed is included b/c its possible that expiry checker
Expand Down
10 changes: 5 additions & 5 deletions tests/mocks/mock_db_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading