Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handle slashing state #51

Merged
merged 7 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions internal/db/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package db
import (
"context"
"errors"
"fmt"
"log"

"github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model"
"github.com/babylonlabs-io/babylon-staking-indexer/internal/types"
Expand Down Expand Up @@ -36,8 +38,12 @@ func (db *Database) SaveNewBTCDelegation(
func (db *Database) UpdateBTCDelegationState(
ctx context.Context, stakingTxHash string, newState types.DelegationState,
) error {
filter := map[string]interface{}{"_id": stakingTxHash}
update := map[string]interface{}{"$set": map[string]string{"state": newState.String()}}
filter := bson.M{"_id": stakingTxHash}
update := bson.M{
"$set": bson.M{
"state": newState.String(),
},
}

res := db.client.Database(db.dbName).
Collection(model.BTCDelegationDetailsCollection).
Expand Down Expand Up @@ -110,7 +116,8 @@ func (db *Database) UpdateBTCDelegationDetails(
func (db *Database) GetBTCDelegationByStakingTxHash(
ctx context.Context, stakingTxHash string,
) (*model.BTCDelegationDetails, error) {
filter := map[string]interface{}{"_id": stakingTxHash}
filter := bson.M{"_id": stakingTxHash}

res := db.client.Database(db.dbName).
Collection(model.BTCDelegationDetailsCollection).
FindOne(ctx, filter)
Expand All @@ -129,3 +136,33 @@ func (db *Database) GetBTCDelegationByStakingTxHash(

return &delegationDoc, nil
}

func (db *Database) UpdateDelegationsStateByFinalityProvider(
ctx context.Context,
fpBTCPKHex string,
newState types.DelegationState,
) error {
filter := bson.M{
"finality_provider_btc_pks_hex": fpBTCPKHex,
}

update := bson.M{
"$set": bson.M{
"state": newState.String(),
},
}

result, err := db.client.Database(db.dbName).
Collection(model.BTCDelegationDetailsCollection).
UpdateMany(ctx, filter, update)
if err != nil {
return fmt.Errorf("failed to update delegations: %w", err)
}

log.Printf("Updated %d delegations for finality provider %s to state %s",
result.ModifiedCount,
fpBTCPKHex,
newState.String(),
)
return nil
}
File renamed without changes.
11 changes: 11 additions & 0 deletions internal/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ type DbInterface interface {
GetBTCDelegationByStakingTxHash(
ctx context.Context, stakingTxHash string,
) (*model.BTCDelegationDetails, error)
/**
* UpdateDelegationsStateByFinalityProvider updates the BTC delegation state by the finality provider public key.
* @param ctx The context
* @param fpBtcPkHex The finality provider public key
* @param newState The new state
* @param qualifiedStates The qualified states
* @return An error if the operation failed
*/
UpdateDelegationsStateByFinalityProvider(
ctx context.Context, fpBtcPkHex string, newState types.DelegationState,
) error
/**
* SaveNewTimeLockExpire saves a new timelock expire to the database.
* If the timelock expire already exists, DuplicateKeyError will be returned.
Expand Down
234 changes: 28 additions & 206 deletions internal/services/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import (
"github.com/babylonlabs-io/babylon-staking-indexer/internal/db"
"github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model"
"github.com/babylonlabs-io/babylon-staking-indexer/internal/types"
"github.com/babylonlabs-io/babylon-staking-indexer/internal/utils"
bbntypes "github.com/babylonlabs-io/babylon/x/btcstaking/types"
ftypes "github.com/babylonlabs-io/babylon/x/finality/types"
abcitypes "github.com/cometbft/cometbft/abci/types"
"github.com/rs/zerolog/log"
)

const (
Expand All @@ -20,6 +19,7 @@ const (
EventBTCDelegationInclusionProofReceived EventTypes = "babylon.btcstaking.v1.EventBTCDelegationInclusionProofReceived"
EventBTCDelgationUnbondedEarly EventTypes = "babylon.btcstaking.v1.EventBTCDelgationUnbondedEarly"
EventBTCDelegationExpired EventTypes = "babylon.btcstaking.v1.EventBTCDelegationExpired"
EventSlashedFinalityProvider EventTypes = "babylon.finality.v1.EventSlashedFinalityProvider"
)

func (s *Service) processNewBTCDelegationEvent(
Expand Down Expand Up @@ -245,220 +245,42 @@ func (s *Service) processBTCDelegationExpiredEvent(
return nil
}

func (s *Service) validateBTCDelegationCreatedEvent(event *bbntypes.EventBTCDelegationCreated) *types.Error {
// Check if the staking tx hash is present
if event.StakingTxHash == "" {
return types.NewErrorWithMsg(
http.StatusInternalServerError,
types.InternalServiceError,
"new BTC delegation event missing staking tx hash",
)
}

// Validate the event state
if event.NewState != bbntypes.BTCDelegationStatus_PENDING.String() {
return types.NewValidationFailedError(
fmt.Errorf("invalid delegation state from Babylon: expected PENDING, got %s", event.NewState),
)
}

return nil
}

func (s *Service) validateCovenantQuorumReachedEvent(ctx context.Context, event *bbntypes.EventCovenantQuorumReached) (bool, *types.Error) {
// Check if the staking tx hash is present
if event.StakingTxHash == "" {
return false, types.NewErrorWithMsg(
http.StatusInternalServerError,
types.InternalServiceError,
"covenant quorum reached event missing staking tx hash",
)
}

// Fetch the current delegation state from the database
delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, event.StakingTxHash)
if dbErr != nil {
return false, types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr),
)
}

// Retrieve the qualified states for the intended transition
qualifiedStates := types.QualifiedStatesForCovenantQuorumReached(event.NewState)
if qualifiedStates == nil {
return false, types.NewValidationFailedError(
fmt.Errorf("invalid delegation state from Babylon: %s", event.NewState),
)
}

// Check if the current state is qualified for the transition
if !utils.Contains(qualifiedStates, delegation.State) {
log.Debug().
Str("stakingTxHashHex", event.StakingTxHash).
Str("currentState", delegation.State.String()).
Str("newState", event.NewState).
Msg("Ignoring EventCovenantQuorumReached because current state is not qualified for transition")
return false, nil // Ignore the event silently
}

if event.NewState == bbntypes.BTCDelegationStatus_VERIFIED.String() {
// This will only happen if the staker is following the new pre-approval flow.
// For more info read https://github.com/babylonlabs-io/pm/blob/main/rfc/rfc-008-staking-transaction-pre-approval.md#handling-of-the-modified--msgcreatebtcdelegation-message

// Delegation should not have the inclusion proof yet
if delegation.HasInclusionProof() {
log.Debug().
Str("stakingTxHashHex", event.StakingTxHash).
Str("currentState", delegation.State.String()).
Str("newState", event.NewState).
Msg("Ignoring EventCovenantQuorumReached because inclusion proof already received")
return false, nil
}
} else if event.NewState == bbntypes.BTCDelegationStatus_ACTIVE.String() {
// This will happen if the inclusion proof is received in MsgCreateBTCDelegation, i.e the staker is following the old flow

// Delegation should have the inclusion proof
if !delegation.HasInclusionProof() {
log.Debug().
Str("stakingTxHashHex", event.StakingTxHash).
Str("currentState", delegation.State.String()).
Str("newState", event.NewState).
Msg("Ignoring EventCovenantQuorumReached because inclusion proof not received")
return false, nil
}
}

return true, nil
}

func (s *Service) validateBTCDelegationInclusionProofReceivedEvent(ctx context.Context, event *bbntypes.EventBTCDelegationInclusionProofReceived) (bool, *types.Error) {
// Check if the staking tx hash is present
if event.StakingTxHash == "" {
return false, types.NewErrorWithMsg(
http.StatusInternalServerError,
types.InternalServiceError,
"inclusion proof received event missing staking tx hash",
)
}

// Fetch the current delegation state from the database
delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, event.StakingTxHash)
if dbErr != nil {
return false, types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr),
)
}

// Retrieve the qualified states for the intended transition
qualifiedStates := types.QualifiedStatesForInclusionProofReceived(event.NewState)
if qualifiedStates == nil {
return false, types.NewValidationFailedError(
fmt.Errorf("no qualified states defined for new state: %s", event.NewState),
)
}

// Check if the current state is qualified for the transition
if !utils.Contains(qualifiedStates, delegation.State) {
log.Debug().
Str("stakingTxHashHex", event.StakingTxHash).
Str("currentState", delegation.State.String()).
Str("newState", event.NewState).
Msg("Ignoring EventBTCDelegationInclusionProofReceived because current state is not qualified for transition")
return false, nil
}

// Delegation should not have the inclusion proof yet
// After this event is processed, the inclusion proof will be set
if delegation.HasInclusionProof() {
log.Debug().
Str("stakingTxHashHex", event.StakingTxHash).
Str("currentState", delegation.State.String()).
Str("newState", event.NewState).
Msg("Ignoring EventBTCDelegationInclusionProofReceived because inclusion proof already received")
return false, nil
}

return true, nil
}

func (s *Service) validateBTCDelegationUnbondedEarlyEvent(ctx context.Context, event *bbntypes.EventBTCDelgationUnbondedEarly) (bool, *types.Error) {
// Check if the staking tx hash is present
if event.StakingTxHash == "" {
return false, types.NewErrorWithMsg(
http.StatusInternalServerError,
types.InternalServiceError,
"unbonded early event missing staking tx hash",
)
}

// Validate the event state
if event.NewState != bbntypes.BTCDelegationStatus_UNBONDED.String() {
return false, types.NewValidationFailedError(
fmt.Errorf("invalid delegation state from Babylon: expected UNBONDED, got %s", event.NewState),
)
}

// Fetch the current delegation state from the database
delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, event.StakingTxHash)
if dbErr != nil {
return false, types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr),
)
func (s *Service) processSlashedFinalityProviderEvent(
ctx context.Context, event abcitypes.Event,
) *types.Error {
slashedFinalityProviderEvent, err := parseEvent[*ftypes.EventSlashedFinalityProvider](
EventSlashedFinalityProvider,
event,
)
if err != nil {
return err
}

// Check if the current state is qualified for the transition
if !utils.Contains(types.QualifiedStatesForUnbondedEarly(), delegation.State) {
log.Debug().
Str("stakingTxHashHex", event.StakingTxHash).
Str("currentState", delegation.State.String()).
Msg("Ignoring EventBTCDelgationUnbondedEarly because current state is not qualified for transition")
return false, nil
shouldProcess, err := s.validateSlashedFinalityProviderEvent(ctx, slashedFinalityProviderEvent)
if err != nil {
return err
}

return true, nil
}

func (s *Service) validateBTCDelegationExpiredEvent(ctx context.Context, event *bbntypes.EventBTCDelegationExpired) (bool, *types.Error) {
// Check if the staking tx hash is present
if event.StakingTxHash == "" {
return false, types.NewErrorWithMsg(
http.StatusInternalServerError,
types.InternalServiceError,
"expired event missing staking tx hash",
)
if !shouldProcess {
// Event is valid but should be skipped
return nil
}

// Validate the event state
if event.NewState != bbntypes.BTCDelegationStatus_UNBONDED.String() {
return false, types.NewValidationFailedError(
fmt.Errorf("invalid delegation state from Babylon: expected UNBONDED, got %s", event.NewState),
)
}
evidence := slashedFinalityProviderEvent.Evidence
fpBTCPKHex := evidence.FpBtcPk.MarshalHex()

// Fetch the current delegation state from the database
delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, event.StakingTxHash)
if dbErr != nil {
return false, types.NewError(
if dbErr := s.db.UpdateDelegationsStateByFinalityProvider(
ctx, fpBTCPKHex, types.StateSlashed,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if service restart in the mid of this process? e.g there are 1m delegations under the FP PK. we only processed 10k of those then we deployed the service again

Copy link
Collaborator Author

@gusin13 gusin13 Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm so if this happens then last processed height not updated.

in next restart, it picks up from same height (at which it failed in prev run), finds this slashing event and then tries to make a bulk update again 🤔

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, but slashing operation is very time consuming. we shall not couple it with the main processing thread.
Given the same example above, say u have 1 million delegations under this FP. our indexer will be blocked on this operation until all delegations are updated.

My proposal is to perform this slashing operation in a different go routine, you can do this by either:

  1. Emit "slashing msg" into rabbitmq, then process it by indexer itself. only delete the msg once all delegations are processed.
  2. OR, you create a temporary db collection just for slashing. let's say slashing_registry. then a go routing monitoring this collection and remove the entry once no more delegations to process.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see, sure created a ticket to optimize the slashing process
#57

); dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr),
fmt.Errorf("failed to update BTC delegation state: %w", dbErr),
)
}

// Check if the current state is qualified for the transition
if !utils.Contains(types.QualifiedStatesForExpired(), delegation.State) {
log.Debug().
Str("stakingTxHashHex", event.StakingTxHash).
Str("currentState", delegation.State.String()).
Msg("Ignoring EventBTCDelegationExpired because current state is not qualified for transition")
return false, nil
}
// TODO: babylon needs to emit slashing tx
// so indexer can start watching for slashing spend
// to identify if staker has withdrawn after slashing

return true, nil
return nil
}
Loading
Loading