Skip to content

Commit

Permalink
Add message publishing to payer service
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Oct 21, 2024
1 parent ea704f1 commit 41bc7db
Show file tree
Hide file tree
Showing 15 changed files with 361 additions and 119 deletions.
77 changes: 12 additions & 65 deletions pkg/api/message/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@ import (
"database/sql"
"fmt"

"github.com/xmtp/xmtpd/pkg/blockchain"
"github.com/xmtp/xmtpd/pkg/db"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/envelopes"
"github.com/xmtp/xmtpd/pkg/proto/identity/associations"
envelopesProto "github.com/xmtp/xmtpd/pkg/proto/xmtpv4/envelopes"
"github.com/xmtp/xmtpd/pkg/proto/xmtpv4/message_api"
"github.com/xmtp/xmtpd/pkg/registrant"
"github.com/xmtp/xmtpd/pkg/utils"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
Expand All @@ -32,21 +29,19 @@ const (
type Service struct {
message_api.UnimplementedReplicationApiServer

ctx context.Context
log *zap.Logger
registrant *registrant.Registrant
store *sql.DB
publishWorker *publishWorker
subscribeWorker *subscribeWorker
messagePublisher blockchain.IBlockchainPublisher
ctx context.Context
log *zap.Logger
registrant *registrant.Registrant
store *sql.DB
publishWorker *publishWorker
subscribeWorker *subscribeWorker
}

func NewReplicationApiService(
ctx context.Context,
log *zap.Logger,
registrant *registrant.Registrant,
store *sql.DB,
messagePublisher blockchain.IBlockchainPublisher,

) (*Service, error) {
publishWorker, err := startPublishWorker(ctx, log, registrant, store)
Expand All @@ -59,13 +54,12 @@ func NewReplicationApiService(
}

return &Service{
ctx: ctx,
log: log,
registrant: registrant,
store: store,
publishWorker: publishWorker,
subscribeWorker: subscribeWorker,
messagePublisher: messagePublisher,
ctx: ctx,
log: log,
registrant: registrant,
store: store,
publishWorker: publishWorker,
subscribeWorker: subscribeWorker,
}, nil
}

Expand Down Expand Up @@ -227,14 +221,6 @@ func (s *Service) PublishPayerEnvelopes(
return nil, err
}

didPublish, err := s.maybePublishToBlockchain(ctx, &payerEnv.ClientEnvelope)
if err != nil {
return nil, err
}
if didPublish {
return &message_api.PublishPayerEnvelopesResponse{}, nil
}

// TODO(rich): Properly support batch publishing
payerBytes, err := payerEnv.Bytes()
if err != nil {
Expand Down Expand Up @@ -263,45 +249,6 @@ func (s *Service) PublishPayerEnvelopes(
}, nil
}

func (s *Service) maybePublishToBlockchain(
ctx context.Context,
clientEnv *envelopes.ClientEnvelope,
) (didPublish bool, err error) {
payload, ok := clientEnv.Payload().(*envelopesProto.ClientEnvelope_IdentityUpdate)
if ok && payload.IdentityUpdate != nil {
if err = s.publishIdentityUpdate(ctx, payload.IdentityUpdate); err != nil {
s.log.Error("could not publish identity update", zap.Error(err))
return false, status.Errorf(
codes.Internal,
"could not publish identity update: %v",
err,
)
}
return true, nil
}

return false, nil
}

func (s *Service) publishIdentityUpdate(
ctx context.Context,
identityUpdate *associations.IdentityUpdate,
) error {
identityUpdateBytes, err := proto.Marshal(identityUpdate)
if err != nil {
return err
}
inboxId, err := utils.ParseInboxId(identityUpdate.InboxId)
if err != nil {
return err
}
return s.messagePublisher.PublishIdentityUpdate(
ctx,
inboxId,
identityUpdateBytes,
)
}

func (s *Service) GetInboxIds(
ctx context.Context,
req *message_api.GetInboxIdsRequest,
Expand Down
Loading

0 comments on commit 41bc7db

Please sign in to comment.