diff --git a/CHANGELOG.md b/CHANGELOG.md index 49086e4..dff2fda 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,16 @@ # Change Log +## [2.6.0](https://github.com/bakdata/kafka-large-message-serde/tree/2.6.0) (2024-01-05) +[View commits](https://github.com/bakdata/kafka-large-message-serde/compare/2.5.1...2.6.0) + +**Closed issues:** + +- Invalid value null for configuration key serializer exception [\#44](https://github.com/bakdata/kafka-large-message-serde/issues/44) + +**Merged pull requests:** + +- Update dependencies and switch to Java 11 [\#46](https://github.com/bakdata/kafka-large-message-serde/pull/46) ([@philipp94831](https://github.com/philipp94831)) + ## [2.5.1](https://github.com/bakdata/kafka-large-message-serde/tree/2.5.1) (2023-01-05) [View commits](https://github.com/bakdata/kafka-large-message-serde/compare/2.5.0...2.5.1) diff --git a/build.gradle.kts b/build.gradle.kts index 3c69ddb..32e50ca 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -46,11 +46,12 @@ configure { subprojects { apply(plugin = "java-library") + apply(plugin = "java-test-fixtures") apply(plugin = "io.freefair.lombok") configure { - sourceCompatibility = JavaVersion.VERSION_1_8 - targetCompatibility = JavaVersion.VERSION_1_8 + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 } } diff --git a/gradle.properties b/gradle.properties index 8fc3c92..ba17a6b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,11 +1,11 @@ -version=2.5.2-SNAPSHOT +version=2.6.1-SNAPSHOT org.gradle.caching=true org.gradle.parallel=true org.gradle.jvmargs=-Xmx2048m -kafkaVersion=3.3.1 -confluentVersion=7.3.1 -junitVersion=5.9.1 -log4jVersion=2.19.0 -assertJVersion=3.23.1 -s3MockVersion=2.4.16 +kafkaVersion=3.5.2 +confluentVersion=7.5.1 +junitVersion=5.10.1 +log4jVersion=2.22.1 +assertJVersion=3.25.1 joolVersion=0.9.14 +testContainersVersion=1.19.3 diff --git a/large-message-connect/build.gradle.kts b/large-message-connect/build.gradle.kts index 8943ff4..fab4603 100644 --- a/large-message-connect/build.gradle.kts +++ b/large-message-connect/build.gradle.kts @@ -40,17 +40,13 @@ dependencies { val assertJVersion: String by project testImplementation(group = "org.assertj", name = "assertj-core", version = assertJVersion) - val s3MockVersion: String by project - testImplementation(group = "com.adobe.testing", name = "s3mock-junit5", version = s3MockVersion) { - exclude(group = "ch.qos.logback") - exclude(group = "org.apache.logging.log4j", module = "log4j-to-slf4j") - } val log4jVersion: String by project - testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j-impl", version = log4jVersion) + testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion) val joolVersion: String by project testImplementation(group = "org.jooq", name = "jool-java-8", version = joolVersion) - testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = "3.3.0") { + testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = "3.5.0") { exclude(group = "org.slf4j", module = "slf4j-log4j12") } testImplementation(group = "org.apache.kafka", name = "connect-file", version = kafkaVersion) + testImplementation(testFixtures(project(":large-message-core"))) } diff --git a/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java b/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java index 5eb02df..f97cd33 100644 --- a/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java +++ b/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -27,7 +27,6 @@ import static net.mguenther.kafka.junit.Wait.delay; import static org.assertj.core.api.Assertions.assertThat; -import com.adobe.testing.s3mock.junit5.S3MockExtension; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -50,13 +49,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; -import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; -class LargeMessageConverterIntegrationTest { - @RegisterExtension - static final S3MockExtension S3_MOCK = S3MockExtension.builder().silent() - .withSecureConnection(false).build(); +class LargeMessageConverterIntegrationTest extends AmazonS3IntegrationTest { private static final String BUCKET_NAME = "testbucket"; private static final String S3_KEY_NAME = "contentkey"; private static final String TOPIC = "input"; @@ -65,30 +61,11 @@ class LargeMessageConverterIntegrationTest { private EmbeddedKafkaCluster kafkaCluster; private Path outputFile; - static void configureS3HTTPService() { - System.setProperty(SdkSystemSetting.SYNC_HTTP_SERVICE_IMPL.property(), - "software.amazon.awssdk.http.apache.ApacheSdkHttpService"); - } - - private static Properties createS3BackedProperties() { - final Properties properties = new Properties(); - properties.put(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG, "http://localhost:" + S3_MOCK.getHttpPort()); - properties.put(AbstractLargeMessageConfig.S3_REGION_CONFIG, "us-east-1"); - properties.put(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, "foo"); - properties.put(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, "bar"); - properties.put(LargeMessageSerdeConfig.KEY_SERDE_CLASS_CONFIG, StringSerde.class.getName()); - properties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, StringSerde.class.getName()); - properties.put( - AbstractLargeMessageConfig.BASE_PATH_CONFIG, String.format("s3://%s/%s", BUCKET_NAME, S3_KEY_NAME)); - properties.put(LargeMessageConverterConfig.CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); - return properties; - } - @BeforeEach void setUp() throws IOException { this.outputFile = Files.createTempFile("test", "temp"); - S3_MOCK.createS3Client().createBucket(BUCKET_NAME); - configureS3HTTPService(); + final S3Client s3 = this.getS3Client(); + s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET_NAME).build()); this.kafkaCluster = this.createCluster(); this.kafkaCluster.start(); } @@ -126,6 +103,17 @@ private EmbeddedKafkaCluster createCluster() { .build()); } + private Properties createS3BackedProperties() { + final Properties properties = new Properties(); + properties.putAll(this.getLargeMessageConfig()); + properties.put(LargeMessageSerdeConfig.KEY_SERDE_CLASS_CONFIG, StringSerde.class.getName()); + properties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, StringSerde.class.getName()); + properties.put( + AbstractLargeMessageConfig.BASE_PATH_CONFIG, String.format("s3://%s/%s", BUCKET_NAME, S3_KEY_NAME)); + properties.setProperty(LargeMessageConverterConfig.CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + return properties; + } + private Properties config() { final Properties properties = new Properties(); properties.put(ConnectorConfig.NAME_CONFIG, "test"); @@ -134,7 +122,7 @@ private Properties config() { properties.put(FileStreamSinkConnector.FILE_CONFIG, this.outputFile.toString()); properties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, LargeMessageConverter.class.getName()); - createS3BackedProperties().forEach( + this.createS3BackedProperties().forEach( (key, value) -> properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG + "." + key, value)); return properties; } @@ -146,7 +134,7 @@ private Properties createProducerProperties(final boolean shouldBack) { properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList()); properties.put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG, Integer.toString(shouldBack ? 0 : Integer.MAX_VALUE)); - properties.putAll(createS3BackedProperties()); + properties.putAll(this.createS3BackedProperties()); return properties; } } diff --git a/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterTest.java b/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterTest.java index fc13b5f..e1328fd 100644 --- a/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterTest.java +++ b/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -27,22 +27,15 @@ import static com.bakdata.kafka.ByteFlagLargeMessagePayloadProtocol.stripFlag; import static com.bakdata.kafka.HeaderLargeMessagePayloadProtocol.getHeaderName; -import static com.bakdata.kafka.LargeMessageConverterIntegrationTest.configureS3HTTPService; import static com.bakdata.kafka.LargeMessagePayload.ofBytes; import static com.bakdata.kafka.LargeMessagePayload.ofUri; import static com.bakdata.kafka.LargeMessageRetrievingClient.deserializeUri; import static org.assertj.core.api.Assertions.assertThat; -import com.adobe.testing.s3mock.junit5.S3MockExtension; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.model.S3ObjectInputStream; import com.google.common.collect.ImmutableMap; -import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.util.Map; -import org.apache.commons.io.IOUtils; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Deserializer; @@ -52,30 +45,23 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.StringConverter; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; -class LargeMessageConverterTest { - @RegisterExtension - static final S3MockExtension S3_MOCK = S3MockExtension.builder().silent() - .withSecureConnection(false).build(); +class LargeMessageConverterTest extends AmazonS3IntegrationTest { private static final String TOPIC = "topic"; private static final Converter STRING_CONVERTER = new StringConverter(); private static final Serializer STRING_SERIALIZER = new StringSerializer(); private static final Deserializer STRING_DESERIALIZER = new StringDeserializer(); private static final LargeMessagePayloadProtocol HEADER_PROTOCOL = new HeaderLargeMessagePayloadProtocol(); private static final LargeMessagePayloadProtocol BYTE_FLAG_PROTOCOL = new ByteFlagLargeMessagePayloadProtocol(); - private final AmazonS3 s3Client = S3_MOCK.createS3Client(); private LargeMessageConverter converter = null; - @BeforeAll - static void setUp() { - configureS3HTTPService(); - } - private static byte[] serialize(final String uri) { return BYTE_FLAG_PROTOCOL.serialize(ofUri(uri), new RecordHeaders(), false); } @@ -103,29 +89,6 @@ private static byte[] createBackedText(final String bucket, final String key, fi return serialize(uri, headers, isKey); } - private static byte[] readBytes(final BlobStorageURI uri) { - try (final S3Object object = S3_MOCK.createS3Client().getObject(uri.getBucket(), uri.getKey()); - final S3ObjectInputStream objectContent = object.getObjectContent()) { - return IOUtils.toByteArray(objectContent); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - private static Map createProperties(final int maxSize, final String basePath, - final boolean useHeaders) { - return ImmutableMap.builder() - .put(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG, "http://localhost:" + S3_MOCK.getHttpPort()) - .put(AbstractLargeMessageConfig.S3_REGION_CONFIG, "us-east-1") - .put(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, "foo") - .put(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, "bar") - .put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG, Integer.toString(maxSize)) - .put(AbstractLargeMessageConfig.BASE_PATH_CONFIG, basePath) - .put(LargeMessageConverterConfig.CONVERTER_CLASS_CONFIG, StringConverter.class.getName()) - .put(AbstractLargeMessageConfig.USE_HEADERS_CONFIG, Boolean.toString(useHeaders)) - .build(); - } - private static SchemaAndValue toConnectData(final String text) { return STRING_CONVERTER.toConnectData(null, text.getBytes()); } @@ -143,27 +106,6 @@ private static BlobStorageURI deserializeUriWithFlag(final byte[] data) { return deserializeUri(uriBytes); } - private static void expectBackedText(final String basePath, final String expected, final byte[] s3BackedText, - final String type) { - final BlobStorageURI uri = deserializeUriWithFlag(s3BackedText); - expectBackedText(uri, basePath, type, expected); - } - - private static void expectBackedText(final String basePath, final String expected, final byte[] s3BackedText, - final String type, final Headers headers, final boolean isKey) { - final BlobStorageURI uri = deserializeUri(s3BackedText); - expectBackedText(uri, basePath, type, expected); - assertHasHeader(headers, isKey); - } - - private static void expectBackedText(final BlobStorageURI uri, final String basePath, final String type, - final String expected) { - assertThat(uri).asString().startsWith(basePath + TOPIC + "/" + type + "/"); - final byte[] bytes = readBytes(uri); - final String deserialized = STRING_DESERIALIZER.deserialize(null, bytes); - assertThat(deserialized).isEqualTo(expected); - } - private static void expectNonBackedText(final String expected, final byte[] s3BackedText) { assertThat(STRING_DESERIALIZER.deserialize(null, stripFlag(s3BackedText))) .isInstanceOf(String.class) @@ -242,7 +184,7 @@ void shouldConvertBackedToConnectData(final boolean isKey) { final String bucket = "bucket"; final String key = "key"; final String text = "test"; - this.s3Client.createBucket("bucket"); + this.getS3Client().createBucket(CreateBucketRequest.builder().bucket(bucket).build()); final SchemaAndValue expected = toConnectData(text); this.store(bucket, key, text, TOPIC); final SchemaAndValue schemaAndValue = @@ -257,7 +199,7 @@ void shouldConvertBackedToConnectDataWithoutHeaders(final boolean isKey) { final String bucket = "bucket"; final String key = "key"; final String text = "test"; - this.s3Client.createBucket("bucket"); + this.getS3Client().createBucket(CreateBucketRequest.builder().bucket(bucket).build()); final SchemaAndValue expected = toConnectData(text); this.store(bucket, key, text, TOPIC); final SchemaAndValue schemaAndValue = @@ -272,7 +214,7 @@ void shouldConvertBackedToConnectDataWithHeaders(final boolean isKey) { final String bucket = "bucket"; final String key = "key"; final String text = "test"; - this.s3Client.createBucket("bucket"); + this.getS3Client().createBucket(CreateBucketRequest.builder().bucket(bucket).build()); final SchemaAndValue expected = toConnectData(text); this.store(bucket, key, text, TOPIC); final Headers headers = new RecordHeaders(); @@ -308,10 +250,10 @@ void shouldCreateBackedDataKey() { final String text = "test"; final SchemaAndValue data = toConnectData(text); - this.s3Client.createBucket(bucket); + this.getS3Client().createBucket(CreateBucketRequest.builder().bucket(bucket).build()); final byte[] bytes = this.converter.fromConnectData(TOPIC, new RecordHeaders(), data.schema(), data.value()); - expectBackedText(basePath, text, bytes, "keys"); + this.expectBackedText(basePath, text, bytes, "keys"); } @Test @@ -322,10 +264,10 @@ void shouldCreateBackedDataKeyWithoutHeaders() { final String text = "test"; final SchemaAndValue data = toConnectData(text); - this.s3Client.createBucket(bucket); + this.getS3Client().createBucket(CreateBucketRequest.builder().bucket(bucket).build()); final byte[] bytes = this.converter.fromConnectData(TOPIC, data.schema(), data.value()); - expectBackedText(basePath, text, bytes, "keys"); + this.expectBackedText(basePath, text, bytes, "keys"); } @Test @@ -336,11 +278,11 @@ void shouldCreateBackedDataKeyWithHeaders() { final String text = "test"; final SchemaAndValue data = toConnectData(text); - this.s3Client.createBucket(bucket); + this.getS3Client().createBucket(CreateBucketRequest.builder().bucket(bucket).build()); final Headers headers = new RecordHeaders(); final byte[] bytes = this.converter.fromConnectData(TOPIC, headers, data.schema(), data.value()); - expectBackedText(basePath, text, bytes, "keys", headers, true); + this.expectBackedText(basePath, text, bytes, "keys", headers, true); } @Test @@ -351,10 +293,10 @@ void shouldCreateBackedDataValue() { final String text = "test"; final SchemaAndValue data = toConnectData(text); - this.s3Client.createBucket(bucket); + this.getS3Client().createBucket(CreateBucketRequest.builder().bucket(bucket).build()); final byte[] bytes = this.converter.fromConnectData(TOPIC, new RecordHeaders(), data.schema(), data.value()); - expectBackedText(basePath, text, bytes, "values"); + this.expectBackedText(basePath, text, bytes, "values"); } @Test @@ -365,10 +307,10 @@ void shouldCreateBackedDataValueWithoutHeaders() { final String text = "test"; final SchemaAndValue data = toConnectData(text); - this.s3Client.createBucket(bucket); + this.getS3Client().createBucket(CreateBucketRequest.builder().bucket(bucket).build()); final byte[] bytes = this.converter.fromConnectData(TOPIC, data.schema(), data.value()); - expectBackedText(basePath, text, bytes, "values"); + this.expectBackedText(basePath, text, bytes, "values"); } @Test @@ -379,11 +321,11 @@ void shouldCreateBackedDataValueWithHeaders() { final String text = "test"; final SchemaAndValue data = toConnectData(text); - this.s3Client.createBucket(bucket); + this.getS3Client().createBucket(CreateBucketRequest.builder().bucket(bucket).build()); final Headers headers = new RecordHeaders(); final byte[] bytes = this.converter.fromConnectData(TOPIC, headers, data.schema(), data.value()); - expectBackedText(basePath, text, bytes, "values", headers, false); + this.expectBackedText(basePath, text, bytes, "values", headers, false); } @ParameterizedTest @@ -460,13 +402,60 @@ void shouldCreateNonBackedNullDataWithoutHeaders(final boolean isKey) { assertThat(bytes).isNull(); } + private byte[] readBytes(final BlobStorageURI uri) { + final GetObjectRequest request = GetObjectRequest.builder() + .bucket(uri.getBucket()) + .key(uri.getKey()) + .build(); + try (final InputStream objectContent = this.getS3Client().getObject(request)) { + return objectContent.readAllBytes(); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + private Map createProperties(final int maxSize, final String basePath, + final boolean useHeaders) { + return ImmutableMap.builder() + .putAll(this.getLargeMessageConfig()) + .put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG, Integer.toString(maxSize)) + .put(AbstractLargeMessageConfig.BASE_PATH_CONFIG, basePath) + .put(LargeMessageConverterConfig.CONVERTER_CLASS_CONFIG, StringConverter.class.getName()) + .put(AbstractLargeMessageConfig.USE_HEADERS_CONFIG, Boolean.toString(useHeaders)) + .build(); + } + + private void expectBackedText(final String basePath, final String expected, final byte[] s3BackedText, + final String type) { + final BlobStorageURI uri = deserializeUriWithFlag(s3BackedText); + this.expectBackedText(uri, basePath, type, expected); + } + + private void expectBackedText(final String basePath, final String expected, final byte[] s3BackedText, + final String type, final Headers headers, final boolean isKey) { + final BlobStorageURI uri = deserializeUri(s3BackedText); + this.expectBackedText(uri, basePath, type, expected); + assertHasHeader(headers, isKey); + } + + private void expectBackedText(final BlobStorageURI uri, final String basePath, final String type, + final String expected) { + assertThat(uri).asString().startsWith(basePath + TOPIC + "/" + type + "/"); + final byte[] bytes = this.readBytes(uri); + final String deserialized = STRING_DESERIALIZER.deserialize(null, bytes); + assertThat(deserialized).isEqualTo(expected); + } + private void store(final String bucket, final String key, final String s, final String topic) { - this.s3Client.putObject(bucket, key, new ByteArrayInputStream(STRING_SERIALIZER.serialize(topic, s)), - new ObjectMetadata()); + final PutObjectRequest request = PutObjectRequest.builder() + .bucket(bucket) + .key(key) + .build(); + this.getS3Client().putObject(request, RequestBody.fromBytes(STRING_SERIALIZER.serialize(topic, s))); } private void initSetup(final boolean isKey, final int maxSize, final String basePath, final boolean useHeaders) { - final Map properties = createProperties(maxSize, basePath, useHeaders); + final Map properties = this.createProperties(maxSize, basePath, useHeaders); this.converter = new LargeMessageConverter(); this.converter.configure(properties, isKey); } diff --git a/large-message-core/build.gradle.kts b/large-message-core/build.gradle.kts index 1dcf805..a7edaf3 100644 --- a/large-message-core/build.gradle.kts +++ b/large-message-core/build.gradle.kts @@ -33,13 +33,13 @@ dependencies { val confluentVersion: String by project api(group = "io.confluent", name = "common-config", version = confluentVersion) - implementation(group = "org.slf4j", name = "slf4j-api", version = "1.7.36") - val awsVersion = "2.17.295" + implementation(group = "org.slf4j", name = "slf4j-api", version = "2.0.10") + val awsVersion = "2.22.10" api(group = "software.amazon.awssdk", name = "s3", version = awsVersion) api(group = "software.amazon.awssdk", name = "sts", version = awsVersion) - api(group = "com.azure", name = "azure-storage-blob", version = "12.20.1") - api(group = "com.google.cloud", name = "google-cloud-storage", version = "2.16.0") - implementation(group = "com.google.guava", name = "guava", version = "31.1-jre") + api(group = "com.azure", name = "azure-storage-blob", version = "12.25.1") + api(group = "com.google.cloud", name = "google-cloud-storage", version = "2.30.1") + implementation(group = "com.google.guava", name = "guava", version = "33.0.0-jre") val junitVersion: String by project testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junitVersion) @@ -47,16 +47,14 @@ dependencies { testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion) val assertJVersion: String by project testImplementation(group = "org.assertj", name = "assertj-core", version = assertJVersion) - val mockitoVersion = "4.11.0" + val mockitoVersion = "5.8.0" testImplementation(group = "org.mockito", name = "mockito-core", version = mockitoVersion) testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion) val log4jVersion: String by project - testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j-impl", version = log4jVersion) - val testContainersVersion = "1.17.6" - testImplementation(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion) - testImplementation(group = "org.testcontainers", name = "localstack", version = testContainersVersion) - // for localstack - testRuntimeOnly(group = "com.amazonaws", name = "aws-java-sdk-core", version = "1.12.376") - testImplementation(group = "com.google.cloud", name = "google-cloud-nio", version = "0.126.0") + testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion) + val testContainersVersion: String by project + testFixturesApi(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion) + testFixturesImplementation(group = "org.testcontainers", name = "localstack", version = testContainersVersion) + testImplementation(group = "com.google.cloud", name = "google-cloud-nio", version = "0.127.8") } diff --git a/large-message-core/src/main/java/com/bakdata/kafka/AmazonS3Client.java b/large-message-core/src/main/java/com/bakdata/kafka/AmazonS3Client.java index 810b5d7..8af81d4 100644 --- a/large-message-core/src/main/java/com/bakdata/kafka/AmazonS3Client.java +++ b/large-message-core/src/main/java/com/bakdata/kafka/AmazonS3Client.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -25,6 +25,7 @@ package com.bakdata.kafka; import java.io.IOException; +import java.io.InputStream; import java.util.Collection; import java.util.List; import java.util.stream.Collectors; @@ -32,14 +33,12 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.errors.SerializationException; -import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.Delete; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.ListObjectVersionsRequest; import software.amazon.awssdk.services.s3.model.ListObjectVersionsResponse; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; @@ -50,7 +49,6 @@ import software.amazon.awssdk.services.s3.model.S3Object; import software.amazon.awssdk.services.s3.paginators.ListObjectVersionsIterable; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable; -import software.amazon.awssdk.utils.IoUtils; /** * Implementation of {@link BlobStorageClient} for Amazon S3. @@ -115,9 +113,12 @@ public String putObject(final byte[] bytes, final String bucket, final String ke @Override public byte[] getObject(final String bucket, final String key) { final String s3URI = asURI(bucket, key); - try (final ResponseInputStream s3Object = this.s3.getObject( - GetObjectRequest.builder().bucket(bucket).key(key).build())) { - return IoUtils.toByteArray(s3Object); + final GetObjectRequest request = GetObjectRequest.builder() + .bucket(bucket) + .key(key) + .build(); + try (final InputStream s3Object = this.s3.getObject(request)) { + return s3Object.readAllBytes(); } catch (final SdkException | IOException e) { throw new SerializationException("Cannot handle S3 backed message: " + s3URI, e); } diff --git a/large-message-core/src/main/java/com/bakdata/kafka/CompressionType.java b/large-message-core/src/main/java/com/bakdata/kafka/CompressionType.java index 68c910f..1113cea 100644 --- a/large-message-core/src/main/java/com/bakdata/kafka/CompressionType.java +++ b/large-message-core/src/main/java/com/bakdata/kafka/CompressionType.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -34,7 +34,6 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferOutputStream; -import software.amazon.awssdk.utils.IoUtils; /** * This enum specifies the various allowed compression types and their implementation. @@ -163,7 +162,7 @@ private static byte[] decompress(final org.apache.kafka.common.record.Compressio final byte[] bytes) { try (final InputStream stream = compressionType.wrapForInput(ByteBuffer.wrap(bytes), RecordBatch.MAGIC_VALUE_V2, BUFFER_SUPPLIER)) { - return IoUtils.toByteArray(stream); + return stream.readAllBytes(); } catch (final IOException e) { throw new SerializationException("Failed to compress with type " + compressionType, e); } diff --git a/large-message-core/src/test/java/com/bakdata/kafka/AmazonS3LargeMessageClientRoundtripTest.java b/large-message-core/src/test/java/com/bakdata/kafka/AmazonS3LargeMessageClientRoundtripTest.java index a90f86a..9122f0f 100644 --- a/large-message-core/src/test/java/com/bakdata/kafka/AmazonS3LargeMessageClientRoundtripTest.java +++ b/large-message-core/src/test/java/com/bakdata/kafka/AmazonS3LargeMessageClientRoundtripTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -38,7 +38,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; @@ -88,13 +87,9 @@ void shouldRoundtrip(final boolean isKey, final String compressionType) { } private Map createStorerProperties(final Map properties) { - final AwsBasicCredentials credentials = this.getCredentials(); return ImmutableMap.builder() .putAll(properties) - .put(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG, this.getEndpointOverride().toString()) - .put(AbstractLargeMessageConfig.S3_REGION_CONFIG, this.getRegion().id()) - .put(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, credentials.accessKeyId()) - .put(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, credentials.secretAccessKey()) + .putAll(this.getLargeMessageConfig()) .build(); } @@ -104,18 +99,8 @@ private LargeMessageStoringClient createStorer(final Map basePro return config.getStorer(); } - private Map createRetrieverProperties() { - final AwsBasicCredentials credentials = this.getCredentials(); - return ImmutableMap.builder() - .put(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG, this.getEndpointOverride().toString()) - .put(AbstractLargeMessageConfig.S3_REGION_CONFIG, this.getRegion().id()) - .put(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, credentials.accessKeyId()) - .put(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, credentials.secretAccessKey()) - .build(); - } - private LargeMessageRetrievingClient createRetriever() { - final Map properties = this.createRetrieverProperties(); + final Map properties = this.getLargeMessageConfig(); final ConfigDef configDef = AbstractLargeMessageConfig.baseConfigDef(); final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(configDef, properties); return config.getRetriever(); diff --git a/large-message-core/src/test/java/com/bakdata/kafka/LargeMessageRetrievingClientS3IntegrationTest.java b/large-message-core/src/test/java/com/bakdata/kafka/LargeMessageRetrievingClientS3IntegrationTest.java index 3c7727c..b882822 100644 --- a/large-message-core/src/test/java/com/bakdata/kafka/LargeMessageRetrievingClientS3IntegrationTest.java +++ b/large-message-core/src/test/java/com/bakdata/kafka/LargeMessageRetrievingClientS3IntegrationTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -27,14 +27,12 @@ import static com.bakdata.kafka.LargeMessageRetrievingClientTest.serializeUri; import static org.assertj.core.api.Assertions.assertThat; -import com.google.common.collect.ImmutableMap; import io.confluent.common.config.ConfigDef; import java.util.Map; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.junit.jupiter.api.Test; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; @@ -62,22 +60,12 @@ void shouldReadBackedText() { } private LargeMessageRetrievingClient createRetriever() { - final Map properties = this.createProperties(); + final Map properties = this.getLargeMessageConfig(); final ConfigDef configDef = AbstractLargeMessageConfig.baseConfigDef(); final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(configDef, properties); return config.getRetriever(); } - private Map createProperties() { - final AwsBasicCredentials credentials = this.getCredentials(); - return ImmutableMap.builder() - .put(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG, this.getEndpointOverride().toString()) - .put(AbstractLargeMessageConfig.S3_REGION_CONFIG, this.getRegion().id()) - .put(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, credentials.accessKeyId()) - .put(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, credentials.secretAccessKey()) - .build(); - } - private void store(final String bucket, final String key, final String s) { this.getS3Client().putObject(PutObjectRequest.builder() .bucket(bucket) diff --git a/large-message-core/src/test/java/com/bakdata/kafka/LargeMessageStoringClientS3IntegrationTest.java b/large-message-core/src/test/java/com/bakdata/kafka/LargeMessageStoringClientS3IntegrationTest.java index 68549e7..2c08e49 100644 --- a/large-message-core/src/test/java/com/bakdata/kafka/LargeMessageStoringClientS3IntegrationTest.java +++ b/large-message-core/src/test/java/com/bakdata/kafka/LargeMessageStoringClientS3IntegrationTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableMap; import java.io.IOException; +import java.io.InputStream; import java.util.Map; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Deserializer; @@ -44,13 +45,9 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; -import software.amazon.awssdk.utils.IoUtils; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) @@ -125,13 +122,9 @@ void shouldUseConfiguredIdGenerator() { } private Map createProperties(final Map properties) { - final AwsBasicCredentials credentials = this.getCredentials(); return ImmutableMap.builder() .putAll(properties) - .put(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG, this.getEndpointOverride().toString()) - .put(AbstractLargeMessageConfig.S3_REGION_CONFIG, this.getRegion().id()) - .put(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, credentials.accessKeyId()) - .put(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, credentials.secretAccessKey()) + .putAll(this.getLargeMessageConfig()) .build(); } @@ -150,12 +143,12 @@ private void expectBackedText(final String basePath, final String expected, fina } private byte[] readBytes(final BlobStorageURI uri) { - try (final ResponseInputStream objectContent = this.getS3Client().getObject( - GetObjectRequest.builder() - .bucket(uri.getBucket()) - .key(uri.getKey()) - .build())) { - return IoUtils.toByteArray(objectContent); + final GetObjectRequest request = GetObjectRequest.builder() + .bucket(uri.getBucket()) + .key(uri.getKey()) + .build(); + try (final InputStream objectContent = this.getS3Client().getObject(request)) { + return objectContent.readAllBytes(); } catch (final IOException e) { throw new RuntimeException(e); } diff --git a/large-message-core/src/test/java/com/bakdata/kafka/AmazonS3IntegrationTest.java b/large-message-core/src/testFixtures/java/com/bakdata/kafka/AmazonS3IntegrationTest.java similarity index 76% rename from large-message-core/src/test/java/com/bakdata/kafka/AmazonS3IntegrationTest.java rename to large-message-core/src/testFixtures/java/com/bakdata/kafka/AmazonS3IntegrationTest.java index 40fdef2..46d9e25 100644 --- a/large-message-core/src/test/java/com/bakdata/kafka/AmazonS3IntegrationTest.java +++ b/large-message-core/src/testFixtures/java/com/bakdata/kafka/AmazonS3IntegrationTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,7 +24,9 @@ package com.bakdata.kafka; +import com.google.common.collect.ImmutableMap; import java.net.URI; +import java.util.Map; import org.testcontainers.containers.localstack.LocalStackContainer; import org.testcontainers.containers.localstack.LocalStackContainer.Service; import org.testcontainers.junit.jupiter.Container; @@ -52,17 +54,27 @@ S3Client getS3Client() { .build(); } - Region getRegion() { + Map getLargeMessageConfig() { + final AwsBasicCredentials credentials = this.getCredentials(); + return ImmutableMap.builder() + .put(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG, this.getEndpointOverride().toString()) + .put(AbstractLargeMessageConfig.S3_REGION_CONFIG, this.getRegion().id()) + .put(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, credentials.accessKeyId()) + .put(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, credentials.secretAccessKey()) + .build(); + } + + private Region getRegion() { return Region.of(this.localStackContainer.getRegion()); } - AwsBasicCredentials getCredentials() { + private AwsBasicCredentials getCredentials() { return AwsBasicCredentials.create( this.localStackContainer.getAccessKey(), this.localStackContainer.getSecretKey() ); } - URI getEndpointOverride() { + private URI getEndpointOverride() { return this.localStackContainer.getEndpointOverride(Service.S3); } } diff --git a/large-message-serde/build.gradle.kts b/large-message-serde/build.gradle.kts index cc18546..4ce6017 100644 --- a/large-message-serde/build.gradle.kts +++ b/large-message-serde/build.gradle.kts @@ -39,15 +39,11 @@ dependencies { testImplementation( group = "com.bakdata.fluent-kafka-streams-tests", name = "fluent-kafka-streams-tests-junit5", - version = "2.8.1" + version = "2.11.1" ) - val s3MockVersion: String by project - testImplementation(group = "com.adobe.testing", name = "s3mock-junit5", version = s3MockVersion) { - exclude(group = "ch.qos.logback") - exclude(group = "org.apache.logging.log4j", module = "log4j-to-slf4j") - } val log4jVersion: String by project - testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j-impl", version = log4jVersion) + testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion) val joolVersion: String by project - testImplementation(group = "org.jooq", name = "jool-java-8", version = joolVersion) + testImplementation(group = "org.jooq", name = "jool", version = joolVersion) + testImplementation(testFixtures(project(":large-message-core"))) } diff --git a/large-message-serde/src/test/java/com/bakdata/kafka/LargeMessageDeserializerTest.java b/large-message-serde/src/test/java/com/bakdata/kafka/LargeMessageDeserializerTest.java index e7185bc..6791660 100644 --- a/large-message-serde/src/test/java/com/bakdata/kafka/LargeMessageDeserializerTest.java +++ b/large-message-serde/src/test/java/com/bakdata/kafka/LargeMessageDeserializerTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -27,14 +27,10 @@ import static com.bakdata.kafka.HeaderLargeMessagePayloadProtocol.getHeaderName; import static com.bakdata.kafka.LargeMessagePayload.ofBytes; import static com.bakdata.kafka.LargeMessagePayload.ofUri; -import static com.bakdata.kafka.LargeMessageSerializerTest.configureS3HTTPService; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import com.adobe.testing.s3mock.junit5.S3MockExtension; -import com.amazonaws.services.s3.model.ObjectMetadata; import com.bakdata.fluent_kafka_streams_tests.TestTopology; -import java.io.ByteArrayInputStream; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -56,28 +52,21 @@ import org.apache.kafka.streams.kstream.Produced; import org.jooq.lambda.Seq; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; -class LargeMessageDeserializerTest { +class LargeMessageDeserializerTest extends AmazonS3IntegrationTest { - @RegisterExtension - static final S3MockExtension S3_MOCK = S3MockExtension.builder().silent() - .withSecureConnection(false).build(); private static final String INPUT_TOPIC = "input"; private static final String OUTPUT_TOPIC = "output"; private static final LargeMessagePayloadProtocol HEADER_PROTOCOL = new HeaderLargeMessagePayloadProtocol(); private static final LargeMessagePayloadProtocol BYTE_FLAG_PROTOCOL = new ByteFlagLargeMessagePayloadProtocol(); private TestTopology topology = null; - @BeforeAll - static void setUp() { - configureS3HTTPService(); - } - private static byte[] serializeUri(final String uri) { return BYTE_FLAG_PROTOCOL.serialize(ofUri(uri), new RecordHeaders(), false); } @@ -94,29 +83,6 @@ private static byte[] serialize(final byte[] bytes, final Headers headers, final return HEADER_PROTOCOL.serialize(ofBytes(bytes), headers, isKey); } - private static Properties createProperties() { - final Map endpointConfig = getEndpointConfig(); - final Properties properties = new Properties(); - properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker"); - properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test"); - properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class); - properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class); - properties.putAll(endpointConfig); - properties.put(LargeMessageSerdeConfig.KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - properties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - return properties; - } - - private static Map getEndpointConfig() { - final Map largeMessageConfig = new HashMap<>(); - largeMessageConfig.put(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG, - "http://localhost:" + S3_MOCK.getHttpPort()); - largeMessageConfig.put(AbstractLargeMessageConfig.S3_REGION_CONFIG, "us-east-1"); - largeMessageConfig.put(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, "foo"); - largeMessageConfig.put(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, "bar"); - return largeMessageConfig; - } - private static Topology createKeyTopology(final Properties properties) { final StreamsBuilder builder = new StreamsBuilder(); final Serde serde = new LargeMessageSerde<>(); @@ -146,10 +112,6 @@ private static Topology createKeyAndValueTopology(final Properties properties) { return builder.build(); } - private static void store(final String bucket, final String key, final String s) { - S3_MOCK.createS3Client().putObject(bucket, key, new ByteArrayInputStream(s.getBytes()), new ObjectMetadata()); - } - private static byte[] createNonBackedText(final String text) { return serialize(serialize(text)); } @@ -173,22 +135,6 @@ private static byte[] createBackedText(final String bucket, final String key, fi return serializeUri(uri, headers, isKey); } - private static void assertCorrectSerializationExceptionBehavior(final boolean isKey, - final MessageFactory messageFactory) { - try (final Deserializer deserializer = new LargeMessageDeserializer<>()) { - final Headers headers = new RecordHeaders(); - final Map config = new HashMap<>(getEndpointConfig()); - config.put(isKey ? LargeMessageSerdeConfig.KEY_SERDE_CLASS_CONFIG - : LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, IntegerSerde.class); - deserializer.configure(config, isKey); - final byte[] message = messageFactory.apply("foo", headers, isKey); - assertThatThrownBy(() -> deserializer.deserialize(null, headers, message)) - .isInstanceOf(SerializationException.class) - .hasMessage("Size of data received by IntegerDeserializer is not 4"); - assertThat(headers.headers(getHeaderName(isKey))).hasSize(1); - } - } - @AfterEach void tearDown() { if (this.topology != null) { @@ -307,9 +253,9 @@ void shouldReadNullKey() { @Test void shouldReadBackedTextValue() { final String bucket = "bucket"; - S3_MOCK.createS3Client().createBucket(bucket); + this.getS3Client().createBucket(CreateBucketRequest.builder().bucket(bucket).build()); final String key = "key"; - store(bucket, key, "foo"); + this.store(bucket, key, "foo"); this.createTopology(LargeMessageDeserializerTest::createValueTopology); this.topology.input() .withKeySerde(Serdes.Integer()) @@ -328,9 +274,9 @@ void shouldReadBackedTextValue() { @Test void shouldReadBackedTextValueWithHeaders() { final String bucket = "bucket"; - S3_MOCK.createS3Client().createBucket(bucket); + this.getS3Client().createBucket(CreateBucketRequest.builder().bucket(bucket).build()); final String key = "key"; - store(bucket, key, "foo"); + this.store(bucket, key, "foo"); this.createTopology(LargeMessageDeserializerTest::createValueTopology); final Headers headers = new RecordHeaders(); final byte[] value = createBackedText(bucket, key, headers, false); @@ -355,9 +301,9 @@ void shouldReadBackedTextValueWithHeaders() { @Test void shouldReadBackedTextKey() { final String bucket = "bucket"; - S3_MOCK.createS3Client().createBucket(bucket); + this.getS3Client().createBucket(CreateBucketRequest.builder().bucket(bucket).build()); final String key = "key"; - store(bucket, key, "foo"); + this.store(bucket, key, "foo"); this.createTopology(LargeMessageDeserializerTest::createKeyTopology); this.topology.input() .withKeySerde(Serdes.ByteArray()) @@ -376,9 +322,9 @@ void shouldReadBackedTextKey() { @Test void shouldReadBackedTextKeyWithHeaders() { final String bucket = "bucket"; - S3_MOCK.createS3Client().createBucket(bucket); + this.getS3Client().createBucket(CreateBucketRequest.builder().bucket(bucket).build()); final String key = "key"; - store(bucket, key, "foo"); + this.store(bucket, key, "foo"); this.createTopology(LargeMessageDeserializerTest::createKeyTopology); final Headers headers = new RecordHeaders(); this.topology.input() @@ -401,10 +347,10 @@ void shouldReadBackedTextKeyWithHeaders() { @ValueSource(booleans = {true, false}) void shouldRetainBackedHeadersOnSerializationException(final boolean isKey) { final String bucket = "bucket"; - S3_MOCK.createS3Client().createBucket(bucket); - assertCorrectSerializationExceptionBehavior(isKey, (content, headers, _isKey) -> { + this.getS3Client().createBucket(CreateBucketRequest.builder().bucket(bucket).build()); + this.assertCorrectSerializationExceptionBehavior(isKey, (content, headers, _isKey) -> { final String key = "key"; - store(bucket, key, content); + this.store(bucket, key, content); return createBackedText(bucket, key, headers, _isKey); }); } @@ -412,15 +358,15 @@ void shouldRetainBackedHeadersOnSerializationException(final boolean isKey) { @ParameterizedTest @ValueSource(booleans = {true, false}) void shouldRetainNonBackedHeadersOnSerializationException(final boolean isKey) { - assertCorrectSerializationExceptionBehavior(isKey, LargeMessageDeserializerTest::createNonBackedText); + this.assertCorrectSerializationExceptionBehavior(isKey, LargeMessageDeserializerTest::createNonBackedText); } @Test void shouldReadNonBackedTextKeyAndBackedValueWithHeaders() { final String bucket = "bucket"; - S3_MOCK.createS3Client().createBucket(bucket); + this.getS3Client().createBucket(CreateBucketRequest.builder().bucket(bucket).build()); final String key = "key"; - store(bucket, key, "bar"); + this.store(bucket, key, "bar"); this.createTopology(LargeMessageDeserializerTest::createKeyAndValueTopology); final Headers headers = new RecordHeaders(); this.topology.input() @@ -443,9 +389,9 @@ void shouldReadNonBackedTextKeyAndBackedValueWithHeaders() { @Test void shouldReadBackedTextKeyAndNonBackedValueWithHeaders() { final String bucket = "bucket"; - S3_MOCK.createS3Client().createBucket(bucket); + this.getS3Client().createBucket(CreateBucketRequest.builder().bucket(bucket).build()); final String key = "key"; - store(bucket, key, "foo"); + this.store(bucket, key, "foo"); this.createTopology(LargeMessageDeserializerTest::createKeyAndValueTopology); final Headers headers = new RecordHeaders(); this.topology.input() @@ -465,8 +411,45 @@ void shouldReadBackedTextKeyAndNonBackedValueWithHeaders() { }); } + private Properties createProperties() { + final Map endpointConfig = this.getLargeMessageConfig(); + final Properties properties = new Properties(); + properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker"); + properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test"); + properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class); + properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class); + properties.putAll(endpointConfig); + properties.put(LargeMessageSerdeConfig.KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + properties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + return properties; + } + + private void store(final String bucket, final String key, final String s) { + final PutObjectRequest request = PutObjectRequest.builder() + .bucket(bucket) + .key(key) + .build(); + this.getS3Client().putObject(request, RequestBody.fromBytes(s.getBytes())); + } + + private void assertCorrectSerializationExceptionBehavior(final boolean isKey, + final MessageFactory messageFactory) { + try (final Deserializer deserializer = new LargeMessageDeserializer<>()) { + final Headers headers = new RecordHeaders(); + final Map config = new HashMap<>(this.getLargeMessageConfig()); + config.put(isKey ? LargeMessageSerdeConfig.KEY_SERDE_CLASS_CONFIG + : LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, IntegerSerde.class); + deserializer.configure(config, isKey); + final byte[] message = messageFactory.apply("foo", headers, isKey); + assertThatThrownBy(() -> deserializer.deserialize(null, headers, message)) + .isInstanceOf(SerializationException.class) + .hasMessage("Size of data received by IntegerDeserializer is not 4"); + assertThat(headers.headers(getHeaderName(isKey))).hasSize(1); + } + } + private void createTopology(final Function topologyFactory) { - this.topology = new TestTopology<>(topologyFactory, createProperties()); + this.topology = new TestTopology<>(topologyFactory, this.createProperties()); this.topology.start(); } diff --git a/large-message-serde/src/test/java/com/bakdata/kafka/LargeMessageSerdeTest.java b/large-message-serde/src/test/java/com/bakdata/kafka/LargeMessageSerdeTest.java index 1901b9b..6b0694b 100644 --- a/large-message-serde/src/test/java/com/bakdata/kafka/LargeMessageSerdeTest.java +++ b/large-message-serde/src/test/java/com/bakdata/kafka/LargeMessageSerdeTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -26,11 +26,9 @@ import static org.assertj.core.api.Assertions.assertThat; -import com.adobe.testing.s3mock.junit5.S3MockExtension; import com.bakdata.fluent_kafka_streams_tests.TestInput; import com.bakdata.fluent_kafka_streams_tests.TestOutput; -import com.bakdata.fluent_kafka_streams_tests.junit5.TestTopologyExtension; -import java.util.HashMap; +import com.bakdata.fluent_kafka_streams_tests.TestTopology; import java.util.List; import java.util.Map; import java.util.Properties; @@ -43,44 +41,16 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Produced; import org.jooq.lambda.Seq; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; -class LargeMessageSerdeTest { +class LargeMessageSerdeTest extends AmazonS3IntegrationTest { - @RegisterExtension - static final S3MockExtension S3_MOCK = S3MockExtension.builder().silent() - .withSecureConnection(false).build(); private static final String INPUT_TOPIC_1 = "input1"; private static final String INPUT_TOPIC_2 = "input2"; private static final String OUTPUT_TOPIC = "output"; - @RegisterExtension - TestTopologyExtension topology = - new TestTopologyExtension<>(LargeMessageSerdeTest::createTopology, createProperties()); - - private static Properties createProperties() { - final Map endpointConfig = getEndpointConfig(); - final Properties properties = new Properties(); - properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker"); - properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test"); - properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, LargeMessageSerde.class); - properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, LargeMessageSerde.class); - properties.putAll(endpointConfig); - properties.put(LargeMessageSerdeConfig.KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - properties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - properties.put(AbstractLargeMessageConfig.USE_HEADERS_CONFIG, true); - return properties; - } - - private static Map getEndpointConfig() { - final Map largeMessageConfig = new HashMap<>(); - largeMessageConfig.put(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG, - "http://localhost:" + S3_MOCK.getHttpPort()); - largeMessageConfig.put(AbstractLargeMessageConfig.S3_REGION_CONFIG, "us-east-1"); - largeMessageConfig.put(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, "foo"); - largeMessageConfig.put(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, "bar"); - return largeMessageConfig; - } + private TestTopology topology; private static Topology createTopology() { final StreamsBuilder builder = new StreamsBuilder(); @@ -98,6 +68,19 @@ private static Topology createTopology() { return builder.build(); } + @BeforeEach + void setup() { + this.topology = new TestTopology<>(LargeMessageSerdeTest::createTopology, this.createLargeMessageProperties()); + this.topology.start(); + } + + @AfterEach + void tearDown() { + if (this.topology != null) { + this.topology.stop(); + } + } + @Test void shouldJoin() { // this test creates a topology with a changelog store. The changelog store uses the Serde without headers @@ -114,6 +97,20 @@ void shouldJoin() { }); } + private Properties createLargeMessageProperties() { + final Map endpointConfig = this.getLargeMessageConfig(); + final Properties properties = new Properties(); + properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker"); + properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test"); + properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, LargeMessageSerde.class); + properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, LargeMessageSerde.class); + properties.putAll(endpointConfig); + properties.put(LargeMessageSerdeConfig.KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + properties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + properties.put(AbstractLargeMessageConfig.USE_HEADERS_CONFIG, true); + return properties; + } + private TestOutput getOutput() { return this.topology.streamOutput() .withKeySerde(Serdes.String()) diff --git a/large-message-serde/src/test/java/com/bakdata/kafka/LargeMessageSerializerTest.java b/large-message-serde/src/test/java/com/bakdata/kafka/LargeMessageSerializerTest.java index b04217c..be8634c 100644 --- a/large-message-serde/src/test/java/com/bakdata/kafka/LargeMessageSerializerTest.java +++ b/large-message-serde/src/test/java/com/bakdata/kafka/LargeMessageSerializerTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,16 +29,9 @@ import static com.bakdata.kafka.LargeMessageRetrievingClient.deserializeUri; import static org.assertj.core.api.Assertions.assertThat; -import com.adobe.testing.s3mock.junit5.S3MockExtension; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.ObjectListing; -import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.model.S3ObjectInputStream; -import com.amazonaws.services.s3.model.S3ObjectSummary; -import com.amazonaws.util.IOUtils; import com.bakdata.fluent_kafka_streams_tests.TestTopology; import java.io.IOException; +import java.io.InputStream; import java.util.List; import java.util.Map; import java.util.Properties; @@ -56,51 +49,23 @@ import org.apache.kafka.streams.kstream.Produced; import org.jooq.lambda.Seq; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; -import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; -class LargeMessageSerializerTest { +class LargeMessageSerializerTest extends AmazonS3IntegrationTest { - @RegisterExtension - static final S3MockExtension S3_MOCK = S3MockExtension.builder().silent() - .withSecureConnection(false).build(); private static final String INPUT_TOPIC = "input"; private static final String OUTPUT_TOPIC = "output"; private static final Deserializer STRING_DESERIALIZER = Serdes.String().deserializer(); private TestTopology topology = null; - @BeforeAll - static void setUp() { - configureS3HTTPService(); - } - - static String configureS3HTTPService() { - return System.setProperty(SdkSystemSetting.SYNC_HTTP_SERVICE_IMPL.property(), - "software.amazon.awssdk.http.apache.ApacheSdkHttpService"); - } - private static BlobStorageURI deserializeUriWithFlag(final byte[] data) { final byte[] uriBytes = stripFlag(data); return deserializeUri(uriBytes); } - private static Properties createProperties(final Properties properties) { - properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker"); - properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test"); - properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class); - properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class); - properties.setProperty(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG, - "http://localhost:" + S3_MOCK.getHttpPort()); - properties.setProperty(AbstractLargeMessageConfig.S3_REGION_CONFIG, "us-east-1"); - properties.setProperty(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, "foo"); - properties.setProperty(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, "bar"); - properties.put(LargeMessageSerdeConfig.KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - properties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - return properties; - } - private static Topology createValueTopology(final Properties properties) { final StreamsBuilder builder = new StreamsBuilder(); final Map configs = new StreamsConfig(properties).originals(); @@ -123,36 +88,6 @@ private static Topology createKeyTopology(final Properties properties) { return builder.build(); } - private static void expectBackedText(final String basePath, final String expected, final byte[] s3BackedText, - final String type) { - final BlobStorageURI uri = deserializeUriWithFlag(s3BackedText); - expectBackedText(uri, basePath, type, expected); - } - - private static void expectBackedText(final String basePath, final String expected, final byte[] s3BackedText, - final String type, final Headers headers, final boolean isKey) { - final BlobStorageURI uri = deserializeUri(s3BackedText); - expectBackedText(uri, basePath, type, expected); - assertThat(headers.headers(getHeaderName(isKey))).hasSize(1); - } - - private static void expectBackedText(final BlobStorageURI uri, final String basePath, final String type, - final String expected) { - assertThat(uri).asString().startsWith(basePath + OUTPUT_TOPIC + "/" + type + "/"); - final byte[] bytes = readBytes(uri); - final String deserialized = STRING_DESERIALIZER.deserialize(null, bytes); - assertThat(deserialized).isEqualTo(expected); - } - - private static byte[] readBytes(final BlobStorageURI uri) { - try (final S3Object object = S3_MOCK.createS3Client().getObject(uri.getBucket(), uri.getKey()); - final S3ObjectInputStream objectContent = object.getObjectContent()) { - return IOUtils.toByteArray(objectContent); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - private static void expectNonBackedText(final String expected, final byte[] s3BackedText) { assertThat(STRING_DESERIALIZER.deserialize(null, stripFlag(s3BackedText))) .isInstanceOf(String.class) @@ -167,30 +102,6 @@ private static void expectNonBackedText(final String expected, final byte[] s3Ba assertThat(headers.headers(getHeaderName(isKey))).hasSize(1); } - private static void deleteBucket(final String bucket, final AmazonS3 s3Client) { - ObjectListing objectListing = s3Client.listObjects(bucket); - while (true) { - final String[] keys = objectListing.getObjectSummaries().stream() - .map(S3ObjectSummary::getKey) - .toArray(String[]::new); - if (keys.length > 0) { - s3Client.deleteObjects(new DeleteObjectsRequest(bucket) - .withKeys(keys)); - } - - // If the bucket contains many objects, the listObjects() call - // might not return all of the objects in the first listing. Check to - // see whether the listing was truncated. If so, retrieve the next page of objects - // and delete them. - if (objectListing.isTruncated()) { - objectListing = s3Client.listNextBatchOfObjects(objectListing); - } else { - break; - } - } - s3Client.deleteBucket(bucket); - } - @AfterEach void tearDown() { if (this.topology != null) { @@ -320,8 +231,8 @@ void shouldWriteBackedTextKey() { properties.put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG, 0); properties.setProperty(AbstractLargeMessageConfig.BASE_PATH_CONFIG, basePath); this.createTopology(LargeMessageSerializerTest::createKeyTopology, properties); - final AmazonS3 s3Client = S3_MOCK.createS3Client(); - s3Client.createBucket(bucket); + final S3Client s3Client = this.getS3Client(); + s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket).build()); this.topology.input() .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Integer()) @@ -334,7 +245,6 @@ void shouldWriteBackedTextKey() { .hasSize(1) .extracting(ProducerRecord::key) .anySatisfy(s3BackedText -> expectBackedText(basePath, "foo", s3BackedText, "keys")); - deleteBucket(bucket, s3Client); } @Test @@ -346,8 +256,8 @@ void shouldWriteBackedTextKeyWithHeaders() { properties.setProperty(AbstractLargeMessageConfig.BASE_PATH_CONFIG, basePath); properties.put(AbstractLargeMessageConfig.USE_HEADERS_CONFIG, true); this.createTopology(LargeMessageSerializerTest::createKeyTopology, properties); - final AmazonS3 s3Client = S3_MOCK.createS3Client(); - s3Client.createBucket(bucket); + final S3Client s3Client = this.getS3Client(); + s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket).build()); this.topology.input() .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Integer()) @@ -358,8 +268,8 @@ void shouldWriteBackedTextKeyWithHeaders() { .toList(); assertThat(records) .hasSize(1) - .anySatisfy(record -> expectBackedText(basePath, "foo", record.key(), "keys", record.headers(), true)); - deleteBucket(bucket, s3Client); + .anySatisfy( + record -> this.expectBackedText(basePath, "foo", record.key(), "keys", record.headers(), true)); } @Test @@ -389,8 +299,8 @@ void shouldWriteBackedTextValue() { properties.put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG, 0); properties.setProperty(AbstractLargeMessageConfig.BASE_PATH_CONFIG, basePath); this.createTopology(LargeMessageSerializerTest::createValueTopology, properties); - final AmazonS3 s3Client = S3_MOCK.createS3Client(); - s3Client.createBucket(bucket); + final S3Client s3Client = this.getS3Client(); + s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket).build()); this.topology.input() .withKeySerde(Serdes.Integer()) .withValueSerde(Serdes.String()) @@ -403,7 +313,6 @@ void shouldWriteBackedTextValue() { .hasSize(1) .extracting(ProducerRecord::value) .anySatisfy(s3BackedText -> expectBackedText(basePath, "foo", s3BackedText, "values")); - deleteBucket(bucket, s3Client); } @Test @@ -415,8 +324,8 @@ void shouldWriteBackedTextValueWithHeaders() { properties.setProperty(AbstractLargeMessageConfig.BASE_PATH_CONFIG, basePath); properties.put(AbstractLargeMessageConfig.USE_HEADERS_CONFIG, true); this.createTopology(LargeMessageSerializerTest::createValueTopology, properties); - final AmazonS3 s3Client = S3_MOCK.createS3Client(); - s3Client.createBucket(bucket); + final S3Client s3Client = this.getS3Client(); + s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket).build()); this.topology.input() .withKeySerde(Serdes.Integer()) .withValueSerde(Serdes.String()) @@ -428,8 +337,8 @@ void shouldWriteBackedTextValueWithHeaders() { assertThat(records) .hasSize(1) .anySatisfy( - record -> expectBackedText(basePath, "foo", record.value(), "values", record.headers(), false)); - deleteBucket(bucket, s3Client); + record -> this.expectBackedText(basePath, "foo", record.value(), "values", record.headers(), + false)); } @Test @@ -451,9 +360,53 @@ void shouldWriteBackedNullValue() { .anySatisfy(s3BackedText -> assertThat(s3BackedText).isNull()); } + private Properties createProperties(final Properties properties) { + properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker"); + properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test"); + properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class); + properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class); + properties.putAll(this.getLargeMessageConfig()); + properties.put(LargeMessageSerdeConfig.KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + properties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + return properties; + } + + private void expectBackedText(final String basePath, final String expected, final byte[] s3BackedText, + final String type) { + final BlobStorageURI uri = deserializeUriWithFlag(s3BackedText); + this.expectBackedText(uri, basePath, type, expected); + } + + private void expectBackedText(final String basePath, final String expected, final byte[] s3BackedText, + final String type, final Headers headers, final boolean isKey) { + final BlobStorageURI uri = deserializeUri(s3BackedText); + this.expectBackedText(uri, basePath, type, expected); + assertThat(headers.headers(getHeaderName(isKey))).hasSize(1); + } + + private void expectBackedText(final BlobStorageURI uri, final String basePath, final String type, + final String expected) { + assertThat(uri).asString().startsWith(basePath + OUTPUT_TOPIC + "/" + type + "/"); + final byte[] bytes = this.readBytes(uri); + final String deserialized = STRING_DESERIALIZER.deserialize(null, bytes); + assertThat(deserialized).isEqualTo(expected); + } + + private byte[] readBytes(final BlobStorageURI uri) { + final GetObjectRequest request = GetObjectRequest.builder() + .bucket(uri.getBucket()) + .key(uri.getKey()) + .build(); + try (final InputStream objectContent = this.getS3Client().getObject(request)) { + return objectContent.readAllBytes(); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + private void createTopology(final Function topologyFactory, final Properties properties) { - this.topology = new TestTopology<>(topologyFactory, createProperties(properties)); + this.topology = new TestTopology<>(topologyFactory, this.createProperties(properties)); this.topology.start(); }