Skip to content

Commit

Permalink
feat(payments): introduce mangopay states (#1154)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored Jan 25, 2024
1 parent 2b70b37 commit 7ceee06
Show file tree
Hide file tree
Showing 57 changed files with 867 additions and 422 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"time"

"github.com/adyen/adyen-go-api-library/v7/src/management"
"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/adyen/client"
"github.com/formancehq/payments/cmd/connectors/internal/ingestion"
"github.com/formancehq/payments/cmd/connectors/internal/task"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

const (
Expand All @@ -22,15 +22,18 @@ const (
func taskFetchAccounts(client *client.Client) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
ingester ingestion.Ingester,
scheduler task.Scheduler,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("adyen.taskFetchAccounts")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"adyen.taskFetchAccounts",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
)
defer span.End()

if err := fetchAccounts(ctx, client, connectorID, ingester, scheduler); err != nil {
otel.RecordError(span, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,27 @@ import (
"context"
"errors"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/task"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func taskMain() task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
scheduler task.Scheduler,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("adyen.taskMain")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"adyen.taskMain",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
)
defer span.End()

taskAccounts, err := models.EncodeTaskDescriptor(TaskDescriptor{
Name: "Fetch accounts from client",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,19 @@ func handleStandardWebhooks() http.HandlerFunc {
func taskHandleStandardWebhooks(client *client.Client, webhookID uuid.UUID) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
storageReader storage.Reader,
ingester ingestion.Ingester,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("adyen.taskHandleStandardWebhooks")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"adyen.taskHandleStandardWebhooks",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
attribute.String("webhookID", webhookID.String()),
)
defer span.End()

w, err := storageReader.GetWebhook(ctx, webhookID)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,31 @@ import (
"errors"
"fmt"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/atlar/client"
"github.com/formancehq/payments/cmd/connectors/internal/ingestion"
"github.com/formancehq/payments/cmd/connectors/internal/task"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func CreateExternalBankAccountTask(config Config, client *client.Client, newExternalBankAccount *models.BankAccount) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
ingester ingestion.Ingester,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("atlar.taskCreateExternalBankAccount")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"atlar.taskCreateExternalBankAccount",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
attribute.String("bankAccount.name", newExternalBankAccount.Name),
attribute.String("bankAccount.id", newExternalBankAccount.ID.String()),
)
defer span.End()

err := validateExternalBankAccount(newExternalBankAccount)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/big"
"time"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/atlar/client"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/currency"
"github.com/formancehq/payments/cmd/connectors/internal/ingestion"
Expand All @@ -19,21 +20,23 @@ import (
"github.com/get-momo/atlar-v1-go-client/client/accounts"
"github.com/get-momo/atlar-v1-go-client/client/external_accounts"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func FetchAccountsTask(config Config, client *client.Client) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
scheduler task.Scheduler,
ingester ingestion.Ingester,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("atlar.taskFetchAccounts")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"atlar.taskFetchAccounts",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
)
defer span.End()

// Pagination works by cursor token.
for token := ""; ; {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math/big"
"time"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/atlar/client"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/currency"
"github.com/formancehq/payments/cmd/connectors/internal/ingestion"
Expand All @@ -18,22 +19,24 @@ import (
"github.com/get-momo/atlar-v1-go-client/client/transactions"
atlar_models "github.com/get-momo/atlar-v1-go-client/models"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func FetchTransactionsTask(config Config, client *client.Client) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
resolver task.StateResolver,
scheduler task.Scheduler,
ingester ingestion.Ingester,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("atlar.taskFetchTransactions")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"atlar.taskFetchTransactions",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
)
defer span.End()

// Pagination works by cursor token.
for token := ""; ; {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,31 @@ package atlar
import (
"context"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/task"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/otel"
"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// Launch accounts and payments tasks.
// Period between runs dictated by config.PollingPeriod.
func MainTask(logger logging.Logger) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
scheduler task.Scheduler,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("atlar.taskMain")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"atlar.taskMain",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
)
defer span.End()

taskAccounts, err := models.EncodeTaskDescriptor(TaskDescriptor{
Name: "Fetch accounts from client",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/atlar/client"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/currency"
"github.com/formancehq/payments/cmd/connectors/internal/ingestion"
Expand All @@ -20,26 +21,28 @@ import (
"github.com/get-momo/atlar-v1-go-client/client/credit_transfers"
atlar_models "github.com/get-momo/atlar-v1-go-client/models"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func InitiatePaymentTask(config Config, client *client.Client, transferID string) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
scheduler task.Scheduler,
storageReader storage.Reader,
ingester ingestion.Ingester,
) error {
transferInitiationID := models.MustTransferInitiationIDFromString(transferID)

span := trace.SpanFromContext(ctx)
span.SetName("atlar.taskInitiatePayment")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"atlar.taskInitiatePayment",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
attribute.String("transferID", transferID),
attribute.String("reference", transferInitiationID.Reference),
)
defer span.End()

transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true)
if err != nil {
Expand Down Expand Up @@ -165,6 +168,7 @@ func UpdatePaymentStatusTask(
) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
scheduler task.Scheduler,
storageReader storage.Reader,
Expand All @@ -173,15 +177,17 @@ func UpdatePaymentStatusTask(
paymentID := models.MustPaymentIDFromString(stringPaymentID)
transferInitiationID := models.MustTransferInitiationIDFromString(transferID)

span := trace.SpanFromContext(ctx)
span.SetName("atlar.taskUpdatePaymentStatus")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"atlar.taskUpdatePaymentStatus",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
attribute.String("transferID", transferID),
attribute.String("paymentID", stringPaymentID),
attribute.Int("attempt", attempt),
attribute.String("reference", transferInitiationID.Reference),
)
defer span.End()

transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/bankingcircle/client"
"github.com/formancehq/payments/cmd/connectors/internal/ingestion"
"github.com/formancehq/payments/cmd/connectors/internal/storage"
Expand All @@ -12,7 +13,6 @@ import (
"github.com/formancehq/payments/internal/otel"
"github.com/google/uuid"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// No need to call any API for banking circle since it does not support it.
Expand All @@ -24,16 +24,19 @@ func taskCreateExternalAccount(
) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
ingester ingestion.Ingester,
storageReader storage.Reader,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("bankingcircle.taskCreateExternalAccount")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"bankingcircle.taskCreateExternalAccount",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
attribute.String("bankAccount.id", bankAccountID.String()),
)
defer span.End()

bankAccount, err := storageReader.GetBankAccount(ctx, bankAccountID, false)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,33 @@ import (
"math/big"
"time"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/bankingcircle/client"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/currency"
"github.com/formancehq/payments/cmd/connectors/internal/ingestion"
"github.com/formancehq/payments/cmd/connectors/internal/task"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func taskFetchAccounts(
client *client.Client,
) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
scheduler task.Scheduler,
ingester ingestion.Ingester,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("bankingcircle.taskFetchAccounts")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"bankingcircle.taskFetchAccounts",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
)
defer span.End()

if err := fetchAccount(ctx, client, connectorID, scheduler, ingester); err != nil {
otel.RecordError(span, err)
Expand Down
Loading

0 comments on commit 7ceee06

Please sign in to comment.