diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 1c8fb3fea3983..7082355eb45c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -1054,7 +1054,7 @@ KTable transformValues(final ValueTransformerWithKeySupplier
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
      * <p>
      * All data of this {@code KTable} will be redistributed through the repartitioning topic by writing all update
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index e81877c99e747..637d870ee7e88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -53,7 +53,7 @@ class KStreamKTableJoinProcessor extends ContextualProcessor
     private final Optional gracePeriod;
     private TimeOrderedKeyValueBuffer buffer;
     protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
-    private InternalProcessorContext internalProcessorContext;
+    private InternalProcessorContext internalProcessorContext;
     private final boolean useBuffer;
     private final String storeName;
@@ -78,7 +78,7 @@ public void init(final ProcessorContext context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
-        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        internalProcessorContext = asInternalProcessorContext(context);
         if (useBuffer) {
             if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
                 throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
@@ -90,7 +90,6 @@ public void init(final ProcessorContext context) {
     @Override
     public void process(final Record record) {
-        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
         updateObservedStreamTime(record.timestamp());
         if (maybeDropRecord(record)) {
             return;
@@ -123,7 +122,6 @@ protected void updateObservedStreamTime(final long timestamp) {
         observedStreamTime = Math.max(observedStreamTime, timestamp);
     }
-    @SuppressWarnings("unchecked")
     private void doJoin(final Record record) {
         final K2 mappedKey = keyMapper.apply(record.key(), record.value());
         final V2 value2 = getValue2(record, mappedKey);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 3d057c5ce2b47..d65244bcc86dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -41,7 +41,11 @@
  * We need to clean this all up (https://issues.apache.org/jira/browse/KAFKA-17131) and mark the interface
  * deprecated afterward.
  */
-@SuppressWarnings("deprecation")
+@SuppressWarnings("deprecation") // Not deprecating the old context, since it is used by Transformers. See KAFKA-10603.
+/*
+ * When we deprecate `ProcessorContext`, we can also deprecate the `To` class,
+ * as it is only used in the `ProcessorContext#forward` method.
+ */
 public interface ProcessorContext {
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
index dd533ad7ba263..9b76962f3b9de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
@@ -21,7 +21,8 @@
 import java.time.Duration;
 
 /**
- * A functional interface used as an argument to {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}.
+ * A functional interface used as an argument to
+ * {@link org.apache.kafka.streams.processor.api.ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}.
  *
  * @see Cancellable
  */
@@ -30,14 +31,16 @@ public interface Punctuator {
     /**
      * Perform the scheduled periodic operation.
      *
-     * <p> If this method accesses {@link ProcessorContext} or
+     * <p> If this method accesses {@link org.apache.kafka.streams.processor.ProcessorContext} or
      * {@link org.apache.kafka.streams.processor.api.ProcessorContext}, record metadata like topic,
      * partition, and offset or {@link org.apache.kafka.streams.processor.api.RecordMetadata} won't
      * be available.
      *
-     * <p> Furthermore, for any record that is sent downstream via {@link ProcessorContext#forward(Object, Object)}
+     * <p> Furthermore, for any record that is sent downstream via
+     * {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)}
      * or {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}, there
-     * won't be any record metadata. If {@link ProcessorContext#forward(Object, Object)} is used,
+     * won't be any record metadata. If
+     * {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)} is used,
      * it's also not possible to set record headers.
      *
      * @param timestamp when the operation is being called, depending on {@link PunctuationType}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
index 20890088999fb..0515f8718aa51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
 import java.util.Map;
@@ -64,8 +64,9 @@ public static String topicNamePrefix(final Map configs, final String threadId)
         }
     }
 
-    @SuppressWarnings("unchecked")
-    public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) {
+    public static InternalProcessorContext asInternalProcessorContext(
+        final ProcessorContext context
+    ) {
         if (context instanceof InternalProcessorContext) {
             return (InternalProcessorContext) context;
         } else {
@@ -75,10 +76,9 @@ public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context)
-    @SuppressWarnings("unchecked")
-    public static InternalProcessorContext asInternalProcessorContext(final StateStoreContext context) {
+    public static InternalProcessorContext asInternalProcessorContext(final StateStoreContext context) {
         if (context instanceof InternalProcessorContext) {
-            return (InternalProcessorContext) context;
+            return (InternalProcessorContext) context;
         } else {
             throw new IllegalArgumentException(
                 "This component requires internal features of Kafka Streams and must be disabled for unit tests."
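Both `asInternalProcessorContext` overloads above rely on the same narrowing pattern: the public context type is down-cast to the internal implementation only after an explicit `instanceof` check, which is why the `@SuppressWarnings("unchecked")` annotations can be dropped. A minimal, self-contained sketch of that pattern follows; `Context` and `InternalContext` are hypothetical stand-ins, not the real Kafka Streams interfaces:

```java
public final class ContextNarrowing {

    /** Stand-in for the public context interface (hypothetical, not a Kafka class). */
    interface Context { }

    /** Stand-in for the internal context, exposing internal-only operations. */
    interface InternalContext extends Context {
        void logChange(String storeName);
    }

    /**
     * Narrows a public context to the internal one. The cast is guarded by an
     * instanceof check, so no unchecked generic cast is needed; anything else
     * (e.g. a plain test double) is rejected with a clear error message.
     */
    static InternalContext asInternalContext(final Context context) {
        if (context instanceof InternalContext) {
            return (InternalContext) context;
        }
        throw new IllegalArgumentException(
            "This component requires internal features and cannot run against a plain test context.");
    }

    public static void main(final String[] args) {
        // InternalContext has a single abstract method, so a lambda suffices here.
        final Context ctx = (InternalContext) storeName ->
            System.out.println("logChange(" + storeName + ")");
        asInternalContext(ctx).logChange("my-store");
    }
}
```

The guarded cast keeps the helper type-safe while still failing fast for contexts that do not carry the internal capabilities, which is exactly what the exception message in the diff warns about.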
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index f5b4366ae9839..bde8d8319197d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -53,11 +53,10 @@ public class AbstractRocksDBSegmentedBytesStore implements SegmentedBytesStore
     private final String name;
     private final AbstractSegments segments;
-    private final String metricScope;
     private final long retentionPeriod;
     private final KeySchema keySchema;
 
-    private InternalProcessorContext internalProcessorContext;
+    private InternalProcessorContext internalProcessorContext;
     private Sensor expiredRecordSensor;
     private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
     private boolean consistencyEnabled = false;
@@ -66,12 +65,10 @@ public class AbstractRocksDBSegmentedBytesStore implements SegmentedBytesStore
     private volatile boolean open;
 
     AbstractRocksDBSegmentedBytesStore(final String name,
-                                       final String metricScope,
                                        final long retentionPeriod,
                                        final KeySchema keySchema,
                                        final AbstractSegments segments) {
         this.name = name;
-        this.metricScope = metricScope;
         this.retentionPeriod = retentionPeriod;
         this.keySchema = keySchema;
         this.segments = segments;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 5405ad9a71caa..9c1c3f9ae7639 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -131,4 +131,4 @@ public KeyValueIterator reverseAll() {
     void log(final Bytes key, final byte[] value, final long timestamp) {
         internalContext.logChange(name(), key, value, timestamp, wrapped().getPosition());
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index e7b7198d1cfb9..33e787adb5319 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -23,6 +23,6 @@ public class RocksDBSegmentedBytesStore extends AbstractRocksDBSegmentedBytesStore
                                final long retention,
                                final long segmentInterval,
                                final KeySchema keySchema) {
-        super(name, metricsScope, retention, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval));
+        super(name, retention, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval));
    }
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
index 39f493c761bed..2f6bcc5c052e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
@@ -23,6 +23,6 @@
public class RocksDBTimestampedSegmentedBytesStore extends AbstractRocksDBSegmentedBytesStore
                                           final long retention,
                                           final long segmentInterval,
                                           final KeySchema keySchema) {
-        super(name, metricsScope, retention, keySchema, new TimestampedSegments(name, metricsScope, retention, segmentInterval));
+        super(name, retention, keySchema, new TimestampedSegments(name, metricsScope, retention, segmentInterval));
    }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index ebc068196316d..a293625dc308c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -233,7 +233,7 @@ public void shouldPreserveSerdesForOperators() {
     final ValueMapper<String, String> mapper = value -> value;
     final ValueJoiner<String, String, String> joiner = (value1, value2) -> value1;
     final ValueTransformerWithKeySupplier<String, String, String> valueTransformerWithKeySupplier =
-        () -> new ValueTransformerWithKey<String, String, String>() {
+        () -> new ValueTransformerWithKey<>() {
 
             @Override
             public void init(final ProcessorContext context) {}
@@ -247,103 +247,103 @@ public void close() {}
         };
 
         assertEquals(
-            ((AbstractStream) table1.filter((key, value) -> false)).keySerde(),
+            ((AbstractStream<String, String>) table1.filter((key, value) -> false)).keySerde(),
             consumedInternal.keySerde());
         assertEquals(
-            ((AbstractStream) table1.filter((key, value) -> false)).valueSerde(),
+            ((AbstractStream<String, String>) table1.filter((key, value) -> false)).valueSerde(),
             consumedInternal.valueSerde());
         assertEquals(
-            ((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(),
+            ((AbstractStream<String, String>) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(),
+            ((AbstractStream<String, String>) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.filterNot((key, value) -> false)).keySerde(),
+            ((AbstractStream<String, String>) table1.filterNot((key, value) -> false)).keySerde(),
             consumedInternal.keySerde());
         assertEquals(
-            ((AbstractStream) table1.filterNot((key, value) -> false)).valueSerde(),
+            ((AbstractStream<String, String>) table1.filterNot((key, value) -> false)).valueSerde(),
             consumedInternal.valueSerde());
         assertEquals(
-            ((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(),
+            ((AbstractStream<String, String>) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(),
+            ((AbstractStream<String, String>) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.mapValues(mapper)).keySerde(),
+            ((AbstractStream<String, String>) table1.mapValues(mapper)).keySerde(),
             consumedInternal.keySerde());
-        assertNull(((AbstractStream) table1.mapValues(mapper)).valueSerde());
+        assertNull(((AbstractStream<String, String>) table1.mapValues(mapper)).valueSerde());
         assertEquals(
-            ((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).keySerde(),
+            ((AbstractStream<String, String>) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).keySerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.mapValues(mapper,
-                Materialized.with(mySerde, mySerde))).valueSerde(),
+            ((AbstractStream<String, String>) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).valueSerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.toStream()).keySerde(),
+            ((AbstractStream<String, String>) table1.toStream()).keySerde(),
             consumedInternal.keySerde());
         assertEquals(
-            ((AbstractStream) table1.toStream()).valueSerde(),
+            ((AbstractStream<String, String>) table1.toStream()).valueSerde(),
             consumedInternal.valueSerde());
-        assertNull(((AbstractStream) table1.toStream(selector)).keySerde());
+        assertNull(((AbstractStream<String, String>) table1.toStream(selector)).keySerde());
         assertEquals(
-            ((AbstractStream) table1.toStream(selector)).valueSerde(),
+            ((AbstractStream<String, String>) table1.toStream(selector)).valueSerde(),
             consumedInternal.valueSerde());
         assertEquals(
-            ((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).keySerde(),
+            ((AbstractStream<String, String>) table1.transformValues(valueTransformerWithKeySupplier)).keySerde(),
             consumedInternal.keySerde());
-        assertNull(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).valueSerde());
+        assertNull(((AbstractStream<String, String>) table1.transformValues(valueTransformerWithKeySupplier)).valueSerde());
         assertEquals(
-            ((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).keySerde(),
+            ((AbstractStream<String, String>) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).keySerde(),
             mySerde);
-        assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).valueSerde(),
+        assertEquals(((AbstractStream<String, String>) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).valueSerde(),
             mySerde);
-        assertNull(((AbstractStream) table1.groupBy(KeyValue::new)).keySerde());
-        assertNull(((AbstractStream) table1.groupBy(KeyValue::new)).valueSerde());
+        assertNull(((AbstractStream<String, String>) table1.groupBy(KeyValue::new)).keySerde());
+        assertNull(((AbstractStream<String, String>) table1.groupBy(KeyValue::new)).valueSerde());
         assertEquals(
-            ((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).keySerde(),
+            ((AbstractStream<String, String>) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).keySerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).valueSerde(),
+            ((AbstractStream<String, String>) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).valueSerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.join(table1, joiner)).keySerde(),
+            ((AbstractStream<String, String>) table1.join(table1, joiner)).keySerde(),
             consumedInternal.keySerde());
-        assertNull(((AbstractStream) table1.join(table1, joiner)).valueSerde());
+        assertNull(((AbstractStream<String, String>) table1.join(table1, joiner)).valueSerde());
         assertEquals(
-            ((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
+            ((AbstractStream<String, String>) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
+            ((AbstractStream<String, String>) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.leftJoin(table1, joiner)).keySerde(),
+            ((AbstractStream<String, String>) table1.leftJoin(table1, joiner)).keySerde(),
             consumedInternal.keySerde());
-        assertNull(((AbstractStream) table1.leftJoin(table1, joiner)).valueSerde());
+        assertNull(((AbstractStream<String, String>) table1.leftJoin(table1, joiner)).valueSerde());
         assertEquals(
-            ((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
+            ((AbstractStream<String, String>) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
+            ((AbstractStream<String, String>) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.outerJoin(table1, joiner)).keySerde(),
+            ((AbstractStream<String, String>) table1.outerJoin(table1, joiner)).keySerde(),
             consumedInternal.keySerde());
-        assertNull(((AbstractStream) table1.outerJoin(table1, joiner)).valueSerde());
+        assertNull(((AbstractStream<String, String>) table1.outerJoin(table1, joiner)).valueSerde());
         assertEquals(
-            ((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
+            ((AbstractStream<String, String>) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
+            ((AbstractStream<String, String>) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
             mySerde);
     }
@@ -462,25 +462,25 @@ public void shouldCreateSourceAndSinkNodesForRepartitioningTopic() throws Exception {
             assertTopologyContainsProcessor(topology, "KSTREAM-SINK-0000000007");
             assertTopologyContainsProcessor(topology, "KSTREAM-SOURCE-0000000008");
 
-            final Field valSerializerField = ((SinkNode) driver.getProcessor("KSTREAM-SINK-0000000003"))
+            final Field valSerializerField = ((SinkNode<?, ?>) driver.getProcessor("KSTREAM-SINK-0000000003"))
                 .getClass()
                 .getDeclaredField("valSerializer");
-            final Field valDeserializerField = ((SourceNode) driver.getProcessor("KSTREAM-SOURCE-0000000004"))
+            final Field valDeserializerField = ((SourceNode<?, ?>) driver.getProcessor("KSTREAM-SOURCE-0000000004"))
                 .getClass()
                 .getDeclaredField("valDeserializer");
             valSerializerField.setAccessible(true);
             valDeserializerField.setAccessible(true);
 
-            assertNotNull(((ChangedSerializer) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000003"))).inner());
-            assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000004"))).inner());
-            assertNotNull(((ChangedSerializer) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000007"))).inner());
-            assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000008"))).inner());
+            assertNotNull(((ChangedSerializer<?>) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000003"))).inner());
+            assertNotNull(((ChangedDeserializer<?>) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000004"))).inner());
+            assertNotNull(((ChangedSerializer<?>) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000007"))).inner());
+            assertNotNull(((ChangedDeserializer<?>) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000008"))).inner());
         }
     }
 
     @Test
     public void shouldNotAllowNullSelectorOnToStream() {
-        assertThrows(NullPointerException.class, () -> table.toStream((KeyValueMapper) null));
+        assertThrows(NullPointerException.class, () -> table.toStream((KeyValueMapper<String, String, String>) null));
     }
@@ -495,12 +495,12 @@ public void shouldNotAllowNullPredicateOnFilterNot() {
 
     @Test
     public void shouldNotAllowNullMapperOnMapValues() {
-        assertThrows(NullPointerException.class, () -> table.mapValues((ValueMapper) null));
+        assertThrows(NullPointerException.class, () -> table.mapValues((ValueMapper<String, String>) null));
     }
 
     @Test
     public void shouldNotAllowNullMapperOnMapValueWithKey() {
-        assertThrows(NullPointerException.class, () -> table.mapValues((ValueMapperWithKey) null));
+        assertThrows(NullPointerException.class, () -> table.mapValues((ValueMapperWithKey<String, String, String>) null));
     }
 
     @Test
@@ -545,27 +545,42 @@ public void shouldNotAllowNullOtherTableOnLeftJoin() {
 
     @Test
     public void shouldThrowNullPointerOnFilterWhenMaterializedIsNull() {
-        assertThrows(NullPointerException.class, () -> table.filter((key, value) -> false, (Materialized) null));
+        assertThrows(
+            NullPointerException.class,
+            () -> table.filter((key, value) -> false, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null)
+        );
     }
 
     @Test
     public void shouldThrowNullPointerOnFilterNotWhenMaterializedIsNull() {
-        assertThrows(NullPointerException.class, () -> table.filterNot((key, value) -> false, (Materialized) null));
+        assertThrows(
+            NullPointerException.class,
+            () -> table.filterNot((key, value) -> false, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null)
+        );
     }
 
     @Test
     public void shouldThrowNullPointerOnJoinWhenMaterializedIsNull() {
-        assertThrows(NullPointerException.class, () -> table.join(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null));
+        assertThrows(
+            NullPointerException.class,
+            () -> table.join(table, MockValueJoiner.TOSTRING_JOINER, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null)
+        );
     }
 
     @Test
     public void shouldThrowNullPointerOnLeftJoinWhenMaterializedIsNull() {
-        assertThrows(NullPointerException.class, () -> table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null));
+        assertThrows(
+            NullPointerException.class,
+            () -> table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null)
+        );
     }
 
     @Test
     public void shouldThrowNullPointerOnOuterJoinWhenMaterializedIsNull() {
-        assertThrows(NullPointerException.class, () -> table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null));
+        assertThrows(
+            NullPointerException.class,
+            () -> table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null)
+        );
     }
 
     @Test
@@ -573,12 +588,11 @@ public void shouldThrowNullPointerOnTransformValuesWithKeyWhenTransformerSupplierIsNull() {
         assertThrows(NullPointerException.class, () -> table.transformValues(null));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() {
         final ValueTransformerWithKeySupplier valueTransformerSupplier = mock(ValueTransformerWithKeySupplier.class);
-        assertThrows(NullPointerException.class, () -> table.transformValues(valueTransformerSupplier, (Materialized) null));
+        assertThrows(NullPointerException.class, () -> table.transformValues(valueTransformerSupplier, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null));
     }
 
     @Test
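A pattern worth noting in the test changes above: every `null` argument is cast to the parameter type (`(ValueMapper<...>) null`, `(Materialized<...>) null`). The cast is what selects the intended overload, since a bare `null` would match several signatures and fail to compile, and spelling out the full generic type is what lets the raw-type suppressions be removed. A small self-contained sketch of the idiom, using a hypothetical two-overload mini-API (not the real `KTable` methods) and JUnit 5:

```java
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.Objects;

import org.junit.jupiter.api.Test;

public class NullCastOverloadTest {

    // Hypothetical mini-API mirroring the shape of KTable#mapValues overloads.
    interface ValueMapper<V, VR> { VR apply(V value); }
    interface ValueMapperWithKey<K, V, VR> { VR apply(K key, V value); }

    static <VR> void mapValues(final ValueMapper<String, VR> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
    }

    static <VR> void mapValues(final ValueMapperWithKey<String, String, VR> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
    }

    @Test
    public void shouldRejectNullMapper() {
        // mapValues(null) would be ambiguous and not compile; the casts below
        // pick one overload each and still exercise the null check at runtime.
        assertThrows(NullPointerException.class,
            () -> mapValues((ValueMapper<String, Object>) null));
        assertThrows(NullPointerException.class,
            () -> mapValues((ValueMapperWithKey<String, String, Object>) null));
    }
}
```

The fully parameterized cast costs nothing at runtime (it is a cast on a constant `null`) but documents exactly which signature the test is probing.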