diff --git a/compiler/pom.xml b/compiler/pom.xml
index 074ac330a..3ae00c815 100644
--- a/compiler/pom.xml
+++ b/compiler/pom.xml
@@ -19,7 +19,7 @@ Copyright (C) 2018 V12 Technology Ltd.
com.fluxtion
root-parent-pom
- 9.1.11-SNAPSHOT
+ 9.1.12-SNAPSHOT
../parent-root/pom.xml
diff --git a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java
index c4598bd95..6989be7b5 100644
--- a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java
+++ b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java
@@ -9,16 +9,9 @@
import com.fluxtion.runtime.dataflow.aggregate.function.TimedSlidingWindow;
import com.fluxtion.runtime.dataflow.aggregate.function.TumblingWindow;
import com.fluxtion.runtime.dataflow.function.BinaryMapFlowFunction.BinaryMapToRefFlowFunction;
-import com.fluxtion.runtime.dataflow.function.FlatMapArrayFlowFunction;
-import com.fluxtion.runtime.dataflow.function.FlatMapFlowFunction;
-import com.fluxtion.runtime.dataflow.function.LookupFlowFunction;
-import com.fluxtion.runtime.dataflow.function.MapFlowFunction;
+import com.fluxtion.runtime.dataflow.function.*;
import com.fluxtion.runtime.dataflow.function.MapFlowFunction.MapRef2RefFlowFunction;
-import com.fluxtion.runtime.dataflow.function.MergeFlowFunction;
-import com.fluxtion.runtime.dataflow.groupby.GroupBy;
-import com.fluxtion.runtime.dataflow.groupby.GroupByFlowFunctionWrapper;
-import com.fluxtion.runtime.dataflow.groupby.GroupByTimedSlidingWindow;
-import com.fluxtion.runtime.dataflow.groupby.GroupByTumblingWindow;
+import com.fluxtion.runtime.dataflow.groupby.*;
import com.fluxtion.runtime.dataflow.helpers.Aggregates;
import com.fluxtion.runtime.dataflow.helpers.Collectors;
import com.fluxtion.runtime.dataflow.helpers.DefaultValue;
@@ -80,6 +73,30 @@ public FlowBuilder map(SerializableFunction mapFunction) {
return super.mapBase(mapFunction);
}
+ public FlowBuilder> mapToSet() {
+ return map(Collectors.toSet());
+ }
+
+ public FlowBuilder> mapToSet(SerializableFunction mapFunction) {
+ return map(mapFunction).map(Collectors.toSet());
+ }
+
+ public FlowBuilder> mapToList() {
+ return map(Collectors.toList());
+ }
+
+ public FlowBuilder> mapToList(SerializableFunction mapFunction) {
+ return map(mapFunction).map(Collectors.toList());
+ }
+
+ public FlowBuilder> mapToList(int maxElements) {
+ return map(Collectors.toList(maxElements));
+ }
+
+ public FlowBuilder> mapToList(SerializableFunction mapFunction, int maxElements) {
+ return map(mapFunction).map(Collectors.toList(maxElements));
+ }
+
public FlowBuilder mapBiFunction(SerializableBiFunction int2IntFunction,
FlowBuilder stream2Builder) {
@@ -130,6 +147,19 @@ public FlowBuilder flatMapFromArray(SerializableFunction iterable
new TimedSlidingWindow<>(eventStream, aggregateFunction, bucketSizeMillis, bucketsPerWindow));
}
+ /**
+ * Aggregates a flow using a key function to group by and an aggregating function to process new values for a keyed
+ * bucket.
+ *
+ * @param keyFunction The key function that groups and buckets incoming values
+ * @param valueFunction The value that is extracted from the incoming stream and applied to the aggregating function
+ * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance
+ * @param Value type extracted from the incoming data flow
+ * @param The type of the key used to group values
+ * @param The return type of the aggregating function
+ * @param The aggregating function type
+ * @return A GroupByFlowBuilder for the aggregated flow
+ */
public > GroupByFlowBuilder
groupBy(SerializableFunction keyFunction,
SerializableFunction valueFunction,
@@ -139,33 +169,147 @@ public FlowBuilder flatMapFromArray(SerializableFunction iterable
return new GroupByFlowBuilder<>(x);
}
+ /**
+ * Specialisation of groupBy where the value is the identity of the incoming data flow
+ *
+ * @param keyFunction The key function that groups and buckets incoming values
+ * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance
+ * @param The type of the key used to group values
+ * @param The return type of the aggregating function
+ * @param The aggregating function type
+ * @return A GroupByFlowBuilder for the aggregated flow
+ * @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier)
+ */
public > GroupByFlowBuilder
groupBy(SerializableFunction keyFunction, SerializableSupplier aggregateFunctionSupplier) {
return groupBy(keyFunction, Mappers::identity, aggregateFunctionSupplier);
}
+ /**
+ * Specialisation of groupBy where the output of the groupBy is the last value received for a bucket. The value is
+ * extracted using the value function
+ *
+ * @param keyFunction The key function that groups and buckets incoming values
+ * @param valueFunction The value that is extracted from the incoming stream and applied to the aggregating function
+ * @param Value type extracted from the incoming data flow
+ * @param The type of the key used to group values
+ * @return A GroupByFlowBuilder for the aggregated flow
+ * @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier)
+ */
public GroupByFlowBuilder groupBy(
SerializableFunction keyFunction,
SerializableFunction valueFunction) {
return groupBy(keyFunction, valueFunction, Aggregates.identityFactory());
}
+ /**
+ * Specialisation of groupBy where the output of the groupBy is the last value received for a bucket, where
+ * the value is the identity of the incoming data flow
+ *
+ * @param keyFunction The key function that groups and buckets incoming values
+ * @param The type of the key used to group values
+ * @return A GroupByFlowBuilder for the aggregated flow
+ */
public GroupByFlowBuilder groupBy(SerializableFunction keyFunction) {
return groupBy(keyFunction, Mappers::identity);
}
+ /**
+ * Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the value.
+ * The value is the last value supplied
+ *
+ * @param keyFunction key accessor
+ * @param keyFunctions multi arg key accessors
+ * @return GroupByFlowBuilder keyed on properties
+ */
+ @SafeVarargs
+ public final GroupByFlowBuilder, T> groupByFields(
+ SerializableFunction keyFunction,
+ SerializableFunction... keyFunctions) {
+ return groupBy(GroupByKey.build(keyFunction, keyFunctions));
+ }
+
+ /**
+ * Aggregates a flow using a key to group by and an aggregating function to process new values for a keyed
+ * bucket. The key is a compound key created by a set of method reference accessors to for the value.
+ *
+ * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance
+ * @param keyFunction key accessor
+ * @param keyFunctions multi arg key accessors
+ * @param The return type of the aggregating function
+ * @param The aggregating function type
+ * @return A GroupByFlowBuilder for the aggregated flow
+ * @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier)
+ */
+ @SafeVarargs
+ public final > GroupByFlowBuilder, A> groupByFieldsAggregate(
+ SerializableSupplier aggregateFunctionSupplier,
+ SerializableFunction keyFunction,
+ SerializableFunction... keyFunctions) {
+ return groupBy(GroupByKey.build(keyFunction, keyFunctions), aggregateFunctionSupplier);
+ }
+
+ /**
+ * Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the key
+ * The value is extracted from the input using the value function
+ *
+ * @param valueFunction the value that will be stored in the groupBy
+ * @param keyFunction key accessor
+ * @param keyFunctions multi arg key accessors
+ * @return GroupByFlowBuilder keyed on properties
+ */
+ @SafeVarargs
+ public final GroupByFlowBuilder, V> groupByFieldsAndGet(
+ SerializableFunction valueFunction,
+ SerializableFunction keyFunction,
+ SerializableFunction... keyFunctions) {
+ return groupBy(GroupByKey.build(keyFunction, keyFunctions), valueFunction);
+ }
+
+ /**
+ * Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the key
+ * The value is extracted from the input using the value function and is used as an input to the aggregating function
+ *
+ * @param valueFunction the value that will be stored in the groupBy
+ * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance
+ * @param keyFunction key accessor
+ * @param keyFunctions multi arg key accessors
+ * @param Value type extracted from the incoming data flow
+ * @param The return type of the aggregating function
+ * @param The aggregating function type
+ * @return A GroupByFlowBuilder for the aggregated flow
+ * @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier)
+ */
+ @SafeVarargs
+ public final > GroupByFlowBuilder, A> groupByFieldsGetAndAggregate(
+ SerializableFunction valueFunction,
+ SerializableSupplier aggregateFunctionSupplier,
+ SerializableFunction keyFunction,
+ SerializableFunction... keyFunctions) {
+ return groupBy(GroupByKey.build(keyFunction, keyFunctions), valueFunction, aggregateFunctionSupplier);
+ }
+
public GroupByFlowBuilder> groupByToList(SerializableFunction keyFunction) {
- return groupBy(keyFunction, Mappers::identity, Collectors.toList());
+ return groupBy(keyFunction, Mappers::identity, Collectors.listFactory());
+ }
+
+ public GroupByFlowBuilder> groupByToList(
+ SerializableFunction keyFunction, SerializableFunction valueFunction) {
+ return groupBy(keyFunction, valueFunction, Collectors.listFactory());
}
public GroupByFlowBuilder> groupByToSet(SerializableFunction keyFunction) {
- return groupBy(keyFunction, Mappers::identity, Collectors.toSet());
+ return groupBy(keyFunction, Mappers::identity, Collectors.setFactory());
+ }
+
+ public GroupByFlowBuilder> groupByToSet(SerializableFunction keyFunction, SerializableFunction valueFunction) {
+ return groupBy(keyFunction, valueFunction, Collectors.setFactory());
}
public GroupByFlowBuilder> groupByToList(
SerializableFunction keyFunction,
int maxElementsInList) {
- return groupBy(keyFunction, Mappers::identity, Collectors.toList(maxElementsInList));
+ return groupBy(keyFunction, Mappers::identity, Collectors.listFactory(maxElementsInList));
}
public > GroupByFlowBuilder
diff --git a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilderBase.java b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilderBase.java
index 4b996ef51..382c552e9 100644
--- a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilderBase.java
+++ b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilderBase.java
@@ -129,12 +129,12 @@ public FlowBuilderBase merge(FlowBuilderBase extends T> streamToMerge) {
public GroupByFlowBuilder>
groupByAsList(SerializableFunction keyFunction) {
- return groupBy(keyFunction, Mappers::identity, Collectors.toList());
+ return groupBy(keyFunction, Mappers::identity, Collectors.listFactory());
}
public GroupByFlowBuilder>
groupByAsList(SerializableFunction keyFunction, int maxElementsInList) {
- return groupBy(keyFunction, Mappers::identity, Collectors.toList(maxElementsInList));
+ return groupBy(keyFunction, Mappers::identity, Collectors.listFactory(maxElementsInList));
}
public > GroupByFlowBuilder
diff --git a/compiler/src/main/java/com/fluxtion/compiler/generation/serialiser/FieldSerializer.java b/compiler/src/main/java/com/fluxtion/compiler/generation/serialiser/FieldSerializer.java
index 71a7a03dd..f6f635ea7 100644
--- a/compiler/src/main/java/com/fluxtion/compiler/generation/serialiser/FieldSerializer.java
+++ b/compiler/src/main/java/com/fluxtion/compiler/generation/serialiser/FieldSerializer.java
@@ -4,6 +4,7 @@
import com.fluxtion.compiler.generation.GenerationContext;
import com.fluxtion.compiler.generation.model.Field;
import com.fluxtion.compiler.generation.util.ClassUtils;
+import com.fluxtion.runtime.dataflow.groupby.GroupByKey;
import com.fluxtion.runtime.dataflow.helpers.GroupingFactory;
import org.jetbrains.annotations.NotNull;
@@ -184,6 +185,10 @@ public String buildTypeDeclaration(Field field, Function, String> class
String genericDeclaration = "<" + inputClass + ", " + returnType + ", ?, ?>";
return genericDeclaration;
}
+ if (instance instanceof GroupByKey) {
+ GroupByKey groupByKey = (GroupByKey) instance;
+ return "<" + classNameConverter.apply(groupByKey.getValueClass()) + ">";
+ }
return "";
}
diff --git a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/EventStreamBuildTest.java b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/EventStreamBuildTest.java
index 2559511c0..f0f3d6f80 100644
--- a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/EventStreamBuildTest.java
+++ b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/EventStreamBuildTest.java
@@ -27,13 +27,7 @@
import org.junit.Test;
import java.util.AbstractMap.SimpleEntry;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
import java.util.concurrent.atomic.LongAdder;
import static com.fluxtion.compiler.builder.dataflow.DataFlow.*;
@@ -494,7 +488,7 @@ public void aggregateTest() {
@Test
public void aggregateToLIstTest() {
sep(c -> subscribe(String.class)
- .aggregate(Collectors.toList(4))
+ .aggregate(Collectors.listFactory(4))
.id("myList"));
onEvent("A");
@@ -546,6 +540,111 @@ public void tumblingMap() {
assertThat(getStreamed("sum"), is(0));
}
+ @Test
+ public void testMapToSet() {
+ sep(c -> DataFlow.subscribe(String.class).mapToSet().id("set"));
+ HashSet set = new HashSet<>();
+ set.add("test");
+ onEvent("test");
+ onEvent("test");
+ assertThat(getStreamed("set"), is(set));
+ onEvent("test2");
+ set.add("test2");
+ assertThat(getStreamed("set"), is(set));
+ onEvent("test");
+ assertThat(getStreamed("set"), is(set));
+ }
+
+ @Test
+ public void testMapToSetFromProperty() {
+ sep(c -> DataFlow.subscribe(GroupByTest.Data.class).mapToSet(GroupByTest.Data::getName).id("set"));
+ HashSet set = new HashSet<>();
+ set.add("test");
+ onEvent(new GroupByTest.Data("test", 22));
+ onEvent(new GroupByTest.Data("test", 31));
+ assertThat(getStreamed("set"), is(set));
+ onEvent(new GroupByTest.Data("test2", 2334));
+ set.add("test2");
+ assertThat(getStreamed("set"), is(set));
+ onEvent(new GroupByTest.Data("test", 31));
+ assertThat(getStreamed("set"), is(set));
+ }
+
+ @Test
+ public void testMapToList() {
+ sep(c -> DataFlow.subscribe(String.class).mapToList().id("list"));
+ List list = new ArrayList<>();
+ list.add("test");
+ list.add("test");
+ onEvent("test");
+ onEvent("test");
+ assertThat(getStreamed("list"), is(list));
+ onEvent("test2");
+ list.add("test2");
+ assertThat(getStreamed("list"), is(list));
+ onEvent("test");
+ list.add("test");
+ assertThat(getStreamed("list"), is(list));
+ }
+
+ @Test
+ public void testMapToList_MaxElements() {
+ sep(c -> DataFlow.subscribe(String.class).mapToList(2).id("list"));
+ List list = new ArrayList<>();
+ list.add("test");
+ list.add("test");
+ onEvent("test");
+ onEvent("test");
+ assertThat(getStreamed("list"), is(list));
+ //deleting
+ onEvent("test2");
+ list.add("test2");
+ list.remove(0);
+ assertThat(getStreamed("list"), is(list));
+ //deleting
+ onEvent("test");
+ list.add("test");
+ list.remove(0);
+ assertThat(getStreamed("list"), is(list));
+ }
+
+ @Test
+ public void testMapToListFromProperty() {
+ sep(c -> DataFlow.subscribe(GroupByTest.Data.class).mapToList(GroupByTest.Data::getName).id("list"));
+ List list = new ArrayList<>();
+ list.add("test");
+ list.add("test");
+ onEvent(new GroupByTest.Data("test", 22));
+ onEvent(new GroupByTest.Data("test", 31));
+ assertThat(getStreamed("list"), is(list));
+ onEvent(new GroupByTest.Data("test2", 2334));
+ list.add("test2");
+ assertThat(getStreamed("list"), is(list));
+ onEvent(new GroupByTest.Data("test3", 3451));
+ list.add("test3");
+ assertThat(getStreamed("list"), is(list));
+ }
+
+ @Test
+ public void testMapToListFromProperty_MaxElements() {
+ sep(c -> DataFlow.subscribe(GroupByTest.Data.class).mapToList(GroupByTest.Data::getName, 2).id("list"));
+ List list = new ArrayList<>();
+ list.add("test1");
+ list.add("test2");
+ onEvent(new GroupByTest.Data("test1", 22));
+ onEvent(new GroupByTest.Data("test2", 31));
+ assertThat(getStreamed("list"), is(list));
+ //deleting
+ onEvent(new GroupByTest.Data("test3", 2334));
+ list.add("test3");
+ list.remove(0);
+ assertThat(getStreamed("list"), is(list));
+ //deleting
+ onEvent(new GroupByTest.Data("tes4", 3451));
+ list.add("tes4");
+ list.remove(0);
+ assertThat(getStreamed("list"), is(list));
+ }
@Value
public static class Person {
@@ -554,12 +653,10 @@ public static class Person {
String gender;
}
-
public static int doubleInt(int value) {
return value * 2;
}
-
@Value
public static class MergedType {
int value;
diff --git a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/GroupByTest.java b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/GroupByTest.java
index 936831e2a..ff64cc25b 100644
--- a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/GroupByTest.java
+++ b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/GroupByTest.java
@@ -5,13 +5,16 @@
import com.fluxtion.compiler.builder.dataflow.EventStreamBuildTest.MyIntFilter;
import com.fluxtion.compiler.generation.util.CompiledAndInterpretedSepTest.SepTestConfig;
import com.fluxtion.compiler.generation.util.MultipleSepTargetInProcessTest;
+import com.fluxtion.runtime.dataflow.aggregate.AggregateFlowFunction;
import com.fluxtion.runtime.dataflow.aggregate.function.primitive.DoubleSumFlowFunction;
import com.fluxtion.runtime.dataflow.aggregate.function.primitive.IntSumFlowFunction;
import com.fluxtion.runtime.dataflow.groupby.GroupBy;
import com.fluxtion.runtime.dataflow.groupby.GroupBy.KeyValue;
+import com.fluxtion.runtime.dataflow.groupby.GroupByKey;
import com.fluxtion.runtime.dataflow.helpers.Aggregates;
import com.fluxtion.runtime.dataflow.helpers.Mappers;
import com.fluxtion.runtime.dataflow.helpers.Tuples;
+import lombok.Getter;
import lombok.Value;
import lombok.val;
import org.hamcrest.CoreMatchers;
@@ -605,6 +608,163 @@ public void maintainModel() {
onEvent(new MyEvent(SubSystem.REFERENCE, Change_type.DELETE, "greg-1"));
}
+ @Value
+ public static class Data3 {
+ String name;
+ int value;
+ int x;
+
+
+ }
+
+ @Getter
+ public static class Data3Aggregate implements AggregateFlowFunction {
+ int value;
+
+ @Override
+ public Integer reset() {
+ return value;
+ }
+
+ @Override
+ public Integer get() {
+ return value;
+ }
+
+ @Override
+ public Integer aggregate(Data3 input) {
+ value += input.getX();
+ return get();
+ }
+ }
+
+ @Test
+ public void groupingKey() {
+ Map, Data3> expected = new HashMap<>();
+ sep(c -> {
+ subscribe(Data3.class)
+ .groupByFields(Data3::getName, Data3::getValue)
+ .map(GroupBy::toMap)
+ .id("results");
+ });
+ val keyFactory = GroupByKey.build(Data3::getName, Data3::getValue);//.apply();
+
+ onEvent(new Data3("A", 10, 1));
+ expected.put(keyFactory.apply(new Data3("A", 10, 1)), new Data3("A", 10, 1));
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+
+ Data3 data2 = new Data3("A", 10, 2);
+ onEvent(data2);
+ expected.put(keyFactory.apply(data2), data2);
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+
+ Data3 data3 = new Data3("A", 10, 3);
+ onEvent(data3);
+ expected.put(keyFactory.apply(data3), data3);
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+
+ Data3 data4 = new Data3("A", 15, 111);
+ onEvent(data4);
+ expected.put(keyFactory.apply(data4), data4);
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+
+ Data3 dataB1 = new Data3("B", 10, 1);
+ onEvent(dataB1);
+ expected.put(keyFactory.apply(dataB1), dataB1);
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+
+ Data3 dataB2 = new Data3("B", 10, 99);
+ onEvent(dataB2);
+ expected.put(keyFactory.apply(dataB2), dataB2);
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+ }
+
+ @Test
+ public void aggregateCompoundField() {
+ Map, Integer> expected = new HashMap<>();
+ sep(c -> {
+ subscribe(Data3.class)
+ .groupByFieldsAggregate(Data3Aggregate::new, Data3::getName, Data3::getValue)
+ .map(GroupBy::toMap)
+ .id("results");
+ });
+ val keyFactory = GroupByKey.build(Data3::getName, Data3::getValue);//.apply();
+
+ onEvent(new Data3("A", 10, 1));
+ expected.put(keyFactory.apply(new Data3("A", 10, 1)), 1);
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+
+ Data3 data2 = new Data3("A", 10, 2);
+ onEvent(data2);
+ expected.put(keyFactory.apply(data2), 3);
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+
+ Data3 data3 = new Data3("A", 10, 3);
+ onEvent(data3);
+ expected.put(keyFactory.apply(data3), 6);
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+
+ Data3 data4 = new Data3("A", 15, 111);
+ onEvent(data4);
+ expected.put(keyFactory.apply(data4), 111);
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+
+ Data3 dataB1 = new Data3("B", 10, 1);
+ onEvent(dataB1);
+ expected.put(keyFactory.apply(dataB1), 1);
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+
+ Data3 dataB2 = new Data3("B", 10, 99);
+ onEvent(dataB2);
+ expected.put(keyFactory.apply(dataB2), 100);
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+ }
+
+ @Test
+ public void aggregateExtractedPropertyCompoundField() {
+ Map, Integer> expected = new HashMap<>();
+ sep(c -> {
+ subscribe(Data3.class)
+ .groupByFieldsGetAndAggregate(
+ Data3::getX,
+ Aggregates.intSumFactory(),
+ Data3::getName, Data3::getValue)
+ .map(GroupBy::toMap)
+ .id("results");
+ });
+ val keyFactory = GroupByKey.build(Data3::getName, Data3::getValue);//.apply();
+
+ onEvent(new Data3("A", 10, 1));
+ expected.put(keyFactory.apply(new Data3("A", 10, 1)), 1);
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+
+ Data3 data2 = new Data3("A", 10, 2);
+ onEvent(data2);
+ expected.put(keyFactory.apply(data2), 3);
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+
+ Data3 data3 = new Data3("A", 10, 3);
+ onEvent(data3);
+ expected.put(keyFactory.apply(data3), 6);
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+
+ Data3 data4 = new Data3("A", 15, 111);
+ onEvent(data4);
+ expected.put(keyFactory.apply(data4), 111);
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+
+ Data3 dataB1 = new Data3("B", 10, 1);
+ onEvent(dataB1);
+ expected.put(keyFactory.apply(dataB1), 1);
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+
+ Data3 dataB2 = new Data3("B", 10, 99);
+ onEvent(dataB2);
+ expected.put(keyFactory.apply(dataB2), 100);
+ MatcherAssert.assertThat(getStreamed("results"), is(expected));
+ }
+
+
public static MyModel updateItemScalar(MyModel model, MyEvent myEvent) {
model.createItem(myEvent.getData());
return model;
diff --git a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/NestedGroupByTest.java b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/NestedGroupByTest.java
index 2dbe54c78..f0148d4a2 100644
--- a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/NestedGroupByTest.java
+++ b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/NestedGroupByTest.java
@@ -120,7 +120,7 @@ public void nestedGroupByToList_WithHelper() {
@Test
public void nestedDataFlowGroupBy_toCollector() {
sep(c -> {
- DataFlow.groupBy(Person::getCountry, Collectors.groupingBy(Person::getGender, Collectors.toList()))
+ DataFlow.groupBy(Person::getCountry, Collectors.groupingBy(Person::getGender, Collectors.listFactory()))
.sink("groupBy");
});
this.addSink("groupBy", this::convertToMapList);
@@ -179,7 +179,7 @@ public void nestedGroupByToCollector_List_WithHelper() {
subscribe(Person.class)
.groupBy(
Person::getCountry,
- Collectors.groupingBy(Person::getGender, Collectors.toList()))
+ Collectors.groupingBy(Person::getGender, Collectors.listFactory()))
.sink("groupBy");
});
this.addSink("groupBy", this::convertToMapList);
diff --git a/parent-root/pom.xml b/parent-root/pom.xml
index 58e8af066..c81a26276 100644
--- a/parent-root/pom.xml
+++ b/parent-root/pom.xml
@@ -21,7 +21,7 @@
4.0.0
com.fluxtion
root-parent-pom
- 9.1.11-SNAPSHOT
+ 9.1.12-SNAPSHOT
pom
fluxtion :: poms :: parent root
diff --git a/pom.xml b/pom.xml
index 57b9627d0..fb72804dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,7 +19,7 @@ along with this program. If not, see
4.0.0
com.fluxtion
fluxtion.master
- 9.1.11-SNAPSHOT
+ 9.1.12-SNAPSHOT
pom
fluxtion
diff --git a/runtime/pom.xml b/runtime/pom.xml
index bbb767752..2d2facd4e 100644
--- a/runtime/pom.xml
+++ b/runtime/pom.xml
@@ -20,7 +20,7 @@ Copyright (C) 2018 V12 Technology Ltd.
com.fluxtion
root-parent-pom
- 9.1.11-SNAPSHOT
+ 9.1.12-SNAPSHOT
../parent-root/pom.xml
diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToListFlowFunction.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToListFlowFunction.java
index 0bd71d06a..8e723cac2 100644
--- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToListFlowFunction.java
+++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToListFlowFunction.java
@@ -7,7 +7,7 @@
public class AggregateToListFlowFunction implements AggregateFlowFunction, AggregateToListFlowFunction> {
- private final List list = new ArrayList<>();
+ private transient final List list = new ArrayList<>();
private final int maxElementCount;
diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToSetFlowFunction.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToSetFlowFunction.java
index 8b33fe38e..405a6c825 100644
--- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToSetFlowFunction.java
+++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToSetFlowFunction.java
@@ -7,7 +7,7 @@
public class AggregateToSetFlowFunction implements AggregateFlowFunction, AggregateToSetFlowFunction> {
- private final Set list = new HashSet<>();
+ private transient final Set list = new HashSet<>();
@Override
public Set reset() {
@@ -36,11 +36,4 @@ public Set aggregate(T input) {
return list;
}
-
- public static class AggregateToSetFactory {
-
- public AggregateToSetFlowFunction newList() {
- return new AggregateToSetFlowFunction<>();
- }
- }
}
diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/groupby/GroupByKey.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/groupby/GroupByKey.java
new file mode 100644
index 000000000..e2e307217
--- /dev/null
+++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/groupby/GroupByKey.java
@@ -0,0 +1,109 @@
+package com.fluxtion.runtime.dataflow.groupby;
+
+import com.fluxtion.runtime.partition.LambdaReflection;
+import lombok.Getter;
+import lombok.ToString;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Constructs a compound key for using on group by constructs in a data flow. The key is composed using method references
+ * of the type to be grouped by.
+ *
+ * @param The type of data flow to create a key for
+ */
+@ToString(of = {"key", "name"})
+public class GroupByKey {
+ public final List> accessors;
+ private final transient StringBuilder keyHolder = new StringBuilder();
+ @Getter
+ private final transient Class valueClass;
+ @Getter
+ private transient String key;
+ private transient final String name;
+
+ public GroupByKey(List> accessorsToAdd) {
+ this.accessors = new ArrayList<>();
+ String tmpName = "";
+ for (LambdaReflection.SerializableFunction element : accessorsToAdd) {
+ if (!accessors.contains(element)) {
+ accessors.add(element);
+ tmpName += "_" + element.method().getName();
+ }
+ }
+ valueClass = (Class) accessors.get(0).method().getDeclaringClass();
+ name = valueClass.getName() + tmpName;
+ }
+
+ public GroupByKey(LambdaReflection.SerializableFunction accessor) {
+ this(Arrays.asList(accessor));
+ }
+
+ @SafeVarargs
+ public GroupByKey(LambdaReflection.SerializableFunction... accessorList) {
+ this(Arrays.asList(accessorList));
+ }
+
+ private GroupByKey(GroupByKey toClone) {
+ accessors = toClone.accessors;
+ valueClass = toClone.getValueClass();
+ name = toClone.name;
+ }
+
+ public static LambdaReflection.SerializableFunction> build(LambdaReflection.SerializableFunction accessor) {
+ return new GroupByKey<>(accessor)::toKey;
+ }
+
+ @SafeVarargs
+ public static LambdaReflection.SerializableFunction> build(
+ LambdaReflection.SerializableFunction accessor,
+ LambdaReflection.SerializableFunction... accessorList) {
+ List> accessors = new ArrayList<>();
+ accessors.add(accessor);
+ accessors.addAll(Arrays.asList(accessorList));
+ GroupByKey accessorKey = new GroupByKey<>(accessors);
+ return accessorKey::toKey;
+ }
+
+
+ public boolean keyPresent(LambdaReflection.SerializableFunction keyToCheck) {
+ return accessors.contains(keyToCheck);
+ }
+
+ public GroupByKey toKey(T input) {
+ //TODO add object pooling
+ GroupByKey cloned = new GroupByKey<>(this);
+ cloned.keyHolder.setLength(0);
+ for (int i = 0, accessorsSize = accessors.size(); i < accessorsSize; i++) {
+ LambdaReflection.SerializableFunction accessor = accessors.get(i);
+ cloned.keyHolder.append(accessor.apply(input).toString());
+ cloned.keyHolder.append("_");
+ }
+ cloned.key = cloned.keyHolder.toString();
+ return cloned;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ GroupByKey> that = (GroupByKey>) o;
+
+ if (!valueClass.equals(that.valueClass)) return false;
+ if (!Objects.equals(key, that.key)) return false;
+ return name.equals(that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = valueClass.hashCode();
+ result = 31 * result + (key != null ? key.hashCode() : 0);
+ result = 31 * result + name.hashCode();
+ return result;
+ }
+
+}
diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Collectors.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Collectors.java
index fa2990506..bb8765007 100644
--- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Collectors.java
+++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Collectors.java
@@ -10,18 +10,31 @@
import com.fluxtion.runtime.partition.LambdaReflection.SerializableSupplier;
import java.util.List;
+import java.util.Set;
public interface Collectors {
- static SerializableSupplier> toList(int maximumElementCount) {
+ static SerializableFunction> toSet() {
+ return new AggregateToSetFlowFunction()::aggregate;
+ }
+
+ static SerializableFunction> toList() {
+ return new AggregateToListFlowFunction()::aggregate;
+ }
+
+ static SerializableFunction> toList(int maxElements) {
+ return new AggregateToListFlowFunction(maxElements)::aggregate;
+ }
+
+ static SerializableSupplier> listFactory(int maximumElementCount) {
return new AggregateToListFactory(maximumElementCount)::newList;
}
- static SerializableSupplier> toList() {
- return toList(-1);
+ static SerializableSupplier> listFactory() {
+ return listFactory(-1);
}
- static SerializableSupplier> toSet() {
+ static SerializableSupplier> setFactory() {
return AggregateToSetFlowFunction::new;
}
diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/GroupingFactory.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/GroupingFactory.java
index 464639661..73311bb9e 100644
--- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/GroupingFactory.java
+++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/GroupingFactory.java
@@ -31,7 +31,7 @@ public SerializableFunction getKeyFunction() {
}
public GroupByFlowFunctionWrapper, AggregateToListFlowFunction> groupByToList() {
- SerializableSupplier> list = Collectors.toList();
+ SerializableSupplier> list = Collectors.listFactory();
return new GroupByFlowFunctionWrapper<>(keyFunction, Mappers::identity, list);
}