forked from wormhole-foundation/wormhole
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
d5d6d96
commit 3652be8
Showing
1 changed file
with
264 additions
and
104 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,130 +1,290 @@ | ||
package processor | ||
package db | ||
|
||
import ( | ||
"encoding/hex" | ||
"time" | ||
|
||
"github.com/mr-tron/base58" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/client_golang/prometheus/promauto" | ||
"bytes" | ||
"crypto/ecdsa" | ||
"crypto/rand" | ||
"fmt" | ||
math_rand "math/rand" | ||
"os" | ||
"runtime" | ||
"sync" | ||
"sync/atomic" | ||
|
||
ethCommon "github.com/ethereum/go-ethereum/common" | ||
"github.com/dgraph-io/badger/v3" | ||
"github.com/ethereum/go-ethereum/crypto" | ||
"github.com/wormhole-foundation/wormhole/sdk/vaa" | ||
"go.uber.org/zap" | ||
"go.uber.org/zap/zapcore" | ||
|
||
"github.com/certusone/wormhole/node/pkg/common" | ||
"github.com/wormhole-foundation/wormhole/sdk/vaa" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
var ( | ||
// SECURITY: source_chain/target_chain are untrusted uint8 values. An attacker could cause a maximum of 255**2 label | ||
// pairs to be created, which is acceptable. | ||
func getVAA() vaa.VAA { | ||
return getVAAWithSeqNum(1) | ||
} | ||
|
||
messagesObservedTotal = promauto.NewCounterVec( | ||
prometheus.CounterOpts{ | ||
Name: "wormhole_message_observations_total", | ||
Help: "Total number of messages observed", | ||
}, | ||
[]string{"emitter_chain"}) | ||
) | ||
func getVAAWithSeqNum(seqNum uint64) vaa.VAA { | ||
var payload = []byte{97, 97, 97, 97, 97, 97} | ||
var governanceEmitter = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4} | ||
|
||
return vaa.VAA{ | ||
Version: uint8(1), | ||
GuardianSetIndex: uint32(1), | ||
Signatures: nil, | ||
Timestamp: time.Unix(0, 0), | ||
Nonce: uint32(1), | ||
Sequence: seqNum, | ||
ConsistencyLevel: uint8(32), | ||
EmitterChain: vaa.ChainIDSolana, | ||
EmitterAddress: governanceEmitter, | ||
Payload: payload, | ||
} | ||
} | ||
|
||
// Testing the expected default behavior of a CreateGovernanceVAA | ||
func TestVaaIDFromString(t *testing.T) { | ||
vaaIdString := "1/0000000000000000000000000000000000000000000000000000000000000004/1" | ||
vaaID, _ := VaaIDFromString(vaaIdString) | ||
expectAddr := vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4} | ||
|
||
assert.Equal(t, vaa.ChainIDSolana, vaaID.EmitterChain) | ||
assert.Equal(t, expectAddr, vaaID.EmitterAddress) | ||
assert.Equal(t, uint64(1), vaaID.Sequence) | ||
} | ||
|
||
func TestVaaIDFromVAA(t *testing.T) { | ||
testVaa := getVAA() | ||
vaaID := VaaIDFromVAA(&testVaa) | ||
expectAddr := vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4} | ||
|
||
assert.Equal(t, vaa.ChainIDSolana, vaaID.EmitterChain) | ||
assert.Equal(t, expectAddr, vaaID.EmitterAddress) | ||
assert.Equal(t, uint64(1), vaaID.Sequence) | ||
} | ||
|
||
func TestBytes(t *testing.T) { | ||
vaaIdString := "1/0000000000000000000000000000000000000000000000000000000000000004/1" | ||
vaaID, _ := VaaIDFromString(vaaIdString) | ||
expected := []byte{0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x2f, 0x31, 0x2f, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x34, 0x2f, 0x31} | ||
|
||
assert.Equal(t, expected, vaaID.Bytes()) | ||
} | ||
|
||
func TestEmitterPrefixBytesWithChainIDAndAddress(t *testing.T) { | ||
vaaIdString := "1/0000000000000000000000000000000000000000000000000000000000000004/1" | ||
vaaID, _ := VaaIDFromString(vaaIdString) | ||
expected := []byte{0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x2f, 0x31, 0x2f, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x34} | ||
|
||
assert.Equal(t, expected, vaaID.EmitterPrefixBytes()) | ||
} | ||
|
||
func TestEmitterPrefixBytesWithOnlyChainID(t *testing.T) { | ||
vaaID := VAAID{EmitterChain: vaa.ChainID(26)} | ||
assert.Equal(t, []byte("signed/26"), vaaID.EmitterPrefixBytes()) | ||
} | ||
|
||
func TestStoreSignedVAAUnsigned(t *testing.T) { | ||
dbPath := t.TempDir() | ||
db := OpenDb(zap.NewNop(), &dbPath) | ||
defer db.Close() | ||
defer os.Remove(dbPath) | ||
|
||
testVaa := getVAA() | ||
|
||
// Should panic because the VAA is not signed | ||
assert.Panics(t, func() { db.StoreSignedVAA(&testVaa) }, "The code did not panic") //nolint:errcheck | ||
} | ||
|
||
func TestStoreSignedVAASigned(t *testing.T) { | ||
dbPath := t.TempDir() | ||
db := OpenDb(zap.NewNop(), &dbPath) | ||
defer db.Close() | ||
defer os.Remove(dbPath) | ||
|
||
testVaa := getVAA() | ||
|
||
privKey, _ := ecdsa.GenerateKey(crypto.S256(), rand.Reader) | ||
testVaa.AddSignature(privKey, 0) | ||
|
||
err2 := db.StoreSignedVAA(&testVaa) | ||
assert.NoError(t, err2) | ||
} | ||
|
||
func TestStoreSignedVAABatch(t *testing.T) { | ||
dbPath := t.TempDir() | ||
db := OpenDb(zap.NewNop(), &dbPath) | ||
defer db.Close() | ||
defer os.Remove(dbPath) | ||
|
||
// handleMessage processes a message received from a chain and instantiates our deterministic copy of the VAA. An | ||
// event may be received multiple times and must be handled in an idempotent fashion. | ||
func (p *Processor) handleMessage(k *common.MessagePublication) { | ||
if p.gs == nil { | ||
p.logger.Warn("dropping observation since we haven't initialized our guardian set yet", | ||
zap.String("message_id", k.MessageIDString()), | ||
zap.Uint32("nonce", k.Nonce), | ||
zap.Stringer("txhash", k.TxHash), | ||
zap.Time("timestamp", k.Timestamp), | ||
) | ||
return | ||
privKey, err := ecdsa.GenerateKey(crypto.S256(), rand.Reader) | ||
require.NoError(t, err) | ||
|
||
require.Less(t, int64(0), db.db.MaxBatchCount()) // In testing this was 104857. | ||
require.Less(t, int64(0), db.db.MaxBatchSize()) // In testing this was 10066329. | ||
|
||
// Make sure we exceed the max batch size. | ||
numVAAs := uint64(db.db.MaxBatchCount() + 1) | ||
|
||
// Build the VAA batch. | ||
vaaBatch := make([]*vaa.VAA, 0, numVAAs) | ||
for seqNum := uint64(0); seqNum < numVAAs; seqNum++ { | ||
v := getVAAWithSeqNum(seqNum) | ||
v.AddSignature(privKey, 0) | ||
vaaBatch = append(vaaBatch, &v) | ||
} | ||
|
||
messagesObservedTotal.WithLabelValues(k.EmitterChain.String()).Inc() | ||
|
||
// All nodes will create the exact same VAA and sign its digest. | ||
// Consensus is established on this digest. | ||
|
||
v := &VAA{ | ||
VAA: vaa.VAA{ | ||
Version: vaa.SupportedVAAVersion, | ||
GuardianSetIndex: p.gs.Index, | ||
Signatures: nil, | ||
Timestamp: k.Timestamp, | ||
Nonce: k.Nonce, | ||
EmitterChain: k.EmitterChain, | ||
EmitterAddress: k.EmitterAddress, | ||
Payload: k.Payload, | ||
Sequence: k.Sequence, | ||
ConsistencyLevel: k.ConsistencyLevel, | ||
}, | ||
Unreliable: k.Unreliable, | ||
Reobservation: k.IsReobservation, | ||
// Store the batch in the database. | ||
err = db.StoreSignedVAABatch(vaaBatch) | ||
require.NoError(t, err) | ||
|
||
// Verify all the VAAs are in the database. | ||
for _, v := range vaaBatch { | ||
storedBytes, err := db.GetSignedVAABytes(*VaaIDFromVAA(v)) | ||
require.NoError(t, err) | ||
|
||
origBytes, err := v.Marshal() | ||
require.NoError(t, err) | ||
|
||
assert.True(t, bytes.Equal(origBytes, storedBytes)) | ||
} | ||
|
||
// Verify that updates work as well by tweaking the VAAs and rewriting them. | ||
for _, v := range vaaBatch { | ||
v.Nonce += 1 | ||
} | ||
|
||
// Generate digest of the unsigned VAA. | ||
digest := v.SigningDigest() | ||
hash := hex.EncodeToString(digest.Bytes()) | ||
// Store the updated batch in the database. | ||
err = db.StoreSignedVAABatch(vaaBatch) | ||
require.NoError(t, err) | ||
|
||
// Sign the digest using our node's guardian key. | ||
signature, err := crypto.Sign(digest.Bytes(), p.gk) | ||
if err != nil { | ||
panic(err) | ||
// Verify all the updated VAAs are in the database. | ||
for _, v := range vaaBatch { | ||
storedBytes, err := db.GetSignedVAABytes(*VaaIDFromVAA(v)) | ||
require.NoError(t, err) | ||
|
||
origBytes, err := v.Marshal() | ||
require.NoError(t, err) | ||
|
||
assert.True(t, bytes.Equal(origBytes, storedBytes)) | ||
} | ||
} | ||
|
||
func TestGetSignedVAABytes(t *testing.T) { | ||
dbPath := t.TempDir() | ||
db := OpenDb(zap.NewNop(), &dbPath) | ||
defer db.Close() | ||
defer os.Remove(dbPath) | ||
|
||
testVaa := getVAA() | ||
|
||
vaaID := VaaIDFromVAA(&testVaa) | ||
|
||
privKey, _ := ecdsa.GenerateKey(crypto.S256(), rand.Reader) | ||
testVaa.AddSignature(privKey, 0) | ||
|
||
// Store full VAA | ||
err2 := db.StoreSignedVAA(&testVaa) | ||
assert.NoError(t, err2) | ||
|
||
// Retrieve it using vaaID | ||
vaaBytes, err2 := db.GetSignedVAABytes(*vaaID) | ||
assert.NoError(t, err2) | ||
|
||
testVaaBytes, err3 := testVaa.Marshal() | ||
assert.NoError(t, err3) | ||
|
||
assert.Equal(t, testVaaBytes, vaaBytes) | ||
} | ||
|
||
func TestFindEmitterSequenceGap(t *testing.T) { | ||
dbPath := t.TempDir() | ||
db := OpenDb(zap.NewNop(), &dbPath) | ||
defer db.Close() | ||
defer os.Remove(dbPath) | ||
|
||
testVaa := getVAA() | ||
|
||
vaaID := VaaIDFromVAA(&testVaa) | ||
|
||
privKey, _ := ecdsa.GenerateKey(crypto.S256(), rand.Reader) | ||
testVaa.AddSignature(privKey, 0) | ||
|
||
// Store full VAA | ||
err2 := db.StoreSignedVAA(&testVaa) | ||
assert.NoError(t, err2) | ||
|
||
resp, firstSeq, lastSeq, err := db.FindEmitterSequenceGap(*vaaID) | ||
|
||
assert.Equal(t, []uint64{0x0}, resp) | ||
assert.Equal(t, uint64(0x0), firstSeq) | ||
assert.Equal(t, uint64(0x1), lastSeq) | ||
assert.NoError(t, err) | ||
} | ||
|
||
// BenchmarkVaaLookup benchmarks db.GetSignedVAABytes | ||
// You need to set the environment variable WH_DBPATH to a path with a populated BadgerDB. | ||
// You may want to play with the CONCURRENCY parameter. | ||
func BenchmarkVaaLookup(b *testing.B) { | ||
CONCURRENCY := runtime.NumCPU() | ||
dbPath := os.Getenv("WH_DBPATH") | ||
require.NotEqual(b, dbPath, "") | ||
|
||
// open DB | ||
optionsDB := badger.DefaultOptions(dbPath) | ||
optionsDB.Logger = nil | ||
badgerDb, err := badger.Open(optionsDB) | ||
require.NoError(b, err) | ||
db := &Database{ | ||
db: badgerDb, | ||
} | ||
|
||
shouldPublishImmediately := p.shouldPublishImmediately(&v.VAA) | ||
|
||
if p.logger.Core().Enabled(zapcore.DebugLevel) { | ||
p.logger.Debug("observed and signed confirmed message publication", | ||
zap.String("message_id", k.MessageIDString()), | ||
zap.Stringer("txhash", k.TxHash), | ||
zap.String("txhash_b58", base58.Encode(k.TxHash.Bytes())), | ||
zap.String("hash", hash), | ||
zap.Uint32("nonce", k.Nonce), | ||
zap.Time("timestamp", k.Timestamp), | ||
zap.Uint8("consistency_level", k.ConsistencyLevel), | ||
zap.String("signature", hex.EncodeToString(signature)), | ||
zap.Bool("shouldPublishImmediately", shouldPublishImmediately), | ||
zap.Bool("isReobservation", k.IsReobservation), | ||
) | ||
if err != nil { | ||
b.Error("failed to open database") | ||
} | ||
defer db.Close() | ||
|
||
vaaIds := make(chan *VAAID, b.N) | ||
|
||
// Broadcast the signature. | ||
ourObs, msg := p.broadcastSignature(v.MessageID(), k.TxHash.Bytes(), digest, signature, shouldPublishImmediately) | ||
for i := 0; i < b.N; i++ { | ||
randId := math_rand.Intn(250000) //nolint | ||
randId = 250000 - (i / 18) | ||
vaaId, err := VaaIDFromString(fmt.Sprintf("4/000000000000000000000000b6f6d86a8f9879a9c87f643768d9efc38c1da6e7/%d", randId)) | ||
assert.NoError(b, err) | ||
vaaIds <- vaaId | ||
} | ||
|
||
// Indicate that we observed this one. | ||
observationsReceivedTotal.Inc() | ||
observationsReceivedByGuardianAddressTotal.WithLabelValues(p.ourAddr.Hex()).Inc() | ||
b.ResetTimer() | ||
|
||
// Get / create our state entry. | ||
s := p.state.signatures[hash] | ||
if s == nil { | ||
s = &state{ | ||
firstObserved: time.Now(), | ||
nextRetry: time.Now().Add(nextRetryDuration(0)), | ||
signatures: map[ethCommon.Address][]byte{}, | ||
source: "loopback", | ||
} | ||
// actual timed code | ||
var errCtr atomic.Int32 | ||
var wg sync.WaitGroup | ||
|
||
p.state.signatures[hash] = s | ||
for i := 0; i < CONCURRENCY; i++ { | ||
wg.Add(1) | ||
go func() { | ||
for { | ||
select { | ||
case vaaId := <-vaaIds: | ||
_, err = db.GetSignedVAABytes(*vaaId) | ||
if err != nil { | ||
fmt.Printf("error retrieving %s/%s/%d: %s\n", vaaId.EmitterChain, vaaId.EmitterAddress, vaaId.Sequence, err) | ||
errCtr.Add(1) | ||
} | ||
default: | ||
wg.Done() | ||
return | ||
} | ||
} | ||
}() | ||
} | ||
|
||
// Update our state. | ||
s.ourObservation = v | ||
s.txHash = k.TxHash.Bytes() | ||
s.source = v.GetEmitterChain().String() | ||
s.gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs | ||
s.signatures[p.ourAddr] = signature | ||
s.ourObs = ourObs | ||
s.ourMsg = msg | ||
|
||
// Fast path for our own signature. | ||
if !s.submitted { | ||
start := time.Now() | ||
p.checkForQuorum(ourObs, s, s.gs, hash) | ||
timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) | ||
wg.Wait() | ||
|
||
if int(errCtr.Load()) > b.N/3 { | ||
b.Error("More than 1/3 of GetSignedVAABytes failed.") | ||
} | ||
} |