Skip to content

Commit

Permalink
Add tests and fix bugs for payer API (#248)
Browse files Browse the repository at this point in the history
## TL;DR

Gets the Payer API service to a mostly working state.

## TODO

- Error handling is not great here. What happens when you have a payload that is a mix of envelopes destined for the blockchain and for other nodes, and some of the requests fail. There's no way to roll back across the other services. I think we will need to return more fine-grained error responses that return errors per `ClientEnvelope` instead of having the entire request/response succeed or fail.

## AI Generated Summary

### What changed?

- Added `publish_test.go` for the payer API, implementing tests for publishing identity updates and group messages.
- Updated `service.go` in the payer package to implement the `PublishClientEnvelopes` method.
- Modified existing test files to use `NewTestReplicationAPIClient` instead of `NewTestAPIClient`.
- Updated envelope-related functions to provide more specific error messages.
- Added `NewPayerAPIClient` function in the API test utilities.
- Modified `CreateGroupMessageClientEnvelope` to include a `targetOriginator` parameter.

### How to test?

1. Run the new payer API tests in `publish_test.go`.
2. Verify that existing tests pass with the updated client creation methods.
3. Test the `PublishClientEnvelopes` method of the payer service with various input scenarios.

### Why make this change?

This change implements the payer API service, which is crucial for handling client envelope publishing. It also improves test utilities and error messages, making the codebase more robust and easier to debug. The updates to existing tests ensure compatibility with the new payer API functionality.

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

## Release Notes

- **New Features**
	- Introduced a new Payer API client for enhanced functionality.
	- Added comprehensive unit tests for the Payer API service.

- **Improvements**
	- Enhanced error messages for better clarity in envelope-related functions.
	- Improved logging and response handling in the PublishClientEnvelopes method.

- **Bug Fixes**
	- Updated API client initialization in various test functions to ensure consistency and accuracy.

- **Tests**
	- Expanded test coverage for group message and client envelope functionalities.
	- Modified existing tests to adapt to new API client structures and parameters.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
neekolas authored Oct 23, 2024
1 parent 9dac9eb commit 88e7845
Show file tree
Hide file tree
Showing 15 changed files with 249 additions and 39 deletions.
8 changes: 4 additions & 4 deletions pkg/api/message/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

func TestPublishEnvelope(t *testing.T) {
api, db, cleanup := apiTestUtils.NewTestAPIClient(t)
api, db, cleanup := apiTestUtils.NewTestReplicationAPIClient(t)
defer cleanup()

payerEnvelope := envelopeTestUtils.CreatePayerEnvelope(t)
Expand Down Expand Up @@ -64,7 +64,7 @@ func TestPublishEnvelope(t *testing.T) {
}

func TestUnmarshalErrorOnPublish(t *testing.T) {
api, _, cleanup := apiTestUtils.NewTestAPIClient(t)
api, _, cleanup := apiTestUtils.NewTestReplicationAPIClient(t)
defer cleanup()

envelope := envelopeTestUtils.CreatePayerEnvelope(t)
Expand All @@ -79,7 +79,7 @@ func TestUnmarshalErrorOnPublish(t *testing.T) {
}

func TestMismatchingOriginatorOnPublish(t *testing.T) {
api, _, cleanup := apiTestUtils.NewTestAPIClient(t)
api, _, cleanup := apiTestUtils.NewTestReplicationAPIClient(t)
defer cleanup()

clientEnv := envelopeTestUtils.CreateClientEnvelope()
Expand All @@ -96,7 +96,7 @@ func TestMismatchingOriginatorOnPublish(t *testing.T) {
}

func TestMissingTopicOnPublish(t *testing.T) {
api, _, cleanup := apiTestUtils.NewTestAPIClient(t)
api, _, cleanup := apiTestUtils.NewTestReplicationAPIClient(t)
defer cleanup()

clientEnv := envelopeTestUtils.CreateClientEnvelope()
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/message/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func setupTest(t *testing.T) (message_api.ReplicationApiClient, *sql.DB, func())
},
}

return testUtilsApi.NewTestAPIClient(t)
return testUtilsApi.NewTestReplicationAPIClient(t)
}

func insertInitialRows(t *testing.T, store *sql.DB) {
Expand Down
144 changes: 144 additions & 0 deletions pkg/api/payer/publish_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package payer_test

import (
"context"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/api/payer"
"github.com/xmtp/xmtpd/pkg/envelopes"
blockchainMocks "github.com/xmtp/xmtpd/pkg/mocks/blockchain"
registryMocks "github.com/xmtp/xmtpd/pkg/mocks/registry"
"github.com/xmtp/xmtpd/pkg/proto/identity/associations"
envelopesProto "github.com/xmtp/xmtpd/pkg/proto/xmtpv4/envelopes"
"github.com/xmtp/xmtpd/pkg/proto/xmtpv4/payer_api"
"github.com/xmtp/xmtpd/pkg/registry"
"github.com/xmtp/xmtpd/pkg/testutils"
apiTestUtils "github.com/xmtp/xmtpd/pkg/testutils/api"
envelopesTestUtils "github.com/xmtp/xmtpd/pkg/testutils/envelopes"
"github.com/xmtp/xmtpd/pkg/utils"
"google.golang.org/protobuf/proto"
)

func buildPayerService(
t *testing.T,
) (*payer.Service, *blockchainMocks.MockIBlockchainPublisher, *registryMocks.MockNodeRegistry, func()) {
ctx, cancel := context.WithCancel(context.Background())
log := testutils.NewLog(t)
privKey, err := crypto.GenerateKey()
require.NoError(t, err)
mockRegistry := registryMocks.NewMockNodeRegistry(t)

require.NoError(t, err)
mockMessagePublisher := blockchainMocks.NewMockIBlockchainPublisher(t)

payerService, err := payer.NewPayerApiService(
ctx,
log,
mockRegistry,
privKey,
mockMessagePublisher,
)
require.NoError(t, err)

return payerService, mockMessagePublisher, mockRegistry, func() {
cancel()
}
}

func TestPublishIdentityUpdate(t *testing.T) {
ctx := context.Background()
svc, mockMessagePublisher, _, cleanup := buildPayerService(t)
defer cleanup()

inboxId := testutils.RandomInboxId()
inboxIdBytes, err := utils.ParseInboxId(inboxId)
require.NoError(t, err)

txnHash := common.Hash{1, 2, 3}
sequenceId := uint64(99)

identityUpdate := &associations.IdentityUpdate{
InboxId: inboxId,
}

envelope := envelopesTestUtils.CreateIdentityUpdateClientEnvelope(inboxIdBytes, identityUpdate)
envelopeBytes, err := proto.Marshal(envelope)
require.NoError(t, err)

mockMessagePublisher.EXPECT().
PublishIdentityUpdate(mock.Anything, mock.Anything, mock.Anything).
Return(&abis.IdentityUpdatesIdentityUpdateCreated{
Raw: types.Log{
TxHash: txnHash,
},
SequenceId: sequenceId,
Update: envelopeBytes,
}, nil)

publishResponse, err := svc.PublishClientEnvelopes(
ctx,
&payer_api.PublishClientEnvelopesRequest{
Envelopes: []*envelopesProto.ClientEnvelope{envelope},
},
)
require.NoError(t, err)
require.NotNil(t, publishResponse)
require.Len(t, publishResponse.OriginatorEnvelopes, 1)

responseEnvelope := publishResponse.OriginatorEnvelopes[0]
parsedOriginatorEnvelope, err := envelopes.NewOriginatorEnvelope(responseEnvelope)
require.NoError(t, err)

proof := parsedOriginatorEnvelope.Proto().Proof.(*envelopesProto.OriginatorEnvelope_BlockchainProof)

require.Equal(t, proof.BlockchainProof.TransactionHash, txnHash[:])
require.Equal(
t,
parsedOriginatorEnvelope.UnsignedOriginatorEnvelope.OriginatorSequenceID(),
sequenceId,
)
}

func TestPublishToNodes(t *testing.T) {
originatorServer, _, originatorCleanup := apiTestUtils.NewTestAPIServer(t)
defer originatorCleanup()

ctx := context.Background()
svc, _, mockRegistry, cleanup := buildPayerService(t)
defer cleanup()

mockRegistry.EXPECT().GetNode(mock.Anything).Return(&registry.Node{
HttpAddress: formatAddress(originatorServer.Addr().String()),
}, nil)

groupId := testutils.RandomGroupID()
testGroupMessage := envelopesTestUtils.CreateGroupMessageClientEnvelope(
groupId,
[]byte("test message"),
100, // This is the expected originator ID of the test server
)

publishResponse, err := svc.PublishClientEnvelopes(
ctx,
&payer_api.PublishClientEnvelopesRequest{
Envelopes: []*envelopesProto.ClientEnvelope{testGroupMessage},
},
)
require.NoError(t, err)
require.NotNil(t, publishResponse)
require.Len(t, publishResponse.OriginatorEnvelopes, 1)

responseEnvelope := publishResponse.OriginatorEnvelopes[0]
parsedOriginatorEnvelope, err := envelopes.NewOriginatorEnvelope(responseEnvelope)
require.NoError(t, err)

targetTopic := parsedOriginatorEnvelope.UnsignedOriginatorEnvelope.PayerEnvelope.ClientEnvelope.TargetTopic()

require.Equal(t, targetTopic.Identifier(), groupId[:])
}
56 changes: 51 additions & 5 deletions pkg/api/payer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package payer
import (
"context"
"crypto/ecdsa"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/xmtp/xmtpd/pkg/abis"
Expand All @@ -19,6 +20,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

type Service struct {
Expand Down Expand Up @@ -60,8 +62,8 @@ func (s *Service) PublishClientEnvelopes(

// For each originator found in the request, publish all matching envelopes to the node
for originatorId, payloadsWithIndex := range grouped.forNodes {
s.log.Info("publishing to originator", zap.Uint32("originator_id", originatorId))
originatorEnvelopes, err := s.publishToNodes(ctx, originatorId, payloadsWithIndex)

if err != nil {
s.log.Error("error publishing payer envelopes", zap.Error(err))
return nil, status.Error(codes.Internal, "error publishing payer envelopes")
Expand All @@ -74,14 +76,20 @@ func (s *Service) PublishClientEnvelopes(
}

for _, payload := range grouped.forBlockchain {
s.log.Info(
"publishing to blockchain",
zap.String("topic", payload.payload.TargetTopic().String()),
)
var originatorEnvelope *envelopesProto.OriginatorEnvelope
if originatorEnvelope, err = s.publishToBlockchain(ctx, payload.payload); err != nil {
return nil, status.Errorf(codes.Internal, "error publishing group message: %v", err)
}
out[payload.originalIndex] = originatorEnvelope
}

return nil, status.Errorf(codes.Unimplemented, "method PublishClientEnvelopes not implemented")
return &payer_api.PublishClientEnvelopesResponse{
OriginatorEnvelopes: out,
}, nil
}

// A struct that groups client envelopes by their intended destination
Expand All @@ -95,7 +103,7 @@ type groupedEnvelopes struct {
func (s *Service) groupEnvelopes(
rawEnvelopes []*envelopesProto.ClientEnvelope,
) (*groupedEnvelopes, error) {
out := groupedEnvelopes{}
out := groupedEnvelopes{forNodes: make(map[uint32][]clientEnvelopeWithIndex)}

for i, rawClientEnvelope := range rawEnvelopes {
clientEnvelope, err := envelopes.NewClientEnvelope(rawClientEnvelope)
Expand Down Expand Up @@ -186,6 +194,7 @@ func (s *Service) publishToBlockchain(
)
}

var unsignedOriginatorEnvelope *envelopesProto.UnsignedOriginatorEnvelope
var hash common.Hash
switch kind {
case topic.TOPIC_KIND_GROUP_MESSAGES_V1:
Expand All @@ -196,7 +205,14 @@ func (s *Service) publishToBlockchain(
if logMessage == nil {
return nil, status.Errorf(codes.Internal, "received nil logMessage")
}

hash = logMessage.Raw.TxHash
unsignedOriginatorEnvelope = buildUnsignedOriginatorEnvelopeFromChain(
clientEnvelope.Aad().TargetOriginator,
logMessage.SequenceId,
logMessage.Message,
)

case topic.TOPIC_KIND_IDENTITY_UPDATES_V1:
var logMessage *abis.IdentityUpdatesIdentityUpdateCreated
if logMessage, err = s.blockchainPublisher.PublishIdentityUpdate(ctx, idBytes, payload); err != nil {
Expand All @@ -205,7 +221,13 @@ func (s *Service) publishToBlockchain(
if logMessage == nil {
return nil, status.Errorf(codes.Internal, "received nil logMessage")
}

hash = logMessage.Raw.TxHash
unsignedOriginatorEnvelope = buildUnsignedOriginatorEnvelopeFromChain(
clientEnvelope.Aad().TargetOriginator,
logMessage.SequenceId,
logMessage.Update,
)
default:
return nil, status.Errorf(
codes.InvalidArgument,
Expand All @@ -214,8 +236,17 @@ func (s *Service) publishToBlockchain(
)
}

unsignedBytes, err := proto.Marshal(unsignedOriginatorEnvelope)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"error marshalling unsigned originator envelope: %v",
err,
)
}

return &envelopesProto.OriginatorEnvelope{
UnsignedOriginatorEnvelope: payload,
UnsignedOriginatorEnvelope: unsignedBytes,
Proof: &envelopesProto.OriginatorEnvelope_BlockchainProof{
BlockchainProof: &envelopesProto.BlockchainProof{
TransactionHash: hash.Bytes(),
Expand All @@ -224,6 +255,21 @@ func (s *Service) publishToBlockchain(
}, nil
}

func buildUnsignedOriginatorEnvelopeFromChain(
targetOriginator uint32,
sequenceID uint64,
clientEnvelope []byte,
) *envelopesProto.UnsignedOriginatorEnvelope {
return &envelopesProto.UnsignedOriginatorEnvelope{
OriginatorNodeId: targetOriginator,
OriginatorSequenceId: sequenceID,
OriginatorNs: time.Now().UnixNano(), // TODO: get this data from the chain
PayerEnvelope: &envelopesProto.PayerEnvelope{
UnsignedClientEnvelope: clientEnvelope,
},
}
}

func (s *Service) signAllClientEnvelopes(
indexedEnvelopes []clientEnvelopeWithIndex,
) ([]*envelopesProto.PayerEnvelope, error) {
Expand Down Expand Up @@ -264,7 +310,7 @@ func shouldSendToBlockchain(targetTopic topic.Topic, aad *envelopesProto.Authent
case topic.TOPIC_KIND_IDENTITY_UPDATES_V1:
return true
case topic.TOPIC_KIND_GROUP_MESSAGES_V1:
return aad.TargetOriginator < constants.MAX_BLOCKCHAIN_ORIGINATOR_ID
return aad.TargetOriginator < uint32(constants.MAX_BLOCKCHAIN_ORIGINATOR_ID)
default:
return false
}
Expand Down
Loading

0 comments on commit 88e7845

Please sign in to comment.