KAFKA-13722: remove internal usage of old ProcessorContext #18698

Merged 2 commits on Jan 29, 2025
@@ -1054,7 +1054,7 @@ <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super
  * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
  * an internally generated name, and "-repartition" is a fixed suffix.
  *
- * You can retrieve all generated internal topic names via {@link Topology#describe()}.
+ * <p>You can retrieve all generated internal topic names via {@link Topology#describe()}.
  *
  * <p>
  * All data of this {@code KTable} will be redistributed through the repartitioning topic by writing all update
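
As the reworked javadoc says, Topology#describe() is how you discover the internally generated "-repartition" topics. A minimal sketch of that lookup; the topology and topic name are illustrative, not part of this PR:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class DescribeTopologyExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // groupBy() on a KTable changes the key, which forces a repartition topic.
        builder.<String, String>table("input-topic")
               .groupBy((key, value) -> KeyValue.pair(value, value))
               .count();
        final Topology topology = builder.build();
        // describe() lists every processor and topic, including internally
        // generated "<application.id>-<name>-repartition" topics.
        System.out.println(topology.describe());
    }
}
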
@@ -53,7 +53,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcess
     private final Optional<Duration> gracePeriod;
     private TimeOrderedKeyValueBuffer<K1, V1, V1> buffer;
     protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
-    private InternalProcessorContext internalProcessorContext;
+    private InternalProcessorContext<K1, VOut> internalProcessorContext;
     private final boolean useBuffer;
     private final String storeName;
 
@@ -78,7 +78,7 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
-        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        internalProcessorContext = asInternalProcessorContext(context);
         if (useBuffer) {
             if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
                 throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
@@ -90,7 +90,6 @@ public void init(final ProcessorContext<K1, VOut> context) {
 
     @Override
     public void process(final Record<K1, V1> record) {
-        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
         updateObservedStreamTime(record.timestamp());
         if (maybeDropRecord(record)) {
             return;
@@ -123,7 +122,6 @@ protected void updateObservedStreamTime(final long timestamp) {
         observedStreamTime = Math.max(observedStreamTime, timestamp);
     }
 
-    @SuppressWarnings("unchecked")
     private void doJoin(final Record<K1, V1> record) {
         final K2 mappedKey = keyMapper.apply(record.key(), record.value());
         final V2 value2 = getValue2(record, mappedKey);
@@ -41,7 +41,11 @@
  * We need to clean this all up (https://issues.apache.org/jira/browse/KAFKA-17131) and mark the interface
  * deprecated afterward.
  */
-@SuppressWarnings("deprecation")
+@SuppressWarnings("deprecation") // Not deprecating the old context, since it is used by Transformers. See KAFKA-10603.
+/*
+ * When we deprecate `ProcessorContext`, we can also deprecate the `To` class,
+ * as it is only used in the `ProcessorContext#forward` method.
+ */
 public interface ProcessorContext {
 
     /**
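
For context on why `To` can be deprecated together with the old interface: the legacy `ProcessorContext#forward(key, value, To.child(...))` routing is subsumed by the new `api.ProcessorContext#forward(Record, String childName)`. A minimal sketch of the replacement; the class and child names are illustrative:

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class RouterProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(final Record<String, String> record) {
        // Old API: context.forward(key, value, To.child("audit-sink"));
        // New API: the child name is a plain String parameter, and the Record
        // carries timestamp and headers along with key and value.
        context.forward(record, "audit-sink");
    }
}
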
@@ -21,7 +21,8 @@
 import java.time.Duration;
 
 /**
- * A functional interface used as an argument to {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}.
+ * A functional interface used as an argument to
+ * {@link org.apache.kafka.streams.processor.api.ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}.
  *
  * @see Cancellable
  */
@@ -30,14 +31,16 @@ public interface Punctuator {
     /**
      * Perform the scheduled periodic operation.
      *
-     * <p> If this method accesses {@link ProcessorContext} or
+     * <p> If this method accesses {@link org.apache.kafka.streams.processor.api.ProcessorContext} or
      * {@link org.apache.kafka.streams.processor.api.ProcessorContext}, record metadata like topic,
      * partition, and offset or {@link org.apache.kafka.streams.processor.api.RecordMetadata} won't
      * be available.
      *
-     * <p> Furthermore, for any record that is sent downstream via {@link ProcessorContext#forward(Object, Object)}
+     * <p> Furthermore, for any record that is sent downstream via
+     * {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}
      * or {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}, there
-     * won't be any record metadata. If {@link ProcessorContext#forward(Object, Object)} is used,
+     * won't be any record metadata. If
+     * {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)} is used,
      * it's also not possible to set record headers.
      *
      * @param timestamp when the operation is being called, depending on {@link PunctuationType}
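
As the reworked javadoc states, no record metadata is available inside a punctuation callback. A minimal sketch of scheduling one through the new `api.ProcessorContext`; the processor, key, and interval are illustrative:

import java.time.Duration;

import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class FlushingProcessor implements Processor<String, Long, String, Long> {
    private ProcessorContext<String, Long> context;

    @Override
    public void init(final ProcessorContext<String, Long> context) {
        this.context = context;
        // Fires every 30 seconds of wall-clock time, whether or not records are flowing.
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // No record is being processed when this runs, so context.recordMetadata()
            // is Optional.empty() and the forwarded record carries no upstream headers.
            this.context.forward(new Record<>("tick", timestamp, timestamp));
        });
    }

    @Override
    public void process(final Record<String, Long> record) {
        context.forward(record); // pass-through; the punctuator does the periodic work
    }
}
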
@@ -17,8 +17,8 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
 import java.util.Map;
@@ -64,8 +64,9 @@ public static String topicNamePrefix(final Map<String, Object> configs, final St
         }
     }
 
-    @SuppressWarnings("unchecked")
-    public static <K, V> InternalProcessorContext<K, V> asInternalProcessorContext(final ProcessorContext context) {
+    public static <K, V> InternalProcessorContext<K, V> asInternalProcessorContext(
+        final ProcessorContext<K, V> context
+    ) {
         if (context instanceof InternalProcessorContext) {
             return (InternalProcessorContext<K, V>) context;
         } else {
@@ -75,10 +76,9 @@ public static <K, V> InternalProcessorContext<K, V> asInternalProcessorContext(f
         }
     }
 
-    @SuppressWarnings("unchecked")
-    public static <K, V> InternalProcessorContext<K, V> asInternalProcessorContext(final StateStoreContext context) {
+    public static InternalProcessorContext<?, ?> asInternalProcessorContext(final StateStoreContext context) {
         if (context instanceof InternalProcessorContext) {
-            return (InternalProcessorContext<K, V>) context;
+            return (InternalProcessorContext<?, ?>) context;
         } else {
             throw new IllegalArgumentException(
                 "This component requires internal features of Kafka Streams and must be disabled for unit tests."
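
The signature change above is what lets both `@SuppressWarnings("unchecked")` annotations go: casting a `ProcessorContext<K, V>` down to `InternalProcessorContext<K, V>` keeps the type arguments aligned, so the compiler accepts it as an ordinary checked downcast, while the `StateStoreContext` overload has no way to infer K and V and now honestly returns wildcards instead. A toy model of the same pattern, with all names hypothetical (not Kafka code):

// Ctx stands in for the new generic ProcessorContext,
// InternalCtx for InternalProcessorContext.
interface Ctx<K, V> { }
interface InternalCtx<K, V> extends Ctx<K, V> { }

final class CastDemo {
    // The type arguments flow from the parameter to the return type, so this
    // downcast compiles without an unchecked warning: any InternalCtx that is
    // a Ctx<K, V> can only be an InternalCtx<K, V>.
    static <K, V> InternalCtx<K, V> asInternal(final Ctx<K, V> context) {
        if (context instanceof InternalCtx) {
            return (InternalCtx<K, V>) context;
        }
        throw new IllegalArgumentException("not an internal context");
    }
}
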
@@ -53,11 +53,10 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
 
     private final String name;
     private final AbstractSegments<S> segments;
-    private final String metricScope;
     private final long retentionPeriod;
     private final KeySchema keySchema;
 
-    private InternalProcessorContext internalProcessorContext;
+    private InternalProcessorContext<?, ?> internalProcessorContext;
     private Sensor expiredRecordSensor;
     private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
     private boolean consistencyEnabled = false;
@@ -66,12 +65,10 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
     private volatile boolean open;
 
     AbstractRocksDBSegmentedBytesStore(final String name,
-                                       final String metricScope,
                                        final long retentionPeriod,
                                        final KeySchema keySchema,
                                        final AbstractSegments<S> segments) {
         this.name = name;
-        this.metricScope = metricScope;
         this.retentionPeriod = retentionPeriod;
         this.keySchema = keySchema;
         this.segments = segments;
@@ -131,4 +131,4 @@ public KeyValueIterator<Bytes, byte[]> reverseAll() {
     void log(final Bytes key, final byte[] value, final long timestamp) {
         internalContext.logChange(name(), key, value, timestamp, wrapped().getPosition());
     }
-}
\ No newline at end of file
+}
@@ -23,6 +23,6 @@ public class RocksDBSegmentedBytesStore extends AbstractRocksDBSegmentedBytesSto
                                final long retention,
                                final long segmentInterval,
                                final KeySchema keySchema) {
-        super(name, metricsScope, retention, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval));
+        super(name, retention, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval));
     }
 }
@@ -23,6 +23,6 @@ public class RocksDBTimestampedSegmentedBytesStore extends AbstractRocksDBSegmen
                                           final long retention,
                                           final long segmentInterval,
                                           final KeySchema keySchema) {
-        super(name, metricsScope, retention, keySchema, new TimestampedSegments(name, metricsScope, retention, segmentInterval));
+        super(name, retention, keySchema, new TimestampedSegments(name, metricsScope, retention, segmentInterval));
     }
 }