From e9875a9cab7620de34919a9c6c6e6287694c640f Mon Sep 17 00:00:00 2001 From: Gurjot Singh <111540954+gusin13@users.noreply.github.com> Date: Thu, 9 Jan 2025 11:05:28 +0530 Subject: [PATCH] feat: store state history in db (#113) --- e2etest/e2e_test.go | 10 ++++ internal/db/delegation.go | 78 +++++++++++++++++++++++++-- internal/db/interface.go | 16 +++--- internal/db/model/delegation.go | 14 +++++ internal/services/delegation.go | 21 ++++---- internal/services/events.go | 10 ++-- internal/services/expiry_checker.go | 3 +- internal/services/watch_btc_events.go | 13 +++-- tests/mocks/mock_db_client.go | 39 ++++++++------ 9 files changed, 161 insertions(+), 43 deletions(-) diff --git a/e2etest/e2e_test.go b/e2etest/e2e_test.go index db5425c..6423bc4 100644 --- a/e2etest/e2e_test.go +++ b/e2etest/e2e_test.go @@ -202,4 +202,14 @@ func TestStakingEarlyUnbonding(t *testing.T) { // Consume unbonding staking event emitted by Indexer tm.CheckNextUnbondingStakingEvent(t, stakingMsgTxHash.String()) + + // Verify state history in Indexer DB + delegation, err := tm.DbClient.GetBTCDelegationByStakingTxHash(ctx, stakingMsgTxHash.String()) + require.NoError(t, err) + require.NotEmpty(t, delegation.StateHistory, "State history should not be empty") + require.Equal(t, delegation.StateHistory[0].State, types.StatePending) + require.Equal(t, delegation.StateHistory[1].State, types.StateVerified) + require.Equal(t, delegation.StateHistory[2].State, types.StateActive) + require.Equal(t, delegation.StateHistory[3].State, types.StateUnbonding) + require.Equal(t, delegation.StateHistory[3].SubState, expectedSubState) } diff --git a/internal/db/delegation.go b/internal/db/delegation.go index d80164b..6fd0eab 100644 --- a/internal/db/delegation.go +++ b/internal/db/delegation.go @@ -12,6 +12,37 @@ import ( "go.mongodb.org/mongo-driver/mongo" ) +// UpdateOption is a function that modifies update options +type UpdateOption func(*updateOptions) + +// updateOptions holds all possible optional parameters +type updateOptions struct { + subState *types.DelegationSubState + bbnHeight *int64 + btcHeight *int64 +} + +// WithSubState sets the sub-state option +func WithSubState(subState types.DelegationSubState) UpdateOption { + return func(opts *updateOptions) { + opts.subState = &subState + } +} + +// WithBbnHeight sets the BBN height option +func WithBbnHeight(height int64) UpdateOption { + return func(opts *updateOptions) { + opts.bbnHeight = &height + } +} + +// WithBtcHeight sets the BTC height option +func WithBtcHeight(height int64) UpdateOption { + return func(opts *updateOptions) { + opts.btcHeight = &height + } +} + func (db *Database) SaveNewBTCDelegation( ctx context.Context, delegationDoc *model.BTCDelegationDetails, ) error { @@ -40,7 +71,7 @@ func (db *Database) UpdateBTCDelegationState( stakingTxHash string, qualifiedPreviousStates []types.DelegationState, newState types.DelegationState, - newSubState *types.DelegationSubState, + opts ...UpdateOption, // Can pass multiple optional parameters ) error { if len(qualifiedPreviousStates) == 0 { return fmt.Errorf("qualified previous states array cannot be empty") @@ -51,6 +82,15 @@ func (db *Database) UpdateBTCDelegationState( qualifiedStateStrs[i] = state.String() } + options := &updateOptions{} + for _, opt := range opts { + opt(options) + } + + stateRecord := model.StateRecord{ + State: newState, + } + filter := bson.M{ "_id": stakingTxHash, "state": bson.M{"$in": qualifiedStateStrs}, @@ -60,12 +100,24 @@ func (db *Database) UpdateBTCDelegationState( "state": newState.String(), } - if newSubState != nil { - updateFields["sub_state"] = newSubState.String() + if options.bbnHeight != nil { + stateRecord.BbnHeight = *options.bbnHeight + } + + if options.btcHeight != nil { + stateRecord.BtcHeight = *options.btcHeight + } + + if options.subState != nil { + stateRecord.SubState = *options.subState + updateFields["sub_state"] = options.subState.String() } update := bson.M{ "$set": updateFields, + "$push": bson.M{ + "state_history": stateRecord, + }, } res := db.client.Database(db.dbName). @@ -98,13 +150,20 @@ func (db *Database) GetBTCDelegationState( func (db *Database) UpdateBTCDelegationDetails( ctx context.Context, stakingTxHash string, + bbnBlockHeight int64, details *model.BTCDelegationDetails, ) error { updateFields := bson.M{} + var stateRecord *model.StateRecord + // Only add fields to updateFields if they are not empty if details.State.String() != "" { updateFields["state"] = details.State.String() + stateRecord = &model.StateRecord{ + State: details.State, + BbnHeight: bbnBlockHeight, + } } if details.StartHeight != 0 { updateFields["start_height"] = details.StartHeight @@ -118,6 +177,10 @@ func (db *Database) UpdateBTCDelegationDetails( filter := bson.M{"_id": stakingTxHash} update := bson.M{"$set": updateFields} + if stateRecord != nil { + update["$push"] = bson.M{"state_history": stateRecord} + } + res, err := db.client.Database(db.dbName). Collection(model.BTCDelegationDetailsCollection). UpdateOne(ctx, filter, update) @@ -183,15 +246,24 @@ func (db *Database) UpdateDelegationsStateByFinalityProvider( ctx context.Context, fpBTCPKHex string, newState types.DelegationState, + bbnBlockHeight int64, ) error { filter := bson.M{ "finality_provider_btc_pks_hex": fpBTCPKHex, } + stateRecord := model.StateRecord{ + State: newState, + BbnHeight: bbnBlockHeight, + } + update := bson.M{ "$set": bson.M{ "state": newState.String(), }, + "$push": bson.M{ + "state_history": stateRecord, + }, } result, err := db.client.Database(db.dbName). diff --git a/internal/db/interface.go b/internal/db/interface.go index 3d24efe..7de7851 100644 --- a/internal/db/interface.go +++ b/internal/db/interface.go @@ -92,9 +92,12 @@ type DbInterface interface { ctx context.Context, delegationDoc *model.BTCDelegationDetails, ) error /** - * SaveBTCDelegationStateUpdate saves a BTC delegation state update to the database. + * UpdateBTCDelegationState updates a BTC delegation state in the database. * @param ctx The context - * @param delegationDoc The BTC delegation details + * @param stakingTxHash The staking transaction hash + * @param qualifiedPreviousStates The previous states that qualify for this update + * @param newState The new state to update to + * @param opts Optional parameters for the update * @return An error if the operation failed */ UpdateBTCDelegationState( @@ -102,7 +105,7 @@ type DbInterface interface { stakingTxHash string, qualifiedPreviousStates []types.DelegationState, newState types.DelegationState, - newSubState *types.DelegationSubState, + opts ...UpdateOption, ) error /** * SaveBTCDelegationUnbondingCovenantSignature saves a BTC delegation @@ -127,11 +130,12 @@ type DbInterface interface { * UpdateBTCDelegationDetails updates the BTC delegation details. * @param ctx The context * @param stakingTxHash The staking tx hash + * @param bbnBlockHeight The Babylon block height * @param details The BTC delegation details to update * @return An error if the operation failed */ UpdateBTCDelegationDetails( - ctx context.Context, stakingTxHash string, details *model.BTCDelegationDetails, + ctx context.Context, stakingTxHash string, bbnBlockHeight int64, details *model.BTCDelegationDetails, ) error /** * GetBTCDelegationByStakingTxHash retrieves the BTC delegation details by the staking tx hash. @@ -148,11 +152,11 @@ type DbInterface interface { * @param ctx The context * @param fpBtcPkHex The finality provider public key * @param newState The new state - * @param qualifiedStates The qualified states + * @param bbnBlockHeight The Babylon block height * @return An error if the operation failed */ UpdateDelegationsStateByFinalityProvider( - ctx context.Context, fpBtcPkHex string, newState types.DelegationState, + ctx context.Context, fpBtcPkHex string, newState types.DelegationState, bbnBlockHeight int64, ) error /** * GetDelegationsByFinalityProvider retrieves the BTC delegations by the finality provider public key. diff --git a/internal/db/model/delegation.go b/internal/db/model/delegation.go index 9cd2c96..2ed6d0d 100644 --- a/internal/db/model/delegation.go +++ b/internal/db/model/delegation.go @@ -27,6 +27,13 @@ type SlashingTx struct { SpendingHeight uint32 `bson:"spending_height"` } +type StateRecord struct { + State types.DelegationState `bson:"state"` + SubState types.DelegationSubState `bson:"sub_state,omitempty"` + BbnHeight int64 `bson:"bbn_height,omitempty"` // Babylon block height when applicable + BtcHeight int64 `bson:"btc_height,omitempty"` // Bitcoin block height when applicable +} + type BTCDelegationDetails struct { StakingTxHashHex string `bson:"_id"` // Primary key StakingTxHex string `bson:"staking_tx_hex"` @@ -39,6 +46,7 @@ type BTCDelegationDetails struct { EndHeight uint32 `bson:"end_height"` State types.DelegationState `bson:"state"` SubState types.DelegationSubState `bson:"sub_state,omitempty"` + StateHistory []StateRecord `bson:"state_history"` ParamsVersion uint32 `bson:"params_version"` UnbondingTime uint32 `bson:"unbonding_time"` UnbondingTx string `bson:"unbonding_tx"` @@ -118,6 +126,12 @@ func FromEventBTCDelegationCreated( Height: bbnBlockHeight, Timestamp: bbnBlockTime, }, + StateHistory: []StateRecord{ + { + State: types.StatePending, + BbnHeight: bbnBlockHeight, + }, + }, }, nil } diff --git a/internal/services/delegation.go b/internal/services/delegation.go index 05f52f5..7671c64 100644 --- a/internal/services/delegation.go +++ b/internal/services/delegation.go @@ -122,7 +122,7 @@ func (s *Service) processCovenantSignatureReceivedEvent( } func (s *Service) processCovenantQuorumReachedEvent( - ctx context.Context, event abcitypes.Event, + ctx context.Context, event abcitypes.Event, bbnBlockHeight int64, ) *types.Error { covenantQuorumReachedEvent, err := parseEvent[*bbntypes.EventCovenantQuorumReached]( EventCovenantQuorumReached, event, @@ -186,7 +186,7 @@ func (s *Service) processCovenantQuorumReachedEvent( covenantQuorumReachedEvent.StakingTxHash, types.QualifiedStatesForCovenantQuorumReached(covenantQuorumReachedEvent.NewState), newState, - nil, + db.WithBbnHeight(bbnBlockHeight), ); dbErr != nil { return types.NewError( http.StatusInternalServerError, @@ -199,7 +199,7 @@ func (s *Service) processCovenantQuorumReachedEvent( } func (s *Service) processBTCDelegationInclusionProofReceivedEvent( - ctx context.Context, event abcitypes.Event, + ctx context.Context, event abcitypes.Event, bbnBlockHeight int64, ) *types.Error { inclusionProofEvent, err := parseEvent[*bbntypes.EventBTCDelegationInclusionProofReceived]( EventBTCDelegationInclusionProofReceived, event, @@ -261,6 +261,7 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent( if dbErr := s.db.UpdateBTCDelegationDetails( ctx, inclusionProofEvent.StakingTxHash, + bbnBlockHeight, model.FromEventBTCDelegationInclusionProofReceived(inclusionProofEvent), ); dbErr != nil { return types.NewError( @@ -274,7 +275,7 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent( } func (s *Service) processBTCDelegationUnbondedEarlyEvent( - ctx context.Context, event abcitypes.Event, + ctx context.Context, event abcitypes.Event, bbnBlockHeight int64, ) *types.Error { unbondedEarlyEvent, err := parseEvent[*bbntypes.EventBTCDelgationUnbondedEarly]( EventBTCDelgationUnbondedEarly, @@ -349,7 +350,8 @@ func (s *Service) processBTCDelegationUnbondedEarlyEvent( unbondedEarlyEvent.StakingTxHash, types.QualifiedStatesForUnbondedEarly(), types.StateUnbonding, - &subState, + db.WithSubState(subState), + db.WithBbnHeight(bbnBlockHeight), ); err != nil { return types.NewError( http.StatusInternalServerError, @@ -362,7 +364,7 @@ func (s *Service) processBTCDelegationUnbondedEarlyEvent( } func (s *Service) processBTCDelegationExpiredEvent( - ctx context.Context, event abcitypes.Event, + ctx context.Context, event abcitypes.Event, bbnBlockHeight int64, ) *types.Error { expiredEvent, err := parseEvent[*bbntypes.EventBTCDelegationExpired]( EventBTCDelegationExpired, @@ -417,7 +419,8 @@ func (s *Service) processBTCDelegationExpiredEvent( delegation.StakingTxHashHex, types.QualifiedStatesForExpired(), types.StateUnbonding, - &subState, + db.WithSubState(subState), + db.WithBbnHeight(bbnBlockHeight), ); err != nil { return types.NewError( http.StatusInternalServerError, @@ -430,7 +433,7 @@ func (s *Service) processBTCDelegationExpiredEvent( } func (s *Service) processSlashedFinalityProviderEvent( - ctx context.Context, event abcitypes.Event, + ctx context.Context, event abcitypes.Event, bbnBlockHeight int64, ) *types.Error { slashedFinalityProviderEvent, err := parseEvent[*ftypes.EventSlashedFinalityProvider]( EventSlashedFinalityProvider, @@ -453,7 +456,7 @@ func (s *Service) processSlashedFinalityProviderEvent( fpBTCPKHex := evidence.FpBtcPk.MarshalHex() if dbErr := s.db.UpdateDelegationsStateByFinalityProvider( - ctx, fpBTCPKHex, types.StateSlashed, + ctx, fpBTCPKHex, types.StateSlashed, bbnBlockHeight, ); dbErr != nil { return types.NewError( http.StatusInternalServerError, diff --git a/internal/services/events.go b/internal/services/events.go index 031834f..494159a 100644 --- a/internal/services/events.go +++ b/internal/services/events.go @@ -70,22 +70,22 @@ func (s *Service) processEvent( err = s.processNewBTCDelegationEvent(ctx, bbnEvent, blockHeight) case EventCovenantQuorumReached: log.Debug().Msg("Processing covenant quorum reached event") - err = s.processCovenantQuorumReachedEvent(ctx, bbnEvent) + err = s.processCovenantQuorumReachedEvent(ctx, bbnEvent, blockHeight) case EventCovenantSignatureReceived: log.Debug().Msg("Processing covenant signature received event") err = s.processCovenantSignatureReceivedEvent(ctx, bbnEvent) case EventBTCDelegationInclusionProofReceived: log.Debug().Msg("Processing BTC delegation inclusion proof received event") - err = s.processBTCDelegationInclusionProofReceivedEvent(ctx, bbnEvent) + err = s.processBTCDelegationInclusionProofReceivedEvent(ctx, bbnEvent, blockHeight) case EventBTCDelgationUnbondedEarly: log.Debug().Msg("Processing BTC delegation unbonded early event") - err = s.processBTCDelegationUnbondedEarlyEvent(ctx, bbnEvent) + err = s.processBTCDelegationUnbondedEarlyEvent(ctx, bbnEvent, blockHeight) case EventBTCDelegationExpired: log.Debug().Msg("Processing BTC delegation expired event") - err = s.processBTCDelegationExpiredEvent(ctx, bbnEvent) + err = s.processBTCDelegationExpiredEvent(ctx, bbnEvent, blockHeight) case EventSlashedFinalityProvider: log.Debug().Msg("Processing slashed finality provider event") - err = s.processSlashedFinalityProviderEvent(ctx, bbnEvent) + err = s.processSlashedFinalityProviderEvent(ctx, bbnEvent, blockHeight) } if err != nil { diff --git a/internal/services/expiry_checker.go b/internal/services/expiry_checker.go index a2b1582..62d6f33 100644 --- a/internal/services/expiry_checker.go +++ b/internal/services/expiry_checker.go @@ -57,7 +57,8 @@ func (s *Service) checkExpiry(ctx context.Context) *types.Error { delegation.StakingTxHashHex, types.QualifiedStatesForWithdrawable(), types.StateWithdrawable, - &tlDoc.DelegationSubState, + db.WithSubState(tlDoc.DelegationSubState), + db.WithBtcHeight(int64(tlDoc.ExpireHeight)), ) if stateUpdateErr != nil { if db.IsNotFoundError(stateUpdateErr) { diff --git a/internal/services/watch_btc_events.go b/internal/services/watch_btc_events.go index 6efbb1a..20eca98 100644 --- a/internal/services/watch_btc_events.go +++ b/internal/services/watch_btc_events.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/bbnclient" + "github.com/babylonlabs-io/babylon-staking-indexer/internal/db" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" "github.com/babylonlabs-io/babylon-staking-indexer/internal/utils" @@ -135,7 +136,8 @@ func (s *Service) watchForSpendSlashingChange( delegation.StakingTxHashHex, types.QualifiedStatesForWithdrawn(), types.StateWithdrawn, - &delegationSubState, + db.WithSubState(delegationSubState), + db.WithBtcHeight(int64(spendDetail.SpendingHeight)), ); err != nil { log.Error(). Err(err). @@ -193,7 +195,7 @@ func (s *Service) handleSpendingStakingTransaction( Str("staking_tx", delegation.StakingTxHashHex). Str("withdrawal_tx", spendingTx.TxHash().String()). Msg("staking tx has been spent through withdrawal path") - return s.handleWithdrawal(ctx, delegation, types.SubStateTimelock) + return s.handleWithdrawal(ctx, delegation, types.SubStateTimelock, spendingHeight) } // If it's not a valid withdrawal, check if it's a valid slashing @@ -255,7 +257,7 @@ func (s *Service) handleSpendingUnbondingTransaction( Str("staking_tx", delegation.StakingTxHashHex). Str("unbonding_tx", spendingTx.TxHash().String()). Msg("unbonding tx has been spent through withdrawal path") - return s.handleWithdrawal(ctx, delegation, types.SubStateEarlyUnbonding) + return s.handleWithdrawal(ctx, delegation, types.SubStateEarlyUnbonding, spendingHeight) } // If it's not a valid withdrawal, check if it's a valid slashing @@ -300,6 +302,7 @@ func (s *Service) handleWithdrawal( ctx context.Context, delegation *model.BTCDelegationDetails, subState types.DelegationSubState, + spendingHeight uint32, ) error { delegationState, err := s.db.GetBTCDelegationState(ctx, delegation.StakingTxHashHex) if err != nil { @@ -321,12 +324,14 @@ func (s *Service) handleWithdrawal( Str("state", types.StateWithdrawn.String()). Str("sub_state", subState.String()). Msg("updating delegation state to withdrawn") + return s.db.UpdateBTCDelegationState( ctx, delegation.StakingTxHashHex, types.QualifiedStatesForWithdrawn(), types.StateWithdrawn, - &subState, + db.WithSubState(subState), + db.WithBtcHeight(int64(spendingHeight)), ) } diff --git a/tests/mocks/mock_db_client.go b/tests/mocks/mock_db_client.go index 40a3532..4fad2e2 100644 --- a/tests/mocks/mock_db_client.go +++ b/tests/mocks/mock_db_client.go @@ -7,6 +7,8 @@ import ( bbnclient "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/bbnclient" + db "github.com/babylonlabs-io/babylon-staking-indexer/internal/db" + mock "github.com/stretchr/testify/mock" model "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" @@ -437,17 +439,17 @@ func (_m *DbInterface) SaveStakingParams(ctx context.Context, version uint32, pa return r0 } -// UpdateBTCDelegationDetails provides a mock function with given fields: ctx, stakingTxHash, details -func (_m *DbInterface) UpdateBTCDelegationDetails(ctx context.Context, stakingTxHash string, details *model.BTCDelegationDetails) error { - ret := _m.Called(ctx, stakingTxHash, details) +// UpdateBTCDelegationDetails provides a mock function with given fields: ctx, stakingTxHash, bbnBlockHeight, details +func (_m *DbInterface) UpdateBTCDelegationDetails(ctx context.Context, stakingTxHash string, bbnBlockHeight int64, details *model.BTCDelegationDetails) error { + ret := _m.Called(ctx, stakingTxHash, bbnBlockHeight, details) if len(ret) == 0 { panic("no return value specified for UpdateBTCDelegationDetails") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, *model.BTCDelegationDetails) error); ok { - r0 = rf(ctx, stakingTxHash, details) + if rf, ok := ret.Get(0).(func(context.Context, string, int64, *model.BTCDelegationDetails) error); ok { + r0 = rf(ctx, stakingTxHash, bbnBlockHeight, details) } else { r0 = ret.Error(0) } @@ -455,17 +457,24 @@ func (_m *DbInterface) UpdateBTCDelegationDetails(ctx context.Context, stakingTx return r0 } -// UpdateBTCDelegationState provides a mock function with given fields: ctx, stakingTxHash, qualifiedPreviousStates, newState, newSubState -func (_m *DbInterface) UpdateBTCDelegationState(ctx context.Context, stakingTxHash string, qualifiedPreviousStates []types.DelegationState, newState types.DelegationState, newSubState *types.DelegationSubState) error { - ret := _m.Called(ctx, stakingTxHash, qualifiedPreviousStates, newState, newSubState) +// UpdateBTCDelegationState provides a mock function with given fields: ctx, stakingTxHash, qualifiedPreviousStates, newState, opts +func (_m *DbInterface) UpdateBTCDelegationState(ctx context.Context, stakingTxHash string, qualifiedPreviousStates []types.DelegationState, newState types.DelegationState, opts ...db.UpdateOption) error { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, stakingTxHash, qualifiedPreviousStates, newState) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) if len(ret) == 0 { panic("no return value specified for UpdateBTCDelegationState") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, []types.DelegationState, types.DelegationState, *types.DelegationSubState) error); ok { - r0 = rf(ctx, stakingTxHash, qualifiedPreviousStates, newState, newSubState) + if rf, ok := ret.Get(0).(func(context.Context, string, []types.DelegationState, types.DelegationState, ...db.UpdateOption) error); ok { + r0 = rf(ctx, stakingTxHash, qualifiedPreviousStates, newState, opts...) } else { r0 = ret.Error(0) } @@ -473,17 +482,17 @@ func (_m *DbInterface) UpdateBTCDelegationState(ctx context.Context, stakingTxHa return r0 } -// UpdateDelegationsStateByFinalityProvider provides a mock function with given fields: ctx, fpBtcPkHex, newState -func (_m *DbInterface) UpdateDelegationsStateByFinalityProvider(ctx context.Context, fpBtcPkHex string, newState types.DelegationState) error { - ret := _m.Called(ctx, fpBtcPkHex, newState) +// UpdateDelegationsStateByFinalityProvider provides a mock function with given fields: ctx, fpBtcPkHex, newState, bbnBlockHeight +func (_m *DbInterface) UpdateDelegationsStateByFinalityProvider(ctx context.Context, fpBtcPkHex string, newState types.DelegationState, bbnBlockHeight int64) error { + ret := _m.Called(ctx, fpBtcPkHex, newState, bbnBlockHeight) if len(ret) == 0 { panic("no return value specified for UpdateDelegationsStateByFinalityProvider") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, types.DelegationState) error); ok { - r0 = rf(ctx, fpBtcPkHex, newState) + if rf, ok := ret.Get(0).(func(context.Context, string, types.DelegationState, int64) error); ok { + r0 = rf(ctx, fpBtcPkHex, newState, bbnBlockHeight) } else { r0 = ret.Error(0) }