Skip to content

Commit

Permalink
release 9.4.5 (#330)
Browse files Browse the repository at this point in the history
* updating poms for 9.4.3-SNAPSHOT development

* updating poms for branch'release/9.4.2' with non-snapshot versions

* updating develop poms to master versions to avoid merge conflicts

* Updating develop poms back to pre merge state

* Feature/reentrant callback bug (#326)

* fixes EventProcessorContext#getStaticEventProcessor for InMemoryEventProcessor, was throwing null pointer

* re-entrant callback fix, pushes rentrant events to the front of queued events

---------

Co-authored-by: greg <[email protected]>

* doc version number for next release

* updating poms for 9.4.3 branch with snapshot versions

* updating poms for 9.4.4-SNAPSHOT development

* updating poms for branch'release/9.4.3' with non-snapshot versions

* updating develop poms to master versions to avoid merge conflicts

* Updating develop poms back to pre merge state

* add a BaseNode class that extracts the current name of the node in an event processor

* updating poms for 9.4.5-SNAPSHOT development

* updating poms for branch'release/9.4.4' with non-snapshot versions

* updating develop poms to master versions to avoid merge conflicts

* Updating develop poms back to pre merge state

* update kafka example

* add a BaseNode class that extracts the current name of the node in an event processor (#329)

---------

Co-authored-by: runner <runner@fv-az585-603.vitsawovrx5upos552vgumbwsa.phxx.internal.cloudapp.net>
Co-authored-by: greg <[email protected]>
Co-authored-by: runner <runner@fv-az740-257.ps3gqx3xffzu3a3pyfxfsa1ajb.bx.internal.cloudapp.net>
Co-authored-by: runner <runner@fv-az885-319.m43iyx0u3v4e1h2pvvlabjukza.cx.internal.cloudapp.net>
  • Loading branch information
5 people authored Dec 27, 2024
1 parent 136885e commit be71f70
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 12 deletions.
2 changes: 1 addition & 1 deletion compiler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>com.fluxtion</groupId>
<artifactId>root-parent-pom</artifactId>
<version>9.4.4-SNAPSHOT</version>
<version>9.4.5-SNAPSHOT</version>
<relativePath>../parent-root/pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* SPDX-FileCopyrightText: © 2024 Gregory Higgins <[email protected]>
* SPDX-License-Identifier: AGPL-3.0-only
*/

package com.fluxtion.compiler.builder.dataflow;

import com.fluxtion.runtime.EventProcessorBuilderService;
Expand Down Expand Up @@ -128,14 +133,32 @@ public <R> FlowBuilder<R> flatMap(SerializableFunction<T, Iterable<R>> iterableF
return new FlowBuilder<>(new FlatMapFlowFunction<>(eventStream, iterableFunction));
}

public <R> FlowBuilder<R> flatMap(SerializableFunction<T, Iterable<R>> iterableFunction, String flatMapCompleteSignal) {
FlatMapFlowFunction<T, R, TriggeredFlowFunction<T>> flatMapIteratorFlowFunction = new FlatMapFlowFunction<>(eventStream, iterableFunction);
flatMapIteratorFlowFunction.setFlatMapCompleteSignal(flatMapCompleteSignal);
return new FlowBuilder<>(flatMapIteratorFlowFunction);
}

public <R> FlowBuilder<R> flatMapFromIterator(SerializableFunction<T, Iterator<R>> iterableFunction) {
return new FlowBuilder<>(new FlatMapIteratorFlowFunction<>(eventStream, iterableFunction));
}

public <R> FlowBuilder<R> flatMapFromIterator(SerializableFunction<T, Iterator<R>> iterableFunction, String flatMapCompleteSignal) {
FlatMapIteratorFlowFunction<T, R, TriggeredFlowFunction<T>> flatMapIteratorFlowFunction = new FlatMapIteratorFlowFunction<>(eventStream, iterableFunction);
flatMapIteratorFlowFunction.setFlatMapCompleteSignal(flatMapCompleteSignal);
return new FlowBuilder<>(flatMapIteratorFlowFunction);
}

public <R> FlowBuilder<R> flatMapFromArray(SerializableFunction<T, R[]> iterableFunction) {
return new FlowBuilder<>(new FlatMapArrayFlowFunction<>(eventStream, iterableFunction));
}

public <R> FlowBuilder<R> flatMapFromArray(SerializableFunction<T, R[]> iterableFunction, String flatMapCompleteSignal) {
FlatMapArrayFlowFunction<T, R, TriggeredFlowFunction<T>> flatMapIteratorFlowFunction = new FlatMapArrayFlowFunction<>(eventStream, iterableFunction);
flatMapIteratorFlowFunction.setFlatMapCompleteSignal(flatMapCompleteSignal);
return new FlowBuilder<>(flatMapIteratorFlowFunction);
}

public <R, F extends AggregateFlowFunction<T, R, F>> FlowBuilder<R>
aggregate(SerializableSupplier<F> aggregateFunction) {
return new FlowBuilder<>(new AggregateFlowFunctionWrapper<>(eventStream, aggregateFunction));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* SPDX-FileCopyrightText: © 2024 Gregory Higgins <[email protected]>
* SPDX-License-Identifier: AGPL-3.0-only
*/

package com.fluxtion.compiler.builder.dataflow;

import com.fluxtion.compiler.generation.util.CompiledAndInterpretedSepTest.SepTestConfig;
Expand Down Expand Up @@ -200,6 +205,28 @@ public void flatMapTest() {
assertThat(notifyTarget.getStrings(), CoreMatchers.hasItems("one", "2", "THREE"));
}

@Test
public void flatMapWithSignalTest() {
sep(c -> {
subscribe(String.class)
.flatMap(EventStreamBuildTest::csvToIterable, "myfilter")
.push(new NotifyAndPushTarget()::addStringElement);

subscribeToSignal("myfilter")
.mapToInt(Mappers.count()).id("count_strings");
}
);

assertThat(getStreamed("count_strings"), CoreMatchers.is(0));

onEvent("one,2,THREE");
NotifyAndPushTarget notifyTarget = getField("notifyTarget");
assertThat(notifyTarget.getOnEventCount(), is(3));
assertThat(notifyTarget.getStrings(), CoreMatchers.hasItems("one", "2", "THREE"));

assertThat(getStreamed("count_strings"), CoreMatchers.is(1));
}

@Test
public void flatMapFromIteratorTest() {
sep(c -> subscribe(String.class)
Expand All @@ -212,6 +239,28 @@ public void flatMapFromIteratorTest() {
assertThat(notifyTarget.getStrings(), CoreMatchers.hasItems("one", "2", "THREE"));
}

@Test
public void flatMapFromIteratorWithSignalTest() {
sep(c -> {
subscribe(String.class)
.flatMapFromIterator(EventStreamBuildTest::csvToIterator, "myfilter")
.push(new NotifyAndPushTarget()::addStringElement);

subscribeToSignal("myfilter")
.mapToInt(Mappers.count()).id("count_strings");
}
);

assertThat(getStreamed("count_strings"), CoreMatchers.is(0));

onEvent("one,2,THREE");
NotifyAndPushTarget notifyTarget = getField("notifyTarget");
assertThat(notifyTarget.getOnEventCount(), is(3));
assertThat(notifyTarget.getStrings(), CoreMatchers.hasItems("one", "2", "THREE"));

assertThat(getStreamed("count_strings"), CoreMatchers.is(1));
}

@Test
public void flatMapThenMapEachElementTest() {
sep(c -> subscribe(String.class)
Expand Down Expand Up @@ -245,6 +294,25 @@ public void flatMapFromArrayThenMapEachElementTest() {
assertThat(getStreamed("sum"), is(103));
}

@Test
public void flatMapFromArrayWithSignalThenMapEachElementTest() {
sep(c -> {
subscribe(String.class)
.flatMapFromArray(EventStreamBuildTest::csvToStringArray, "myfilter")
.mapToInt(EventStreamBuildTest::parseInt)
.map(Mappers.cumSumInt()).id("sum");

subscribeToSignal("myfilter")
.mapToInt(Mappers.count()).id("count_strings");
}
);
assertThat(getStreamed("count_strings"), CoreMatchers.is(0));

onEvent("15,33,55");
assertThat(getStreamed("sum"), is(103));
assertThat(getStreamed("count_strings"), CoreMatchers.is(1));
}

@Test
public void mapWithInstanceFunctionTest() {
sep(c -> subscribe(Integer.class).map(new Adder()::add).id("cumsum"));
Expand Down
2 changes: 1 addition & 1 deletion parent-root/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.fluxtion</groupId>
<artifactId>root-parent-pom</artifactId>
<version>9.4.4-SNAPSHOT</version>
<version>9.4.5-SNAPSHOT</version>
<packaging>pom</packaging>
<name>fluxtion :: poms :: parent root</name>

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ along with this program. If not, see
<modelVersion>4.0.0</modelVersion>
<groupId>com.fluxtion</groupId>
<artifactId>fluxtion.master</artifactId>
<version>9.4.4-SNAPSHOT</version>
<version>9.4.5-SNAPSHOT</version>
<packaging>pom</packaging>
<name>fluxtion</name>

Expand Down
2 changes: 1 addition & 1 deletion runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Copyright (C) 2024 gregory higgins (C) 2024 gregory higgins
<parent>
<groupId>com.fluxtion</groupId>
<artifactId>root-parent-pom</artifactId>
<version>9.4.4-SNAPSHOT</version>
<version>9.4.5-SNAPSHOT</version>
<relativePath>../parent-root/pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
/*
* SPDX-FileCopyrightText: © 2024 Gregory Higgins <[email protected]>
* SPDX-License-Identifier: AGPL-3.0-only
*/

package com.fluxtion.runtime.dataflow.function;

import com.fluxtion.runtime.EventProcessorBuilderService;
import com.fluxtion.runtime.annotations.NoTriggerReference;
import com.fluxtion.runtime.annotations.OnParentUpdate;
import com.fluxtion.runtime.annotations.OnTrigger;
import com.fluxtion.runtime.annotations.builder.Inject;
import com.fluxtion.runtime.audit.EventLogNode;
import com.fluxtion.runtime.callback.Callback;
import com.fluxtion.runtime.callback.DirtyStateMonitor;
import com.fluxtion.runtime.dataflow.FlowFunction;
import com.fluxtion.runtime.dataflow.TriggeredFlowFunction;
import com.fluxtion.runtime.node.BaseNode;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableFunction;
import lombok.Getter;
import lombok.Setter;

import java.util.Arrays;

Expand All @@ -21,7 +28,7 @@
* @param <R> Output type
* @param <S> Previous {@link FlowFunction} type
*/
public class FlatMapArrayFlowFunction<T, R, S extends FlowFunction<T>> extends EventLogNode implements TriggeredFlowFunction<R> {
public class FlatMapArrayFlowFunction<T, R, S extends FlowFunction<T>> extends BaseNode implements TriggeredFlowFunction<R> {

@NoTriggerReference
private final S inputEventStream;
Expand All @@ -33,6 +40,9 @@ public class FlatMapArrayFlowFunction<T, R, S extends FlowFunction<T>> extends E
private transient R value;
@Inject
public Callback<R> callback;
@Getter
@Setter
private String flatMapCompleteSignal;

public FlatMapArrayFlowFunction(S inputEventStream, SerializableFunction<T, R[]> iterableFunction) {
this.inputEventStream = inputEventStream;
Expand All @@ -49,6 +59,9 @@ public void inputUpdatedAndFlatMap(S inputEventStream) {
T input = inputEventStream.get();
Iterable<R> iterable = Arrays.asList(iterableFunction.apply(input));
callback.fireCallback(iterable.iterator());
if (flatMapCompleteSignal != null) {
getContext().getStaticEventProcessor().publishSignal(flatMapCompleteSignal, flatMapCompleteSignal);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
/*
* SPDX-FileCopyrightText: © 2024 Gregory Higgins <[email protected]>
* SPDX-License-Identifier: AGPL-3.0-only
*/

package com.fluxtion.runtime.dataflow.function;

import com.fluxtion.runtime.EventProcessorBuilderService;
import com.fluxtion.runtime.annotations.NoTriggerReference;
import com.fluxtion.runtime.annotations.OnParentUpdate;
import com.fluxtion.runtime.annotations.OnTrigger;
import com.fluxtion.runtime.annotations.builder.Inject;
import com.fluxtion.runtime.audit.EventLogNode;
import com.fluxtion.runtime.callback.Callback;
import com.fluxtion.runtime.callback.DirtyStateMonitor;
import com.fluxtion.runtime.dataflow.FlowFunction;
import com.fluxtion.runtime.dataflow.TriggeredFlowFunction;
import com.fluxtion.runtime.node.BaseNode;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableFunction;
import lombok.Getter;
import lombok.Setter;

/**
* Flatmap stream node
Expand All @@ -19,7 +26,7 @@
* @param <R> Output type
* @param <S> Previous {@link FlowFunction} type
*/
public class FlatMapFlowFunction<T, R, S extends FlowFunction<T>> extends EventLogNode implements TriggeredFlowFunction<R> {
public class FlatMapFlowFunction<T, R, S extends FlowFunction<T>> extends BaseNode implements TriggeredFlowFunction<R> {

@NoTriggerReference
private final S inputEventStream;
Expand All @@ -31,6 +38,9 @@ public class FlatMapFlowFunction<T, R, S extends FlowFunction<T>> extends EventL
public Callback<R> callback;
@Inject
public DirtyStateMonitor dirtyStateMonitor;
@Getter
@Setter
private String flatMapCompleteSignal;

public FlatMapFlowFunction(S inputEventStream, SerializableFunction<T, Iterable<R>> iterableFunction) {
this.inputEventStream = inputEventStream;
Expand All @@ -47,6 +57,9 @@ public void inputUpdatedAndFlatMap(S inputEventStream) {
T input = inputEventStream.get();
Iterable<R> iterable = iterableFunction.apply(input);
callback.fireCallback(iterable.iterator());
if (flatMapCompleteSignal != null) {
getContext().getStaticEventProcessor().publishSignal(flatMapCompleteSignal, flatMapCompleteSignal);
}
}

@OnTrigger
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
/*
* SPDX-FileCopyrightText: © 2024 Gregory Higgins <[email protected]>
* SPDX-License-Identifier: AGPL-3.0-only
*/

package com.fluxtion.runtime.dataflow.function;

import com.fluxtion.runtime.EventProcessorBuilderService;
import com.fluxtion.runtime.annotations.NoTriggerReference;
import com.fluxtion.runtime.annotations.OnParentUpdate;
import com.fluxtion.runtime.annotations.OnTrigger;
import com.fluxtion.runtime.annotations.builder.Inject;
import com.fluxtion.runtime.audit.EventLogNode;
import com.fluxtion.runtime.callback.Callback;
import com.fluxtion.runtime.callback.DirtyStateMonitor;
import com.fluxtion.runtime.dataflow.FlowFunction;
import com.fluxtion.runtime.dataflow.TriggeredFlowFunction;
import com.fluxtion.runtime.node.BaseNode;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableFunction;
import lombok.Getter;
import lombok.Setter;

import java.util.Iterator;

Expand All @@ -21,7 +28,7 @@
* @param <R> Output type
* @param <S> Previous {@link FlowFunction} type
*/
public class FlatMapIteratorFlowFunction<T, R, S extends FlowFunction<T>> extends EventLogNode implements TriggeredFlowFunction<R> {
public class FlatMapIteratorFlowFunction<T, R, S extends FlowFunction<T>> extends BaseNode implements TriggeredFlowFunction<R> {

@NoTriggerReference
private final S inputEventStream;
Expand All @@ -33,6 +40,9 @@ public class FlatMapIteratorFlowFunction<T, R, S extends FlowFunction<T>> extend
public Callback<R> callback;
@Inject
public DirtyStateMonitor dirtyStateMonitor;
@Getter
@Setter
private String flatMapCompleteSignal;

public FlatMapIteratorFlowFunction(S inputEventStream, SerializableFunction<T, Iterator<R>> iterableFunction) {
this.inputEventStream = inputEventStream;
Expand All @@ -47,6 +57,9 @@ public FlatMapIteratorFlowFunction(S inputEventStream, SerializableFunction<T, I
@OnParentUpdate("inputEventStream")
public void inputUpdatedAndFlatMap(S inputEventStream) {
callback.fireCallback(iterableFunction.apply(inputEventStream.get()));
if (flatMapCompleteSignal != null) {
getContext().getStaticEventProcessor().publishSignal(flatMapCompleteSignal, flatMapCompleteSignal);
}
}

@OnTrigger
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* SPDX-FileCopyrightText: © 2024 Gregory Higgins <[email protected]>
* SPDX-License-Identifier: AGPL-3.0-only
*/

package com.fluxtion.runtime.output;

import com.fluxtion.runtime.event.DefaultEvent;
Expand All @@ -11,9 +16,7 @@ public class SinkRegistration<T> extends DefaultEvent {

private Consumer<T> consumer;
private IntConsumer intConsumer;

private DoubleConsumer doubleConsumer;

private LongConsumer longConsumer;

public static <S> SinkRegistration<S> sink(String filterId, Consumer<S> consumer) {
Expand Down

0 comments on commit be71f70

Please sign in to comment.