Skip to content

Commit

Permalink
chore: print formatted queue messages on remove (#1204)
Browse files Browse the repository at this point in the history
# Related Github tickets

- Closes #1783

# Background

At the moment, investigating messages is incredibly tedious due to the
high volatility in storage. We will need a better way to investigate
messages after they've been removed from the consensus queue.
This PR logs all messages as they're removed from the queue, so we can
see what's happening through the logs.

- Change consensus keeper `cdc` from `BinaryCodec` to `Codec`. The
`Codec` interface embeds `BinaryCodec` as well, so we don't lose
functionality. However, `Codec` also adds `JSONCodec`.
- Create `MessageWithSignatures` from `QueuedSignedMessageI`.
- Marshal messages with `cdc.MarshalJSON` as we remove them from the
queues. We get the output formatted the same as in the CLI commands.

# Testing completed

- [x] test coverage exists or has been added/updated
- [x] tested in a private testnet

# Breaking changes

- [x] I have checked my code for breaking changes
- [x] If there are breaking changes, there is a supporting migration.
  • Loading branch information
maharifu authored Jul 1, 2024
1 parent 68a3849 commit f004c06
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 53 deletions.
2 changes: 1 addition & 1 deletion x/consensus/keeper/consensus/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestBatching(t *testing.T) {
ChainType: types.ChainTypeCosmos,
ChainReferenceID: "test",
})
ctx := sdk.NewContext(stateStore, tmproto.Header{}, false, nil)
ctx := sdk.NewContext(stateStore, tmproto.Header{}, false, log.NewNopLogger())

t.Run("putting messages in", func(t *testing.T) {
for i := 0; i < 666; i++ {
Expand Down
66 changes: 64 additions & 2 deletions x/consensus/keeper/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consensus
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"

Expand Down Expand Up @@ -32,7 +33,7 @@ type QueueOptions struct {
QueueTypeName string
Sg keeperutil.StoreGetter
Ider keeperutil.IDGenerator
Cdc codec.BinaryCodec
Cdc codec.Codec
TypeCheck types.TypeChecker
BytesToSignCalculator types.BytesToSignFunc
VerifySignature types.VerifySignatureFunc
Expand Down Expand Up @@ -326,7 +327,7 @@ func (c Queue) AddSignature(ctx context.Context, msgID uint64, signData *types.S
// remove removes the message from the queue.
func (c Queue) Remove(ctx context.Context, msgID uint64) error {
sdkCtx := sdk.UnwrapSDKContext(ctx)
_, err := c.GetMsgByID(sdkCtx, msgID)
msg, err := c.GetMsgByID(sdkCtx, msgID)
if err != nil {
return err
}
Expand All @@ -342,6 +343,24 @@ func (c Queue) Remove(ctx context.Context, msgID uint64) error {
types.ItemRemovedEventID.With(fmt.Sprintf("%d", msgID)),
types.ItemRemovedChainReferenceID.With(c.qo.ChainReferenceID),
)

logger := liblog.FromSDKLogger(sdkCtx.Logger()).WithFields("msg-id", msgID)

msgWithSigs, err := ToMessageWithSignatures(msg, c.qo.Cdc)
if err != nil {
logger.WithError(err).Error("Failed to convert message with signatures")
return nil
}

jsonMsg, err := c.qo.Cdc.MarshalJSON(&msgWithSigs)
if err != nil {
logger.WithError(err).Error("Failed to marshal message as json")
return nil
}

logger.WithFields("msg", json.RawMessage(jsonMsg)).
Info("Removed message from queue")

return nil
}

Expand Down Expand Up @@ -442,3 +461,46 @@ func RemoveQueueCompletely(ctx context.Context, cq Queuer) {
store.Delete(key)
}
}

func ToMessageWithSignatures(msg types.QueuedSignedMessageI, cdc codec.BinaryCodec) (types.MessageWithSignatures, error) {
origMsg, err := msg.ConsensusMsg(cdc)
if err != nil {
return types.MessageWithSignatures{}, err
}
anyMsg, err := codectypes.NewAnyWithValue(origMsg)
if err != nil {
return types.MessageWithSignatures{}, err
}

var publicAccessData []byte

if msg.GetPublicAccessData() != nil {
publicAccessData = msg.GetPublicAccessData().GetData()
}

var errorData []byte

if msg.GetErrorData() != nil {
errorData = msg.GetErrorData().GetData()
}

approvedMessage := types.MessageWithSignatures{
Nonce: msg.Nonce(),
Id: msg.GetId(),
Msg: anyMsg,
BytesToSign: msg.GetBytesToSign(),
SignData: []*types.ValidatorSignature{},
PublicAccessData: publicAccessData,
ErrorData: errorData,
}
for _, signData := range msg.GetSignData() {
approvedMessage.SignData = append(approvedMessage.SignData, &types.ValidatorSignature{
ValAddress: signData.GetValAddress(),
Signature: signData.GetSignature(),
ExternalAccountAddress: signData.GetExternalAccountAddress(),
PublicKey: signData.GetPublicKey(),
})
}

return approvedMessage, nil
}
2 changes: 1 addition & 1 deletion x/consensus/keeper/consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestConsensusQueueAllMethods(t *testing.T) {
ChainReferenceID: "bla",
},
}
ctx := sdk.NewContext(stateStore, tmproto.Header{}, false, nil)
ctx := sdk.NewContext(stateStore, tmproto.Header{}, false, log.NewNopLogger())

msg := &types.SimpleMessage{
Sender: "bob",
Expand Down
50 changes: 3 additions & 47 deletions x/consensus/keeper/grpc_query_messages_in_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import (
"context"
"fmt"

"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/palomachain/paloma/x/consensus/keeper/consensus"
"github.com/palomachain/paloma/x/consensus/types"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -31,7 +30,7 @@ func (k Keeper) MessageByID(goCtx context.Context, req *types.QueryMessageByIDRe
if msg == nil {
return nil, fmt.Errorf("message not found")
}
approvedMessage, err := toMessageWithSignatures(msg, k.cdc)
approvedMessage, err := consensus.ToMessageWithSignatures(msg, k.cdc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -68,54 +67,11 @@ func (k Keeper) MessagesInQueue(goCtx context.Context, req *types.QueryMessagesI
}
}

approvedMessage, err := toMessageWithSignatures(msg, k.cdc)
approvedMessage, err := consensus.ToMessageWithSignatures(msg, k.cdc)
if err != nil {
return nil, err
}
res.Messages = append(res.Messages, approvedMessage)
}
return res, nil
}

func toMessageWithSignatures(msg types.QueuedSignedMessageI, cdc codec.BinaryCodec) (types.MessageWithSignatures, error) {
origMsg, err := msg.ConsensusMsg(cdc)
if err != nil {
return types.MessageWithSignatures{}, err
}
anyMsg, err := codectypes.NewAnyWithValue(origMsg)
if err != nil {
return types.MessageWithSignatures{}, err
}

var publicAccessData []byte

if msg.GetPublicAccessData() != nil {
publicAccessData = msg.GetPublicAccessData().GetData()
}

var errorData []byte

if msg.GetErrorData() != nil {
errorData = msg.GetErrorData().GetData()
}

approvedMessage := types.MessageWithSignatures{
Nonce: msg.Nonce(),
Id: msg.GetId(),
Msg: anyMsg,
BytesToSign: msg.GetBytesToSign(),
SignData: []*types.ValidatorSignature{},
PublicAccessData: publicAccessData,
ErrorData: errorData,
}
for _, signData := range msg.GetSignData() {
approvedMessage.SignData = append(approvedMessage.SignData, &types.ValidatorSignature{
ValAddress: signData.GetValAddress(),
Signature: signData.GetSignature(),
ExternalAccountAddress: signData.GetExternalAccountAddress(),
PublicKey: signData.GetPublicKey(),
})
}

return approvedMessage, nil
}
4 changes: 2 additions & 2 deletions x/consensus/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

type (
Keeper struct {
cdc codec.BinaryCodec
cdc codec.Codec
storeKey store.KVStoreService
paramstore paramtypes.Subspace

Expand All @@ -34,7 +34,7 @@ type (
)

func NewKeeper(
cdc codec.BinaryCodec,
cdc codec.Codec,
storeKey store.KVStoreService,
ps paramtypes.Subspace,
valsetKeeper types.ValsetKeeper,
Expand Down

0 comments on commit f004c06

Please sign in to comment.