From b29735c8d84a2fab7ed8ab2e57f82aea8999805a Mon Sep 17 00:00:00 2001 From: Paul Nicolas Date: Thu, 2 Nov 2023 10:29:29 +0100 Subject: [PATCH] feat(payments): insert all data in search via benthos (#783) --- .../search-benthos-streams.yaml | 87 +++++++++------- .../search-benthos-streams.yaml | 87 +++++++++------- .../search-benthos-streams.yaml | 87 +++++++++------- .../search-benthos-streams.yaml | 87 +++++++++------- .../search-benthos-streams.yaml | 87 +++++++++------- .../deployments-apps-v1/search-benthos.yaml | 2 +- .../search-benthos-streams.yaml | 87 +++++++++------- .../deployments-apps-v1/search-benthos.yaml | 2 +- .../search-benthos-streams.yaml | 87 +++++++++------- .../deployments-apps-v1/search-benthos.yaml | 2 +- .../search-benthos-streams.yaml | 87 +++++++++------- .../deployments-apps-v1/search-benthos.yaml | 2 +- .../search-benthos-streams.yaml | 87 +++++++++------- .../deployments-apps-v1/search-benthos.yaml | 2 +- .../search-benthos-streams.yaml | 87 +++++++++------- .../deployments-apps-v1/search-benthos.yaml | 2 +- .../connectors/internal/api/bank_account.go | 22 ++++- .../cmd/connectors/internal/api/module.go | 2 +- .../cmd/connectors/internal/api/router.go | 12 ++- .../internal/api/transfer_initiation.go | 55 ++++++++++- .../connectors/bankingcircle/task_payments.go | 21 ++-- .../connectors/currencycloud/task_payments.go | 21 ++-- .../connectors/mangopay/task_payments.go | 21 ++-- .../connectors/modulr/task_payments.go | 21 ++-- .../connectors/moneycorp/task_payments.go | 21 ++-- .../connectors/stripe/task_payments.go | 21 ++-- .../internal/connectors/wise/task_payments.go | 22 ++--- .../connectors/internal/ingestion/accounts.go | 14 ++- .../connectors/internal/ingestion/balances.go | 13 ++- .../connectors/internal/ingestion/ingester.go | 4 +- .../internal/ingestion/transfer_initiation.go | 56 ++++++++++- .../connectors/internal/messages/accounts.go | 22 ++--- .../connectors/internal/messages/balances.go | 18 ++-- .../internal/messages/bank_account.go | 44 +++++++++ .../internal/messages/transfer_initiations.go | 91 +++++++++++++++++ components/payments/pkg/events/event.go | 11 ++- .../benthos/streams/payments_deletion.yaml | 99 +++++++++++++++++++ .../benthos/streams/payments_ingestion.yaml | 85 ++++++++++++++++ .../benthos/streams/payments_reset.yaml | 46 --------- components/search/pkg/searchhttp/http.go | 6 ++ 40 files changed, 1116 insertions(+), 514 deletions(-) create mode 100644 components/payments/cmd/connectors/internal/messages/bank_account.go create mode 100644 components/payments/cmd/connectors/internal/messages/transfer_initiations.go create mode 100644 components/search/benthos/streams/payments_deletion.yaml delete mode 100644 components/search/benthos/streams/payments_reset.yaml diff --git a/components/operator/internal/controllers/stack/testdata/monopod-disabled-one-service/results/configmaps--v1/search-benthos-streams.yaml b/components/operator/internal/controllers/stack/testdata/monopod-disabled-one-service/results/configmaps--v1/search-benthos-streams.yaml index a0aca9e551..83b5de49df 100644 --- a/components/operator/internal/controllers/stack/testdata/monopod-disabled-one-service/results/configmaps--v1/search-benthos-streams.yaml +++ b/components/operator/internal/controllers/stack/testdata/monopod-disabled-one-service/results/configmaps--v1/search-benthos-streams.yaml @@ -299,50 +299,70 @@ data: output: resource: elasticsearch - payments_ingestion.yaml: | + payments_deletion.yaml: | input: event_bus: topic: payments - consumer_group: search + consumer_group: search-payments-resets pipeline: processors: - switch_event_type: events: - - label: SAVED_PAYMENT + - label: CONNECTOR_RESET processors: - bloblang: | root = { - "data": this.payload, - "indexed": { - "provider": this.payload.provider, - "reference": this.payload.reference, - "scheme": this.payload.scheme, - "type": this.payload.type, - "status": this.payload.status, - "id": this.payload.id, - "initialAmount": this.payload.initialAmount, - "createdAt": this.payload.createdAt - }, - "kind": "PAYMENT", - "when": this.date + "query": { + "bool": { + "must": [ + { + "bool": { + "should": [ + { + "match": { + "kind": "PAYMENT" + }, + }, + { + "match": { + "kind": "PAYMENT_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_BALANCE" + }, + }, + { + "match": { + "kind": "PAYMENT_BANK_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_TRANSFER_INITIATION" + }, + } + ] + } + }, + { + "match": { + "indexed.provider": this.payload.connector + } + }, + { + "match": { + "stack": env("STACK") + } + } + ] + } + } } - meta action = "index" - meta id = "PAYMENT-%s".format(this.payload.id) - - output: - resource: elasticsearch - payments_reset.yaml: | - input: - event_bus: - topic: payments - consumer_group: search-payments-resets - pipeline: - processors: - - switch_event_type: - events: - - label: CONNECTOR_RESET + - label: DELETED_TRANSFER_INITIATION processors: - bloblang: | root = { @@ -351,12 +371,12 @@ data: "must": [ { "match": { - "kind": "PAYMENT" + "kind": "PAYMENT_TRANSFER_INITIATION" } }, { "match": { - "indexed.provider": this.payload.connector + "indexed.id": this.payload.id } }, { @@ -379,6 +399,7 @@ data: enabled: ${BASIC_AUTH_ENABLED} username: ${BASIC_AUTH_USERNAME} password: ${BASIC_AUTH_PASSWORD} + payments_ingestion.yaml: "input:\n event_bus:\n topic: payments\n consumer_group: search\n\npipeline:\n processors:\n - switch_event_type:\n events:\n - label: SAVED_PAYMENT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"scheme\": this.payload.scheme,\n \"type\": this.payload.type,\n \"status\": this.payload.status,\n \"id\": this.payload.id,\n \"initialAmount\": this.payload.initialAmount,\n \"createdAt\": this.payload.createdAt\n },\n \"kind\": \"PAYMENT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-%s\".format(this.payload.id)\n - label: SAVED_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"createdAt\": this.payload.createdAt,\n \"id\": this.payload.id,\n \"type\": this.payload.type,\n \"defaultAsset\": this.payload.defaultAsset,\n \"accountName\": this.payload.accountName\n },\n \"kind\": \"PAYMENT_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_BALANCE\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"createdAt\": this.payload.createdAt,\n \"accountId\": this.payload.accountID,\n \"asset\": this.payload.asset,\n \"balance\": this.payload.balance\n },\n \"kind\": \"PAYMENT_BALANCE\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BALANCE-%s-%s\".format(this.payload.accountId, this.payload.asset)\n - label: SAVED_BANK_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"provider\": this.payload.provider,\n \"name\": this.payload.name,\n \"accountNumber\": this.payload.accountNumber,\n \"iban\": this.payload.iban,\n \"swiftBicCode\": this.payload.swiftBicCode,\n \"country\": this.payload.country,\n \"accountId\": this.payload.accountID\n },\n \"kind\": \"PAYMENT_BANK_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BANK-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_TRANSFER_INITIATION\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"scheduledAt\": this.payload.scheduledAt,\n \"updatedAt\": this.payload.updatedAt,\n \"provider\": this.payload.provider,\n \"description\": this.payload.description,\n \"type\": this.payload.type,\n \"sourceAccountId\": this.payload.sourceAccountID,\n \"destinationAccountId\": this.payload.destinationAccountID,\n \"amount\": this.payload.amount,\n \"asset\": this.payload.asset,\n \"attempts\": this.payload.attempts,\n \"status\": this.payload.status,\n \"error\": this.payload.error,\n \"relatedPayment\": this.payload.relatedPayment\n },\n \"kind\": \"PAYMENT_TRANSFER_INITIATION\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-TRANSFER-INITIATION-%s\".format(this.payload.id)\n \n\noutput:\n resource: elasticsearch\n" kind: ConfigMap metadata: labels: diff --git a/components/operator/internal/controllers/stack/testdata/monopod-latest/results/configmaps--v1/search-benthos-streams.yaml b/components/operator/internal/controllers/stack/testdata/monopod-latest/results/configmaps--v1/search-benthos-streams.yaml index cd1cf2cb63..da6b2d0410 100644 --- a/components/operator/internal/controllers/stack/testdata/monopod-latest/results/configmaps--v1/search-benthos-streams.yaml +++ b/components/operator/internal/controllers/stack/testdata/monopod-latest/results/configmaps--v1/search-benthos-streams.yaml @@ -299,50 +299,70 @@ data: output: resource: elasticsearch - payments_ingestion.yaml: | + payments_deletion.yaml: | input: event_bus: topic: payments - consumer_group: search + consumer_group: search-payments-resets pipeline: processors: - switch_event_type: events: - - label: SAVED_PAYMENT + - label: CONNECTOR_RESET processors: - bloblang: | root = { - "data": this.payload, - "indexed": { - "provider": this.payload.provider, - "reference": this.payload.reference, - "scheme": this.payload.scheme, - "type": this.payload.type, - "status": this.payload.status, - "id": this.payload.id, - "initialAmount": this.payload.initialAmount, - "createdAt": this.payload.createdAt - }, - "kind": "PAYMENT", - "when": this.date + "query": { + "bool": { + "must": [ + { + "bool": { + "should": [ + { + "match": { + "kind": "PAYMENT" + }, + }, + { + "match": { + "kind": "PAYMENT_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_BALANCE" + }, + }, + { + "match": { + "kind": "PAYMENT_BANK_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_TRANSFER_INITIATION" + }, + } + ] + } + }, + { + "match": { + "indexed.provider": this.payload.connector + } + }, + { + "match": { + "stack": env("STACK") + } + } + ] + } + } } - meta action = "index" - meta id = "PAYMENT-%s".format(this.payload.id) - - output: - resource: elasticsearch - payments_reset.yaml: | - input: - event_bus: - topic: payments - consumer_group: search-payments-resets - pipeline: - processors: - - switch_event_type: - events: - - label: CONNECTOR_RESET + - label: DELETED_TRANSFER_INITIATION processors: - bloblang: | root = { @@ -351,12 +371,12 @@ data: "must": [ { "match": { - "kind": "PAYMENT" + "kind": "PAYMENT_TRANSFER_INITIATION" } }, { "match": { - "indexed.provider": this.payload.connector + "indexed.id": this.payload.id } }, { @@ -379,6 +399,7 @@ data: enabled: ${BASIC_AUTH_ENABLED} username: ${BASIC_AUTH_USERNAME} password: ${BASIC_AUTH_PASSWORD} + payments_ingestion.yaml: "input:\n event_bus:\n topic: payments\n consumer_group: search\n\npipeline:\n processors:\n - switch_event_type:\n events:\n - label: SAVED_PAYMENT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"scheme\": this.payload.scheme,\n \"type\": this.payload.type,\n \"status\": this.payload.status,\n \"id\": this.payload.id,\n \"initialAmount\": this.payload.initialAmount,\n \"createdAt\": this.payload.createdAt\n },\n \"kind\": \"PAYMENT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-%s\".format(this.payload.id)\n - label: SAVED_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"createdAt\": this.payload.createdAt,\n \"id\": this.payload.id,\n \"type\": this.payload.type,\n \"defaultAsset\": this.payload.defaultAsset,\n \"accountName\": this.payload.accountName\n },\n \"kind\": \"PAYMENT_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_BALANCE\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"createdAt\": this.payload.createdAt,\n \"accountId\": this.payload.accountID,\n \"asset\": this.payload.asset,\n \"balance\": this.payload.balance\n },\n \"kind\": \"PAYMENT_BALANCE\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BALANCE-%s-%s\".format(this.payload.accountId, this.payload.asset)\n - label: SAVED_BANK_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"provider\": this.payload.provider,\n \"name\": this.payload.name,\n \"accountNumber\": this.payload.accountNumber,\n \"iban\": this.payload.iban,\n \"swiftBicCode\": this.payload.swiftBicCode,\n \"country\": this.payload.country,\n \"accountId\": this.payload.accountID\n },\n \"kind\": \"PAYMENT_BANK_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BANK-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_TRANSFER_INITIATION\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"scheduledAt\": this.payload.scheduledAt,\n \"updatedAt\": this.payload.updatedAt,\n \"provider\": this.payload.provider,\n \"description\": this.payload.description,\n \"type\": this.payload.type,\n \"sourceAccountId\": this.payload.sourceAccountID,\n \"destinationAccountId\": this.payload.destinationAccountID,\n \"amount\": this.payload.amount,\n \"asset\": this.payload.asset,\n \"attempts\": this.payload.attempts,\n \"status\": this.payload.status,\n \"error\": this.payload.error,\n \"relatedPayment\": this.payload.relatedPayment\n },\n \"kind\": \"PAYMENT_TRANSFER_INITIATION\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-TRANSFER-INITIATION-%s\".format(this.payload.id)\n \n\noutput:\n resource: elasticsearch\n" kind: ConfigMap metadata: labels: diff --git a/components/operator/internal/controllers/stack/testdata/monopod-ledgerv1/results/configmaps--v1/search-benthos-streams.yaml b/components/operator/internal/controllers/stack/testdata/monopod-ledgerv1/results/configmaps--v1/search-benthos-streams.yaml index 12ce4e8acd..ec8aa42e9b 100644 --- a/components/operator/internal/controllers/stack/testdata/monopod-ledgerv1/results/configmaps--v1/search-benthos-streams.yaml +++ b/components/operator/internal/controllers/stack/testdata/monopod-ledgerv1/results/configmaps--v1/search-benthos-streams.yaml @@ -299,50 +299,70 @@ data: output: resource: elasticsearch - payments_ingestion.yaml: | + payments_deletion.yaml: | input: event_bus: topic: payments - consumer_group: search + consumer_group: search-payments-resets pipeline: processors: - switch_event_type: events: - - label: SAVED_PAYMENT + - label: CONNECTOR_RESET processors: - bloblang: | root = { - "data": this.payload, - "indexed": { - "provider": this.payload.provider, - "reference": this.payload.reference, - "scheme": this.payload.scheme, - "type": this.payload.type, - "status": this.payload.status, - "id": this.payload.id, - "initialAmount": this.payload.initialAmount, - "createdAt": this.payload.createdAt - }, - "kind": "PAYMENT", - "when": this.date + "query": { + "bool": { + "must": [ + { + "bool": { + "should": [ + { + "match": { + "kind": "PAYMENT" + }, + }, + { + "match": { + "kind": "PAYMENT_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_BALANCE" + }, + }, + { + "match": { + "kind": "PAYMENT_BANK_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_TRANSFER_INITIATION" + }, + } + ] + } + }, + { + "match": { + "indexed.provider": this.payload.connector + } + }, + { + "match": { + "stack": env("STACK") + } + } + ] + } + } } - meta action = "index" - meta id = "PAYMENT-%s".format(this.payload.id) - - output: - resource: elasticsearch - payments_reset.yaml: | - input: - event_bus: - topic: payments - consumer_group: search-payments-resets - pipeline: - processors: - - switch_event_type: - events: - - label: CONNECTOR_RESET + - label: DELETED_TRANSFER_INITIATION processors: - bloblang: | root = { @@ -351,12 +371,12 @@ data: "must": [ { "match": { - "kind": "PAYMENT" + "kind": "PAYMENT_TRANSFER_INITIATION" } }, { "match": { - "indexed.provider": this.payload.connector + "indexed.id": this.payload.id } }, { @@ -379,6 +399,7 @@ data: enabled: ${BASIC_AUTH_ENABLED} username: ${BASIC_AUTH_USERNAME} password: ${BASIC_AUTH_PASSWORD} + payments_ingestion.yaml: "input:\n event_bus:\n topic: payments\n consumer_group: search\n\npipeline:\n processors:\n - switch_event_type:\n events:\n - label: SAVED_PAYMENT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"scheme\": this.payload.scheme,\n \"type\": this.payload.type,\n \"status\": this.payload.status,\n \"id\": this.payload.id,\n \"initialAmount\": this.payload.initialAmount,\n \"createdAt\": this.payload.createdAt\n },\n \"kind\": \"PAYMENT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-%s\".format(this.payload.id)\n - label: SAVED_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"createdAt\": this.payload.createdAt,\n \"id\": this.payload.id,\n \"type\": this.payload.type,\n \"defaultAsset\": this.payload.defaultAsset,\n \"accountName\": this.payload.accountName\n },\n \"kind\": \"PAYMENT_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_BALANCE\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"createdAt\": this.payload.createdAt,\n \"accountId\": this.payload.accountID,\n \"asset\": this.payload.asset,\n \"balance\": this.payload.balance\n },\n \"kind\": \"PAYMENT_BALANCE\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BALANCE-%s-%s\".format(this.payload.accountId, this.payload.asset)\n - label: SAVED_BANK_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"provider\": this.payload.provider,\n \"name\": this.payload.name,\n \"accountNumber\": this.payload.accountNumber,\n \"iban\": this.payload.iban,\n \"swiftBicCode\": this.payload.swiftBicCode,\n \"country\": this.payload.country,\n \"accountId\": this.payload.accountID\n },\n \"kind\": \"PAYMENT_BANK_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BANK-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_TRANSFER_INITIATION\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"scheduledAt\": this.payload.scheduledAt,\n \"updatedAt\": this.payload.updatedAt,\n \"provider\": this.payload.provider,\n \"description\": this.payload.description,\n \"type\": this.payload.type,\n \"sourceAccountId\": this.payload.sourceAccountID,\n \"destinationAccountId\": this.payload.destinationAccountID,\n \"amount\": this.payload.amount,\n \"asset\": this.payload.asset,\n \"attempts\": this.payload.attempts,\n \"status\": this.payload.status,\n \"error\": this.payload.error,\n \"relatedPayment\": this.payload.relatedPayment\n },\n \"kind\": \"PAYMENT_TRANSFER_INITIATION\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-TRANSFER-INITIATION-%s\".format(this.payload.id)\n \n\noutput:\n resource: elasticsearch\n" kind: ConfigMap metadata: labels: diff --git a/components/operator/internal/controllers/stack/testdata/monopod-search-before-v0.7.0/results/configmaps--v1/search-benthos-streams.yaml b/components/operator/internal/controllers/stack/testdata/monopod-search-before-v0.7.0/results/configmaps--v1/search-benthos-streams.yaml index d520f11517..9a79972027 100644 --- a/components/operator/internal/controllers/stack/testdata/monopod-search-before-v0.7.0/results/configmaps--v1/search-benthos-streams.yaml +++ b/components/operator/internal/controllers/stack/testdata/monopod-search-before-v0.7.0/results/configmaps--v1/search-benthos-streams.yaml @@ -299,50 +299,70 @@ data: output: resource: elasticsearch - payments_ingestion.yaml: | + payments_deletion.yaml: | input: event_bus: topic: payments - consumer_group: search + consumer_group: search-payments-resets pipeline: processors: - switch_event_type: events: - - label: SAVED_PAYMENT + - label: CONNECTOR_RESET processors: - bloblang: | root = { - "data": this.payload, - "indexed": { - "provider": this.payload.provider, - "reference": this.payload.reference, - "scheme": this.payload.scheme, - "type": this.payload.type, - "status": this.payload.status, - "id": this.payload.id, - "initialAmount": this.payload.initialAmount, - "createdAt": this.payload.createdAt - }, - "kind": "PAYMENT", - "when": this.date + "query": { + "bool": { + "must": [ + { + "bool": { + "should": [ + { + "match": { + "kind": "PAYMENT" + }, + }, + { + "match": { + "kind": "PAYMENT_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_BALANCE" + }, + }, + { + "match": { + "kind": "PAYMENT_BANK_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_TRANSFER_INITIATION" + }, + } + ] + } + }, + { + "match": { + "indexed.provider": this.payload.connector + } + }, + { + "match": { + "stack": env("STACK") + } + } + ] + } + } } - meta action = "index" - meta id = "PAYMENT-%s".format(this.payload.id) - - output: - resource: elasticsearch - payments_reset.yaml: | - input: - event_bus: - topic: payments - consumer_group: search-payments-resets - pipeline: - processors: - - switch_event_type: - events: - - label: CONNECTOR_RESET + - label: DELETED_TRANSFER_INITIATION processors: - bloblang: | root = { @@ -351,12 +371,12 @@ data: "must": [ { "match": { - "kind": "PAYMENT" + "kind": "PAYMENT_TRANSFER_INITIATION" } }, { "match": { - "indexed.provider": this.payload.connector + "indexed.id": this.payload.id } }, { @@ -379,6 +399,7 @@ data: enabled: ${BASIC_AUTH_ENABLED} username: ${BASIC_AUTH_USERNAME} password: ${BASIC_AUTH_PASSWORD} + payments_ingestion.yaml: "input:\n event_bus:\n topic: payments\n consumer_group: search\n\npipeline:\n processors:\n - switch_event_type:\n events:\n - label: SAVED_PAYMENT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"scheme\": this.payload.scheme,\n \"type\": this.payload.type,\n \"status\": this.payload.status,\n \"id\": this.payload.id,\n \"initialAmount\": this.payload.initialAmount,\n \"createdAt\": this.payload.createdAt\n },\n \"kind\": \"PAYMENT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-%s\".format(this.payload.id)\n - label: SAVED_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"createdAt\": this.payload.createdAt,\n \"id\": this.payload.id,\n \"type\": this.payload.type,\n \"defaultAsset\": this.payload.defaultAsset,\n \"accountName\": this.payload.accountName\n },\n \"kind\": \"PAYMENT_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_BALANCE\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"createdAt\": this.payload.createdAt,\n \"accountId\": this.payload.accountID,\n \"asset\": this.payload.asset,\n \"balance\": this.payload.balance\n },\n \"kind\": \"PAYMENT_BALANCE\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BALANCE-%s-%s\".format(this.payload.accountId, this.payload.asset)\n - label: SAVED_BANK_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"provider\": this.payload.provider,\n \"name\": this.payload.name,\n \"accountNumber\": this.payload.accountNumber,\n \"iban\": this.payload.iban,\n \"swiftBicCode\": this.payload.swiftBicCode,\n \"country\": this.payload.country,\n \"accountId\": this.payload.accountID\n },\n \"kind\": \"PAYMENT_BANK_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BANK-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_TRANSFER_INITIATION\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"scheduledAt\": this.payload.scheduledAt,\n \"updatedAt\": this.payload.updatedAt,\n \"provider\": this.payload.provider,\n \"description\": this.payload.description,\n \"type\": this.payload.type,\n \"sourceAccountId\": this.payload.sourceAccountID,\n \"destinationAccountId\": this.payload.destinationAccountID,\n \"amount\": this.payload.amount,\n \"asset\": this.payload.asset,\n \"attempts\": this.payload.attempts,\n \"status\": this.payload.status,\n \"error\": this.payload.error,\n \"relatedPayment\": this.payload.relatedPayment\n },\n \"kind\": \"PAYMENT_TRANSFER_INITIATION\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-TRANSFER-INITIATION-%s\".format(this.payload.id)\n \n\noutput:\n resource: elasticsearch\n" kind: ConfigMap metadata: labels: diff --git a/components/operator/internal/controllers/stack/testdata/multipod-debug/results/configmaps--v1/search-benthos-streams.yaml b/components/operator/internal/controllers/stack/testdata/multipod-debug/results/configmaps--v1/search-benthos-streams.yaml index ff7287e15a..80d4033193 100644 --- a/components/operator/internal/controllers/stack/testdata/multipod-debug/results/configmaps--v1/search-benthos-streams.yaml +++ b/components/operator/internal/controllers/stack/testdata/multipod-debug/results/configmaps--v1/search-benthos-streams.yaml @@ -299,50 +299,70 @@ data: output: resource: elasticsearch - payments_ingestion.yaml: | + payments_deletion.yaml: | input: event_bus: topic: payments - consumer_group: search + consumer_group: search-payments-resets pipeline: processors: - switch_event_type: events: - - label: SAVED_PAYMENT + - label: CONNECTOR_RESET processors: - bloblang: | root = { - "data": this.payload, - "indexed": { - "provider": this.payload.provider, - "reference": this.payload.reference, - "scheme": this.payload.scheme, - "type": this.payload.type, - "status": this.payload.status, - "id": this.payload.id, - "initialAmount": this.payload.initialAmount, - "createdAt": this.payload.createdAt - }, - "kind": "PAYMENT", - "when": this.date + "query": { + "bool": { + "must": [ + { + "bool": { + "should": [ + { + "match": { + "kind": "PAYMENT" + }, + }, + { + "match": { + "kind": "PAYMENT_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_BALANCE" + }, + }, + { + "match": { + "kind": "PAYMENT_BANK_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_TRANSFER_INITIATION" + }, + } + ] + } + }, + { + "match": { + "indexed.provider": this.payload.connector + } + }, + { + "match": { + "stack": env("STACK") + } + } + ] + } + } } - meta action = "index" - meta id = "PAYMENT-%s".format(this.payload.id) - - output: - resource: elasticsearch - payments_reset.yaml: | - input: - event_bus: - topic: payments - consumer_group: search-payments-resets - pipeline: - processors: - - switch_event_type: - events: - - label: CONNECTOR_RESET + - label: DELETED_TRANSFER_INITIATION processors: - bloblang: | root = { @@ -351,12 +371,12 @@ data: "must": [ { "match": { - "kind": "PAYMENT" + "kind": "PAYMENT_TRANSFER_INITIATION" } }, { "match": { - "indexed.provider": this.payload.connector + "indexed.id": this.payload.id } }, { @@ -379,6 +399,7 @@ data: enabled: ${BASIC_AUTH_ENABLED} username: ${BASIC_AUTH_USERNAME} password: ${BASIC_AUTH_PASSWORD} + payments_ingestion.yaml: "input:\n event_bus:\n topic: payments\n consumer_group: search\n\npipeline:\n processors:\n - switch_event_type:\n events:\n - label: SAVED_PAYMENT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"scheme\": this.payload.scheme,\n \"type\": this.payload.type,\n \"status\": this.payload.status,\n \"id\": this.payload.id,\n \"initialAmount\": this.payload.initialAmount,\n \"createdAt\": this.payload.createdAt\n },\n \"kind\": \"PAYMENT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-%s\".format(this.payload.id)\n - label: SAVED_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"createdAt\": this.payload.createdAt,\n \"id\": this.payload.id,\n \"type\": this.payload.type,\n \"defaultAsset\": this.payload.defaultAsset,\n \"accountName\": this.payload.accountName\n },\n \"kind\": \"PAYMENT_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_BALANCE\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"createdAt\": this.payload.createdAt,\n \"accountId\": this.payload.accountID,\n \"asset\": this.payload.asset,\n \"balance\": this.payload.balance\n },\n \"kind\": \"PAYMENT_BALANCE\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BALANCE-%s-%s\".format(this.payload.accountId, this.payload.asset)\n - label: SAVED_BANK_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"provider\": this.payload.provider,\n \"name\": this.payload.name,\n \"accountNumber\": this.payload.accountNumber,\n \"iban\": this.payload.iban,\n \"swiftBicCode\": this.payload.swiftBicCode,\n \"country\": this.payload.country,\n \"accountId\": this.payload.accountID\n },\n \"kind\": \"PAYMENT_BANK_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BANK-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_TRANSFER_INITIATION\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"scheduledAt\": this.payload.scheduledAt,\n \"updatedAt\": this.payload.updatedAt,\n \"provider\": this.payload.provider,\n \"description\": this.payload.description,\n \"type\": this.payload.type,\n \"sourceAccountId\": this.payload.sourceAccountID,\n \"destinationAccountId\": this.payload.destinationAccountID,\n \"amount\": this.payload.amount,\n \"asset\": this.payload.asset,\n \"attempts\": this.payload.attempts,\n \"status\": this.payload.status,\n \"error\": this.payload.error,\n \"relatedPayment\": this.payload.relatedPayment\n },\n \"kind\": \"PAYMENT_TRANSFER_INITIATION\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-TRANSFER-INITIATION-%s\".format(this.payload.id)\n \n\noutput:\n resource: elasticsearch\n" kind: ConfigMap metadata: labels: diff --git a/components/operator/internal/controllers/stack/testdata/multipod-debug/results/deployments-apps-v1/search-benthos.yaml b/components/operator/internal/controllers/stack/testdata/multipod-debug/results/deployments-apps-v1/search-benthos.yaml index adf1f8d2aa..2f2f97379f 100644 --- a/components/operator/internal/controllers/stack/testdata/multipod-debug/results/deployments-apps-v1/search-benthos.yaml +++ b/components/operator/internal/controllers/stack/testdata/multipod-debug/results/deployments-apps-v1/search-benthos.yaml @@ -20,7 +20,7 @@ spec: template: metadata: annotations: - stack.formance.cloud/volumes-hash: -4rlu3uNH8ndoL_iQtjTJD4IuQ3UBKSeid_RC4H9F2o= + stack.formance.cloud/volumes-hash: khCrZMuTRCyx5vwUixT1Cew6gJOx23gM5YOT74ASPq4= creationTimestamp: null labels: app.kubernetes.io/name: search-benthos diff --git a/components/operator/internal/controllers/stack/testdata/multipod-disabled-one-service/results/configmaps--v1/search-benthos-streams.yaml b/components/operator/internal/controllers/stack/testdata/multipod-disabled-one-service/results/configmaps--v1/search-benthos-streams.yaml index 0e02483009..1bad994142 100644 --- a/components/operator/internal/controllers/stack/testdata/multipod-disabled-one-service/results/configmaps--v1/search-benthos-streams.yaml +++ b/components/operator/internal/controllers/stack/testdata/multipod-disabled-one-service/results/configmaps--v1/search-benthos-streams.yaml @@ -299,50 +299,70 @@ data: output: resource: elasticsearch - payments_ingestion.yaml: | + payments_deletion.yaml: | input: event_bus: topic: payments - consumer_group: search + consumer_group: search-payments-resets pipeline: processors: - switch_event_type: events: - - label: SAVED_PAYMENT + - label: CONNECTOR_RESET processors: - bloblang: | root = { - "data": this.payload, - "indexed": { - "provider": this.payload.provider, - "reference": this.payload.reference, - "scheme": this.payload.scheme, - "type": this.payload.type, - "status": this.payload.status, - "id": this.payload.id, - "initialAmount": this.payload.initialAmount, - "createdAt": this.payload.createdAt - }, - "kind": "PAYMENT", - "when": this.date + "query": { + "bool": { + "must": [ + { + "bool": { + "should": [ + { + "match": { + "kind": "PAYMENT" + }, + }, + { + "match": { + "kind": "PAYMENT_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_BALANCE" + }, + }, + { + "match": { + "kind": "PAYMENT_BANK_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_TRANSFER_INITIATION" + }, + } + ] + } + }, + { + "match": { + "indexed.provider": this.payload.connector + } + }, + { + "match": { + "stack": env("STACK") + } + } + ] + } + } } - meta action = "index" - meta id = "PAYMENT-%s".format(this.payload.id) - - output: - resource: elasticsearch - payments_reset.yaml: | - input: - event_bus: - topic: payments - consumer_group: search-payments-resets - pipeline: - processors: - - switch_event_type: - events: - - label: CONNECTOR_RESET + - label: DELETED_TRANSFER_INITIATION processors: - bloblang: | root = { @@ -351,12 +371,12 @@ data: "must": [ { "match": { - "kind": "PAYMENT" + "kind": "PAYMENT_TRANSFER_INITIATION" } }, { "match": { - "indexed.provider": this.payload.connector + "indexed.id": this.payload.id } }, { @@ -379,6 +399,7 @@ data: enabled: ${BASIC_AUTH_ENABLED} username: ${BASIC_AUTH_USERNAME} password: ${BASIC_AUTH_PASSWORD} + payments_ingestion.yaml: "input:\n event_bus:\n topic: payments\n consumer_group: search\n\npipeline:\n processors:\n - switch_event_type:\n events:\n - label: SAVED_PAYMENT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"scheme\": this.payload.scheme,\n \"type\": this.payload.type,\n \"status\": this.payload.status,\n \"id\": this.payload.id,\n \"initialAmount\": this.payload.initialAmount,\n \"createdAt\": this.payload.createdAt\n },\n \"kind\": \"PAYMENT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-%s\".format(this.payload.id)\n - label: SAVED_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"createdAt\": this.payload.createdAt,\n \"id\": this.payload.id,\n \"type\": this.payload.type,\n \"defaultAsset\": this.payload.defaultAsset,\n \"accountName\": this.payload.accountName\n },\n \"kind\": \"PAYMENT_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_BALANCE\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"createdAt\": this.payload.createdAt,\n \"accountId\": this.payload.accountID,\n \"asset\": this.payload.asset,\n \"balance\": this.payload.balance\n },\n \"kind\": \"PAYMENT_BALANCE\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BALANCE-%s-%s\".format(this.payload.accountId, this.payload.asset)\n - label: SAVED_BANK_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"provider\": this.payload.provider,\n \"name\": this.payload.name,\n \"accountNumber\": this.payload.accountNumber,\n \"iban\": this.payload.iban,\n \"swiftBicCode\": this.payload.swiftBicCode,\n \"country\": this.payload.country,\n \"accountId\": this.payload.accountID\n },\n \"kind\": \"PAYMENT_BANK_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BANK-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_TRANSFER_INITIATION\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"scheduledAt\": this.payload.scheduledAt,\n \"updatedAt\": this.payload.updatedAt,\n \"provider\": this.payload.provider,\n \"description\": this.payload.description,\n \"type\": this.payload.type,\n \"sourceAccountId\": this.payload.sourceAccountID,\n \"destinationAccountId\": this.payload.destinationAccountID,\n \"amount\": this.payload.amount,\n \"asset\": this.payload.asset,\n \"attempts\": this.payload.attempts,\n \"status\": this.payload.status,\n \"error\": this.payload.error,\n \"relatedPayment\": this.payload.relatedPayment\n },\n \"kind\": \"PAYMENT_TRANSFER_INITIATION\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-TRANSFER-INITIATION-%s\".format(this.payload.id)\n \n\noutput:\n resource: elasticsearch\n" kind: ConfigMap metadata: labels: diff --git a/components/operator/internal/controllers/stack/testdata/multipod-disabled-one-service/results/deployments-apps-v1/search-benthos.yaml b/components/operator/internal/controllers/stack/testdata/multipod-disabled-one-service/results/deployments-apps-v1/search-benthos.yaml index af27c0d86b..e90e0935d3 100644 --- a/components/operator/internal/controllers/stack/testdata/multipod-disabled-one-service/results/deployments-apps-v1/search-benthos.yaml +++ b/components/operator/internal/controllers/stack/testdata/multipod-disabled-one-service/results/deployments-apps-v1/search-benthos.yaml @@ -20,7 +20,7 @@ spec: template: metadata: annotations: - stack.formance.cloud/volumes-hash: -4rlu3uNH8ndoL_iQtjTJD4IuQ3UBKSeid_RC4H9F2o= + stack.formance.cloud/volumes-hash: khCrZMuTRCyx5vwUixT1Cew6gJOx23gM5YOT74ASPq4= creationTimestamp: null labels: app.kubernetes.io/name: search-benthos diff --git a/components/operator/internal/controllers/stack/testdata/multipod-latest-no-monitoring/results/configmaps--v1/search-benthos-streams.yaml b/components/operator/internal/controllers/stack/testdata/multipod-latest-no-monitoring/results/configmaps--v1/search-benthos-streams.yaml index ee64418778..38b88900c8 100644 --- a/components/operator/internal/controllers/stack/testdata/multipod-latest-no-monitoring/results/configmaps--v1/search-benthos-streams.yaml +++ b/components/operator/internal/controllers/stack/testdata/multipod-latest-no-monitoring/results/configmaps--v1/search-benthos-streams.yaml @@ -299,50 +299,70 @@ data: output: resource: elasticsearch - payments_ingestion.yaml: | + payments_deletion.yaml: | input: event_bus: topic: payments - consumer_group: search + consumer_group: search-payments-resets pipeline: processors: - switch_event_type: events: - - label: SAVED_PAYMENT + - label: CONNECTOR_RESET processors: - bloblang: | root = { - "data": this.payload, - "indexed": { - "provider": this.payload.provider, - "reference": this.payload.reference, - "scheme": this.payload.scheme, - "type": this.payload.type, - "status": this.payload.status, - "id": this.payload.id, - "initialAmount": this.payload.initialAmount, - "createdAt": this.payload.createdAt - }, - "kind": "PAYMENT", - "when": this.date + "query": { + "bool": { + "must": [ + { + "bool": { + "should": [ + { + "match": { + "kind": "PAYMENT" + }, + }, + { + "match": { + "kind": "PAYMENT_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_BALANCE" + }, + }, + { + "match": { + "kind": "PAYMENT_BANK_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_TRANSFER_INITIATION" + }, + } + ] + } + }, + { + "match": { + "indexed.provider": this.payload.connector + } + }, + { + "match": { + "stack": env("STACK") + } + } + ] + } + } } - meta action = "index" - meta id = "PAYMENT-%s".format(this.payload.id) - - output: - resource: elasticsearch - payments_reset.yaml: | - input: - event_bus: - topic: payments - consumer_group: search-payments-resets - pipeline: - processors: - - switch_event_type: - events: - - label: CONNECTOR_RESET + - label: DELETED_TRANSFER_INITIATION processors: - bloblang: | root = { @@ -351,12 +371,12 @@ data: "must": [ { "match": { - "kind": "PAYMENT" + "kind": "PAYMENT_TRANSFER_INITIATION" } }, { "match": { - "indexed.provider": this.payload.connector + "indexed.id": this.payload.id } }, { @@ -379,6 +399,7 @@ data: enabled: ${BASIC_AUTH_ENABLED} username: ${BASIC_AUTH_USERNAME} password: ${BASIC_AUTH_PASSWORD} + payments_ingestion.yaml: "input:\n event_bus:\n topic: payments\n consumer_group: search\n\npipeline:\n processors:\n - switch_event_type:\n events:\n - label: SAVED_PAYMENT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"scheme\": this.payload.scheme,\n \"type\": this.payload.type,\n \"status\": this.payload.status,\n \"id\": this.payload.id,\n \"initialAmount\": this.payload.initialAmount,\n \"createdAt\": this.payload.createdAt\n },\n \"kind\": \"PAYMENT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-%s\".format(this.payload.id)\n - label: SAVED_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"createdAt\": this.payload.createdAt,\n \"id\": this.payload.id,\n \"type\": this.payload.type,\n \"defaultAsset\": this.payload.defaultAsset,\n \"accountName\": this.payload.accountName\n },\n \"kind\": \"PAYMENT_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_BALANCE\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"createdAt\": this.payload.createdAt,\n \"accountId\": this.payload.accountID,\n \"asset\": this.payload.asset,\n \"balance\": this.payload.balance\n },\n \"kind\": \"PAYMENT_BALANCE\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BALANCE-%s-%s\".format(this.payload.accountId, this.payload.asset)\n - label: SAVED_BANK_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"provider\": this.payload.provider,\n \"name\": this.payload.name,\n \"accountNumber\": this.payload.accountNumber,\n \"iban\": this.payload.iban,\n \"swiftBicCode\": this.payload.swiftBicCode,\n \"country\": this.payload.country,\n \"accountId\": this.payload.accountID\n },\n \"kind\": \"PAYMENT_BANK_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BANK-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_TRANSFER_INITIATION\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"scheduledAt\": this.payload.scheduledAt,\n \"updatedAt\": this.payload.updatedAt,\n \"provider\": this.payload.provider,\n \"description\": this.payload.description,\n \"type\": this.payload.type,\n \"sourceAccountId\": this.payload.sourceAccountID,\n \"destinationAccountId\": this.payload.destinationAccountID,\n \"amount\": this.payload.amount,\n \"asset\": this.payload.asset,\n \"attempts\": this.payload.attempts,\n \"status\": this.payload.status,\n \"error\": this.payload.error,\n \"relatedPayment\": this.payload.relatedPayment\n },\n \"kind\": \"PAYMENT_TRANSFER_INITIATION\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-TRANSFER-INITIATION-%s\".format(this.payload.id)\n \n\noutput:\n resource: elasticsearch\n" kind: ConfigMap metadata: labels: diff --git a/components/operator/internal/controllers/stack/testdata/multipod-latest-no-monitoring/results/deployments-apps-v1/search-benthos.yaml b/components/operator/internal/controllers/stack/testdata/multipod-latest-no-monitoring/results/deployments-apps-v1/search-benthos.yaml index 31eac2d579..7bdf848f44 100644 --- a/components/operator/internal/controllers/stack/testdata/multipod-latest-no-monitoring/results/deployments-apps-v1/search-benthos.yaml +++ b/components/operator/internal/controllers/stack/testdata/multipod-latest-no-monitoring/results/deployments-apps-v1/search-benthos.yaml @@ -20,7 +20,7 @@ spec: template: metadata: annotations: - stack.formance.cloud/volumes-hash: sU_pRLwTOr6VemxX4J61hd_1Qh_qhyEtlebkPlpuMkA= + stack.formance.cloud/volumes-hash: MyjlVwqIs8tfbl4gluvJtiPorDVVchUfLwjJL9iGTGE= creationTimestamp: null labels: app.kubernetes.io/name: search-benthos diff --git a/components/operator/internal/controllers/stack/testdata/multipod-latest-with-secrets/results/configmaps--v1/search-benthos-streams.yaml b/components/operator/internal/controllers/stack/testdata/multipod-latest-with-secrets/results/configmaps--v1/search-benthos-streams.yaml index 313def9a4c..d5ab6f4f64 100644 --- a/components/operator/internal/controllers/stack/testdata/multipod-latest-with-secrets/results/configmaps--v1/search-benthos-streams.yaml +++ b/components/operator/internal/controllers/stack/testdata/multipod-latest-with-secrets/results/configmaps--v1/search-benthos-streams.yaml @@ -299,50 +299,70 @@ data: output: resource: elasticsearch - payments_ingestion.yaml: | + payments_deletion.yaml: | input: event_bus: topic: payments - consumer_group: search + consumer_group: search-payments-resets pipeline: processors: - switch_event_type: events: - - label: SAVED_PAYMENT + - label: CONNECTOR_RESET processors: - bloblang: | root = { - "data": this.payload, - "indexed": { - "provider": this.payload.provider, - "reference": this.payload.reference, - "scheme": this.payload.scheme, - "type": this.payload.type, - "status": this.payload.status, - "id": this.payload.id, - "initialAmount": this.payload.initialAmount, - "createdAt": this.payload.createdAt - }, - "kind": "PAYMENT", - "when": this.date + "query": { + "bool": { + "must": [ + { + "bool": { + "should": [ + { + "match": { + "kind": "PAYMENT" + }, + }, + { + "match": { + "kind": "PAYMENT_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_BALANCE" + }, + }, + { + "match": { + "kind": "PAYMENT_BANK_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_TRANSFER_INITIATION" + }, + } + ] + } + }, + { + "match": { + "indexed.provider": this.payload.connector + } + }, + { + "match": { + "stack": env("STACK") + } + } + ] + } + } } - meta action = "index" - meta id = "PAYMENT-%s".format(this.payload.id) - - output: - resource: elasticsearch - payments_reset.yaml: | - input: - event_bus: - topic: payments - consumer_group: search-payments-resets - pipeline: - processors: - - switch_event_type: - events: - - label: CONNECTOR_RESET + - label: DELETED_TRANSFER_INITIATION processors: - bloblang: | root = { @@ -351,12 +371,12 @@ data: "must": [ { "match": { - "kind": "PAYMENT" + "kind": "PAYMENT_TRANSFER_INITIATION" } }, { "match": { - "indexed.provider": this.payload.connector + "indexed.id": this.payload.id } }, { @@ -379,6 +399,7 @@ data: enabled: ${BASIC_AUTH_ENABLED} username: ${BASIC_AUTH_USERNAME} password: ${BASIC_AUTH_PASSWORD} + payments_ingestion.yaml: "input:\n event_bus:\n topic: payments\n consumer_group: search\n\npipeline:\n processors:\n - switch_event_type:\n events:\n - label: SAVED_PAYMENT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"scheme\": this.payload.scheme,\n \"type\": this.payload.type,\n \"status\": this.payload.status,\n \"id\": this.payload.id,\n \"initialAmount\": this.payload.initialAmount,\n \"createdAt\": this.payload.createdAt\n },\n \"kind\": \"PAYMENT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-%s\".format(this.payload.id)\n - label: SAVED_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"createdAt\": this.payload.createdAt,\n \"id\": this.payload.id,\n \"type\": this.payload.type,\n \"defaultAsset\": this.payload.defaultAsset,\n \"accountName\": this.payload.accountName\n },\n \"kind\": \"PAYMENT_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_BALANCE\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"createdAt\": this.payload.createdAt,\n \"accountId\": this.payload.accountID,\n \"asset\": this.payload.asset,\n \"balance\": this.payload.balance\n },\n \"kind\": \"PAYMENT_BALANCE\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BALANCE-%s-%s\".format(this.payload.accountId, this.payload.asset)\n - label: SAVED_BANK_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"provider\": this.payload.provider,\n \"name\": this.payload.name,\n \"accountNumber\": this.payload.accountNumber,\n \"iban\": this.payload.iban,\n \"swiftBicCode\": this.payload.swiftBicCode,\n \"country\": this.payload.country,\n \"accountId\": this.payload.accountID\n },\n \"kind\": \"PAYMENT_BANK_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BANK-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_TRANSFER_INITIATION\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"scheduledAt\": this.payload.scheduledAt,\n \"updatedAt\": this.payload.updatedAt,\n \"provider\": this.payload.provider,\n \"description\": this.payload.description,\n \"type\": this.payload.type,\n \"sourceAccountId\": this.payload.sourceAccountID,\n \"destinationAccountId\": this.payload.destinationAccountID,\n \"amount\": this.payload.amount,\n \"asset\": this.payload.asset,\n \"attempts\": this.payload.attempts,\n \"status\": this.payload.status,\n \"error\": this.payload.error,\n \"relatedPayment\": this.payload.relatedPayment\n },\n \"kind\": \"PAYMENT_TRANSFER_INITIATION\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-TRANSFER-INITIATION-%s\".format(this.payload.id)\n \n\noutput:\n resource: elasticsearch\n" kind: ConfigMap metadata: labels: diff --git a/components/operator/internal/controllers/stack/testdata/multipod-latest-with-secrets/results/deployments-apps-v1/search-benthos.yaml b/components/operator/internal/controllers/stack/testdata/multipod-latest-with-secrets/results/deployments-apps-v1/search-benthos.yaml index 75117ce075..393603397c 100644 --- a/components/operator/internal/controllers/stack/testdata/multipod-latest-with-secrets/results/deployments-apps-v1/search-benthos.yaml +++ b/components/operator/internal/controllers/stack/testdata/multipod-latest-with-secrets/results/deployments-apps-v1/search-benthos.yaml @@ -20,7 +20,7 @@ spec: template: metadata: annotations: - stack.formance.cloud/volumes-hash: -4rlu3uNH8ndoL_iQtjTJD4IuQ3UBKSeid_RC4H9F2o= + stack.formance.cloud/volumes-hash: khCrZMuTRCyx5vwUixT1Cew6gJOx23gM5YOT74ASPq4= creationTimestamp: null labels: app.kubernetes.io/name: search-benthos diff --git a/components/operator/internal/controllers/stack/testdata/multipod-latest/results/configmaps--v1/search-benthos-streams.yaml b/components/operator/internal/controllers/stack/testdata/multipod-latest/results/configmaps--v1/search-benthos-streams.yaml index 93c6297ff6..ada80a80b2 100644 --- a/components/operator/internal/controllers/stack/testdata/multipod-latest/results/configmaps--v1/search-benthos-streams.yaml +++ b/components/operator/internal/controllers/stack/testdata/multipod-latest/results/configmaps--v1/search-benthos-streams.yaml @@ -299,50 +299,70 @@ data: output: resource: elasticsearch - payments_ingestion.yaml: | + payments_deletion.yaml: | input: event_bus: topic: payments - consumer_group: search + consumer_group: search-payments-resets pipeline: processors: - switch_event_type: events: - - label: SAVED_PAYMENT + - label: CONNECTOR_RESET processors: - bloblang: | root = { - "data": this.payload, - "indexed": { - "provider": this.payload.provider, - "reference": this.payload.reference, - "scheme": this.payload.scheme, - "type": this.payload.type, - "status": this.payload.status, - "id": this.payload.id, - "initialAmount": this.payload.initialAmount, - "createdAt": this.payload.createdAt - }, - "kind": "PAYMENT", - "when": this.date + "query": { + "bool": { + "must": [ + { + "bool": { + "should": [ + { + "match": { + "kind": "PAYMENT" + }, + }, + { + "match": { + "kind": "PAYMENT_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_BALANCE" + }, + }, + { + "match": { + "kind": "PAYMENT_BANK_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_TRANSFER_INITIATION" + }, + } + ] + } + }, + { + "match": { + "indexed.provider": this.payload.connector + } + }, + { + "match": { + "stack": env("STACK") + } + } + ] + } + } } - meta action = "index" - meta id = "PAYMENT-%s".format(this.payload.id) - - output: - resource: elasticsearch - payments_reset.yaml: | - input: - event_bus: - topic: payments - consumer_group: search-payments-resets - pipeline: - processors: - - switch_event_type: - events: - - label: CONNECTOR_RESET + - label: DELETED_TRANSFER_INITIATION processors: - bloblang: | root = { @@ -351,12 +371,12 @@ data: "must": [ { "match": { - "kind": "PAYMENT" + "kind": "PAYMENT_TRANSFER_INITIATION" } }, { "match": { - "indexed.provider": this.payload.connector + "indexed.id": this.payload.id } }, { @@ -379,6 +399,7 @@ data: enabled: ${BASIC_AUTH_ENABLED} username: ${BASIC_AUTH_USERNAME} password: ${BASIC_AUTH_PASSWORD} + payments_ingestion.yaml: "input:\n event_bus:\n topic: payments\n consumer_group: search\n\npipeline:\n processors:\n - switch_event_type:\n events:\n - label: SAVED_PAYMENT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"scheme\": this.payload.scheme,\n \"type\": this.payload.type,\n \"status\": this.payload.status,\n \"id\": this.payload.id,\n \"initialAmount\": this.payload.initialAmount,\n \"createdAt\": this.payload.createdAt\n },\n \"kind\": \"PAYMENT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-%s\".format(this.payload.id)\n - label: SAVED_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"createdAt\": this.payload.createdAt,\n \"id\": this.payload.id,\n \"type\": this.payload.type,\n \"defaultAsset\": this.payload.defaultAsset,\n \"accountName\": this.payload.accountName\n },\n \"kind\": \"PAYMENT_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_BALANCE\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"createdAt\": this.payload.createdAt,\n \"accountId\": this.payload.accountID,\n \"asset\": this.payload.asset,\n \"balance\": this.payload.balance\n },\n \"kind\": \"PAYMENT_BALANCE\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BALANCE-%s-%s\".format(this.payload.accountId, this.payload.asset)\n - label: SAVED_BANK_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"provider\": this.payload.provider,\n \"name\": this.payload.name,\n \"accountNumber\": this.payload.accountNumber,\n \"iban\": this.payload.iban,\n \"swiftBicCode\": this.payload.swiftBicCode,\n \"country\": this.payload.country,\n \"accountId\": this.payload.accountID\n },\n \"kind\": \"PAYMENT_BANK_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BANK-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_TRANSFER_INITIATION\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"scheduledAt\": this.payload.scheduledAt,\n \"updatedAt\": this.payload.updatedAt,\n \"provider\": this.payload.provider,\n \"description\": this.payload.description,\n \"type\": this.payload.type,\n \"sourceAccountId\": this.payload.sourceAccountID,\n \"destinationAccountId\": this.payload.destinationAccountID,\n \"amount\": this.payload.amount,\n \"asset\": this.payload.asset,\n \"attempts\": this.payload.attempts,\n \"status\": this.payload.status,\n \"error\": this.payload.error,\n \"relatedPayment\": this.payload.relatedPayment\n },\n \"kind\": \"PAYMENT_TRANSFER_INITIATION\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-TRANSFER-INITIATION-%s\".format(this.payload.id)\n \n\noutput:\n resource: elasticsearch\n" kind: ConfigMap metadata: labels: diff --git a/components/operator/internal/controllers/stack/testdata/multipod-latest/results/deployments-apps-v1/search-benthos.yaml b/components/operator/internal/controllers/stack/testdata/multipod-latest/results/deployments-apps-v1/search-benthos.yaml index a3f08c2f2b..1a09903cfa 100644 --- a/components/operator/internal/controllers/stack/testdata/multipod-latest/results/deployments-apps-v1/search-benthos.yaml +++ b/components/operator/internal/controllers/stack/testdata/multipod-latest/results/deployments-apps-v1/search-benthos.yaml @@ -20,7 +20,7 @@ spec: template: metadata: annotations: - stack.formance.cloud/volumes-hash: -4rlu3uNH8ndoL_iQtjTJD4IuQ3UBKSeid_RC4H9F2o= + stack.formance.cloud/volumes-hash: khCrZMuTRCyx5vwUixT1Cew6gJOx23gM5YOT74ASPq4= creationTimestamp: null labels: app.kubernetes.io/name: search-benthos diff --git a/components/operator/internal/controllers/stack/testdata/multipod-service-annotation/results/configmaps--v1/search-benthos-streams.yaml b/components/operator/internal/controllers/stack/testdata/multipod-service-annotation/results/configmaps--v1/search-benthos-streams.yaml index c7797518a4..0bbccc6d07 100644 --- a/components/operator/internal/controllers/stack/testdata/multipod-service-annotation/results/configmaps--v1/search-benthos-streams.yaml +++ b/components/operator/internal/controllers/stack/testdata/multipod-service-annotation/results/configmaps--v1/search-benthos-streams.yaml @@ -299,50 +299,70 @@ data: output: resource: elasticsearch - payments_ingestion.yaml: | + payments_deletion.yaml: | input: event_bus: topic: payments - consumer_group: search + consumer_group: search-payments-resets pipeline: processors: - switch_event_type: events: - - label: SAVED_PAYMENT + - label: CONNECTOR_RESET processors: - bloblang: | root = { - "data": this.payload, - "indexed": { - "provider": this.payload.provider, - "reference": this.payload.reference, - "scheme": this.payload.scheme, - "type": this.payload.type, - "status": this.payload.status, - "id": this.payload.id, - "initialAmount": this.payload.initialAmount, - "createdAt": this.payload.createdAt - }, - "kind": "PAYMENT", - "when": this.date + "query": { + "bool": { + "must": [ + { + "bool": { + "should": [ + { + "match": { + "kind": "PAYMENT" + }, + }, + { + "match": { + "kind": "PAYMENT_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_BALANCE" + }, + }, + { + "match": { + "kind": "PAYMENT_BANK_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_TRANSFER_INITIATION" + }, + } + ] + } + }, + { + "match": { + "indexed.provider": this.payload.connector + } + }, + { + "match": { + "stack": env("STACK") + } + } + ] + } + } } - meta action = "index" - meta id = "PAYMENT-%s".format(this.payload.id) - - output: - resource: elasticsearch - payments_reset.yaml: | - input: - event_bus: - topic: payments - consumer_group: search-payments-resets - pipeline: - processors: - - switch_event_type: - events: - - label: CONNECTOR_RESET + - label: DELETED_TRANSFER_INITIATION processors: - bloblang: | root = { @@ -351,12 +371,12 @@ data: "must": [ { "match": { - "kind": "PAYMENT" + "kind": "PAYMENT_TRANSFER_INITIATION" } }, { "match": { - "indexed.provider": this.payload.connector + "indexed.id": this.payload.id } }, { @@ -379,6 +399,7 @@ data: enabled: ${BASIC_AUTH_ENABLED} username: ${BASIC_AUTH_USERNAME} password: ${BASIC_AUTH_PASSWORD} + payments_ingestion.yaml: "input:\n event_bus:\n topic: payments\n consumer_group: search\n\npipeline:\n processors:\n - switch_event_type:\n events:\n - label: SAVED_PAYMENT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"scheme\": this.payload.scheme,\n \"type\": this.payload.type,\n \"status\": this.payload.status,\n \"id\": this.payload.id,\n \"initialAmount\": this.payload.initialAmount,\n \"createdAt\": this.payload.createdAt\n },\n \"kind\": \"PAYMENT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-%s\".format(this.payload.id)\n - label: SAVED_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"createdAt\": this.payload.createdAt,\n \"id\": this.payload.id,\n \"type\": this.payload.type,\n \"defaultAsset\": this.payload.defaultAsset,\n \"accountName\": this.payload.accountName\n },\n \"kind\": \"PAYMENT_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_BALANCE\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"createdAt\": this.payload.createdAt,\n \"accountId\": this.payload.accountID,\n \"asset\": this.payload.asset,\n \"balance\": this.payload.balance\n },\n \"kind\": \"PAYMENT_BALANCE\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BALANCE-%s-%s\".format(this.payload.accountId, this.payload.asset)\n - label: SAVED_BANK_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"provider\": this.payload.provider,\n \"name\": this.payload.name,\n \"accountNumber\": this.payload.accountNumber,\n \"iban\": this.payload.iban,\n \"swiftBicCode\": this.payload.swiftBicCode,\n \"country\": this.payload.country,\n \"accountId\": this.payload.accountID\n },\n \"kind\": \"PAYMENT_BANK_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BANK-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_TRANSFER_INITIATION\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"scheduledAt\": this.payload.scheduledAt,\n \"updatedAt\": this.payload.updatedAt,\n \"provider\": this.payload.provider,\n \"description\": this.payload.description,\n \"type\": this.payload.type,\n \"sourceAccountId\": this.payload.sourceAccountID,\n \"destinationAccountId\": this.payload.destinationAccountID,\n \"amount\": this.payload.amount,\n \"asset\": this.payload.asset,\n \"attempts\": this.payload.attempts,\n \"status\": this.payload.status,\n \"error\": this.payload.error,\n \"relatedPayment\": this.payload.relatedPayment\n },\n \"kind\": \"PAYMENT_TRANSFER_INITIATION\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-TRANSFER-INITIATION-%s\".format(this.payload.id)\n \n\noutput:\n resource: elasticsearch\n" kind: ConfigMap metadata: labels: diff --git a/components/operator/internal/controllers/stack/testdata/multipod-service-annotation/results/deployments-apps-v1/search-benthos.yaml b/components/operator/internal/controllers/stack/testdata/multipod-service-annotation/results/deployments-apps-v1/search-benthos.yaml index edd34852d4..0bdf4971fd 100644 --- a/components/operator/internal/controllers/stack/testdata/multipod-service-annotation/results/deployments-apps-v1/search-benthos.yaml +++ b/components/operator/internal/controllers/stack/testdata/multipod-service-annotation/results/deployments-apps-v1/search-benthos.yaml @@ -20,7 +20,7 @@ spec: template: metadata: annotations: - stack.formance.cloud/volumes-hash: sU_pRLwTOr6VemxX4J61hd_1Qh_qhyEtlebkPlpuMkA= + stack.formance.cloud/volumes-hash: MyjlVwqIs8tfbl4gluvJtiPorDVVchUfLwjJL9iGTGE= creationTimestamp: null labels: app.kubernetes.io/name: search-benthos diff --git a/components/payments/cmd/connectors/internal/api/bank_account.go b/components/payments/cmd/connectors/internal/api/bank_account.go index fd9914dbae..b7749e6d41 100644 --- a/components/payments/cmd/connectors/internal/api/bank_account.go +++ b/components/payments/cmd/connectors/internal/api/bank_account.go @@ -6,8 +6,12 @@ import ( "net/http" "time" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/formancehq/payments/cmd/connectors/internal/messages" "github.com/formancehq/payments/internal/models" + "github.com/formancehq/payments/pkg/events" "github.com/formancehq/stack/libs/go-libs/api" + "github.com/formancehq/stack/libs/go-libs/publish" "github.com/google/uuid" "github.com/pkg/errors" ) @@ -39,7 +43,10 @@ type createBankAccountRequest struct { Name string `json:"name"` } -func createBankAccountHandler(repo createBankAccountRepository) http.HandlerFunc { +func createBankAccountHandler( + repo createBankAccountRepository, + publisher message.Publisher, +) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") @@ -128,6 +135,19 @@ func createBankAccountHandler(repo createBankAccountRepository) http.HandlerFunc handleStorageErrors(w, r, err) return } + + bankAccount.AccountID = &accountID + } + + if err := publisher.Publish( + events.TopicPayments, + publish.NewMessage( + r.Context(), + messages.NewEventSavedBankAccounts(bankAccount), + ), + ); err != nil { + api.InternalServerError(w, r, err) + return } data := &bankAccountResponse{ diff --git a/components/payments/cmd/connectors/internal/api/module.go b/components/payments/cmd/connectors/internal/api/module.go index 0b8aad7df7..855a6667f8 100644 --- a/components/payments/cmd/connectors/internal/api/module.go +++ b/components/payments/cmd/connectors/internal/api/module.go @@ -43,7 +43,7 @@ func HTTPModule(serviceInfo api.ServiceInfo, bind string) fx.Option { lc.Append(httpserver.NewHook(bind, m)) }), fx.Supply(serviceInfo), - fx.Provide(fx.Annotate(httpRouter, fx.ParamTags(``, ``, ``, `group:"connectorHandlers"`))), + fx.Provide(fx.Annotate(httpRouter, fx.ParamTags(``, ``, ``, ``, `group:"connectorHandlers"`))), addConnector[dummypay.Config](dummypay.NewLoader()), addConnector[modulr.Config](modulr.NewLoader()), addConnector[stripe.Config](stripe.NewLoader()), diff --git a/components/payments/cmd/connectors/internal/api/router.go b/components/payments/cmd/connectors/internal/api/router.go index eeca0ba688..5ca62da6ba 100644 --- a/components/payments/cmd/connectors/internal/api/router.go +++ b/components/payments/cmd/connectors/internal/api/router.go @@ -3,6 +3,7 @@ package api import ( "net/http" + "github.com/ThreeDotsLabs/watermill/message" "github.com/formancehq/payments/cmd/connectors/internal/integration" "github.com/formancehq/payments/cmd/connectors/internal/storage" "github.com/formancehq/payments/internal/models" @@ -17,6 +18,7 @@ func httpRouter( logger logging.Logger, store *storage.Storage, serviceInfo api.ServiceInfo, + publisher message.Publisher, connectorHandlers []connectorHandler, ) (*mux.Router, error) { rootMux := mux.NewRouter() @@ -46,16 +48,16 @@ func httpRouter( authGroup := subRouter.Name("authenticated").Subrouter() - authGroup.Path("/bank-accounts").Methods(http.MethodPost).Handler(createBankAccountHandler(store)) + authGroup.Path("/bank-accounts").Methods(http.MethodPost).Handler(createBankAccountHandler(store, publisher)) paymentsHandlers := make(map[models.ConnectorProvider]paymentHandler) for _, h := range connectorHandlers { paymentsHandlers[h.Provider] = h.initiatePayment } - authGroup.Path("/transfer-initiations").Methods(http.MethodPost).Handler(createTransferInitiationHandler(store, paymentsHandlers)) - authGroup.Path("/transfer-initiations/{transferID}/status").Methods(http.MethodPost).Handler(updateTransferInitiationStatusHandler(store, paymentsHandlers)) - authGroup.Path("/transfer-initiations/{transferID}/retry").Methods(http.MethodPost).Handler(retryTransferInitiationHandler(store, paymentsHandlers)) - authGroup.Path("/transfer-initiations/{transferID}").Methods(http.MethodDelete).Handler(deleteTransferInitiationHandler(store)) + authGroup.Path("/transfer-initiations").Methods(http.MethodPost).Handler(createTransferInitiationHandler(store, publisher, paymentsHandlers)) + authGroup.Path("/transfer-initiations/{transferID}/status").Methods(http.MethodPost).Handler(updateTransferInitiationStatusHandler(store, publisher, paymentsHandlers)) + authGroup.Path("/transfer-initiations/{transferID}/retry").Methods(http.MethodPost).Handler(retryTransferInitiationHandler(store, publisher, paymentsHandlers)) + authGroup.Path("/transfer-initiations/{transferID}").Methods(http.MethodDelete).Handler(deleteTransferInitiationHandler(store, publisher)) authGroup.HandleFunc("/connectors", readConnectorsHandler(store)) diff --git a/components/payments/cmd/connectors/internal/api/transfer_initiation.go b/components/payments/cmd/connectors/internal/api/transfer_initiation.go index 95313d4f7d..c185271cd9 100644 --- a/components/payments/cmd/connectors/internal/api/transfer_initiation.go +++ b/components/payments/cmd/connectors/internal/api/transfer_initiation.go @@ -8,8 +8,12 @@ import ( "net/http" "time" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/formancehq/payments/cmd/connectors/internal/messages" "github.com/formancehq/payments/internal/models" + "github.com/formancehq/payments/pkg/events" "github.com/formancehq/stack/libs/go-libs/api" + "github.com/formancehq/stack/libs/go-libs/publish" "github.com/gorilla/mux" "github.com/pkg/errors" ) @@ -95,6 +99,7 @@ type createTransferInitiationRepository interface { func createTransferInitiationHandler( repo createTransferInitiationRepository, + publisher message.Publisher, paymentHandlers map[models.ConnectorProvider]paymentHandler, ) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { @@ -165,7 +170,17 @@ func createTransferInitiationHandler( if err := repo.CreateTransferInitiation(r.Context(), tf); err != nil { handleStorageErrors(w, r, err) + return + } + if err := publisher.Publish( + events.TopicPayments, + publish.NewMessage( + r.Context(), + messages.NewEventSavedTransferInitiations(tf), + ), + ); err != nil { + api.InternalServerError(w, r, err) return } @@ -221,6 +236,7 @@ type updateTransferInitiationStatusRequest struct { func updateTransferInitiationStatusHandler( repo udateTransferInitiationStatusRepository, + publisher message.Publisher, paymentHandlers map[models.ConnectorProvider]paymentHandler, ) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { @@ -273,6 +289,17 @@ func updateTransferInitiationStatusHandler( return } + if err := publisher.Publish( + events.TopicPayments, + publish.NewMessage( + r.Context(), + messages.NewEventSavedTransferInitiations(previousTransferInitiation), + ), + ); err != nil { + api.InternalServerError(w, r, err) + return + } + if status == models.TransferInitiationStatusValidated { f, ok := paymentHandlers[previousTransferInitiation.Provider] if !ok { @@ -298,6 +325,7 @@ type retryTransferInitiationRepository interface { func retryTransferInitiationHandler( repo retryTransferInitiationRepository, + publisher message.Publisher, paymentHandlers map[models.ConnectorProvider]paymentHandler, ) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { @@ -326,6 +354,17 @@ func retryTransferInitiationHandler( return } + if err := publisher.Publish( + events.TopicPayments, + publish.NewMessage( + r.Context(), + messages.NewEventSavedTransferInitiations(previousTransferInitiation), + ), + ); err != nil { + api.InternalServerError(w, r, err) + return + } + f, ok := paymentHandlers[previousTransferInitiation.Provider] if !ok { api.InternalServerError(w, r, errors.New("no payment handler for provider "+previousTransferInitiation.Provider.String())) @@ -347,7 +386,10 @@ type deleteTransferInitiationRepository interface { DeleteTransferInitiation(ctx context.Context, id models.TransferInitiationID) error } -func deleteTransferInitiationHandler(repo deleteTransferInitiationRepository) http.HandlerFunc { +func deleteTransferInitiationHandler( + repo deleteTransferInitiationRepository, + publisher message.Publisher, +) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { transferID, err := models.TransferInitiationIDFromString(mux.Vars(r)["transferID"]) if err != nil { @@ -373,6 +415,17 @@ func deleteTransferInitiationHandler(repo deleteTransferInitiationRepository) ht return } + if err := publisher.Publish( + events.TopicPayments, + publish.NewMessage( + r.Context(), + messages.NewEventDeleteTransferInitiation(tf.ID), + ), + ); err != nil { + api.InternalServerError(w, r, err) + return + } + w.WriteHeader(http.StatusNoContent) } } diff --git a/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_payments.go b/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_payments.go index db2a882a9c..d396eb6bab 100644 --- a/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/bankingcircle/task_payments.go @@ -35,28 +35,25 @@ func taskInitiatePayment(logger logging.Logger, bankingCircleClient *client.Clie logger.Info("initiate payment for transfer-initiation %s", transferID) transferInitiationID := models.MustTransferInitiationIDFromString(transferID) + transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true) + if err != nil { + return err + } attrs := metric.WithAttributes(connectorAttrs...) - var err error var paymentID *models.PaymentID defer func() { if err != nil { ctx, cancel := contextutil.Detached(ctx) defer cancel() metricsRegistry.ConnectorObjectsErrors().Add(ctx, 1, attrs) - if err := ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusFailed, err.Error(), 0, time.Now()); err != nil { + if err := ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusFailed, err.Error(), transfer.Attempts, time.Now()); err != nil { logger.Error("failed to update transfer initiation status: %v", err) } } }() - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusProcessing, "", 0, time.Now()) - if err != nil { - return err - } - - var transfer *models.TransferInitiation - transfer, err = getTransfer(ctx, storageReader, transferInitiationID, true) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusProcessing, "", transfer.Attempts, time.Now()) if err != nil { return err } @@ -193,7 +190,7 @@ func taskInitiatePayment(logger logging.Logger, bankingCircleClient *client.Clie }, Provider: models.ConnectorProviderBankingCircle, } - err = ingester.AddTransferInitiationPaymentID(ctx, transferInitiationID, paymentID, time.Now()) + err = ingester.AddTransferInitiationPaymentID(ctx, transfer, paymentID, time.Now()) if err != nil { return err } @@ -306,7 +303,7 @@ func taskUpdatePaymentStatus( return err } case "Processed": - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusProcessed, "", 0, time.Now()) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusProcessed, "", transfer.Attempts, time.Now()) if err != nil { return err } @@ -315,7 +312,7 @@ func taskUpdatePaymentStatus( case "Unknown", "ScaExpired", "ScaFailed", "MissingFunding", "PendingCancellation", "PendingCancellationApproval", "DeclinedByApprover", "Rejected", "Cancelled", "Reversed", "ScaDeclined": - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusFailed, "", 0, time.Now()) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusFailed, "", transfer.Attempts, time.Now()) if err != nil { return err } diff --git a/components/payments/cmd/connectors/internal/connectors/currencycloud/task_payments.go b/components/payments/cmd/connectors/internal/connectors/currencycloud/task_payments.go index 33465408f7..db22fe25b5 100644 --- a/components/payments/cmd/connectors/internal/connectors/currencycloud/task_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/currencycloud/task_payments.go @@ -36,28 +36,25 @@ func taskInitiatePayment(logger logging.Logger, currencyCloudClient *client.Clie logger.Info("initiate payment for transfer-initiation %s", transferID) transferInitiationID := models.MustTransferInitiationIDFromString(transferID) + transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true) + if err != nil { + return err + } attrs := metric.WithAttributes(connectorAttrs...) - var err error var paymentID *models.PaymentID defer func() { if err != nil { ctx, cancel := contextutil.Detached(ctx) defer cancel() metricsRegistry.ConnectorObjectsErrors().Add(ctx, 1, attrs) - if err := ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusFailed, err.Error(), 0, time.Now()); err != nil { + if err := ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusFailed, err.Error(), transfer.Attempts, time.Now()); err != nil { logger.Error("failed to update transfer initiation status: %v", err) } } }() - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusProcessing, "", 0, time.Now()) - if err != nil { - return err - } - - var transfer *models.TransferInitiation - transfer, err = getTransfer(ctx, storageReader, transferInitiationID, true) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusProcessing, "", transfer.Attempts, time.Now()) if err != nil { return err } @@ -151,7 +148,7 @@ func taskInitiatePayment(logger logging.Logger, currencyCloudClient *client.Clie }, Provider: models.ConnectorProviderCurrencyCloud, } - err = ingester.AddTransferInitiationPaymentID(ctx, transferInitiationID, paymentID, time.Now()) + err = ingester.AddTransferInitiationPaymentID(ctx, transfer, paymentID, time.Now()) if err != nil { return err } @@ -267,14 +264,14 @@ func taskUpdatePaymentStatus( return err } case "completed": - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusProcessed, "", 0, time.Now()) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusProcessed, "", transfer.Attempts, time.Now()) if err != nil { return err } return nil case "cancelled": - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusFailed, resultMessage, 0, time.Now()) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusFailed, resultMessage, transfer.Attempts, time.Now()) if err != nil { return err } diff --git a/components/payments/cmd/connectors/internal/connectors/mangopay/task_payments.go b/components/payments/cmd/connectors/internal/connectors/mangopay/task_payments.go index aa6758f914..70b0493be0 100644 --- a/components/payments/cmd/connectors/internal/connectors/mangopay/task_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/mangopay/task_payments.go @@ -37,28 +37,25 @@ func taskInitiatePayment(logger logging.Logger, mangopayClient *client.Client, t logger.Info("initiate payment for transfer-initiation %s", transferID) transferInitiationID := models.MustTransferInitiationIDFromString(transferID) + transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true) + if err != nil { + return err + } attrs := metric.WithAttributes(connectorAttrs...) - var err error var paymentID *models.PaymentID defer func() { if err != nil { ctx, cancel := contextutil.Detached(ctx) defer cancel() metricsRegistry.ConnectorObjectsErrors().Add(ctx, 1, attrs) - if err := ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusFailed, err.Error(), 0, time.Now()); err != nil { + if err := ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusFailed, err.Error(), transfer.Attempts, time.Now()); err != nil { logger.Error("failed to update transfer initiation status: %v", err) } } }() - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusProcessing, "", 0, time.Now()) - if err != nil { - return err - } - - var transfer *models.TransferInitiation - transfer, err = getTransfer(ctx, storageReader, transferInitiationID, true) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusProcessing, "", transfer.Attempts, time.Now()) if err != nil { return err } @@ -162,7 +159,7 @@ func taskInitiatePayment(logger logging.Logger, mangopayClient *client.Client, t }, Provider: models.ConnectorProviderMangopay, } - err = ingester.AddTransferInitiationPaymentID(ctx, transferInitiationID, paymentID, time.Now()) + err = ingester.AddTransferInitiationPaymentID(ctx, transfer, paymentID, time.Now()) if err != nil { return err } @@ -278,14 +275,14 @@ func taskUpdatePaymentStatus( return err } case "SUCCEEDED": - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusProcessed, "", 0, time.Now()) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusProcessed, "", transfer.Attempts, time.Now()) if err != nil { return err } return nil case "FAILED": - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusFailed, resultMessage, 0, time.Now()) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusFailed, resultMessage, transfer.Attempts, time.Now()) if err != nil { return err } diff --git a/components/payments/cmd/connectors/internal/connectors/modulr/task_payments.go b/components/payments/cmd/connectors/internal/connectors/modulr/task_payments.go index 64c248741c..1175721c66 100644 --- a/components/payments/cmd/connectors/internal/connectors/modulr/task_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/modulr/task_payments.go @@ -37,28 +37,25 @@ func taskInitiatePayment(logger logging.Logger, modulrClient *client.Client, tra logger.Info("initiate payment for transfer-initiation %s", transferID) transferInitiationID := models.MustTransferInitiationIDFromString(transferID) + transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true) + if err != nil { + return err + } attrs := metric.WithAttributes(connectorAttrs...) - var err error var paymentID *models.PaymentID defer func() { if err != nil { ctx, cancel := contextutil.Detached(ctx) defer cancel() metricsRegistry.ConnectorObjectsErrors().Add(ctx, 1, attrs) - if err := ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusFailed, err.Error(), 0, time.Now()); err != nil { + if err := ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusFailed, err.Error(), transfer.Attempts, time.Now()); err != nil { logger.Error("failed to update transfer initiation status: %v", err) } } }() - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusProcessing, "", 0, time.Now()) - if err != nil { - return err - } - - var transfer *models.TransferInitiation - transfer, err = getTransfer(ctx, storageReader, transferInitiationID, true) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusProcessing, "", transfer.Attempts, time.Now()) if err != nil { return err } @@ -155,7 +152,7 @@ func taskInitiatePayment(logger logging.Logger, modulrClient *client.Client, tra }, Provider: models.ConnectorProviderMoneycorp, } - err = ingester.AddTransferInitiationPaymentID(ctx, transferInitiationID, paymentID, time.Now()) + err = ingester.AddTransferInitiationPaymentID(ctx, transfer, paymentID, time.Now()) if err != nil { return err } @@ -271,14 +268,14 @@ func taskUpdatePaymentStatus( return err } case "EXT_PROC", "PROCESSED", "RECONCILED": - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusProcessed, "", 0, time.Now()) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusProcessed, "", transfer.Attempts, time.Now()) if err != nil { return err } return nil default: - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusFailed, resultMessage, 0, time.Now()) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusFailed, resultMessage, transfer.Attempts, time.Now()) if err != nil { return err } diff --git a/components/payments/cmd/connectors/internal/connectors/moneycorp/task_payments.go b/components/payments/cmd/connectors/internal/connectors/moneycorp/task_payments.go index 204e0ffe09..210b52d67b 100644 --- a/components/payments/cmd/connectors/internal/connectors/moneycorp/task_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/moneycorp/task_payments.go @@ -37,28 +37,25 @@ func taskInitiatePayment(logger logging.Logger, moneycorpClient *client.Client, logger.Info("initiate payment for transfer-initiation %s", transferID) transferInitiationID := models.MustTransferInitiationIDFromString(transferID) + transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true) + if err != nil { + return err + } attrs := metric.WithAttributes(connectorAttrs...) - var err error var paymentID *models.PaymentID defer func() { if err != nil { ctx, cancel := contextutil.Detached(ctx) defer cancel() metricsRegistry.ConnectorObjectsErrors().Add(ctx, 1, attrs) - if err := ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusFailed, err.Error(), 0, time.Now()); err != nil { + if err := ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusFailed, err.Error(), transfer.Attempts, time.Now()); err != nil { logger.Error("failed to update transfer initiation status: %v", err) } } }() - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusProcessing, "", 0, time.Now()) - if err != nil { - return err - } - - var transfer *models.TransferInitiation - transfer, err = getTransfer(ctx, storageReader, transferInitiationID, true) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusProcessing, "", transfer.Attempts, time.Now()) if err != nil { return err } @@ -149,7 +146,7 @@ func taskInitiatePayment(logger logging.Logger, moneycorpClient *client.Client, }, Provider: models.ConnectorProviderMoneycorp, } - err = ingester.AddTransferInitiationPaymentID(ctx, transferInitiationID, paymentID, time.Now()) + err = ingester.AddTransferInitiationPaymentID(ctx, transfer, paymentID, time.Now()) if err != nil { return err } @@ -263,14 +260,14 @@ func taskUpdatePaymentStatus( return err } case "Cleared", "Sent": - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusProcessed, "", 0, time.Now()) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusProcessed, "", transfer.Attempts, time.Now()) if err != nil { return err } return nil case "Unauthorised", "Failed", "Cancelled": - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusFailed, resultMessage, 0, time.Now()) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusFailed, resultMessage, transfer.Attempts, time.Now()) if err != nil { return err } diff --git a/components/payments/cmd/connectors/internal/connectors/stripe/task_payments.go b/components/payments/cmd/connectors/internal/connectors/stripe/task_payments.go index 9224bb7505..512b32eb28 100644 --- a/components/payments/cmd/connectors/internal/connectors/stripe/task_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/stripe/task_payments.go @@ -36,28 +36,25 @@ func InitiatePaymentTask(logger logging.Logger, transferID string, stripeClient logger.Info("initiate payment for transfer-initiation %s", transferID) transferInitiationID := models.MustTransferInitiationIDFromString(transferID) + transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true) + if err != nil { + return err + } attrs := metric.WithAttributes(connectorAttrs...) - var err error var paymentID *models.PaymentID defer func() { if err != nil { ctx, cancel := contextutil.Detached(ctx) defer cancel() metricsRegistry.ConnectorObjectsErrors().Add(ctx, 1, attrs) - if err := ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusFailed, err.Error(), 0, time.Now()); err != nil { + if err := ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusFailed, err.Error(), transfer.Attempts, time.Now()); err != nil { logger.Error("failed to update transfer initiation status: %v", err) } } }() - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusProcessing, "", 0, time.Now()) - if err != nil { - return err - } - - var transfer *models.TransferInitiation - transfer, err = getTransfer(ctx, storageReader, transferInitiationID, true) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusProcessing, "", transfer.Attempts, time.Now()) if err != nil { return err } @@ -139,7 +136,7 @@ func InitiatePaymentTask(logger logging.Logger, transferID string, stripeClient }, Provider: models.ConnectorProviderStripe, } - err = ingester.AddTransferInitiationPaymentID(ctx, transferInitiationID, paymentID, time.Now()) + err = ingester.AddTransferInitiationPaymentID(ctx, transfer, paymentID, time.Now()) if err != nil { return err } @@ -230,7 +227,7 @@ func UpdatePaymentStatusTask( } if status == "" { - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusProcessed, "", 0, time.Now()) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusProcessed, "", transfer.Attempts, time.Now()) if err != nil { return err } @@ -238,7 +235,7 @@ func UpdatePaymentStatusTask( return nil } - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusFailed, resultMessage, 0, time.Now()) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusFailed, resultMessage, transfer.Attempts, time.Now()) if err != nil { return err } diff --git a/components/payments/cmd/connectors/internal/connectors/wise/task_payments.go b/components/payments/cmd/connectors/internal/connectors/wise/task_payments.go index c2aa50c375..b4a9a05755 100644 --- a/components/payments/cmd/connectors/internal/connectors/wise/task_payments.go +++ b/components/payments/cmd/connectors/internal/connectors/wise/task_payments.go @@ -38,28 +38,26 @@ func taskInitiatePayment(logger logging.Logger, wiseClient *client.Client, trans logger.Info("initiate payment for transfer-initiation %s", transferID) transferInitiationID := models.MustTransferInitiationIDFromString(transferID) + transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true) + if err != nil { + return err + } attrs := metric.WithAttributes(connectorAttrs...) - var err error var paymentID *models.PaymentID defer func() { if err != nil { ctx, cancel := contextutil.Detached(ctx) defer cancel() metricsRegistry.ConnectorObjectsErrors().Add(ctx, 1, attrs) - if err := ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusFailed, err.Error(), 0, time.Now()); err != nil { + transfer.Status = models.TransferInitiationStatusFailed + if err := ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusFailed, err.Error(), transfer.Attempts, time.Now()); err != nil { logger.Error("failed to update transfer initiation status: %v", err) } } }() - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusProcessing, "", 0, time.Now()) - if err != nil { - return err - } - - var transfer *models.TransferInitiation - transfer, err = getTransfer(ctx, storageReader, transferInitiationID, true) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusProcessing, "", transfer.Attempts, time.Now()) if err != nil { return err } @@ -152,7 +150,7 @@ func taskInitiatePayment(logger logging.Logger, wiseClient *client.Client, trans }, Provider: models.ConnectorProviderWise, } - err = ingester.AddTransferInitiationPaymentID(ctx, transferInitiationID, paymentID, time.Now()) + err = ingester.AddTransferInitiationPaymentID(ctx, transfer, paymentID, time.Now()) if err != nil { return err } @@ -270,14 +268,14 @@ func taskUpdatePaymentStatus( return err } case "outgoing_payment_sent", "funds_refunded": - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusProcessed, "", 0, time.Now()) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusProcessed, "", transfer.Attempts, time.Now()) if err != nil { return err } return nil case "charged_back", "cancelled": - err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transferInitiationID, paymentID, models.TransferInitiationStatusFailed, "", 0, time.Now()) + err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusFailed, "", transfer.Attempts, time.Now()) if err != nil { return err } diff --git a/components/payments/cmd/connectors/internal/ingestion/accounts.go b/components/payments/cmd/connectors/internal/ingestion/accounts.go index 82ce4a6b88..cc942f0e1c 100644 --- a/components/payments/cmd/connectors/internal/ingestion/accounts.go +++ b/components/payments/cmd/connectors/internal/ingestion/accounts.go @@ -32,10 +32,16 @@ func (i *DefaultIngester) IngestAccounts(ctx context.Context, batch AccountBatch return fmt.Errorf("error upserting accounts: %w", err) } - err := i.publisher.Publish(events.TopicPayments, - publish.NewMessage(ctx, messages.NewEventSavedAccounts(batch))) - if err != nil { - logging.FromContext(ctx).Errorf("Publishing message: %w", err) + for _, account := range batch { + if err := i.publisher.Publish( + events.TopicPayments, + publish.NewMessage( + ctx, + messages.NewEventSavedAccounts(account), + ), + ); err != nil { + logging.FromContext(ctx).Errorf("Publishing message: %w", err) + } } endedAt := time.Now() diff --git a/components/payments/cmd/connectors/internal/ingestion/balances.go b/components/payments/cmd/connectors/internal/ingestion/balances.go index bca235cef9..ac980b661f 100644 --- a/components/payments/cmd/connectors/internal/ingestion/balances.go +++ b/components/payments/cmd/connectors/internal/ingestion/balances.go @@ -32,9 +32,16 @@ func (i *DefaultIngester) IngestBalances(ctx context.Context, batch BalanceBatch return fmt.Errorf("error inserting balances: %w", err) } - if err := i.publisher.Publish(events.TopicPayments, - publish.NewMessage(ctx, messages.NewEventSavedBalances(batch))); err != nil { - logging.FromContext(ctx).Errorf("Publishing message: %w", err) + for _, balance := range batch { + if err := i.publisher.Publish( + events.TopicPayments, + publish.NewMessage( + ctx, + messages.NewEventSavedBalances(balance, i.provider), + ), + ); err != nil { + logging.FromContext(ctx).Errorf("Publishing message: %w", err) + } } endedAt := time.Now() diff --git a/components/payments/cmd/connectors/internal/ingestion/ingester.go b/components/payments/cmd/connectors/internal/ingestion/ingester.go index bf5305b648..9c53ce18c9 100644 --- a/components/payments/cmd/connectors/internal/ingestion/ingester.go +++ b/components/payments/cmd/connectors/internal/ingestion/ingester.go @@ -13,8 +13,8 @@ type Ingester interface { IngestAccounts(ctx context.Context, batch AccountBatch) error IngestPayments(ctx context.Context, batch PaymentBatch, commitState any) error IngestBalances(ctx context.Context, batch BalanceBatch, checkIfAccountExists bool) error - UpdateTransferInitiationPaymentsStatus(ctx context.Context, id models.TransferInitiationID, paymentID *models.PaymentID, status models.TransferInitiationStatus, errorMessage string, attempts int, updatedAt time.Time) error - AddTransferInitiationPaymentID(ctx context.Context, id models.TransferInitiationID, paymentID *models.PaymentID, updatedAt time.Time) error + UpdateTransferInitiationPaymentsStatus(ctx context.Context, tf *models.TransferInitiation, paymentID *models.PaymentID, status models.TransferInitiationStatus, errorMessage string, attempts int, updatedAt time.Time) error + AddTransferInitiationPaymentID(ctx context.Context, tf *models.TransferInitiation, paymentID *models.PaymentID, updatedAt time.Time) error } type DefaultIngester struct { diff --git a/components/payments/cmd/connectors/internal/ingestion/transfer_initiation.go b/components/payments/cmd/connectors/internal/ingestion/transfer_initiation.go index 86e8e5e1d8..68577dceb4 100644 --- a/components/payments/cmd/connectors/internal/ingestion/transfer_initiation.go +++ b/components/payments/cmd/connectors/internal/ingestion/transfer_initiation.go @@ -2,15 +2,63 @@ package ingestion import ( "context" + "fmt" "time" + "github.com/formancehq/payments/cmd/connectors/internal/messages" "github.com/formancehq/payments/internal/models" + "github.com/formancehq/payments/pkg/events" + "github.com/formancehq/stack/libs/go-libs/publish" ) -func (i *DefaultIngester) UpdateTransferInitiationPaymentsStatus(ctx context.Context, id models.TransferInitiationID, paymentID *models.PaymentID, status models.TransferInitiationStatus, errorMessage string, attempts int, updatedAt time.Time) error { - return i.repo.UpdateTransferInitiationPaymentsStatus(ctx, id, paymentID, status, errorMessage, attempts, updatedAt) +func (i *DefaultIngester) UpdateTransferInitiationPaymentsStatus(ctx context.Context, tf *models.TransferInitiation, paymentID *models.PaymentID, status models.TransferInitiationStatus, errorMessage string, attempts int, updatedAt time.Time) error { + tf.Status = status + tf.Error = errorMessage + tf.Attempts = attempts + tf.UpdatedAt = updatedAt + + if err := i.repo.UpdateTransferInitiationPaymentsStatus(ctx, tf.ID, paymentID, tf.Status, tf.Error, tf.Attempts, tf.UpdatedAt); err != nil { + return err + } + + if err := i.publisher.Publish( + events.TopicPayments, + publish.NewMessage( + ctx, + messages.NewEventSavedTransferInitiations(tf), + ), + ); err != nil { + return err + } + + return nil } -func (i *DefaultIngester) AddTransferInitiationPaymentID(ctx context.Context, id models.TransferInitiationID, paymentID *models.PaymentID, updatedAt time.Time) error { - return i.repo.AddTransferInitiationPaymentID(ctx, id, paymentID, updatedAt) +func (i *DefaultIngester) AddTransferInitiationPaymentID(ctx context.Context, tf *models.TransferInitiation, paymentID *models.PaymentID, updatedAt time.Time) error { + if paymentID == nil { + return fmt.Errorf("payment id is nil") + } + + tf.RelatedPayments = append(tf.RelatedPayments, &models.TransferInitiationPayments{ + TransferInitiationID: tf.ID, + PaymentID: *paymentID, + CreatedAt: updatedAt, + Status: models.TransferInitiationStatusProcessing, + }) + + if err := i.repo.AddTransferInitiationPaymentID(ctx, tf.ID, paymentID, updatedAt); err != nil { + return err + } + + if err := i.publisher.Publish( + events.TopicPayments, + publish.NewMessage( + ctx, + messages.NewEventSavedTransferInitiations(tf), + ), + ); err != nil { + return err + } + + return nil } diff --git a/components/payments/cmd/connectors/internal/messages/accounts.go b/components/payments/cmd/connectors/internal/messages/accounts.go index c73f0a107f..df872d9276 100644 --- a/components/payments/cmd/connectors/internal/messages/accounts.go +++ b/components/payments/cmd/connectors/internal/messages/accounts.go @@ -17,19 +17,15 @@ type accountMessagePayload struct { Type string `json:"type"` } -func NewEventSavedAccounts(accounts []*models.Account) events.EventMessage { - payload := make([]accountMessagePayload, len(accounts)) - - for accountIdx, account := range accounts { - payload[accountIdx] = accountMessagePayload{ - ID: account.ID.String(), - CreatedAt: account.CreatedAt, - Reference: account.Reference, - Provider: account.Provider.String(), - DefaultAsset: account.DefaultAsset.String(), - AccountName: account.AccountName, - Type: string(account.Type), - } +func NewEventSavedAccounts(account *models.Account) events.EventMessage { + payload := accountMessagePayload{ + ID: account.ID.String(), + CreatedAt: account.CreatedAt, + Reference: account.Reference, + Provider: account.Provider.String(), + DefaultAsset: account.DefaultAsset.String(), + AccountName: account.AccountName, + Type: string(account.Type), } return events.EventMessage{ diff --git a/components/payments/cmd/connectors/internal/messages/balances.go b/components/payments/cmd/connectors/internal/messages/balances.go index 2b1aef2347..f97df45abe 100644 --- a/components/payments/cmd/connectors/internal/messages/balances.go +++ b/components/payments/cmd/connectors/internal/messages/balances.go @@ -10,21 +10,19 @@ import ( type balanceMessagePayload struct { CreatedAt time.Time `json:"createdAt"` + Provider string `json:"provider"` AccountID string `json:"accountID"` Asset string `json:"asset"` Balance *big.Int `json:"balance"` } -func NewEventSavedBalances(balances []*models.Balance) events.EventMessage { - payload := make([]balanceMessagePayload, len(balances)) - - for balanceIdx, balance := range balances { - payload[balanceIdx] = balanceMessagePayload{ - CreatedAt: balance.CreatedAt, - AccountID: balance.AccountID.String(), - Asset: balance.Asset.String(), - Balance: balance.Balance, - } +func NewEventSavedBalances(balance *models.Balance, provider models.ConnectorProvider) events.EventMessage { + payload := balanceMessagePayload{ + CreatedAt: balance.CreatedAt, + Provider: provider.String(), + AccountID: balance.AccountID.String(), + Asset: balance.Asset.String(), + Balance: balance.Balance, } return events.EventMessage{ diff --git a/components/payments/cmd/connectors/internal/messages/bank_account.go b/components/payments/cmd/connectors/internal/messages/bank_account.go new file mode 100644 index 0000000000..4f6724ca0e --- /dev/null +++ b/components/payments/cmd/connectors/internal/messages/bank_account.go @@ -0,0 +1,44 @@ +package messages + +import ( + "time" + + "github.com/formancehq/payments/internal/models" + "github.com/formancehq/payments/pkg/events" +) + +type bankAccountMessagePayload struct { + ID string `json:"id"` + CreatedAt time.Time `json:"createdAt"` + Provider string `json:"provider"` + Name string `json:"name"` + AccountNumber string `json:"accountNumber"` + IBAN string `json:"iban"` + SwiftBicCode string `json:"swiftBicCode"` + Country string `json:"country"` + AccountID string `json:"accountId"` +} + +func NewEventSavedBankAccounts(bankAccount *models.BankAccount) events.EventMessage { + bankAccount.Offuscate() + + payload := bankAccountMessagePayload{ + ID: bankAccount.ID.String(), + CreatedAt: bankAccount.CreatedAt, + Provider: bankAccount.Provider.String(), + Name: bankAccount.Name, + AccountNumber: bankAccount.AccountNumber, + IBAN: bankAccount.IBAN, + SwiftBicCode: bankAccount.SwiftBicCode, + Country: bankAccount.Country, + AccountID: bankAccount.AccountID.String(), + } + + return events.EventMessage{ + Date: time.Now().UTC(), + App: events.EventApp, + Version: events.EventVersion, + Type: events.EventTypeSavedBankAccount, + Payload: payload, + } +} diff --git a/components/payments/cmd/connectors/internal/messages/transfer_initiations.go b/components/payments/cmd/connectors/internal/messages/transfer_initiations.go new file mode 100644 index 0000000000..2f4d5359f1 --- /dev/null +++ b/components/payments/cmd/connectors/internal/messages/transfer_initiations.go @@ -0,0 +1,91 @@ +package messages + +import ( + "math/big" + "time" + + "github.com/formancehq/payments/internal/models" + "github.com/formancehq/payments/pkg/events" +) + +type transferInitiationsPaymentsMessagePayload struct { + TransferInitiationID string `json:"transferInitiationId"` + PaymentID string `json:"paymentId"` + CreatedAt time.Time `json:"createdAt"` + Status string `json:"status"` + Error string `json:"error"` +} + +type transferInitiationsMessagePayload struct { + ID string `json:"id"` + CreatedAt time.Time `json:"createdAt"` + ScheduleAt time.Time `json:"scheduledAt"` + UpdatedAt time.Time `json:"updatedAt"` + Provider string `json:"provider"` + Description string `json:"description"` + Type string `json:"type"` + SourceAccountID string `json:"sourceAccountId"` + DestinationAccountID string `json:"destinationAccountId"` + Amount *big.Int `json:"amount"` + Asset models.Asset `json:"asset"` + Attempts int `json:"attempts"` + Status string `json:"status"` + Error string `json:"error"` + RelatedPayments []*transferInitiationsPaymentsMessagePayload `json:"relatedPayments"` +} + +func NewEventSavedTransferInitiations(tf *models.TransferInitiation) events.EventMessage { + payload := transferInitiationsMessagePayload{ + ID: tf.ID.String(), + CreatedAt: tf.CreatedAt, + ScheduleAt: tf.ScheduledAt, + UpdatedAt: tf.UpdatedAt, + Provider: tf.Provider.String(), + Description: tf.Description, + Type: tf.Type.String(), + SourceAccountID: tf.SourceAccountID.String(), + DestinationAccountID: tf.DestinationAccountID.String(), + Amount: tf.Amount, + Asset: tf.Asset, + Attempts: tf.Attempts, + Status: tf.Status.String(), + Error: tf.Error, + } + + payload.RelatedPayments = make([]*transferInitiationsPaymentsMessagePayload, len(tf.RelatedPayments)) + for i, p := range tf.RelatedPayments { + payload.RelatedPayments[i] = &transferInitiationsPaymentsMessagePayload{ + TransferInitiationID: p.TransferInitiationID.String(), + PaymentID: p.PaymentID.String(), + CreatedAt: p.CreatedAt, + Status: p.Status.String(), + Error: p.Error, + } + } + + return events.EventMessage{ + Date: time.Now().UTC(), + App: events.EventApp, + Version: events.EventVersion, + Type: events.EventTypeSavedTransferInitiation, + Payload: payload, + } +} + +type deleteTransferInitiationMessagePayload struct { + CreatedAt time.Time `json:"createdAt"` + ID string `json:"id"` +} + +func NewEventDeleteTransferInitiation(id models.TransferInitiationID) events.EventMessage { + return events.EventMessage{ + Date: time.Now().UTC(), + App: events.EventApp, + Version: events.EventVersion, + Type: events.EventTypeDeleteTransferInitiation, + Payload: deleteTransferInitiationMessagePayload{ + CreatedAt: time.Now().UTC(), + ID: id.String(), + }, + } +} diff --git a/components/payments/pkg/events/event.go b/components/payments/pkg/events/event.go index 55b26912b8..fd876f59c9 100644 --- a/components/payments/pkg/events/event.go +++ b/components/payments/pkg/events/event.go @@ -11,10 +11,13 @@ const ( EventVersion = "v1" EventApp = "payments" - EventTypeSavedPayments = "SAVED_PAYMENT" - EventTypeSavedAccounts = "SAVED_ACCOUNT" - EventTypeSavedBalances = "SAVED_BALANCE" - EventTypeConnectorReset = "CONNECTOR_RESET" + EventTypeSavedPayments = "SAVED_PAYMENT" + EventTypeSavedAccounts = "SAVED_ACCOUNT" + EventTypeSavedBalances = "SAVED_BALANCE" + EventTypeSavedBankAccount = "SAVED_BANK_ACCOUNT" + EventTypeSavedTransferInitiation = "SAVED_TRANSFER_INITIATION" + EventTypeDeleteTransferInitiation = "DELETED_TRANSFER_INITIATION" + EventTypeConnectorReset = "CONNECTOR_RESET" ) type EventMessage struct { diff --git a/components/search/benthos/streams/payments_deletion.yaml b/components/search/benthos/streams/payments_deletion.yaml new file mode 100644 index 0000000000..0dccc841ba --- /dev/null +++ b/components/search/benthos/streams/payments_deletion.yaml @@ -0,0 +1,99 @@ +input: + event_bus: + topic: payments + consumer_group: search-payments-resets + +pipeline: + processors: + - switch_event_type: + events: + - label: CONNECTOR_RESET + processors: + - bloblang: | + root = { + "query": { + "bool": { + "must": [ + { + "bool": { + "should": [ + { + "match": { + "kind": "PAYMENT" + }, + }, + { + "match": { + "kind": "PAYMENT_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_BALANCE" + }, + }, + { + "match": { + "kind": "PAYMENT_BANK_ACCOUNT" + }, + }, + { + "match": { + "kind": "PAYMENT_TRANSFER_INITIATION" + }, + } + ] + } + }, + { + "match": { + "indexed.provider": this.payload.connector + } + }, + { + "match": { + "stack": env("STACK") + } + } + ] + } + } + } + + - label: DELETED_TRANSFER_INITIATION + processors: + - bloblang: | + root = { + "query": { + "bool": { + "must": [ + { + "match": { + "kind": "PAYMENT_TRANSFER_INITIATION" + } + }, + { + "match": { + "indexed.id": this.payload.id + } + }, + { + "match": { + "stack": env("STACK") + } + } + ] + } + } + } + +output: + http_client: + url: ${OPENSEARCH_URL}/${OPENSEARCH_INDEX}/_delete_by_query + verb: POST + headers: + Content-Type: application/json + basic_auth: + enabled: ${BASIC_AUTH_ENABLED} + username: ${BASIC_AUTH_USERNAME} + password: ${BASIC_AUTH_PASSWORD} diff --git a/components/search/benthos/streams/payments_ingestion.yaml b/components/search/benthos/streams/payments_ingestion.yaml index 2c17f9e13b..363af609cb 100644 --- a/components/search/benthos/streams/payments_ingestion.yaml +++ b/components/search/benthos/streams/payments_ingestion.yaml @@ -27,6 +27,91 @@ pipeline: } meta action = "index" meta id = "PAYMENT-%s".format(this.payload.id) + - label: SAVED_ACCOUNT + processors: + - bloblang: | + root = { + "data": this.payload, + "indexed": { + "provider": this.payload.provider, + "reference": this.payload.reference, + "createdAt": this.payload.createdAt, + "id": this.payload.id, + "type": this.payload.type, + "defaultAsset": this.payload.defaultAsset, + "accountName": this.payload.accountName + }, + "kind": "PAYMENT_ACCOUNT", + "when": this.date + } + meta action = "index" + meta id = "PAYMENT-ACCOUNT-%s".format(this.payload.id) + - label: SAVED_BALANCE + processors: + - bloblang: | + root = { + "data": this.payload, + "indexed": { + "provider": this.payload.provider, + "createdAt": this.payload.createdAt, + "accountId": this.payload.accountID, + "asset": this.payload.asset, + "balance": this.payload.balance + }, + "kind": "PAYMENT_BALANCE", + "when": this.date + } + meta action = "index" + meta id = "PAYMENT-BALANCE-%s-%s".format(this.payload.accountId, this.payload.asset) + - label: SAVED_BANK_ACCOUNT + processors: + - bloblang: | + root = { + "data": this.payload, + "indexed": { + "id": this.payload.id, + "createdAt": this.payload.createdAt, + "provider": this.payload.provider, + "name": this.payload.name, + "accountNumber": this.payload.accountNumber, + "iban": this.payload.iban, + "swiftBicCode": this.payload.swiftBicCode, + "country": this.payload.country, + "accountId": this.payload.accountID + }, + "kind": "PAYMENT_BANK_ACCOUNT", + "when": this.date + } + meta action = "index" + meta id = "PAYMENT-BANK-ACCOUNT-%s".format(this.payload.id) + - label: SAVED_TRANSFER_INITIATION + processors: + - bloblang: | + root = { + "data": this.payload, + "indexed": { + "id": this.payload.id, + "createdAt": this.payload.createdAt, + "scheduledAt": this.payload.scheduledAt, + "updatedAt": this.payload.updatedAt, + "provider": this.payload.provider, + "description": this.payload.description, + "type": this.payload.type, + "sourceAccountId": this.payload.sourceAccountID, + "destinationAccountId": this.payload.destinationAccountID, + "amount": this.payload.amount, + "asset": this.payload.asset, + "attempts": this.payload.attempts, + "status": this.payload.status, + "error": this.payload.error, + "relatedPayment": this.payload.relatedPayment + }, + "kind": "PAYMENT_TRANSFER_INITIATION", + "when": this.date + } + meta action = "index" + meta id = "PAYMENT-TRANSFER-INITIATION-%s".format(this.payload.id) + output: resource: elasticsearch diff --git a/components/search/benthos/streams/payments_reset.yaml b/components/search/benthos/streams/payments_reset.yaml deleted file mode 100644 index 6a23998d8d..0000000000 --- a/components/search/benthos/streams/payments_reset.yaml +++ /dev/null @@ -1,46 +0,0 @@ -input: - event_bus: - topic: payments - consumer_group: search-payments-resets - -pipeline: - processors: - - switch_event_type: - events: - - label: CONNECTOR_RESET - processors: - - bloblang: | - root = { - "query": { - "bool": { - "must": [ - { - "match": { - "kind": "PAYMENT" - } - }, - { - "match": { - "indexed.provider": this.payload.connector - } - }, - { - "match": { - "stack": env("STACK") - } - } - ] - } - } - } - -output: - http_client: - url: ${OPENSEARCH_URL}/${OPENSEARCH_INDEX}/_delete_by_query - verb: POST - headers: - Content-Type: application/json - basic_auth: - enabled: ${BASIC_AUTH_ENABLED} - username: ${BASIC_AUTH_USERNAME} - password: ${BASIC_AUTH_PASSWORD} diff --git a/components/search/pkg/searchhttp/http.go b/components/search/pkg/searchhttp/http.go index aa3559ba91..f06db88c1a 100644 --- a/components/search/pkg/searchhttp/http.go +++ b/components/search/pkg/searchhttp/http.go @@ -128,6 +128,12 @@ func resolveQuery(r *http.Request) (*cursorTokenInfo, BaseQuery, error) { qq.WithSort("txid", esquery.OrderDesc) case "PAYMENT": qq.WithSort("reference", esquery.OrderDesc) + case "PAYMENT_TRANSFER_INITIATION": + qq.WithSort("id", esquery.OrderDesc) + case "PAYMENT_BALANCE": + qq.WithSort("createdAt", esquery.OrderDesc) + case "PAYMENT_BANK_ACCOUNT": + qq.WithSort("id", esquery.OrderDesc) } } }