Skip to content

Commit

Permalink
Merge branch 'main' into mkysel/registry-notifier
Browse files Browse the repository at this point in the history
  • Loading branch information
mkysel committed Oct 24, 2024
2 parents 6a627c9 + 06f6e29 commit 96999e0
Show file tree
Hide file tree
Showing 25 changed files with 652 additions and 202 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ jobs:
submodules: recursive
- name: Install Foundry
uses: foundry-rs/foundry-toolchain@v1
with:
version: 'nightly-2044faec64f99a21f0e5f0094458a973612d0712'
- run: forge --version
- name: Run Forge fmt
# only format code, we do not want to format LIB
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ jobs:
- run: dev/docker/up
- name: Install Foundry
uses: foundry-rs/foundry-toolchain@v1
with:
version: 'nightly-2044faec64f99a21f0e5f0094458a973612d0712'
- run: dev/contracts/deploy-local
- name: Run Tests
run: |
Expand Down Expand Up @@ -48,6 +50,8 @@ jobs:

- name: Install Foundry
uses: foundry-rs/foundry-toolchain@v1
with:
version: 'nightly-2044faec64f99a21f0e5f0094458a973612d0712'

- name: Run Forge build
working-directory: contracts
Expand Down
8 changes: 4 additions & 4 deletions pkg/api/message/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

func TestPublishEnvelope(t *testing.T) {
api, db, cleanup := apiTestUtils.NewTestAPIClient(t)
api, db, cleanup := apiTestUtils.NewTestReplicationAPIClient(t)
defer cleanup()

payerEnvelope := envelopeTestUtils.CreatePayerEnvelope(t)
Expand Down Expand Up @@ -64,7 +64,7 @@ func TestPublishEnvelope(t *testing.T) {
}

func TestUnmarshalErrorOnPublish(t *testing.T) {
api, _, cleanup := apiTestUtils.NewTestAPIClient(t)
api, _, cleanup := apiTestUtils.NewTestReplicationAPIClient(t)
defer cleanup()

envelope := envelopeTestUtils.CreatePayerEnvelope(t)
Expand All @@ -79,7 +79,7 @@ func TestUnmarshalErrorOnPublish(t *testing.T) {
}

func TestMismatchingOriginatorOnPublish(t *testing.T) {
api, _, cleanup := apiTestUtils.NewTestAPIClient(t)
api, _, cleanup := apiTestUtils.NewTestReplicationAPIClient(t)
defer cleanup()

clientEnv := envelopeTestUtils.CreateClientEnvelope()
Expand All @@ -96,7 +96,7 @@ func TestMismatchingOriginatorOnPublish(t *testing.T) {
}

func TestMissingTopicOnPublish(t *testing.T) {
api, _, cleanup := apiTestUtils.NewTestAPIClient(t)
api, _, cleanup := apiTestUtils.NewTestReplicationAPIClient(t)
defer cleanup()

clientEnv := envelopeTestUtils.CreateClientEnvelope()
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/message/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func setupTest(t *testing.T) (message_api.ReplicationApiClient, *sql.DB, func())
},
}

return testUtilsApi.NewTestAPIClient(t)
return testUtilsApi.NewTestReplicationAPIClient(t)
}

func insertInitialRows(t *testing.T, store *sql.DB) {
Expand Down
144 changes: 144 additions & 0 deletions pkg/api/payer/publish_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package payer_test

import (
"context"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/api/payer"
"github.com/xmtp/xmtpd/pkg/envelopes"
blockchainMocks "github.com/xmtp/xmtpd/pkg/mocks/blockchain"
registryMocks "github.com/xmtp/xmtpd/pkg/mocks/registry"
"github.com/xmtp/xmtpd/pkg/proto/identity/associations"
envelopesProto "github.com/xmtp/xmtpd/pkg/proto/xmtpv4/envelopes"
"github.com/xmtp/xmtpd/pkg/proto/xmtpv4/payer_api"
"github.com/xmtp/xmtpd/pkg/registry"
"github.com/xmtp/xmtpd/pkg/testutils"
apiTestUtils "github.com/xmtp/xmtpd/pkg/testutils/api"
envelopesTestUtils "github.com/xmtp/xmtpd/pkg/testutils/envelopes"
"github.com/xmtp/xmtpd/pkg/utils"
"google.golang.org/protobuf/proto"
)

func buildPayerService(
t *testing.T,
) (*payer.Service, *blockchainMocks.MockIBlockchainPublisher, *registryMocks.MockNodeRegistry, func()) {
ctx, cancel := context.WithCancel(context.Background())
log := testutils.NewLog(t)
privKey, err := crypto.GenerateKey()
require.NoError(t, err)
mockRegistry := registryMocks.NewMockNodeRegistry(t)

require.NoError(t, err)
mockMessagePublisher := blockchainMocks.NewMockIBlockchainPublisher(t)

payerService, err := payer.NewPayerApiService(
ctx,
log,
mockRegistry,
privKey,
mockMessagePublisher,
)
require.NoError(t, err)

return payerService, mockMessagePublisher, mockRegistry, func() {
cancel()
}
}

func TestPublishIdentityUpdate(t *testing.T) {
ctx := context.Background()
svc, mockMessagePublisher, _, cleanup := buildPayerService(t)
defer cleanup()

inboxId := testutils.RandomInboxId()
inboxIdBytes, err := utils.ParseInboxId(inboxId)
require.NoError(t, err)

txnHash := common.Hash{1, 2, 3}
sequenceId := uint64(99)

identityUpdate := &associations.IdentityUpdate{
InboxId: inboxId,
}

envelope := envelopesTestUtils.CreateIdentityUpdateClientEnvelope(inboxIdBytes, identityUpdate)
envelopeBytes, err := proto.Marshal(envelope)
require.NoError(t, err)

mockMessagePublisher.EXPECT().
PublishIdentityUpdate(mock.Anything, mock.Anything, mock.Anything).
Return(&abis.IdentityUpdatesIdentityUpdateCreated{
Raw: types.Log{
TxHash: txnHash,
},
SequenceId: sequenceId,
Update: envelopeBytes,
}, nil)

publishResponse, err := svc.PublishClientEnvelopes(
ctx,
&payer_api.PublishClientEnvelopesRequest{
Envelopes: []*envelopesProto.ClientEnvelope{envelope},
},
)
require.NoError(t, err)
require.NotNil(t, publishResponse)
require.Len(t, publishResponse.OriginatorEnvelopes, 1)

responseEnvelope := publishResponse.OriginatorEnvelopes[0]
parsedOriginatorEnvelope, err := envelopes.NewOriginatorEnvelope(responseEnvelope)
require.NoError(t, err)

proof := parsedOriginatorEnvelope.Proto().Proof.(*envelopesProto.OriginatorEnvelope_BlockchainProof)

require.Equal(t, proof.BlockchainProof.TransactionHash, txnHash[:])
require.Equal(
t,
parsedOriginatorEnvelope.UnsignedOriginatorEnvelope.OriginatorSequenceID(),
sequenceId,
)
}

func TestPublishToNodes(t *testing.T) {
originatorServer, _, originatorCleanup := apiTestUtils.NewTestAPIServer(t)
defer originatorCleanup()

ctx := context.Background()
svc, _, mockRegistry, cleanup := buildPayerService(t)
defer cleanup()

mockRegistry.EXPECT().GetNode(mock.Anything).Return(&registry.Node{
HttpAddress: formatAddress(originatorServer.Addr().String()),
}, nil)

groupId := testutils.RandomGroupID()
testGroupMessage := envelopesTestUtils.CreateGroupMessageClientEnvelope(
groupId,
[]byte("test message"),
100, // This is the expected originator ID of the test server
)

publishResponse, err := svc.PublishClientEnvelopes(
ctx,
&payer_api.PublishClientEnvelopesRequest{
Envelopes: []*envelopesProto.ClientEnvelope{testGroupMessage},
},
)
require.NoError(t, err)
require.NotNil(t, publishResponse)
require.Len(t, publishResponse.OriginatorEnvelopes, 1)

responseEnvelope := publishResponse.OriginatorEnvelopes[0]
parsedOriginatorEnvelope, err := envelopes.NewOriginatorEnvelope(responseEnvelope)
require.NoError(t, err)

targetTopic := parsedOriginatorEnvelope.UnsignedOriginatorEnvelope.PayerEnvelope.ClientEnvelope.TargetTopic()

require.Equal(t, targetTopic.Identifier(), groupId[:])
}
70 changes: 62 additions & 8 deletions pkg/api/payer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package payer
import (
"context"
"crypto/ecdsa"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/blockchain"
"github.com/xmtp/xmtpd/pkg/constants"
"github.com/xmtp/xmtpd/pkg/envelopes"
Expand All @@ -18,6 +20,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

type Service struct {
Expand Down Expand Up @@ -59,8 +62,8 @@ func (s *Service) PublishClientEnvelopes(

// For each originator found in the request, publish all matching envelopes to the node
for originatorId, payloadsWithIndex := range grouped.forNodes {
s.log.Info("publishing to originator", zap.Uint32("originator_id", originatorId))
originatorEnvelopes, err := s.publishToNodes(ctx, originatorId, payloadsWithIndex)

if err != nil {
s.log.Error("error publishing payer envelopes", zap.Error(err))
return nil, status.Error(codes.Internal, "error publishing payer envelopes")
Expand All @@ -73,14 +76,20 @@ func (s *Service) PublishClientEnvelopes(
}

for _, payload := range grouped.forBlockchain {
s.log.Info(
"publishing to blockchain",
zap.String("topic", payload.payload.TargetTopic().String()),
)
var originatorEnvelope *envelopesProto.OriginatorEnvelope
if originatorEnvelope, err = s.publishToBlockchain(ctx, payload.payload); err != nil {
return nil, status.Errorf(codes.Internal, "error publishing group message: %v", err)
}
out[payload.originalIndex] = originatorEnvelope
}

return nil, status.Errorf(codes.Unimplemented, "method PublishClientEnvelopes not implemented")
return &payer_api.PublishClientEnvelopesResponse{
OriginatorEnvelopes: out,
}, nil
}

// A struct that groups client envelopes by their intended destination
Expand All @@ -94,7 +103,7 @@ type groupedEnvelopes struct {
func (s *Service) groupEnvelopes(
rawEnvelopes []*envelopesProto.ClientEnvelope,
) (*groupedEnvelopes, error) {
out := groupedEnvelopes{}
out := groupedEnvelopes{forNodes: make(map[uint32][]clientEnvelopeWithIndex)}

for i, rawClientEnvelope := range rawEnvelopes {
clientEnvelope, err := envelopes.NewClientEnvelope(rawClientEnvelope)
Expand Down Expand Up @@ -185,29 +194,59 @@ func (s *Service) publishToBlockchain(
)
}

var unsignedOriginatorEnvelope *envelopesProto.UnsignedOriginatorEnvelope
var hash common.Hash
switch kind {
case topic.TOPIC_KIND_GROUP_MESSAGES_V1:
hash, err = s.blockchainPublisher.PublishGroupMessage(ctx, idBytes, payload)
var logMessage *abis.GroupMessagesMessageSent
if logMessage, err = s.blockchainPublisher.PublishGroupMessage(ctx, idBytes, payload); err != nil {
return nil, status.Errorf(codes.Internal, "error publishing group message: %v", err)
}
if logMessage == nil {
return nil, status.Errorf(codes.Internal, "received nil logMessage")
}

hash = logMessage.Raw.TxHash
unsignedOriginatorEnvelope = buildUnsignedOriginatorEnvelopeFromChain(
clientEnvelope.Aad().TargetOriginator,
logMessage.SequenceId,
logMessage.Message,
)

case topic.TOPIC_KIND_IDENTITY_UPDATES_V1:
hash, err = s.blockchainPublisher.PublishIdentityUpdate(ctx, idBytes, payload)
var logMessage *abis.IdentityUpdatesIdentityUpdateCreated
if logMessage, err = s.blockchainPublisher.PublishIdentityUpdate(ctx, idBytes, payload); err != nil {
return nil, status.Errorf(codes.Internal, "error publishing identity update: %v", err)
}
if logMessage == nil {
return nil, status.Errorf(codes.Internal, "received nil logMessage")
}

hash = logMessage.Raw.TxHash
unsignedOriginatorEnvelope = buildUnsignedOriginatorEnvelopeFromChain(
clientEnvelope.Aad().TargetOriginator,
logMessage.SequenceId,
logMessage.Update,
)
default:
return nil, status.Errorf(
codes.InvalidArgument,
"Unknown blockchain message for topic %s",
targetTopic.String(),
)
}

unsignedBytes, err := proto.Marshal(unsignedOriginatorEnvelope)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"error publishing group message: %v",
"error marshalling unsigned originator envelope: %v",
err,
)
}

return &envelopesProto.OriginatorEnvelope{
UnsignedOriginatorEnvelope: payload,
UnsignedOriginatorEnvelope: unsignedBytes,
Proof: &envelopesProto.OriginatorEnvelope_BlockchainProof{
BlockchainProof: &envelopesProto.BlockchainProof{
TransactionHash: hash.Bytes(),
Expand All @@ -216,6 +255,21 @@ func (s *Service) publishToBlockchain(
}, nil
}

func buildUnsignedOriginatorEnvelopeFromChain(
targetOriginator uint32,
sequenceID uint64,
clientEnvelope []byte,
) *envelopesProto.UnsignedOriginatorEnvelope {
return &envelopesProto.UnsignedOriginatorEnvelope{
OriginatorNodeId: targetOriginator,
OriginatorSequenceId: sequenceID,
OriginatorNs: time.Now().UnixNano(), // TODO: get this data from the chain
PayerEnvelope: &envelopesProto.PayerEnvelope{
UnsignedClientEnvelope: clientEnvelope,
},
}
}

func (s *Service) signAllClientEnvelopes(
indexedEnvelopes []clientEnvelopeWithIndex,
) ([]*envelopesProto.PayerEnvelope, error) {
Expand Down Expand Up @@ -256,7 +310,7 @@ func shouldSendToBlockchain(targetTopic topic.Topic, aad *envelopesProto.Authent
case topic.TOPIC_KIND_IDENTITY_UPDATES_V1:
return true
case topic.TOPIC_KIND_GROUP_MESSAGES_V1:
return aad.TargetOriginator < constants.MAX_BLOCKCHAIN_ORIGINATOR_ID
return aad.TargetOriginator < uint32(constants.MAX_BLOCKCHAIN_ORIGINATOR_ID)
default:
return false
}
Expand Down
Loading

0 comments on commit 96999e0

Please sign in to comment.