
Commit

more cleanup
mjsax committed Jan 24, 2025
1 parent 154a308 commit 279c4fb
Showing 7 changed files with 97 additions and 63 deletions.
@@ -1054,7 +1054,7 @@ <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
* an internally generated name, and "-repartition" is a fixed suffix.
*
- * You can retrieve all generated internal topic names via {@link Topology#describe()}.
+ * <p>You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* <p>
* All data of this {@code KTable} will be redistributed through the repartitioning topic by writing all update
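The javadoc tweak above points readers at Topology#describe() as the way to discover generated internal topic names, including "-repartition" topics. A minimal sketch of that lookup, assuming a hypothetical input topic and a re-grouping that forces a repartition:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;

public class DescribeTopologyExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Hypothetical source topic; the groupBy below changes the key and
        // therefore requires an internal "-repartition" topic.
        final KTable<String, String> table = builder.table("input-topic");
        table.groupBy((key, value) -> KeyValue.pair(value, key))
             .count();

        final Topology topology = builder.build();
        // The description lists sub-topologies, processors, and generated internal
        // topic names, including the "-repartition" topic created by the groupBy above.
        System.out.println(topology.describe());
    }
}
```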
@@ -38,6 +38,7 @@
import java.util.Optional;

import static java.util.Objects.requireNonNull;
+import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;

@@ -77,7 +78,7 @@ public void init(final ProcessorContext<K1, VOut> context) {
final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
valueGetter.init(context);
-internalProcessorContext = (InternalProcessorContext<K1, VOut>) context;
+internalProcessorContext = asInternalProcessorContext(context);
if (useBuffer) {
if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
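The IllegalArgumentException in this hunk requires the table side of a stream-table join to be versioned whenever a grace period is configured. A hedged sketch of a topology that satisfies the check; topic, store, and retention values are illustrative, and it assumes the Joined#withGracePeriod option from KIP-923:

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class VersionedStreamTableJoin {
    public static void build(final StreamsBuilder builder) {
        // The table is materialized as a versioned store; without this, the join
        // below would fail with "KTable must be versioned to use a grace period
        // in a stream table join."
        final KTable<String, String> table = builder.table(
            "table-input",
            Materialized.<String, String>as(
                Stores.persistentVersionedKeyValueStore("versioned-store", Duration.ofHours(1))
            )
        );

        final KStream<String, String> stream = builder.stream("stream-input");

        stream.join(
                table,
                (streamValue, tableValue) -> streamValue + "/" + tableValue,
                Joined.<String, String, String>with(Serdes.String(), Serdes.String(), Serdes.String())
                      .withGracePeriod(Duration.ofMinutes(5))
            )
            .to("join-output");
    }
}
```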
@@ -41,7 +41,11 @@
* We need to clean this all up (https://issues.apache.org/jira/browse/KAFKA-17131) and mark the interface
* deprecated afterward.
*/
-@SuppressWarnings("deprecation")
+@SuppressWarnings("deprecation") // Not deprecating the old context, since it is used by Transformers. See KAFKA-10603.
+/*
+ * When we deprecate `ProcessorContext`, we can also deprecate the `To` class,
+ * as it is only used in the `ProcessorContext#forward` method.
+ */
public interface ProcessorContext {

/**
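The comment block added here notes that `To` only remains because of the old `ProcessorContext#forward`. For comparison, a sketch of the equivalent targeted forward with the new `org.apache.kafka.streams.processor.api.ProcessorContext`; the child-node name is hypothetical:

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class ForwardToChildProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(final Record<String, String> record) {
        // Old API: context.forward(record.key(), record.value(), To.child("sink-node"));
        // New API: the child is addressed by name directly, so no To object is needed.
        context.forward(record, "sink-node");
    }
}
```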
@@ -21,7 +21,8 @@
import java.time.Duration;

/**
- * A functional interface used as an argument to {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}.
+ * A functional interface used as an argument to
+ * {@link org.apache.kafka.streams.processor.api.ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}.
*
* @see Cancellable
*/
@@ -30,14 +31,16 @@ public interface Punctuator {
/**
* Perform the scheduled periodic operation.
*
- * <p> If this method accesses {@link ProcessorContext} or
+ * <p> If this method accesses {@link org.apache.kafka.streams.processor.api.ProcessorContext} or
* {@link org.apache.kafka.streams.processor.api.ProcessorContext}, record metadata like topic,
* partition, and offset or {@link org.apache.kafka.streams.processor.api.RecordMetadata} won't
* be available.
*
- * <p> Furthermore, for any record that is sent downstream via {@link ProcessorContext#forward(Object, Object)}
+ * <p> Furthermore, for any record that is sent downstream via
+ * {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}
* or {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}, there
- * won't be any record metadata. If {@link ProcessorContext#forward(Object, Object)} is used,
+ * won't be any record metadata. If
+ * {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)} is used,
* it's also not possible to set records headers.
*
* @param timestamp when the operation is being called, depending on {@link PunctuationType}
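With the javadoc now linking the fully qualified ProcessorContext#schedule(Duration, PunctuationType, Punctuator), here is a minimal sketch of registering a wall-clock punctuation; the forwarded key and the interval are illustrative:

```java
import java.time.Duration;

import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class PeriodicCountProcessor implements Processor<String, Long, String, Long> {
    private long count = 0;

    @Override
    public void init(final ProcessorContext<String, Long> context) {
        // The lambda is the Punctuator; it fires every 10 seconds of wall-clock time.
        // As the javadoc above explains, no record metadata is available inside it.
        context.schedule(
            Duration.ofSeconds(10),
            PunctuationType.WALL_CLOCK_TIME,
            timestamp -> context.forward(new Record<String, Long>("count", count, timestamp))
        );
    }

    @Override
    public void process(final Record<String, Long> record) {
        count++;
    }
}
```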
@@ -18,6 +18,7 @@

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

import java.util.Map;
@@ -63,8 +64,9 @@ public static String topicNamePrefix(final Map<String, Object> configs, final St
}
}

-@SuppressWarnings("unchecked")
-public static <K, V> InternalProcessorContext<K, V> asInternalProcessorContext(final StateStoreContext context) {
+public static <K, V> InternalProcessorContext<K, V> asInternalProcessorContext(
+    final ProcessorContext<K, V> context
+) {
if (context instanceof InternalProcessorContext) {
return (InternalProcessorContext<K, V>) context;
} else {
@@ -73,4 +75,14 @@ public static <K, V> InternalProcessorContext<K, V> asInternalProcessorContext(f
);
}
}

+public static InternalProcessorContext<?, ?> asInternalProcessorContext(final StateStoreContext context) {
+    if (context instanceof InternalProcessorContext) {
+        return (InternalProcessorContext<?, ?>) context;
+    } else {
+        throw new IllegalArgumentException(
+            "This component requires internal features of Kafka Streams and must be disabled for unit tests."
+        );
+    }
+}
}
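A hypothetical caller sketch contrasting the two overloads above: the generic one (used by the join-processor change earlier in this commit) keeps the key/value type parameters and replaces a raw cast at the call site, while the StateStoreContext variant can only return a wildcard-typed context:

```java
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;

import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;

final class InternalContextCallers {

    // Processor path: the key/value generics are preserved end to end.
    static <K, V> InternalProcessorContext<K, V> fromProcessorContext(final ProcessorContext<K, V> context) {
        return asInternalProcessorContext(context);
    }

    // State-store path: only a wildcard-typed internal context is available.
    static InternalProcessorContext<?, ?> fromStateStoreContext(final StateStoreContext context) {
        return asInternalProcessorContext(context);
    }
}
```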
@@ -131,4 +131,4 @@ public KeyValueIterator<Bytes, byte[]> reverseAll() {
void log(final Bytes key, final byte[] value, final long timestamp) {
internalContext.logChange(name(), key, value, timestamp, wrapped().getPosition());
}
-}
+}