Skip to content

Commit

Permalink
feat(payments): insert all data in search via benthos (#783)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored Nov 2, 2023
1 parent ccb1f22 commit b29735c
Show file tree
Hide file tree
Showing 40 changed files with 1,116 additions and 514 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,50 +299,70 @@ data:
output:
resource: elasticsearch
payments_ingestion.yaml: |
payments_deletion.yaml: |
input:
event_bus:
topic: payments
consumer_group: search
consumer_group: search-payments-resets
pipeline:
processors:
- switch_event_type:
events:
- label: SAVED_PAYMENT
- label: CONNECTOR_RESET
processors:
- bloblang: |
root = {
"data": this.payload,
"indexed": {
"provider": this.payload.provider,
"reference": this.payload.reference,
"scheme": this.payload.scheme,
"type": this.payload.type,
"status": this.payload.status,
"id": this.payload.id,
"initialAmount": this.payload.initialAmount,
"createdAt": this.payload.createdAt
},
"kind": "PAYMENT",
"when": this.date
"query": {
"bool": {
"must": [
{
"bool": {
"should": [
{
"match": {
"kind": "PAYMENT"
},
},
{
"match": {
"kind": "PAYMENT_ACCOUNT"
},
},
{
"match": {
"kind": "PAYMENT_BALANCE"
},
},
{
"match": {
"kind": "PAYMENT_BANK_ACCOUNT"
},
},
{
"match": {
"kind": "PAYMENT_TRANSFER_INITIATION"
},
}
]
}
},
{
"match": {
"indexed.provider": this.payload.connector
}
},
{
"match": {
"stack": env("STACK")
}
}
]
}
}
}
meta action = "index"
meta id = "PAYMENT-%s".format(this.payload.id)
output:
resource: elasticsearch
payments_reset.yaml: |
input:
event_bus:
topic: payments
consumer_group: search-payments-resets
pipeline:
processors:
- switch_event_type:
events:
- label: CONNECTOR_RESET
- label: DELETED_TRANSFER_INITIATION
processors:
- bloblang: |
root = {
Expand All @@ -351,12 +371,12 @@ data:
"must": [
{
"match": {
"kind": "PAYMENT"
"kind": "PAYMENT_TRANSFER_INITIATION"
}
},
{
"match": {
"indexed.provider": this.payload.connector
"indexed.id": this.payload.id
}
},
{
Expand All @@ -379,6 +399,7 @@ data:
enabled: ${BASIC_AUTH_ENABLED}
username: ${BASIC_AUTH_USERNAME}
password: ${BASIC_AUTH_PASSWORD}
payments_ingestion.yaml: "input:\n event_bus:\n topic: payments\n consumer_group: search\n\npipeline:\n processors:\n - switch_event_type:\n events:\n - label: SAVED_PAYMENT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"scheme\": this.payload.scheme,\n \"type\": this.payload.type,\n \"status\": this.payload.status,\n \"id\": this.payload.id,\n \"initialAmount\": this.payload.initialAmount,\n \"createdAt\": this.payload.createdAt\n },\n \"kind\": \"PAYMENT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-%s\".format(this.payload.id)\n - label: SAVED_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"createdAt\": this.payload.createdAt,\n \"id\": this.payload.id,\n \"type\": this.payload.type,\n \"defaultAsset\": this.payload.defaultAsset,\n \"accountName\": this.payload.accountName\n },\n \"kind\": \"PAYMENT_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_BALANCE\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"createdAt\": this.payload.createdAt,\n \"accountId\": this.payload.accountID,\n \"asset\": this.payload.asset,\n \"balance\": this.payload.balance\n },\n \"kind\": \"PAYMENT_BALANCE\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BALANCE-%s-%s\".format(this.payload.accountId, this.payload.asset)\n - label: SAVED_BANK_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"provider\": this.payload.provider,\n \"name\": this.payload.name,\n \"accountNumber\": this.payload.accountNumber,\n \"iban\": this.payload.iban,\n \"swiftBicCode\": this.payload.swiftBicCode,\n \"country\": this.payload.country,\n \"accountId\": this.payload.accountID\n },\n \"kind\": \"PAYMENT_BANK_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BANK-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_TRANSFER_INITIATION\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"scheduledAt\": this.payload.scheduledAt,\n \"updatedAt\": this.payload.updatedAt,\n \"provider\": this.payload.provider,\n \"description\": this.payload.description,\n \"type\": this.payload.type,\n \"sourceAccountId\": this.payload.sourceAccountID,\n \"destinationAccountId\": this.payload.destinationAccountID,\n \"amount\": this.payload.amount,\n \"asset\": this.payload.asset,\n \"attempts\": this.payload.attempts,\n \"status\": this.payload.status,\n \"error\": this.payload.error,\n \"relatedPayment\": this.payload.relatedPayment\n },\n \"kind\": \"PAYMENT_TRANSFER_INITIATION\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-TRANSFER-INITIATION-%s\".format(this.payload.id)\n \n\noutput:\n resource: elasticsearch\n"
kind: ConfigMap
metadata:
labels:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,50 +299,70 @@ data:
output:
resource: elasticsearch
payments_ingestion.yaml: |
payments_deletion.yaml: |
input:
event_bus:
topic: payments
consumer_group: search
consumer_group: search-payments-resets
pipeline:
processors:
- switch_event_type:
events:
- label: SAVED_PAYMENT
- label: CONNECTOR_RESET
processors:
- bloblang: |
root = {
"data": this.payload,
"indexed": {
"provider": this.payload.provider,
"reference": this.payload.reference,
"scheme": this.payload.scheme,
"type": this.payload.type,
"status": this.payload.status,
"id": this.payload.id,
"initialAmount": this.payload.initialAmount,
"createdAt": this.payload.createdAt
},
"kind": "PAYMENT",
"when": this.date
"query": {
"bool": {
"must": [
{
"bool": {
"should": [
{
"match": {
"kind": "PAYMENT"
},
},
{
"match": {
"kind": "PAYMENT_ACCOUNT"
},
},
{
"match": {
"kind": "PAYMENT_BALANCE"
},
},
{
"match": {
"kind": "PAYMENT_BANK_ACCOUNT"
},
},
{
"match": {
"kind": "PAYMENT_TRANSFER_INITIATION"
},
}
]
}
},
{
"match": {
"indexed.provider": this.payload.connector
}
},
{
"match": {
"stack": env("STACK")
}
}
]
}
}
}
meta action = "index"
meta id = "PAYMENT-%s".format(this.payload.id)
output:
resource: elasticsearch
payments_reset.yaml: |
input:
event_bus:
topic: payments
consumer_group: search-payments-resets
pipeline:
processors:
- switch_event_type:
events:
- label: CONNECTOR_RESET
- label: DELETED_TRANSFER_INITIATION
processors:
- bloblang: |
root = {
Expand All @@ -351,12 +371,12 @@ data:
"must": [
{
"match": {
"kind": "PAYMENT"
"kind": "PAYMENT_TRANSFER_INITIATION"
}
},
{
"match": {
"indexed.provider": this.payload.connector
"indexed.id": this.payload.id
}
},
{
Expand All @@ -379,6 +399,7 @@ data:
enabled: ${BASIC_AUTH_ENABLED}
username: ${BASIC_AUTH_USERNAME}
password: ${BASIC_AUTH_PASSWORD}
payments_ingestion.yaml: "input:\n event_bus:\n topic: payments\n consumer_group: search\n\npipeline:\n processors:\n - switch_event_type:\n events:\n - label: SAVED_PAYMENT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"scheme\": this.payload.scheme,\n \"type\": this.payload.type,\n \"status\": this.payload.status,\n \"id\": this.payload.id,\n \"initialAmount\": this.payload.initialAmount,\n \"createdAt\": this.payload.createdAt\n },\n \"kind\": \"PAYMENT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-%s\".format(this.payload.id)\n - label: SAVED_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"reference\": this.payload.reference,\n \"createdAt\": this.payload.createdAt,\n \"id\": this.payload.id,\n \"type\": this.payload.type,\n \"defaultAsset\": this.payload.defaultAsset,\n \"accountName\": this.payload.accountName\n },\n \"kind\": \"PAYMENT_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_BALANCE\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"provider\": this.payload.provider,\n \"createdAt\": this.payload.createdAt,\n \"accountId\": this.payload.accountID,\n \"asset\": this.payload.asset,\n \"balance\": this.payload.balance\n },\n \"kind\": \"PAYMENT_BALANCE\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BALANCE-%s-%s\".format(this.payload.accountId, this.payload.asset)\n - label: SAVED_BANK_ACCOUNT\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"provider\": this.payload.provider,\n \"name\": this.payload.name,\n \"accountNumber\": this.payload.accountNumber,\n \"iban\": this.payload.iban,\n \"swiftBicCode\": this.payload.swiftBicCode,\n \"country\": this.payload.country,\n \"accountId\": this.payload.accountID\n },\n \"kind\": \"PAYMENT_BANK_ACCOUNT\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-BANK-ACCOUNT-%s\".format(this.payload.id)\n - label: SAVED_TRANSFER_INITIATION\n processors:\n - bloblang: |\n root = {\n \"data\": this.payload,\n \"indexed\": {\n \"id\": this.payload.id,\n \"createdAt\": this.payload.createdAt,\n \"scheduledAt\": this.payload.scheduledAt,\n \"updatedAt\": this.payload.updatedAt,\n \"provider\": this.payload.provider,\n \"description\": this.payload.description,\n \"type\": this.payload.type,\n \"sourceAccountId\": this.payload.sourceAccountID,\n \"destinationAccountId\": this.payload.destinationAccountID,\n \"amount\": this.payload.amount,\n \"asset\": this.payload.asset,\n \"attempts\": this.payload.attempts,\n \"status\": this.payload.status,\n \"error\": this.payload.error,\n \"relatedPayment\": this.payload.relatedPayment\n },\n \"kind\": \"PAYMENT_TRANSFER_INITIATION\",\n \"when\": this.date\n }\n meta action = \"index\"\n meta id = \"PAYMENT-TRANSFER-INITIATION-%s\".format(this.payload.id)\n \n\noutput:\n resource: elasticsearch\n"
kind: ConfigMap
metadata:
labels:
Expand Down
Loading

1 comment on commit b29735c

@vercel
Copy link

@vercel vercel bot commented on b29735c Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.