-
Notifications
You must be signed in to change notification settings - Fork 56
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(Statement): Add alternative publish to queue logic without using…
… Redis (LLC-2162) (#890)
- Loading branch information
1 parent
22ea51f
commit 393bad4
Showing
17 changed files
with
583 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,8 @@ | ||
import RedisFactoryConfig from './utils/redisEvents/FactoryConfig'; | ||
import SQSFactoryConfig from './utils/sqsEvents/FactoryConfig'; | ||
|
||
export default interface FactoryConfig { | ||
readonly facade?: string; | ||
readonly redis?: RedisFactoryConfig; | ||
readonly sqs?: SQSFactoryConfig; | ||
} |
70 changes: 70 additions & 0 deletions
70
src/apps/statements/repo/eventsRepo/emitNewStatements/sqs.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
import { | ||
GetQueueUrlCommand, | ||
SQSClient, | ||
SendMessageBatchCommand, | ||
SendMessageBatchRequestEntry, | ||
} from '@aws-sdk/client-sqs'; | ||
import { v4 } from 'uuid'; | ||
import { getPrefixWithProcessingPriority } from '../utils/getPrefixWithProcessingPriority'; | ||
import { StatementProcessingPriority } from '../../../enums/statementProcessingPriority.enum'; | ||
import FacadeConfig from '../utils/sqsEvents/FacadeConfig'; | ||
import { STATEMENT_QUEUE } from '../utils/constants'; | ||
import Signature from './Signature'; | ||
|
||
const MAX_BATCH_SIZE = 10; | ||
|
||
let queueUrl: string | undefined; | ||
|
||
const publishMessages = async (sqsClient: SQSClient, statementProperties: string[]) => { | ||
const statementPropertiesBatchRequest = statementProperties.map( | ||
(statementProperty): SendMessageBatchRequestEntry => ({ | ||
Id: v4(), | ||
MessageBody: statementProperty, | ||
}), | ||
); | ||
|
||
for (let index = 0; index < statementPropertiesBatchRequest.length; index += MAX_BATCH_SIZE) { | ||
await sqsClient.send( | ||
new SendMessageBatchCommand({ | ||
QueueUrl: queueUrl, | ||
Entries: statementPropertiesBatchRequest.slice(index, index + MAX_BATCH_SIZE), | ||
}), | ||
); | ||
} | ||
}; | ||
|
||
const getQueueUrl = async ( | ||
sqsClient: SQSClient, | ||
prefix: string, | ||
priority: StatementProcessingPriority, | ||
isQueuePriorityEnabled: boolean, | ||
) => { | ||
if (queueUrl) { | ||
return queueUrl; | ||
} | ||
|
||
const prefixWithPriority = getPrefixWithProcessingPriority( | ||
prefix, | ||
priority, | ||
isQueuePriorityEnabled, | ||
); | ||
|
||
const getQueueUrlCommand = new GetQueueUrlCommand({ | ||
QueueName: `${prefixWithPriority}_${STATEMENT_QUEUE}`, | ||
}); | ||
|
||
const commandResult = await sqsClient.send(getQueueUrlCommand); | ||
|
||
queueUrl = commandResult.QueueUrl; | ||
|
||
return queueUrl; | ||
}; | ||
|
||
export default (config: FacadeConfig): Signature => { | ||
return async ({ statementProperties, priority }) => { | ||
const sqsClient = await config.client(); | ||
|
||
await getQueueUrl(sqsClient, config.prefix, priority, config.isQueuePriorityEnabled); | ||
await publishMessages(sqsClient, statementProperties); | ||
}; | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,14 @@ | ||
import Facade from './Facade'; | ||
import FactoryConfig from './FactoryConfig'; | ||
import redisFactory from './utils/redisEvents/factory'; | ||
import sqsFactory from './utils/sqsEvents/factory'; | ||
|
||
export default (config: FactoryConfig): Facade => { | ||
switch (config.facade) { | ||
default: | ||
case 'redis': | ||
return redisFactory(config.redis); | ||
case 'sqs': | ||
return sqsFactory(config.sqs); | ||
} | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
export const EVENT_NAME = 'statement.new'; | ||
export const CHANNEL_NAME = 'statement.notify'; | ||
export const STATEMENT_QUEUE = 'STATEMENT_QUEUE'; |
7 changes: 7 additions & 0 deletions
7
src/apps/statements/repo/eventsRepo/utils/sqsEvents/FacadeConfig.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
import { SQSClient } from '@aws-sdk/client-sqs'; | ||
|
||
export default interface FacadeConfig { | ||
readonly client: () => Promise<SQSClient>; | ||
readonly prefix: string; | ||
readonly isQueuePriorityEnabled: boolean; | ||
} |
7 changes: 7 additions & 0 deletions
7
src/apps/statements/repo/eventsRepo/utils/sqsEvents/FactoryConfig.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
import { SQSClient } from '@aws-sdk/client-sqs'; | ||
|
||
export default interface FactoryConfig { | ||
readonly client?: () => Promise<SQSClient>; | ||
readonly prefix?: string; | ||
readonly isQueuePriorityEnabled?: boolean; | ||
} |
26 changes: 26 additions & 0 deletions
26
src/apps/statements/repo/eventsRepo/utils/sqsEvents/factory.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import { defaultTo } from 'lodash'; | ||
import emitNewStatements from '../../emitNewStatements/sqs'; | ||
import Facade from '../../Facade'; | ||
import connectToSQS from "../../../../../../utils/connectToSQS"; | ||
import FacadeConfig from './FacadeConfig'; | ||
import FactoryConfig from './FactoryConfig'; | ||
|
||
export default (factoryConfig: FactoryConfig = {}): Facade => { | ||
const facadeConfig: FacadeConfig = { | ||
client: defaultTo(factoryConfig.client, connectToSQS()), | ||
prefix: defaultTo(factoryConfig.prefix, 'xapistatements'), | ||
isQueuePriorityEnabled: defaultTo(factoryConfig.isQueuePriorityEnabled, false), | ||
}; | ||
return { | ||
emitNewStatements: emitNewStatements(facadeConfig), | ||
clearRepo: async () => { | ||
// Do nothing. | ||
}, | ||
migrate: async () => { | ||
// Do nothing. | ||
}, | ||
rollback: async () => { | ||
// Do nothing. | ||
}, | ||
}; | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
import { SQSClient } from '@aws-sdk/client-sqs'; | ||
import { once } from 'lodash'; | ||
import config from '../config'; | ||
import logger from '../logger'; | ||
|
||
export default once((): (() => Promise<SQSClient>) => { | ||
return once(async () => { | ||
logger.info('Creating SQS connection'); | ||
|
||
return new SQSClient({ | ||
...(config.aws.region ? { region: config.aws.region } : null), | ||
...(config.aws.accessKeyId && config.aws.secretAccessKey | ||
? { | ||
credentials: { | ||
accessKeyId: config.aws.accessKeyId as string, | ||
secretAccessKey: config.aws.secretAccessKey as string, | ||
}, | ||
} | ||
: null), | ||
}); | ||
}); | ||
}); |
Oops, something went wrong.