diff --git a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java index c4598bd95..6989be7b5 100644 --- a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java +++ b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java @@ -9,16 +9,9 @@ import com.fluxtion.runtime.dataflow.aggregate.function.TimedSlidingWindow; import com.fluxtion.runtime.dataflow.aggregate.function.TumblingWindow; import com.fluxtion.runtime.dataflow.function.BinaryMapFlowFunction.BinaryMapToRefFlowFunction; -import com.fluxtion.runtime.dataflow.function.FlatMapArrayFlowFunction; -import com.fluxtion.runtime.dataflow.function.FlatMapFlowFunction; -import com.fluxtion.runtime.dataflow.function.LookupFlowFunction; -import com.fluxtion.runtime.dataflow.function.MapFlowFunction; +import com.fluxtion.runtime.dataflow.function.*; import com.fluxtion.runtime.dataflow.function.MapFlowFunction.MapRef2RefFlowFunction; -import com.fluxtion.runtime.dataflow.function.MergeFlowFunction; -import com.fluxtion.runtime.dataflow.groupby.GroupBy; -import com.fluxtion.runtime.dataflow.groupby.GroupByFlowFunctionWrapper; -import com.fluxtion.runtime.dataflow.groupby.GroupByTimedSlidingWindow; -import com.fluxtion.runtime.dataflow.groupby.GroupByTumblingWindow; +import com.fluxtion.runtime.dataflow.groupby.*; import com.fluxtion.runtime.dataflow.helpers.Aggregates; import com.fluxtion.runtime.dataflow.helpers.Collectors; import com.fluxtion.runtime.dataflow.helpers.DefaultValue; @@ -80,6 +73,30 @@ public FlowBuilder map(SerializableFunction mapFunction) { return super.mapBase(mapFunction); } + public FlowBuilder> mapToSet() { + return map(Collectors.toSet()); + } + + public FlowBuilder> mapToSet(SerializableFunction mapFunction) { + return map(mapFunction).map(Collectors.toSet()); + } + + public FlowBuilder> mapToList() { + return map(Collectors.toList()); + } + + public FlowBuilder> mapToList(SerializableFunction mapFunction) { + return map(mapFunction).map(Collectors.toList()); + } + + public FlowBuilder> mapToList(int maxElements) { + return map(Collectors.toList(maxElements)); + } + + public FlowBuilder> mapToList(SerializableFunction mapFunction, int maxElements) { + return map(mapFunction).map(Collectors.toList(maxElements)); + } + public FlowBuilder mapBiFunction(SerializableBiFunction int2IntFunction, FlowBuilder stream2Builder) { @@ -130,6 +147,19 @@ public FlowBuilder flatMapFromArray(SerializableFunction iterable new TimedSlidingWindow<>(eventStream, aggregateFunction, bucketSizeMillis, bucketsPerWindow)); } + /** + * Aggregates a flow using a key function to group by and an aggregating function to process new values for a keyed + * bucket. + * + * @param keyFunction The key function that groups and buckets incoming values + * @param valueFunction The value that is extracted from the incoming stream and applied to the aggregating function + * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance + * @param Value type extracted from the incoming data flow + * @param The type of the key used to group values + * @param The return type of the aggregating function + * @param The aggregating function type + * @return A GroupByFlowBuilder for the aggregated flow + */ public > GroupByFlowBuilder groupBy(SerializableFunction keyFunction, SerializableFunction valueFunction, @@ -139,33 +169,147 @@ public FlowBuilder flatMapFromArray(SerializableFunction iterable return new GroupByFlowBuilder<>(x); } + /** + * Specialisation of groupBy where the value is the identity of the incoming data flow + * + * @param keyFunction The key function that groups and buckets incoming values + * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance + * @param The type of the key used to group values + * @param The return type of the aggregating function + * @param The aggregating function type + * @return A GroupByFlowBuilder for the aggregated flow + * @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier) + */ public > GroupByFlowBuilder groupBy(SerializableFunction keyFunction, SerializableSupplier aggregateFunctionSupplier) { return groupBy(keyFunction, Mappers::identity, aggregateFunctionSupplier); } + /** + * Specialisation of groupBy where the output of the groupBy is the last value received for a bucket. The value is + * extracted using the value function + * + * @param keyFunction The key function that groups and buckets incoming values + * @param valueFunction The value that is extracted from the incoming stream and applied to the aggregating function + * @param Value type extracted from the incoming data flow + * @param The type of the key used to group values + * @return A GroupByFlowBuilder for the aggregated flow + * @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier) + */ public GroupByFlowBuilder groupBy( SerializableFunction keyFunction, SerializableFunction valueFunction) { return groupBy(keyFunction, valueFunction, Aggregates.identityFactory()); } + /** + * Specialisation of groupBy where the output of the groupBy is the last value received for a bucket, where + * the value is the identity of the incoming data flow + * + * @param keyFunction The key function that groups and buckets incoming values + * @param The type of the key used to group values + * @return A GroupByFlowBuilder for the aggregated flow + */ public GroupByFlowBuilder groupBy(SerializableFunction keyFunction) { return groupBy(keyFunction, Mappers::identity); } + /** + * Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the value. + * The value is the last value supplied + * + * @param keyFunction key accessor + * @param keyFunctions multi arg key accessors + * @return GroupByFlowBuilder keyed on properties + */ + @SafeVarargs + public final GroupByFlowBuilder, T> groupByFields( + SerializableFunction keyFunction, + SerializableFunction... keyFunctions) { + return groupBy(GroupByKey.build(keyFunction, keyFunctions)); + } + + /** + * Aggregates a flow using a key to group by and an aggregating function to process new values for a keyed + * bucket. The key is a compound key created by a set of method reference accessors to for the value. + * + * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance + * @param keyFunction key accessor + * @param keyFunctions multi arg key accessors + * @param The return type of the aggregating function + * @param The aggregating function type + * @return A GroupByFlowBuilder for the aggregated flow + * @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier) + */ + @SafeVarargs + public final > GroupByFlowBuilder, A> groupByFieldsAggregate( + SerializableSupplier aggregateFunctionSupplier, + SerializableFunction keyFunction, + SerializableFunction... keyFunctions) { + return groupBy(GroupByKey.build(keyFunction, keyFunctions), aggregateFunctionSupplier); + } + + /** + * Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the key + * The value is extracted from the input using the value function + * + * @param valueFunction the value that will be stored in the groupBy + * @param keyFunction key accessor + * @param keyFunctions multi arg key accessors + * @return GroupByFlowBuilder keyed on properties + */ + @SafeVarargs + public final GroupByFlowBuilder, V> groupByFieldsAndGet( + SerializableFunction valueFunction, + SerializableFunction keyFunction, + SerializableFunction... keyFunctions) { + return groupBy(GroupByKey.build(keyFunction, keyFunctions), valueFunction); + } + + /** + * Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the key + * The value is extracted from the input using the value function and is used as an input to the aggregating function + * + * @param valueFunction the value that will be stored in the groupBy + * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance + * @param keyFunction key accessor + * @param keyFunctions multi arg key accessors + * @param Value type extracted from the incoming data flow + * @param The return type of the aggregating function + * @param The aggregating function type + * @return A GroupByFlowBuilder for the aggregated flow + * @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier) + */ + @SafeVarargs + public final > GroupByFlowBuilder, A> groupByFieldsGetAndAggregate( + SerializableFunction valueFunction, + SerializableSupplier aggregateFunctionSupplier, + SerializableFunction keyFunction, + SerializableFunction... keyFunctions) { + return groupBy(GroupByKey.build(keyFunction, keyFunctions), valueFunction, aggregateFunctionSupplier); + } + public GroupByFlowBuilder> groupByToList(SerializableFunction keyFunction) { - return groupBy(keyFunction, Mappers::identity, Collectors.toList()); + return groupBy(keyFunction, Mappers::identity, Collectors.listFactory()); + } + + public GroupByFlowBuilder> groupByToList( + SerializableFunction keyFunction, SerializableFunction valueFunction) { + return groupBy(keyFunction, valueFunction, Collectors.listFactory()); } public GroupByFlowBuilder> groupByToSet(SerializableFunction keyFunction) { - return groupBy(keyFunction, Mappers::identity, Collectors.toSet()); + return groupBy(keyFunction, Mappers::identity, Collectors.setFactory()); + } + + public GroupByFlowBuilder> groupByToSet(SerializableFunction keyFunction, SerializableFunction valueFunction) { + return groupBy(keyFunction, valueFunction, Collectors.setFactory()); } public GroupByFlowBuilder> groupByToList( SerializableFunction keyFunction, int maxElementsInList) { - return groupBy(keyFunction, Mappers::identity, Collectors.toList(maxElementsInList)); + return groupBy(keyFunction, Mappers::identity, Collectors.listFactory(maxElementsInList)); } public > GroupByFlowBuilder diff --git a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilderBase.java b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilderBase.java index 4b996ef51..382c552e9 100644 --- a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilderBase.java +++ b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilderBase.java @@ -129,12 +129,12 @@ public FlowBuilderBase merge(FlowBuilderBase streamToMerge) { public GroupByFlowBuilder> groupByAsList(SerializableFunction keyFunction) { - return groupBy(keyFunction, Mappers::identity, Collectors.toList()); + return groupBy(keyFunction, Mappers::identity, Collectors.listFactory()); } public GroupByFlowBuilder> groupByAsList(SerializableFunction keyFunction, int maxElementsInList) { - return groupBy(keyFunction, Mappers::identity, Collectors.toList(maxElementsInList)); + return groupBy(keyFunction, Mappers::identity, Collectors.listFactory(maxElementsInList)); } public > GroupByFlowBuilder diff --git a/compiler/src/main/java/com/fluxtion/compiler/generation/serialiser/FieldSerializer.java b/compiler/src/main/java/com/fluxtion/compiler/generation/serialiser/FieldSerializer.java index 71a7a03dd..f6f635ea7 100644 --- a/compiler/src/main/java/com/fluxtion/compiler/generation/serialiser/FieldSerializer.java +++ b/compiler/src/main/java/com/fluxtion/compiler/generation/serialiser/FieldSerializer.java @@ -4,6 +4,7 @@ import com.fluxtion.compiler.generation.GenerationContext; import com.fluxtion.compiler.generation.model.Field; import com.fluxtion.compiler.generation.util.ClassUtils; +import com.fluxtion.runtime.dataflow.groupby.GroupByKey; import com.fluxtion.runtime.dataflow.helpers.GroupingFactory; import org.jetbrains.annotations.NotNull; @@ -184,6 +185,10 @@ public String buildTypeDeclaration(Field field, Function, String> class String genericDeclaration = "<" + inputClass + ", " + returnType + ", ?, ?>"; return genericDeclaration; } + if (instance instanceof GroupByKey) { + GroupByKey groupByKey = (GroupByKey) instance; + return "<" + classNameConverter.apply(groupByKey.getValueClass()) + ">"; + } return ""; } diff --git a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/EventStreamBuildTest.java b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/EventStreamBuildTest.java index 2559511c0..f0f3d6f80 100644 --- a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/EventStreamBuildTest.java +++ b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/EventStreamBuildTest.java @@ -27,13 +27,7 @@ import org.junit.Test; import java.util.AbstractMap.SimpleEntry; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.atomic.LongAdder; import static com.fluxtion.compiler.builder.dataflow.DataFlow.*; @@ -494,7 +488,7 @@ public void aggregateTest() { @Test public void aggregateToLIstTest() { sep(c -> subscribe(String.class) - .aggregate(Collectors.toList(4)) + .aggregate(Collectors.listFactory(4)) .id("myList")); onEvent("A"); @@ -546,6 +540,111 @@ public void tumblingMap() { assertThat(getStreamed("sum"), is(0)); } + @Test + public void testMapToSet() { + sep(c -> DataFlow.subscribe(String.class).mapToSet().id("set")); + HashSet set = new HashSet<>(); + set.add("test"); + onEvent("test"); + onEvent("test"); + assertThat(getStreamed("set"), is(set)); + onEvent("test2"); + set.add("test2"); + assertThat(getStreamed("set"), is(set)); + onEvent("test"); + assertThat(getStreamed("set"), is(set)); + } + + @Test + public void testMapToSetFromProperty() { + sep(c -> DataFlow.subscribe(GroupByTest.Data.class).mapToSet(GroupByTest.Data::getName).id("set")); + HashSet set = new HashSet<>(); + set.add("test"); + onEvent(new GroupByTest.Data("test", 22)); + onEvent(new GroupByTest.Data("test", 31)); + assertThat(getStreamed("set"), is(set)); + onEvent(new GroupByTest.Data("test2", 2334)); + set.add("test2"); + assertThat(getStreamed("set"), is(set)); + onEvent(new GroupByTest.Data("test", 31)); + assertThat(getStreamed("set"), is(set)); + } + + @Test + public void testMapToList() { + sep(c -> DataFlow.subscribe(String.class).mapToList().id("list")); + List list = new ArrayList<>(); + list.add("test"); + list.add("test"); + onEvent("test"); + onEvent("test"); + assertThat(getStreamed("list"), is(list)); + onEvent("test2"); + list.add("test2"); + assertThat(getStreamed("list"), is(list)); + onEvent("test"); + list.add("test"); + assertThat(getStreamed("list"), is(list)); + } + + @Test + public void testMapToList_MaxElements() { + sep(c -> DataFlow.subscribe(String.class).mapToList(2).id("list")); + List list = new ArrayList<>(); + list.add("test"); + list.add("test"); + onEvent("test"); + onEvent("test"); + assertThat(getStreamed("list"), is(list)); + //deleting + onEvent("test2"); + list.add("test2"); + list.remove(0); + assertThat(getStreamed("list"), is(list)); + //deleting + onEvent("test"); + list.add("test"); + list.remove(0); + assertThat(getStreamed("list"), is(list)); + } + + @Test + public void testMapToListFromProperty() { + sep(c -> DataFlow.subscribe(GroupByTest.Data.class).mapToList(GroupByTest.Data::getName).id("list")); + List list = new ArrayList<>(); + list.add("test"); + list.add("test"); + onEvent(new GroupByTest.Data("test", 22)); + onEvent(new GroupByTest.Data("test", 31)); + assertThat(getStreamed("list"), is(list)); + onEvent(new GroupByTest.Data("test2", 2334)); + list.add("test2"); + assertThat(getStreamed("list"), is(list)); + onEvent(new GroupByTest.Data("test3", 3451)); + list.add("test3"); + assertThat(getStreamed("list"), is(list)); + } + + @Test + public void testMapToListFromProperty_MaxElements() { + sep(c -> DataFlow.subscribe(GroupByTest.Data.class).mapToList(GroupByTest.Data::getName, 2).id("list")); + List list = new ArrayList<>(); + list.add("test1"); + list.add("test2"); + onEvent(new GroupByTest.Data("test1", 22)); + onEvent(new GroupByTest.Data("test2", 31)); + assertThat(getStreamed("list"), is(list)); + //deleting + onEvent(new GroupByTest.Data("test3", 2334)); + list.add("test3"); + list.remove(0); + assertThat(getStreamed("list"), is(list)); + //deleting + onEvent(new GroupByTest.Data("tes4", 3451)); + list.add("tes4"); + list.remove(0); + assertThat(getStreamed("list"), is(list)); + } @Value public static class Person { @@ -554,12 +653,10 @@ public static class Person { String gender; } - public static int doubleInt(int value) { return value * 2; } - @Value public static class MergedType { int value; diff --git a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/GroupByTest.java b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/GroupByTest.java index 936831e2a..ff64cc25b 100644 --- a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/GroupByTest.java +++ b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/GroupByTest.java @@ -5,13 +5,16 @@ import com.fluxtion.compiler.builder.dataflow.EventStreamBuildTest.MyIntFilter; import com.fluxtion.compiler.generation.util.CompiledAndInterpretedSepTest.SepTestConfig; import com.fluxtion.compiler.generation.util.MultipleSepTargetInProcessTest; +import com.fluxtion.runtime.dataflow.aggregate.AggregateFlowFunction; import com.fluxtion.runtime.dataflow.aggregate.function.primitive.DoubleSumFlowFunction; import com.fluxtion.runtime.dataflow.aggregate.function.primitive.IntSumFlowFunction; import com.fluxtion.runtime.dataflow.groupby.GroupBy; import com.fluxtion.runtime.dataflow.groupby.GroupBy.KeyValue; +import com.fluxtion.runtime.dataflow.groupby.GroupByKey; import com.fluxtion.runtime.dataflow.helpers.Aggregates; import com.fluxtion.runtime.dataflow.helpers.Mappers; import com.fluxtion.runtime.dataflow.helpers.Tuples; +import lombok.Getter; import lombok.Value; import lombok.val; import org.hamcrest.CoreMatchers; @@ -605,6 +608,163 @@ public void maintainModel() { onEvent(new MyEvent(SubSystem.REFERENCE, Change_type.DELETE, "greg-1")); } + @Value + public static class Data3 { + String name; + int value; + int x; + + + } + + @Getter + public static class Data3Aggregate implements AggregateFlowFunction { + int value; + + @Override + public Integer reset() { + return value; + } + + @Override + public Integer get() { + return value; + } + + @Override + public Integer aggregate(Data3 input) { + value += input.getX(); + return get(); + } + } + + @Test + public void groupingKey() { + Map, Data3> expected = new HashMap<>(); + sep(c -> { + subscribe(Data3.class) + .groupByFields(Data3::getName, Data3::getValue) + .map(GroupBy::toMap) + .id("results"); + }); + val keyFactory = GroupByKey.build(Data3::getName, Data3::getValue);//.apply(); + + onEvent(new Data3("A", 10, 1)); + expected.put(keyFactory.apply(new Data3("A", 10, 1)), new Data3("A", 10, 1)); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data2 = new Data3("A", 10, 2); + onEvent(data2); + expected.put(keyFactory.apply(data2), data2); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data3 = new Data3("A", 10, 3); + onEvent(data3); + expected.put(keyFactory.apply(data3), data3); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data4 = new Data3("A", 15, 111); + onEvent(data4); + expected.put(keyFactory.apply(data4), data4); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 dataB1 = new Data3("B", 10, 1); + onEvent(dataB1); + expected.put(keyFactory.apply(dataB1), dataB1); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 dataB2 = new Data3("B", 10, 99); + onEvent(dataB2); + expected.put(keyFactory.apply(dataB2), dataB2); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + } + + @Test + public void aggregateCompoundField() { + Map, Integer> expected = new HashMap<>(); + sep(c -> { + subscribe(Data3.class) + .groupByFieldsAggregate(Data3Aggregate::new, Data3::getName, Data3::getValue) + .map(GroupBy::toMap) + .id("results"); + }); + val keyFactory = GroupByKey.build(Data3::getName, Data3::getValue);//.apply(); + + onEvent(new Data3("A", 10, 1)); + expected.put(keyFactory.apply(new Data3("A", 10, 1)), 1); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data2 = new Data3("A", 10, 2); + onEvent(data2); + expected.put(keyFactory.apply(data2), 3); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data3 = new Data3("A", 10, 3); + onEvent(data3); + expected.put(keyFactory.apply(data3), 6); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data4 = new Data3("A", 15, 111); + onEvent(data4); + expected.put(keyFactory.apply(data4), 111); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 dataB1 = new Data3("B", 10, 1); + onEvent(dataB1); + expected.put(keyFactory.apply(dataB1), 1); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 dataB2 = new Data3("B", 10, 99); + onEvent(dataB2); + expected.put(keyFactory.apply(dataB2), 100); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + } + + @Test + public void aggregateExtractedPropertyCompoundField() { + Map, Integer> expected = new HashMap<>(); + sep(c -> { + subscribe(Data3.class) + .groupByFieldsGetAndAggregate( + Data3::getX, + Aggregates.intSumFactory(), + Data3::getName, Data3::getValue) + .map(GroupBy::toMap) + .id("results"); + }); + val keyFactory = GroupByKey.build(Data3::getName, Data3::getValue);//.apply(); + + onEvent(new Data3("A", 10, 1)); + expected.put(keyFactory.apply(new Data3("A", 10, 1)), 1); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data2 = new Data3("A", 10, 2); + onEvent(data2); + expected.put(keyFactory.apply(data2), 3); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data3 = new Data3("A", 10, 3); + onEvent(data3); + expected.put(keyFactory.apply(data3), 6); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data4 = new Data3("A", 15, 111); + onEvent(data4); + expected.put(keyFactory.apply(data4), 111); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 dataB1 = new Data3("B", 10, 1); + onEvent(dataB1); + expected.put(keyFactory.apply(dataB1), 1); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 dataB2 = new Data3("B", 10, 99); + onEvent(dataB2); + expected.put(keyFactory.apply(dataB2), 100); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + } + + public static MyModel updateItemScalar(MyModel model, MyEvent myEvent) { model.createItem(myEvent.getData()); return model; diff --git a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/NestedGroupByTest.java b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/NestedGroupByTest.java index 2dbe54c78..f0148d4a2 100644 --- a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/NestedGroupByTest.java +++ b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/NestedGroupByTest.java @@ -120,7 +120,7 @@ public void nestedGroupByToList_WithHelper() { @Test public void nestedDataFlowGroupBy_toCollector() { sep(c -> { - DataFlow.groupBy(Person::getCountry, Collectors.groupingBy(Person::getGender, Collectors.toList())) + DataFlow.groupBy(Person::getCountry, Collectors.groupingBy(Person::getGender, Collectors.listFactory())) .sink("groupBy"); }); this.addSink("groupBy", this::convertToMapList); @@ -179,7 +179,7 @@ public void nestedGroupByToCollector_List_WithHelper() { subscribe(Person.class) .groupBy( Person::getCountry, - Collectors.groupingBy(Person::getGender, Collectors.toList())) + Collectors.groupingBy(Person::getGender, Collectors.listFactory())) .sink("groupBy"); }); this.addSink("groupBy", this::convertToMapList); diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToListFlowFunction.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToListFlowFunction.java index 0bd71d06a..8e723cac2 100644 --- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToListFlowFunction.java +++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToListFlowFunction.java @@ -7,7 +7,7 @@ public class AggregateToListFlowFunction implements AggregateFlowFunction, AggregateToListFlowFunction> { - private final List list = new ArrayList<>(); + private transient final List list = new ArrayList<>(); private final int maxElementCount; diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToSetFlowFunction.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToSetFlowFunction.java index 8b33fe38e..405a6c825 100644 --- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToSetFlowFunction.java +++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToSetFlowFunction.java @@ -7,7 +7,7 @@ public class AggregateToSetFlowFunction implements AggregateFlowFunction, AggregateToSetFlowFunction> { - private final Set list = new HashSet<>(); + private transient final Set list = new HashSet<>(); @Override public Set reset() { @@ -36,11 +36,4 @@ public Set aggregate(T input) { return list; } - - public static class AggregateToSetFactory { - - public AggregateToSetFlowFunction newList() { - return new AggregateToSetFlowFunction<>(); - } - } } diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/groupby/GroupByKey.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/groupby/GroupByKey.java new file mode 100644 index 000000000..e2e307217 --- /dev/null +++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/groupby/GroupByKey.java @@ -0,0 +1,109 @@ +package com.fluxtion.runtime.dataflow.groupby; + +import com.fluxtion.runtime.partition.LambdaReflection; +import lombok.Getter; +import lombok.ToString; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Constructs a compound key for using on group by constructs in a data flow. The key is composed using method references + * of the type to be grouped by. + * + * @param The type of data flow to create a key for + */ +@ToString(of = {"key", "name"}) +public class GroupByKey { + public final List> accessors; + private final transient StringBuilder keyHolder = new StringBuilder(); + @Getter + private final transient Class valueClass; + @Getter + private transient String key; + private transient final String name; + + public GroupByKey(List> accessorsToAdd) { + this.accessors = new ArrayList<>(); + String tmpName = ""; + for (LambdaReflection.SerializableFunction element : accessorsToAdd) { + if (!accessors.contains(element)) { + accessors.add(element); + tmpName += "_" + element.method().getName(); + } + } + valueClass = (Class) accessors.get(0).method().getDeclaringClass(); + name = valueClass.getName() + tmpName; + } + + public GroupByKey(LambdaReflection.SerializableFunction accessor) { + this(Arrays.asList(accessor)); + } + + @SafeVarargs + public GroupByKey(LambdaReflection.SerializableFunction... accessorList) { + this(Arrays.asList(accessorList)); + } + + private GroupByKey(GroupByKey toClone) { + accessors = toClone.accessors; + valueClass = toClone.getValueClass(); + name = toClone.name; + } + + public static LambdaReflection.SerializableFunction> build(LambdaReflection.SerializableFunction accessor) { + return new GroupByKey<>(accessor)::toKey; + } + + @SafeVarargs + public static LambdaReflection.SerializableFunction> build( + LambdaReflection.SerializableFunction accessor, + LambdaReflection.SerializableFunction... accessorList) { + List> accessors = new ArrayList<>(); + accessors.add(accessor); + accessors.addAll(Arrays.asList(accessorList)); + GroupByKey accessorKey = new GroupByKey<>(accessors); + return accessorKey::toKey; + } + + + public boolean keyPresent(LambdaReflection.SerializableFunction keyToCheck) { + return accessors.contains(keyToCheck); + } + + public GroupByKey toKey(T input) { + //TODO add object pooling + GroupByKey cloned = new GroupByKey<>(this); + cloned.keyHolder.setLength(0); + for (int i = 0, accessorsSize = accessors.size(); i < accessorsSize; i++) { + LambdaReflection.SerializableFunction accessor = accessors.get(i); + cloned.keyHolder.append(accessor.apply(input).toString()); + cloned.keyHolder.append("_"); + } + cloned.key = cloned.keyHolder.toString(); + return cloned; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + GroupByKey that = (GroupByKey) o; + + if (!valueClass.equals(that.valueClass)) return false; + if (!Objects.equals(key, that.key)) return false; + return name.equals(that.name); + } + + @Override + public int hashCode() { + int result = valueClass.hashCode(); + result = 31 * result + (key != null ? key.hashCode() : 0); + result = 31 * result + name.hashCode(); + return result; + } + +} diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Collectors.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Collectors.java index fa2990506..bb8765007 100644 --- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Collectors.java +++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Collectors.java @@ -10,18 +10,31 @@ import com.fluxtion.runtime.partition.LambdaReflection.SerializableSupplier; import java.util.List; +import java.util.Set; public interface Collectors { - static SerializableSupplier> toList(int maximumElementCount) { + static SerializableFunction> toSet() { + return new AggregateToSetFlowFunction()::aggregate; + } + + static SerializableFunction> toList() { + return new AggregateToListFlowFunction()::aggregate; + } + + static SerializableFunction> toList(int maxElements) { + return new AggregateToListFlowFunction(maxElements)::aggregate; + } + + static SerializableSupplier> listFactory(int maximumElementCount) { return new AggregateToListFactory(maximumElementCount)::newList; } - static SerializableSupplier> toList() { - return toList(-1); + static SerializableSupplier> listFactory() { + return listFactory(-1); } - static SerializableSupplier> toSet() { + static SerializableSupplier> setFactory() { return AggregateToSetFlowFunction::new; } diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/GroupingFactory.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/GroupingFactory.java index 464639661..73311bb9e 100644 --- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/GroupingFactory.java +++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/GroupingFactory.java @@ -31,7 +31,7 @@ public SerializableFunction getKeyFunction() { } public GroupByFlowFunctionWrapper, AggregateToListFlowFunction> groupByToList() { - SerializableSupplier> list = Collectors.toList(); + SerializableSupplier> list = Collectors.listFactory(); return new GroupByFlowFunctionWrapper<>(keyFunction, Mappers::identity, list); }