diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java index 30effa38e0..b5ac22ee99 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,12 +17,16 @@ package org.springframework.kafka.support; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; +import org.assertj.core.util.Streams; import org.springframework.messaging.MessageHeaders; @@ -36,7 +40,6 @@ * * @author Gary Russell * @since 2.1.3 - * */ public class SimpleKafkaHeaderMapper extends AbstractKafkaHeaderMapper { @@ -69,6 +72,7 @@ public SimpleKafkaHeaderMapper() { * generally should not map the {@code "id" and "timestamp"} headers. Note: * most of the headers in {@link KafkaHeaders} are never mapped as headers since they * represent data in consumer/producer records. + * * @param patterns the patterns. * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String) */ @@ -82,6 +86,7 @@ private SimpleKafkaHeaderMapper(boolean outbound, String... patterns) { /** * Create an instance for inbound mapping only with pattern matching. + * * @param patterns the patterns to match. * @return the header mapper. * @since 2.8.8 @@ -94,27 +99,40 @@ public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patte public void fromHeaders(MessageHeaders headers, Headers target) { headers.forEach((key, value) -> { if (!NEVER.contains(key)) { - Object valueToAdd = headerValueToAddOut(key, value); - if (valueToAdd instanceof byte[] && matches(key, valueToAdd)) { - target.add(new RecordHeader(key, (byte[]) valueToAdd)); + if (value instanceof Collection values) { + values.forEach(v -> mapIfMatched(target, key, v)); + } else { + mapIfMatched(target, key, value); } } }); } + private void mapIfMatched(Headers target, String key, Object value) { + Object valueToAdd = headerValueToAddOut(key, value); + if (valueToAdd instanceof byte[] && matches(key, valueToAdd)) { + target.add(new RecordHeader(key, (byte[]) valueToAdd)); + } + } + @Override public void toHeaders(Headers source, Map target) { - source.forEach(header -> { - String headerName = header.key(); - if (matchesForInbound(headerName)) { - if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT)) { - target.put(headerName, ByteBuffer.wrap(header.value()).getInt()); - } - else { - target.put(headerName, headerValueToAddIn(header)); - } - } - }); + Streams.stream(source) + .collect(Collectors.groupingBy(Header::key)) + .forEach((headerName, headers) -> { + if (matchesForInbound(headerName)) { + if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT)) { + target.put(headerName, ByteBuffer.wrap(headers.get(headers.size() - 1).value()).getInt()); + } else { + var values = headers.stream().map(super::headerValueToAddIn).toList(); + if (values.size() == 1) { + target.put(headerName, values.get(0)); + } else { + target.put(headerName, values); + } + } + } + }); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java index 77caa3d81a..d37043adcf 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.kafka.common.header.Header; @@ -65,6 +66,26 @@ public void testSpecificStringConvert() { entry("neverConverted", "baz".getBytes())); } + @Test + void testIterableHeaderConvert() { + SimpleKafkaHeaderMapper mapper = new SimpleKafkaHeaderMapper(); + Map rawMappedHeaders = new HashMap<>(); + rawMappedHeaders.put("stringHeader", true); + mapper.setRawMappedHeaders(rawMappedHeaders); + Map headersMap = new HashMap<>(); + headersMap.put("stringHeader", List.of("firstValue", "secondValue")); + MessageHeaders headers = new MessageHeaders(headersMap); + Headers target = new RecordHeaders(); + mapper.fromHeaders(headers, target); + assertThat(target).containsExactlyInAnyOrder( + new RecordHeader("stringHeader", "firstValue".getBytes()), + new RecordHeader("stringHeader", "secondValue".getBytes()) + ); + headersMap.clear(); + mapper.toHeaders(target, headersMap); + assertThat(headersMap).contains(entry("stringHeader", List.of("firstValue", "secondValue"))); + } + @Test public void testNotStringConvert() { SimpleKafkaHeaderMapper mapper = new SimpleKafkaHeaderMapper();