Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node/Solana: Reobservation by transaction ID #4223

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 111 additions & 67 deletions node/pkg/watchers/solana/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"encoding/base64"
"encoding/hex"
"encoding/json"

"github.com/certusone/wormhole/node/pkg/common"
Expand Down Expand Up @@ -125,6 +126,16 @@ type (
}
)

const (
// NOTE: We have a test to make sure these constants don't change in solana-go.

// SolanaAccountLen is the expected length of an account identifier, which is a public key. Using the number here because that's what the admin client will populate.
SolanaAccountLen = 32

// SolanaSignatureLen is the expected length of a signature. As of v1.12.0, solana-go does not have a const for this.
SolanaSignatureLen = 64
)

var (
emptyAddressBytes = vaa.Address{}.Bytes()
emptyGapBytes = []byte{0, 0, 0}
Expand Down Expand Up @@ -370,13 +381,39 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
if m.ChainId != uint32(s.chainID) {
bruce-riley marked this conversation as resolved.
Show resolved Hide resolved
panic("unexpected chain id")
}

acc := solana.PublicKeyFromBytes(m.TxHash)
logger.Info("received observation request", zap.String("account", acc.String()))

rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
s.fetchMessageAccount(rCtx, logger, acc, 0, true)
cancel()
if len(m.TxHash) == SolanaAccountLen { // Request by account ID
acc := solana.PublicKeyFromBytes(m.TxHash)
logger.Info("received observation request with account id", zap.String("account", acc.String()))
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
s.fetchMessageAccount(rCtx, logger, acc, 0, true)
cancel()
} else if len(m.TxHash) == SolanaSignatureLen { // Request by transaction ID
signature := solana.SignatureFromBytes(m.TxHash)
logger.Info("received observation request with transaction id", zap.Stringer("signature", signature))
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
version := uint64(0)
result, err := s.rpcClient.GetTransaction(
rCtx,
signature,
&rpc.GetTransactionOpts{
MaxSupportedTransactionVersion: &version,
Encoding: solana.EncodingBase64,
},
)
cancel()
if err != nil {
logger.Error("failed to get transaction for observation request", zap.String("bytes", hex.EncodeToString(m.TxHash)), zap.Stringer("signature", signature), zap.Error(err))
} else {
tx, err := result.Transaction.GetTransaction()
if err != nil {
logger.Error("failed to extract transaction for observation request", zap.String("bytes", hex.EncodeToString(m.TxHash)), zap.Stringer("signature", signature), zap.Error(err))
} else {
s.processTransaction(ctx, logger, tx, result.Meta, result.Slot, true)
}
}
} else {
logger.Error("ignoring an observation request of unexpected length", zap.Int("len", len(m.TxHash)), zap.String("bytes", hex.EncodeToString(m.TxHash)))
}
case <-timer.C:
// Get current slot height
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
Expand Down Expand Up @@ -580,87 +617,94 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot
continue
}

err = s.populateLookupTableAccounts(ctx, tx)
if err != nil {
logger.Error("failed to fetch lookup table accounts",
zap.Uint64("slot", slot),
zap.Int("txNum", txNum),
zap.Error(err),
)
continue
}
s.processTransaction(ctx, logger, tx, txRpc.Meta, slot, false)
}

signature := tx.Signatures[0]
var programIndex uint16
for n, key := range tx.Message.AccountKeys {
if key.Equals(s.contract) {
programIndex = uint16(n)
}
}
if programIndex == 0 {
continue
if emptyRetry > 0 && logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("skipped or unavailable block retrieved on retry attempt",
zap.Uint("empty_retry", emptyRetry),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}

return true
}

// processTransaction processes a transaction and publishes any Wormhole events.
func (s *SolanaWatcher) processTransaction(ctx context.Context, logger *zap.Logger, tx *solana.Transaction, meta *rpc.TransactionMeta, slot uint64, isReobservation bool) {
signature := tx.Signatures[0]
err := s.populateLookupTableAccounts(ctx, tx)
if err != nil {
logger.Error("failed to fetch lookup table accounts for transaction",
zap.Uint64("slot", slot),
zap.Stringer("signature", signature),
zap.Error(err),
)
return
}

var programIndex uint16
for n, key := range tx.Message.AccountKeys {
if key.Equals(s.contract) {
programIndex = uint16(n)
}
}
if programIndex == 0 {
return
}

if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found Wormhole transaction",
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found Wormhole transaction",
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}

// Find top-level instructions
for i, inst := range tx.Message.Instructions {
found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, isReobservation)
if err != nil {
logger.Error("malformed Wormhole instruction",
zap.Error(err),
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
zap.String("commitment", string(s.commitment)),
zap.Binary("data", inst.Data))
} else if found {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found a top-level Wormhole instruction",
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}
}
}

// Find top-level instructions
for i, inst := range tx.Message.Instructions {
found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, isReobservation)
for outerIdx, inner := range meta.InnerInstructions {
for innerIdx, inst := range inner.Instructions {
found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, innerIdx, isReobservation)
if err != nil {
logger.Error("malformed Wormhole instruction",
zap.Error(err),
zap.Int("idx", i),
zap.Int("outerIdx", outerIdx),
zap.Int("innerIdx", innerIdx),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)),
zap.Binary("data", inst.Data))
zap.String("commitment", string(s.commitment)))
} else if found {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found a top-level Wormhole instruction",
zap.Int("idx", i),
logger.Debug("found an inner Wormhole instruction",
zap.Int("outerIdx", outerIdx),
zap.Int("innerIdx", innerIdx),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}
}
}

for _, inner := range txRpc.Meta.InnerInstructions {
for i, inst := range inner.Instructions {
found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, isReobservation)
if err != nil {
logger.Error("malformed Wormhole instruction",
zap.Error(err),
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
} else if found {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found an inner Wormhole instruction",
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}
}
}
}
}

if emptyRetry > 0 && logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("skipped or unavailable block retrieved on retry attempt",
zap.Uint("empty_retry", emptyRetry),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}

return true
}

func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logger, slot uint64, inst solana.CompiledInstruction, programIndex uint16, tx *solana.Transaction, signature solana.Signature, idx int, isReobservation bool) (bool, error) {
Expand Down
15 changes: 15 additions & 0 deletions node/pkg/watchers/solana/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package solana

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/gagliardetto/solana-go"
)

func TestVerifyConstants(t *testing.T) {
// If either of these ever change, message publication and reobservation will break.
assert.Equal(t, SolanaAccountLen, solana.PublicKeyLength)
assert.Equal(t, SolanaSignatureLen, len(solana.Signature{}))
}
Loading