Skip to content

Commit

Permalink
chore: build message complete
Browse files Browse the repository at this point in the history
  • Loading branch information
RafilxTenfen committed Jan 18, 2025
1 parent 1aed9a0 commit c8f253a
Show file tree
Hide file tree
Showing 2 changed files with 371 additions and 98 deletions.
245 changes: 147 additions & 98 deletions clientcontroller/babylon_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,31 @@ import (
"cosmossdk.io/store/rootmulti"
"github.com/avast/retry-go/v4"
bbn "github.com/babylonlabs-io/babylon/app"
appparams "github.com/babylonlabs-io/babylon/app/params"
"github.com/babylonlabs-io/babylon/client/config"
abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/crypto/merkle"
"github.com/cometbft/cometbft/libs/bytes"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
tmtypes "github.com/cometbft/cometbft/types"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/tx"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
"github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
sdk "github.com/cosmos/cosmos-sdk/types"
legacyerrors "github.com/cosmos/cosmos-sdk/types/errors"
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
"github.com/cosmos/cosmos-sdk/types/tx/signing"
chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types"
rlycosmos "github.com/cosmos/relayer/v2/chains/cosmos"

Check failure on line 37 in clientcontroller/babylon_msg.go

View workflow job for this annotation

GitHub Actions / lint_test / build

cannot find module providing package github.com/cosmos/relayer/v2/chains/cosmos: import lookup disabled by -mod=readonly

Check failure on line 37 in clientcontroller/babylon_msg.go

View workflow job for this annotation

GitHub Actions / lint_test / integration-tests

no required module provides package github.com/cosmos/relayer/v2/chains/cosmos; to add it:

Check failure on line 37 in clientcontroller/babylon_msg.go

View workflow job for this annotation

GitHub Actions / lint_test / integration-tests

cannot find module providing package github.com/cosmos/relayer/v2/chains/cosmos: import lookup disabled by -mod=readonly

Check failure on line 37 in clientcontroller/babylon_msg.go

View workflow job for this annotation

GitHub Actions / lint_test / unit-tests

no required module provides package github.com/cosmos/relayer/v2/chains/cosmos; to add it:
"github.com/cosmos/relayer/v2/relayer/provider"
"github.com/juju/fslock"
abcistrange "github.com/strangelove-ventures/cometbft-client/abci/types"
strangeloveclient "github.com/strangelove-ventures/cometbft-client/client"
rpcclient "github.com/strangelove-ventures/cometbft-client/rpc/client"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand All @@ -56,22 +62,31 @@ func reliablySendMsgsAsMultipleTxs(
msgs []sdk.Msg,
) error {

encCfg := bbn.GetEncodingConfig()

Check failure on line 65 in clientcontroller/babylon_msg.go

View workflow job for this annotation

GitHub Actions / lint_test / lint

declared and not used: encCfg (typecheck)

c, err := strangeloveclient.NewClient(cfg.RPCAddr, cfg.Timeout)

Check failure on line 67 in clientcontroller/babylon_msg.go

View workflow job for this annotation

GitHub Actions / lint_test / lint

declared and not used: c (typecheck)
if err != nil {
return err
}

return nil
ctx := context.Background()
return ReliablySendMsgs(ctx, cfg)

Check failure on line 73 in clientcontroller/babylon_msg.go

View workflow job for this annotation

GitHub Actions / lint_test / lint

not enough arguments in call to ReliablySendMsgs
}

// ReliablySendMsgs reliably sends a list of messages to the chain.
// It utilizes a file lock as well as a keyring lock to ensure atomic access.
func ReliablySendMsgs(
ctx context.Context,
logger *zap.Logger,
cfg *config.BabylonConfig,
logger *zap.Logger,
cometClient client.CometRPC,
rpcClient *strangeloveclient.Client,
encCfg *appparams.EncodingConfig,
ar client.AccountRetriever,

accSequence uint64,
msgs []sdk.Msg,
expectedErrors, unrecoverableErrors []*errors.Error,
expectedErrors, unrecoverableErrors []*sdkerrors.Error,
) (*sdk.TxResponse, error) {
var (
rlyResp *sdk.TxResponse
Expand All @@ -90,7 +105,7 @@ func ReliablySendMsgs(
if err := retry.Do(func() error {
var sendMsgErr error
krErr := AccessKeyWithLock(cfg.KeyDirectory, func() {
sendMsgErr = c.provider.SendMessagesToMempool(ctx, msgs, "", ctx, []func(*sdk.TxResponse, error){callback})
sendMsgErr = SendMessagesToMempool(ctx, cfg, logger, cometClient, rpcClient, encCfg, ar, msgs, "", accSequence, []func(*sdk.TxResponse, error){callback})
})
if krErr != nil {
logger.Error("unrecoverable err when submitting the tx, skip retrying", zap.Error(krErr))
Expand Down Expand Up @@ -146,28 +161,30 @@ func ReliablySendMsgs(
func SendMessagesToMempool(
ctx context.Context,
cfg *config.BabylonConfig,
logger *zap.Logger,
cometClient client.CometRPC,
rpcClient *strangeloveclient.Client,
encCfg *appparams.EncodingConfig,
ar client.AccountRetriever,

msgs []sdk.Msg,
memo string,

txSignerKey string,
sequence uint64,

asyncCtx context.Context,
asyncCallbacks []func(sdk.TxResponse, error),
asyncCallbacks []func(*sdk.TxResponse, error),
) error {
txSignerKey := cfg.Key

txBytes, fees, err := BuildMessages(
ctx, cfg, cometClient, rpcClient, ar, msgs, memo, 0, txSignerKey, sequence,
ctx, cfg, cometClient, rpcClient, encCfg, ar, msgs, memo, 0, txSignerKey, sequence,
)
if err != nil {
return err
}

if err := cc.broadcastTx(ctx, txBytes, msgs, fees, asyncCtx, defaultBroadcastWaitTimeout, asyncCallbacks); err != nil {
err = BroadcastTx(ctx, logger, cfg, encCfg, rpcClient, txBytes, msgs, fees, ctx, defaultBroadcastWaitTimeout, asyncCallbacks)
if err != nil {
return err
}

Expand All @@ -179,6 +196,7 @@ func BuildMessages(
cfg *config.BabylonConfig,
cometClient client.CometRPC,
rpcClient *strangeloveclient.Client,
encCfg *appparams.EncodingConfig,
ar client.AccountRetriever,
msgs []sdk.Msg,
memo string,
Expand All @@ -190,7 +208,6 @@ func BuildMessages(
fees sdk.Coins,
err error,
) {
encCfg := bbn.GetEncodingConfig()

keybase, err := keyring.New(
cfg.ChainID,
Expand Down Expand Up @@ -262,15 +279,16 @@ func BroadcastTx(
ctx context.Context, // context for tx broadcast
logger *zap.Logger,
cfg *config.BabylonConfig,
encCfg *appparams.EncodingConfig,

rpcClient *strangeloveclient.Client,
tx []byte, // raw tx to be broadcasted
msgs []provider.RelayerMessage, // used for logging only
msgs []sdk.Msg, // used for logging only
fees sdk.Coins, // used for metrics

asyncCtx context.Context, // context for async wait for block inclusion after successful tx broadcast
asyncTimeout time.Duration, // timeout for waiting for block inclusion
asyncCallbacks []func(*provider.RelayerTxResponse, error), // callback for success/fail of the wait for block inclusion
asyncCallbacks []func(*sdk.TxResponse, error), // callback for success/fail of the wait for block inclusion
) error {
res, err := rpcClient.BroadcastTxSync(ctx, tx)
isErr := err != nil
Expand Down Expand Up @@ -299,8 +317,8 @@ func BroadcastTx(

// TODO: maybe we need to check if the node has tx indexing enabled?
// if not, we need to find a new way to block until inclusion in a block

go cc.waitForTx(asyncCtx, res.Hash, msgs, asyncTimeout, asyncCallbacks)
protoCdc := codec.NewProtoCodec(encCfg.InterfaceRegistry)
go waitForTx(asyncCtx, logger, rpcClient, protoCdc, encCfg.TxConfig, cfg.ChainID, res.Hash, msgs, asyncTimeout, asyncCallbacks)

return nil
}
Expand Down Expand Up @@ -363,12 +381,16 @@ func CalculateGas(
func waitForTx(
ctx context.Context,
log *zap.Logger,
rpcClient *strangeloveclient.Client,
cdc *codec.ProtoCodec,
txConfig client.TxConfig,
chainId string,
txHash []byte,
msgs []provider.RelayerMessage, // used for logging only
msgs []sdk.Msg, // used for logging only
waitTimeout time.Duration,
callbacks []func(*provider.RelayerTxResponse, error),
callbacks []func(*sdk.TxResponse, error),
) {
res, err := cc.waitForBlockInclusion(ctx, txHash, waitTimeout)
res, err := waitForBlockInclusion(ctx, rpcClient, txConfig, txHash, waitTimeout)
if err != nil {
log.Error("Failed to wait for block inclusion", zap.Error(err))
if len(callbacks) > 0 {
Expand All @@ -395,7 +417,7 @@ func waitForTx(

if res.Code != 0 {
// Check for any registered SDK errors
err := cc.sdkError(res.Codespace, res.Code)
err := sdkError(res.Codespace, res.Code)
if err == nil {
err = fmt.Errorf("transaction failed to execute: codespace: %s, code: %d, log: %s", res.Codespace, res.Code, res.RawLog)
}
Expand All @@ -405,17 +427,115 @@ func waitForTx(
cb(nil, err)
}
}
LogFailedTx(log, rlyResp, nil, msgs)
LogFailedTx(log, chainId, rlyResp, nil, msgs)
return
}

if len(callbacks) > 0 {
for _, cb := range callbacks {
//Call each callback in order since waitForTx is already invoked asyncronously
cb(rlyResp, nil)
cb(res, nil)
}
}
LogSuccessTx(log, chainId, cdc, res, msgs)
}

// waitForBlockInclusion will wait for a transaction to be included in a block, up to waitTimeout or context cancellation.
func waitForBlockInclusion(
ctx context.Context,
rpcClient *strangeloveclient.Client,
txConfig client.TxConfig,
txHash []byte,
waitTimeout time.Duration,
) (*sdk.TxResponse, error) {
exitAfter := time.After(waitTimeout)
for {
select {
case <-exitAfter:
return nil, fmt.Errorf("timed out after: %d; %w", waitTimeout, rlycosmos.ErrTimeoutAfterWaitingForTxBroadcast)
// This fixed poll is fine because it's only for logging and updating prometheus metrics currently.
case <-time.After(time.Millisecond * 100):
res, err := rpcClient.Tx(ctx, txHash, false)
if err == nil {
return mkTxResult(convertResultTx(res), txConfig)
}
if strings.Contains(err.Error(), "transaction indexing is disabled") {
return nil, fmt.Errorf("cannot determine success/failure of tx because transaction indexing is disabled on rpc url")
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
}

func convertResultTx(res *strangeloveclient.TxResponse) *coretypes.ResultTx {
return &coretypes.ResultTx{
Hash: bytes.HexBytes(res.Hash),
Height: res.Height,
Index: res.Index,
TxResult: abci.ExecTxResult{
Code: res.ExecTx.Code,
Data: res.ExecTx.Data,
Log: res.ExecTx.Log,
Info: res.ExecTx.Info,
GasWanted: res.ExecTx.GasWanted,
GasUsed: res.ExecTx.GasUsed,
Events: converStringEvents(res.ExecTx.Events),
Codespace: res.ExecTx.Codespace,
},
Tx: tmtypes.Tx(res.Tx),
Proof: tmtypes.TxProof{
RootHash: bytes.HexBytes(res.Proof.RootHash),
Data: tmtypes.Tx(res.Proof.Data),
Proof: merkle.Proof{
Total: res.Proof.Proof.Total,
Index: res.Proof.Proof.Index,
LeafHash: res.Proof.Proof.LeafHash,
Aunts: res.Proof.Proof.Aunts,
},
},
}
}

func converStringEvents(events sdk.StringEvents) []abci.Event {
evts := make([]abci.Event, len(events))

for i, evt := range events {
attributes := make([]abci.EventAttribute, len(evt.Attributes))

for j, attr := range evt.Attributes {
attributes[j] = abci.EventAttribute{
Key: attr.Key,
Value: attr.Value,
}
}

evts[i] = abci.Event{
Type: evt.Type,
Attributes: attributes,
}
}
cc.LogSuccessTx(res, msgs)

return evts
}

// mkTxResult decodes a comet transaction into an SDK TxResponse.
func mkTxResult(
resTx *coretypes.ResultTx,
txConfig client.TxConfig,
) (*sdk.TxResponse, error) {
txbz, err := txConfig.TxDecoder()(resTx.Tx)
if err != nil {
return nil, err
}

p, ok := txbz.(intoAny)
if !ok {
return nil, fmt.Errorf("expecting a type implementing intoAny, got: %T", txbz)
}

any := p.AsAny()
return sdk.NewResponseResultTx(resTx, any, ""), nil
}

func AccessKeyWithLock(keyDir string, accessFunc func()) error {
Expand Down Expand Up @@ -671,79 +791,8 @@ func sdkError(codespace string, code uint32) error {
return nil
}

// LogFailedTx takes the transaction and the messages to create it and logs the appropriate data
func LogFailedTx(log *zap.Logger, chainId string, res *provider.RelayerTxResponse, err error, msgs []provider.RelayerMessage) {
// Include the chain_id
fields := []zapcore.Field{zap.String("chain_id", chainId)}

// Extract the channels from the events, if present
if res != nil {
channels := getChannelsIfPresent(res.Events)
fields = append(fields, channels...)
}
fields = append(fields, msgTypesField(msgs))

if err != nil {

if errors.Is(err, chantypes.ErrRedundantTx) {
log.Debug("Redundant message(s)", fields...)
return
}

// Make a copy since we may continue to the warning
errorFields := append(fields, zap.Error(err))
log.Error(
"Failed sending cosmos transaction",
errorFields...,
)

if res == nil {
return
}
}

if res.Code != 0 {
if sdkErr := cc.sdkError(res.Codespace, res.Code); err != nil {
fields = append(fields, zap.NamedError("sdk_error", sdkErr))
}
fields = append(fields, zap.Object("response", res))
cc.log.Warn(
"Sent transaction but received failure response",
fields...,
)
}
}

// getChannelsIfPresent scans the events for channel tags
func getChannelsIfPresent(events []provider.RelayerEvent) []zapcore.Field {
channelTags := []string{srcChanTag, dstChanTag}
fields := []zap.Field{}

// While a transaction may have multiple messages, we just need to first
// pair of channels
foundTag := map[string]struct{}{}

for _, event := range events {
for _, tag := range channelTags {
for attributeKey, attributeValue := range event.Attributes {
if attributeKey == tag {
// Only append the tag once
// TODO: what if they are different?
if _, ok := foundTag[tag]; !ok {
fields = append(fields, zap.String(tag, attributeValue))
foundTag[tag] = struct{}{}
}
}
}
}
}
return fields
}

func msgTypesField(msgs []provider.RelayerMessage) zap.Field {
msgTypes := make([]string, len(msgs))
for i, m := range msgs {
msgTypes[i] = m.Type()
}
return zap.Strings("msg_types", msgTypes)
// Deprecated: this interface is used only internally for scenario we are
// deprecating (StdTxConfig support)
type intoAny interface {
AsAny() *codectypes.Any
}
Loading

0 comments on commit c8f253a

Please sign in to comment.