From be71f70fd766cf99060508abf7fb3c25b930bfdf Mon Sep 17 00:00:00 2001 From: greg higgins Date: Fri, 27 Dec 2024 12:15:39 +0000 Subject: [PATCH] release 9.4.5 (#330) * updating poms for 9.4.3-SNAPSHOT development * updating poms for branch'release/9.4.2' with non-snapshot versions * updating develop poms to master versions to avoid merge conflicts * Updating develop poms back to pre merge state * Feature/reentrant callback bug (#326) * fixes EventProcessorContext#getStaticEventProcessor for InMemoryEventProcessor, was throwing null pointer * re-entrant callback fix, pushes rentrant events to the front of queued events --------- Co-authored-by: greg * doc version number for next release * updating poms for 9.4.3 branch with snapshot versions * updating poms for 9.4.4-SNAPSHOT development * updating poms for branch'release/9.4.3' with non-snapshot versions * updating develop poms to master versions to avoid merge conflicts * Updating develop poms back to pre merge state * add a BaseNode class that extracts the current name of the node in an event processor * updating poms for 9.4.5-SNAPSHOT development * updating poms for branch'release/9.4.4' with non-snapshot versions * updating develop poms to master versions to avoid merge conflicts * Updating develop poms back to pre merge state * update kafka example * add a BaseNode class that extracts the current name of the node in an event processor (#329) --------- Co-authored-by: runner Co-authored-by: greg Co-authored-by: runner Co-authored-by: runner --- compiler/pom.xml | 2 +- .../builder/dataflow/FlowBuilder.java | 23 +++++++ .../dataflow/EventStreamBuildTest.java | 68 +++++++++++++++++++ parent-root/pom.xml | 2 +- pom.xml | 2 +- runtime/pom.xml | 2 +- .../function/FlatMapArrayFlowFunction.java | 17 ++++- .../function/FlatMapFlowFunction.java | 17 ++++- .../function/FlatMapIteratorFlowFunction.java | 17 ++++- .../runtime/output/SinkRegistration.java | 7 +- 10 files changed, 145 insertions(+), 12 deletions(-) diff --git a/compiler/pom.xml b/compiler/pom.xml index f855fbf46..064f6d211 100644 --- a/compiler/pom.xml +++ b/compiler/pom.xml @@ -8,7 +8,7 @@ com.fluxtion root-parent-pom - 9.4.4-SNAPSHOT + 9.4.5-SNAPSHOT ../parent-root/pom.xml diff --git a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java index ed3f0465d..dc21e3102 100644 --- a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java +++ b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java @@ -1,3 +1,8 @@ +/* + * SPDX-FileCopyrightText: © 2024 Gregory Higgins + * SPDX-License-Identifier: AGPL-3.0-only + */ + package com.fluxtion.compiler.builder.dataflow; import com.fluxtion.runtime.EventProcessorBuilderService; @@ -128,14 +133,32 @@ public FlowBuilder flatMap(SerializableFunction> iterableF return new FlowBuilder<>(new FlatMapFlowFunction<>(eventStream, iterableFunction)); } + public FlowBuilder flatMap(SerializableFunction> iterableFunction, String flatMapCompleteSignal) { + FlatMapFlowFunction> flatMapIteratorFlowFunction = new FlatMapFlowFunction<>(eventStream, iterableFunction); + flatMapIteratorFlowFunction.setFlatMapCompleteSignal(flatMapCompleteSignal); + return new FlowBuilder<>(flatMapIteratorFlowFunction); + } + public FlowBuilder flatMapFromIterator(SerializableFunction> iterableFunction) { return new FlowBuilder<>(new FlatMapIteratorFlowFunction<>(eventStream, iterableFunction)); } + public FlowBuilder flatMapFromIterator(SerializableFunction> iterableFunction, String flatMapCompleteSignal) { + FlatMapIteratorFlowFunction> flatMapIteratorFlowFunction = new FlatMapIteratorFlowFunction<>(eventStream, iterableFunction); + flatMapIteratorFlowFunction.setFlatMapCompleteSignal(flatMapCompleteSignal); + return new FlowBuilder<>(flatMapIteratorFlowFunction); + } + public FlowBuilder flatMapFromArray(SerializableFunction iterableFunction) { return new FlowBuilder<>(new FlatMapArrayFlowFunction<>(eventStream, iterableFunction)); } + public FlowBuilder flatMapFromArray(SerializableFunction iterableFunction, String flatMapCompleteSignal) { + FlatMapArrayFlowFunction> flatMapIteratorFlowFunction = new FlatMapArrayFlowFunction<>(eventStream, iterableFunction); + flatMapIteratorFlowFunction.setFlatMapCompleteSignal(flatMapCompleteSignal); + return new FlowBuilder<>(flatMapIteratorFlowFunction); + } + public > FlowBuilder aggregate(SerializableSupplier aggregateFunction) { return new FlowBuilder<>(new AggregateFlowFunctionWrapper<>(eventStream, aggregateFunction)); diff --git a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/EventStreamBuildTest.java b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/EventStreamBuildTest.java index 0b652c040..aab33abc1 100644 --- a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/EventStreamBuildTest.java +++ b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/EventStreamBuildTest.java @@ -1,3 +1,8 @@ +/* + * SPDX-FileCopyrightText: © 2024 Gregory Higgins + * SPDX-License-Identifier: AGPL-3.0-only + */ + package com.fluxtion.compiler.builder.dataflow; import com.fluxtion.compiler.generation.util.CompiledAndInterpretedSepTest.SepTestConfig; @@ -200,6 +205,28 @@ public void flatMapTest() { assertThat(notifyTarget.getStrings(), CoreMatchers.hasItems("one", "2", "THREE")); } + @Test + public void flatMapWithSignalTest() { + sep(c -> { + subscribe(String.class) + .flatMap(EventStreamBuildTest::csvToIterable, "myfilter") + .push(new NotifyAndPushTarget()::addStringElement); + + subscribeToSignal("myfilter") + .mapToInt(Mappers.count()).id("count_strings"); + } + ); + + assertThat(getStreamed("count_strings"), CoreMatchers.is(0)); + + onEvent("one,2,THREE"); + NotifyAndPushTarget notifyTarget = getField("notifyTarget"); + assertThat(notifyTarget.getOnEventCount(), is(3)); + assertThat(notifyTarget.getStrings(), CoreMatchers.hasItems("one", "2", "THREE")); + + assertThat(getStreamed("count_strings"), CoreMatchers.is(1)); + } + @Test public void flatMapFromIteratorTest() { sep(c -> subscribe(String.class) @@ -212,6 +239,28 @@ public void flatMapFromIteratorTest() { assertThat(notifyTarget.getStrings(), CoreMatchers.hasItems("one", "2", "THREE")); } + @Test + public void flatMapFromIteratorWithSignalTest() { + sep(c -> { + subscribe(String.class) + .flatMapFromIterator(EventStreamBuildTest::csvToIterator, "myfilter") + .push(new NotifyAndPushTarget()::addStringElement); + + subscribeToSignal("myfilter") + .mapToInt(Mappers.count()).id("count_strings"); + } + ); + + assertThat(getStreamed("count_strings"), CoreMatchers.is(0)); + + onEvent("one,2,THREE"); + NotifyAndPushTarget notifyTarget = getField("notifyTarget"); + assertThat(notifyTarget.getOnEventCount(), is(3)); + assertThat(notifyTarget.getStrings(), CoreMatchers.hasItems("one", "2", "THREE")); + + assertThat(getStreamed("count_strings"), CoreMatchers.is(1)); + } + @Test public void flatMapThenMapEachElementTest() { sep(c -> subscribe(String.class) @@ -245,6 +294,25 @@ public void flatMapFromArrayThenMapEachElementTest() { assertThat(getStreamed("sum"), is(103)); } + @Test + public void flatMapFromArrayWithSignalThenMapEachElementTest() { + sep(c -> { + subscribe(String.class) + .flatMapFromArray(EventStreamBuildTest::csvToStringArray, "myfilter") + .mapToInt(EventStreamBuildTest::parseInt) + .map(Mappers.cumSumInt()).id("sum"); + + subscribeToSignal("myfilter") + .mapToInt(Mappers.count()).id("count_strings"); + } + ); + assertThat(getStreamed("count_strings"), CoreMatchers.is(0)); + + onEvent("15,33,55"); + assertThat(getStreamed("sum"), is(103)); + assertThat(getStreamed("count_strings"), CoreMatchers.is(1)); + } + @Test public void mapWithInstanceFunctionTest() { sep(c -> subscribe(Integer.class).map(new Adder()::add).id("cumsum")); diff --git a/parent-root/pom.xml b/parent-root/pom.xml index 578e04942..381526036 100644 --- a/parent-root/pom.xml +++ b/parent-root/pom.xml @@ -21,7 +21,7 @@ 4.0.0 com.fluxtion root-parent-pom - 9.4.4-SNAPSHOT + 9.4.5-SNAPSHOT pom fluxtion :: poms :: parent root diff --git a/pom.xml b/pom.xml index ffff61923..a8c097d7b 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ along with this program. If not, see 4.0.0 com.fluxtion fluxtion.master - 9.4.4-SNAPSHOT + 9.4.5-SNAPSHOT pom fluxtion diff --git a/runtime/pom.xml b/runtime/pom.xml index 204166f33..7b5c2b9b8 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -20,7 +20,7 @@ Copyright (C) 2024 gregory higgins (C) 2024 gregory higgins com.fluxtion root-parent-pom - 9.4.4-SNAPSHOT + 9.4.5-SNAPSHOT ../parent-root/pom.xml diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/function/FlatMapArrayFlowFunction.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/function/FlatMapArrayFlowFunction.java index 9c73ce7fa..7e6a03967 100644 --- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/function/FlatMapArrayFlowFunction.java +++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/function/FlatMapArrayFlowFunction.java @@ -1,3 +1,8 @@ +/* + * SPDX-FileCopyrightText: © 2024 Gregory Higgins + * SPDX-License-Identifier: AGPL-3.0-only + */ + package com.fluxtion.runtime.dataflow.function; import com.fluxtion.runtime.EventProcessorBuilderService; @@ -5,12 +10,14 @@ import com.fluxtion.runtime.annotations.OnParentUpdate; import com.fluxtion.runtime.annotations.OnTrigger; import com.fluxtion.runtime.annotations.builder.Inject; -import com.fluxtion.runtime.audit.EventLogNode; import com.fluxtion.runtime.callback.Callback; import com.fluxtion.runtime.callback.DirtyStateMonitor; import com.fluxtion.runtime.dataflow.FlowFunction; import com.fluxtion.runtime.dataflow.TriggeredFlowFunction; +import com.fluxtion.runtime.node.BaseNode; import com.fluxtion.runtime.partition.LambdaReflection.SerializableFunction; +import lombok.Getter; +import lombok.Setter; import java.util.Arrays; @@ -21,7 +28,7 @@ * @param Output type * @param Previous {@link FlowFunction} type */ -public class FlatMapArrayFlowFunction> extends EventLogNode implements TriggeredFlowFunction { +public class FlatMapArrayFlowFunction> extends BaseNode implements TriggeredFlowFunction { @NoTriggerReference private final S inputEventStream; @@ -33,6 +40,9 @@ public class FlatMapArrayFlowFunction> extends E private transient R value; @Inject public Callback callback; + @Getter + @Setter + private String flatMapCompleteSignal; public FlatMapArrayFlowFunction(S inputEventStream, SerializableFunction iterableFunction) { this.inputEventStream = inputEventStream; @@ -49,6 +59,9 @@ public void inputUpdatedAndFlatMap(S inputEventStream) { T input = inputEventStream.get(); Iterable iterable = Arrays.asList(iterableFunction.apply(input)); callback.fireCallback(iterable.iterator()); + if (flatMapCompleteSignal != null) { + getContext().getStaticEventProcessor().publishSignal(flatMapCompleteSignal, flatMapCompleteSignal); + } } @Override diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/function/FlatMapFlowFunction.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/function/FlatMapFlowFunction.java index e208fe1ee..a74dd119e 100644 --- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/function/FlatMapFlowFunction.java +++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/function/FlatMapFlowFunction.java @@ -1,3 +1,8 @@ +/* + * SPDX-FileCopyrightText: © 2024 Gregory Higgins + * SPDX-License-Identifier: AGPL-3.0-only + */ + package com.fluxtion.runtime.dataflow.function; import com.fluxtion.runtime.EventProcessorBuilderService; @@ -5,12 +10,14 @@ import com.fluxtion.runtime.annotations.OnParentUpdate; import com.fluxtion.runtime.annotations.OnTrigger; import com.fluxtion.runtime.annotations.builder.Inject; -import com.fluxtion.runtime.audit.EventLogNode; import com.fluxtion.runtime.callback.Callback; import com.fluxtion.runtime.callback.DirtyStateMonitor; import com.fluxtion.runtime.dataflow.FlowFunction; import com.fluxtion.runtime.dataflow.TriggeredFlowFunction; +import com.fluxtion.runtime.node.BaseNode; import com.fluxtion.runtime.partition.LambdaReflection.SerializableFunction; +import lombok.Getter; +import lombok.Setter; /** * Flatmap stream node @@ -19,7 +26,7 @@ * @param Output type * @param Previous {@link FlowFunction} type */ -public class FlatMapFlowFunction> extends EventLogNode implements TriggeredFlowFunction { +public class FlatMapFlowFunction> extends BaseNode implements TriggeredFlowFunction { @NoTriggerReference private final S inputEventStream; @@ -31,6 +38,9 @@ public class FlatMapFlowFunction> extends EventL public Callback callback; @Inject public DirtyStateMonitor dirtyStateMonitor; + @Getter + @Setter + private String flatMapCompleteSignal; public FlatMapFlowFunction(S inputEventStream, SerializableFunction> iterableFunction) { this.inputEventStream = inputEventStream; @@ -47,6 +57,9 @@ public void inputUpdatedAndFlatMap(S inputEventStream) { T input = inputEventStream.get(); Iterable iterable = iterableFunction.apply(input); callback.fireCallback(iterable.iterator()); + if (flatMapCompleteSignal != null) { + getContext().getStaticEventProcessor().publishSignal(flatMapCompleteSignal, flatMapCompleteSignal); + } } @OnTrigger diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/function/FlatMapIteratorFlowFunction.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/function/FlatMapIteratorFlowFunction.java index 1ff1f1675..5c8e6c562 100644 --- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/function/FlatMapIteratorFlowFunction.java +++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/function/FlatMapIteratorFlowFunction.java @@ -1,3 +1,8 @@ +/* + * SPDX-FileCopyrightText: © 2024 Gregory Higgins + * SPDX-License-Identifier: AGPL-3.0-only + */ + package com.fluxtion.runtime.dataflow.function; import com.fluxtion.runtime.EventProcessorBuilderService; @@ -5,12 +10,14 @@ import com.fluxtion.runtime.annotations.OnParentUpdate; import com.fluxtion.runtime.annotations.OnTrigger; import com.fluxtion.runtime.annotations.builder.Inject; -import com.fluxtion.runtime.audit.EventLogNode; import com.fluxtion.runtime.callback.Callback; import com.fluxtion.runtime.callback.DirtyStateMonitor; import com.fluxtion.runtime.dataflow.FlowFunction; import com.fluxtion.runtime.dataflow.TriggeredFlowFunction; +import com.fluxtion.runtime.node.BaseNode; import com.fluxtion.runtime.partition.LambdaReflection.SerializableFunction; +import lombok.Getter; +import lombok.Setter; import java.util.Iterator; @@ -21,7 +28,7 @@ * @param Output type * @param Previous {@link FlowFunction} type */ -public class FlatMapIteratorFlowFunction> extends EventLogNode implements TriggeredFlowFunction { +public class FlatMapIteratorFlowFunction> extends BaseNode implements TriggeredFlowFunction { @NoTriggerReference private final S inputEventStream; @@ -33,6 +40,9 @@ public class FlatMapIteratorFlowFunction> extend public Callback callback; @Inject public DirtyStateMonitor dirtyStateMonitor; + @Getter + @Setter + private String flatMapCompleteSignal; public FlatMapIteratorFlowFunction(S inputEventStream, SerializableFunction> iterableFunction) { this.inputEventStream = inputEventStream; @@ -47,6 +57,9 @@ public FlatMapIteratorFlowFunction(S inputEventStream, SerializableFunction + * SPDX-License-Identifier: AGPL-3.0-only + */ + package com.fluxtion.runtime.output; import com.fluxtion.runtime.event.DefaultEvent; @@ -11,9 +16,7 @@ public class SinkRegistration extends DefaultEvent { private Consumer consumer; private IntConsumer intConsumer; - private DoubleConsumer doubleConsumer; - private LongConsumer longConsumer; public static SinkRegistration sink(String filterId, Consumer consumer) {