From c8f253a2a0a6fc6384daa43a8b1e797edbbf38dc Mon Sep 17 00:00:00 2001 From: RafilxTenfen Date: Fri, 17 Jan 2025 23:20:09 -0300 Subject: [PATCH] chore: build message complete --- clientcontroller/babylon_msg.go | 245 +++++++++++++++++----------- clientcontroller/babylon_msg_log.go | 224 +++++++++++++++++++++++++ 2 files changed, 371 insertions(+), 98 deletions(-) create mode 100644 clientcontroller/babylon_msg_log.go diff --git a/clientcontroller/babylon_msg.go b/clientcontroller/babylon_msg.go index 02a20c4..7c538ed 100644 --- a/clientcontroller/babylon_msg.go +++ b/clientcontroller/babylon_msg.go @@ -16,10 +16,17 @@ import ( "cosmossdk.io/store/rootmulti" "github.com/avast/retry-go/v4" bbn "github.com/babylonlabs-io/babylon/app" + appparams "github.com/babylonlabs-io/babylon/app/params" "github.com/babylonlabs-io/babylon/client/config" abci "github.com/cometbft/cometbft/abci/types" + "github.com/cometbft/cometbft/crypto/merkle" + "github.com/cometbft/cometbft/libs/bytes" + coretypes "github.com/cometbft/cometbft/rpc/core/types" + tmtypes "github.com/cometbft/cometbft/types" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/tx" + "github.com/cosmos/cosmos-sdk/codec" + codectypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/crypto/keyring" "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" @@ -27,14 +34,13 @@ import ( legacyerrors "github.com/cosmos/cosmos-sdk/types/errors" txtypes "github.com/cosmos/cosmos-sdk/types/tx" "github.com/cosmos/cosmos-sdk/types/tx/signing" - chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types" + rlycosmos "github.com/cosmos/relayer/v2/chains/cosmos" "github.com/cosmos/relayer/v2/relayer/provider" "github.com/juju/fslock" abcistrange "github.com/strangelove-ventures/cometbft-client/abci/types" strangeloveclient "github.com/strangelove-ventures/cometbft-client/client" rpcclient "github.com/strangelove-ventures/cometbft-client/rpc/client" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -56,22 +62,31 @@ func reliablySendMsgsAsMultipleTxs( msgs []sdk.Msg, ) error { + encCfg := bbn.GetEncodingConfig() + c, err := strangeloveclient.NewClient(cfg.RPCAddr, cfg.Timeout) if err != nil { return err } - return nil + ctx := context.Background() + return ReliablySendMsgs(ctx, cfg) } // ReliablySendMsgs reliably sends a list of messages to the chain. // It utilizes a file lock as well as a keyring lock to ensure atomic access. func ReliablySendMsgs( ctx context.Context, - logger *zap.Logger, cfg *config.BabylonConfig, + logger *zap.Logger, + cometClient client.CometRPC, + rpcClient *strangeloveclient.Client, + encCfg *appparams.EncodingConfig, + ar client.AccountRetriever, + + accSequence uint64, msgs []sdk.Msg, - expectedErrors, unrecoverableErrors []*errors.Error, + expectedErrors, unrecoverableErrors []*sdkerrors.Error, ) (*sdk.TxResponse, error) { var ( rlyResp *sdk.TxResponse @@ -90,7 +105,7 @@ func ReliablySendMsgs( if err := retry.Do(func() error { var sendMsgErr error krErr := AccessKeyWithLock(cfg.KeyDirectory, func() { - sendMsgErr = c.provider.SendMessagesToMempool(ctx, msgs, "", ctx, []func(*sdk.TxResponse, error){callback}) + sendMsgErr = SendMessagesToMempool(ctx, cfg, logger, cometClient, rpcClient, encCfg, ar, msgs, "", accSequence, []func(*sdk.TxResponse, error){callback}) }) if krErr != nil { logger.Error("unrecoverable err when submitting the tx, skip retrying", zap.Error(krErr)) @@ -146,28 +161,30 @@ func ReliablySendMsgs( func SendMessagesToMempool( ctx context.Context, cfg *config.BabylonConfig, + logger *zap.Logger, cometClient client.CometRPC, rpcClient *strangeloveclient.Client, + encCfg *appparams.EncodingConfig, ar client.AccountRetriever, msgs []sdk.Msg, memo string, - txSignerKey string, sequence uint64, - asyncCtx context.Context, - asyncCallbacks []func(sdk.TxResponse, error), + asyncCallbacks []func(*sdk.TxResponse, error), ) error { + txSignerKey := cfg.Key txBytes, fees, err := BuildMessages( - ctx, cfg, cometClient, rpcClient, ar, msgs, memo, 0, txSignerKey, sequence, + ctx, cfg, cometClient, rpcClient, encCfg, ar, msgs, memo, 0, txSignerKey, sequence, ) if err != nil { return err } - if err := cc.broadcastTx(ctx, txBytes, msgs, fees, asyncCtx, defaultBroadcastWaitTimeout, asyncCallbacks); err != nil { + err = BroadcastTx(ctx, logger, cfg, encCfg, rpcClient, txBytes, msgs, fees, ctx, defaultBroadcastWaitTimeout, asyncCallbacks) + if err != nil { return err } @@ -179,6 +196,7 @@ func BuildMessages( cfg *config.BabylonConfig, cometClient client.CometRPC, rpcClient *strangeloveclient.Client, + encCfg *appparams.EncodingConfig, ar client.AccountRetriever, msgs []sdk.Msg, memo string, @@ -190,7 +208,6 @@ func BuildMessages( fees sdk.Coins, err error, ) { - encCfg := bbn.GetEncodingConfig() keybase, err := keyring.New( cfg.ChainID, @@ -262,15 +279,16 @@ func BroadcastTx( ctx context.Context, // context for tx broadcast logger *zap.Logger, cfg *config.BabylonConfig, + encCfg *appparams.EncodingConfig, rpcClient *strangeloveclient.Client, tx []byte, // raw tx to be broadcasted - msgs []provider.RelayerMessage, // used for logging only + msgs []sdk.Msg, // used for logging only fees sdk.Coins, // used for metrics asyncCtx context.Context, // context for async wait for block inclusion after successful tx broadcast asyncTimeout time.Duration, // timeout for waiting for block inclusion - asyncCallbacks []func(*provider.RelayerTxResponse, error), // callback for success/fail of the wait for block inclusion + asyncCallbacks []func(*sdk.TxResponse, error), // callback for success/fail of the wait for block inclusion ) error { res, err := rpcClient.BroadcastTxSync(ctx, tx) isErr := err != nil @@ -299,8 +317,8 @@ func BroadcastTx( // TODO: maybe we need to check if the node has tx indexing enabled? // if not, we need to find a new way to block until inclusion in a block - - go cc.waitForTx(asyncCtx, res.Hash, msgs, asyncTimeout, asyncCallbacks) + protoCdc := codec.NewProtoCodec(encCfg.InterfaceRegistry) + go waitForTx(asyncCtx, logger, rpcClient, protoCdc, encCfg.TxConfig, cfg.ChainID, res.Hash, msgs, asyncTimeout, asyncCallbacks) return nil } @@ -363,12 +381,16 @@ func CalculateGas( func waitForTx( ctx context.Context, log *zap.Logger, + rpcClient *strangeloveclient.Client, + cdc *codec.ProtoCodec, + txConfig client.TxConfig, + chainId string, txHash []byte, - msgs []provider.RelayerMessage, // used for logging only + msgs []sdk.Msg, // used for logging only waitTimeout time.Duration, - callbacks []func(*provider.RelayerTxResponse, error), + callbacks []func(*sdk.TxResponse, error), ) { - res, err := cc.waitForBlockInclusion(ctx, txHash, waitTimeout) + res, err := waitForBlockInclusion(ctx, rpcClient, txConfig, txHash, waitTimeout) if err != nil { log.Error("Failed to wait for block inclusion", zap.Error(err)) if len(callbacks) > 0 { @@ -395,7 +417,7 @@ func waitForTx( if res.Code != 0 { // Check for any registered SDK errors - err := cc.sdkError(res.Codespace, res.Code) + err := sdkError(res.Codespace, res.Code) if err == nil { err = fmt.Errorf("transaction failed to execute: codespace: %s, code: %d, log: %s", res.Codespace, res.Code, res.RawLog) } @@ -405,17 +427,115 @@ func waitForTx( cb(nil, err) } } - LogFailedTx(log, rlyResp, nil, msgs) + LogFailedTx(log, chainId, rlyResp, nil, msgs) return } if len(callbacks) > 0 { for _, cb := range callbacks { //Call each callback in order since waitForTx is already invoked asyncronously - cb(rlyResp, nil) + cb(res, nil) + } + } + LogSuccessTx(log, chainId, cdc, res, msgs) +} + +// waitForBlockInclusion will wait for a transaction to be included in a block, up to waitTimeout or context cancellation. +func waitForBlockInclusion( + ctx context.Context, + rpcClient *strangeloveclient.Client, + txConfig client.TxConfig, + txHash []byte, + waitTimeout time.Duration, +) (*sdk.TxResponse, error) { + exitAfter := time.After(waitTimeout) + for { + select { + case <-exitAfter: + return nil, fmt.Errorf("timed out after: %d; %w", waitTimeout, rlycosmos.ErrTimeoutAfterWaitingForTxBroadcast) + // This fixed poll is fine because it's only for logging and updating prometheus metrics currently. + case <-time.After(time.Millisecond * 100): + res, err := rpcClient.Tx(ctx, txHash, false) + if err == nil { + return mkTxResult(convertResultTx(res), txConfig) + } + if strings.Contains(err.Error(), "transaction indexing is disabled") { + return nil, fmt.Errorf("cannot determine success/failure of tx because transaction indexing is disabled on rpc url") + } + case <-ctx.Done(): + return nil, ctx.Err() + } + } +} + +func convertResultTx(res *strangeloveclient.TxResponse) *coretypes.ResultTx { + return &coretypes.ResultTx{ + Hash: bytes.HexBytes(res.Hash), + Height: res.Height, + Index: res.Index, + TxResult: abci.ExecTxResult{ + Code: res.ExecTx.Code, + Data: res.ExecTx.Data, + Log: res.ExecTx.Log, + Info: res.ExecTx.Info, + GasWanted: res.ExecTx.GasWanted, + GasUsed: res.ExecTx.GasUsed, + Events: converStringEvents(res.ExecTx.Events), + Codespace: res.ExecTx.Codespace, + }, + Tx: tmtypes.Tx(res.Tx), + Proof: tmtypes.TxProof{ + RootHash: bytes.HexBytes(res.Proof.RootHash), + Data: tmtypes.Tx(res.Proof.Data), + Proof: merkle.Proof{ + Total: res.Proof.Proof.Total, + Index: res.Proof.Proof.Index, + LeafHash: res.Proof.Proof.LeafHash, + Aunts: res.Proof.Proof.Aunts, + }, + }, + } +} + +func converStringEvents(events sdk.StringEvents) []abci.Event { + evts := make([]abci.Event, len(events)) + + for i, evt := range events { + attributes := make([]abci.EventAttribute, len(evt.Attributes)) + + for j, attr := range evt.Attributes { + attributes[j] = abci.EventAttribute{ + Key: attr.Key, + Value: attr.Value, + } + } + + evts[i] = abci.Event{ + Type: evt.Type, + Attributes: attributes, } } - cc.LogSuccessTx(res, msgs) + + return evts +} + +// mkTxResult decodes a comet transaction into an SDK TxResponse. +func mkTxResult( + resTx *coretypes.ResultTx, + txConfig client.TxConfig, +) (*sdk.TxResponse, error) { + txbz, err := txConfig.TxDecoder()(resTx.Tx) + if err != nil { + return nil, err + } + + p, ok := txbz.(intoAny) + if !ok { + return nil, fmt.Errorf("expecting a type implementing intoAny, got: %T", txbz) + } + + any := p.AsAny() + return sdk.NewResponseResultTx(resTx, any, ""), nil } func AccessKeyWithLock(keyDir string, accessFunc func()) error { @@ -671,79 +791,8 @@ func sdkError(codespace string, code uint32) error { return nil } -// LogFailedTx takes the transaction and the messages to create it and logs the appropriate data -func LogFailedTx(log *zap.Logger, chainId string, res *provider.RelayerTxResponse, err error, msgs []provider.RelayerMessage) { - // Include the chain_id - fields := []zapcore.Field{zap.String("chain_id", chainId)} - - // Extract the channels from the events, if present - if res != nil { - channels := getChannelsIfPresent(res.Events) - fields = append(fields, channels...) - } - fields = append(fields, msgTypesField(msgs)) - - if err != nil { - - if errors.Is(err, chantypes.ErrRedundantTx) { - log.Debug("Redundant message(s)", fields...) - return - } - - // Make a copy since we may continue to the warning - errorFields := append(fields, zap.Error(err)) - log.Error( - "Failed sending cosmos transaction", - errorFields..., - ) - - if res == nil { - return - } - } - - if res.Code != 0 { - if sdkErr := cc.sdkError(res.Codespace, res.Code); err != nil { - fields = append(fields, zap.NamedError("sdk_error", sdkErr)) - } - fields = append(fields, zap.Object("response", res)) - cc.log.Warn( - "Sent transaction but received failure response", - fields..., - ) - } -} - -// getChannelsIfPresent scans the events for channel tags -func getChannelsIfPresent(events []provider.RelayerEvent) []zapcore.Field { - channelTags := []string{srcChanTag, dstChanTag} - fields := []zap.Field{} - - // While a transaction may have multiple messages, we just need to first - // pair of channels - foundTag := map[string]struct{}{} - - for _, event := range events { - for _, tag := range channelTags { - for attributeKey, attributeValue := range event.Attributes { - if attributeKey == tag { - // Only append the tag once - // TODO: what if they are different? - if _, ok := foundTag[tag]; !ok { - fields = append(fields, zap.String(tag, attributeValue)) - foundTag[tag] = struct{}{} - } - } - } - } - } - return fields -} - -func msgTypesField(msgs []provider.RelayerMessage) zap.Field { - msgTypes := make([]string, len(msgs)) - for i, m := range msgs { - msgTypes[i] = m.Type() - } - return zap.Strings("msg_types", msgTypes) +// Deprecated: this interface is used only internally for scenario we are +// deprecating (StdTxConfig support) +type intoAny interface { + AsAny() *codectypes.Any } diff --git a/clientcontroller/babylon_msg_log.go b/clientcontroller/babylon_msg_log.go new file mode 100644 index 0000000..aeb0ef0 --- /dev/null +++ b/clientcontroller/babylon_msg_log.go @@ -0,0 +1,224 @@ +package clientcontroller + +import ( + "errors" + "reflect" + + "github.com/cosmos/cosmos-sdk/codec" + sdk "github.com/cosmos/cosmos-sdk/types" + typestx "github.com/cosmos/cosmos-sdk/types/tx" + feetypes "github.com/cosmos/ibc-go/v8/modules/apps/29-fee/types" + transfertypes "github.com/cosmos/ibc-go/v8/modules/apps/transfer/types" + clienttypes "github.com/cosmos/ibc-go/v8/modules/core/02-client/types" + chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types" + "github.com/cosmos/relayer/v2/relayer/provider" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// LogFailedTx takes the transaction and the messages to create it and logs the appropriate data +func LogFailedTx(log *zap.Logger, chainId string, res *provider.RelayerTxResponse, err error, msgs []sdk.Msg) { + // Include the chain_id + fields := []zapcore.Field{zap.String("chain_id", chainId)} + + // Extract the channels from the events, if present + if res != nil { + channels := getChannelsIfPresent(res.Events) + fields = append(fields, channels...) + } + fields = append(fields, msgTypesField(msgs)) + + if err != nil { + + if errors.Is(err, chantypes.ErrRedundantTx) { + log.Debug("Redundant message(s)", fields...) + return + } + + // Make a copy since we may continue to the warning + errorFields := append(fields, zap.Error(err)) + log.Error( + "Failed sending cosmos transaction", + errorFields..., + ) + + if res == nil { + return + } + } + + if res.Code != 0 { + if sdkErr := sdkError(res.Codespace, res.Code); err != nil { + fields = append(fields, zap.NamedError("sdk_error", sdkErr)) + } + fields = append(fields, zap.Object("response", res)) + log.Warn( + "Sent transaction but received failure response", + fields..., + ) + } +} + +// LogSuccessTx take the transaction and the messages to create it and logs the appropriate data +func LogSuccessTx(log *zap.Logger, chainId string, cdc *codec.ProtoCodec, res *sdk.TxResponse, msgs []sdk.Msg) { + // Include the chain_id + fields := []zapcore.Field{zap.String("chain_id", chainId)} + + // Extract the channels from the events, if present + if res != nil { + events := parseEventsFromTxResponse(res) + fields = append(fields, getChannelsIfPresent(events)...) + } + + // Include the gas used + fields = append(fields, zap.Int64("gas_used", res.GasUsed)) + + var m sdk.Msg + if err := cdc.InterfaceRegistry().UnpackAny(res.Tx, &m); err == nil { + if tx, ok := m.(*typestx.Tx); ok { + fields = append(fields, zap.Stringer("fees", tx.GetFee())) + if feePayer := getFeePayer(log, cdc, tx); feePayer != "" { + fields = append(fields, zap.String("fee_payer", feePayer)) + } + } else { + log.Debug( + "Failed to convert message to Tx type", + zap.Stringer("type", reflect.TypeOf(m)), + ) + } + } else { + log.Debug("Failed to unpack response Tx into sdk.Msg", zap.Error(err)) + } + + // Include the height, msgType, and tx_hash + fields = append(fields, + zap.Int64("height", res.Height), + msgTypesField(msgs), + zap.String("tx_hash", res.TxHash), + ) + + // Log the successful transaction with fields + log.Info( + "Successful transaction", + fields..., + ) +} + +// getChannelsIfPresent scans the events for channel tags +func getChannelsIfPresent(events []provider.RelayerEvent) []zapcore.Field { + channelTags := []string{srcChanTag, dstChanTag} + fields := []zap.Field{} + + // While a transaction may have multiple messages, we just need to first + // pair of channels + foundTag := map[string]struct{}{} + + for _, event := range events { + for _, tag := range channelTags { + for attributeKey, attributeValue := range event.Attributes { + if attributeKey == tag { + // Only append the tag once + // TODO: what if they are different? + if _, ok := foundTag[tag]; !ok { + fields = append(fields, zap.String(tag, attributeValue)) + foundTag[tag] = struct{}{} + } + } + } + } + } + return fields +} + +func msgTypesField(msgs []sdk.Msg) zap.Field { + msgTypes := make([]string, len(msgs)) + for i, m := range msgs { + msgTypes[i] = sdk.MsgTypeURL(m) + } + return zap.Strings("msg_types", msgTypes) +} + +// getFeePayer returns the bech32 address of the fee payer of a transaction. +// This uses the fee payer field if set, +// otherwise falls back to the address of whoever signed the first message. +func getFeePayer(log *zap.Logger, cdc *codec.ProtoCodec, tx *typestx.Tx) string { + payer := tx.AuthInfo.Fee.Payer + if payer != "" { + return payer + } + + switch firstMsg := tx.GetMsgs()[0].(type) { + case *transfertypes.MsgTransfer: + // There is a possible data race around concurrent map access + // in the cosmos sdk when it converts the address from bech32. + // We don't need the address conversion; just the sender is all that + // GetSigners is doing under the hood anyway. + return firstMsg.Sender + case *clienttypes.MsgCreateClient: + // Without this particular special case, there is a panic in ibc-go + // due to the sdk config singleton expecting one bech32 prefix but seeing another. + return firstMsg.Signer + case *clienttypes.MsgUpdateClient: + // Same failure mode as MsgCreateClient. + return firstMsg.Signer + case *clienttypes.MsgUpgradeClient: + return firstMsg.Signer + case *clienttypes.MsgSubmitMisbehaviour: + // Same failure mode as MsgCreateClient. + return firstMsg.Signer + case *feetypes.MsgRegisterPayee: + return firstMsg.Relayer + case *feetypes.MsgRegisterCounterpartyPayee: + return firstMsg.Relayer + case *feetypes.MsgPayPacketFee: + return firstMsg.Signer + case *feetypes.MsgPayPacketFeeAsync: + return firstMsg.PacketFee.RefundAddress + default: + signers, _, err := cdc.GetMsgV1Signers(firstMsg) + if err != nil { + log.Info("Could not get signers for msg when attempting to get the fee payer", zap.Error(err)) + return "" + } + + return string(signers[0]) + } +} + +func parseEventsFromTxResponse(resp *sdk.TxResponse) []provider.RelayerEvent { + var events []provider.RelayerEvent + + if resp == nil { + return events + } + + for _, logs := range resp.Logs { + for _, event := range logs.Events { + attributes := make(map[string]string) + for _, attribute := range event.Attributes { + attributes[attribute.Key] = attribute.Value + } + events = append(events, provider.RelayerEvent{ + EventType: event.Type, + Attributes: attributes, + }) + } + } + + // After SDK v0.50, indexed events are no longer provided in the logs on + // transaction execution, the response events can be directly used + if len(events) == 0 { + for _, event := range resp.Events { + attributes := make(map[string]string) + for _, attribute := range event.Attributes { + attributes[attribute.Key] = attribute.Value + } + events = append(events, provider.RelayerEvent{ + EventType: event.Type, + Attributes: attributes, + }) + } + } + + return events +}