Skip to content

Commit

Permalink
Merge branch 'develop' into release
Browse files Browse the repository at this point in the history
# Conflicts:
#	compiler/pom.xml
#	parent-root/pom.xml
#	pom.xml
#	runtime/pom.xml
  • Loading branch information
greg-higgins committed Dec 3, 2023
2 parents 8f38daa + f3d16f1 commit 19aca2d
Show file tree
Hide file tree
Showing 15 changed files with 565 additions and 44 deletions.
2 changes: 1 addition & 1 deletion compiler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Copyright (C) 2018 V12 Technology Ltd.
<parent>
<groupId>com.fluxtion</groupId>
<artifactId>root-parent-pom</artifactId>
<version>9.1.11-SNAPSHOT</version>
<version>9.1.12-SNAPSHOT</version>
<relativePath>../parent-root/pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,9 @@
import com.fluxtion.runtime.dataflow.aggregate.function.TimedSlidingWindow;
import com.fluxtion.runtime.dataflow.aggregate.function.TumblingWindow;
import com.fluxtion.runtime.dataflow.function.BinaryMapFlowFunction.BinaryMapToRefFlowFunction;
import com.fluxtion.runtime.dataflow.function.FlatMapArrayFlowFunction;
import com.fluxtion.runtime.dataflow.function.FlatMapFlowFunction;
import com.fluxtion.runtime.dataflow.function.LookupFlowFunction;
import com.fluxtion.runtime.dataflow.function.MapFlowFunction;
import com.fluxtion.runtime.dataflow.function.*;
import com.fluxtion.runtime.dataflow.function.MapFlowFunction.MapRef2RefFlowFunction;
import com.fluxtion.runtime.dataflow.function.MergeFlowFunction;
import com.fluxtion.runtime.dataflow.groupby.GroupBy;
import com.fluxtion.runtime.dataflow.groupby.GroupByFlowFunctionWrapper;
import com.fluxtion.runtime.dataflow.groupby.GroupByTimedSlidingWindow;
import com.fluxtion.runtime.dataflow.groupby.GroupByTumblingWindow;
import com.fluxtion.runtime.dataflow.groupby.*;
import com.fluxtion.runtime.dataflow.helpers.Aggregates;
import com.fluxtion.runtime.dataflow.helpers.Collectors;
import com.fluxtion.runtime.dataflow.helpers.DefaultValue;
Expand Down Expand Up @@ -80,6 +73,30 @@ public <R> FlowBuilder<R> map(SerializableFunction<T, R> mapFunction) {
return super.mapBase(mapFunction);
}

public FlowBuilder<Set<T>> mapToSet() {
return map(Collectors.toSet());
}

public <R> FlowBuilder<Set<R>> mapToSet(SerializableFunction<T, R> mapFunction) {
return map(mapFunction).map(Collectors.toSet());
}

public FlowBuilder<List<T>> mapToList() {
return map(Collectors.toList());
}

public <R> FlowBuilder<List<R>> mapToList(SerializableFunction<T, R> mapFunction) {
return map(mapFunction).map(Collectors.toList());
}

public FlowBuilder<List<T>> mapToList(int maxElements) {
return map(Collectors.toList(maxElements));
}

public <R> FlowBuilder<List<R>> mapToList(SerializableFunction<T, R> mapFunction, int maxElements) {
return map(mapFunction).map(Collectors.toList(maxElements));
}

public <S, R> FlowBuilder<R> mapBiFunction(SerializableBiFunction<T, S, R> int2IntFunction,
FlowBuilder<S> stream2Builder) {

Expand Down Expand Up @@ -130,6 +147,19 @@ public <R> FlowBuilder<R> flatMapFromArray(SerializableFunction<T, R[]> iterable
new TimedSlidingWindow<>(eventStream, aggregateFunction, bucketSizeMillis, bucketsPerWindow));
}

/**
* Aggregates a flow using a key function to group by and an aggregating function to process new values for a keyed
* bucket.
*
* @param keyFunction The key function that groups and buckets incoming values
* @param valueFunction The value that is extracted from the incoming stream and applied to the aggregating function
* @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance
* @param <V> Value type extracted from the incoming data flow
* @param <K1> The type of the key used to group values
* @param <A> The return type of the aggregating function
* @param <F> The aggregating function type
* @return A GroupByFlowBuilder for the aggregated flow
*/
public <V, K1, A, F extends AggregateFlowFunction<V, A, F>> GroupByFlowBuilder<K1, A>
groupBy(SerializableFunction<T, K1> keyFunction,
SerializableFunction<T, V> valueFunction,
Expand All @@ -139,33 +169,147 @@ public <R> FlowBuilder<R> flatMapFromArray(SerializableFunction<T, R[]> iterable
return new GroupByFlowBuilder<>(x);
}

/**
* Specialisation of groupBy where the value is the identity of the incoming data flow
*
* @param keyFunction The key function that groups and buckets incoming values
* @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance
* @param <K1> The type of the key used to group values
* @param <A> The return type of the aggregating function
* @param <F> The aggregating function type
* @return A GroupByFlowBuilder for the aggregated flow
* @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier)
*/
public <K1, A, F extends AggregateFlowFunction<T, A, F>> GroupByFlowBuilder<K1, A>
groupBy(SerializableFunction<T, K1> keyFunction, SerializableSupplier<F> aggregateFunctionSupplier) {
return groupBy(keyFunction, Mappers::identity, aggregateFunctionSupplier);
}

/**
* Specialisation of groupBy where the output of the groupBy is the last value received for a bucket. The value is
* extracted using the value function
*
* @param keyFunction The key function that groups and buckets incoming values
* @param valueFunction The value that is extracted from the incoming stream and applied to the aggregating function
* @param <V> Value type extracted from the incoming data flow
* @param <K1> The type of the key used to group values
* @return A GroupByFlowBuilder for the aggregated flow
* @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier)
*/
public <V, K1> GroupByFlowBuilder<K1, V> groupBy(
SerializableFunction<T, K1> keyFunction,
SerializableFunction<T, V> valueFunction) {
return groupBy(keyFunction, valueFunction, Aggregates.identityFactory());
}

/**
* Specialisation of groupBy where the output of the groupBy is the last value received for a bucket, where
* the value is the identity of the incoming data flow
*
* @param keyFunction The key function that groups and buckets incoming values
* @param <K> The type of the key used to group values
* @return A GroupByFlowBuilder for the aggregated flow
*/
public <K> GroupByFlowBuilder<K, T> groupBy(SerializableFunction<T, K> keyFunction) {
return groupBy(keyFunction, Mappers::identity);
}

/**
* Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the value.
* The value is the last value supplied
*
* @param keyFunction key accessor
* @param keyFunctions multi arg key accessors
* @return GroupByFlowBuilder keyed on properties
*/
@SafeVarargs
public final GroupByFlowBuilder<GroupByKey<T>, T> groupByFields(
SerializableFunction<T, ?> keyFunction,
SerializableFunction<T, ?>... keyFunctions) {
return groupBy(GroupByKey.build(keyFunction, keyFunctions));
}

/**
* Aggregates a flow using a key to group by and an aggregating function to process new values for a keyed
* bucket. The key is a compound key created by a set of method reference accessors to for the value.
*
* @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance
* @param keyFunction key accessor
* @param keyFunctions multi arg key accessors
* @param <A> The return type of the aggregating function
* @param <F> The aggregating function type
* @return A GroupByFlowBuilder for the aggregated flow
* @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier)
*/
@SafeVarargs
public final <A, F extends AggregateFlowFunction<T, A, F>> GroupByFlowBuilder<GroupByKey<T>, A> groupByFieldsAggregate(
SerializableSupplier<F> aggregateFunctionSupplier,
SerializableFunction<T, ?> keyFunction,
SerializableFunction<T, ?>... keyFunctions) {
return groupBy(GroupByKey.build(keyFunction, keyFunctions), aggregateFunctionSupplier);
}

/**
* Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the key
* The value is extracted from the input using the value function
*
* @param valueFunction the value that will be stored in the groupBy
* @param keyFunction key accessor
* @param keyFunctions multi arg key accessors
* @return GroupByFlowBuilder keyed on properties
*/
@SafeVarargs
public final <V> GroupByFlowBuilder<GroupByKey<T>, V> groupByFieldsAndGet(
SerializableFunction<T, V> valueFunction,
SerializableFunction<T, ?> keyFunction,
SerializableFunction<T, ?>... keyFunctions) {
return groupBy(GroupByKey.build(keyFunction, keyFunctions), valueFunction);
}

/**
* Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the key
* The value is extracted from the input using the value function and is used as an input to the aggregating function
*
* @param valueFunction the value that will be stored in the groupBy
* @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance
* @param keyFunction key accessor
* @param keyFunctions multi arg key accessors
* @param <V> Value type extracted from the incoming data flow
* @param <A> The return type of the aggregating function
* @param <F> The aggregating function type
* @return A GroupByFlowBuilder for the aggregated flow
* @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier)
*/
@SafeVarargs
public final <V, A, F extends AggregateFlowFunction<V, A, F>> GroupByFlowBuilder<GroupByKey<T>, A> groupByFieldsGetAndAggregate(
SerializableFunction<T, V> valueFunction,
SerializableSupplier<F> aggregateFunctionSupplier,
SerializableFunction<T, ?> keyFunction,
SerializableFunction<T, ?>... keyFunctions) {
return groupBy(GroupByKey.build(keyFunction, keyFunctions), valueFunction, aggregateFunctionSupplier);
}

public <K> GroupByFlowBuilder<K, List<T>> groupByToList(SerializableFunction<T, K> keyFunction) {
return groupBy(keyFunction, Mappers::identity, Collectors.toList());
return groupBy(keyFunction, Mappers::identity, Collectors.listFactory());
}

public <K, V> GroupByFlowBuilder<K, List<V>> groupByToList(
SerializableFunction<T, K> keyFunction, SerializableFunction<T, V> valueFunction) {
return groupBy(keyFunction, valueFunction, Collectors.listFactory());
}

public <K> GroupByFlowBuilder<K, Set<T>> groupByToSet(SerializableFunction<T, K> keyFunction) {
return groupBy(keyFunction, Mappers::identity, Collectors.toSet());
return groupBy(keyFunction, Mappers::identity, Collectors.setFactory());
}

public <K, V> GroupByFlowBuilder<K, Set<V>> groupByToSet(SerializableFunction<T, K> keyFunction, SerializableFunction<T, V> valueFunction) {
return groupBy(keyFunction, valueFunction, Collectors.setFactory());
}

public <K> GroupByFlowBuilder<K, List<T>> groupByToList(
SerializableFunction<T, K> keyFunction,
int maxElementsInList) {
return groupBy(keyFunction, Mappers::identity, Collectors.toList(maxElementsInList));
return groupBy(keyFunction, Mappers::identity, Collectors.listFactory(maxElementsInList));
}

public <V, K, A, F extends AggregateFlowFunction<V, A, F>> GroupByFlowBuilder<K, A>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ public FlowBuilderBase<T> merge(FlowBuilderBase<? extends T> streamToMerge) {

public <V, K> GroupByFlowBuilder<K, List<T>>
groupByAsList(SerializableFunction<T, K> keyFunction) {
return groupBy(keyFunction, Mappers::identity, Collectors.toList());
return groupBy(keyFunction, Mappers::identity, Collectors.listFactory());
}

public <V, K> GroupByFlowBuilder<K, List<T>>
groupByAsList(SerializableFunction<T, K> keyFunction, int maxElementsInList) {
return groupBy(keyFunction, Mappers::identity, Collectors.toList(maxElementsInList));
return groupBy(keyFunction, Mappers::identity, Collectors.listFactory(maxElementsInList));
}

public <V, K, A, F extends AggregateFlowFunction<V, A, F>> GroupByFlowBuilder<K, A>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fluxtion.compiler.generation.GenerationContext;
import com.fluxtion.compiler.generation.model.Field;
import com.fluxtion.compiler.generation.util.ClassUtils;
import com.fluxtion.runtime.dataflow.groupby.GroupByKey;
import com.fluxtion.runtime.dataflow.helpers.GroupingFactory;
import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -184,6 +185,10 @@ public String buildTypeDeclaration(Field field, Function<Class<?>, String> class
String genericDeclaration = "<" + inputClass + ", " + returnType + ", ?, ?>";
return genericDeclaration;
}
if (instance instanceof GroupByKey) {
GroupByKey groupByKey = (GroupByKey) instance;
return "<" + classNameConverter.apply(groupByKey.getValueClass()) + ">";
}
return "";
}

Expand Down
Loading

0 comments on commit 19aca2d

Please sign in to comment.