From f4b73afd53f9fa9b178120f4e9b1bd0a3e200a43 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 25 Jul 2022 11:46:17 -0400 Subject: [PATCH] Fix NPE in Global Embedded Kafka The `EmbeddedKafkaBroker.brokerProperties(brokerProperties)` does not accept a `null`. * Fix `brokerProperties` map extraction for a `.orElse(Map.of())` instead of `null` * Modify `GlobalEmbeddedKafkaTestExecutionListenerTests` to ensure that `brokerProperties` are covered both ways - present and missed --- ...balEmbeddedKafkaTestExecutionListener.java | 2 +- ...beddedKafkaTestExecutionListenerTests.java | 33 +++++++++++++++++-- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListener.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListener.java index 0edec15a36..203850a11d 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListener.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListener.java @@ -98,7 +98,7 @@ public void testPlanExecutionStarted(TestPlan testPlan) { Integer partitions = configurationParameters.get(PARTITIONS_PROPERTY_NAME, Integer::parseInt).orElse(2); Map brokerProperties = configurationParameters.get(BROKER_PROPERTIES_LOCATION_PROPERTY_NAME, this::brokerProperties) - .orElse(null); + .orElse(Map.of()); String brokerListProperty = configurationParameters.get(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY) .orElse(null); int[] ports = diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListenerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListenerTests.java index f245e141e9..4137dcbd1f 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListenerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListenerTests.java @@ -38,6 +38,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; @@ -63,11 +64,39 @@ static void setup() { @AfterAll static void tearDown() { System.clearProperty(GlobalEmbeddedKafkaTestExecutionListener.LISTENER_ENABLED_PROPERTY_NAME); + } + + @AfterEach + void cleanUp() { System.clearProperty(GlobalEmbeddedKafkaTestExecutionListener.BROKER_PROPERTIES_LOCATION_PROPERTY_NAME); } @Test - void testGlobalEmbeddedKafkaTestExecutionListener() throws IOException { + void testGlobalEmbeddedKafkaTestExecutionListener() { + var discoveryRequest = + LauncherDiscoveryRequestBuilder.request() + .selectors(DiscoverySelectors.selectClass(TestClass1.class), + DiscoverySelectors.selectClass(TestClass2.class)) + .build(); + + var summaryGeneratingListener = new SummaryGeneratingListener(); + LauncherFactory.create().execute(discoveryRequest, summaryGeneratingListener); + + var summary = summaryGeneratingListener.getSummary(); + + try { + assertThat(summary.getTestsStartedCount()).isEqualTo(2); + assertThat(summary.getTestsSucceededCount()).isEqualTo(2); + assertThat(summary.getTestsFailedCount()).isEqualTo(0); + } + catch (Exception ex) { + summary.printFailuresTo(new PrintWriter(System.out)); + throw ex; + } + } + + @Test + void testGlobalEmbeddedKafkaWithBrokerProperties() throws IOException { var brokerProperties = new Properties(); brokerProperties.setProperty("auto.create.topics.enable", "false"); @@ -134,7 +163,7 @@ void testCannotAutoCreateTopic() throws ExecutionException, InterruptedException System.getProperty("spring.kafka.bootstrap-servers")); producerConfigs.put(ProducerConfig.RETRIES_CONFIG, 1); producerConfigs.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1); - producerConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10); + producerConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000); StringSerializer serializer = new StringSerializer(); try (var kafkaProducer = new KafkaProducer<>(producerConfigs, serializer, serializer)) {