From 2114f643a558f9deca057ecd7df9e9f4e3f584a2 Mon Sep 17 00:00:00 2001 From: Niko Virtala Date: Fri, 13 Sep 2024 13:24:08 +0300 Subject: [PATCH] feat(lambda-event-sources): starting position timestamp for kafka --- ...efaultTestDeployAssertAF78BD0F.assets.json | 2 +- .../cdk.out | 2 +- .../integ.json | 2 +- ...vent-source-kafka-self-managed.assets.json | 6 +- ...nt-source-kafka-self-managed.template.json | 126 ++++++++++- .../manifest.json | 30 ++- .../tree.json | 202 +++++++++++++++++- .../test/integ.kafka-selfmanaged.ts | 16 ++ .../aws-lambda-event-sources/README.md | 11 +- .../aws-lambda-event-sources/lib/kafka.ts | 27 +++ .../test/kafka.test.ts | 93 ++++++++ 11 files changed, 501 insertions(+), 16 deletions(-) diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.assets.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.assets.json index 18daf0934754d..ed62c09b1142d 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.assets.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.assets.json @@ -1,5 +1,5 @@ { - "version": "38.0.1", + "version": "39.0.0", "files": { "21fbb51d7b23f6a6c262b46a9caee79d744a3ac019fd45422d988b96d44b2a22": { "source": { diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/cdk.out b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/cdk.out index c6e612584e352..91e1a8b9901d5 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/cdk.out +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/cdk.out @@ -1 +1 @@ -{"version":"38.0.1"} \ No newline at end of file +{"version":"39.0.0"} \ No newline at end of file diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/integ.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/integ.json index b2142e0e738f8..9a339533e6b96 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/integ.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/integ.json @@ -1,5 +1,5 @@ { - "version": "38.0.1", + "version": "39.0.0", "testCases": { "LambdaEventSourceKafkaSelfManagedTest/DefaultTest": { "stacks": [ diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.assets.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.assets.json index 983f0b88ef58f..429c3680a7181 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.assets.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.assets.json @@ -1,7 +1,7 @@ { - "version": "38.0.1", + "version": "39.0.0", "files": { - "cb2d7d48eaf634edd0acc6e6475e904f57e491fce9fa3ac839e93cba823616c1": { + "e43d733906aefa314ae3fa235f7e6ce12dc3267bbf65aedad4f66068eb527dd8": { "source": { "path": "lambda-event-source-kafka-self-managed.template.json", "packaging": "file" @@ -9,7 +9,7 @@ "destinations": { "current_account-current_region": { "bucketName": "cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}", - "objectKey": "cb2d7d48eaf634edd0acc6e6475e904f57e491fce9fa3ac839e93cba823616c1.json", + "objectKey": "e43d733906aefa314ae3fa235f7e6ce12dc3267bbf65aedad4f66068eb527dd8.json", "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-file-publishing-role-${AWS::AccountId}-${AWS::Region}" } } diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json index 5a5f9daa01865..9b24049233221 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json @@ -131,7 +131,7 @@ "S509448A1": { "Type": "AWS::SecretsManager::Secret", "Properties": { - "SecretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----\\\"\\n\"}" + "SecretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----'\\n\"}" }, "UpdateReplacePolicy": "Delete", "DeletionPolicy": "Delete" @@ -139,7 +139,7 @@ "SC0855C491": { "Type": "AWS::SecretsManager::Secret", "Properties": { - "SecretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----\\\"\\n\",\"privateKey\":\"-----BEGIN ENCRYPTED PRIVATE KEY-----\\nzp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==\\n-----END ENCRYPTED PRIVATE KEY-----\"}" + "SecretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----'\\n\",\"privateKey\":\"-----BEGIN ENCRYPTED PRIVATE KEY-----\\nzp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==\\n-----END ENCRYPTED PRIVATE KEY-----\"}" }, "UpdateReplacePolicy": "Delete", "DeletionPolicy": "Delete" @@ -454,6 +454,128 @@ "my-test-topic3" ] } + }, + "F4ServiceRole100FF901": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "lambda.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "F4ServiceRoleDefaultPolicy1E98EC08": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:DescribeSecret", + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": [ + { + "Ref": "S509448A1" + }, + { + "Ref": "SC0855C491" + } + ] + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "F4ServiceRoleDefaultPolicy1E98EC08", + "Roles": [ + { + "Ref": "F4ServiceRole100FF901" + } + ] + } + }, + "F4F1740A13": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "ZipFile": "exports.handler = async function handler(event) {\n console.log('event:', JSON.stringify(event, undefined, 2));\n return { event };\n}" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "F4ServiceRole100FF901", + "Arn" + ] + }, + "Runtime": "nodejs18.x" + }, + "DependsOn": [ + "F4ServiceRoleDefaultPolicy1E98EC08", + "F4ServiceRole100FF901" + ] + }, + "F4KafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic4F6589D62": { + "Type": "AWS::Lambda::EventSourceMapping", + "Properties": { + "BatchSize": 100, + "FunctionName": { + "Ref": "F4F1740A13" + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "my-self-hosted-kafka-broker-1:9092", + "my-self-hosted-kafka-broker-2:9092", + "my-self-hosted-kafka-broker-3:9092" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "myTestConsumerGroup4" + }, + "SourceAccessConfigurations": [ + { + "Type": "CLIENT_CERTIFICATE_TLS_AUTH", + "URI": { + "Ref": "SC0855C491" + } + }, + { + "Type": "SERVER_ROOT_CA_CERTIFICATE", + "URI": { + "Ref": "S509448A1" + } + } + ], + "StartingPosition": "AT_TIMESTAMP", + "StartingPositionTimestamp": 1730270400, + "Topics": [ + "my-test-topic4" + ] + } } }, "Parameters": { diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/manifest.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/manifest.json index d41cc1cc8586e..1643361e97634 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/manifest.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/manifest.json @@ -1,5 +1,5 @@ { - "version": "38.0.1", + "version": "39.0.0", "artifacts": { "lambda-event-source-kafka-self-managed.assets": { "type": "cdk:asset-manifest", @@ -16,10 +16,9 @@ "templateFile": "lambda-event-source-kafka-self-managed.template.json", "terminationProtection": false, "validateOnSynth": false, - "notificationArns": [], "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-deploy-role-${AWS::AccountId}-${AWS::Region}", "cloudFormationExecutionRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-cfn-exec-role-${AWS::AccountId}-${AWS::Region}", - "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/cb2d7d48eaf634edd0acc6e6475e904f57e491fce9fa3ac839e93cba823616c1.json", + "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/e43d733906aefa314ae3fa235f7e6ce12dc3267bbf65aedad4f66068eb527dd8.json", "requiresBootstrapStackVersion": 6, "bootstrapStackVersionSsmParameter": "/cdk-bootstrap/hnb659fds/version", "additionalDependencies": [ @@ -125,6 +124,30 @@ "data": "F3KafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic3ED015F25" } ], + "/lambda-event-source-kafka-self-managed/F4/ServiceRole/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "F4ServiceRole100FF901" + } + ], + "/lambda-event-source-kafka-self-managed/F4/ServiceRole/DefaultPolicy/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "F4ServiceRoleDefaultPolicy1E98EC08" + } + ], + "/lambda-event-source-kafka-self-managed/F4/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "F4F1740A13" + } + ], + "/lambda-event-source-kafka-self-managed/F4/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic4/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "F4KafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic4F6589D62" + } + ], "/lambda-event-source-kafka-self-managed/BootstrapVersion": [ { "type": "aws:cdk:logicalId", @@ -155,7 +178,6 @@ "templateFile": "LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.template.json", "terminationProtection": false, "validateOnSynth": false, - "notificationArns": [], "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-deploy-role-${AWS::AccountId}-${AWS::Region}", "cloudFormationExecutionRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-cfn-exec-role-${AWS::AccountId}-${AWS::Region}", "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/21fbb51d7b23f6a6c262b46a9caee79d744a3ac019fd45422d988b96d44b2a22.json", diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/tree.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/tree.json index 168d1caad8aee..e52621e31c0e3 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/tree.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/tree.json @@ -222,7 +222,7 @@ "attributes": { "aws:cdk:cloudformation:type": "AWS::SecretsManager::Secret", "aws:cdk:cloudformation:props": { - "secretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----\\\"\\n\"}" + "secretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----'\\n\"}" } }, "constructInfo": { @@ -246,7 +246,7 @@ "attributes": { "aws:cdk:cloudformation:type": "AWS::SecretsManager::Secret", "aws:cdk:cloudformation:props": { - "secretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----\\\"\\n\",\"privateKey\":\"-----BEGIN ENCRYPTED PRIVATE KEY-----\\nzp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==\\n-----END ENCRYPTED PRIVATE KEY-----\"}" + "secretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----'\\n\",\"privateKey\":\"-----BEGIN ENCRYPTED PRIVATE KEY-----\\nzp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==\\n-----END ENCRYPTED PRIVATE KEY-----\"}" } }, "constructInfo": { @@ -739,6 +739,204 @@ "version": "0.0.0" } }, + "F4": { + "id": "F4", + "path": "lambda-event-source-kafka-self-managed/F4", + "children": { + "ServiceRole": { + "id": "ServiceRole", + "path": "lambda-event-source-kafka-self-managed/F4/ServiceRole", + "children": { + "ImportServiceRole": { + "id": "ImportServiceRole", + "path": "lambda-event-source-kafka-self-managed/F4/ServiceRole/ImportServiceRole", + "constructInfo": { + "fqn": "aws-cdk-lib.Resource", + "version": "0.0.0" + } + }, + "Resource": { + "id": "Resource", + "path": "lambda-event-source-kafka-self-managed/F4/ServiceRole/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::IAM::Role", + "aws:cdk:cloudformation:props": { + "assumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "lambda.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "managedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.CfnRole", + "version": "0.0.0" + } + }, + "DefaultPolicy": { + "id": "DefaultPolicy", + "path": "lambda-event-source-kafka-self-managed/F4/ServiceRole/DefaultPolicy", + "children": { + "Resource": { + "id": "Resource", + "path": "lambda-event-source-kafka-self-managed/F4/ServiceRole/DefaultPolicy/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::IAM::Policy", + "aws:cdk:cloudformation:props": { + "policyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:DescribeSecret", + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": [ + { + "Ref": "S509448A1" + }, + { + "Ref": "SC0855C491" + } + ] + } + ], + "Version": "2012-10-17" + }, + "policyName": "F4ServiceRoleDefaultPolicy1E98EC08", + "roles": [ + { + "Ref": "F4ServiceRole100FF901" + } + ] + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.CfnPolicy", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.Policy", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.Role", + "version": "0.0.0" + } + }, + "Resource": { + "id": "Resource", + "path": "lambda-event-source-kafka-self-managed/F4/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::Lambda::Function", + "aws:cdk:cloudformation:props": { + "code": { + "zipFile": "exports.handler = async function handler(event) {\n console.log('event:', JSON.stringify(event, undefined, 2));\n return { event };\n}" + }, + "handler": "index.handler", + "role": { + "Fn::GetAtt": [ + "F4ServiceRole100FF901", + "Arn" + ] + }, + "runtime": "nodejs18.x" + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_lambda.CfnFunction", + "version": "0.0.0" + } + }, + "KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic4": { + "id": "KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic4", + "path": "lambda-event-source-kafka-self-managed/F4/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic4", + "children": { + "Resource": { + "id": "Resource", + "path": "lambda-event-source-kafka-self-managed/F4/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic4/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::Lambda::EventSourceMapping", + "aws:cdk:cloudformation:props": { + "batchSize": 100, + "functionName": { + "Ref": "F4F1740A13" + }, + "selfManagedEventSource": { + "endpoints": { + "kafkaBootstrapServers": [ + "my-self-hosted-kafka-broker-1:9092", + "my-self-hosted-kafka-broker-2:9092", + "my-self-hosted-kafka-broker-3:9092" + ] + } + }, + "selfManagedKafkaEventSourceConfig": { + "consumerGroupId": "myTestConsumerGroup4" + }, + "sourceAccessConfigurations": [ + { + "type": "CLIENT_CERTIFICATE_TLS_AUTH", + "uri": { + "Ref": "SC0855C491" + } + }, + { + "type": "SERVER_ROOT_CA_CERTIFICATE", + "uri": { + "Ref": "S509448A1" + } + } + ], + "startingPosition": "AT_TIMESTAMP", + "startingPositionTimestamp": 1730270400, + "topics": [ + "my-test-topic4" + ] + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_lambda.CfnEventSourceMapping", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_lambda.EventSourceMapping", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_lambda.Function", + "version": "0.0.0" + } + }, "BootstrapVersion": { "id": "BootstrapVersion", "path": "lambda-event-source-kafka-self-managed/BootstrapVersion", diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts index a9a8b266a2029..09e9063b66b5c 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts @@ -110,6 +110,22 @@ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== maximumPollers: 3, }, })); + + const fn4 = new TestFunction(this, 'F4'); + rootCASecret.grantRead(fn4); + clientCertificatesSecret.grantRead(fn4); + + fn4.addEventSource(new SelfManagedKafkaEventSource({ + bootstrapServers, + topic: 'my-test-topic4', + consumerGroupId: 'myTestConsumerGroup4', + secret: clientCertificatesSecret, + authenticationMethod: + AuthenticationMethod.CLIENT_CERTIFICATE_TLS_AUTH, + rootCACertificate: rootCASecret, + startingPosition: lambda.StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1730270400, + })); } } diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/README.md b/packages/aws-cdk-lib/aws-lambda-event-sources/README.md index 1d4f12937a812..7f3fff8b2e8e9 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/README.md +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/README.md @@ -232,7 +232,7 @@ behavior: * __onFailure__: In the event a record fails and consumes all retries, the record will be sent to SQS queue or SNS topic that is specified here * __parallelizationFactor__: The number of batches to concurrently process on each shard. * __retryAttempts__: The maximum number of times a record should be retried in the event of failure. -* __startingPosition__: Will determine where to begin consumption. 'LATEST' will start at the most recent record and ignore all records that arrived prior to attaching the event source, 'TRIM_HORIZON' will start at the oldest record and ensure you process all available data, while 'AT_TIMESTAMP' will start reading records from a specified time stamp. Note that 'AT_TIMESTAMP' is only supported for Amazon Kinesis streams. +* __startingPosition__: Will determine where to begin consumption. 'LATEST' will start at the most recent record and ignore all records that arrived prior to attaching the event source, 'TRIM_HORIZON' will start at the oldest record and ensure you process all available data, while 'AT_TIMESTAMP' will start reading records from a specified time stamp. * __startingPositionTimestamp__: The time stamp from which to start reading. Used in conjunction with __startingPosition__ when set to 'AT_TIMESTAMP'. * __tumblingWindow__: The duration in seconds of a processing window when using streams. * __enabled__: If the DynamoDB Streams event source mapping should be enabled. The default is true. @@ -252,7 +252,14 @@ myFunction.addEventSource(new KinesisEventSource(stream, { ## Kafka -You can write Lambda functions to process data either from [Amazon MSK](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) or a [self managed Kafka](https://docs.aws.amazon.com/lambda/latest/dg/kafka-smaa.html) cluster. +You can write Lambda functions to process data either from [Amazon MSK](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) or a [self-managed Kafka](https://docs.aws.amazon.com/lambda/latest/dg/kafka-smaa.html) cluster. The following parameters will impact to the polling behavior: + +* __startingPosition__: Will determine where to begin consumption. 'LATEST' will start at the most recent record and ignore all records that arrived prior to attaching the event source, 'TRIM_HORIZON' will start at the oldest record and ensure you process all available data, while 'AT_TIMESTAMP' will start reading records from a specified time stamp. +* __startingPositionTimestamp__: The time stamp from which to start reading. Used in conjunction with __startingPosition__ when set to 'AT_TIMESTAMP'. +* __batchSize__: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low). +* __maxBatchingWindow__: The maximum amount of time to gather records before invoking the lambda. This increases the likelihood of a full batch at the cost of possibly delaying processing. +* __onFailure__: In the event a record fails and consumes all retries, the record will be sent to SQS queue or SNS topic that is specified here +* __enabled__: If the Kafka event source mapping should be enabled. The default is true. The following code sets up Amazon MSK as an event source for a lambda function. Credentials will need to be configured to access the MSK cluster, as described in [Username/Password authentication](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html). diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts index c7a837b23ff16..1aad3a56a0d1a 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts @@ -55,6 +55,13 @@ export interface KafkaEventSourceProps extends BaseStreamEventSourceProps { * @default - discarded records are ignored */ readonly onFailure?: lambda.IEventSourceDlq; + + /** + * The time from which to start reading, in Unix time seconds. + * + * @default - no timestamp + */ + readonly startingPositionTimestamp?: number; } /** @@ -148,6 +155,15 @@ export class ManagedKafkaEventSource extends StreamEventSource { constructor(props: ManagedKafkaEventSourceProps) { super(props); + + if (props.startingPosition === lambda.StartingPosition.AT_TIMESTAMP && !props.startingPositionTimestamp) { + throw new Error('startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP'); + } + + if (props.startingPosition !== lambda.StartingPosition.AT_TIMESTAMP && props.startingPositionTimestamp) { + throw new Error('startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP'); + } + this.innerProps = props; } @@ -159,6 +175,7 @@ export class ManagedKafkaEventSource extends StreamEventSource { filters: this.innerProps.filters, filterEncryption: this.innerProps.filterEncryption, startingPosition: this.innerProps.startingPosition, + startingPositionTimestamp: this.innerProps.startingPositionTimestamp, sourceAccessConfigurations: this.sourceAccessConfigurations(), kafkaTopic: this.innerProps.topic, kafkaConsumerGroupId: this.innerProps.consumerGroupId, @@ -240,6 +257,15 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { } else if (!props.secret) { throw new Error('secret must be set if Kafka brokers accessed over Internet'); } + + if (props.startingPosition === lambda.StartingPosition.AT_TIMESTAMP && !props.startingPositionTimestamp) { + throw new Error('startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP'); + } + + if (props.startingPosition !== lambda.StartingPosition.AT_TIMESTAMP && props.startingPositionTimestamp) { + throw new Error('startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP'); + } + this.innerProps = props; } @@ -255,6 +281,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { kafkaTopic: this.innerProps.topic, kafkaConsumerGroupId: this.innerProps.consumerGroupId, startingPosition: this.innerProps.startingPosition, + startingPositionTimestamp: this.innerProps.startingPositionTimestamp, sourceAccessConfigurations: this.sourceAccessConfigurations(), onFailure: this.innerProps.onFailure, supportS3OnFailureDestination: true, diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts index 5775a8ad1507f..3e3e5cab29663 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts @@ -407,6 +407,48 @@ describe('KafkaEventSource', () => { }, }))).toThrow(/Minimum provisioned pollers must be less than or equal to maximum provisioned pollers/); }); + test('AT_TIMESTAMP starting position', () => { + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + fn.addEventSource(new sources.ManagedKafkaEventSource({ + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + }), + ); + + Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', { + StartingPosition: 'AT_TIMESTAMP', + StartingPositionTimestamp: 1640995200, + }); + }); + + test('startingPositionTimestamp missing throws error', () => { + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + expect(() => new sources.ManagedKafkaEventSource({ + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.AT_TIMESTAMP, + })).toThrow(/startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP/); + }); + + test('startingPositionTimestamp without AT_TIMESTAMP throws error', () => { + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + expect(() => new sources.ManagedKafkaEventSource({ + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.LATEST, + startingPositionTimestamp: 1640995200, + })).toThrow(/startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP/); + }); }); describe('self-managed kafka', () => { @@ -1186,5 +1228,56 @@ describe('KafkaEventSource', () => { }, }))).toThrow(/Minimum provisioned pollers must be less than or equal to maximum provisioned pollers/); }); + + test('AT_TIMESTAMP starting position', () => { + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const bootstrapServers = ['kafka-broker:9092']; + const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); + const kafkaTopic = 'some-topic'; + + fn.addEventSource(new sources.SelfManagedKafkaEventSource({ + bootstrapServers, + secret: secret, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + }), + ); + + Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', { + StartingPosition: 'AT_TIMESTAMP', + StartingPositionTimestamp: 1640995200, + }); + }); + + test('startingPositionTimestamp missing throws error', () => { + const stack = new cdk.Stack(); + const bootstrapServers = ['kafka-broker:9092']; + const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); + const kafkaTopic = 'some-topic'; + + expect(() => new sources.SelfManagedKafkaEventSource({ + bootstrapServers, + secret: secret, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.AT_TIMESTAMP, + })).toThrow(/startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP/); + }); + + test('startingPositionTimestamp without AT_TIMESTAMP throws error', () => { + const stack = new cdk.Stack(); + const bootstrapServers = ['kafka-broker:9092']; + const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); + const kafkaTopic = 'some-topic'; + + expect(() => new sources.SelfManagedKafkaEventSource({ + bootstrapServers, + secret: secret, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.LATEST, + startingPositionTimestamp: 1640995200, + })).toThrow(/startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP/); + }); }); });