From 3804d04a02b2ebcb0a27098eb9d1094496f4445f Mon Sep 17 00:00:00 2001 From: greg higgins Date: Wed, 27 Nov 2024 21:46:40 +0000 Subject: [PATCH] release 9.3.50 (#323) * updating poms for 9.3.50-SNAPSHOT development * updating poms for branch'release/9.3.49' with non-snapshot versions * updating develop poms to master versions to avoid merge conflicts * Updating develop poms back to pre merge state * Adds cast function to Mappers * Adds the ability to register a sink as a service by implementing the MessageSink interface --------- Co-authored-by: runner Co-authored-by: greg h --- compiler/pom.xml | 2 +- .../compiler/generation/sink/SinkTest.java | 10 ++++++++++ parent-root/pom.xml | 2 +- pom.xml | 2 +- runtime/pom.xml | 2 +- .../runtime/dataflow/helpers/Mappers.java | 5 +++++ .../fluxtion/runtime/output/MessageSink.java | 12 ++++++++++++ .../fluxtion/runtime/output/SinkPublisher.java | 18 ++++++++++++++++++ 8 files changed, 49 insertions(+), 4 deletions(-) create mode 100644 runtime/src/main/java/com/fluxtion/runtime/output/MessageSink.java diff --git a/compiler/pom.xml b/compiler/pom.xml index 9ed129534..cb44387aa 100644 --- a/compiler/pom.xml +++ b/compiler/pom.xml @@ -19,7 +19,7 @@ Copyright (C) 2024 gregory higgins (C) 2024 gregory higgins com.fluxtion root-parent-pom - 9.3.49-SNAPSHOT + 9.3.50-SNAPSHOT ../parent-root/pom.xml diff --git a/compiler/src/test/java/com/fluxtion/compiler/generation/sink/SinkTest.java b/compiler/src/test/java/com/fluxtion/compiler/generation/sink/SinkTest.java index d633ad086..daf8b16d6 100644 --- a/compiler/src/test/java/com/fluxtion/compiler/generation/sink/SinkTest.java +++ b/compiler/src/test/java/com/fluxtion/compiler/generation/sink/SinkTest.java @@ -3,6 +3,7 @@ import com.fluxtion.compiler.generation.util.CompiledAndInterpretedSepTest.SepTestConfig; import com.fluxtion.compiler.generation.util.MultipleSepTargetInProcessTest; import com.fluxtion.runtime.annotations.OnEventHandler; +import com.fluxtion.runtime.output.MessageSink; import com.fluxtion.runtime.output.SinkPublisher; import org.hamcrest.Matchers; import org.junit.Test; @@ -27,6 +28,15 @@ public void addSinkNode() { assertThat(myList, Matchers.is(Matchers.contains("hello world"))); } + @Test + public void addMessageSinkNode() { + List myList = new ArrayList<>(); + sep(c -> c.addNode(new MyNode("sinkA"))); + sep.registerService((MessageSink) myList::add, MessageSink.class, "sinkA"); + onEvent("hello world"); + assertThat(myList, Matchers.is(Matchers.contains("hello world"))); + } + public static class MyNode { private final SinkPublisher publisher;// = new SinkPublisher("sinkA"); diff --git a/parent-root/pom.xml b/parent-root/pom.xml index fe78e2bba..9d1f82d17 100644 --- a/parent-root/pom.xml +++ b/parent-root/pom.xml @@ -21,7 +21,7 @@ 4.0.0 com.fluxtion root-parent-pom - 9.3.49-SNAPSHOT + 9.3.50-SNAPSHOT pom fluxtion :: poms :: parent root diff --git a/pom.xml b/pom.xml index 8e80431aa..c3d089d0b 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ along with this program. If not, see 4.0.0 com.fluxtion fluxtion.master - 9.3.49-SNAPSHOT + 9.3.50-SNAPSHOT pom fluxtion diff --git a/runtime/pom.xml b/runtime/pom.xml index 42b9244d8..02a603b5c 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -20,7 +20,7 @@ Copyright (C) 2024 gregory higgins (C) 2024 gregory higgins com.fluxtion root-parent-pom - 9.3.49-SNAPSHOT + 9.3.50-SNAPSHOT ../parent-root/pom.xml diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Mappers.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Mappers.java index ac6dff55f..81860f658 100644 --- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Mappers.java +++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Mappers.java @@ -33,6 +33,11 @@ static T identity(T in) { return in; } + @SuppressWarnings("all") + public static R cast(T in) { + return (R) in; + } + static SerializableToIntFunction count() { return Aggregates.countFactory().get()::aggregate; } diff --git a/runtime/src/main/java/com/fluxtion/runtime/output/MessageSink.java b/runtime/src/main/java/com/fluxtion/runtime/output/MessageSink.java new file mode 100644 index 000000000..50fb15cc2 --- /dev/null +++ b/runtime/src/main/java/com/fluxtion/runtime/output/MessageSink.java @@ -0,0 +1,12 @@ +package com.fluxtion.runtime.output; + +import java.util.function.Consumer; + +/** + * A sink for an EventProcessor. Implement this interface and register with : + * + * {@link com.fluxtion.runtime.EventProcessor#registerService(Object, Class, String)} + * @param the type of message published to the Sink + */ +public interface MessageSink extends Consumer { +} diff --git a/runtime/src/main/java/com/fluxtion/runtime/output/SinkPublisher.java b/runtime/src/main/java/com/fluxtion/runtime/output/SinkPublisher.java index 00dfa56b4..bd9417a87 100644 --- a/runtime/src/main/java/com/fluxtion/runtime/output/SinkPublisher.java +++ b/runtime/src/main/java/com/fluxtion/runtime/output/SinkPublisher.java @@ -2,6 +2,8 @@ import com.fluxtion.runtime.annotations.OnEventHandler; import com.fluxtion.runtime.annotations.builder.AssignToField; +import com.fluxtion.runtime.annotations.runtime.ServiceDeregistered; +import com.fluxtion.runtime.annotations.runtime.ServiceRegistered; import com.fluxtion.runtime.node.SingleNamedNode; import java.util.function.Consumer; @@ -40,6 +42,22 @@ public void unregisterSink(SinkDeregister sinkDeregister) { doubleConsumer = null; } + @ServiceRegistered + public void messageSinkRegistered(MessageSink messageSink, String name){ + if(name.equals(getName())){ + auditLog.info("registeredMessageSink", name); + sink = messageSink; + } + } + + @ServiceDeregistered + public void messageSinkDeregistered(MessageSink messageSink, String name){ + if(name.equals(getName())){ + auditLog.info("deregisteredMessageSink", name); + sink = null; + } + } + public void publish(T publishItem) { if (sink != null) sink.accept(publishItem);