diff --git a/app/app.go b/app/app.go index 0b30ca8ac..a4250bcda 100644 --- a/app/app.go +++ b/app/app.go @@ -582,6 +582,7 @@ func New( consensusRegistry, &app.TreasuryKeeper, ) + app.ConsensusKeeper.AddMessageConsensusAttestedListener(&app.MetrixKeeper) app.EvmKeeper = *evmmodulekeeper.NewKeeper( appCodec, diff --git a/x/consensus/keeper/concensus_keeper.go b/x/consensus/keeper/concensus_keeper.go index 273311d47..bab1e3fdb 100644 --- a/x/consensus/keeper/concensus_keeper.go +++ b/x/consensus/keeper/concensus_keeper.go @@ -15,6 +15,7 @@ import ( "github.com/palomachain/paloma/x/consensus/keeper/filters" "github.com/palomachain/paloma/x/consensus/types" evmtypes "github.com/palomachain/paloma/x/evm/types" + metrixtypes "github.com/palomachain/paloma/x/metrix/types" valsettypes "github.com/palomachain/paloma/x/valset/types" ) @@ -287,35 +288,88 @@ func (k Keeper) GetMessagesFromQueue(ctx context.Context, queueTypeName string, // Any message that actually reached consensus will be removed from the queue during // attestation, other messages like superfluous valset updates will get removed // in their respective logic flows, but none of them should be using this function. -func (k Keeper) PruneJob(ctx sdk.Context, queueTypeName string, id uint64) (err error) { - if err := k.jailValidatorsWhichMissedAttestation(ctx, queueTypeName, id); err != nil { - liblog.FromSDKLogger(k.Logger(ctx)). +func (k Keeper) PruneJob(sdkCtx sdk.Context, queueTypeName string, id uint64) error { + err := k.jailValidatorsIfNecessary(sdkCtx, queueTypeName, id) + if err != nil { + liblog.FromSDKLogger(k.Logger(sdkCtx)). WithError(err). WithFields("msg-id", id). WithFields("queue-type-name", queueTypeName). - Error("Failed to jail validators that missed attestation.") + Error("Failed to jail validators.") } - return k.DeleteJob(ctx, queueTypeName, id) + return k.DeleteJob(sdkCtx, queueTypeName, id) } -func (k Keeper) jailValidatorsWhichMissedAttestation(ctx sdk.Context, queueTypeName string, id uint64) error { - cq, err := k.getConsensusQueue(ctx, queueTypeName) +func (k Keeper) jailValidatorsIfNecessary( + sdkCtx sdk.Context, + queueTypeName string, + id uint64, +) error { + cq, err := k.getConsensusQueue(sdkCtx, queueTypeName) if err != nil { return fmt.Errorf("getConsensusQueue: %w", err) } - msg, err := cq.GetMsgByID(ctx, id) + msg, err := cq.GetMsgByID(sdkCtx, id) if err != nil { return fmt.Errorf("getMsgByID: %w", err) } if msg.GetPublicAccessData() == nil && msg.GetErrorData() == nil { - // This message was never successfully handled, attestation flock - // should not be punished for this. + // The message was never delivered, so we need to update the validator + // metrics with a failure + return k.punishValidatorForMissingRelay(sdkCtx, msg) + } + + // Otherwise, there was a delivery attempt, so only jail validators that + // missed attestation + return k.jailValidatorsWhichMissedAttestation(sdkCtx, msg) +} + +func (k Keeper) punishValidatorForMissingRelay( + sdkCtx sdk.Context, + msg types.QueuedSignedMessageI, +) error { + if msg.GetGasEstimate() == 0 { + // If we don't have a gas estimate, this was probably not the + // validator's fault, so we do nothing + return nil + } + + consensusMsg, err := msg.ConsensusMsg(k.cdc) + if err != nil { + return err + } + + message, ok := consensusMsg.(*evmtypes.Message) + if !ok { + // If this is not a turnstone message, we don't want it return nil } + valAddr, err := sdk.ValAddressFromBech32(message.Assignee) + if err != nil { + return err + } + + for _, v := range k.onMessageAttestedListeners { + v.OnConsensusMessageAttested(sdkCtx, metrixtypes.MessageAttestedEvent{ + AssignedAtBlockHeight: message.AssignedAtBlockHeight, + HandledAtBlockHeight: math.NewInt(sdkCtx.BlockHeight()), + Assignee: valAddr, + MessageID: msg.GetId(), + WasRelayedSuccessfully: false, + }) + } + + return nil +} + +func (k Keeper) jailValidatorsWhichMissedAttestation( + ctx sdk.Context, + msg types.QueuedSignedMessageI, +) error { r, err := k.consensusChecker.VerifyEvidence(ctx, slice.Map(msg.GetEvidence(), func(evidence *types.Evidence) libcons.Evidence { return evidence @@ -371,7 +425,7 @@ func (k Keeper) jailValidatorsWhichMissedAttestation(ctx sdk.Context, queueTypeN // This validator is part of the active valset but did not supply evidence. // That's not very nice. Let's jail them. if err := k.valset.Jail(ctx, v.GetAddress(), fmt.Sprintf("No evidence supplied for contentious message %d", msg.GetId())); err != nil { - liblog.FromSDKLogger(k.Logger(ctx)).WithError(err).WithValidator(v.GetAddress().String()).WithFields("msg-id", id).Error("Failed to jail validator.") + liblog.FromSDKLogger(k.Logger(ctx)).WithError(err).WithValidator(v.GetAddress().String()).WithFields("msg-id", msg.GetId()).Error("Failed to jail validator.") } } } diff --git a/x/consensus/keeper/concensus_keeper_test.go b/x/consensus/keeper/concensus_keeper_test.go index 0e6662376..8a0db77fa 100644 --- a/x/consensus/keeper/concensus_keeper_test.go +++ b/x/consensus/keeper/concensus_keeper_test.go @@ -90,10 +90,10 @@ func TestEndToEndTestingOfPuttingAndGettingMessagesOfTheConsensusQueue(t *testin }) } -func TestJailValidatorsWhichMissedAttestation(t *testing.T) { +func TestJailValidatorsIfNecessary(t *testing.T) { queue := types.Queue(defaultQueueName, chainType, chainReferenceID) keeper, ms, ctx := newConsensusKeeper(t) - msgType := &types.SimpleMessage{} + msgType := &evmtypes.Message{} serializedTx, err := hex.DecodeString("02f87201108405f5e100850b68a0aa00825208941f9c2e67dbbe4c457a5e2be0bc31e67ce5953a2d87470de4df82000080c001a0e05de0771f8d577ec5aa440612c0e8f560d732d5162db0187cfaf56ac50c3716a0147565f4b0924a5adda25f55330c385448e0507d1219d4dac0950e2872682124") require.NoError(t, err) @@ -111,21 +111,24 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) { ) t.Run("with unsupported queue name", func(t *testing.T) { - err := keeper.jailValidatorsWhichMissedAttestation(ctx, "no support", 0) + err := keeper.jailValidatorsIfNecessary(ctx, "no support", 0) require.ErrorContains(t, err, "getConsensusQueue", "returns an error") }) t.Run("with unknown message ID", func(t *testing.T) { - err := keeper.jailValidatorsWhichMissedAttestation(ctx, queue, 42) + err := keeper.jailValidatorsIfNecessary(ctx, queue, 42) require.ErrorContains(t, err, "getMsgByID", "returns an error") }) t.Run("with unknown message ID", func(t *testing.T) { - err := keeper.jailValidatorsWhichMissedAttestation(ctx, queue, 42) + err := keeper.jailValidatorsIfNecessary(ctx, queue, 42) require.ErrorContains(t, err, "getMsgByID", "returns an error") }) - testMsg := types.SimpleMessage{Sender: "user", Hello: "foo", World: "bar"} + assignee, _ := sdk.ValAddressFromBech32("palomavaloper1tsu8nthuspe4zlkejtj3v27rtq8qz7q6983zt2") + testMsg := evmtypes.Message{ + Assignee: assignee.String(), + } t.Run("with message that actually forms consensus", func(t *testing.T) { mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg, &consensus.PutOptions{PublicAccessData: []byte{1}}) require.NoError(t, err) @@ -159,7 +162,7 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) { TotalShares: math.NewInt(4000), }, nil).Times(1) - err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID) + err = keeper.jailValidatorsIfNecessary(ctx, queue, mID) require.Error(t, err) require.ErrorContains(t, err, "unexpected message with valid consensus found, skipping jailing steps") }) @@ -198,9 +201,10 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) { TotalShares: math.NewInt(10000), }, nil).Times(2) - err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID) + err = keeper.jailValidatorsIfNecessary(ctx, queue, mID) require.NoError(t, err, "should not do anything") }) + t.Run("with expected validators missing", func(t *testing.T) { mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg, &consensus.PutOptions{PublicAccessData: []byte{1}}) require.NoError(t, err) @@ -237,17 +241,35 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) { ms.ValsetKeeper.On("Jail", mock.Anything, validators[0].GetAddress(), mock.Anything).Return(nil) ms.ValsetKeeper.On("Jail", mock.Anything, validators[1].GetAddress(), mock.Anything).Return(nil) - err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID) + err = keeper.jailValidatorsIfNecessary(ctx, queue, mID) + require.NoError(t, err, "should not do anything") + }) + t.Run("with neither error nor public access data set without gas estimate", func(t *testing.T) { + mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg, + &consensus.PutOptions{RequireGasEstimation: true}) + require.NoError(t, err) + + err = keeper.jailValidatorsIfNecessary(ctx, queue, mID) require.NoError(t, err, "should not do anything") }) - t.Run("with neither error nor public access data set", func(t *testing.T) { - mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg, nil) + t.Run("with neither error nor public access data set with gas estimate", func(t *testing.T) { + mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg, + &consensus.PutOptions{RequireGasEstimation: true}) require.NoError(t, err) - err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID) + cq, err := keeper.getConsensusQueue(ctx, queue) + require.NoError(t, err) + + err = cq.SetElectedGasEstimate(ctx, mID, 1) + require.NoError(t, err) + + ms.MetrixKeeper.On("OnConsensusMessageAttested", mock.Anything, mock.Anything).Return(nil) + + err = keeper.jailValidatorsIfNecessary(ctx, queue, mID) require.NoError(t, err, "should not do anything") }) }) + t.Run("with expected validators missing, but less than 10% share supplied evidence", func(t *testing.T) { mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg, &consensus.PutOptions{PublicAccessData: []byte{1}}) require.NoError(t, err) @@ -281,10 +303,11 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) { TotalShares: math.NewInt(11000), }, nil).Times(1) - err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID) + err = keeper.jailValidatorsIfNecessary(ctx, queue, mID) require.Error(t, err, "should return error") require.ErrorContains(t, err, "message consensus failure likely caused by faulty response data") }) + t.Run("with expected validators missing, but 10% of share or more supplied evidence", func(t *testing.T) { mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg, &consensus.PutOptions{PublicAccessData: []byte{1}}) require.NoError(t, err) @@ -322,7 +345,7 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) { TotalShares: math.NewInt(10000), }, nil).Times(2) - err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID) + err = keeper.jailValidatorsIfNecessary(ctx, queue, mID) require.NoError(t, err, "should jail but not return error") }) } diff --git a/x/consensus/keeper/keeper.go b/x/consensus/keeper/keeper.go index fc6895ea1..5a0b643b6 100644 --- a/x/consensus/keeper/keeper.go +++ b/x/consensus/keeper/keeper.go @@ -15,6 +15,7 @@ import ( "github.com/palomachain/paloma/util/libcons" "github.com/palomachain/paloma/util/liblog" "github.com/palomachain/paloma/x/consensus/types" + metrixtypes "github.com/palomachain/paloma/x/metrix/types" ) type FeeProvider interface { @@ -31,10 +32,11 @@ type ( valset types.ValsetKeeper - registry *registry - evmKeeper types.EvmKeeper - consensusChecker *libcons.ConsensusChecker - feeProvider FeeProvider + registry *registry + evmKeeper types.EvmKeeper + consensusChecker *libcons.ConsensusChecker + feeProvider FeeProvider + onMessageAttestedListeners []metrixtypes.OnConsensusMessageAttestedListener } ) @@ -47,12 +49,13 @@ func NewKeeper( fp FeeProvider, ) *Keeper { k := &Keeper{ - cdc: cdc, - storeKey: storeKey, - paramstore: ps, - valset: valsetKeeper, - registry: reg, - feeProvider: fp, + cdc: cdc, + storeKey: storeKey, + paramstore: ps, + valset: valsetKeeper, + registry: reg, + feeProvider: fp, + onMessageAttestedListeners: make([]metrixtypes.OnConsensusMessageAttestedListener, 0), } ider := keeperutil.NewIDGenerator(k, nil) k.ider = ider @@ -61,6 +64,10 @@ func NewKeeper( return k } +func (k *Keeper) AddMessageConsensusAttestedListener(l metrixtypes.OnConsensusMessageAttestedListener) { + k.onMessageAttestedListeners = append(k.onMessageAttestedListeners, l) +} + func (k Keeper) Logger(ctx context.Context) log.Logger { sdkCtx := sdk.UnwrapSDKContext(ctx) return liblog.FromSDKLogger(sdkCtx.Logger()).With("module", fmt.Sprintf("x/%s", types.ModuleName)) diff --git a/x/consensus/keeper/keeper_setup_test.go b/x/consensus/keeper/keeper_setup_test.go index fd2b09562..d35109db4 100644 --- a/x/consensus/keeper/keeper_setup_test.go +++ b/x/consensus/keeper/keeper_setup_test.go @@ -22,9 +22,14 @@ import ( type mockedServices struct { ValsetKeeper *mocks.ValsetKeeper + MetrixKeeper *mocks.MetrixKeeper } func newConsensusKeeper(t testing.TB) (*Keeper, mockedServices, sdk.Context) { + config := sdk.GetConfig() + config.SetBech32PrefixForAccount("paloma", "pub") + config.SetBech32PrefixForValidator("palomavaloper", "valoperpub") + logger := log.NewNopLogger() storeKey := storetypes.NewKVStoreKey(types.StoreKey) @@ -52,8 +57,12 @@ func newConsensusKeeper(t testing.TB) (*Keeper, mockedServices, sdk.Context) { memStoreKey, "ConsensusParams", ) + + metrixKeeper := mocks.NewMetrixKeeper(t) + ms := mockedServices{ ValsetKeeper: mocks.NewValsetKeeper(t), + MetrixKeeper: metrixKeeper, } k := NewKeeper( appCodec, @@ -64,6 +73,8 @@ func newConsensusKeeper(t testing.TB) (*Keeper, mockedServices, sdk.Context) { nil, ) + k.AddMessageConsensusAttestedListener(metrixKeeper) + ctx := sdk.NewContext(stateStore, tmproto.Header{}, false, logger) // Initialize params diff --git a/x/consensus/types/expected_keepers.go b/x/consensus/types/expected_keepers.go index 747a98b91..f60b9db3b 100644 --- a/x/consensus/types/expected_keepers.go +++ b/x/consensus/types/expected_keepers.go @@ -5,6 +5,7 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" xchain "github.com/palomachain/paloma/internal/x-chain" + metrixtypes "github.com/palomachain/paloma/x/metrix/types" valsettypes "github.com/palomachain/paloma/x/valset/types" ) @@ -33,3 +34,8 @@ type ValsetKeeper interface { type EvmKeeper interface { PickValidatorForMessage(ctx context.Context, chainReferenceID string, requirements *xchain.JobRequirements) (string, string, error) } + +//go:generate mockery --name=MetrixKeeper +type MetrixKeeper interface { + OnConsensusMessageAttested(context.Context, metrixtypes.MessageAttestedEvent) +} diff --git a/x/consensus/types/mocks/MetrixKeeper.go b/x/consensus/types/mocks/MetrixKeeper.go new file mode 100644 index 000000000..496aa7a82 --- /dev/null +++ b/x/consensus/types/mocks/MetrixKeeper.go @@ -0,0 +1,34 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mocks + +import ( + context "context" + + types "github.com/palomachain/paloma/x/metrix/types" + mock "github.com/stretchr/testify/mock" +) + +// MetrixKeeper is an autogenerated mock type for the MetrixKeeper type +type MetrixKeeper struct { + mock.Mock +} + +// OnConsensusMessageAttested provides a mock function with given fields: _a0, _a1 +func (_m *MetrixKeeper) OnConsensusMessageAttested(_a0 context.Context, _a1 types.MessageAttestedEvent) { + _m.Called(_a0, _a1) +} + +// NewMetrixKeeper creates a new instance of MetrixKeeper. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMetrixKeeper(t interface { + mock.TestingT + Cleanup(func()) +}) *MetrixKeeper { + mock := &MetrixKeeper{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}