Skip to content

Commit

Permalink
feat(payments): mangopay states
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas committed Jan 25, 2024
1 parent 91c92e2 commit 8ded607
Show file tree
Hide file tree
Showing 12 changed files with 444 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
Expand All @@ -16,7 +17,7 @@ type bankAccount struct {
CreationDate int64 `json:"CreationDate"`
}

func (c *Client) GetBankAccounts(ctx context.Context, userID string, page int) ([]*bankAccount, error) {
func (c *Client) GetBankAccounts(ctx context.Context, userID string, page, pageSize int) ([]*bankAccount, error) {
f := connectors.ClientMetrics(ctx, "mangopay", "list_bank_accounts")
now := time.Now()
defer f(ctx, now)
Expand All @@ -28,8 +29,9 @@ func (c *Client) GetBankAccounts(ctx context.Context, userID string, page int) (
}

q := req.URL.Query()
q.Add("per_page", "100")
q.Add("per_page", strconv.Itoa(pageSize))
q.Add("page", fmt.Sprint(page))
q.Add("Sort", "CreationDate:ASC")
req.URL.RawQuery = q.Encode()

resp, err := c.httpClient.Do(req)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
Expand Down Expand Up @@ -38,7 +39,7 @@ type Payment struct {
DebitedWalletID string `json:"DebitedWalletId"`
}

func (c *Client) GetTransactions(ctx context.Context, walletsID string, page int) ([]*Payment, error) {
func (c *Client) GetTransactions(ctx context.Context, walletsID string, page, pageSize int, afterCreatedAt time.Time) ([]*Payment, error) {
f := connectors.ClientMetrics(ctx, "mangopay", "list_transactions")
now := time.Now()
defer f(ctx, now)
Expand All @@ -50,8 +51,10 @@ func (c *Client) GetTransactions(ctx context.Context, walletsID string, page int
}

q := req.URL.Query()
q.Add("per_page", "100")
q.Add("per_page", strconv.Itoa(pageSize))
q.Add("page", fmt.Sprint(page))
q.Add("Sort", "CreationDate:ASC")
q.Add("AfterDate", strconv.FormatInt(afterCreatedAt.Unix(), 10))
req.URL.RawQuery = q.Encode()

resp, err := c.httpClient.Do(req)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,42 @@ import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
)

type user struct {
ID string `json:"Id"`
ID string `json:"Id"`
CreationDate int64 `json:"CreationDate"`
}

func (c *Client) GetAllUsers(ctx context.Context) ([]*user, error) {
func (c *Client) GetAllUsers(ctx context.Context, lastPage int, pageSize int) ([]*user, int, error) {
var users []*user
var currentPage int

for page := 1; ; page++ {
pagedUsers, err := c.getUsers(ctx, page)
for currentPage = lastPage; ; currentPage++ {
pagedUsers, err := c.getUsers(ctx, currentPage, pageSize)
if err != nil {
return nil, err
return nil, lastPage, err
}

if len(pagedUsers) == 0 {
break
}

users = append(users, pagedUsers...)

if len(pagedUsers) < pageSize {
break
}
}

return users, nil
return users, currentPage, nil
}

func (c *Client) getUsers(ctx context.Context, page int) ([]*user, error) {
func (c *Client) getUsers(ctx context.Context, page int, pageSize int) ([]*user, error) {
f := connectors.ClientMetrics(ctx, "mangopay", "list_users")
now := time.Now()
defer f(ctx, now)
Expand All @@ -45,8 +52,9 @@ func (c *Client) getUsers(ctx context.Context, page int) ([]*user, error) {
}

q := req.URL.Query()
q.Add("per_page", "100")
q.Add("per_page", strconv.Itoa(pageSize))
q.Add("page", fmt.Sprint(page))
q.Add("Sort", "CreationDate:ASC")
req.URL.RawQuery = q.Encode()

resp, err := c.httpClient.Do(req)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
)

type wallet struct {
type Wallet struct {
ID string `json:"Id"`
Description string `json:"Description"`
CreationDate int64 `json:"CreationDate"`
Expand All @@ -21,7 +22,7 @@ type wallet struct {
} `json:"Balance"`
}

func (c *Client) GetWallets(ctx context.Context, userID string, page int) ([]*wallet, error) {
func (c *Client) GetWallets(ctx context.Context, userID string, page, pageSize int) ([]*Wallet, error) {
f := connectors.ClientMetrics(ctx, "mangopay", "list_wallets")
now := time.Now()
defer f(ctx, now)
Expand All @@ -33,8 +34,9 @@ func (c *Client) GetWallets(ctx context.Context, userID string, page int) ([]*wa
}

q := req.URL.Query()
q.Add("per_page", "100")
q.Add("per_page", strconv.Itoa(pageSize))
q.Add("page", fmt.Sprint(page))
q.Add("Sort", "CreationDate:ASC")
req.URL.RawQuery = q.Encode()

resp, err := c.httpClient.Do(req)
Expand All @@ -53,7 +55,7 @@ func (c *Client) GetWallets(ctx context.Context, userID string, page int) ([]*wa
return nil, unmarshalError(resp.StatusCode, resp.Body).Error()
}

var wallets []*wallet
var wallets []*Wallet
if err := json.NewDecoder(resp.Body).Decode(&wallets); err != nil {
return nil, fmt.Errorf("failed to unmarshal wallets response body: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import (
"github.com/formancehq/payments/cmd/connectors/internal/connectors/configtemplate"
)

const (
pageSize = 100
)

type Config struct {
Name string `json:"name" yaml:"name" bson:"name"`
ClientID string `json:"clientID" yaml:"clientID" bson:"clientID"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,52 @@ import (
"encoding/json"
"time"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/mangopay/client"
"github.com/formancehq/payments/cmd/connectors/internal/ingestion"
"github.com/formancehq/payments/cmd/connectors/internal/task"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type fetchBankAccountsState struct {
LastPage int `json:"last_page"`
LastCreationDate time.Time `json:"last_creation_date"`
}

func taskFetchBankAccounts(client *client.Client, userID string) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
scheduler task.Scheduler,
ingester ingestion.Ingester,
resolver task.StateResolver,
) error {
span := trace.SpanFromContext(ctx)
span.SetName("mangopay.taskFetchBankAccounts")
span.SetAttributes(
ctx, span := connectors.StartSpan(
ctx,
"mangopay.taskFetchBankAccounts",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
attribute.String("userID", userID),
)
defer span.End()

state := task.MustResolveTo(ctx, resolver, fetchBankAccountsState{})
if state.LastPage == 0 {
// If last page is 0, it means we haven't fetched any wallets yet.
// Mangopay pages starts at 1.
state.LastPage = 1
}

newState, err := ingestBankAccounts(ctx, client, userID, connectorID, scheduler, ingester, state)
if err != nil {
otel.RecordError(span, err)
return err
}

if err := fetchBankAccounts(ctx, client, userID, connectorID, scheduler, ingester); err != nil {
if err := ingester.UpdateTaskState(ctx, newState); err != nil {
otel.RecordError(span, err)
return err
}
Expand All @@ -37,19 +59,25 @@ func taskFetchBankAccounts(client *client.Client, userID string) task.Task {
}
}

func fetchBankAccounts(
func ingestBankAccounts(
ctx context.Context,
client *client.Client,
userID string,
connectorID models.ConnectorID,
scheduler task.Scheduler,
ingester ingestion.Ingester,
) error {
state fetchBankAccountsState,
) (fetchBankAccountsState, error) {
var currentPage int

newState := fetchBankAccountsState{
LastCreationDate: state.LastCreationDate,
}

for page := 1; ; page++ {
pagedBankAccounts, err := client.GetBankAccounts(ctx, userID, page)
for currentPage = state.LastPage; ; currentPage++ {
pagedBankAccounts, err := client.GetBankAccounts(ctx, userID, currentPage, pageSize)
if err != nil {
return err
return fetchBankAccountsState{}, err
}

if len(pagedBankAccounts) == 0 {
Expand All @@ -58,17 +86,23 @@ func fetchBankAccounts(

var accountBatch ingestion.AccountBatch
for _, bankAccount := range pagedBankAccounts {
creationDate := time.Unix(bankAccount.CreationDate, 0)
if creationDate.Before(state.LastCreationDate) {
continue
}
newState.LastCreationDate = creationDate

buf, err := json.Marshal(bankAccount)
if err != nil {
return err
return fetchBankAccountsState{}, err
}

accountBatch = append(accountBatch, &models.Account{
ID: models.AccountID{
Reference: bankAccount.ID,
ConnectorID: connectorID,
},
CreatedAt: time.Unix(bankAccount.CreationDate, 0),
CreatedAt: creationDate,
Reference: bankAccount.ID,
ConnectorID: connectorID,
AccountName: bankAccount.OwnerName,
Expand All @@ -78,12 +112,20 @@ func fetchBankAccounts(
},
RawData: buf,
})

newState.LastCreationDate = creationDate
}

if err := ingester.IngestAccounts(ctx, accountBatch); err != nil {
return err
return fetchBankAccountsState{}, err
}

if len(pagedBankAccounts) < pageSize {
break
}
}

return nil
newState.LastPage = currentPage

return newState, nil
}
Loading

0 comments on commit 8ded607

Please sign in to comment.