Commit

fix: burn index by peek
hrishabhg committed Dec 30, 2024
1 parent 6c4c4ff commit 4ca4f40
Showing 2 changed files with 25 additions and 29 deletions.
File 1 of 2:

@@ -72,6 +72,7 @@
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyDescription;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
@@ -263,7 +264,7 @@ public void shouldHaveSourceNodeForSecondSubTopologyWithKsqlNameForRepartition()
         builder.build(), "Aggregate-GroupBy-repartition-source");
     final List<String> successors = node.successors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
     assertThat(node.predecessors(), equalTo(Collections.emptySet()));
-    assertThat(successors, equalTo(Collections.singletonList("KSTREAM-AGGREGATE-0000000004")));
+    assertThat(successors, equalTo(Collections.singletonList("KSTREAM-AGGREGATE-0000000005")));
     assertThat(node.topicSet(), containsInAnyOrder("Aggregate-GroupBy-repartition"));
   }

@@ -279,7 +280,7 @@ public void shouldHaveSourceNodeForSecondSubTopologyWithKsqlNameForRepartition()
   public void shouldHaveKsqlNameForAggregationStateStore() {
     build();
     final TopologyDescription.Processor node = (TopologyDescription.Processor) getNodeByName(
-        builder.build(), "KSTREAM-AGGREGATE-0000000003");
+        builder.build(), "KSTREAM-AGGREGATE-0000000004");
     assertThat(node.stores(), hasItem(equalTo("Aggregate-Aggregate-Materialize")));
   }

@@ -509,6 +510,7 @@ KStream createProxy() {
         .forward("filter", methodParams(Predicate.class), this)
         .forward("groupByKey", methodParams(Grouped.class), this)
         .forward("groupBy", methodParams(KeyValueMapper.class, Grouped.class), this)
+        .forward("peek", methodParams(ForeachAction.class), this)
         .build();
   }

@@ -603,6 +605,11 @@ private KStream processValues(
     return stream.createProxy();
   }
 
+  @SuppressWarnings("unused") // Invoked via reflection.
+  private KStream peek(final ForeachAction action) {
+    return new FakeKStream().createProxy();
+  }
+
   @SuppressWarnings("unused") // Invoked via reflection.
   private KStream filter(final Predicate predicate) {
     final FakeKStream stream = new FakeKStream();
File 2 of 2:

@@ -81,44 +81,33 @@ public static <K> KStreamHolder<K> build(
 
     final Named selectName =
         Named.as(StreamsUtil.buildOpName(queryContext));
 
-    // Due to a KS backward incompatibility, we need to burn an index number for the operation.
-    // The old `transform[Values]()` used one more index compared to the new `api.process[Values]()`
     final KStream<K, GenericRow> stream = streamHolder.getStream();
-    try {
-      final Field internalStreamsBuilderField = AbstractStream.class.getDeclaredField("builder");
-      internalStreamsBuilderField.setAccessible(true);
-
-      final InternalStreamsBuilder internalStreamsBuilder =
-          (InternalStreamsBuilder) internalStreamsBuilderField.get(stream);
-      internalStreamsBuilder.newProcessorName(""); // burn one index number
-    } catch (final NoSuchFieldException | IllegalAccessException fatal) {
-      throw new KsqlException("Internal error.", fatal);
-    }
 
+    stream.peek((k, v) -> { });
     if (selectedKeys.isPresent() && !selectedKeys.get().containsAll(
         sourceSchema.key().stream().map(Column::name).collect(ImmutableList.toImmutableList())
     )) {
       return streamHolder.withStream(
           stream.process(() -> new KsProcessor<>(
-              (readOnlyKey, value, ctx) -> {
-                if (keyIndices.isEmpty()) {
-                  return null;
-                }
+            (readOnlyKey, value, ctx) -> {
+              if (keyIndices.isEmpty()) {
+                return null;
+              }
 
-                if (readOnlyKey instanceof GenericKey) {
-                  final GenericKey keys = (GenericKey) readOnlyKey;
-                  final Builder resultKeys = GenericKey.builder(keyIndices.size());
+              if (readOnlyKey instanceof GenericKey) {
+                final GenericKey keys = (GenericKey) readOnlyKey;
+                final Builder resultKeys = GenericKey.builder(keyIndices.size());
 
-                  for (final int keyIndex : keyIndices) {
-                    resultKeys.append(keys.get(keyIndex));
-                  }
+                for (final int keyIndex : keyIndices) {
+                  resultKeys.append(keys.get(keyIndex));
+                }
 
-                  return (K) resultKeys.build();
-                } else {
-                  throw new UnsupportedOperationException();
-                }
-              },
+                return (K) resultKeys.build();
+              } else {
+                throw new UnsupportedOperationException();
+              }
+            },
           selectMapper.getTransformer(logger)), selectName),
           selection.getSchema()
       );
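
For context on the fix: Kafka Streams names each processor node by appending a single, monotonically increasing topology-wide counter (KSTREAM-SOURCE-0000000000, KSTREAM-PEEK-0000000001, and so on). A no-op peek() therefore consumes, or "burns", exactly one counter value and shifts the numeric suffix of every later node by one, which is why the expected node names in the test assertions above each move up by one. The sketch below illustrates the effect; the class and topic names are illustrative and not taken from the commit:

import org.apache.kafka.streams.StreamsBuilder;

public final class BurnIndexDemo {

  public static void main(final String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();

    builder.<String, String>stream("input") // source node: KSTREAM-SOURCE-0000000000
        .peek((k, v) -> { })                // no-op peek burns KSTREAM-PEEK-0000000001
        .to("output");                      // sink lands on KSTREAM-SINK-0000000002;
                                            // without the peek it would take index 1

    // Printing the topology description shows the numbered node names,
    // the same kind of names asserted on in the test file above.
    System.out.println(builder.build().describe());
  }
}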
