From 65fe7d2edab40ade90a67413fbce0bd49f3bc2dc Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Thu, 16 Jan 2025 11:52:10 -0600 Subject: [PATCH 1/2] Node/Solana: Reobservation by transaction ID --- node/pkg/watchers/solana/client.go | 195 +++++++++++++++--------- node/pkg/watchers/solana/client_test.go | 15 ++ 2 files changed, 141 insertions(+), 69 deletions(-) create mode 100644 node/pkg/watchers/solana/client_test.go diff --git a/node/pkg/watchers/solana/client.go b/node/pkg/watchers/solana/client.go index 6dae0a2506..941703c277 100644 --- a/node/pkg/watchers/solana/client.go +++ b/node/pkg/watchers/solana/client.go @@ -10,6 +10,7 @@ import ( "time" "encoding/base64" + "encoding/hex" "encoding/json" "github.com/certusone/wormhole/node/pkg/common" @@ -125,6 +126,16 @@ type ( } ) +const ( + // NOTE: We have a test to make sure these constants don't change in solana-go. + + // SolanaAccountLen is the expected length of an account identifier, which is a public key. Using the number here because that's what the admin client will populate. + SolanaAccountLen = 32 + + // SolanaSignatureLen is the expected length of a signature. As of v1.12.0, solana-go does not have a const for this. + SolanaSignatureLen = 64 +) + var ( emptyAddressBytes = vaa.Address{}.Bytes() emptyGapBytes = []byte{0, 0, 0} @@ -367,16 +378,40 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { return err } case m := <-s.obsvReqC: - if m.ChainId != uint32(s.chainID) { - panic("unexpected chain id") + if len(m.TxHash) == SolanaAccountLen { // Request by account ID + acc := solana.PublicKeyFromBytes(m.TxHash) + logger.Info("received observation request with account id", zap.String("account", acc.String())) + rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) + s.fetchMessageAccount(rCtx, logger, acc, 0, true) + cancel() + } else if len(m.TxHash) == SolanaSignatureLen { // Request by transaction ID + // Move this to a separate PR. + signature := solana.SignatureFromBytes(m.TxHash) + logger.Info("received observation request with transaction id", zap.Stringer("signature", signature)) + rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) + version := uint64(0) + result, err := s.rpcClient.GetTransaction( + rCtx, + signature, + &rpc.GetTransactionOpts{ + MaxSupportedTransactionVersion: &version, + Encoding: solana.EncodingBase64, + }, + ) + cancel() + if err != nil { + logger.Error("failed to get transaction for observation request", zap.String("bytes", hex.EncodeToString(m.TxHash)), zap.Stringer("signature", signature), zap.Error(err)) + } else { + tx, err := result.Transaction.GetTransaction() + if err != nil { + logger.Error("failed to extract transaction for observation request", zap.String("bytes", hex.EncodeToString(m.TxHash)), zap.Stringer("signature", signature), zap.Error(err)) + } else { + s.processTransaction(ctx, logger, tx, result.Meta, result.Slot, true) + } + } + } else { + logger.Error("ignoring an observation request of unexpected length", zap.Int("len", len(m.TxHash)), zap.String("bytes", hex.EncodeToString(m.TxHash))) } - - acc := solana.PublicKeyFromBytes(m.TxHash) - logger.Info("received observation request", zap.String("account", acc.String())) - - rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) - s.fetchMessageAccount(rCtx, logger, acc, 0, true) - cancel() case <-timer.C: // Get current slot height rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) @@ -580,87 +615,94 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot continue } - err = s.populateLookupTableAccounts(ctx, tx) - if err != nil { - logger.Error("failed to fetch lookup table accounts", - zap.Uint64("slot", slot), - zap.Int("txNum", txNum), - zap.Error(err), - ) - continue - } + s.processTransaction(ctx, logger, tx, txRpc.Meta, slot, false) + } - signature := tx.Signatures[0] - var programIndex uint16 - for n, key := range tx.Message.AccountKeys { - if key.Equals(s.contract) { - programIndex = uint16(n) - } - } - if programIndex == 0 { - continue + if emptyRetry > 0 && logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("skipped or unavailable block retrieved on retry attempt", + zap.Uint("empty_retry", emptyRetry), + zap.Uint64("slot", slot), + zap.String("commitment", string(s.commitment))) + } + + return true +} + +// processTransaction processes a transaction and publishes any Wormhole events. +func (s *SolanaWatcher) processTransaction(ctx context.Context, logger *zap.Logger, tx *solana.Transaction, meta *rpc.TransactionMeta, slot uint64, isReobservation bool) { + signature := tx.Signatures[0] + err := s.populateLookupTableAccounts(ctx, tx) + if err != nil { + logger.Error("failed to fetch lookup table accounts for transaction", + zap.Uint64("slot", slot), + zap.Stringer("signature", signature), + zap.Error(err), + ) + return + } + + var programIndex uint16 + for n, key := range tx.Message.AccountKeys { + if key.Equals(s.contract) { + programIndex = uint16(n) } + } + if programIndex == 0 { + return + } - if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("found Wormhole transaction", + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("found Wormhole transaction", + zap.Stringer("signature", signature), + zap.Uint64("slot", slot), + zap.String("commitment", string(s.commitment))) + } + + // Find top-level instructions + for i, inst := range tx.Message.Instructions { + found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, isReobservation) + if err != nil { + logger.Error("malformed Wormhole instruction", + zap.Error(err), + zap.Int("idx", i), zap.Stringer("signature", signature), zap.Uint64("slot", slot), - zap.String("commitment", string(s.commitment))) + zap.String("commitment", string(s.commitment)), + zap.Binary("data", inst.Data)) + } else if found { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("found a top-level Wormhole instruction", + zap.Int("idx", i), + zap.Stringer("signature", signature), + zap.Uint64("slot", slot), + zap.String("commitment", string(s.commitment))) + } } + } - // Find top-level instructions - for i, inst := range tx.Message.Instructions { - found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, isReobservation) + for outerIdx, inner := range meta.InnerInstructions { + for innerIdx, inst := range inner.Instructions { + found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, innerIdx, isReobservation) if err != nil { logger.Error("malformed Wormhole instruction", zap.Error(err), - zap.Int("idx", i), + zap.Int("outerIdx", outerIdx), + zap.Int("innerIdx", innerIdx), zap.Stringer("signature", signature), zap.Uint64("slot", slot), - zap.String("commitment", string(s.commitment)), - zap.Binary("data", inst.Data)) + zap.String("commitment", string(s.commitment))) } else if found { if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("found a top-level Wormhole instruction", - zap.Int("idx", i), + logger.Debug("found an inner Wormhole instruction", + zap.Int("outerIdx", outerIdx), + zap.Int("innerIdx", innerIdx), zap.Stringer("signature", signature), zap.Uint64("slot", slot), zap.String("commitment", string(s.commitment))) } } } - - for _, inner := range txRpc.Meta.InnerInstructions { - for i, inst := range inner.Instructions { - found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, isReobservation) - if err != nil { - logger.Error("malformed Wormhole instruction", - zap.Error(err), - zap.Int("idx", i), - zap.Stringer("signature", signature), - zap.Uint64("slot", slot), - zap.String("commitment", string(s.commitment))) - } else if found { - if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("found an inner Wormhole instruction", - zap.Int("idx", i), - zap.Stringer("signature", signature), - zap.Uint64("slot", slot), - zap.String("commitment", string(s.commitment))) - } - } - } - } - } - - if emptyRetry > 0 && logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("skipped or unavailable block retrieved on retry attempt", - zap.Uint("empty_retry", emptyRetry), - zap.Uint64("slot", slot), - zap.String("commitment", string(s.commitment))) } - - return true } func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logger, slot uint64, inst solana.CompiledInstruction, programIndex uint16, tx *solana.Transaction, signature solana.Signature, idx int, isReobservation bool) (bool, error) { @@ -933,6 +975,21 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a Unreliable: !reliable, } + if !reliable && len(observation.Payload) == 0 { + logger.Debug("ignoring an observation because it is marked unreliable and has a zero length payload, probably from the shim", + zap.Stringer("account", acc), + zap.Time("timestamp", observation.Timestamp), + zap.Uint32("nonce", observation.Nonce), + zap.Uint64("sequence", observation.Sequence), + zap.Stringer("emitter_chain", observation.EmitterChain), + zap.Stringer("emitter_address", observation.EmitterAddress), + zap.Bool("isReobservation", isReobservation), + zap.Binary("payload", observation.Payload), + zap.Uint8("consistency_level", observation.ConsistencyLevel), + ) + return + } + solanaMessagesConfirmed.WithLabelValues(s.networkName).Inc() if logger.Level().Enabled(s.msgObservedLogLevel) { diff --git a/node/pkg/watchers/solana/client_test.go b/node/pkg/watchers/solana/client_test.go new file mode 100644 index 0000000000..baf1af2400 --- /dev/null +++ b/node/pkg/watchers/solana/client_test.go @@ -0,0 +1,15 @@ +package solana + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/gagliardetto/solana-go" +) + +func TestVerifyConstants(t *testing.T) { + // If either of these ever change, message publication and reobservation will break. + assert.Equal(t, SolanaAccountLen, solana.PublicKeyLength) + assert.Equal(t, SolanaSignatureLen, len(solana.Signature{})) +} From 30c0e97822f7c26fed3b45e69b7c9190c6d03a2d Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Thu, 16 Jan 2025 14:11:03 -0600 Subject: [PATCH 2/2] Code review rework --- node/pkg/watchers/solana/client.go | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/node/pkg/watchers/solana/client.go b/node/pkg/watchers/solana/client.go index 941703c277..4848fe2912 100644 --- a/node/pkg/watchers/solana/client.go +++ b/node/pkg/watchers/solana/client.go @@ -378,6 +378,9 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { return err } case m := <-s.obsvReqC: + if m.ChainId != uint32(s.chainID) { + panic("unexpected chain id") + } if len(m.TxHash) == SolanaAccountLen { // Request by account ID acc := solana.PublicKeyFromBytes(m.TxHash) logger.Info("received observation request with account id", zap.String("account", acc.String())) @@ -385,7 +388,6 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { s.fetchMessageAccount(rCtx, logger, acc, 0, true) cancel() } else if len(m.TxHash) == SolanaSignatureLen { // Request by transaction ID - // Move this to a separate PR. signature := solana.SignatureFromBytes(m.TxHash) logger.Info("received observation request with transaction id", zap.Stringer("signature", signature)) rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) @@ -975,21 +977,6 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a Unreliable: !reliable, } - if !reliable && len(observation.Payload) == 0 { - logger.Debug("ignoring an observation because it is marked unreliable and has a zero length payload, probably from the shim", - zap.Stringer("account", acc), - zap.Time("timestamp", observation.Timestamp), - zap.Uint32("nonce", observation.Nonce), - zap.Uint64("sequence", observation.Sequence), - zap.Stringer("emitter_chain", observation.EmitterChain), - zap.Stringer("emitter_address", observation.EmitterAddress), - zap.Bool("isReobservation", isReobservation), - zap.Binary("payload", observation.Payload), - zap.Uint8("consistency_level", observation.ConsistencyLevel), - ) - return - } - solanaMessagesConfirmed.WithLabelValues(s.networkName).Inc() if logger.Level().Enabled(s.msgObservedLogLevel) {