diff --git a/compiler/pom.xml b/compiler/pom.xml
index 9ed129534..cb44387aa 100644
--- a/compiler/pom.xml
+++ b/compiler/pom.xml
@@ -19,7 +19,7 @@ Copyright (C) 2024 gregory higgins (C) 2024 gregory higgins
com.fluxtion
root-parent-pom
- 9.3.49-SNAPSHOT
+ 9.3.50-SNAPSHOT
../parent-root/pom.xml
diff --git a/compiler/src/test/java/com/fluxtion/compiler/generation/sink/SinkTest.java b/compiler/src/test/java/com/fluxtion/compiler/generation/sink/SinkTest.java
index d633ad086..daf8b16d6 100644
--- a/compiler/src/test/java/com/fluxtion/compiler/generation/sink/SinkTest.java
+++ b/compiler/src/test/java/com/fluxtion/compiler/generation/sink/SinkTest.java
@@ -3,6 +3,7 @@
import com.fluxtion.compiler.generation.util.CompiledAndInterpretedSepTest.SepTestConfig;
import com.fluxtion.compiler.generation.util.MultipleSepTargetInProcessTest;
import com.fluxtion.runtime.annotations.OnEventHandler;
+import com.fluxtion.runtime.output.MessageSink;
import com.fluxtion.runtime.output.SinkPublisher;
import org.hamcrest.Matchers;
import org.junit.Test;
@@ -27,6 +28,15 @@ public void addSinkNode() {
assertThat(myList, Matchers.is(Matchers.contains("hello world")));
}
+ @Test
+ public void addMessageSinkNode() {
+ List myList = new ArrayList<>();
+ sep(c -> c.addNode(new MyNode("sinkA")));
+ sep.registerService((MessageSink) myList::add, MessageSink.class, "sinkA");
+ onEvent("hello world");
+ assertThat(myList, Matchers.is(Matchers.contains("hello world")));
+ }
+
public static class MyNode {
private final SinkPublisher publisher;// = new SinkPublisher("sinkA");
diff --git a/parent-root/pom.xml b/parent-root/pom.xml
index fe78e2bba..9d1f82d17 100644
--- a/parent-root/pom.xml
+++ b/parent-root/pom.xml
@@ -21,7 +21,7 @@
4.0.0
com.fluxtion
root-parent-pom
- 9.3.49-SNAPSHOT
+ 9.3.50-SNAPSHOT
pom
fluxtion :: poms :: parent root
diff --git a/pom.xml b/pom.xml
index 8e80431aa..c3d089d0b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,7 +19,7 @@ along with this program. If not, see
4.0.0
com.fluxtion
fluxtion.master
- 9.3.49-SNAPSHOT
+ 9.3.50-SNAPSHOT
pom
fluxtion
diff --git a/runtime/pom.xml b/runtime/pom.xml
index 42b9244d8..02a603b5c 100644
--- a/runtime/pom.xml
+++ b/runtime/pom.xml
@@ -20,7 +20,7 @@ Copyright (C) 2024 gregory higgins (C) 2024 gregory higgins
com.fluxtion
root-parent-pom
- 9.3.49-SNAPSHOT
+ 9.3.50-SNAPSHOT
../parent-root/pom.xml
diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Mappers.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Mappers.java
index ac6dff55f..81860f658 100644
--- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Mappers.java
+++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Mappers.java
@@ -33,6 +33,11 @@ static T identity(T in) {
return in;
}
+ @SuppressWarnings("all")
+ public static R cast(T in) {
+ return (R) in;
+ }
+
static SerializableToIntFunction count() {
return Aggregates.countFactory().get()::aggregate;
}
diff --git a/runtime/src/main/java/com/fluxtion/runtime/output/MessageSink.java b/runtime/src/main/java/com/fluxtion/runtime/output/MessageSink.java
new file mode 100644
index 000000000..50fb15cc2
--- /dev/null
+++ b/runtime/src/main/java/com/fluxtion/runtime/output/MessageSink.java
@@ -0,0 +1,12 @@
+package com.fluxtion.runtime.output;
+
+import java.util.function.Consumer;
+
+/**
+ * A sink for an EventProcessor. Implement this interface and register with :
+ *
+ * {@link com.fluxtion.runtime.EventProcessor#registerService(Object, Class, String)}
+ * @param the type of message published to the Sink
+ */
+public interface MessageSink extends Consumer {
+}
diff --git a/runtime/src/main/java/com/fluxtion/runtime/output/SinkPublisher.java b/runtime/src/main/java/com/fluxtion/runtime/output/SinkPublisher.java
index 00dfa56b4..bd9417a87 100644
--- a/runtime/src/main/java/com/fluxtion/runtime/output/SinkPublisher.java
+++ b/runtime/src/main/java/com/fluxtion/runtime/output/SinkPublisher.java
@@ -2,6 +2,8 @@
import com.fluxtion.runtime.annotations.OnEventHandler;
import com.fluxtion.runtime.annotations.builder.AssignToField;
+import com.fluxtion.runtime.annotations.runtime.ServiceDeregistered;
+import com.fluxtion.runtime.annotations.runtime.ServiceRegistered;
import com.fluxtion.runtime.node.SingleNamedNode;
import java.util.function.Consumer;
@@ -40,6 +42,22 @@ public void unregisterSink(SinkDeregister sinkDeregister) {
doubleConsumer = null;
}
+ @ServiceRegistered
+ public void messageSinkRegistered(MessageSink messageSink, String name){
+ if(name.equals(getName())){
+ auditLog.info("registeredMessageSink", name);
+ sink = messageSink;
+ }
+ }
+
+ @ServiceDeregistered
+ public void messageSinkDeregistered(MessageSink messageSink, String name){
+ if(name.equals(getName())){
+ auditLog.info("deregisteredMessageSink", name);
+ sink = null;
+ }
+ }
+
public void publish(T publishItem) {
if (sink != null)
sink.accept(publishItem);