From 0a9b8d76558223dd5be1dcea7ff35762ee4f3f40 Mon Sep 17 00:00:00 2001 From: Paul Nicolas Date: Thu, 25 Jan 2024 08:53:24 +0100 Subject: [PATCH] feat(payments): small improvments + worker pool (#1151) --- .../internal/api/connectormodule.go | 2 +- .../adyen/task_standard_webhooks.go | 22 ----- .../atlar/task_fetch_transactions.go | 2 +- .../bankingcircle/task_fetch_payments.go | 2 +- .../currencycloud/task_fetch_transactions.go | 2 +- .../connectors/dummypay/connector_test.go | 6 +- .../connectors/dummypay/task_ingest.go | 2 +- .../dummypay/task_init_directory.go | 4 +- .../mangopay/task_fetch_transactions.go | 2 +- .../modulr/task_fetch_transactions.go | 2 +- .../moneycorp/task_fetch_transactions.go | 2 +- ...sk_fetch_payments_for_connected_account.go | 7 +- .../connectors/wise/task_fetch_transfers.go | 2 +- .../connectors/internal/ingestion/ingester.go | 26 +++--- .../connectors/internal/ingestion/payments.go | 12 --- .../internal/ingestion/payments_test.go | 3 +- .../cmd/connectors/internal/ingestion/task.go | 20 +++++ .../cmd/connectors/internal/task/scheduler.go | 83 +++++++++++++------ .../internal/task/scheduler_test.go | 41 ++++++--- components/payments/go.mod | 1 + components/payments/go.sum | 2 + 21 files changed, 152 insertions(+), 93 deletions(-) create mode 100644 components/payments/cmd/connectors/internal/ingestion/task.go diff --git a/components/payments/cmd/connectors/internal/api/connectormodule.go b/components/payments/cmd/connectors/internal/api/connectormodule.go index 7ce9bc22e4..9dd3868b0f 100644 --- a/components/payments/cmd/connectors/internal/api/connectormodule.go +++ b/components/payments/cmd/connectors/internal/api/connectormodule.go @@ -50,7 +50,7 @@ func addConnector[ConnectorConfig models.ConnectorConfigObject](loader manager.L container := dig.New() if err := container.Provide(func() ingestion.Ingester { - return ingestion.NewDefaultIngester(loader.Name(), descriptor, store, publisher, messages) + return ingestion.NewDefaultIngester(loader.Name(), connectorID, descriptor, store, publisher, messages) }); err != nil { return nil, err } diff --git a/components/payments/cmd/connectors/internal/connectors/adyen/task_standard_webhooks.go b/components/payments/cmd/connectors/internal/connectors/adyen/task_standard_webhooks.go index 1cbb0df860..7fc6dedeb1 100644 --- a/components/payments/cmd/connectors/internal/connectors/adyen/task_standard_webhooks.go +++ b/components/payments/cmd/connectors/internal/connectors/adyen/task_standard_webhooks.go @@ -189,9 +189,7 @@ func handleAuthorisation( if err := ingester.IngestPayments( ctx, - connectorID, ingestion.PaymentBatch{{Payment: payment}}, - struct{}{}, ); err != nil { return err } @@ -223,9 +221,7 @@ func handleAuthorisationAdjustment( if err := ingester.IngestPayments( ctx, - connectorID, ingestion.PaymentBatch{{Payment: payment}}, - struct{}{}, ); err != nil { return err } @@ -257,9 +253,7 @@ func handleCancellation( if err := ingester.IngestPayments( ctx, - connectorID, ingestion.PaymentBatch{{Payment: payment}}, - struct{}{}, ); err != nil { return err } @@ -292,9 +286,7 @@ func handleCapture( if err := ingester.IngestPayments( ctx, - connectorID, ingestion.PaymentBatch{{Payment: payment}}, - struct{}{}, ); err != nil { return err } @@ -326,9 +318,7 @@ func handleCaptureFailed( if err := ingester.IngestPayments( ctx, - connectorID, ingestion.PaymentBatch{{Payment: payment}}, - struct{}{}, ); err != nil { return err } @@ -363,9 +353,7 @@ func handleRefund( if err := ingester.IngestPayments( ctx, - connectorID, ingestion.PaymentBatch{{Payment: payment}}, - struct{}{}, ); err != nil { return err } @@ -402,9 +390,7 @@ func handleRefundedReversed( if err := ingester.IngestPayments( ctx, - connectorID, ingestion.PaymentBatch{{Payment: payment}}, - struct{}{}, ); err != nil { return err } @@ -439,9 +425,7 @@ func handleRefundWithData( if err := ingester.IngestPayments( ctx, - connectorID, ingestion.PaymentBatch{{Payment: payment}}, - struct{}{}, ); err != nil { return err } @@ -492,9 +476,7 @@ func handlePayoutThirdparty( if err := ingester.IngestPayments( ctx, - connectorID, ingestion.PaymentBatch{{Payment: payment}}, - struct{}{}, ); err != nil { return err } @@ -543,9 +525,7 @@ func handlePayoutDecline( if err := ingester.IngestPayments( ctx, - connectorID, ingestion.PaymentBatch{{Payment: payment}}, - struct{}{}, ); err != nil { return err } @@ -594,9 +574,7 @@ func handlePayoutExpire( if err := ingester.IngestPayments( ctx, - connectorID, ingestion.PaymentBatch{{Payment: payment}}, - struct{}{}, ); err != nil { return err } diff --git a/components/payments/cmd/connectors/internal/connectors/atlar/task_fetch_transactions.go b/components/payments/cmd/connectors/internal/connectors/atlar/task_fetch_transactions.go index 916467e468..9c3268adcd 100644 --- a/components/payments/cmd/connectors/internal/connectors/atlar/task_fetch_transactions.go +++ b/components/payments/cmd/connectors/internal/connectors/atlar/task_fetch_transactions.go @@ -133,7 +133,7 @@ func ingestPaymentsBatch( batch = append(batch, batchElement) } - if err := ingester.IngestPayments(ctx, connectorID, batch, struct{}{}); err != nil { + if err := ingester.IngestPayments(ctx, batch); err != nil { return err } diff --git a/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_fetch_payments.go b/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_fetch_payments.go index f10d8cc4bb..61fab58956 100644 --- a/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_fetch_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_fetch_payments.go @@ -127,7 +127,7 @@ func ingestBatch( batch = append(batch, batchElement) } - if err := ingester.IngestPayments(ctx, connectorID, batch, struct{}{}); err != nil { + if err := ingester.IngestPayments(ctx, batch); err != nil { return err } diff --git a/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_transactions.go b/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_transactions.go index 2d93e18087..c178b79a73 100644 --- a/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_transactions.go +++ b/components/payments/cmd/connectors/internal/connectors/currencycloud/task_fetch_transactions.go @@ -119,7 +119,7 @@ func ingestTransactions( batch = append(batch, batchElement) } - err = ingester.IngestPayments(ctx, connectorID, batch, struct{}{}) + err = ingester.IngestPayments(ctx, batch) if err != nil { return err } diff --git a/components/payments/cmd/connectors/internal/connectors/dummypay/connector_test.go b/components/payments/cmd/connectors/internal/connectors/dummypay/connector_test.go index a3e74b439e..24a6365c85 100644 --- a/components/payments/cmd/connectors/internal/connectors/dummypay/connector_test.go +++ b/components/payments/cmd/connectors/internal/connectors/dummypay/connector_test.go @@ -86,7 +86,7 @@ func (m *MockIngester) IngestAccounts(ctx context.Context, batch ingestion.Accou return nil } -func (m *MockIngester) IngestPayments(ctx context.Context, connectorID models.ConnectorID, batch ingestion.PaymentBatch, commitState any) error { +func (m *MockIngester) IngestPayments(ctx context.Context, batch ingestion.PaymentBatch) error { return nil } @@ -94,6 +94,10 @@ func (m *MockIngester) IngestBalances(ctx context.Context, batch ingestion.Balan return nil } +func (m *MockIngester) UpdateTaskState(ctx context.Context, state any) error { + return nil +} + func (m *MockIngester) UpdateTransferInitiationPaymentsStatus(ctx context.Context, tf *models.TransferInitiation, paymentID *models.PaymentID, status models.TransferInitiationStatus, errorMessage string, updatedAt time.Time) error { return nil } diff --git a/components/payments/cmd/connectors/internal/connectors/dummypay/task_ingest.go b/components/payments/cmd/connectors/internal/connectors/dummypay/task_ingest.go index d617ed4b23..416bb882fe 100644 --- a/components/payments/cmd/connectors/internal/connectors/dummypay/task_ingest.go +++ b/components/payments/cmd/connectors/internal/connectors/dummypay/task_ingest.go @@ -70,7 +70,7 @@ func handleFile( return err } - err = ingester.IngestPayments(ctx, connectorID, batch, struct{}{}) + err = ingester.IngestPayments(ctx, batch) if err != nil { return fmt.Errorf("failed to ingest file '%s': %w", descriptor.FileName, err) } diff --git a/components/payments/cmd/connectors/internal/connectors/dummypay/task_init_directory.go b/components/payments/cmd/connectors/internal/connectors/dummypay/task_init_directory.go index 92a6227690..a8c93782c4 100644 --- a/components/payments/cmd/connectors/internal/connectors/dummypay/task_init_directory.go +++ b/components/payments/cmd/connectors/internal/connectors/dummypay/task_init_directory.go @@ -179,7 +179,7 @@ func generatePaymentsFile( if err != nil { return fmt.Errorf("failed to marshal payment: %w", err) } - if err := ingester.IngestPayments(ctx, connectorID, ingestion.PaymentBatch{ + if err := ingester.IngestPayments(ctx, ingestion.PaymentBatch{ { Payment: &models.Payment{ ID: models.PaymentID{ @@ -203,7 +203,7 @@ func generatePaymentsFile( DestinationAccountID: destinationAccountID, }, }, - }, struct{}{}); err != nil { + }); err != nil { return fmt.Errorf("failed to ingest payments: %w", err) } diff --git a/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_transactions.go b/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_transactions.go index 202914a1a1..301d0904b4 100644 --- a/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_transactions.go +++ b/components/payments/cmd/connectors/internal/connectors/mangopay/task_fetch_transactions.go @@ -124,7 +124,7 @@ func ingestBatch( batch = append(batch, batchElement) } - return ingester.IngestPayments(ctx, connectorID, batch, struct{}{}) + return ingester.IngestPayments(ctx, batch) } func matchPaymentType(paymentType string) models.PaymentType { diff --git a/components/payments/cmd/connectors/internal/connectors/modulr/task_fetch_transactions.go b/components/payments/cmd/connectors/internal/connectors/modulr/task_fetch_transactions.go index e883fbbf77..d446f617fa 100644 --- a/components/payments/cmd/connectors/internal/connectors/modulr/task_fetch_transactions.go +++ b/components/payments/cmd/connectors/internal/connectors/modulr/task_fetch_transactions.go @@ -64,7 +64,7 @@ func fetchTransactions( return err } - if err := ingester.IngestPayments(ctx, connectorID, batch, struct{}{}); err != nil { + if err := ingester.IngestPayments(ctx, batch); err != nil { return err } diff --git a/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_transactions.go b/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_transactions.go index 81aedeff9a..3d1db2f376 100644 --- a/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_transactions.go +++ b/components/payments/cmd/connectors/internal/connectors/moneycorp/task_fetch_transactions.go @@ -161,7 +161,7 @@ func ingestBatch( batch = append(batch, batchElement) } - return ingester.IngestPayments(ctx, connectorID, batch, struct{}{}) + return ingester.IngestPayments(ctx, batch) } func matchPaymentType(transactionType string, transactionDirection string) (models.PaymentType, bool) { diff --git a/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_payments_for_connected_account.go b/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_payments_for_connected_account.go index 863042093f..52c0134c59 100644 --- a/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_payments_for_connected_account.go +++ b/components/payments/cmd/connectors/internal/connectors/stripe/task_fetch_payments_for_connected_account.go @@ -46,7 +46,12 @@ func ingestBatch( "state": commitState, }).Debugf("updating state") - err := ingester.IngestPayments(ctx, connectorID, batch, commitState) + err := ingester.IngestPayments(ctx, batch) + if err != nil { + return err + } + + err = ingester.UpdateTaskState(ctx, commitState) if err != nil { return err } diff --git a/components/payments/cmd/connectors/internal/connectors/wise/task_fetch_transfers.go b/components/payments/cmd/connectors/internal/connectors/wise/task_fetch_transfers.go index 6fc867a511..b332c787c1 100644 --- a/components/payments/cmd/connectors/internal/connectors/wise/task_fetch_transfers.go +++ b/components/payments/cmd/connectors/internal/connectors/wise/task_fetch_transfers.go @@ -126,7 +126,7 @@ func fetchTransfers( paymentBatch = append(paymentBatch, batchElement) } - if err := ingester.IngestPayments(ctx, connectorID, paymentBatch, struct{}{}); err != nil { + if err := ingester.IngestPayments(ctx, paymentBatch); err != nil { return err } diff --git a/components/payments/cmd/connectors/internal/ingestion/ingester.go b/components/payments/cmd/connectors/internal/ingestion/ingester.go index dd9de00321..79289f3b43 100644 --- a/components/payments/cmd/connectors/internal/ingestion/ingester.go +++ b/components/payments/cmd/connectors/internal/ingestion/ingester.go @@ -13,8 +13,9 @@ import ( type Ingester interface { IngestAccounts(ctx context.Context, batch AccountBatch) error - IngestPayments(ctx context.Context, connectorID models.ConnectorID, batch PaymentBatch, commitState any) error + IngestPayments(ctx context.Context, batch PaymentBatch) error IngestBalances(ctx context.Context, batch BalanceBatch, checkIfAccountExists bool) error + UpdateTaskState(ctx context.Context, state any) error UpdateTransferInitiationPaymentsStatus(ctx context.Context, tf *models.TransferInitiation, paymentID *models.PaymentID, status models.TransferInitiationStatus, errorMessage string, updatedAt time.Time) error UpdateTransferReversalStatus(ctx context.Context, transfer *models.TransferInitiation, transferReversal *models.TransferReversal) error AddTransferInitiationPaymentID(ctx context.Context, tf *models.TransferInitiation, paymentID *models.PaymentID, updatedAt time.Time) error @@ -22,11 +23,12 @@ type Ingester interface { } type DefaultIngester struct { - provider models.ConnectorProvider - store Store - descriptor models.TaskDescriptor - publisher message.Publisher - messages *messages.Messages + provider models.ConnectorProvider + connectorID models.ConnectorID + store Store + descriptor models.TaskDescriptor + publisher message.Publisher + messages *messages.Messages } type Store interface { @@ -42,17 +44,19 @@ type Store interface { func NewDefaultIngester( provider models.ConnectorProvider, + connectorID models.ConnectorID, descriptor models.TaskDescriptor, repo Store, publisher message.Publisher, messages *messages.Messages, ) *DefaultIngester { return &DefaultIngester{ - provider: provider, - descriptor: descriptor, - store: repo, - publisher: publisher, - messages: messages, + provider: provider, + connectorID: connectorID, + descriptor: descriptor, + store: repo, + publisher: publisher, + messages: messages, } } diff --git a/components/payments/cmd/connectors/internal/ingestion/payments.go b/components/payments/cmd/connectors/internal/ingestion/payments.go index 8fe031ce75..6dffcf763f 100644 --- a/components/payments/cmd/connectors/internal/ingestion/payments.go +++ b/components/payments/cmd/connectors/internal/ingestion/payments.go @@ -2,7 +2,6 @@ package ingestion import ( "context" - "encoding/json" "fmt" "time" @@ -29,9 +28,7 @@ func (fn IngesterFn) IngestPayments(ctx context.Context, batch PaymentBatch, com func (i *DefaultIngester) IngestPayments( ctx context.Context, - connectorID models.ConnectorID, batch PaymentBatch, - commitState any, ) error { startingAt := time.Now() @@ -62,15 +59,6 @@ func (i *DefaultIngester) IngestPayments( idsInsertedMap[idsInserted[idx].String()] = struct{}{} } - taskState, err := json.Marshal(commitState) - if err != nil { - return fmt.Errorf("error marshaling task state: %w", err) - } - - if err = i.store.UpdateTaskState(ctx, connectorID, i.descriptor, taskState); err != nil { - return fmt.Errorf("error updating task state: %w", err) - } - for paymentIdx := range allPayments { _, ok := idsInsertedMap[allPayments[paymentIdx].ID.String()] if !ok { diff --git a/components/payments/cmd/connectors/internal/ingestion/payments_test.go b/components/payments/cmd/connectors/internal/ingestion/payments_test.go index 88925ad9e1..73d47dd09f 100644 --- a/components/payments/cmd/connectors/internal/ingestion/payments_test.go +++ b/components/payments/cmd/connectors/internal/ingestion/payments_test.go @@ -166,13 +166,14 @@ func TestIngestPayments(t *testing.T) { ingester := NewDefaultIngester( models.ConnectorProviderDummyPay, + connectorID, nil, NewMockStore().WithPaymentIDsNotModified(tc.paymentIDsNotModified), publisher, messages.NewMessages(""), ) - err := ingester.IngestPayments(context.Background(), connectorID, tc.batch, nil) + err := ingester.IngestPayments(context.Background(), tc.batch) publisher.Close() require.NoError(t, err) diff --git a/components/payments/cmd/connectors/internal/ingestion/task.go b/components/payments/cmd/connectors/internal/ingestion/task.go new file mode 100644 index 0000000000..b8588071a1 --- /dev/null +++ b/components/payments/cmd/connectors/internal/ingestion/task.go @@ -0,0 +1,20 @@ +package ingestion + +import ( + "context" + "encoding/json" + "fmt" +) + +func (i *DefaultIngester) UpdateTaskState(ctx context.Context, state any) error { + taskState, err := json.Marshal(state) + if err != nil { + return fmt.Errorf("error marshaling task state: %w", err) + } + + if err = i.store.UpdateTaskState(ctx, i.connectorID, i.descriptor, taskState); err != nil { + return fmt.Errorf("error updating task state: %w", err) + } + + return nil +} diff --git a/components/payments/cmd/connectors/internal/task/scheduler.go b/components/payments/cmd/connectors/internal/task/scheduler.go index a8cd717aac..6720753ce4 100644 --- a/components/payments/cmd/connectors/internal/task/scheduler.go +++ b/components/payments/cmd/connectors/internal/task/scheduler.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/alitto/pond" "github.com/formancehq/payments/cmd/connectors/internal/metrics" "github.com/formancehq/payments/cmd/connectors/internal/storage" "github.com/formancehq/payments/internal/models" @@ -46,9 +47,9 @@ type DefaultTaskScheduler struct { containerFactory ContainerCreateFunc tasks map[string]*taskHolder mu sync.Mutex - maxTasks int resolver Resolver stopped bool + workerPool *pond.WorkerPool } func (s *DefaultTaskScheduler) ListTasks(ctx context.Context, pagination storage.PaginatorQuery) ([]*models.Task, storage.PaginationDetails, error) { @@ -133,15 +134,6 @@ func (s *DefaultTaskScheduler) schedule(ctx context.Context, descriptor models.T // Do nothing } - if s.maxTasks != 0 && len(s.tasks) >= s.maxTasks || s.stopped { - err := s.stackTask(ctx, descriptor) - if err != nil { - return returnErrorFunc(errors.Wrap(err, "stacking task")) - } - - return returnErrorFunc(nil) - } - errChan := s.startTask(ctx, descriptor, options) return errChan @@ -153,6 +145,7 @@ func (s *DefaultTaskScheduler) Shutdown(ctx context.Context) error { s.mu.Unlock() s.logger(ctx).Infof("Stopping scheduler...") + s.workerPool.Stop() for name, task := range s.tasks { task.logger.Debugf("Stopping task") @@ -462,6 +455,16 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. } if e := recover(); e != nil { + switch v := e.(type) { + case error: + if errors.Is(v, pond.ErrSubmitOnStoppedPool) { + // Pool is stopped and task is marked as active, + // nothing to do as they will be restarted on + // next startup + return + } + } + s.registerTaskError(ctx, holder, e) debug.PrintStack() @@ -476,7 +479,16 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. } }() - err = container.Invoke(taskResolver) + done := make(chan struct{}) + s.workerPool.Submit(func() { + defer close(done) + err = container.Invoke(taskResolver) + }) + select { + case <-done: + case <-ctx.Done(): + return + } if err != nil { s.registerTaskError(ctx, holder, err) @@ -512,11 +524,38 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. } }() + processFunc := func() (bool, error) { + done := make(chan struct{}) + s.workerPool.Submit(func() { + defer close(done) + err = container.Invoke(taskResolver) + }) + select { + case <-done: + case <-ctx.Done(): + return true, nil + case ch := <-holder.stopChan: + logger.Infof("Stopping task...") + close(ch) + return true, nil + } + if err != nil { + s.registerTaskError(ctx, holder, err) + return false, err + } + + return false, err + } + // launch it once before starting the ticker - err = container.Invoke(taskResolver) + stop, err := processFunc() if err != nil { - s.registerTaskError(ctx, holder, err) + // error is already registered + return + } + if stop { + // Task is stopped or context is done return } @@ -532,10 +571,14 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. return case <-ticker.C: logger.Infof("Polling trigger, running task...") - err = container.Invoke(taskResolver) + stop, err := processFunc() if err != nil { - s.registerTaskError(ctx, holder, err) + // error is already registered + return + } + if stop { + // Task is stopped or context is done return } } @@ -551,14 +594,6 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. return errChan } -func (s *DefaultTaskScheduler) stackTask(ctx context.Context, descriptor models.TaskDescriptor) error { - s.logger(ctx).WithFields(map[string]interface{}{ - "descriptor": string(descriptor), - }).Infof("Stacking task") - - return s.store.UpdateTaskStatus(ctx, s.connectorID, descriptor, models.TaskStatusPending, "") -} - func (s *DefaultTaskScheduler) logger(ctx context.Context) logging.Logger { return logging.FromContext(ctx).WithFields(map[string]any{ "component": "scheduler", @@ -582,7 +617,7 @@ func NewDefaultScheduler( metricsRegistry: metricsRegistry, tasks: map[string]*taskHolder{}, containerFactory: containerFactory, - maxTasks: maxTasks, resolver: resolver, + workerPool: pond.New(maxTasks, maxTasks), } } diff --git a/components/payments/cmd/connectors/internal/task/scheduler_test.go b/components/payments/cmd/connectors/internal/task/scheduler_test.go index 265c3a5b5e..6cd8f07d4d 100644 --- a/components/payments/cmd/connectors/internal/task/scheduler_test.go +++ b/components/payments/cmd/connectors/internal/task/scheduler_test.go @@ -169,6 +169,9 @@ func TestTaskScheduler(t *testing.T) { descriptor1 := newDescriptor() descriptor2 := newDescriptor() + task1Launched := make(chan struct{}) + task2Launched := make(chan struct{}) + task1Terminated := make(chan struct{}) task2Terminated := make(chan struct{}) @@ -177,6 +180,7 @@ func TestTaskScheduler(t *testing.T) { switch string(descriptor) { case string(descriptor1): return func(ctx context.Context) error { + close(task1Launched) select { case <-task1Terminated: return nil @@ -186,6 +190,7 @@ func TestTaskScheduler(t *testing.T) { } case string(descriptor2): return func(ctx context.Context) error { + close(task2Launched) select { case <-task2Terminated: return nil @@ -206,13 +211,25 @@ func TestTaskScheduler(t *testing.T) { ScheduleOption: models.OPTIONS_RUN_NOW, RestartOption: models.OPTIONS_RESTART_NEVER, })) - require.Eventually(t, TaskActive(store, connectorID, descriptor1), time.Second, 100*time.Millisecond) - require.Eventually(t, TaskPending(store, connectorID, descriptor2), time.Second, 100*time.Millisecond) - close(task1Terminated) - require.Eventually(t, TaskTerminated(store, connectorID, descriptor1), time.Second, 100*time.Millisecond) - require.Eventually(t, TaskActive(store, connectorID, descriptor2), time.Second, 100*time.Millisecond) - close(task2Terminated) - require.Eventually(t, TaskTerminated(store, connectorID, descriptor2), time.Second, 100*time.Millisecond) + + select { + case <-task1Launched: + require.Eventually(t, TaskActive(store, connectorID, descriptor1), time.Second, 100*time.Millisecond) + require.Eventually(t, TaskActive(store, connectorID, descriptor2), time.Second, 100*time.Millisecond) + close(task1Terminated) + require.Eventually(t, TaskTerminated(store, connectorID, descriptor1), time.Second, 100*time.Millisecond) + require.Eventually(t, TaskActive(store, connectorID, descriptor2), time.Second, 100*time.Millisecond) + close(task2Terminated) + require.Eventually(t, TaskTerminated(store, connectorID, descriptor2), time.Second, 100*time.Millisecond) + case <-task2Launched: + require.Eventually(t, TaskActive(store, connectorID, descriptor1), time.Second, 100*time.Millisecond) + require.Eventually(t, TaskActive(store, connectorID, descriptor2), time.Second, 100*time.Millisecond) + close(task2Terminated) + require.Eventually(t, TaskTerminated(store, connectorID, descriptor2), time.Second, 100*time.Millisecond) + require.Eventually(t, TaskActive(store, connectorID, descriptor1), time.Second, 100*time.Millisecond) + close(task1Terminated) + require.Eventually(t, TaskTerminated(store, connectorID, descriptor1), time.Second, 100*time.Millisecond) + } }) t.Run("Stop scheduler", func(t *testing.T) { @@ -238,7 +255,9 @@ func TestTaskScheduler(t *testing.T) { })) } default: - panic("should not be called") + return func() { + + } } }), metrics.NewNoOpMetricsRegistry(), 1) @@ -248,7 +267,9 @@ func TestTaskScheduler(t *testing.T) { })) require.Eventually(t, TaskActive(store, connectorID, mainDescriptor), time.Second, 100*time.Millisecond) require.NoError(t, scheduler.Shutdown(context.TODO())) - require.Eventually(t, TaskTerminated(store, connectorID, mainDescriptor), time.Second, 100*time.Millisecond) - require.Eventually(t, TaskPending(store, connectorID, workerDescriptor), time.Second, 100*time.Millisecond) + // the main task should be still marked as active since it failed to + // schedule the worker task because the scheduler was stopped + require.Eventually(t, TaskActive(store, connectorID, mainDescriptor), time.Second, 100*time.Millisecond) + require.Eventually(t, TaskActive(store, connectorID, workerDescriptor), time.Second, 100*time.Millisecond) }) } diff --git a/components/payments/go.mod b/components/payments/go.mod index 17d9d05981..9ef2d4a095 100644 --- a/components/payments/go.mod +++ b/components/payments/go.mod @@ -5,6 +5,7 @@ go 1.20 require ( github.com/ThreeDotsLabs/watermill v1.2.0 github.com/adyen/adyen-go-api-library/v7 v7.3.1 + github.com/alitto/pond v1.8.3 github.com/bombsimon/logrusr/v3 v3.1.0 github.com/davecgh/go-spew v1.1.1 github.com/formancehq/stack/libs/go-libs v0.0.0-20230221161632-e6dc6a89a85e diff --git a/components/payments/go.sum b/components/payments/go.sum index 34c241135b..d39f92ce1f 100644 --- a/components/payments/go.sum +++ b/components/payments/go.sum @@ -629,6 +629,8 @@ github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3 github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b/go.mod h1:1KcenG0jGWcpt8ov532z81sp/kMMUG485J2InIOyADM= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alitto/pond v1.8.3 h1:ydIqygCLVPqIX/USe5EaV/aSRXTRXDEI9JwuDdu+/xs= +github.com/alitto/pond v1.8.3/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q= github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0=