Skip to content

Commit

Permalink
Fix NPE in Global Embedded Kafka
Browse files Browse the repository at this point in the history
The `EmbeddedKafkaBroker.brokerProperties(brokerProperties)` does not accept a `null`.

* Fix `brokerProperties` map extraction for a `.orElse(Map.of())` instead of `null`
* Modify `GlobalEmbeddedKafkaTestExecutionListenerTests` to ensure that `brokerProperties`
are covered both ways - present and missed
  • Loading branch information
artembilan authored and garyrussell committed Jul 25, 2022
1 parent ed6673e commit f4b73af
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void testPlanExecutionStarted(TestPlan testPlan) {
Integer partitions = configurationParameters.get(PARTITIONS_PROPERTY_NAME, Integer::parseInt).orElse(2);
Map<String, String> brokerProperties =
configurationParameters.get(BROKER_PROPERTIES_LOCATION_PROPERTY_NAME, this::brokerProperties)
.orElse(null);
.orElse(Map.of());
String brokerListProperty = configurationParameters.get(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY)
.orElse(null);
int[] ports =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
Expand All @@ -63,11 +64,39 @@ static void setup() {
@AfterAll
static void tearDown() {
System.clearProperty(GlobalEmbeddedKafkaTestExecutionListener.LISTENER_ENABLED_PROPERTY_NAME);
}

@AfterEach
void cleanUp() {
System.clearProperty(GlobalEmbeddedKafkaTestExecutionListener.BROKER_PROPERTIES_LOCATION_PROPERTY_NAME);
}

@Test
void testGlobalEmbeddedKafkaTestExecutionListener() throws IOException {
void testGlobalEmbeddedKafkaTestExecutionListener() {
var discoveryRequest =
LauncherDiscoveryRequestBuilder.request()
.selectors(DiscoverySelectors.selectClass(TestClass1.class),
DiscoverySelectors.selectClass(TestClass2.class))
.build();

var summaryGeneratingListener = new SummaryGeneratingListener();
LauncherFactory.create().execute(discoveryRequest, summaryGeneratingListener);

var summary = summaryGeneratingListener.getSummary();

try {
assertThat(summary.getTestsStartedCount()).isEqualTo(2);
assertThat(summary.getTestsSucceededCount()).isEqualTo(2);
assertThat(summary.getTestsFailedCount()).isEqualTo(0);
}
catch (Exception ex) {
summary.printFailuresTo(new PrintWriter(System.out));
throw ex;
}
}

@Test
void testGlobalEmbeddedKafkaWithBrokerProperties() throws IOException {
var brokerProperties = new Properties();
brokerProperties.setProperty("auto.create.topics.enable", "false");

Expand Down Expand Up @@ -134,7 +163,7 @@ void testCannotAutoCreateTopic() throws ExecutionException, InterruptedException
System.getProperty("spring.kafka.bootstrap-servers"));
producerConfigs.put(ProducerConfig.RETRIES_CONFIG, 1);
producerConfigs.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1);
producerConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10);
producerConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);

StringSerializer serializer = new StringSerializer();
try (var kafkaProducer = new KafkaProducer<>(producerConfigs, serializer, serializer)) {
Expand Down

0 comments on commit f4b73af

Please sign in to comment.