Commit

fix: kstream transformer replaced with processor
hrishabhg committed Dec 23, 2024
1 parent fe41cff commit 525f2f3
Showing 13 changed files with 148 additions and 110 deletions.
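For orientation, here is a minimal sketch of the two runtime-agnostic interfaces this commit moves the call sites back onto. The shapes are inferred from how the diff below uses them, not copied from the ksqlDB sources, so treat every member as an assumption:

// Sketch only: shapes inferred from the call sites in this diff.

final class GenericRow { /* stand-in for io.confluent.ksql.GenericRow */ }

// Per-record context handed to transformers; the commit adds a Kafka
// Streams backed implementation, KsStreamProcessingContext, below.
interface KsqlProcessingContext {
  long getRowTime();
}

// Runtime-agnostic per-record transformation; KsProcessor and
// KsValueProcessor adapt it onto the Kafka Streams Processor API.
interface KsqlTransformer<K, R> {
  R transform(K readOnlyKey, GenericRow value, KsqlProcessingContext ctx);
}

The net effect: execution-plan code depends on KsqlTransformer again, while the adapters in io.confluent.ksql.execution.streams.process keep the newer Kafka Streams Processor API at the topology boundary.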

This file was deleted.

@@ -19,15 +19,15 @@

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.process.KsqlProcessor;
import io.confluent.ksql.execution.transform.ExpressionEvaluator;
import io.confluent.ksql.execution.transform.KsqlProcessingContext;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.schema.utils.FormatOptions;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.kafka.streams.processor.api.ProcessorContext;

public class SelectValueMapper<K> {

@@ -41,7 +41,7 @@ List<SelectInfo> getSelects() {
return selects;
}

public KsqlProcessor<K, GenericRow> getTransformer(
public KsqlTransformer<K, GenericRow> getTransformer(
final ProcessingLogger processingLogger
) {
return new SelectMapper<>(selects, processingLogger);
@@ -88,7 +88,7 @@ public int hashCode() {
}
}

private static final class SelectMapper<K> implements KsqlProcessor<K, GenericRow> {
private static final class SelectMapper<K> implements KsqlTransformer<K, GenericRow> {

private final ImmutableList<SelectInfo> selects;
private final ProcessingLogger processingLogger;
@@ -102,10 +102,10 @@ private SelectMapper(
}

@Override
public GenericRow process(
public GenericRow transform(
final K readOnlyKey,
final GenericRow value,
final ProcessorContext<K, GenericRow> ctx
final KsqlProcessingContext ctx
) {
if (value == null) {
return null;
@@ -31,13 +31,13 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.codegen.CompiledExpression;
import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp;
import io.confluent.ksql.execution.process.KsqlProcessor;
import io.confluent.ksql.execution.transform.KsqlProcessingContext;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.name.ColumnName;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -63,9 +63,9 @@ public class SelectValueMapperTest {
@Mock
private ProcessingLogger processingLogger;
@Mock
private ProcessorContext<Object, GenericRow> ctx;
private KsqlProcessingContext ctx;

private KsqlProcessor<Object, GenericRow> processor;
private KsqlTransformer<Object, GenericRow> transformer;

@Before
public void setup() {
@@ -81,14 +81,14 @@ public void setup() {
)
);

processor = selectValueMapper.getTransformer(processingLogger);
transformer = selectValueMapper.getTransformer(processingLogger);
}

@SuppressWarnings("unchecked")
@Test
public void shouldInvokeEvaluatorsWithCorrectParams() {
// When:
processor.process(KEY, VALUE, ctx);
transformer.transform(KEY, VALUE, ctx);

// Then:
final ArgumentCaptor<Supplier<String>> errorMsgCaptor = ArgumentCaptor.forClass(Supplier.class);
@@ -115,7 +115,7 @@ public void shouldEvaluateExpressions() {
when(col2.evaluate(any(), any(), any(), any())).thenReturn(300);

// When:
final GenericRow result = processor.process(KEY, VALUE, ctx);
final GenericRow result = transformer.transform(KEY, VALUE, ctx);

// Then:
assertThat(result, equalTo(genericRow(100, 200, 300)));
@@ -124,7 +124,7 @@ public void shouldEvaluateExpressions() {
@Test
public void shouldHandleNullRows() {
// When:
final GenericRow result = processor.process(KEY, null, ctx);
final GenericRow result = transformer.transform(KEY, null, ctx);

// Then:
assertThat(result, is(nullValue()));
@@ -125,7 +125,7 @@ static class TimestampProcessorSupplier<K>
@Override
public Processor<K, GenericRow, K, GenericRow> get() {
return new Processor<K, GenericRow, K, GenericRow>() {
private ProcessorContext processorContext;
private ProcessorContext<K, GenericRow> processorContext;

@Override
public void init(final ProcessorContext<K, GenericRow> processorContext) {
@@ -55,15 +55,15 @@
/**
* <p>
* This is the V2 version of SourceBuilder, which is used to build TableSource{V2} steps.
* The reason this was neccessary was due to a change in state store schema required for the
* addition of ROWPARTITION and ROWOFFSET pseudocolumns(see
* The reason this was necessary was due to a change in state store schema required for the
* addition of ROWPARTITION and ROWOFFSET pseudo columns (see
* https://github.com/confluentinc/ksql/pull/7990 and
* https://github.com/confluentinc/ksql/pull/8072).
* </p>
*
* <p>
* If we want to support joins on windowed tables in the future while supporting these new
* pseudocolumns, it will be neccessary to bump the version of WindowedTableSource and include
* pseudo columns, it will be necessary to bump the version of WindowedTableSource and include
* similar logic. However, we decided against doing this in the short term, as we currently do not
* truly support joins on windowed tables (see https://github.com/confluentinc/ksql/issues/805)
* </p>
@@ -103,7 +103,7 @@ <K> KTable<K, GenericRow> buildKTable(
final KTable<K, GenericRow> maybeMaterialized;

if (forceMaterialization) {
// besides materializing necessary pseudocolumns, we also materialize to prevent the
// besides materializing necessary pseudo columns, we also materialize to prevent the
// source-changelog optimization in kafka streams - we don't want this optimization to
// be enabled because we cannot require symmetric serialization between
// producer and KSQL (see https://issues.apache.org/jira/browse/KAFKA-10179
@@ -24,9 +24,7 @@
import io.confluent.ksql.execution.plan.StreamSelect;
import io.confluent.ksql.execution.runtime.RuntimeBuildContext;
import io.confluent.ksql.execution.streams.process.KsProcessorSupplier;
import io.confluent.ksql.execution.streams.process.KsValueProcessor;
import io.confluent.ksql.execution.streams.process.KsValueProcessorSupplier;
import io.confluent.ksql.execution.streams.transform.KsValueTransformer;
import io.confluent.ksql.execution.transform.select.SelectValueMapper;
import io.confluent.ksql.execution.transform.select.Selection;
import io.confluent.ksql.logging.processing.ProcessingLogger;
@@ -104,7 +102,7 @@ public static <K> KStreamHolder<K> build(
selectMapper.getTransformer(logger)
);
return streamHolder.withStream(
streamHolder.getStream().process(supplier),
streamHolder.getStream().process(supplier, selectName),
selection.getSchema()
);
} else {
@@ -114,8 +114,7 @@ public static <K> KTableHolder<K> build(

final KTable<K, GenericRow> transFormedTable = table.getTable().transformValues(
() -> new KsValueTransformer<>(selectMapper.getTransformer(logger)),
materialized
);
materialized);

return KTableHolder.materialized(
transFormedTable,
@@ -18,37 +18,40 @@
import static java.util.Objects.requireNonNull;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.process.KsqlProcessor;
import io.confluent.ksql.execution.transform.KsqlProcessingContext;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class KsProcessor<KInT, KOutT> implements Processor<KInT, GenericRow, KOutT, GenericRow> {
private ProcessorContext<KOutT, GenericRow> context;
private final KsqlProcessor<KInT, KOutT> keyDelegate;
private final KsqlProcessor<KInT, GenericRow> valueDelegate;
private KsqlProcessingContext context;
private ProcessorContext<KOutT, GenericRow> apiProcessorContext;
private final KsqlTransformer<KInT, KOutT> keyDelegate;
private final KsqlTransformer<KInT, GenericRow> valueDelegate;

public KsProcessor(final KsqlProcessor<KInT, KOutT> keyDelegate,
final KsqlProcessor<KInT, GenericRow> valueDelegate) {
public KsProcessor(final KsqlTransformer<KInT, KOutT> keyDelegate,
final KsqlTransformer<KInT, GenericRow> valueDelegate) {
this.keyDelegate = requireNonNull(keyDelegate, "keyDelegate");
this.valueDelegate = requireNonNull(valueDelegate, "valueDelegate");
}

@Override
public void init(final ProcessorContext<KOutT, GenericRow> context) {
this.context = context;
public void init(final ProcessorContext<KOutT, GenericRow> apiProcessContext) {
this.context = new KsStreamProcessingContext<>(apiProcessContext);
this.apiProcessorContext = apiProcessContext;
}

@Override
public void process(final Record<KInT, GenericRow> record) {
final KInT key = record.key();
final GenericRow value = record.value();
final Record<KOutT, GenericRow> newRecord = new Record<>(
keyDelegate.process(key, value, context),
valueDelegate.process(key, value, context),
keyDelegate.transform(key, value, context),
valueDelegate.transform(key, value, context),
record.timestamp()
);
context.forward(newRecord);
apiProcessorContext.forward(newRecord);
}

@Override
@@ -16,7 +16,7 @@
package io.confluent.ksql.execution.streams.process;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.process.KsqlProcessor;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;

@@ -26,8 +26,8 @@ public class KsProcessorSupplier<KInT, KOutT>
private final KsProcessor<KInT, KOutT> processor;

public KsProcessorSupplier(
final KsqlProcessor<KInT, KOutT> keyDelegate,
final KsqlProcessor<KInT, GenericRow> valueDelegate) {
final KsqlTransformer<KInT, KOutT> keyDelegate,
final KsqlTransformer<KInT, GenericRow> valueDelegate) {
this.processor = new KsProcessor<>(keyDelegate, valueDelegate);
}

@@ -0,0 +1,42 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.streams.process;

import io.confluent.ksql.execution.transform.KsqlProcessingContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;

/**
* This implements the KsqlProcessingContext interface using {@link ProcessorContext} from
* the {@link Processor} API.
*
* @param <KForwardT> type of the output key
* @param <VForwardT> type of the output value
*/
public class KsStreamProcessingContext<KForwardT, VForwardT> implements KsqlProcessingContext {

private final ProcessorContext<KForwardT, VForwardT> context;

public KsStreamProcessingContext(final ProcessorContext<KForwardT, VForwardT> processingContext) {
this.context = processingContext;
}

@Override
public long getRowTime() {
return context.currentStreamTimeMs();
}

}
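To show how this context adapter is meant to be used, here is a hypothetical standalone processor. ExampleProcessor, its String type parameters, and the forwarded value are illustrative only; the pattern mirrors KsProcessor and KsValueProcessor in this commit: wrap the streams context once in init(), keep the raw context around for forward().

// Hypothetical illustration; not part of the commit.
import io.confluent.ksql.execution.streams.process.KsStreamProcessingContext;
import io.confluent.ksql.execution.transform.KsqlProcessingContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class ExampleProcessor implements Processor<String, String, String, String> {

  private KsqlProcessingContext ksqlContext;           // runtime-agnostic view
  private ProcessorContext<String, String> apiContext; // still needed to forward

  @Override
  public void init(final ProcessorContext<String, String> context) {
    this.apiContext = context;
    this.ksqlContext = new KsStreamProcessingContext<>(context);
  }

  @Override
  public void process(final Record<String, String> record) {
    // Transformer-side code can observe time through the runtime-agnostic
    // view; with this implementation getRowTime() maps to the streams
    // context's currentStreamTimeMs().
    final long rowTime = ksqlContext.getRowTime();
    apiContext.forward(record.withValue(record.value() + "@" + rowTime));
  }
}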
@@ -18,45 +18,46 @@
import static java.util.Objects.requireNonNull;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.process.KsqlProcessor;
import java.util.Optional;
import io.confluent.ksql.execution.transform.KsqlProcessingContext;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

/**
* A Kafka Streams value processor
*
* <p>Maps an implementation agnostic {@link KsqlProcessor} to an implementation specific {@link
* <p>Maps an implementation agnostic {@link KsqlTransformer} to an implementation specific {@link
* Processor}.
*
* @param <K> the type of the key
* @param <R> the return type
*/
public class KsValueProcessor<K, R> implements Processor<K, GenericRow, K, R> {
private final KsqlProcessor<K, R> delegate;
private ProcessorContext<K, R> context;
private final KsqlTransformer<K, R> delegate;
private ProcessorContext<K, R> apiContext;
private KsqlProcessingContext context;

public KsValueProcessor(final KsqlProcessor<K, R> delegate) {
public KsValueProcessor(final KsqlTransformer<K, R> delegate) {
this.delegate = requireNonNull(delegate, "delegate");
this.context = null;
this.apiContext = null;
}

@Override
public void init(final ProcessorContext<K, R> context) {
this.context = context;
this.apiContext = context;
this.context = new KsStreamProcessingContext<>(context);
}

@Override
public void process(final Record<K, GenericRow> record) {
final K key = record.key();
final GenericRow value = record.value();
final R result = delegate.process(
final R result = delegate.transform(
key,
value,
context
);
context.forward(new Record<>(key, result, record.timestamp()));
apiContext.forward(new Record<>(key, result, record.timestamp()));
}

}
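And, to make the mapping the javadoc describes concrete, a hedged wiring sketch for this value-side adapter. The topic names and the identity transform are made up, and it assumes KsqlTransformer has a single abstract method (so a lambda is usable) and a Kafka Streams version whose KStream#process returns a stream (3.3+):

// Hypothetical usage; not part of the commit.
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.streams.process.KsValueProcessor;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;

public final class ValueProcessorExample {

  public static void main(final String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, GenericRow> input = builder.stream("input-topic");

    // Identity transform; the real transformers evaluate select expressions.
    final KsqlTransformer<String, GenericRow> identity = (key, value, ctx) -> value;

    final ProcessorSupplier<String, GenericRow, String, GenericRow> supplier =
        () -> new KsValueProcessor<>(identity);

    input.process(supplier).to("output-topic");
  }
}

One design note: the supplier lambda returns a fresh KsValueProcessor per get() call, which is what the ProcessorSupplier contract expects, since each stream task needs its own processor instance.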