Skip to content

Commit

Permalink
feat(payments): improve span management
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas committed Jan 25, 2024
1 parent 0a9b8d7 commit 91c92e2
Show file tree
Hide file tree
Showing 45 changed files with 405 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"time"

"github.com/adyen/adyen-go-api-library/v7/src/management"
"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/adyen/client"
"github.com/formancehq/payments/cmd/connectors/internal/ingestion"
"github.com/formancehq/payments/cmd/connectors/internal/task"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

const (
Expand All @@ -22,15 +22,18 @@ const (
func taskFetchAccounts(client *client.Client) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
ingester ingestion.Ingester,
scheduler task.Scheduler,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("adyen.taskFetchAccounts")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"adyen.taskFetchAccounts",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
)
defer span.End()

if err := fetchAccounts(ctx, client, connectorID, ingester, scheduler); err != nil {
otel.RecordError(span, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,27 @@ import (
"context"
"errors"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/task"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func taskMain() task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
scheduler task.Scheduler,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("adyen.taskMain")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"adyen.taskMain",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
)
defer span.End()

taskAccounts, err := models.EncodeTaskDescriptor(TaskDescriptor{
Name: "Fetch accounts from client",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,19 @@ func handleStandardWebhooks() http.HandlerFunc {
func taskHandleStandardWebhooks(client *client.Client, webhookID uuid.UUID) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
storageReader storage.Reader,
ingester ingestion.Ingester,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("adyen.taskHandleStandardWebhooks")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"adyen.taskHandleStandardWebhooks",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
attribute.String("webhookID", webhookID.String()),
)
defer span.End()

w, err := storageReader.GetWebhook(ctx, webhookID)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,31 @@ import (
"errors"
"fmt"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/atlar/client"
"github.com/formancehq/payments/cmd/connectors/internal/ingestion"
"github.com/formancehq/payments/cmd/connectors/internal/task"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func CreateExternalBankAccountTask(config Config, client *client.Client, newExternalBankAccount *models.BankAccount) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
ingester ingestion.Ingester,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("atlar.taskCreateExternalBankAccount")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"atlar.taskCreateExternalBankAccount",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
attribute.String("bankAccount.name", newExternalBankAccount.Name),
attribute.String("bankAccount.id", newExternalBankAccount.ID.String()),
)
defer span.End()

err := validateExternalBankAccount(newExternalBankAccount)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/big"
"time"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/atlar/client"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/currency"
"github.com/formancehq/payments/cmd/connectors/internal/ingestion"
Expand All @@ -19,21 +20,23 @@ import (
"github.com/get-momo/atlar-v1-go-client/client/accounts"
"github.com/get-momo/atlar-v1-go-client/client/external_accounts"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func FetchAccountsTask(config Config, client *client.Client) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
scheduler task.Scheduler,
ingester ingestion.Ingester,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("atlar.taskFetchAccounts")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"atlar.taskFetchAccounts",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
)
defer span.End()

// Pagination works by cursor token.
for token := ""; ; {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math/big"
"time"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/atlar/client"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/currency"
"github.com/formancehq/payments/cmd/connectors/internal/ingestion"
Expand All @@ -18,22 +19,24 @@ import (
"github.com/get-momo/atlar-v1-go-client/client/transactions"
atlar_models "github.com/get-momo/atlar-v1-go-client/models"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func FetchTransactionsTask(config Config, client *client.Client) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
resolver task.StateResolver,
scheduler task.Scheduler,
ingester ingestion.Ingester,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("atlar.taskFetchTransactions")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"atlar.taskFetchTransactions",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
)
defer span.End()

// Pagination works by cursor token.
for token := ""; ; {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,31 @@ package atlar
import (
"context"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/task"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/otel"
"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// Launch accounts and payments tasks.
// Period between runs dictated by config.PollingPeriod.
func MainTask(logger logging.Logger) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
scheduler task.Scheduler,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("atlar.taskMain")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"atlar.taskMain",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
)
defer span.End()

taskAccounts, err := models.EncodeTaskDescriptor(TaskDescriptor{
Name: "Fetch accounts from client",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/atlar/client"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/currency"
"github.com/formancehq/payments/cmd/connectors/internal/ingestion"
Expand All @@ -20,26 +21,28 @@ import (
"github.com/get-momo/atlar-v1-go-client/client/credit_transfers"
atlar_models "github.com/get-momo/atlar-v1-go-client/models"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func InitiatePaymentTask(config Config, client *client.Client, transferID string) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
scheduler task.Scheduler,
storageReader storage.Reader,
ingester ingestion.Ingester,
) error {
transferInitiationID := models.MustTransferInitiationIDFromString(transferID)

span := trace.SpanFromContext(ctx)
span.SetName("atlar.taskInitiatePayment")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"atlar.taskInitiatePayment",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
attribute.String("transferID", transferID),
attribute.String("reference", transferInitiationID.Reference),
)
defer span.End()

transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true)
if err != nil {
Expand Down Expand Up @@ -165,6 +168,7 @@ func UpdatePaymentStatusTask(
) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
scheduler task.Scheduler,
storageReader storage.Reader,
Expand All @@ -173,15 +177,17 @@ func UpdatePaymentStatusTask(
paymentID := models.MustPaymentIDFromString(stringPaymentID)
transferInitiationID := models.MustTransferInitiationIDFromString(transferID)

span := trace.SpanFromContext(ctx)
span.SetName("atlar.taskUpdatePaymentStatus")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"atlar.taskUpdatePaymentStatus",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
attribute.String("transferID", transferID),
attribute.String("paymentID", stringPaymentID),
attribute.Int("attempt", attempt),
attribute.String("reference", transferInitiationID.Reference),
)
defer span.End()

transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/bankingcircle/client"
"github.com/formancehq/payments/cmd/connectors/internal/ingestion"
"github.com/formancehq/payments/cmd/connectors/internal/storage"
Expand All @@ -12,7 +13,6 @@ import (
"github.com/formancehq/payments/internal/otel"
"github.com/google/uuid"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// No need to call any API for banking circle since it does not support it.
Expand All @@ -24,16 +24,19 @@ func taskCreateExternalAccount(
) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
ingester ingestion.Ingester,
storageReader storage.Reader,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("bankingcircle.taskCreateExternalAccount")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"bankingcircle.taskCreateExternalAccount",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
attribute.String("bankAccount.id", bankAccountID.String()),
)
defer span.End()

bankAccount, err := storageReader.GetBankAccount(ctx, bankAccountID, false)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,33 @@ import (
"math/big"
"time"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/bankingcircle/client"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/currency"
"github.com/formancehq/payments/cmd/connectors/internal/ingestion"
"github.com/formancehq/payments/cmd/connectors/internal/task"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func taskFetchAccounts(
client *client.Client,
) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
scheduler task.Scheduler,
ingester ingestion.Ingester,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("bankingcircle.taskFetchAccounts")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"bankingcircle.taskFetchAccounts",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
)
defer span.End()

if err := fetchAccount(ctx, client, connectorID, scheduler, ingester); err != nil {
otel.RecordError(span, err)
Expand Down
Loading

0 comments on commit 91c92e2

Please sign in to comment.