Skip to content

Commit

Permalink
release 9.3.50 (#323)
Browse files Browse the repository at this point in the history
* updating poms for 9.3.50-SNAPSHOT development

* updating poms for branch'release/9.3.49' with non-snapshot versions

* updating develop poms to master versions to avoid merge conflicts

* Updating develop poms back to pre merge state

* Adds cast function to Mappers

* Adds the ability to register a sink as a service by implementing the MessageSink interface

---------

Co-authored-by: runner <runner@fv-az1532-127.r45ftdqj0iiudj1ovfs3chqk3e.ex.internal.cloudapp.net>
Co-authored-by: greg h <[email protected]>
  • Loading branch information
3 people authored Nov 27, 2024
1 parent 8974923 commit 3804d04
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 4 deletions.
2 changes: 1 addition & 1 deletion compiler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Copyright (C) 2024 gregory higgins (C) 2024 gregory higgins
<parent>
<groupId>com.fluxtion</groupId>
<artifactId>root-parent-pom</artifactId>
<version>9.3.49-SNAPSHOT</version>
<version>9.3.50-SNAPSHOT</version>
<relativePath>../parent-root/pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fluxtion.compiler.generation.util.CompiledAndInterpretedSepTest.SepTestConfig;
import com.fluxtion.compiler.generation.util.MultipleSepTargetInProcessTest;
import com.fluxtion.runtime.annotations.OnEventHandler;
import com.fluxtion.runtime.output.MessageSink;
import com.fluxtion.runtime.output.SinkPublisher;
import org.hamcrest.Matchers;
import org.junit.Test;
Expand All @@ -27,6 +28,15 @@ public void addSinkNode() {
assertThat(myList, Matchers.is(Matchers.contains("hello world")));
}

@Test
public void addMessageSinkNode() {
List<String> myList = new ArrayList<>();
sep(c -> c.addNode(new MyNode("sinkA")));
sep.registerService((MessageSink<String>) myList::add, MessageSink.class, "sinkA");
onEvent("hello world");
assertThat(myList, Matchers.is(Matchers.contains("hello world")));
}

public static class MyNode {

private final SinkPublisher<String> publisher;// = new SinkPublisher("sinkA");
Expand Down
2 changes: 1 addition & 1 deletion parent-root/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.fluxtion</groupId>
<artifactId>root-parent-pom</artifactId>
<version>9.3.49-SNAPSHOT</version>
<version>9.3.50-SNAPSHOT</version>
<packaging>pom</packaging>
<name>fluxtion :: poms :: parent root</name>

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ along with this program. If not, see
<modelVersion>4.0.0</modelVersion>
<groupId>com.fluxtion</groupId>
<artifactId>fluxtion.master</artifactId>
<version>9.3.49-SNAPSHOT</version>
<version>9.3.50-SNAPSHOT</version>
<packaging>pom</packaging>
<name>fluxtion</name>

Expand Down
2 changes: 1 addition & 1 deletion runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Copyright (C) 2024 gregory higgins (C) 2024 gregory higgins
<parent>
<groupId>com.fluxtion</groupId>
<artifactId>root-parent-pom</artifactId>
<version>9.3.49-SNAPSHOT</version>
<version>9.3.50-SNAPSHOT</version>
<relativePath>../parent-root/pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ static <T> T identity(T in) {
return in;
}

@SuppressWarnings("all")
public static <T, R> R cast(T in) {
return (R) in;
}

static <T> SerializableToIntFunction<T> count() {
return Aggregates.countFactory().get()::aggregate;
}
Expand Down
12 changes: 12 additions & 0 deletions runtime/src/main/java/com/fluxtion/runtime/output/MessageSink.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.fluxtion.runtime.output;

import java.util.function.Consumer;

/**
* A sink for an EventProcessor. Implement this interface and register with :
*
* {@link com.fluxtion.runtime.EventProcessor#registerService(Object, Class, String)}
* @param <T> the type of message published to the Sink
*/
public interface MessageSink<T> extends Consumer<T> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.fluxtion.runtime.annotations.OnEventHandler;
import com.fluxtion.runtime.annotations.builder.AssignToField;
import com.fluxtion.runtime.annotations.runtime.ServiceDeregistered;
import com.fluxtion.runtime.annotations.runtime.ServiceRegistered;
import com.fluxtion.runtime.node.SingleNamedNode;

import java.util.function.Consumer;
Expand Down Expand Up @@ -40,6 +42,22 @@ public void unregisterSink(SinkDeregister sinkDeregister) {
doubleConsumer = null;
}

@ServiceRegistered
public void messageSinkRegistered(MessageSink<T> messageSink, String name){
if(name.equals(getName())){
auditLog.info("registeredMessageSink", name);
sink = messageSink;
}
}

@ServiceDeregistered
public void messageSinkDeregistered(MessageSink<T> messageSink, String name){
if(name.equals(getName())){
auditLog.info("deregisteredMessageSink", name);
sink = null;
}
}

public void publish(T publishItem) {
if (sink != null)
sink.accept(publishItem);
Expand Down

0 comments on commit 3804d04

Please sign in to comment.