Master Build Fixes - AclAuthorizer and TransformValues Replacement #10645

Open
wants to merge 58 commits into base: master

Commits (58)
87aa49e
Use assertEquals instead
VedarthConfluent Dec 6, 2024
b4ec319
Fix assert statement
VedarthConfluent Dec 6, 2024
c10bc81
Fix asser equals
VedarthConfluent Dec 6, 2024
dbeaef1
Fix tests
VedarthConfluent Dec 9, 2024
ecd9e6a
Fixes a test
VedarthConfluent Dec 9, 2024
600b73d
Revert test updates
VedarthConfluent Dec 11, 2024
dd790e6
Encode the test url
VedarthConfluent Dec 13, 2024
700a0d6
Revert url encoding
VedarthConfluent Dec 13, 2024
0d4da08
fix: replaced transformer with processor for sinkbuilder
hrishabhg Dec 19, 2024
788a093
refactor: renamed vars
hrishabhg Dec 19, 2024
31e0b56
refactor: reverted changes
hrishabhg Dec 19, 2024
2ed9f5c
refactor: renamed parameter with better name
hrishabhg Dec 20, 2024
71dba6d
fix: replaced ksqltransformer with ksqlprocessor
hrishabhg Dec 20, 2024
fe41cff
fix: replaced ksqltransformer with ksqlprocessor
hrishabhg Dec 20, 2024
525f2f3
fix: kstream transformer replaced with processor
hrishabhg Dec 22, 2024
adca69f
fix: fixed tests
hrishabhg Dec 23, 2024
15687d3
fix: fixed tests for streambuilder
hrishabhg Dec 23, 2024
fdec55e
fix: fixed tests for streambuilder
hrishabhg Dec 23, 2024
e634d5f
fix: fixed test
hrishabhg Dec 23, 2024
4c17c6f
refactor: simplified ksprocessor
hrishabhg Dec 24, 2024
f16b882
refactor: interface implementation simplified
hrishabhg Dec 24, 2024
04a5e31
refactor: renamed transform with process
hrishabhg Dec 26, 2024
afdf679
refactor: renamed with kstream convention
hrishabhg Dec 26, 2024
fd05218
fix: added test for ksprocessor and ksfixedkeyprocessor
hrishabhg Dec 26, 2024
08fb0d2
fix: updated the fakestream impl
hrishabhg Dec 26, 2024
8515e5c
fix: updated stream topology collection in test
hrishabhg Dec 26, 2024
72f9419
fix: fixed tests
hrishabhg Dec 26, 2024
9f21f2a
fix: ignore postAggregationMapper Tes
hrishabhg Dec 27, 2024
b35ee9c
fix: fixed pre-agg test
hrishabhg Dec 27, 2024
d225a84
fix: fixed pre-agg test
hrishabhg Dec 27, 2024
fa8d6df
fix: checkstyle for unused import
hrishabhg Dec 27, 2024
6c4c4ff
attempt to fix backward incompatibility
mjsax Dec 28, 2024
4ca4f40
fix: burn index by peek
hrishabhg Dec 30, 2024
8ed2f1a
fix: checkstyle-removed unused imports
hrishabhg Dec 30, 2024
88f5c83
fix: test
hrishabhg Dec 30, 2024
9ef8c4a
fix: test
hrishabhg Dec 30, 2024
4cbeb83
fix: checkstyle fix
hrishabhg Jan 2, 2025
d086480
fix: fixed historical plan tests
hrishabhg Jan 2, 2025
fff5571
fix: reverted semaphore
hrishabhg Jan 2, 2025
06523fe
fix: fixed the error message check
hrishabhg Jan 2, 2025
1757a55
fix: updated historical plans version
hrishabhg Jan 3, 2025
7f7c9bf
Merge branch 'master' into master-build-failure-18122024
hrishabhg Jan 3, 2025
a3033d6
fix: removed ksqlprocessingctx usage
hrishabhg Jan 3, 2025
43344d5
fix: removed rowtime checks
hrishabhg Jan 3, 2025
24655f6
refactor: refactored for consistency
hrishabhg Jan 7, 2025
ecbbdaa
Merge branch 'master' into transformer-migration-2
hrishabhg Jan 7, 2025
6810fef
refactor: imported namespace FixedKeyProcessorContext
hrishabhg Jan 15, 2025
2b3ff5f
fix: replaced aclauthorizer with standardauthorizer for tests
hrishabhg Jan 15, 2025
f355cdc
fix: replaced stream.transformValues with stream.processValues
hrishabhg Jan 15, 2025
6186a19
fix: tests errors fixed
hrishabhg Jan 15, 2025
ceda846
fix: fixed tests
hrishabhg Jan 15, 2025
224e94c
Merge branch 'master' into fix-master-build-jan-2025
hrishabhg Jan 15, 2025
47c0d60
fix: fixed tests
hrishabhg Jan 15, 2025
c4c82df
fix: replaced transformvalues with processvalues for test
hrishabhg Jan 16, 2025
5537aa5
fix: replaced transformvalues with processvalues for test
hrishabhg Jan 16, 2025
d25923a
fix: added stream.peek hack for maintaining the state
hrishabhg Jan 16, 2025
1df46ba
fix: added named.class in test
hrishabhg Jan 16, 2025
80dd096
fix: added named.class in test
hrishabhg Jan 16, 2025
Original file line number Diff line number Diff line change
@@ -60,6 +60,7 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.LimitedProxyBuilder;
import io.confluent.ksql.util.MetaStoreFixture;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
@@ -145,7 +146,8 @@ public void shouldBuildSourceNode() {
.collect(Collectors.toList());

assertThat(sourceNode.predecessors(), equalTo(Collections.emptySet()));
-assertThat(successors, equalTo(Collections.singletonList(TRANSFORM_NODE)));
+assertThat(successors, equalTo(Arrays.asList("KSTREAM-PEEK-0000000001",
+TRANSFORM_NODE)));
assertThat(sourceNode.topicSet(), equalTo(ImmutableSet.of("test1")));
}

@@ -495,11 +497,6 @@ KStream createProxy() {
.forward("mapValues", methodParams(ValueMapperWithKey.class), this)
.forward("mapValues", methodParams(ValueMapperWithKey.class, Named.class),
this)
-.forward("transformValues",
-methodParams(ValueTransformerWithKeySupplier.class, String[].class), this)
-.forward("transformValues",
-methodParams(ValueTransformerWithKeySupplier.class, Named.class, String[].class),
-this)
.forward("process",
methodParams(ProcessorSupplier.class, String[].class), this)
.forward("process",
Original file line number Diff line number Diff line change
@@ -62,6 +62,7 @@
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -210,7 +211,8 @@ public void shouldBuildSourceNode() {
final TopologyDescription.Source node = (TopologyDescription.Source) getNodeByName(realBuilder.build(), PlanTestUtil.SOURCE_NODE);
final List<String> successors = node.successors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
assertThat(node.predecessors(), equalTo(Collections.emptySet()));
-assertThat(successors, equalTo(Collections.singletonList(PlanTestUtil.TRANSFORM_NODE)));
+assertThat(successors, equalTo(Arrays.asList(PlanTestUtil.PEEK_NODE,
+PlanTestUtil.TRANSFORM_NODE)));
assertThat(node.topicSet(), equalTo(ImmutableSet.of("topic")));
}

Original file line number Diff line number Diff line change
@@ -110,7 +110,7 @@ public void shouldBuildSourceNode() {
final List<String> successors = node.successors().stream().map(TopologyDescription.Node::name)
.collect(Collectors.toList());
assertThat(node.predecessors(), equalTo(Collections.emptySet()));
-assertThat(successors, equalTo(Collections.singletonList(TRANSFORM_NODE)));
+assertThat(successors, equalTo(Arrays.asList("KSTREAM-PEEK-0000000001", TRANSFORM_NODE)));
assertThat(node.topicSet(), equalTo(ImmutableSet.of("test1")));
}

Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@

final class PlanTestUtil {

+static final String PEEK_NODE = "KSTREAM-PEEK-0000000001";
static final String TRANSFORM_NODE = "KSTREAM-TRANSFORMVALUES-0000000001";
static final String SOURCE_NODE = "KSTREAM-SOURCE-0000000000";
static final String SOURCE_NODE_FORCE_CHANGELOG = "KSTREAM-SOURCE-0000000001";
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic])
---> KSTREAM-TRANSFORMVALUES-0000000001
+--> KSTREAM-PEEK-0000000001, KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> WhereFilter
<-- KSTREAM-SOURCE-0000000000
@@ -11,6 +11,9 @@ Topologies:
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000004
<-- WhereFilter
+Processor: KSTREAM-PEEK-0000000001 (stores: [])
+--> none
+<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-PEEK-0000000003 (stores: [])
--> none
<-- WhereFilter
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic])
---> KSTREAM-TRANSFORMVALUES-0000000001
+--> KSTREAM-PEEK-0000000001, KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> WhereFilter
<-- KSTREAM-SOURCE-0000000000
@@ -11,6 +11,9 @@ Topologies:
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000004
<-- WhereFilter
+Processor: KSTREAM-PEEK-0000000001 (stores: [])
+--> none
+<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-PEEK-0000000003 (stores: [])
--> none
<-- WhereFilter
Original file line number Diff line number Diff line change
@@ -62,6 +62,10 @@
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
+import org.apache.kafka.streams.processor.api.RecordMetadata;

final class SourceBuilderUtils {

@@ -403,4 +407,74 @@ public void close() {
       };
     }
   }
+
+  /**
+   * Note: It is duplicate code of AddKeyAndPseudoColumn's transformer
+   * @param <K> the key type
+   */
+  static class AddKeyAndPseudoColumnsProcessor<K>
+      implements FixedKeyProcessor<K, GenericRow, GenericRow> {
+
+    private final Function<K, Collection<?>> keyGenerator;
+    private final int pseudoColumnVersion;
+    private final List<Column> headerColumns;
+    private FixedKeyProcessorContext<K, GenericRow> processorContext;
+
+    AddKeyAndPseudoColumnsProcessor(
+        final Function<K, Collection<?>> keyGenerator,
+        final int pseudoColumnVersion,
+        final List<Column> headerColumns
+    ) {
+      this.keyGenerator = requireNonNull(keyGenerator, "keyGenerator");
+      this.pseudoColumnVersion = pseudoColumnVersion;
+      this.headerColumns = headerColumns;
+    }
+
+    @Override
+    public void init(final FixedKeyProcessorContext<K, GenericRow> processorContext) {
+      this.processorContext = requireNonNull(processorContext, "processorContext");
+    }
+
+    @Override
+    public void process(final FixedKeyRecord<K, GenericRow> record) {
+      final K key = record.key();
+      final GenericRow row = record.value();
+
+      if (row == null) {
+        processorContext.forward(record);
+        return;
+      }
+
+      final Collection<?> keyColumns = keyGenerator.apply(key);
+
+      final int numPseudoColumns = SystemColumns
+          .pseudoColumnNames(pseudoColumnVersion).size();
+
+      row.ensureAdditionalCapacity(numPseudoColumns + keyColumns.size() + headerColumns.size());
+
+      for (final Column col : headerColumns) {
+        if (col.headerKey().isPresent()) {
+          row.append(extractHeader(record.headers(), col.headerKey().get()));
+        } else {
+          row.append(createHeaderData(record.headers()));
+        }
+      }
+
+      if (pseudoColumnVersion >= SystemColumns.ROWTIME_PSEUDOCOLUMN_VERSION) {
+        final long timestamp = record.timestamp();
+        row.append(timestamp);
+      }
+
+      if (pseudoColumnVersion >= SystemColumns.ROWPARTITION_ROWOFFSET_PSEUDOCOLUMN_VERSION) {
+        final RecordMetadata recordMetadata = processorContext.recordMetadata().get();
+        final int partition = recordMetadata.partition();
+        final long offset = recordMetadata.offset();
+        row.append(partition);
+        row.append(offset);
+      }
+
+      row.appendAll(keyColumns);
+      processorContext.forward(record.withValue(row));
+    }
+  }
 }
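
The order in which `process` extends each row can be sketched with plain lists. This is only an illustration under stated assumptions: `PseudoColumnOrderSketch`, `extend`, and the two version thresholds are hypothetical stand-ins (the real thresholds are the `SystemColumns` constants referenced in the diff), and header extraction is reduced to a precomputed list of values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-Java model of the append order; it does not use ksqlDB or Kafka Streams types. */
final class PseudoColumnOrderSketch {

  // Hypothetical thresholds, standing in for the SystemColumns constants in the diff.
  static final int ROWTIME_VERSION = 0;
  static final int PARTITION_OFFSET_VERSION = 1;

  static List<Object> extend(
      final List<Object> value,
      final List<Object> headerValues,
      final long timestamp,
      final int partition,
      final long offset,
      final List<Object> keyColumns,
      final int version
  ) {
    if (value == null) {
      return null; // models the early forward of a record whose value is null
    }
    final List<Object> row = new ArrayList<>(value);
    row.addAll(headerValues);            // 1. header columns
    if (version >= ROWTIME_VERSION) {
      row.add(timestamp);                // 2. record timestamp
    }
    if (version >= PARTITION_OFFSET_VERSION) {
      row.add(partition);                // 3. partition, then offset
      row.add(offset);
    }
    row.addAll(keyColumns);              // 4. key columns last
    return row;
  }

  public static void main(final String[] args) {
    final List<Object> row = extend(
        Arrays.asList("v1", "v2"),
        Collections.emptyList(),
        1000L, 3, 42L,
        Arrays.asList("k1"),
        1);
    System.out.println(row); // prints [v1, v2, 1000, 3, 42, k1]
  }
}
```

With a version below the partition/offset threshold, the same call would yield only the value columns, the timestamp, and the key columns.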
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@
import io.confluent.ksql.execution.plan.WindowedTableSource;
import io.confluent.ksql.execution.runtime.MaterializedFactory;
import io.confluent.ksql.execution.runtime.RuntimeBuildContext;
+import io.confluent.ksql.execution.streams.SourceBuilderUtils.AddKeyAndPseudoColumnsProcessor;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.serde.StaticTopicSerde;
@@ -55,6 +56,7 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueStore;

@@ -300,9 +302,10 @@ private <K> KStream<K, GenericRow> buildKStream(
.stream(streamSource.getTopicName(), consumed);

final int pseudoColumnVersion = streamSource.getPseudoColumnVersion();
-return stream
-.transformValues(new AddKeyAndPseudoColumns<>(
-keyGenerator, pseudoColumnVersion, streamSource.getSourceSchema().headers()));
+stream.peek((k, v) -> { });
+return stream.processValues(() -> new AddKeyAndPseudoColumnsProcessor<>(
+keyGenerator, pseudoColumnVersion, streamSource.getSourceSchema().headers()),
+Named.as("KSTREAM-TRANSFORMVALUES-0000000001"));
Member

Why is this node named explicitly?

Member Author

I did this to satisfy the test case requirements; without the explicit name, the node was generated as KSTREAM-PROCESSVALUES-0000000001. I have asked the KStream team whether this looks okay.

Member

Processor names by themselves should not be critical if they change. We do need to worry about topic and state store names, though.

What breaks if the processor name changes? If it is really only the name, it should be OK to accept the change and update the tests. If it has side effects, setting the name here sounds right.

The other question is whether naming explicitly could cause side effects for the auto-naming of downstream processors: if we name explicitly, we might not use the next index and so create different names later, which could also lead to undesired side effects.
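
The index concern can be illustrated with a toy model. This is a minimal sketch, not the Kafka Streams implementation: `NodeNamingSketch`, `addAuto`, and `addNamed` are hypothetical names, and the rule that an explicitly named node does not consume an index is an assumption, inferred from the no-op `stream.peek` that this PR adds before `processValues`.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of index-based node auto-naming; NOT the Kafka Streams implementation. */
final class NodeNamingSketch {

  private int index = 0;
  private final List<String> nodes = new ArrayList<>();

  /** Auto-named node: takes the next index, zero-padded to ten digits. */
  String addAuto(final String prefix) {
    final String name = String.format("%s%010d", prefix, index++);
    nodes.add(name);
    return name;
  }

  /** Explicitly named node: ASSUMED (for this sketch) not to consume an index. */
  String addNamed(final String name) {
    nodes.add(name);
    return name;
  }

  List<String> nodes() {
    return nodes;
  }

  public static void main(final String[] args) {
    // Without a filler node, the next auto-generated name reuses index 1.
    final NodeNamingSketch withoutPeek = new NodeNamingSketch();
    withoutPeek.addAuto("KSTREAM-SOURCE-");
    withoutPeek.addNamed("KSTREAM-TRANSFORMVALUES-0000000001");
    withoutPeek.addAuto("KSTREAM-SINK-");
    System.out.println(withoutPeek.nodes());

    // With a no-op peek consuming index 1, downstream names start at index 2.
    final NodeNamingSketch withPeek = new NodeNamingSketch();
    withPeek.addAuto("KSTREAM-SOURCE-");
    withPeek.addAuto("KSTREAM-PEEK-");
    withPeek.addNamed("KSTREAM-TRANSFORMVALUES-0000000001");
    withPeek.addAuto("KSTREAM-SINK-");
    System.out.println(withPeek.nodes());
  }
}
```

Under that assumption, dropping the peek would shift every auto-generated downstream name by one, which is the kind of side effect the question above is asking about.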

}

private static Function<GenericKey, Collection<?>> nonWindowedKeyGenerator(