From 279c4fbeb88857f1a965c7f448711b98a9fa9056 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 19 Dec 2024 17:15:45 -0800 Subject: [PATCH] more cleanup --- .../apache/kafka/streams/kstream/KTable.java | 2 +- .../internals/KStreamKTableJoinProcessor.java | 3 +- .../streams/processor/ProcessorContext.java | 6 +- .../kafka/streams/processor/Punctuator.java | 11 +- .../internals/ProcessorContextUtils.java | 16 ++- .../ChangeLoggingKeyValueBytesStore.java | 2 +- .../kstream/internals/KTableImplTest.java | 120 ++++++++++-------- 7 files changed, 97 insertions(+), 63 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 1c8fb3fea3983..7082355eb45c9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -1054,7 +1054,7 @@ KTable transformValues(final ValueTransformerWithKeySupplierYou can retrieve all generated internal topic names via {@link Topology#describe()}. * *

* All data of this {@code KTable} will be redistributed through the repartitioning topic by writing all update diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java index cc5268153c69e..637d870ee7e88 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java @@ -38,6 +38,7 @@ import java.util.Optional; import static java.util.Objects.requireNonNull; +import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; @@ -77,7 +78,7 @@ public void init(final ProcessorContext context) { final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); valueGetter.init(context); - internalProcessorContext = (InternalProcessorContext) context; + internalProcessorContext = asInternalProcessorContext(context); if (useBuffer) { if (!valueGetter.isVersioned() && gracePeriod.isPresent()) { throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join."); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index 3d057c5ce2b47..d65244bcc86dc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -41,7 +41,11 @@ * We need to clean this all up (https://issues.apache.org/jira/browse/KAFKA-17131) and mark the interface * deprecated afterward. */ -@SuppressWarnings("deprecation") +@SuppressWarnings("deprecation") // Not deprecating the old context, since it is used by Transformers. See KAFKA-10603. +/* + * When we deprecate `ProcessorContext` can also deprecate `To` class, + * as it is only used in the `ProcessorContext#forward` method. + */ public interface ProcessorContext { /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java index dd533ad7ba263..9b76962f3b9de 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java @@ -21,7 +21,8 @@ import java.time.Duration; /** - * A functional interface used as an argument to {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}. + * A functional interface used as an argument to + * {@link org.apache.kafka.streams.processor.api.ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}. * * @see Cancellable */ @@ -30,14 +31,16 @@ public interface Punctuator { /** * Perform the scheduled periodic operation. * - *

If this method accesses {@link ProcessorContext} or + *

If this method accesses {@link org.apache.kafka.streams.processor.api.ProcessorContext} or * {@link org.apache.kafka.streams.processor.api.ProcessorContext}, record metadata like topic, * partition, and offset or {@link org.apache.kafka.streams.processor.api.RecordMetadata} won't * be available. * - *

Furthermore, for any record that is sent downstream via {@link ProcessorContext#forward(Object, Object)} + *

Furthermore, for any record that is sent downstream via + * {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)} * or {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}, there - * won't be any record metadata. If {@link ProcessorContext#forward(Object, Object)} is used, + * won't be any record metadata. If + * {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)} is used, * it's also not possible to set records headers. * * @param timestamp when the operation is being called, depending on {@link PunctuationType} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java index 0fcde6a8c469e..0515f8718aa51 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java @@ -18,6 +18,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import java.util.Map; @@ -63,8 +64,9 @@ public static String topicNamePrefix(final Map configs, final St } } - @SuppressWarnings("unchecked") - public static InternalProcessorContext asInternalProcessorContext(final StateStoreContext context) { + public static InternalProcessorContext asInternalProcessorContext( + final ProcessorContext context + ) { if (context instanceof InternalProcessorContext) { return (InternalProcessorContext) context; } else { @@ -73,4 +75,14 @@ public static InternalProcessorContext asInternalProcessorContext(f ); } } + + public static InternalProcessorContext asInternalProcessorContext(final StateStoreContext context) { + if (context instanceof InternalProcessorContext) { + return (InternalProcessorContext) context; + } else { + throw new IllegalArgumentException( + "This component requires internal features of Kafka Streams and must be disabled for unit tests." + ); + } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java index 5405ad9a71caa..9c1c3f9ae7639 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java @@ -131,4 +131,4 @@ public KeyValueIterator reverseAll() { void log(final Bytes key, final byte[] value, final long timestamp) { internalContext.logChange(name(), key, value, timestamp, wrapped().getPosition()); } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index ebc068196316d..a293625dc308c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -233,7 +233,7 @@ public void shouldPreserveSerdesForOperators() { final ValueMapper mapper = value -> value; final ValueJoiner joiner = (value1, value2) -> value1; final ValueTransformerWithKeySupplier valueTransformerWithKeySupplier = - () -> new ValueTransformerWithKey() { + () -> new ValueTransformerWithKey<>() { @Override public void init(final ProcessorContext context) {} @@ -247,103 +247,103 @@ public void close() {} }; assertEquals( - ((AbstractStream) table1.filter((key, value) -> false)).keySerde(), + ((AbstractStream) table1.filter((key, value) -> false)).keySerde(), consumedInternal.keySerde()); assertEquals( - ((AbstractStream) table1.filter((key, value) -> false)).valueSerde(), + ((AbstractStream) table1.filter((key, value) -> false)).valueSerde(), consumedInternal.valueSerde()); assertEquals( - ((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.filterNot((key, value) -> false)).keySerde(), + ((AbstractStream) table1.filterNot((key, value) -> false)).keySerde(), consumedInternal.keySerde()); assertEquals( - ((AbstractStream) table1.filterNot((key, value) -> false)).valueSerde(), + ((AbstractStream) table1.filterNot((key, value) -> false)).valueSerde(), consumedInternal.valueSerde()); assertEquals( - ((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.mapValues(mapper)).keySerde(), + ((AbstractStream) table1.mapValues(mapper)).keySerde(), consumedInternal.keySerde()); - assertNull(((AbstractStream) table1.mapValues(mapper)).valueSerde()); + assertNull(((AbstractStream) table1.mapValues(mapper)).valueSerde()); assertEquals( - ((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.toStream()).keySerde(), + ((AbstractStream) table1.toStream()).keySerde(), consumedInternal.keySerde()); assertEquals( - ((AbstractStream) table1.toStream()).valueSerde(), + ((AbstractStream) table1.toStream()).valueSerde(), consumedInternal.valueSerde()); - assertNull(((AbstractStream) table1.toStream(selector)).keySerde()); + assertNull(((AbstractStream) table1.toStream(selector)).keySerde()); assertEquals( - ((AbstractStream) table1.toStream(selector)).valueSerde(), + ((AbstractStream) table1.toStream(selector)).valueSerde(), consumedInternal.valueSerde()); assertEquals( - ((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).keySerde(), + ((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).keySerde(), consumedInternal.keySerde()); - assertNull(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).valueSerde()); + assertNull(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).valueSerde()); assertEquals( - ((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); - assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).valueSerde(), + assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); - assertNull(((AbstractStream) table1.groupBy(KeyValue::new)).keySerde()); - assertNull(((AbstractStream) table1.groupBy(KeyValue::new)).valueSerde()); + assertNull(((AbstractStream) table1.groupBy(KeyValue::new)).keySerde()); + assertNull(((AbstractStream) table1.groupBy(KeyValue::new)).valueSerde()); assertEquals( - ((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.join(table1, joiner)).keySerde(), + ((AbstractStream) table1.join(table1, joiner)).keySerde(), consumedInternal.keySerde()); - assertNull(((AbstractStream) table1.join(table1, joiner)).valueSerde()); + assertNull(((AbstractStream) table1.join(table1, joiner)).valueSerde()); assertEquals( - ((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.leftJoin(table1, joiner)).keySerde(), + ((AbstractStream) table1.leftJoin(table1, joiner)).keySerde(), consumedInternal.keySerde()); - assertNull(((AbstractStream) table1.leftJoin(table1, joiner)).valueSerde()); + assertNull(((AbstractStream) table1.leftJoin(table1, joiner)).valueSerde()); assertEquals( - ((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.outerJoin(table1, joiner)).keySerde(), + ((AbstractStream) table1.outerJoin(table1, joiner)).keySerde(), consumedInternal.keySerde()); - assertNull(((AbstractStream) table1.outerJoin(table1, joiner)).valueSerde()); + assertNull(((AbstractStream) table1.outerJoin(table1, joiner)).valueSerde()); assertEquals( - ((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); } @@ -462,25 +462,25 @@ public void shouldCreateSourceAndSinkNodesForRepartitioningTopic() throws Except assertTopologyContainsProcessor(topology, "KSTREAM-SINK-0000000007"); assertTopologyContainsProcessor(topology, "KSTREAM-SOURCE-0000000008"); - final Field valSerializerField = ((SinkNode) driver.getProcessor("KSTREAM-SINK-0000000003")) + final Field valSerializerField = ((SinkNode) driver.getProcessor("KSTREAM-SINK-0000000003")) .getClass() .getDeclaredField("valSerializer"); - final Field valDeserializerField = ((SourceNode) driver.getProcessor("KSTREAM-SOURCE-0000000004")) + final Field valDeserializerField = ((SourceNode) driver.getProcessor("KSTREAM-SOURCE-0000000004")) .getClass() .getDeclaredField("valDeserializer"); valSerializerField.setAccessible(true); valDeserializerField.setAccessible(true); - assertNotNull(((ChangedSerializer) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000003"))).inner()); - assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000004"))).inner()); - assertNotNull(((ChangedSerializer) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000007"))).inner()); - assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000008"))).inner()); + assertNotNull(((ChangedSerializer) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000003"))).inner()); + assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000004"))).inner()); + assertNotNull(((ChangedSerializer) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000007"))).inner()); + assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000008"))).inner()); } } @Test public void shouldNotAllowNullSelectorOnToStream() { - assertThrows(NullPointerException.class, () -> table.toStream((KeyValueMapper) null)); + assertThrows(NullPointerException.class, () -> table.toStream((KeyValueMapper) null)); } @Test @@ -495,12 +495,12 @@ public void shouldNotAllowNullPredicateOnFilterNot() { @Test public void shouldNotAllowNullMapperOnMapValues() { - assertThrows(NullPointerException.class, () -> table.mapValues((ValueMapper) null)); + assertThrows(NullPointerException.class, () -> table.mapValues((ValueMapper) null)); } @Test public void shouldNotAllowNullMapperOnMapValueWithKey() { - assertThrows(NullPointerException.class, () -> table.mapValues((ValueMapperWithKey) null)); + assertThrows(NullPointerException.class, () -> table.mapValues((ValueMapperWithKey) null)); } @Test @@ -545,27 +545,42 @@ public void shouldNotAllowNullOtherTableOnLeftJoin() { @Test public void shouldThrowNullPointerOnFilterWhenMaterializedIsNull() { - assertThrows(NullPointerException.class, () -> table.filter((key, value) -> false, (Materialized) null)); + assertThrows( + NullPointerException.class, + () -> table.filter((key, value) -> false, (Materialized>) null) + ); } @Test public void shouldThrowNullPointerOnFilterNotWhenMaterializedIsNull() { - assertThrows(NullPointerException.class, () -> table.filterNot((key, value) -> false, (Materialized) null)); + assertThrows( + NullPointerException.class, + () -> table.filterNot((key, value) -> false, (Materialized>) null) + ); } @Test public void shouldThrowNullPointerOnJoinWhenMaterializedIsNull() { - assertThrows(NullPointerException.class, () -> table.join(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null)); + assertThrows( + NullPointerException.class, + () -> table.join(table, MockValueJoiner.TOSTRING_JOINER, (Materialized>) null) + ); } @Test public void shouldThrowNullPointerOnLeftJoinWhenMaterializedIsNull() { - assertThrows(NullPointerException.class, () -> table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null)); + assertThrows( + NullPointerException.class, + () -> table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized>) null) + ); } @Test public void shouldThrowNullPointerOnOuterJoinWhenMaterializedIsNull() { - assertThrows(NullPointerException.class, () -> table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null)); + assertThrows( + NullPointerException.class, + () -> table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized>) null) + ); } @Test @@ -573,12 +588,11 @@ public void shouldThrowNullPointerOnTransformValuesWithKeyWhenTransformerSupplie assertThrows(NullPointerException.class, () -> table.transformValues(null)); } - @SuppressWarnings("unchecked") @Test public void shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() { final ValueTransformerWithKeySupplier valueTransformerSupplier = mock(ValueTransformerWithKeySupplier.class); - assertThrows(NullPointerException.class, () -> table.transformValues(valueTransformerSupplier, (Materialized) null)); + assertThrows(NullPointerException.class, () -> table.transformValues(valueTransformerSupplier, (Materialized>) null)); } @Test