diff --git a/components/payments/cmd/connectors/internal/connectors/adyen/task_fetch_merchants_accounts.go b/components/payments/cmd/connectors/internal/connectors/adyen/task_fetch_merchants_accounts.go index 2c83ccaea0..c96cc11ae8 100644 --- a/components/payments/cmd/connectors/internal/connectors/adyen/task_fetch_merchants_accounts.go +++ b/components/payments/cmd/connectors/internal/connectors/adyen/task_fetch_merchants_accounts.go @@ -6,13 +6,13 @@ import ( "time" "github.com/adyen/adyen-go-api-library/v7/src/management" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/adyen/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" "github.com/formancehq/payments/cmd/connectors/internal/task" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) const ( @@ -22,15 +22,18 @@ const ( func taskFetchAccounts(client *client.Client) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("adyen.taskFetchAccounts") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "adyen.taskFetchAccounts", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() if err := fetchAccounts(ctx, client, connectorID, ingester, scheduler); err != nil { otel.RecordError(span, err) diff --git a/components/payments/cmd/connectors/internal/connectors/adyen/task_main.go b/components/payments/cmd/connectors/internal/connectors/adyen/task_main.go index 9a5abe78fe..0ae9b725b8 100644 --- a/components/payments/cmd/connectors/internal/connectors/adyen/task_main.go +++ b/components/payments/cmd/connectors/internal/connectors/adyen/task_main.go @@ -4,24 +4,27 @@ import ( "context" "errors" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/task" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func taskMain() task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, scheduler task.Scheduler, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("adyen.taskMain") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "adyen.taskMain", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() taskAccounts, err := models.EncodeTaskDescriptor(TaskDescriptor{ Name: "Fetch accounts from client", diff --git a/components/payments/cmd/connectors/internal/connectors/adyen/task_standard_webhooks.go b/components/payments/cmd/connectors/internal/connectors/adyen/task_standard_webhooks.go index 7fc6dedeb1..0e670f8d27 100644 --- a/components/payments/cmd/connectors/internal/connectors/adyen/task_standard_webhooks.go +++ b/components/payments/cmd/connectors/internal/connectors/adyen/task_standard_webhooks.go @@ -63,16 +63,19 @@ func handleStandardWebhooks() http.HandlerFunc { func taskHandleStandardWebhooks(client *client.Client, webhookID uuid.UUID) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, storageReader storage.Reader, ingester ingestion.Ingester, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("adyen.taskHandleStandardWebhooks") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "adyen.taskHandleStandardWebhooks", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("webhookID", webhookID.String()), ) + defer span.End() w, err := storageReader.GetWebhook(ctx, webhookID) if err != nil { diff --git a/components/payments/cmd/connectors/internal/connectors/atlar/task_create_external_bank_account.go b/components/payments/cmd/connectors/internal/connectors/atlar/task_create_external_bank_account.go index a31f294e4c..1570ba7f60 100644 --- a/components/payments/cmd/connectors/internal/connectors/atlar/task_create_external_bank_account.go +++ b/components/payments/cmd/connectors/internal/connectors/atlar/task_create_external_bank_account.go @@ -5,28 +5,31 @@ import ( "errors" "fmt" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/atlar/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" "github.com/formancehq/payments/cmd/connectors/internal/task" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func CreateExternalBankAccountTask(config Config, client *client.Client, newExternalBankAccount *models.BankAccount) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("atlar.taskCreateExternalBankAccount") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "atlar.taskCreateExternalBankAccount", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("bankAccount.name", newExternalBankAccount.Name), attribute.String("bankAccount.id", newExternalBankAccount.ID.String()), ) + defer span.End() err := validateExternalBankAccount(newExternalBankAccount) if err != nil { diff --git a/components/payments/cmd/connectors/internal/connectors/atlar/task_fetch_accounts.go b/components/payments/cmd/connectors/internal/connectors/atlar/task_fetch_accounts.go index 7b13c2f452..b197437e05 100644 --- a/components/payments/cmd/connectors/internal/connectors/atlar/task_fetch_accounts.go +++ b/components/payments/cmd/connectors/internal/connectors/atlar/task_fetch_accounts.go @@ -9,6 +9,7 @@ import ( "math/big" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/atlar/client" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -19,21 +20,23 @@ import ( "github.com/get-momo/atlar-v1-go-client/client/accounts" "github.com/get-momo/atlar-v1-go-client/client/external_accounts" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func FetchAccountsTask(config Config, client *client.Client) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, scheduler task.Scheduler, ingester ingestion.Ingester, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("atlar.taskFetchAccounts") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "atlar.taskFetchAccounts", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() // Pagination works by cursor token. for token := ""; ; { diff --git a/components/payments/cmd/connectors/internal/connectors/atlar/task_fetch_transactions.go b/components/payments/cmd/connectors/internal/connectors/atlar/task_fetch_transactions.go index 9c3268adcd..4418d77bc1 100644 --- a/components/payments/cmd/connectors/internal/connectors/atlar/task_fetch_transactions.go +++ b/components/payments/cmd/connectors/internal/connectors/atlar/task_fetch_transactions.go @@ -8,6 +8,7 @@ import ( "math/big" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/atlar/client" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -18,22 +19,24 @@ import ( "github.com/get-momo/atlar-v1-go-client/client/transactions" atlar_models "github.com/get-momo/atlar-v1-go-client/models" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func FetchTransactionsTask(config Config, client *client.Client) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, resolver task.StateResolver, scheduler task.Scheduler, ingester ingestion.Ingester, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("atlar.taskFetchTransactions") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "atlar.taskFetchTransactions", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() // Pagination works by cursor token. for token := ""; ; { diff --git a/components/payments/cmd/connectors/internal/connectors/atlar/task_main.go b/components/payments/cmd/connectors/internal/connectors/atlar/task_main.go index db5c47388e..eff490d203 100644 --- a/components/payments/cmd/connectors/internal/connectors/atlar/task_main.go +++ b/components/payments/cmd/connectors/internal/connectors/atlar/task_main.go @@ -3,13 +3,13 @@ package atlar import ( "context" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/task" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "github.com/formancehq/stack/libs/go-libs/logging" "github.com/pkg/errors" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) // Launch accounts and payments tasks. @@ -17,14 +17,17 @@ import ( func MainTask(logger logging.Logger) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, scheduler task.Scheduler, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("atlar.taskMain") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "atlar.taskMain", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() taskAccounts, err := models.EncodeTaskDescriptor(TaskDescriptor{ Name: "Fetch accounts from client", diff --git a/components/payments/cmd/connectors/internal/connectors/atlar/task_payments.go b/components/payments/cmd/connectors/internal/connectors/atlar/task_payments.go index e9afe3bc47..7e1ecbeec6 100644 --- a/components/payments/cmd/connectors/internal/connectors/atlar/task_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/atlar/task_payments.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/atlar/client" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -20,12 +21,12 @@ import ( "github.com/get-momo/atlar-v1-go-client/client/credit_transfers" atlar_models "github.com/get-momo/atlar-v1-go-client/models" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func InitiatePaymentTask(config Config, client *client.Client, transferID string) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, scheduler task.Scheduler, storageReader storage.Reader, @@ -33,13 +34,15 @@ func InitiatePaymentTask(config Config, client *client.Client, transferID string ) error { transferInitiationID := models.MustTransferInitiationIDFromString(transferID) - span := trace.SpanFromContext(ctx) - span.SetName("atlar.taskInitiatePayment") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "atlar.taskInitiatePayment", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("transferID", transferID), attribute.String("reference", transferInitiationID.Reference), ) + defer span.End() transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true) if err != nil { @@ -165,6 +168,7 @@ func UpdatePaymentStatusTask( ) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, scheduler task.Scheduler, storageReader storage.Reader, @@ -173,15 +177,17 @@ func UpdatePaymentStatusTask( paymentID := models.MustPaymentIDFromString(stringPaymentID) transferInitiationID := models.MustTransferInitiationIDFromString(transferID) - span := trace.SpanFromContext(ctx) - span.SetName("atlar.taskUpdatePaymentStatus") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "atlar.taskUpdatePaymentStatus", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("transferID", transferID), attribute.String("paymentID", stringPaymentID), attribute.Int("attempt", attempt), attribute.String("reference", transferInitiationID.Reference), ) + defer span.End() transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true) if err != nil { diff --git a/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_create_external_account.go b/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_create_external_account.go index d017b9a8c0..46e175b346 100644 --- a/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_create_external_account.go +++ b/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_create_external_account.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/bankingcircle/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" "github.com/formancehq/payments/cmd/connectors/internal/storage" @@ -12,7 +13,6 @@ import ( "github.com/formancehq/payments/internal/otel" "github.com/google/uuid" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) // No need to call any API for banking circle since it does not support it. @@ -24,16 +24,19 @@ func taskCreateExternalAccount( ) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, storageReader storage.Reader, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("bankingcircle.taskCreateExternalAccount") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "bankingcircle.taskCreateExternalAccount", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("bankAccount.id", bankAccountID.String()), ) + defer span.End() bankAccount, err := storageReader.GetBankAccount(ctx, bankAccountID, false) if err != nil { diff --git a/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_fetch_accounts.go b/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_fetch_accounts.go index 080aa50379..26809d46b4 100644 --- a/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_fetch_accounts.go +++ b/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_fetch_accounts.go @@ -9,6 +9,7 @@ import ( "math/big" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/bankingcircle/client" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -16,7 +17,6 @@ import ( "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func taskFetchAccounts( @@ -24,15 +24,18 @@ func taskFetchAccounts( ) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, scheduler task.Scheduler, ingester ingestion.Ingester, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("bankingcircle.taskFetchAccounts") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "bankingcircle.taskFetchAccounts", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() if err := fetchAccount(ctx, client, connectorID, scheduler, ingester); err != nil { otel.RecordError(span, err) diff --git a/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_fetch_payments.go b/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_fetch_payments.go index 61fab58956..79923e7806 100644 --- a/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_fetch_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_fetch_payments.go @@ -6,6 +6,7 @@ import ( "math" "math/big" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/bankingcircle/client" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -13,7 +14,6 @@ import ( "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func taskFetchPayments( @@ -21,14 +21,17 @@ func taskFetchPayments( ) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("bankingcircle.taskFetchPayments") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "bankingcircle.taskFetchPayments", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() if err := fetchPayments(ctx, client, connectorID, ingester); err != nil { otel.RecordError(span, err) diff --git a/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_main.go b/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_main.go index 26244f17b7..907bfeb16d 100644 --- a/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_main.go +++ b/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_main.go @@ -4,25 +4,28 @@ import ( "context" "errors" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/task" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) // taskMain is the main task of the connector. It launches the other tasks. func taskMain() task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, scheduler task.Scheduler, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("bankingcircle.taskMain") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "bankingcircle.taskMain", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() taskAccounts, err := models.EncodeTaskDescriptor(TaskDescriptor{ Name: "Fetch accounts from client", diff --git a/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_payments.go b/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_payments.go index fe855beaa4..e3b4b4fde8 100644 --- a/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_payments.go @@ -7,6 +7,7 @@ import ( "math/big" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/bankingcircle/client" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -16,12 +17,12 @@ import ( "github.com/formancehq/payments/internal/otel" "github.com/formancehq/stack/libs/go-libs/contextutil" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func taskInitiatePayment(bankingCircleClient *client.Client, transferID string) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, @@ -29,13 +30,15 @@ func taskInitiatePayment(bankingCircleClient *client.Client, transferID string) ) error { transferInitiationID := models.MustTransferInitiationIDFromString(transferID) - span := trace.SpanFromContext(ctx) - span.SetName("bankingcircle.taskInitiatePayment") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "bankingcircle.taskInitiatePayment", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("transferID", transferID), attribute.String("reference", transferInitiationID.Reference), ) + defer span.End() transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true) if err != nil { @@ -233,6 +236,8 @@ func taskUpdatePaymentStatus( ) task.Task { return func( ctx context.Context, + taskID models.TaskID, + connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, storageReader storage.Reader, @@ -240,14 +245,17 @@ func taskUpdatePaymentStatus( paymentID := models.MustPaymentIDFromString(pID) transferInitiationID := models.MustTransferInitiationIDFromString(transferID) - span := trace.SpanFromContext(ctx) - span.SetName("bankingcircle.taskUpdatePaymentStatus") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "bankingcircle.taskUpdatePaymentStatus", + attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("transferID", transferID), attribute.String("paymentID", pID), attribute.Int("attempt", attempt), attribute.String("reference", transferInitiationID.Reference), ) + defer span.End() transfer, err := getTransfer(ctx, storageReader, transferInitiationID, false) if err != nil { @@ -301,6 +309,7 @@ func updatePaymentStatus( Name: "Update transfer initiation status", Key: taskNameUpdatePaymentStatus, TransferID: transfer.ID.String(), + PaymentID: paymentID.String(), Attempt: attempt + 1, }) if err != nil { diff --git a/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_accounts.go b/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_accounts.go index 1a76fd13c7..62d7110606 100644 --- a/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_accounts.go +++ b/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_accounts.go @@ -5,13 +5,13 @@ import ( "encoding/json" "errors" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currencycloud/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" "github.com/formancehq/payments/cmd/connectors/internal/task" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func taskFetchAccounts( @@ -19,15 +19,18 @@ func taskFetchAccounts( ) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("currencycloud.taskFetchAccounts") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "currencycloud.taskFetchAccounts", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() if err := fetchAccount(ctx, client, connectorID, ingester, scheduler); err != nil { otel.RecordError(span, err) diff --git a/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_balances.go b/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_balances.go index 423752d9e3..9d71a96abe 100644 --- a/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_balances.go +++ b/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_balances.go @@ -7,6 +7,7 @@ import ( "math/big" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currencycloud/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -14,7 +15,6 @@ import ( "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func taskFetchBalances( @@ -22,14 +22,17 @@ func taskFetchBalances( ) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("currencycloud.taskFetchBalances") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "currencycloud.taskFetchBalances", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() if err := fetchBalances(ctx, client, connectorID, ingester); err != nil { otel.RecordError(span, err) diff --git a/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_beneficiaries.go b/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_beneficiaries.go index 2df4abb995..4418cb3400 100644 --- a/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_beneficiaries.go +++ b/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_beneficiaries.go @@ -4,13 +4,13 @@ import ( "context" "encoding/json" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currencycloud/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" "github.com/formancehq/payments/cmd/connectors/internal/task" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func taskFetchBeneficiaries( @@ -18,15 +18,18 @@ func taskFetchBeneficiaries( ) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("currencycloud.taskFetchBeneficiaries") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "currencycloud.taskFetchBeneficiaries", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() if err := fetchBeneficiaries(ctx, client, connectorID, ingester, scheduler); err != nil { otel.RecordError(span, err) diff --git a/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_transactions.go b/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_transactions.go index c178b79a73..91b40b839a 100644 --- a/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_transactions.go +++ b/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_transactions.go @@ -7,6 +7,7 @@ import ( "math" "math/big" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currencycloud/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -14,20 +15,22 @@ import ( "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func taskFetchTransactions(client *client.Client, config Config) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("currencycloud.taskFetchTransactions") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "currencycloud.taskFetchTransactions", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() if err := ingestTransactions(ctx, connectorID, client, ingester); err != nil { otel.RecordError(span, err) diff --git a/components/payments/cmd/connectors/internal/connectors/currencycloud/task_main.go b/components/payments/cmd/connectors/internal/connectors/currencycloud/task_main.go index cde5ebb6e8..ee5ea17e16 100644 --- a/components/payments/cmd/connectors/internal/connectors/currencycloud/task_main.go +++ b/components/payments/cmd/connectors/internal/connectors/currencycloud/task_main.go @@ -4,25 +4,28 @@ import ( "context" "errors" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/task" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) // taskMain is the main task of the connector. It launches the other tasks. func taskMain() task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, scheduler task.Scheduler, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("currencycloud.taskMain") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "currencycloud.taskMain", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() taskAccounts, err := models.EncodeTaskDescriptor(TaskDescriptor{ Name: "Fetch accounts from client", diff --git a/components/payments/cmd/connectors/internal/connectors/currencycloud/task_payments.go b/components/payments/cmd/connectors/internal/connectors/currencycloud/task_payments.go index dfd7c203ab..f8b79a728c 100644 --- a/components/payments/cmd/connectors/internal/connectors/currencycloud/task_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/currencycloud/task_payments.go @@ -7,6 +7,7 @@ import ( "math/big" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currencycloud/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -17,12 +18,12 @@ import ( "github.com/formancehq/stack/libs/go-libs/contextutil" "github.com/pkg/errors" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func taskInitiatePayment(currencyCloudClient *client.Client, transferID string) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, @@ -30,13 +31,15 @@ func taskInitiatePayment(currencyCloudClient *client.Client, transferID string) ) error { transferInitiationID := models.MustTransferInitiationIDFromString(transferID) - span := trace.SpanFromContext(ctx) - span.SetName("currencycloud.taskInitiatePayment") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "currencycloud.taskInitiatePayment", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("transferInitiationID", transferInitiationID.String()), attribute.String("reference", transferInitiationID.Reference), ) + defer span.End() transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true) if err != nil { @@ -190,6 +193,8 @@ func taskUpdatePaymentStatus( ) task.Task { return func( ctx context.Context, + taskID models.TaskID, + connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, storageReader storage.Reader, @@ -197,14 +202,17 @@ func taskUpdatePaymentStatus( paymentID := models.MustPaymentIDFromString(pID) transferInitiationID := models.MustTransferInitiationIDFromString(transferID) - span := trace.SpanFromContext(ctx) - span.SetName("currencycloud.taskUpdatePaymentStatus") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "currencycloud.taskUpdatePaymentStatus", + attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("transferInitiationID", transferInitiationID.String()), attribute.String("paymentID", paymentID.String()), attribute.Int("attempt", attempt), attribute.String("reference", paymentID.Reference), ) + defer span.End() transfer, err := getTransfer(ctx, storageReader, transferInitiationID, false) if err != nil { @@ -258,6 +266,7 @@ func updatePaymentStatus( Name: "Update transfer initiation status", Key: taskNameUpdatePaymentStatus, TransferID: transfer.ID.String(), + PaymentID: paymentID.String(), Attempt: attempt + 1, }) if err != nil { diff --git a/components/payments/cmd/connectors/internal/connectors/modulr/task_fetch_accounts.go b/components/payments/cmd/connectors/internal/connectors/modulr/task_fetch_accounts.go index a29f48910b..666e99c089 100644 --- a/components/payments/cmd/connectors/internal/connectors/modulr/task_fetch_accounts.go +++ b/components/payments/cmd/connectors/internal/connectors/modulr/task_fetch_accounts.go @@ -9,6 +9,7 @@ import ( "math/big" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/modulr/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -16,21 +17,23 @@ import ( "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func taskFetchAccounts(config Config, client *client.Client) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("modulr.taskFetchAccounts") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "modulr.taskFetchAccounts", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() if err := fetchAccounts(ctx, config, client, connectorID, ingester, scheduler); err != nil { otel.RecordError(span, err) diff --git a/components/payments/cmd/connectors/internal/connectors/modulr/task_fetch_beneficiaries.go b/components/payments/cmd/connectors/internal/connectors/modulr/task_fetch_beneficiaries.go index aba39dcaa0..df7bfec78a 100644 --- a/components/payments/cmd/connectors/internal/connectors/modulr/task_fetch_beneficiaries.go +++ b/components/payments/cmd/connectors/internal/connectors/modulr/task_fetch_beneficiaries.go @@ -5,11 +5,11 @@ import ( "encoding/json" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" "github.com/formancehq/payments/cmd/connectors/internal/connectors/modulr/client" "github.com/formancehq/payments/cmd/connectors/internal/task" @@ -18,15 +18,18 @@ import ( func taskFetchBeneficiaries(config Config, client *client.Client) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("modulr.taskFetchBeneficiaries") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "modulr.taskFetchBeneficiaries", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() if err := fetchBeneficiaries(ctx, config, client, connectorID, ingester, scheduler); err != nil { otel.RecordError(span, err) diff --git a/components/payments/cmd/connectors/internal/connectors/modulr/task_fetch_transactions.go b/components/payments/cmd/connectors/internal/connectors/modulr/task_fetch_transactions.go index d446f617fa..a6a667991b 100644 --- a/components/payments/cmd/connectors/internal/connectors/modulr/task_fetch_transactions.go +++ b/components/payments/cmd/connectors/internal/connectors/modulr/task_fetch_transactions.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/modulr/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -16,21 +17,23 @@ import ( "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func taskFetchTransactions(config Config, client *client.Client, accountID string) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("modulr.taskFetchTransactions") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "modulr.taskFetchTransactions", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("accountID", accountID), ) + defer span.End() if err := fetchTransactions(ctx, config, client, accountID, connectorID, ingester); err != nil { otel.RecordError(span, err) diff --git a/components/payments/cmd/connectors/internal/connectors/modulr/task_main.go b/components/payments/cmd/connectors/internal/connectors/modulr/task_main.go index 80ba940074..c43bf83de2 100644 --- a/components/payments/cmd/connectors/internal/connectors/modulr/task_main.go +++ b/components/payments/cmd/connectors/internal/connectors/modulr/task_main.go @@ -4,25 +4,28 @@ import ( "context" "errors" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/task" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) // taskMain is the main task of the connector. It launches the other tasks. func taskMain() task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, scheduler task.Scheduler, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("modulr.taskMain") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "modulr.taskMain", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() taskAccounts, err := models.EncodeTaskDescriptor(TaskDescriptor{ Name: "Fetch accounts from client", diff --git a/components/payments/cmd/connectors/internal/connectors/modulr/task_payments.go b/components/payments/cmd/connectors/internal/connectors/modulr/task_payments.go index 7589097ad4..18d1713c95 100644 --- a/components/payments/cmd/connectors/internal/connectors/modulr/task_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/modulr/task_payments.go @@ -7,6 +7,7 @@ import ( "regexp" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/modulr/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -17,7 +18,6 @@ import ( "github.com/formancehq/stack/libs/go-libs/contextutil" "github.com/pkg/errors" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) var ( @@ -27,6 +27,7 @@ var ( func taskInitiatePayment(modulrClient *client.Client, transferID string) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, @@ -34,13 +35,15 @@ func taskInitiatePayment(modulrClient *client.Client, transferID string) task.Ta ) error { transferInitiationID := models.MustTransferInitiationIDFromString(transferID) - span := trace.SpanFromContext(ctx) - span.SetName("modulr.taskInitiatePayment") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "modulr.taskInitiatePayment", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("transferID", transferID), attribute.String("reference", transferInitiationID.Reference), ) + defer span.End() transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true) if err != nil { @@ -197,6 +200,8 @@ func taskUpdatePaymentStatus( ) task.Task { return func( ctx context.Context, + taskID models.TaskID, + connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, storageReader storage.Reader, @@ -204,13 +209,16 @@ func taskUpdatePaymentStatus( paymentID := models.MustPaymentIDFromString(pID) transferInitiationID := models.MustTransferInitiationIDFromString(transferID) - span := trace.SpanFromContext(ctx) - span.SetName("modulr.taskUpdatePaymentStatus") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "modulr.taskUpdatePaymentStatus", + attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("transferID", transferID), attribute.String("paymentID", pID), attribute.String("reference", transferInitiationID.Reference), ) + defer span.End() transfer, err := getTransfer(ctx, storageReader, transferInitiationID, false) if err != nil { @@ -267,6 +275,7 @@ func updatePaymentStatus( Name: "Update transfer initiation status", Key: taskNameUpdatePaymentStatus, TransferID: transfer.ID.String(), + PaymentID: paymentID.String(), Attempt: attempt + 1, }) if err != nil { diff --git a/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_accounts.go b/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_accounts.go index 392aafe76e..eac9f78327 100644 --- a/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_accounts.go +++ b/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_accounts.go @@ -6,27 +6,30 @@ import ( "errors" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/moneycorp/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" "github.com/formancehq/payments/cmd/connectors/internal/task" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func taskFetchAccounts(client *client.Client) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("moneycorp.taskFetchAccounts") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "moneycorp.taskFetchAccounts", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() if err := fetchAccounts(ctx, client, connectorID, ingester, scheduler); err != nil { otel.RecordError(span, err) diff --git a/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_balances.go b/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_balances.go index dacef89454..5a6bc7523d 100644 --- a/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_balances.go +++ b/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_balances.go @@ -7,6 +7,7 @@ import ( "math/big" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/moneycorp/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -14,21 +15,23 @@ import ( "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func taskFetchBalances(client *client.Client, accountID string) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("moneycorp.taskFetchBalances") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "moneycorp.taskFetchBalances", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("accountID", accountID), ) + defer span.End() if err := fetchBalances(ctx, client, accountID, connectorID, ingester); err != nil { otel.RecordError(span, err) diff --git a/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_recipients.go b/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_recipients.go index 059b6a390c..6701462a7e 100644 --- a/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_recipients.go +++ b/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_recipients.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/moneycorp/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -13,22 +14,24 @@ import ( "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func taskFetchRecipients(client *client.Client, accountID string) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("moneycorp.taskFetchRecipients") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "moneycorp.taskFetchRecipients", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("accountID", accountID), ) + defer span.End() if err := fetchRecipients(ctx, client, accountID, connectorID, ingester, scheduler); err != nil { otel.RecordError(span, err) diff --git a/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_transactions.go b/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_transactions.go index 3d1db2f376..f4afe32e93 100644 --- a/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_transactions.go +++ b/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_transactions.go @@ -9,6 +9,7 @@ import ( "strconv" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/moneycorp/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -16,7 +17,6 @@ import ( "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) const ( @@ -26,15 +26,18 @@ const ( func taskFetchTransactions(client *client.Client, accountID string) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("moneycorp.taskFetchTransactions") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "moneycorp.taskFetchTransactions", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("accountID", accountID), ) + defer span.End() if err := fetchTransactions(ctx, client, accountID, connectorID, ingester); err != nil { otel.RecordError(span, err) diff --git a/components/payments/cmd/connectors/internal/connectors/moneycorp/task_main.go b/components/payments/cmd/connectors/internal/connectors/moneycorp/task_main.go index 972e6519f3..334edce664 100644 --- a/components/payments/cmd/connectors/internal/connectors/moneycorp/task_main.go +++ b/components/payments/cmd/connectors/internal/connectors/moneycorp/task_main.go @@ -4,25 +4,28 @@ import ( "context" "errors" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/task" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) // taskMain is the main task of the connector. It launches the other tasks. func taskMain() task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, scheduler task.Scheduler, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("moneycorp.taskMain") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "moneycorp.taskMain", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() taskAccounts, err := models.EncodeTaskDescriptor(TaskDescriptor{ Name: "Fetch accounts from client", diff --git a/components/payments/cmd/connectors/internal/connectors/moneycorp/task_payments.go b/components/payments/cmd/connectors/internal/connectors/moneycorp/task_payments.go index 0a389d5396..a76962597f 100644 --- a/components/payments/cmd/connectors/internal/connectors/moneycorp/task_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/moneycorp/task_payments.go @@ -8,6 +8,7 @@ import ( "math/big" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/moneycorp/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -17,12 +18,12 @@ import ( "github.com/formancehq/payments/internal/otel" "github.com/formancehq/stack/libs/go-libs/contextutil" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func taskInitiatePayment(moneycorpClient *client.Client, transferID string) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, @@ -30,13 +31,15 @@ func taskInitiatePayment(moneycorpClient *client.Client, transferID string) task ) error { transferInitiationID := models.MustTransferInitiationIDFromString(transferID) - span := trace.SpanFromContext(ctx) - span.SetName("moneycorp.taskInitiatePayment") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "moneycorp.taskInitiatePayment", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("transferID", transferID), attribute.String("reference", transferInitiationID.Reference), ) + defer span.End() transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true) if err != nil { @@ -186,6 +189,8 @@ func taskUpdatePaymentStatus( ) task.Task { return func( ctx context.Context, + taskID models.TaskID, + connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, storageReader storage.Reader, @@ -193,14 +198,17 @@ func taskUpdatePaymentStatus( paymentID := models.MustPaymentIDFromString(pID) transferInitiationID := models.MustTransferInitiationIDFromString(transferID) - span := trace.SpanFromContext(ctx) - span.SetName("moneycorp.taskUpdatePaymentStatus") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "moneycorp.taskUpdatePaymentStatus", + attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("transferID", transferID), attribute.String("paymentID", pID), attribute.Int("attempt", attempt), attribute.String("reference", transferInitiationID.Reference), ) + defer span.End() transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true) if err != nil { @@ -255,6 +263,7 @@ func updatePaymentStatus( Name: "Update transfer initiation status", Key: taskNameUpdatePaymentStatus, TransferID: transfer.ID.String(), + PaymentID: paymentID.String(), Attempt: attempt + 1, }) if err != nil { diff --git a/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_accounts.go b/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_accounts.go index 918edc5f98..71c6fe5574 100644 --- a/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_accounts.go +++ b/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_accounts.go @@ -5,6 +5,7 @@ import ( "encoding/json" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/stripe/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -15,7 +16,6 @@ import ( "github.com/pkg/errors" "github.com/stripe/stripe-go/v72" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) const ( @@ -26,16 +26,19 @@ func fetchAccountsTask(config TimelineConfig, client *client.DefaultClient) task return func( ctx context.Context, logger logging.Logger, + taskID models.TaskID, connectorID models.ConnectorID, resolver task.StateResolver, scheduler task.Scheduler, ingester ingestion.Ingester, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("stripe.fetchAccountsTask") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "stripe.fetchAccountsTask", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() // Register root account. if err := registerRootAccount(ctx, connectorID, ingester, scheduler); err != nil { diff --git a/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_balances.go b/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_balances.go index 126612b9b1..4058d0f913 100644 --- a/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_balances.go +++ b/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_balances.go @@ -5,6 +5,7 @@ import ( "math/big" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/stripe/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -13,20 +14,25 @@ import ( "github.com/formancehq/payments/internal/otel" "github.com/formancehq/stack/libs/go-libs/logging" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) -func balanceTask(account string, client *client.DefaultClient) func(ctx context.Context, logger logging.Logger, connectorID models.ConnectorID, - ingester ingestion.Ingester, resolver task.StateResolver) error { - return func(ctx context.Context, logger logging.Logger, connectorID models.ConnectorID, ingester ingestion.Ingester, +func balanceTask(account string, client *client.DefaultClient) task.Task { + return func( + ctx context.Context, + logger logging.Logger, + taskID models.TaskID, + connectorID models.ConnectorID, + ingester ingestion.Ingester, resolver task.StateResolver, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("stripe.balanceTask") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "stripe.balanceTask", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("account", account), ) + defer span.End() stripeAccount := account if account == rootAccountReference { diff --git a/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_external_accounts.go b/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_external_accounts.go index 2bf2b9c57e..98cb037897 100644 --- a/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_external_accounts.go +++ b/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_external_accounts.go @@ -5,6 +5,7 @@ import ( "encoding/json" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/stripe/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -14,24 +15,26 @@ import ( "github.com/formancehq/stack/libs/go-libs/logging" "github.com/stripe/stripe-go/v72" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func fetchExternalAccountsTask(config TimelineConfig, account string, client *client.DefaultClient) task.Task { return func( ctx context.Context, logger logging.Logger, + taskID models.TaskID, connectorID models.ConnectorID, resolver task.StateResolver, scheduler task.Scheduler, ingester ingestion.Ingester, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("stripe.fetchExternalAccountsTask") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "stripe.fetchExternalAccountsTask", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("account", account), ) + defer span.End() tt := NewTimelineTrigger( logger, diff --git a/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_payments.go b/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_payments.go index e11dbea4e0..b7212ec9b4 100644 --- a/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_payments.go @@ -3,6 +3,7 @@ package stripe import ( "context" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/stripe/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" "github.com/formancehq/payments/cmd/connectors/internal/task" @@ -11,20 +12,26 @@ import ( "github.com/formancehq/stack/libs/go-libs/logging" "github.com/stripe/stripe-go/v72" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) -func fetchPaymentsTask(config TimelineConfig, client *client.DefaultClient) func(ctx context.Context, logger logging.Logger, connectorID models.ConnectorID, resolver task.StateResolver, - scheduler task.Scheduler, ingester ingestion.Ingester) error { - return func(ctx context.Context, logger logging.Logger, connectorID models.ConnectorID, resolver task.StateResolver, - scheduler task.Scheduler, ingester ingestion.Ingester, +func fetchPaymentsTask(config TimelineConfig, client *client.DefaultClient) task.Task { + return func( + ctx context.Context, + logger logging.Logger, + taskID models.TaskID, + connectorID models.ConnectorID, + resolver task.StateResolver, + scheduler task.Scheduler, + ingester ingestion.Ingester, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("stripe.fetchPaymentsTask") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "stripe.fetchPaymentsTask", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("account", rootAccountReference), ) + defer span.End() tt := NewTimelineTrigger( logger, diff --git a/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_payments_for_connected_account.go b/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_payments_for_connected_account.go index 52c0134c59..b6bac50453 100644 --- a/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_payments_for_connected_account.go +++ b/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_payments_for_connected_account.go @@ -3,6 +3,7 @@ package stripe import ( "context" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/stripe/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" "github.com/formancehq/payments/cmd/connectors/internal/task" @@ -11,7 +12,6 @@ import ( "github.com/formancehq/stack/libs/go-libs/logging" "github.com/stripe/stripe-go/v72" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func ingestBatch( @@ -59,17 +59,23 @@ func ingestBatch( return nil } -func connectedAccountTask(config TimelineConfig, account string, client *client.DefaultClient) func(ctx context.Context, logger logging.Logger, connectorID models.ConnectorID, - ingester ingestion.Ingester, resolver task.StateResolver) error { - return func(ctx context.Context, logger logging.Logger, connectorID models.ConnectorID, ingester ingestion.Ingester, +func connectedAccountTask(config TimelineConfig, account string, client *client.DefaultClient) task.Task { + return func( + ctx context.Context, + logger logging.Logger, + taskID models.TaskID, + connectorID models.ConnectorID, + ingester ingestion.Ingester, resolver task.StateResolver, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("stripe.connectedAccountTask") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "stripe.connectedAccountTask", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("account", account), ) + defer span.End() trigger := NewTimelineTrigger( logger, diff --git a/components/payments/cmd/connectors/internal/connectors/stripe/task_main.go b/components/payments/cmd/connectors/internal/connectors/stripe/task_main.go index a0bbcb12cf..bd6684a9fa 100644 --- a/components/payments/cmd/connectors/internal/connectors/stripe/task_main.go +++ b/components/payments/cmd/connectors/internal/connectors/stripe/task_main.go @@ -3,26 +3,29 @@ package stripe import ( "context" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/task" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "github.com/pkg/errors" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) // Launch accounts and payments tasks func (c *Connector) mainTask() task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, scheduler task.Scheduler, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("stripe.mainTask") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "stripe.mainTask", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() taskAccounts, err := models.EncodeTaskDescriptor(TaskDescriptor{ Name: "Fetch accounts from client", diff --git a/components/payments/cmd/connectors/internal/connectors/stripe/task_payments.go b/components/payments/cmd/connectors/internal/connectors/stripe/task_payments.go index 68677f3c8b..9886bea768 100644 --- a/components/payments/cmd/connectors/internal/connectors/stripe/task_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/stripe/task_payments.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/stripe/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -16,7 +17,6 @@ import ( "github.com/formancehq/stack/libs/go-libs/contextutil" "github.com/stripe/stripe-go/v72" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) const ( @@ -26,6 +26,7 @@ const ( func initiatePaymentTask(transferID string, stripeClient *client.DefaultClient) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, @@ -33,13 +34,15 @@ func initiatePaymentTask(transferID string, stripeClient *client.DefaultClient) ) error { transferInitiationID := models.MustTransferInitiationIDFromString(transferID) - span := trace.SpanFromContext(ctx) - span.SetName("stripe.initiatePaymentTask") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "stripe.initiatePaymentTask", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("transferID", transferID), attribute.String("reference", transferInitiationID.Reference), ) + defer span.End() transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true) if err != nil { @@ -188,6 +191,8 @@ func updatePaymentStatusTask( ) task.Task { return func( ctx context.Context, + taskID models.TaskID, + connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, storageReader storage.Reader, @@ -195,14 +200,17 @@ func updatePaymentStatusTask( paymentID := models.MustPaymentIDFromString(pID) transferInitiationID := models.MustTransferInitiationIDFromString(transferID) - span := trace.SpanFromContext(ctx) - span.SetName("stripe.updatePaymentStatusTask") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "stripe.updatePaymentStatusTask", + attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("transferID", transferID), attribute.String("paymentID", pID), attribute.Int("attempt", attempt), attribute.String("reference", transferInitiationID.Reference), ) + defer span.End() transfer, err := getTransfer(ctx, storageReader, transferInitiationID, false) if err != nil { diff --git a/components/payments/cmd/connectors/internal/connectors/stripe/task_reverse_payment.go b/components/payments/cmd/connectors/internal/connectors/stripe/task_reverse_payment.go index 483262b7b4..be2230014a 100644 --- a/components/payments/cmd/connectors/internal/connectors/stripe/task_reverse_payment.go +++ b/components/payments/cmd/connectors/internal/connectors/stripe/task_reverse_payment.go @@ -5,6 +5,7 @@ import ( "errors" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/stripe/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" "github.com/formancehq/payments/cmd/connectors/internal/storage" @@ -13,12 +14,12 @@ import ( "github.com/formancehq/payments/internal/otel" "github.com/formancehq/stack/libs/go-libs/contextutil" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func reversePaymentTask(transferReversalID string, stripeClient *client.DefaultClient) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, @@ -26,13 +27,15 @@ func reversePaymentTask(transferReversalID string, stripeClient *client.DefaultC ) error { reversalID := models.MustTransferReversalIDFromString(transferReversalID) - span := trace.SpanFromContext(ctx) - span.SetName("stripe.reversePaymentTask") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "stripe.reversePaymentTask", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("transferReversalID", transferReversalID), attribute.String("reference", reversalID.Reference), ) + defer span.End() transferReversal, err := getTransferReversal(ctx, storageReader, reversalID) if err != nil { diff --git a/components/payments/cmd/connectors/internal/connectors/utils.go b/components/payments/cmd/connectors/internal/connectors/utils.go index 85d9105535..eea7591cc1 100644 --- a/components/payments/cmd/connectors/internal/connectors/utils.go +++ b/components/payments/cmd/connectors/internal/connectors/utils.go @@ -6,8 +6,10 @@ import ( "time" "github.com/formancehq/payments/cmd/connectors/internal/metrics" + "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" ) type DeferrableFunc func(ctx context.Context, timeSince time.Time) @@ -29,3 +31,22 @@ func ClientMetrics(ctx context.Context, connectorName, operation string) Deferra metrics.GetMetricsRegistry().ConnectorPSPCallLatencies().Record(ctx, time.Since(timeSince).Milliseconds(), metric.WithAttributes(attributes...)) } } + +func StartSpan( + ctx context.Context, + spanName string, + attributes ...attribute.KeyValue, +) (context.Context, trace.Span) { + parentSpan := trace.SpanFromContext(ctx) + return otel.Tracer().Start( + ctx, + spanName, + trace.WithNewRoot(), + trace.WithLinks(trace.Link{ + SpanContext: parentSpan.SpanContext(), + }), + trace.WithAttributes( + attributes..., + ), + ) +} diff --git a/components/payments/cmd/connectors/internal/connectors/wise/task_fetch_recipient_accounts.go b/components/payments/cmd/connectors/internal/connectors/wise/task_fetch_recipient_accounts.go index 5c34502e71..0cd373b88f 100644 --- a/components/payments/cmd/connectors/internal/connectors/wise/task_fetch_recipient_accounts.go +++ b/components/payments/cmd/connectors/internal/connectors/wise/task_fetch_recipient_accounts.go @@ -7,6 +7,7 @@ import ( "strconv" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/wise/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -14,21 +15,23 @@ import ( "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func taskFetchRecipientAccounts(wiseClient *client.Client, profileID uint64) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("wise.taskFetchRecipientAccounts") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "wise.taskFetchRecipientAccounts", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("profileID", strconv.FormatUint(profileID, 10)), ) + defer span.End() recipientAccounts, err := wiseClient.GetRecipientAccounts(ctx, profileID) if err != nil { diff --git a/components/payments/cmd/connectors/internal/connectors/wise/task_fetch_transfers.go b/components/payments/cmd/connectors/internal/connectors/wise/task_fetch_transfers.go index b332c787c1..70b3c72101 100644 --- a/components/payments/cmd/connectors/internal/connectors/wise/task_fetch_transfers.go +++ b/components/payments/cmd/connectors/internal/connectors/wise/task_fetch_transfers.go @@ -8,6 +8,7 @@ import ( "math/big" "strconv" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/wise/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -15,22 +16,24 @@ import ( "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func taskFetchTransfers(wiseClient *client.Client, profileID uint64) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, scheduler task.Scheduler, ingester ingestion.Ingester, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("wise.taskFetchTransfers") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "wise.taskFetchTransfers", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("profileID", strconv.FormatUint(profileID, 10)), ) + defer span.End() if err := fetchTransfers(ctx, wiseClient, profileID, connectorID, scheduler, ingester); err != nil { otel.RecordError(span, err) diff --git a/components/payments/cmd/connectors/internal/connectors/wise/task_main.go b/components/payments/cmd/connectors/internal/connectors/wise/task_main.go index 1382187718..601c6207e7 100644 --- a/components/payments/cmd/connectors/internal/connectors/wise/task_main.go +++ b/components/payments/cmd/connectors/internal/connectors/wise/task_main.go @@ -4,25 +4,28 @@ import ( "context" "errors" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/task" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) // taskMain is the main task of the connector. It launches the other tasks. func taskMain() task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, scheduler task.Scheduler, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("wise.taskMain") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "wise.taskMain", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() taskUsers, err := models.EncodeTaskDescriptor(TaskDescriptor{ Name: "Fetch users from client", diff --git a/components/payments/cmd/connectors/internal/connectors/wise/task_payments.go b/components/payments/cmd/connectors/internal/connectors/wise/task_payments.go index bb12a7f960..815c8341ca 100644 --- a/components/payments/cmd/connectors/internal/connectors/wise/task_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/wise/task_payments.go @@ -10,8 +10,8 @@ import ( "github.com/pkg/errors" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/wise/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -28,6 +28,7 @@ func taskInitiatePayment( ) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, @@ -35,13 +36,15 @@ func taskInitiatePayment( ) error { transferInitiationID := models.MustTransferInitiationIDFromString(transferID) - span := trace.SpanFromContext(ctx) - span.SetName("wise.taskInitiatePayment") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "wise.taskInitiatePayment", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("transferID", transferID), attribute.String("reference", transferInitiationID.Reference), ) + defer span.End() transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true) if err != nil { @@ -194,6 +197,8 @@ func taskUpdatePaymentStatus( ) task.Task { return func( ctx context.Context, + taskID models.TaskID, + connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, storageReader storage.Reader, @@ -201,14 +206,17 @@ func taskUpdatePaymentStatus( paymentID := models.MustPaymentIDFromString(pID) transferInitiationID := models.MustTransferInitiationIDFromString(transferID) - span := trace.SpanFromContext(ctx) - span.SetName("wise.taskUpdatePaymentStatus") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "wise.taskUpdatePaymentStatus", + attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("transferID", transferID), attribute.String("paymentID", pID), attribute.Int("attempt", attempt), attribute.String("reference", transferInitiationID.Reference), ) + defer span.End() transfer, err := getTransfer(ctx, storageReader, transferInitiationID, false) if err != nil { @@ -267,6 +275,7 @@ func updatePaymentStatus( Name: "Update transfer initiation status", Key: taskNameUpdatePaymentStatus, TransferID: transfer.ID.String(), + PaymentID: paymentID.String(), Attempt: attempt + 1, }) if err != nil { diff --git a/components/payments/cmd/connectors/internal/task/scheduler.go b/components/payments/cmd/connectors/internal/task/scheduler.go index 6720753ce4..779a87d644 100644 --- a/components/payments/cmd/connectors/internal/task/scheduler.go +++ b/components/payments/cmd/connectors/internal/task/scheduler.go @@ -12,12 +12,9 @@ import ( "github.com/formancehq/payments/cmd/connectors/internal/metrics" "github.com/formancehq/payments/cmd/connectors/internal/storage" "github.com/formancehq/payments/internal/models" - "github.com/formancehq/payments/internal/otel" "github.com/formancehq/stack/libs/go-libs/logging" "github.com/google/uuid" "github.com/pkg/errors" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" "go.uber.org/dig" ) @@ -324,19 +321,6 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. } ctx, cancel := context.WithCancel(ctx) - parentSpan := trace.SpanFromContext(ctx) - ctx, span := otel.Tracer().Start( - ctx, - "task", - trace.WithNewRoot(), - trace.WithLinks(trace.Link{ - SpanContext: parentSpan.SpanContext(), - }), - trace.WithAttributes( - attribute.Stringer("id", task.ID), - attribute.String("connectorID", s.connectorID.String()), - ), - ) holder := &taskHolder{ cancel: cancel, @@ -371,6 +355,10 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. panic(err) } + err = container.Provide(func() models.TaskID { + return models.TaskID(task.ID) + }) + err = container.Provide(func() StopChan { s.mu.Lock() defer s.mu.Unlock() @@ -414,7 +402,6 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. if err != nil { errChan <- err close(errChan) - span.End() return errChan } @@ -447,7 +434,6 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. logger.Infof("Starting task...") defer func() { - defer span.End() defer s.deleteTask(ctx, holder) if sendError { @@ -513,7 +499,6 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. case models.OPTIONS_RUN_PERIODICALLY: go func() { defer func() { - defer span.End() defer s.deleteTask(ctx, holder) if e := recover(); e != nil { @@ -548,13 +533,13 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. } // launch it once before starting the ticker - stop, err := processFunc() + stopped, err := processFunc() if err != nil { // error is already registered return } - if stop { + if stopped { // Task is stopped or context is done return } diff --git a/components/payments/internal/models/task.go b/components/payments/internal/models/task.go index 84ef227c17..566c16305b 100644 --- a/components/payments/internal/models/task.go +++ b/components/payments/internal/models/task.go @@ -11,6 +11,12 @@ import ( "github.com/google/uuid" ) +type TaskID uuid.UUID + +func (id TaskID) String() string { + return uuid.UUID(id).String() +} + type ScheduleOption int const (