Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: allowing worker to be optionally a string reference to an exist…
Browse files Browse the repository at this point in the history
…ing function.
jiw-mh committed Jan 21, 2025
1 parent d3bc25e commit 3259f65
Showing 5 changed files with 115 additions and 12 deletions.
50 changes: 38 additions & 12 deletions src/constructs/aws/Queue.ts
Original file line number Diff line number Diff line change
@@ -32,11 +32,16 @@ const QUEUE_DEFINITION = {
properties: {
type: { const: "queue" },
worker: {
type: "object",
properties: {
timeout: { type: "number" },
},
additionalProperties: true,
oneOf: [
{ type: "string" },
{
type: "object",
properties: {
timeout: { type: "number" },
},
additionalProperties: true,
},
] as const,
},
maxRetries: { type: "number" },
alarm: { type: "string" },
@@ -127,6 +132,7 @@ export class Queue extends AwsConstruct {
private readonly queueArnOutput: CfnOutput;
private readonly queueUrlOutput: CfnOutput;
private readonly dlqUrlOutput: CfnOutput;
private readonly workerName: string;

constructor(
scope: CdkConstruct,
@@ -146,8 +152,24 @@ export class Queue extends AwsConstruct {
);
}

let functionConfig: number | undefined;
if (typeof configuration.worker === "string") {
this.workerName = configuration.worker;
const slsFunction = provider.getFunction(this.workerName);
if (!slsFunction) {
throw new ServerlessError(
`Invalid configuration in 'constructs.${this.id}': 'workerRef' needs to point to an existing function.`,
"LIFT_INVALID_CONSTRUCT_CONFIGURATION"
);
}
functionConfig = slsFunction.timeout;
} else {
this.workerName = `${this.id}Worker`;
functionConfig = configuration.worker.timeout;
}

// The default function timeout is 6 seconds in the Serverless Framework
const functionTimeout = configuration.worker.timeout ?? 6;
const functionTimeout = functionConfig ?? 6;

// This should be 6 times the lambda function's timeout + MaximumBatchingWindowInSeconds
// See https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
@@ -326,10 +348,15 @@ export class Queue extends AwsConstruct {
const maximumBatchingWindow = this.getMaximumBatchingWindow();
const maximumConcurrency = this.configuration.maxConcurrency;

// Override events for the worker
this.configuration.worker.events = [
// Subscribe the worker to the SQS queue
{
if (typeof this.configuration.worker !== "string") {
// Add the worker, if it is not a reference.
this.provider.addFunction(this.workerName, this.configuration.worker);
}

// Subscribe the worker to the SQS queue
this.provider.addFunctionEvent({
functionName: this.workerName,
event: {
sqs: {
arn: this.queue.queueArn,
batchSize: batchSize,
@@ -338,8 +365,7 @@ export class Queue extends AwsConstruct {
functionResponseType: "ReportBatchItemFailures",
},
},
];
this.provider.addFunction(`${this.id}Worker`, this.configuration.worker);
});
}

private async getQueueUrl(): Promise<string | undefined> {
33 changes: 33 additions & 0 deletions src/providers/AwsProvider.ts
Original file line number Diff line number Diff line change
@@ -25,6 +25,14 @@ const AWS_DEFINITION = {
additionalProperties: false,
} as const;

type ValueType<T extends { [key: string | number | symbol]: unknown }> = T extends {
[key: string | number | symbol]: infer V;
}
? V
: never;

type ArrayType<T extends Array<unknown>> = T extends Array<infer V> ? V : never;

export class AwsProvider implements ProviderInterface {
public static type = "aws";
public static schema = AWS_DEFINITION;
@@ -112,6 +120,31 @@ export class AwsProvider implements ProviderInterface {
this.serverless.service.setFunctionNames(this.serverless.processedInput.options);
}

getFunction(functionName: string) {
if (!this.serverless.service.functions) {
return null;
}

return this.serverless.service.functions[functionName];
}

addFunctionEvent({
functionName,
event,
}: {
functionName: string;
event: ArrayType<Required<ValueType<Required<Serverless["service"]>["functions"]>>["events"]>;
}) {
const slsFunction = this.getFunction(functionName);
if (!slsFunction) {
throw new Error(`Serverless function ${functionName} doesn't exit, can not add an event.`);
}
if (!slsFunction.events) {
slsFunction.events = [];
}
slsFunction.events.push(event);
}

/**
* @internal
*/
16 changes: 16 additions & 0 deletions test/fixtures/queuesWorkerRef/serverless.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
service: app
configValidationMode: error

provider:
name: aws
# To avoid versions with random names (easier diffs)
versionFunctions: false

functions:
foo:
handler: worker.handler

constructs:
emails:
type: queue
workerRef: foo
Empty file.
28 changes: 28 additions & 0 deletions test/unit/queues.test.ts
Original file line number Diff line number Diff line change
@@ -750,4 +750,32 @@ describe("queues", () => {
);
}
});
it("should use a function if the function is defined", async () => {
const awsMock = mockAws();
sinon.stub(CloudFormationHelpers, "getStackOutput").resolves("queue-url");
const sendSpy = awsMock.mockService("SQS", "sendMessage").resolves();

await runServerless({
fixture: "queuesWorkerRef",
configExt: merge({}, pluginConfigExt, {
constructs: {
emails: {
fifo: true,
},
},
}),
command: "emails:send",
options: {
body: "Message body",
"group-id": "123",
},
});

expect(sendSpy.callCount).toBe(1);
expect(sendSpy.firstCall.firstArg).toStrictEqual({
QueueUrl: "queue-url",
MessageGroupId: "123",
MessageBody: "Message body",
});
});
});

0 comments on commit 3259f65

Please sign in to comment.