Skip to content

Commit

Permalink
feat(lambda-event-sources): starting position as Date for kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
nikovirtala committed Jan 10, 2025
1 parent 7d96065 commit b8aec1b
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 12 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,15 @@
"S509448A1": {
"Type": "AWS::SecretsManager::Secret",
"Properties": {
"SecretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----'\\n\"}"
"SecretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----\\\"\\n\"}"
},
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"SC0855C491": {
"Type": "AWS::SecretsManager::Secret",
"Properties": {
"SecretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----'\\n\",\"privateKey\":\"-----BEGIN ENCRYPTED PRIVATE KEY-----\\nzp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==\\n-----END ENCRYPTED PRIVATE KEY-----\"}"
"SecretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----\\\"\\n\",\"privateKey\":\"-----BEGIN ENCRYPTED PRIVATE KEY-----\\nzp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==\\n-----END ENCRYPTED PRIVATE KEY-----\"}"
},
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,22 @@ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
startingPosition: lambda.StartingPosition.AT_TIMESTAMP,
startingPositionTimestamp: 1730270400,
}));

const fn5 = new TestFunction(this, 'F5');
rootCASecret.grantRead(fn5);
clientCertificatesSecret.grantRead(fn5);

fn5.addEventSource(new SelfManagedKafkaEventSource({
bootstrapServers,
topic: 'my-test-topic5',
consumerGroupId: 'myTestConsumerGroup5',
secret: clientCertificatesSecret,
authenticationMethod:
AuthenticationMethod.CLIENT_CERTIFICATE_TLS_AUTH,
rootCACertificate: rootCASecret,
startingPosition: lambda.StartingPosition.AT_TIMESTAMP,
startingPositionTimestamp: new Date('2024-10-30T06:40:00.000Z'),
}));
}
}

Expand Down
12 changes: 8 additions & 4 deletions packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ export interface KafkaEventSourceProps extends BaseStreamEventSourceProps {
readonly onFailure?: lambda.IEventSourceDlq;

/**
* The time from which to start reading, in Unix time seconds.
* The time from which to start reading. Unix time seconds or `Date`
*
* @default - no timestamp
*/
readonly startingPositionTimestamp?: number;
readonly startingPositionTimestamp?: number | Date;
}

/**
Expand Down Expand Up @@ -175,7 +175,9 @@ export class ManagedKafkaEventSource extends StreamEventSource {
filters: this.innerProps.filters,
filterEncryption: this.innerProps.filterEncryption,
startingPosition: this.innerProps.startingPosition,
startingPositionTimestamp: this.innerProps.startingPositionTimestamp,
startingPositionTimestamp: this.innerProps.startingPositionTimestamp instanceof Date
? Math.floor(this.innerProps.startingPositionTimestamp.getTime() / 1000)
: this.innerProps.startingPositionTimestamp,
sourceAccessConfigurations: this.sourceAccessConfigurations(),
kafkaTopic: this.innerProps.topic,
kafkaConsumerGroupId: this.innerProps.consumerGroupId,
Expand Down Expand Up @@ -281,7 +283,9 @@ export class SelfManagedKafkaEventSource extends StreamEventSource {
kafkaTopic: this.innerProps.topic,
kafkaConsumerGroupId: this.innerProps.consumerGroupId,
startingPosition: this.innerProps.startingPosition,
startingPositionTimestamp: this.innerProps.startingPositionTimestamp,
startingPositionTimestamp: this.innerProps.startingPositionTimestamp instanceof Date
? Math.floor(this.innerProps.startingPositionTimestamp.getTime() / 1000)
: this.innerProps.startingPositionTimestamp,
sourceAccessConfigurations: this.sourceAccessConfigurations(),
onFailure: this.innerProps.onFailure,
supportS3OnFailureDestination: true,
Expand Down
24 changes: 23 additions & 1 deletion packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,7 @@ describe('KafkaEventSource', () => {
}))).toThrow(/Minimum provisioned pollers must be less than or equal to maximum provisioned pollers/);
});

test('AT_TIMESTAMP starting position', () => {
test('AT_TIMESTAMP starting position (number)', () => {
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const bootstrapServers = ['kafka-broker:9092'];
Expand All @@ -1251,6 +1251,28 @@ describe('KafkaEventSource', () => {
});
});

test('AT_TIMESTAMP starting position (Date)', () => {
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const bootstrapServers = ['kafka-broker:9092'];
const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });
const kafkaTopic = 'some-topic';

fn.addEventSource(new sources.SelfManagedKafkaEventSource({
bootstrapServers,
secret: secret,
topic: kafkaTopic,
startingPosition: lambda.StartingPosition.AT_TIMESTAMP,
startingPositionTimestamp: new Date('2022-01-01T00:00:00.000Z'),
}),
);

Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', {
StartingPosition: 'AT_TIMESTAMP',
StartingPositionTimestamp: 1640995200,
});
});

test('startingPositionTimestamp missing throws error', () => {
const stack = new cdk.Stack();
const bootstrapServers = ['kafka-broker:9092'];
Expand Down

0 comments on commit b8aec1b

Please sign in to comment.