Skip to content

Commit

Permalink
fix(payments): fix concurrent map writes (#859)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored Nov 14, 2023
1 parent 702be5b commit eaa6495
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 20 deletions.
73 changes: 53 additions & 20 deletions components/payments/cmd/connectors/internal/integration/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package integration
import (
"context"
"fmt"
"sync"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/formancehq/payments/cmd/connectors/internal/messages"
Expand Down Expand Up @@ -33,10 +34,12 @@ type connectorManager struct {
type ConnectorsManager[Config models.ConnectorConfigObject] struct {
provider models.ConnectorProvider
loader Loader[Config]
connectors map[string]*connectorManager
store Repository
schedulerFactory TaskSchedulerFactory
publisher message.Publisher

connectors map[string]*connectorManager
mu sync.RWMutex
}

func (l *ConnectorsManager[ConnectorConfig]) logger(ctx context.Context) logging.Logger {
Expand All @@ -46,8 +49,28 @@ func (l *ConnectorsManager[ConnectorConfig]) logger(ctx context.Context) logging
})
}

func (l *ConnectorsManager[ConnectorConfig]) getManager(connectorID models.ConnectorID) (*connectorManager, error) {
l.mu.RLock()
defer l.mu.RUnlock()

connector, ok := l.connectors[connectorID.String()]
if !ok {
return nil, ErrNotInstalled
}

return connector, nil
}

func (l *ConnectorsManager[ConnectorConfig]) Connectors() map[string]*connectorManager {
return l.connectors
l.mu.RLock()
defer l.mu.RUnlock()

copy := make(map[string]*connectorManager, len(l.connectors))
for k, v := range l.connectors {
copy[k] = v
}

return copy
}

func (l *ConnectorsManager[ConnectorConfig]) ReadConfig(
Expand Down Expand Up @@ -94,10 +117,12 @@ func (l *ConnectorsManager[ConnectorConfig]) load(
c := l.loader.Load(l.logger(ctx), connectorConfig)
scheduler := l.schedulerFactory.Make(connectorID, c, l.loader.AllowTasks())

l.mu.Lock()
l.connectors[connectorID.String()] = &connectorManager{
connector: c,
scheduler: scheduler,
}
l.mu.Unlock()

return nil
}
Expand Down Expand Up @@ -151,7 +176,10 @@ func (l *ConnectorsManager[ConnectorConfig]) Install(
return models.ConnectorID{}, err
}

connectorManager := l.connectors[connector.ID.String()]
connectorManager, err := l.getManager(connector.ID)
if err != nil {
return models.ConnectorID{}, err
}

err = connectorManager.connector.Install(task.NewConnectorContext(logging.ContextWithLogger(
context.TODO(),
Expand All @@ -171,14 +199,13 @@ func (l *ConnectorsManager[ConnectorConfig]) Install(
func (l *ConnectorsManager[ConnectorConfig]) Uninstall(ctx context.Context, connectorID models.ConnectorID) error {
l.logger(ctx).Infof("Uninstalling connector: %s", connectorID)

connectorManager, ok := l.connectors[connectorID.String()]
if !ok {
connectorManager, err := l.getManager(connectorID)
if err != nil {
l.logger(ctx).Errorf("Connector not installed")

return ErrNotInstalled
return err
}

err := connectorManager.scheduler.Shutdown(ctx)
err = connectorManager.scheduler.Shutdown(ctx)
if err != nil {
return err
}
Expand All @@ -193,7 +220,9 @@ func (l *ConnectorsManager[ConnectorConfig]) Uninstall(ctx context.Context, conn
return err
}

l.mu.Lock()
delete(l.connectors, connectorID.String())
l.mu.Unlock()

l.logger(ctx).Infof("Connector %s uninstalled", connectorID)

Expand Down Expand Up @@ -225,9 +254,7 @@ func (l *ConnectorsManager[ConnectorConfig]) Restore(ctx context.Context) error
func (l *ConnectorsManager[ConnectorConfig]) restore(ctx context.Context, connector *models.Connector) error {
l.logger(ctx).Infof("Restoring state for connector: %s", connector.Name)

connectorID := connector.ID.String()
_, ok := l.connectors[connectorID]
if ok {
if manager, _ := l.getManager(connector.ID); manager != nil {
return ErrAlreadyRunning
}

Expand All @@ -240,7 +267,12 @@ func (l *ConnectorsManager[ConnectorConfig]) restore(ctx context.Context, connec
return err
}

if err := l.connectors[connectorID].scheduler.Restore(ctx); err != nil {
manager, err := l.getManager(connector.ID)
if err != nil {
return err
}

if err := manager.scheduler.Restore(ctx); err != nil {
return err
}

Expand All @@ -255,7 +287,7 @@ func (l *ConnectorsManager[ConnectorConfig]) FindAll(ctx context.Context) ([]*mo
return nil, err
}

providerConnectors := make([]*models.Connector, 0, len(l.connectors))
providerConnectors := make([]*models.Connector, 0, len(connectors))
for _, connector := range connectors {
if connector.Provider == l.provider {
providerConnectors = append(providerConnectors, connector)
Expand All @@ -274,17 +306,17 @@ func (l *ConnectorsManager[ConnectorConfig]) ListTasksStates(
connectorID models.ConnectorID,
pagination storage.PaginatorQuery,
) ([]models.Task, storage.PaginationDetails, error) {
connectorManager, ok := l.connectors[connectorID.String()]
if !ok {
connectorManager, err := l.getManager(connectorID)
if err != nil {
return nil, storage.PaginationDetails{}, ErrConnectorNotFound
}

return connectorManager.scheduler.ListTasks(ctx, pagination)
}

func (l *ConnectorsManager[Config]) ReadTaskState(ctx context.Context, connectorID models.ConnectorID, taskID uuid.UUID) (*models.Task, error) {
connectorManager, ok := l.connectors[connectorID.String()]
if !ok {
connectorManager, err := l.getManager(connectorID)
if err != nil {
return nil, ErrConnectorNotFound
}

Expand Down Expand Up @@ -322,11 +354,12 @@ func (l *ConnectorsManager[ConnectorConfig]) Reset(ctx context.Context, connecto
}

func (l *ConnectorsManager[ConnectorConfig]) InitiatePayment(ctx context.Context, transfer *models.TransferInitiation) error {
connectorManager, ok := l.connectors[transfer.ConnectorID.String()]
if !ok {
connectorManager, err := l.getManager(transfer.ConnectorID)
if err != nil {
return ErrConnectorNotFound
}
err := connectorManager.connector.InitiatePayment(task.NewConnectorContext(ctx, connectorManager.scheduler), transfer)

err = connectorManager.connector.InitiatePayment(task.NewConnectorContext(ctx, connectorManager.scheduler), transfer)
if err != nil {
return fmt.Errorf("initiating transfer: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package integration_test
import (
"context"
"encoding/json"
"sync"

"github.com/formancehq/payments/cmd/connectors/internal/integration"
"github.com/formancehq/payments/internal/models"
Expand All @@ -18,9 +19,13 @@ type connector struct {
type InMemoryConnectorStore struct {
connectorsByID map[string]*connector
connectorsByName map[string]*connector
mu sync.RWMutex
}

func (i *InMemoryConnectorStore) Uninstall(ctx context.Context, connectorID models.ConnectorID) error {
i.mu.Lock()
defer i.mu.Unlock()

connector, ok := i.connectorsByID[connectorID.String()]
if !ok {
return nil
Expand All @@ -33,6 +38,9 @@ func (i *InMemoryConnectorStore) Uninstall(ctx context.Context, connectorID mode
}

func (i *InMemoryConnectorStore) ListConnectors(_ context.Context) ([]*models.Connector, error) {
i.mu.RLock()
defer i.mu.RUnlock()

connectors := make([]*models.Connector, 0, len(i.connectorsByID))
for _, c := range i.connectorsByID {
connectors = append(connectors, &models.Connector{
Expand All @@ -46,34 +54,50 @@ func (i *InMemoryConnectorStore) ListConnectors(_ context.Context) ([]*models.Co
}

func (i *InMemoryConnectorStore) IsInstalledByConnectorID(ctx context.Context, connectorID models.ConnectorID) (bool, error) {
i.mu.RLock()
defer i.mu.RUnlock()

_, ok := i.connectorsByID[connectorID.String()]
return ok, nil
}

func (i *InMemoryConnectorStore) IsInstalledByConnectorName(ctx context.Context, name string) (bool, error) {
i.mu.RLock()
defer i.mu.RUnlock()

_, ok := i.connectorsByName[name]
return ok, nil
}

func (i *InMemoryConnectorStore) Install(ctx context.Context, newConnector *models.Connector, config json.RawMessage) error {
i.mu.Lock()
defer i.mu.Unlock()

c := &connector{
name: newConnector.Name,
id: newConnector.ID,
config: config,
provider: newConnector.Provider,
}

i.connectorsByID[newConnector.ID.String()] = c
i.connectorsByName[newConnector.Name] = c

return nil
}

func (i *InMemoryConnectorStore) UpdateConfig(ctx context.Context, connectorID models.ConnectorID, config json.RawMessage) error {
i.mu.Lock()
defer i.mu.Unlock()

i.connectorsByID[connectorID.String()].config = config
return nil
}

func (i *InMemoryConnectorStore) GetConnector(ctx context.Context, connectorID models.ConnectorID) (*models.Connector, error) {
i.mu.RLock()
defer i.mu.RUnlock()

c, ok := i.connectorsByID[connectorID.String()]
if !ok {
return nil, integration.ErrNotFound
Expand Down

1 comment on commit eaa6495

@vercel
Copy link

@vercel vercel bot commented on eaa6495 Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.