Skip to content

Commit

Permalink
Update group messages
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Oct 21, 2024
1 parent ba4aa9f commit 295b6aa
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 44 deletions.
20 changes: 15 additions & 5 deletions pkg/indexer/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import (
"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/blockchain"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/envelopes"
"github.com/xmtp/xmtpd/pkg/mocks/mlsvalidate"
"github.com/xmtp/xmtpd/pkg/testutils"
envelopesTestUtils "github.com/xmtp/xmtpd/pkg/testutils/envelopes"
"github.com/xmtp/xmtpd/pkg/topic"
"google.golang.org/protobuf/proto"
)

func startIndexing(t *testing.T) (*queries.Queries, context.Context, func()) {
Expand Down Expand Up @@ -59,26 +62,33 @@ func TestStoreMessages(t *testing.T) {
groupID := testutils.RandomGroupID()
topic := topic.NewTopic(topic.TOPIC_KIND_GROUP_MESSAGES_V1, groupID[:]).Bytes()

clientEnvelope := envelopesTestUtils.CreateGroupMessageClientEnvelope(groupID, message)
clientEnvelopeBytes, err := proto.Marshal(clientEnvelope)
require.NoError(t, err)

// Publish the message onto the blockchain
_, err := publisher.PublishGroupMessage(ctx, groupID, message)
_, err = publisher.PublishGroupMessage(ctx, groupID, clientEnvelopeBytes)
require.NoError(t, err)

// Poll the DB until the stored message shows up
require.Eventually(t, func() bool {
envelopes, err := querier.SelectGatewayEnvelopes(
results, err := querier.SelectGatewayEnvelopes(
context.Background(),
queries.SelectGatewayEnvelopesParams{
Topics: [][]byte{topic},
},
)
require.NoError(t, err)

if len(envelopes) == 0 {
if len(results) == 0 {
return false
}

firstEnvelope := envelopes[0]
require.Equal(t, firstEnvelope.OriginatorEnvelope, message)
firstEnvelope := results[0]
_, err = envelopes.NewOriginatorEnvelopeFromBytes(
firstEnvelope.OriginatorEnvelope,
)
require.NoError(t, err)
require.Equal(t, firstEnvelope.Topic, topic)

return true
Expand Down
40 changes: 39 additions & 1 deletion pkg/indexer/storer/groupMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package storer

import (
"context"
"errors"

"github.com/ethereum/go-ethereum/core/types"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/envelopes"
"github.com/xmtp/xmtpd/pkg/topic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

type GroupMessageStorer struct {
Expand Down Expand Up @@ -37,14 +40,49 @@ func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogS

topicStruct := topic.NewTopic(topic.TOPIC_KIND_GROUP_MESSAGES_V1, msgSent.GroupId[:])

clientEnvelope, err := envelopes.NewClientEnvelopeFromBytes(msgSent.Message)
if err != nil {
s.logger.Error("Error parsing client envelope", zap.Error(err))
return NewLogStorageError(err, false)
}

targetTopic := clientEnvelope.TargetTopic()

if !clientEnvelope.TopicMatchesPayload() {
s.logger.Error(
"Client envelope topic does not match payload type",
zap.Any("targetTopic", targetTopic.String()),
zap.Any("contractTopic", topicStruct.String()),
)
return NewLogStorageError(
errors.New("client envelope topic does not match payload topic"),
false,
)
}

signedOriginatorEnvelope, err := buildSignedOriginatorEnvelope(
buildOriginatorEnvelope(msgSent.SequenceId, msgSent.Message),
event.BlockHash,
)
if err != nil {
s.logger.Error("Error building signed originator envelope", zap.Error(err))
return NewLogStorageError(err, false)
}

originatorEnvelopeBytes, err := proto.Marshal(signedOriginatorEnvelope)
if err != nil {
s.logger.Error("Error marshalling originator envelope", zap.Error(err))
return NewLogStorageError(err, false)
}

s.logger.Debug("Inserting message from contract", zap.String("topic", topicStruct.String()))

if _, err = s.queries.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{
// We may not want to hardcode this to 0 and have an originator ID for each smart contract?
OriginatorNodeID: 0,
OriginatorSequenceID: int64(msgSent.SequenceId),
Topic: topicStruct.Bytes(),
OriginatorEnvelope: msgSent.Message, // TODO:nm parse originator envelope and do some validation
OriginatorEnvelope: originatorEnvelopeBytes,
}); err != nil {
s.logger.Error("Error inserting envelope from smart contract", zap.Error(err))
return NewLogStorageError(err, true)
Expand Down
34 changes: 26 additions & 8 deletions pkg/indexer/storer/groupMessage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/blockchain"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/envelopes"
"github.com/xmtp/xmtpd/pkg/testutils"
envelopesTestUtils "github.com/xmtp/xmtpd/pkg/testutils/envelopes"
"github.com/xmtp/xmtpd/pkg/topic"
"github.com/xmtp/xmtpd/pkg/utils"
)

Expand Down Expand Up @@ -46,24 +49,37 @@ func TestStoreGroupMessages(t *testing.T) {
message := testutils.RandomBytes(30)
sequenceID := uint64(1)

logMessage := testutils.BuildMessageSentLog(t, groupID, message, sequenceID)
clientEnvelope := envelopesTestUtils.CreateGroupMessageClientEnvelope(groupID, message)

err := storer.StoreLog(
logMessage := testutils.BuildMessageSentLog(t, groupID, clientEnvelope, sequenceID)
var err error
err = storer.StoreLog(
ctx,
logMessage,
)
require.NoError(t, err)

envelopes, queryErr := storer.queries.SelectGatewayEnvelopes(
gatewayEnvelopes, err := storer.queries.SelectGatewayEnvelopes(
ctx,
queries.SelectGatewayEnvelopesParams{OriginatorNodeIds: []int32{0}},
)
require.NoError(t, queryErr)
require.NoError(t, err)

require.Equal(t, len(envelopes), 1)
require.Equal(t, len(gatewayEnvelopes), 1)

firstEnvelope := envelopes[0]
require.Equal(t, firstEnvelope.OriginatorEnvelope, message)
firstEnvelope := gatewayEnvelopes[0]
originatorEnvelope, err := envelopes.NewOriginatorEnvelopeFromBytes(
firstEnvelope.OriginatorEnvelope,
)
require.NoError(t, err)
require.True(
t,
originatorEnvelope.UnsignedOriginatorEnvelope.PayerEnvelope.ClientEnvelope.TopicMatchesPayload(),
)
targetTopic := originatorEnvelope.UnsignedOriginatorEnvelope.PayerEnvelope.ClientEnvelope.TargetTopic()
require.Equal(t, targetTopic.Kind(), topic.TOPIC_KIND_GROUP_MESSAGES_V1)
require.Equal(t, targetTopic.Identifier(), groupID[:])
require.Equal(t, firstEnvelope.OriginatorSequenceID, int64(sequenceID))
}

func TestStoreGroupMessageDuplicate(t *testing.T) {
Expand All @@ -76,7 +92,9 @@ func TestStoreGroupMessageDuplicate(t *testing.T) {
message := testutils.RandomBytes(30)
sequenceID := uint64(1)

logMessage := testutils.BuildMessageSentLog(t, groupID, message, sequenceID)
clientEnvelope := envelopesTestUtils.CreateGroupMessageClientEnvelope(groupID, message)

logMessage := testutils.BuildMessageSentLog(t, groupID, clientEnvelope, sequenceID)

err := storer.StoreLog(
ctx,
Expand Down
26 changes: 9 additions & 17 deletions pkg/indexer/storer/identityUpdate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ import (
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/blockchain"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/envelopes"
"github.com/xmtp/xmtpd/pkg/mlsvalidate"
mlsvalidateMock "github.com/xmtp/xmtpd/pkg/mocks/mlsvalidate"
"github.com/xmtp/xmtpd/pkg/proto/identity/associations"
envelopesProto "github.com/xmtp/xmtpd/pkg/proto/xmtpv4/envelopes"
"github.com/xmtp/xmtpd/pkg/testutils"
"github.com/xmtp/xmtpd/pkg/topic"
envelopesTestUtils "github.com/xmtp/xmtpd/pkg/testutils/envelopes"
"github.com/xmtp/xmtpd/pkg/utils"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -67,24 +66,17 @@ func TestStoreIdentityUpdate(t *testing.T) {
identityUpdate := associations.IdentityUpdate{
InboxId: utils.HexEncode(inboxId[:]),
}
clientEnvelope, err := envelopes.NewClientEnvelope(&envelopesProto.ClientEnvelope{
Aad: &envelopesProto.AuthenticatedData{
TargetOriginator: IDENTITY_UPDATE_ORIGINATOR_ID,
TargetTopic: topic.NewTopic(topic.TOPIC_KIND_IDENTITY_UPDATES_V1, inboxId[:]).
Bytes(),
},
Payload: &envelopesProto.ClientEnvelope_IdentityUpdate{
IdentityUpdate: &identityUpdate,
},
})
require.NoError(t, err)
clientEnvelopeBytes, err := clientEnvelope.Bytes()
require.NoError(t, err)

sequenceID := uint64(1)

logMessage := testutils.BuildIdentityUpdateLog(t, inboxId, clientEnvelopeBytes, sequenceID)
logMessage := testutils.BuildIdentityUpdateLog(
t,
inboxId,
envelopesTestUtils.CreateIdentityUpdateClientEnvelope(inboxId, &identityUpdate),
sequenceID,
)

err = storer.StoreLog(
err := storer.StoreLog(
ctx,
logMessage,
)
Expand Down
14 changes: 10 additions & 4 deletions pkg/testutils/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/abis"
envelopesProto "github.com/xmtp/xmtpd/pkg/proto/xmtpv4/envelopes"
"github.com/xmtp/xmtpd/pkg/utils"
"google.golang.org/protobuf/proto"
)

// Build an abi encoded MessageSent event struct
Expand All @@ -30,10 +32,12 @@ func BuildMessageSentEvent(
func BuildMessageSentLog(
t *testing.T,
groupID [32]byte,
message []byte,
clientEnvelope *envelopesProto.ClientEnvelope,
sequenceID uint64,
) types.Log {
eventData, err := BuildMessageSentEvent(groupID, message, sequenceID)
messageBytes, err := proto.Marshal(clientEnvelope)
require.NoError(t, err)
eventData, err := BuildMessageSentEvent(groupID, messageBytes, sequenceID)
require.NoError(t, err)

abi, err := abis.GroupMessagesMetaData.GetAbi()
Expand All @@ -60,10 +64,12 @@ func BuildIdentityUpdateEvent(inboxId [32]byte, update []byte, sequenceID uint64
func BuildIdentityUpdateLog(
t *testing.T,
inboxId [32]byte,
update []byte,
clientEnvelope *envelopesProto.ClientEnvelope,
sequenceID uint64,
) types.Log {
eventData, err := BuildIdentityUpdateEvent(inboxId, update, sequenceID)
messageBytes, err := proto.Marshal(clientEnvelope)
require.NoError(t, err)
eventData, err := BuildIdentityUpdateEvent(inboxId, messageBytes, sequenceID)
require.NoError(t, err)

abi, err := abis.IdentityUpdatesMetaData.GetAbi()
Expand Down
39 changes: 39 additions & 0 deletions pkg/testutils/envelopes/envelopes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/proto/identity/associations"
mlsv1 "github.com/xmtp/xmtpd/pkg/proto/mls/api/v1"
envelopes "github.com/xmtp/xmtpd/pkg/proto/xmtpv4/envelopes"
"github.com/xmtp/xmtpd/pkg/topic"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -38,6 +39,44 @@ func CreateClientEnvelope(aad ...*envelopes.AuthenticatedData) *envelopes.Client
}
}

func CreateGroupMessageClientEnvelope(
groupID [32]byte,
message []byte,
) *envelopes.ClientEnvelope {
return &envelopes.ClientEnvelope{
Aad: &envelopes.AuthenticatedData{
TargetTopic: topic.NewTopic(topic.TOPIC_KIND_GROUP_MESSAGES_V1, groupID[:]).
Bytes(),
TargetOriginator: 0,
},
Payload: &envelopes.ClientEnvelope_GroupMessage{
GroupMessage: &mlsv1.GroupMessageInput{
Version: &mlsv1.GroupMessageInput_V1_{
V1: &mlsv1.GroupMessageInput_V1{
Data: message,
},
},
},
},
}
}

func CreateIdentityUpdateClientEnvelope(
inboxID [32]byte,
update *associations.IdentityUpdate,
) *envelopes.ClientEnvelope {
return &envelopes.ClientEnvelope{
Aad: &envelopes.AuthenticatedData{
TargetTopic: topic.NewTopic(topic.TOPIC_KIND_IDENTITY_UPDATES_V1, inboxID[:]).
Bytes(),
TargetOriginator: 0,
},
Payload: &envelopes.ClientEnvelope_IdentityUpdate{
IdentityUpdate: update,
},
}
}

func CreatePayerEnvelope(
t *testing.T,
clientEnv ...*envelopes.ClientEnvelope,
Expand Down
19 changes: 10 additions & 9 deletions pkg/topic/topic_test.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,40 @@
package topic
package topic_test

import (
"testing"

"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/testutils"
"github.com/xmtp/xmtpd/pkg/topic"
"github.com/xmtp/xmtpd/pkg/utils"
)

func TestValidTopic(t *testing.T) {
newTopic := []byte{1, 2, 3}
topic, err := ParseTopic(newTopic)
parsed, err := topic.ParseTopic(newTopic)
require.NoError(t, err)
require.Equal(t, TOPIC_KIND_WELCOME_MESSAGES_V1, topic.Kind())
require.Equal(t, []byte{2, 3}, topic.Identifier())
require.Equal(t, topic.TOPIC_KIND_WELCOME_MESSAGES_V1, parsed.Kind())
require.Equal(t, []byte{2, 3}, parsed.Identifier())
}

func TestMissingIdentifier(t *testing.T) {
newTopic := []byte{1}
topic, err := ParseTopic(newTopic)
parsed, err := topic.ParseTopic(newTopic)
require.Error(t, err)
require.Nil(t, topic)
require.Nil(t, parsed)
}

func TestInvalidKind(t *testing.T) {
newTopic := []byte{255, 2, 3}
topic, err := ParseTopic(newTopic)
topic, err := topic.ParseTopic(newTopic)
require.Error(t, err)
require.Nil(t, topic)
}

func TestTopicString(t *testing.T) {
identifier := testutils.RandomBytes(32)

groupMessagesTopic := NewTopic(TOPIC_KIND_GROUP_MESSAGES_V1, identifier)
groupMessagesTopic := topic.NewTopic(topic.TOPIC_KIND_GROUP_MESSAGES_V1, identifier)
require.Equal(t, "group_messages_v1", groupMessagesTopic.Kind().String())
require.Equal(t, identifier, groupMessagesTopic.Identifier())
require.Equal(
Expand All @@ -42,7 +43,7 @@ func TestTopicString(t *testing.T) {
groupMessagesTopic.String(),
)

identityUpdatesTopic := NewTopic(TOPIC_KIND_IDENTITY_UPDATES_V1, identifier)
identityUpdatesTopic := topic.NewTopic(topic.TOPIC_KIND_IDENTITY_UPDATES_V1, identifier)
require.Equal(t, "identity_updates_v1", identityUpdatesTopic.Kind().String())
require.Equal(t, identifier, identityUpdatesTopic.Identifier())
require.Equal(
Expand Down

0 comments on commit 295b6aa

Please sign in to comment.