Skip to content

Commit

Permalink
feat(payments): small improvments + worker pool (#1151)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored Jan 25, 2024
1 parent c905d7a commit 0a9b8d7
Show file tree
Hide file tree
Showing 21 changed files with 152 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func addConnector[ConnectorConfig models.ConnectorConfigObject](loader manager.L
container := dig.New()

if err := container.Provide(func() ingestion.Ingester {
return ingestion.NewDefaultIngester(loader.Name(), descriptor, store, publisher, messages)
return ingestion.NewDefaultIngester(loader.Name(), connectorID, descriptor, store, publisher, messages)
}); err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,7 @@ func handleAuthorisation(

if err := ingester.IngestPayments(
ctx,
connectorID,
ingestion.PaymentBatch{{Payment: payment}},
struct{}{},
); err != nil {
return err
}
Expand Down Expand Up @@ -223,9 +221,7 @@ func handleAuthorisationAdjustment(

if err := ingester.IngestPayments(
ctx,
connectorID,
ingestion.PaymentBatch{{Payment: payment}},
struct{}{},
); err != nil {
return err
}
Expand Down Expand Up @@ -257,9 +253,7 @@ func handleCancellation(

if err := ingester.IngestPayments(
ctx,
connectorID,
ingestion.PaymentBatch{{Payment: payment}},
struct{}{},
); err != nil {
return err
}
Expand Down Expand Up @@ -292,9 +286,7 @@ func handleCapture(

if err := ingester.IngestPayments(
ctx,
connectorID,
ingestion.PaymentBatch{{Payment: payment}},
struct{}{},
); err != nil {
return err
}
Expand Down Expand Up @@ -326,9 +318,7 @@ func handleCaptureFailed(

if err := ingester.IngestPayments(
ctx,
connectorID,
ingestion.PaymentBatch{{Payment: payment}},
struct{}{},
); err != nil {
return err
}
Expand Down Expand Up @@ -363,9 +353,7 @@ func handleRefund(

if err := ingester.IngestPayments(
ctx,
connectorID,
ingestion.PaymentBatch{{Payment: payment}},
struct{}{},
); err != nil {
return err
}
Expand Down Expand Up @@ -402,9 +390,7 @@ func handleRefundedReversed(

if err := ingester.IngestPayments(
ctx,
connectorID,
ingestion.PaymentBatch{{Payment: payment}},
struct{}{},
); err != nil {
return err
}
Expand Down Expand Up @@ -439,9 +425,7 @@ func handleRefundWithData(

if err := ingester.IngestPayments(
ctx,
connectorID,
ingestion.PaymentBatch{{Payment: payment}},
struct{}{},
); err != nil {
return err
}
Expand Down Expand Up @@ -492,9 +476,7 @@ func handlePayoutThirdparty(

if err := ingester.IngestPayments(
ctx,
connectorID,
ingestion.PaymentBatch{{Payment: payment}},
struct{}{},
); err != nil {
return err
}
Expand Down Expand Up @@ -543,9 +525,7 @@ func handlePayoutDecline(

if err := ingester.IngestPayments(
ctx,
connectorID,
ingestion.PaymentBatch{{Payment: payment}},
struct{}{},
); err != nil {
return err
}
Expand Down Expand Up @@ -594,9 +574,7 @@ func handlePayoutExpire(

if err := ingester.IngestPayments(
ctx,
connectorID,
ingestion.PaymentBatch{{Payment: payment}},
struct{}{},
); err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func ingestPaymentsBatch(
batch = append(batch, batchElement)
}

if err := ingester.IngestPayments(ctx, connectorID, batch, struct{}{}); err != nil {
if err := ingester.IngestPayments(ctx, batch); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func ingestBatch(
batch = append(batch, batchElement)
}

if err := ingester.IngestPayments(ctx, connectorID, batch, struct{}{}); err != nil {
if err := ingester.IngestPayments(ctx, batch); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func ingestTransactions(
batch = append(batch, batchElement)
}

err = ingester.IngestPayments(ctx, connectorID, batch, struct{}{})
err = ingester.IngestPayments(ctx, batch)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,18 @@ func (m *MockIngester) IngestAccounts(ctx context.Context, batch ingestion.Accou
return nil
}

func (m *MockIngester) IngestPayments(ctx context.Context, connectorID models.ConnectorID, batch ingestion.PaymentBatch, commitState any) error {
func (m *MockIngester) IngestPayments(ctx context.Context, batch ingestion.PaymentBatch) error {
return nil
}

func (m *MockIngester) IngestBalances(ctx context.Context, batch ingestion.BalanceBatch, checkIfAccountExists bool) error {
return nil
}

func (m *MockIngester) UpdateTaskState(ctx context.Context, state any) error {
return nil
}

func (m *MockIngester) UpdateTransferInitiationPaymentsStatus(ctx context.Context, tf *models.TransferInitiation, paymentID *models.PaymentID, status models.TransferInitiationStatus, errorMessage string, updatedAt time.Time) error {
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func handleFile(
return err
}

err = ingester.IngestPayments(ctx, connectorID, batch, struct{}{})
err = ingester.IngestPayments(ctx, batch)
if err != nil {
return fmt.Errorf("failed to ingest file '%s': %w", descriptor.FileName, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func generatePaymentsFile(
if err != nil {
return fmt.Errorf("failed to marshal payment: %w", err)
}
if err := ingester.IngestPayments(ctx, connectorID, ingestion.PaymentBatch{
if err := ingester.IngestPayments(ctx, ingestion.PaymentBatch{
{
Payment: &models.Payment{
ID: models.PaymentID{
Expand All @@ -203,7 +203,7 @@ func generatePaymentsFile(
DestinationAccountID: destinationAccountID,
},
},
}, struct{}{}); err != nil {
}); err != nil {
return fmt.Errorf("failed to ingest payments: %w", err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func ingestBatch(
batch = append(batch, batchElement)
}

return ingester.IngestPayments(ctx, connectorID, batch, struct{}{})
return ingester.IngestPayments(ctx, batch)
}

func matchPaymentType(paymentType string) models.PaymentType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func fetchTransactions(
return err
}

if err := ingester.IngestPayments(ctx, connectorID, batch, struct{}{}); err != nil {
if err := ingester.IngestPayments(ctx, batch); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func ingestBatch(
batch = append(batch, batchElement)
}

return ingester.IngestPayments(ctx, connectorID, batch, struct{}{})
return ingester.IngestPayments(ctx, batch)
}

func matchPaymentType(transactionType string, transactionDirection string) (models.PaymentType, bool) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ func ingestBatch(
"state": commitState,
}).Debugf("updating state")

err := ingester.IngestPayments(ctx, connectorID, batch, commitState)
err := ingester.IngestPayments(ctx, batch)
if err != nil {
return err
}

err = ingester.UpdateTaskState(ctx, commitState)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func fetchTransfers(
paymentBatch = append(paymentBatch, batchElement)
}

if err := ingester.IngestPayments(ctx, connectorID, paymentBatch, struct{}{}); err != nil {
if err := ingester.IngestPayments(ctx, paymentBatch); err != nil {
return err
}

Expand Down
26 changes: 15 additions & 11 deletions components/payments/cmd/connectors/internal/ingestion/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,22 @@ import (

type Ingester interface {
IngestAccounts(ctx context.Context, batch AccountBatch) error
IngestPayments(ctx context.Context, connectorID models.ConnectorID, batch PaymentBatch, commitState any) error
IngestPayments(ctx context.Context, batch PaymentBatch) error
IngestBalances(ctx context.Context, batch BalanceBatch, checkIfAccountExists bool) error
UpdateTaskState(ctx context.Context, state any) error
UpdateTransferInitiationPaymentsStatus(ctx context.Context, tf *models.TransferInitiation, paymentID *models.PaymentID, status models.TransferInitiationStatus, errorMessage string, updatedAt time.Time) error
UpdateTransferReversalStatus(ctx context.Context, transfer *models.TransferInitiation, transferReversal *models.TransferReversal) error
AddTransferInitiationPaymentID(ctx context.Context, tf *models.TransferInitiation, paymentID *models.PaymentID, updatedAt time.Time) error
LinkBankAccountWithAccount(ctx context.Context, bankAccount *models.BankAccount, accountID *models.AccountID) error
}

type DefaultIngester struct {
provider models.ConnectorProvider
store Store
descriptor models.TaskDescriptor
publisher message.Publisher
messages *messages.Messages
provider models.ConnectorProvider
connectorID models.ConnectorID
store Store
descriptor models.TaskDescriptor
publisher message.Publisher
messages *messages.Messages
}

type Store interface {
Expand All @@ -42,17 +44,19 @@ type Store interface {

func NewDefaultIngester(
provider models.ConnectorProvider,
connectorID models.ConnectorID,
descriptor models.TaskDescriptor,
repo Store,
publisher message.Publisher,
messages *messages.Messages,
) *DefaultIngester {
return &DefaultIngester{
provider: provider,
descriptor: descriptor,
store: repo,
publisher: publisher,
messages: messages,
provider: provider,
connectorID: connectorID,
descriptor: descriptor,
store: repo,
publisher: publisher,
messages: messages,
}
}

Expand Down
12 changes: 0 additions & 12 deletions components/payments/cmd/connectors/internal/ingestion/payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ingestion

import (
"context"
"encoding/json"
"fmt"
"time"

Expand All @@ -29,9 +28,7 @@ func (fn IngesterFn) IngestPayments(ctx context.Context, batch PaymentBatch, com

func (i *DefaultIngester) IngestPayments(
ctx context.Context,
connectorID models.ConnectorID,
batch PaymentBatch,
commitState any,
) error {
startingAt := time.Now()

Expand Down Expand Up @@ -62,15 +59,6 @@ func (i *DefaultIngester) IngestPayments(
idsInsertedMap[idsInserted[idx].String()] = struct{}{}
}

taskState, err := json.Marshal(commitState)
if err != nil {
return fmt.Errorf("error marshaling task state: %w", err)
}

if err = i.store.UpdateTaskState(ctx, connectorID, i.descriptor, taskState); err != nil {
return fmt.Errorf("error updating task state: %w", err)
}

for paymentIdx := range allPayments {
_, ok := idsInsertedMap[allPayments[paymentIdx].ID.String()]
if !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,14 @@ func TestIngestPayments(t *testing.T) {

ingester := NewDefaultIngester(
models.ConnectorProviderDummyPay,
connectorID,
nil,
NewMockStore().WithPaymentIDsNotModified(tc.paymentIDsNotModified),
publisher,
messages.NewMessages(""),
)

err := ingester.IngestPayments(context.Background(), connectorID, tc.batch, nil)
err := ingester.IngestPayments(context.Background(), tc.batch)
publisher.Close()
require.NoError(t, err)

Expand Down
20 changes: 20 additions & 0 deletions components/payments/cmd/connectors/internal/ingestion/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package ingestion

import (
"context"
"encoding/json"
"fmt"
)

func (i *DefaultIngester) UpdateTaskState(ctx context.Context, state any) error {
taskState, err := json.Marshal(state)
if err != nil {
return fmt.Errorf("error marshaling task state: %w", err)
}

if err = i.store.UpdateTaskState(ctx, i.connectorID, i.descriptor, taskState); err != nil {
return fmt.Errorf("error updating task state: %w", err)
}

return nil
}
Loading

0 comments on commit 0a9b8d7

Please sign in to comment.