From 8ded60781db6033cb71cf79e4d9a17a4292c6d57 Mon Sep 17 00:00:00 2001 From: Paul Nicolas Date: Thu, 25 Jan 2024 12:14:10 +0100 Subject: [PATCH] feat(payments): mangopay states --- .../mangopay/client/bank_accounts.go | 6 +- .../mangopay/client/transactions.go | 7 +- .../connectors/mangopay/client/users.go | 24 +- .../connectors/mangopay/client/wallets.go | 10 +- .../internal/connectors/mangopay/config.go | 4 + .../mangopay/task_fetch_bank_accounts.go | 70 ++++-- .../mangopay/task_fetch_transactions.go | 176 ++++++++----- .../connectors/mangopay/task_fetch_users.go | 75 ++++-- .../connectors/mangopay/task_fetch_wallets.go | 236 ++++++++++++------ .../internal/connectors/mangopay/task_main.go | 11 +- .../connectors/mangopay/task_payments.go | 21 +- .../connectors/mangopay/task_resolve.go | 4 +- 12 files changed, 444 insertions(+), 200 deletions(-) diff --git a/components/payments/cmd/connectors/internal/connectors/mangopay/client/bank_accounts.go b/components/payments/cmd/connectors/internal/connectors/mangopay/client/bank_accounts.go index 900983ed89..0d0c5da972 100644 --- a/components/payments/cmd/connectors/internal/connectors/mangopay/client/bank_accounts.go +++ b/components/payments/cmd/connectors/internal/connectors/mangopay/client/bank_accounts.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net/http" + "strconv" "time" "github.com/formancehq/payments/cmd/connectors/internal/connectors" @@ -16,7 +17,7 @@ type bankAccount struct { CreationDate int64 `json:"CreationDate"` } -func (c *Client) GetBankAccounts(ctx context.Context, userID string, page int) ([]*bankAccount, error) { +func (c *Client) GetBankAccounts(ctx context.Context, userID string, page, pageSize int) ([]*bankAccount, error) { f := connectors.ClientMetrics(ctx, "mangopay", "list_bank_accounts") now := time.Now() defer f(ctx, now) @@ -28,8 +29,9 @@ func (c *Client) GetBankAccounts(ctx context.Context, userID string, page int) ( } q := req.URL.Query() - q.Add("per_page", "100") + q.Add("per_page", strconv.Itoa(pageSize)) q.Add("page", fmt.Sprint(page)) + q.Add("Sort", "CreationDate:ASC") req.URL.RawQuery = q.Encode() resp, err := c.httpClient.Do(req) diff --git a/components/payments/cmd/connectors/internal/connectors/mangopay/client/transactions.go b/components/payments/cmd/connectors/internal/connectors/mangopay/client/transactions.go index 21ba06b141..df04bbdf9f 100644 --- a/components/payments/cmd/connectors/internal/connectors/mangopay/client/transactions.go +++ b/components/payments/cmd/connectors/internal/connectors/mangopay/client/transactions.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net/http" + "strconv" "time" "github.com/formancehq/payments/cmd/connectors/internal/connectors" @@ -38,7 +39,7 @@ type Payment struct { DebitedWalletID string `json:"DebitedWalletId"` } -func (c *Client) GetTransactions(ctx context.Context, walletsID string, page int) ([]*Payment, error) { +func (c *Client) GetTransactions(ctx context.Context, walletsID string, page, pageSize int, afterCreatedAt time.Time) ([]*Payment, error) { f := connectors.ClientMetrics(ctx, "mangopay", "list_transactions") now := time.Now() defer f(ctx, now) @@ -50,8 +51,10 @@ func (c *Client) GetTransactions(ctx context.Context, walletsID string, page int } q := req.URL.Query() - q.Add("per_page", "100") + q.Add("per_page", strconv.Itoa(pageSize)) q.Add("page", fmt.Sprint(page)) + q.Add("Sort", "CreationDate:ASC") + q.Add("AfterDate", strconv.FormatInt(afterCreatedAt.Unix(), 10)) req.URL.RawQuery = q.Encode() resp, err := c.httpClient.Do(req) diff --git a/components/payments/cmd/connectors/internal/connectors/mangopay/client/users.go b/components/payments/cmd/connectors/internal/connectors/mangopay/client/users.go index df56894f29..75559b5088 100644 --- a/components/payments/cmd/connectors/internal/connectors/mangopay/client/users.go +++ b/components/payments/cmd/connectors/internal/connectors/mangopay/client/users.go @@ -5,22 +5,25 @@ import ( "encoding/json" "fmt" "net/http" + "strconv" "time" "github.com/formancehq/payments/cmd/connectors/internal/connectors" ) type user struct { - ID string `json:"Id"` + ID string `json:"Id"` + CreationDate int64 `json:"CreationDate"` } -func (c *Client) GetAllUsers(ctx context.Context) ([]*user, error) { +func (c *Client) GetAllUsers(ctx context.Context, lastPage int, pageSize int) ([]*user, int, error) { var users []*user + var currentPage int - for page := 1; ; page++ { - pagedUsers, err := c.getUsers(ctx, page) + for currentPage = lastPage; ; currentPage++ { + pagedUsers, err := c.getUsers(ctx, currentPage, pageSize) if err != nil { - return nil, err + return nil, lastPage, err } if len(pagedUsers) == 0 { @@ -28,12 +31,16 @@ func (c *Client) GetAllUsers(ctx context.Context) ([]*user, error) { } users = append(users, pagedUsers...) + + if len(pagedUsers) < pageSize { + break + } } - return users, nil + return users, currentPage, nil } -func (c *Client) getUsers(ctx context.Context, page int) ([]*user, error) { +func (c *Client) getUsers(ctx context.Context, page int, pageSize int) ([]*user, error) { f := connectors.ClientMetrics(ctx, "mangopay", "list_users") now := time.Now() defer f(ctx, now) @@ -45,8 +52,9 @@ func (c *Client) getUsers(ctx context.Context, page int) ([]*user, error) { } q := req.URL.Query() - q.Add("per_page", "100") + q.Add("per_page", strconv.Itoa(pageSize)) q.Add("page", fmt.Sprint(page)) + q.Add("Sort", "CreationDate:ASC") req.URL.RawQuery = q.Encode() resp, err := c.httpClient.Do(req) diff --git a/components/payments/cmd/connectors/internal/connectors/mangopay/client/wallets.go b/components/payments/cmd/connectors/internal/connectors/mangopay/client/wallets.go index ee5638d965..61ffa119a1 100644 --- a/components/payments/cmd/connectors/internal/connectors/mangopay/client/wallets.go +++ b/components/payments/cmd/connectors/internal/connectors/mangopay/client/wallets.go @@ -5,12 +5,13 @@ import ( "encoding/json" "fmt" "net/http" + "strconv" "time" "github.com/formancehq/payments/cmd/connectors/internal/connectors" ) -type wallet struct { +type Wallet struct { ID string `json:"Id"` Description string `json:"Description"` CreationDate int64 `json:"CreationDate"` @@ -21,7 +22,7 @@ type wallet struct { } `json:"Balance"` } -func (c *Client) GetWallets(ctx context.Context, userID string, page int) ([]*wallet, error) { +func (c *Client) GetWallets(ctx context.Context, userID string, page, pageSize int) ([]*Wallet, error) { f := connectors.ClientMetrics(ctx, "mangopay", "list_wallets") now := time.Now() defer f(ctx, now) @@ -33,8 +34,9 @@ func (c *Client) GetWallets(ctx context.Context, userID string, page int) ([]*wa } q := req.URL.Query() - q.Add("per_page", "100") + q.Add("per_page", strconv.Itoa(pageSize)) q.Add("page", fmt.Sprint(page)) + q.Add("Sort", "CreationDate:ASC") req.URL.RawQuery = q.Encode() resp, err := c.httpClient.Do(req) @@ -53,7 +55,7 @@ func (c *Client) GetWallets(ctx context.Context, userID string, page int) ([]*wa return nil, unmarshalError(resp.StatusCode, resp.Body).Error() } - var wallets []*wallet + var wallets []*Wallet if err := json.NewDecoder(resp.Body).Decode(&wallets); err != nil { return nil, fmt.Errorf("failed to unmarshal wallets response body: %w", err) } diff --git a/components/payments/cmd/connectors/internal/connectors/mangopay/config.go b/components/payments/cmd/connectors/internal/connectors/mangopay/config.go index e017f255f1..7e29f47515 100644 --- a/components/payments/cmd/connectors/internal/connectors/mangopay/config.go +++ b/components/payments/cmd/connectors/internal/connectors/mangopay/config.go @@ -8,6 +8,10 @@ import ( "github.com/formancehq/payments/cmd/connectors/internal/connectors/configtemplate" ) +const ( + pageSize = 100 +) + type Config struct { Name string `json:"name" yaml:"name" bson:"name"` ClientID string `json:"clientID" yaml:"clientID" bson:"clientID"` diff --git a/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_bank_accounts.go b/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_bank_accounts.go index fe7ccf1e9e..d7aaf8deee 100644 --- a/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_bank_accounts.go +++ b/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_bank_accounts.go @@ -5,30 +5,52 @@ import ( "encoding/json" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/mangopay/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" "github.com/formancehq/payments/cmd/connectors/internal/task" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) +type fetchBankAccountsState struct { + LastPage int `json:"last_page"` + LastCreationDate time.Time `json:"last_creation_date"` +} + func taskFetchBankAccounts(client *client.Client, userID string) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, scheduler task.Scheduler, ingester ingestion.Ingester, + resolver task.StateResolver, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("mangopay.taskFetchBankAccounts") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "mangopay.taskFetchBankAccounts", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("userID", userID), ) + defer span.End() + + state := task.MustResolveTo(ctx, resolver, fetchBankAccountsState{}) + if state.LastPage == 0 { + // If last page is 0, it means we haven't fetched any wallets yet. + // Mangopay pages starts at 1. + state.LastPage = 1 + } + + newState, err := ingestBankAccounts(ctx, client, userID, connectorID, scheduler, ingester, state) + if err != nil { + otel.RecordError(span, err) + return err + } - if err := fetchBankAccounts(ctx, client, userID, connectorID, scheduler, ingester); err != nil { + if err := ingester.UpdateTaskState(ctx, newState); err != nil { otel.RecordError(span, err) return err } @@ -37,19 +59,25 @@ func taskFetchBankAccounts(client *client.Client, userID string) task.Task { } } -func fetchBankAccounts( +func ingestBankAccounts( ctx context.Context, client *client.Client, userID string, connectorID models.ConnectorID, scheduler task.Scheduler, ingester ingestion.Ingester, -) error { + state fetchBankAccountsState, +) (fetchBankAccountsState, error) { + var currentPage int + + newState := fetchBankAccountsState{ + LastCreationDate: state.LastCreationDate, + } - for page := 1; ; page++ { - pagedBankAccounts, err := client.GetBankAccounts(ctx, userID, page) + for currentPage = state.LastPage; ; currentPage++ { + pagedBankAccounts, err := client.GetBankAccounts(ctx, userID, currentPage, pageSize) if err != nil { - return err + return fetchBankAccountsState{}, err } if len(pagedBankAccounts) == 0 { @@ -58,9 +86,15 @@ func fetchBankAccounts( var accountBatch ingestion.AccountBatch for _, bankAccount := range pagedBankAccounts { + creationDate := time.Unix(bankAccount.CreationDate, 0) + if creationDate.Before(state.LastCreationDate) { + continue + } + newState.LastCreationDate = creationDate + buf, err := json.Marshal(bankAccount) if err != nil { - return err + return fetchBankAccountsState{}, err } accountBatch = append(accountBatch, &models.Account{ @@ -68,7 +102,7 @@ func fetchBankAccounts( Reference: bankAccount.ID, ConnectorID: connectorID, }, - CreatedAt: time.Unix(bankAccount.CreationDate, 0), + CreatedAt: creationDate, Reference: bankAccount.ID, ConnectorID: connectorID, AccountName: bankAccount.OwnerName, @@ -78,12 +112,20 @@ func fetchBankAccounts( }, RawData: buf, }) + + newState.LastCreationDate = creationDate } if err := ingester.IngestAccounts(ctx, accountBatch); err != nil { - return err + return fetchBankAccountsState{}, err + } + + if len(pagedBankAccounts) < pageSize { + break } } - return nil + newState.LastPage = currentPage + + return newState, nil } diff --git a/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_transactions.go b/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_transactions.go index 301d0904b4..be57f6a05a 100644 --- a/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_transactions.go +++ b/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_transactions.go @@ -7,6 +7,7 @@ import ( "math/big" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/mangopay/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -14,23 +15,49 @@ import ( "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) +type fetchTransactionsState struct { + // Mangopay only allows us to sort/filter by creation date. + // So in order to have every last updates of transactions, we need to + // keep track of the first transaction with created status in order to + // refetch all transactions created after this one. + // Example: + // - SUCCEEDED + // - FAILED + // - CREATED -> We want to keep track of the creation date of this transaction since we want its updates + // - SUCCEEDED + // - CREATED + // - SUCCEEDED + FirstCreatedTransactionCreationDate time.Time `json:"first_created_transaction_creation_date"` +} + func taskFetchTransactions(client *client.Client, userID string) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, + resolver task.StateResolver, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("mangopay.taskFetchTransactions") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "mangopay.taskFetchTransactions", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("userID", userID), ) + defer span.End() + + state := task.MustResolveTo(ctx, resolver, fetchTransactionsState{}) + + newState, err := fetchTransactions(ctx, client, userID, connectorID, ingester, state) + if err != nil { + otel.RecordError(span, err) + return err + } - if err := fetchTransactions(ctx, client, userID, connectorID, ingester); err != nil { + if err := ingester.UpdateTaskState(ctx, newState); err != nil { otel.RecordError(span, err) return err } @@ -45,86 +72,117 @@ func fetchTransactions( userID string, connectorID models.ConnectorID, ingester ingestion.Ingester, -) error { + state fetchTransactionsState, +) (fetchTransactionsState, error) { + newState := fetchTransactionsState{} + + var firstCreatedCreationDate time.Time + var lastCreationDate time.Time for page := 1; ; page++ { - pagedPayments, err := client.GetTransactions(ctx, userID, page) + pagedPayments, err := client.GetTransactions(ctx, userID, page, pageSize, state.FirstCreatedTransactionCreationDate) if err != nil { - return err + return fetchTransactionsState{}, err } if len(pagedPayments) == 0 { break } - if err := ingestBatch(ctx, connectorID, ingester, pagedPayments); err != nil { - return err + batch := ingestion.PaymentBatch{} + for _, payment := range pagedPayments { + batchElement, err := processPayment(ctx, connectorID, payment) + if err != nil { + return fetchTransactionsState{}, err + } + + if batchElement.Payment != nil { + if firstCreatedCreationDate.IsZero() && + batchElement.Payment.Status == models.PaymentStatusPending { + firstCreatedCreationDate = batchElement.Payment.CreatedAt + } + + lastCreationDate = batchElement.Payment.CreatedAt + } + + batch = append(batch, batchElement) + } + + err = ingester.IngestPayments(ctx, batch) + if err != nil { + return fetchTransactionsState{}, err + } + + if len(pagedPayments) < pageSize { + break } } - return nil + newState.FirstCreatedTransactionCreationDate = firstCreatedCreationDate + if newState.FirstCreatedTransactionCreationDate.IsZero() { + // No new created payments, let's set the last creation date to the last + // transaction we fetched. + newState.FirstCreatedTransactionCreationDate = lastCreationDate + } + + return newState, nil } -func ingestBatch( +func processPayment( ctx context.Context, connectorID models.ConnectorID, - ingester ingestion.Ingester, - payments []*client.Payment, -) error { - batch := ingestion.PaymentBatch{} - for _, payment := range payments { - rawData, err := json.Marshal(payment) - if err != nil { - return fmt.Errorf("failed to marshal transaction: %w", err) - } + payment *client.Payment, +) (ingestion.PaymentBatchElement, error) { + rawData, err := json.Marshal(payment) + if err != nil { + return ingestion.PaymentBatchElement{}, fmt.Errorf("failed to marshal transaction: %w", err) + } - paymentType := matchPaymentType(payment.Type) + paymentType := matchPaymentType(payment.Type) + paymentStatus := matchPaymentStatus(payment.Status) - var amount big.Int - _, ok := amount.SetString(payment.DebitedFunds.Amount.String(), 10) - if !ok { - return fmt.Errorf("failed to parse amount %s", payment.DebitedFunds.Amount.String()) - } + var amount big.Int + _, ok := amount.SetString(payment.DebitedFunds.Amount.String(), 10) + if !ok { + return ingestion.PaymentBatchElement{}, fmt.Errorf("failed to parse amount %s", payment.DebitedFunds.Amount.String()) + } - batchElement := ingestion.PaymentBatchElement{ - Payment: &models.Payment{ - ID: models.PaymentID{ - PaymentReference: models.PaymentReference{ - Reference: payment.Id, - Type: paymentType, - }, - ConnectorID: connectorID, + batchElement := ingestion.PaymentBatchElement{ + Payment: &models.Payment{ + ID: models.PaymentID{ + PaymentReference: models.PaymentReference{ + Reference: payment.Id, + Type: paymentType, }, - CreatedAt: time.Unix(payment.CreationDate, 0), - Reference: payment.Id, - Amount: &amount, - InitialAmount: &amount, - ConnectorID: connectorID, - Type: paymentType, - Status: matchPaymentStatus(payment.Status), - Scheme: models.PaymentSchemeOther, - Asset: currency.FormatAsset(supportedCurrenciesWithDecimal, payment.DebitedFunds.Currency), - RawData: rawData, + ConnectorID: connectorID, }, - } + CreatedAt: time.Unix(payment.CreationDate, 0), + Reference: payment.Id, + Amount: &amount, + InitialAmount: &amount, + ConnectorID: connectorID, + Type: paymentType, + Status: paymentStatus, + Scheme: models.PaymentSchemeOther, + Asset: currency.FormatAsset(supportedCurrenciesWithDecimal, payment.DebitedFunds.Currency), + RawData: rawData, + }, + } - if payment.DebitedWalletID != "" { - batchElement.Payment.SourceAccountID = &models.AccountID{ - Reference: payment.DebitedWalletID, - ConnectorID: connectorID, - } + if payment.DebitedWalletID != "" { + batchElement.Payment.SourceAccountID = &models.AccountID{ + Reference: payment.DebitedWalletID, + ConnectorID: connectorID, } + } - if payment.CreditedWalletID != "" { - batchElement.Payment.DestinationAccountID = &models.AccountID{ - Reference: payment.CreditedWalletID, - ConnectorID: connectorID, - } + if payment.CreditedWalletID != "" { + batchElement.Payment.DestinationAccountID = &models.AccountID{ + Reference: payment.CreditedWalletID, + ConnectorID: connectorID, } - - batch = append(batch, batchElement) } - return ingester.IngestPayments(ctx, batch) + return batchElement, nil } func matchPaymentType(paymentType string) models.PaymentType { diff --git a/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_users.go b/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_users.go index d22870773a..89fdb1e8e0 100644 --- a/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_users.go +++ b/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_users.go @@ -3,28 +3,53 @@ package mangopay import ( "context" "errors" + "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/mangopay/client" + "github.com/formancehq/payments/cmd/connectors/internal/ingestion" "github.com/formancehq/payments/cmd/connectors/internal/task" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) -func taskFetchUsers(client *client.Client) task.Task { +type fetchUsersState struct { + LastPage int `json:"last_page"` + LastCreationDate time.Time `json:"last_creation_date"` +} + +func taskFetchUsers(client *client.Client, config *Config) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, scheduler task.Scheduler, + ingester ingestion.Ingester, + resolver task.StateResolver, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("mangopay.taskFetchUsers") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "mangopay.taskFetchUsers", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() + + state := task.MustResolveTo(ctx, resolver, fetchUsersState{}) + if state.LastPage == 0 { + // If last page is 0, it means we haven't fetched any users yet. + // Mangopay pages starts at 1. + state.LastPage = 1 + } + + newState, err := ingestUsers(ctx, client, config, connectorID, scheduler, state) + if err != nil { + otel.RecordError(span, err) + return err + } - if err := fetchUsers(ctx, client, connectorID, scheduler); err != nil { + if err := ingester.UpdateTaskState(ctx, newState); err != nil { otel.RecordError(span, err) return err } @@ -33,33 +58,46 @@ func taskFetchUsers(client *client.Client) task.Task { } } -func fetchUsers( +func ingestUsers( ctx context.Context, client *client.Client, + config *Config, connectorID models.ConnectorID, scheduler task.Scheduler, -) error { - users, err := client.GetAllUsers(ctx) + state fetchUsersState, +) (fetchUsersState, error) { + users, lastPage, err := client.GetAllUsers(ctx, state.LastPage, pageSize) if err != nil { - return err + return fetchUsersState{}, err + } + + newState := fetchUsersState{ + LastPage: lastPage, + LastCreationDate: state.LastCreationDate, } for _, user := range users { + userCreationDate := time.Unix(user.CreationDate, 0) + if userCreationDate.Before(state.LastCreationDate) { + continue + } + walletsTask, err := models.EncodeTaskDescriptor(TaskDescriptor{ Name: "Fetch wallets from client by user", Key: taskNameFetchWallets, UserID: user.ID, }) if err != nil { - return err + return fetchUsersState{}, err } err = scheduler.Schedule(ctx, walletsTask, models.TaskSchedulerOptions{ - ScheduleOption: models.OPTIONS_RUN_NOW, + ScheduleOption: models.OPTIONS_RUN_PERIODICALLY, + Duration: config.PollingPeriod.Duration, RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE, }) if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) { - return err + return fetchUsersState{}, err } bankAccountsTask, err := models.EncodeTaskDescriptor(TaskDescriptor{ @@ -68,17 +106,20 @@ func fetchUsers( UserID: user.ID, }) if err != nil { - return err + return fetchUsersState{}, err } err = scheduler.Schedule(ctx, bankAccountsTask, models.TaskSchedulerOptions{ - ScheduleOption: models.OPTIONS_RUN_NOW, + ScheduleOption: models.OPTIONS_RUN_PERIODICALLY, + Duration: config.PollingPeriod.Duration, RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE, }) if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) { - return err + return fetchUsersState{}, err } + + newState.LastCreationDate = userCreationDate } - return nil + return newState, nil } diff --git a/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_wallets.go b/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_wallets.go index bd8c20ec63..6588489789 100644 --- a/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_wallets.go +++ b/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_wallets.go @@ -8,6 +8,7 @@ import ( "math/big" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/mangopay/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -15,23 +16,45 @@ import ( "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) -func taskFetchWallets(client *client.Client, userID string) task.Task { +type fetchWalletsState struct { + LastPage int `json:"last_page"` + LastCreationDate time.Time `json:"last_creation_date"` +} + +func taskFetchWallets(client *client.Client, config *Config, userID string) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, scheduler task.Scheduler, ingester ingestion.Ingester, + resolver task.StateResolver, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("mangopay.taskFetchWallets") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "mangopay.taskFetchWallets", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), + attribute.String("userID", userID), ) + defer span.End() + + state := task.MustResolveTo(ctx, resolver, fetchWalletsState{}) + if state.LastPage == 0 { + // If last page is 0, it means we haven't fetched any wallets yet. + // Mangopay pages starts at 1. + state.LastPage = 1 + } - if err := fetchWallets(ctx, client, userID, connectorID, scheduler, ingester); err != nil { + newState, err := ingestWallets(ctx, client, config, userID, connectorID, scheduler, ingester, state) + if err != nil { + otel.RecordError(span, err) + return err + } + + if err := ingester.UpdateTaskState(ctx, newState); err != nil { otel.RecordError(span, err) return err } @@ -40,101 +63,152 @@ func taskFetchWallets(client *client.Client, userID string) task.Task { } } -func fetchWallets( +func ingestWallets( ctx context.Context, client *client.Client, + config *Config, userID string, connectorID models.ConnectorID, scheduler task.Scheduler, ingester ingestion.Ingester, -) error { - for page := 1; ; page++ { - pagedWallets, err := client.GetWallets(ctx, userID, page) + state fetchWalletsState, +) (fetchWalletsState, error) { + var currentPage int + + newState := fetchWalletsState{ + LastCreationDate: state.LastCreationDate, + } + + for currentPage = state.LastPage; ; currentPage++ { + pagedWallets, err := client.GetWallets(ctx, userID, currentPage, pageSize) if err != nil { - return err + return fetchWalletsState{}, err } if len(pagedWallets) == 0 { break } - var accountBatch ingestion.AccountBatch - var balanceBatch ingestion.BalanceBatch - var transactionTasks []models.TaskDescriptor - for _, wallet := range pagedWallets { - transactionTask, err := models.EncodeTaskDescriptor(TaskDescriptor{ - Name: "Fetch transactions from client by user and wallets", - Key: taskNameFetchTransactions, - UserID: userID, - WalletID: wallet.ID, - }) - if err != nil { - return err - } - - buf, err := json.Marshal(wallet) - if err != nil { - return err - } - - transactionTasks = append(transactionTasks, transactionTask) - accountBatch = append(accountBatch, &models.Account{ - ID: models.AccountID{ - Reference: wallet.ID, - ConnectorID: connectorID, - }, - CreatedAt: time.Unix(wallet.CreationDate, 0), - Reference: wallet.ID, - ConnectorID: connectorID, - DefaultAsset: currency.FormatAsset(supportedCurrenciesWithDecimal, wallet.Currency), - AccountName: wallet.Description, - // Wallets are internal accounts on our side, since we - // can have their balances. - Type: models.AccountTypeInternal, - Metadata: map[string]string{ - "user_id": userID, - }, - RawData: buf, - }) - - var amount big.Int - _, ok := amount.SetString(wallet.Balance.Amount.String(), 10) - if !ok { - return fmt.Errorf("failed to parse amount: %s", wallet.Balance.Amount.String()) - } - - now := time.Now() - balanceBatch = append(balanceBatch, &models.Balance{ - AccountID: models.AccountID{ - Reference: wallet.ID, - ConnectorID: connectorID, - }, - Asset: currency.FormatAsset(supportedCurrenciesWithDecimal, wallet.Balance.Currency), - Balance: &amount, - CreatedAt: now, - LastUpdatedAt: now, - ConnectorID: connectorID, - }) + lastCreationDate, err := handleWallets( + ctx, + config, + userID, + connectorID, + ingester, + scheduler, + pagedWallets, + state, + ) + if err != nil { + return fetchWalletsState{}, err } + newState.LastCreationDate = lastCreationDate - if err := ingester.IngestAccounts(ctx, accountBatch); err != nil { - return err + if len(pagedWallets) < pageSize { + break } + } - if err := ingester.IngestBalances(ctx, balanceBatch, false); err != nil { - return err + newState.LastPage = currentPage + + return newState, nil +} + +func handleWallets( + ctx context.Context, + config *Config, + userID string, + connectorID models.ConnectorID, + ingester ingestion.Ingester, + scheduler task.Scheduler, + pagedWallets []*client.Wallet, + state fetchWalletsState, +) (time.Time, error) { + var accountBatch ingestion.AccountBatch + var balanceBatch ingestion.BalanceBatch + var transactionTasks []models.TaskDescriptor + var lastCreationDate time.Time + for _, wallet := range pagedWallets { + creationDate := time.Unix(wallet.CreationDate, 0) + if creationDate.Before(state.LastCreationDate) { + continue + } + + transactionTask, err := models.EncodeTaskDescriptor(TaskDescriptor{ + Name: "Fetch transactions from client by user and wallets", + Key: taskNameFetchTransactions, + UserID: userID, + WalletID: wallet.ID, + }) + if err != nil { + return time.Time{}, err + } + + buf, err := json.Marshal(wallet) + if err != nil { + return time.Time{}, err } - for _, transactionTask := range transactionTasks { - err = scheduler.Schedule(ctx, transactionTask, models.TaskSchedulerOptions{ - ScheduleOption: models.OPTIONS_RUN_NOW, - RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE, - }) - if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) { - return err - } + transactionTasks = append(transactionTasks, transactionTask) + accountBatch = append(accountBatch, &models.Account{ + ID: models.AccountID{ + Reference: wallet.ID, + ConnectorID: connectorID, + }, + CreatedAt: time.Unix(wallet.CreationDate, 0), + Reference: wallet.ID, + ConnectorID: connectorID, + DefaultAsset: currency.FormatAsset(supportedCurrenciesWithDecimal, wallet.Currency), + AccountName: wallet.Description, + // Wallets are internal accounts on our side, since we + // can have their balances. + Type: models.AccountTypeInternal, + Metadata: map[string]string{ + "user_id": userID, + }, + RawData: buf, + }) + + var amount big.Int + _, ok := amount.SetString(wallet.Balance.Amount.String(), 10) + if !ok { + return time.Time{}, fmt.Errorf("failed to parse amount: %s", wallet.Balance.Amount.String()) + } + + now := time.Now() + balanceBatch = append(balanceBatch, &models.Balance{ + AccountID: models.AccountID{ + Reference: wallet.ID, + ConnectorID: connectorID, + }, + Asset: currency.FormatAsset(supportedCurrenciesWithDecimal, wallet.Balance.Currency), + Balance: &amount, + CreatedAt: now, + LastUpdatedAt: now, + ConnectorID: connectorID, + }) + + lastCreationDate = creationDate + } + + if err := ingester.IngestAccounts(ctx, accountBatch); err != nil { + return time.Time{}, err + } + + if err := ingester.IngestBalances(ctx, balanceBatch, false); err != nil { + return time.Time{}, err + } + + for _, transactionTask := range transactionTasks { + err := scheduler.Schedule(ctx, transactionTask, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_PERIODICALLY, + Duration: config.PollingPeriod.Duration, + RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE, + }) + if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) { + return time.Time{}, err } } - return nil + return lastCreationDate, nil } diff --git a/components/payments/cmd/connectors/internal/connectors/mangopay/task_main.go b/components/payments/cmd/connectors/internal/connectors/mangopay/task_main.go index f516931063..fdd82caa8d 100644 --- a/components/payments/cmd/connectors/internal/connectors/mangopay/task_main.go +++ b/components/payments/cmd/connectors/internal/connectors/mangopay/task_main.go @@ -4,25 +4,28 @@ import ( "context" "errors" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/task" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) // taskMain is the main task of the connector. It launches the other tasks. func taskMain() task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, scheduler task.Scheduler, ) error { - span := trace.SpanFromContext(ctx) - span.SetName("mangopay.taskMain") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "mangopay.taskMain", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), ) + defer span.End() taskUsers, err := models.EncodeTaskDescriptor(TaskDescriptor{ Name: "Fetch users from client", diff --git a/components/payments/cmd/connectors/internal/connectors/mangopay/task_payments.go b/components/payments/cmd/connectors/internal/connectors/mangopay/task_payments.go index b9083f8d52..12f2318928 100644 --- a/components/payments/cmd/connectors/internal/connectors/mangopay/task_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/mangopay/task_payments.go @@ -7,6 +7,7 @@ import ( "regexp" "time" + "github.com/formancehq/payments/cmd/connectors/internal/connectors" "github.com/formancehq/payments/cmd/connectors/internal/connectors/currency" "github.com/formancehq/payments/cmd/connectors/internal/connectors/mangopay/client" "github.com/formancehq/payments/cmd/connectors/internal/ingestion" @@ -16,7 +17,6 @@ import ( "github.com/formancehq/payments/internal/otel" "github.com/formancehq/stack/libs/go-libs/contextutil" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) var ( @@ -26,6 +26,7 @@ var ( func taskInitiatePayment(mangopayClient *client.Client, transferID string) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, @@ -33,13 +34,15 @@ func taskInitiatePayment(mangopayClient *client.Client, transferID string) task. ) error { transferInitiationID := models.MustTransferInitiationIDFromString(transferID) - span := trace.SpanFromContext(ctx) - span.SetName("mangopay.taskInitiatePayment") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "mangopay.taskInitiatePayment", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("transferID", transferID), attribute.String("reference", transferInitiationID.Reference), ) + defer span.End() transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true) if err != nil { @@ -204,6 +207,7 @@ func taskUpdatePaymentStatus( ) task.Task { return func( ctx context.Context, + taskID models.TaskID, connectorID models.ConnectorID, ingester ingestion.Ingester, scheduler task.Scheduler, @@ -212,14 +216,16 @@ func taskUpdatePaymentStatus( paymentID := models.MustPaymentIDFromString(pID) transferInitiationID := models.MustTransferInitiationIDFromString(transferID) - span := trace.SpanFromContext(ctx) - span.SetName("mangopay.taskUpdatePaymentStatus") - span.SetAttributes( + ctx, span := connectors.StartSpan( + ctx, + "mangopay.taskUpdatePaymentStatus", attribute.String("connectorID", connectorID.String()), + attribute.String("taskID", taskID.String()), attribute.String("transferID", transferID), attribute.String("paymentID", paymentID.String()), attribute.String("reference", transferInitiationID.Reference), ) + defer span.End() transfer, err := getTransfer(ctx, storageReader, transferInitiationID, false) if err != nil { @@ -277,6 +283,7 @@ func udpatePaymentStatus( Name: "Update transfer initiation status", Key: taskNameUpdatePaymentStatus, TransferID: transfer.ID.String(), + PaymentID: paymentID.String(), Attempt: attempt + 1, }) if err != nil { diff --git a/components/payments/cmd/connectors/internal/connectors/mangopay/task_resolve.go b/components/payments/cmd/connectors/internal/connectors/mangopay/task_resolve.go index b9ed441070..1b8ca0561a 100644 --- a/components/payments/cmd/connectors/internal/connectors/mangopay/task_resolve.go +++ b/components/payments/cmd/connectors/internal/connectors/mangopay/task_resolve.go @@ -54,7 +54,7 @@ func resolveTasks(logger logging.Logger, config Config) func(taskDefinition Task case taskNameMain: return taskMain() case taskNameFetchUsers: - return taskFetchUsers(mangopayClient) + return taskFetchUsers(mangopayClient, &config) case taskNameFetchBankAccounts: return taskFetchBankAccounts(mangopayClient, taskDescriptor.UserID) case taskNameFetchTransactions: @@ -64,7 +64,7 @@ func resolveTasks(logger logging.Logger, config Config) func(taskDefinition Task case taskNameUpdatePaymentStatus: return taskUpdatePaymentStatus(mangopayClient, taskDescriptor.TransferID, taskDescriptor.PaymentID, taskDescriptor.Attempt) case taskNameFetchWallets: - return taskFetchWallets(mangopayClient, taskDescriptor.UserID) + return taskFetchWallets(mangopayClient, &config, taskDescriptor.UserID) } // This should never happen.