Skip to content

Commit

Permalink
Adds support in data flow for mapping to sets and lists in FlowBuilde…
Browse files Browse the repository at this point in the history
…r. Compound keys in groupBy are supported with GroupByKey in FlowBuilder.
  • Loading branch information
greg-higgins committed Dec 3, 2023
1 parent 5ebcceb commit f3d16f1
Show file tree
Hide file tree
Showing 11 changed files with 561 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,9 @@
import com.fluxtion.runtime.dataflow.aggregate.function.TimedSlidingWindow;
import com.fluxtion.runtime.dataflow.aggregate.function.TumblingWindow;
import com.fluxtion.runtime.dataflow.function.BinaryMapFlowFunction.BinaryMapToRefFlowFunction;
import com.fluxtion.runtime.dataflow.function.FlatMapArrayFlowFunction;
import com.fluxtion.runtime.dataflow.function.FlatMapFlowFunction;
import com.fluxtion.runtime.dataflow.function.LookupFlowFunction;
import com.fluxtion.runtime.dataflow.function.MapFlowFunction;
import com.fluxtion.runtime.dataflow.function.*;
import com.fluxtion.runtime.dataflow.function.MapFlowFunction.MapRef2RefFlowFunction;
import com.fluxtion.runtime.dataflow.function.MergeFlowFunction;
import com.fluxtion.runtime.dataflow.groupby.GroupBy;
import com.fluxtion.runtime.dataflow.groupby.GroupByFlowFunctionWrapper;
import com.fluxtion.runtime.dataflow.groupby.GroupByTimedSlidingWindow;
import com.fluxtion.runtime.dataflow.groupby.GroupByTumblingWindow;
import com.fluxtion.runtime.dataflow.groupby.*;
import com.fluxtion.runtime.dataflow.helpers.Aggregates;
import com.fluxtion.runtime.dataflow.helpers.Collectors;
import com.fluxtion.runtime.dataflow.helpers.DefaultValue;
Expand Down Expand Up @@ -80,6 +73,30 @@ public <R> FlowBuilder<R> map(SerializableFunction<T, R> mapFunction) {
return super.mapBase(mapFunction);
}

public FlowBuilder<Set<T>> mapToSet() {
return map(Collectors.toSet());
}

public <R> FlowBuilder<Set<R>> mapToSet(SerializableFunction<T, R> mapFunction) {
return map(mapFunction).map(Collectors.toSet());
}

public FlowBuilder<List<T>> mapToList() {
return map(Collectors.toList());
}

public <R> FlowBuilder<List<R>> mapToList(SerializableFunction<T, R> mapFunction) {
return map(mapFunction).map(Collectors.toList());
}

public FlowBuilder<List<T>> mapToList(int maxElements) {
return map(Collectors.toList(maxElements));
}

public <R> FlowBuilder<List<R>> mapToList(SerializableFunction<T, R> mapFunction, int maxElements) {
return map(mapFunction).map(Collectors.toList(maxElements));
}

public <S, R> FlowBuilder<R> mapBiFunction(SerializableBiFunction<T, S, R> int2IntFunction,
FlowBuilder<S> stream2Builder) {

Expand Down Expand Up @@ -130,6 +147,19 @@ public <R> FlowBuilder<R> flatMapFromArray(SerializableFunction<T, R[]> iterable
new TimedSlidingWindow<>(eventStream, aggregateFunction, bucketSizeMillis, bucketsPerWindow));
}

/**
* Aggregates a flow using a key function to group by and an aggregating function to process new values for a keyed
* bucket.
*
* @param keyFunction The key function that groups and buckets incoming values
* @param valueFunction The value that is extracted from the incoming stream and applied to the aggregating function
* @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance
* @param <V> Value type extracted from the incoming data flow
* @param <K1> The type of the key used to group values
* @param <A> The return type of the aggregating function
* @param <F> The aggregating function type
* @return A GroupByFlowBuilder for the aggregated flow
*/
public <V, K1, A, F extends AggregateFlowFunction<V, A, F>> GroupByFlowBuilder<K1, A>
groupBy(SerializableFunction<T, K1> keyFunction,
SerializableFunction<T, V> valueFunction,
Expand All @@ -139,33 +169,147 @@ public <R> FlowBuilder<R> flatMapFromArray(SerializableFunction<T, R[]> iterable
return new GroupByFlowBuilder<>(x);
}

/**
* Specialisation of groupBy where the value is the identity of the incoming data flow
*
* @param keyFunction The key function that groups and buckets incoming values
* @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance
* @param <K1> The type of the key used to group values
* @param <A> The return type of the aggregating function
* @param <F> The aggregating function type
* @return A GroupByFlowBuilder for the aggregated flow
* @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier)
*/
public <K1, A, F extends AggregateFlowFunction<T, A, F>> GroupByFlowBuilder<K1, A>
groupBy(SerializableFunction<T, K1> keyFunction, SerializableSupplier<F> aggregateFunctionSupplier) {
return groupBy(keyFunction, Mappers::identity, aggregateFunctionSupplier);
}

/**
* Specialisation of groupBy where the output of the groupBy is the last value received for a bucket. The value is
* extracted using the value function
*
* @param keyFunction The key function that groups and buckets incoming values
* @param valueFunction The value that is extracted from the incoming stream and applied to the aggregating function
* @param <V> Value type extracted from the incoming data flow
* @param <K1> The type of the key used to group values
* @return A GroupByFlowBuilder for the aggregated flow
* @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier)
*/
public <V, K1> GroupByFlowBuilder<K1, V> groupBy(
SerializableFunction<T, K1> keyFunction,
SerializableFunction<T, V> valueFunction) {
return groupBy(keyFunction, valueFunction, Aggregates.identityFactory());
}

/**
* Specialisation of groupBy where the output of the groupBy is the last value received for a bucket, where
* the value is the identity of the incoming data flow
*
* @param keyFunction The key function that groups and buckets incoming values
* @param <K> The type of the key used to group values
* @return A GroupByFlowBuilder for the aggregated flow
*/
public <K> GroupByFlowBuilder<K, T> groupBy(SerializableFunction<T, K> keyFunction) {
return groupBy(keyFunction, Mappers::identity);
}

/**
* Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the value.
* The value is the last value supplied
*
* @param keyFunction key accessor
* @param keyFunctions multi arg key accessors
* @return GroupByFlowBuilder keyed on properties
*/
@SafeVarargs
public final GroupByFlowBuilder<GroupByKey<T>, T> groupByFields(
SerializableFunction<T, ?> keyFunction,
SerializableFunction<T, ?>... keyFunctions) {
return groupBy(GroupByKey.build(keyFunction, keyFunctions));
}

/**
* Aggregates a flow using a key to group by and an aggregating function to process new values for a keyed
* bucket. The key is a compound key created by a set of method reference accessors to for the value.
*
* @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance
* @param keyFunction key accessor
* @param keyFunctions multi arg key accessors
* @param <A> The return type of the aggregating function
* @param <F> The aggregating function type
* @return A GroupByFlowBuilder for the aggregated flow
* @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier)
*/
@SafeVarargs
public final <A, F extends AggregateFlowFunction<T, A, F>> GroupByFlowBuilder<GroupByKey<T>, A> groupByFieldsAggregate(
SerializableSupplier<F> aggregateFunctionSupplier,
SerializableFunction<T, ?> keyFunction,
SerializableFunction<T, ?>... keyFunctions) {
return groupBy(GroupByKey.build(keyFunction, keyFunctions), aggregateFunctionSupplier);
}

/**
* Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the key
* The value is extracted from the input using the value function
*
* @param valueFunction the value that will be stored in the groupBy
* @param keyFunction key accessor
* @param keyFunctions multi arg key accessors
* @return GroupByFlowBuilder keyed on properties
*/
@SafeVarargs
public final <V> GroupByFlowBuilder<GroupByKey<T>, V> groupByFieldsAndGet(
SerializableFunction<T, V> valueFunction,
SerializableFunction<T, ?> keyFunction,
SerializableFunction<T, ?>... keyFunctions) {
return groupBy(GroupByKey.build(keyFunction, keyFunctions), valueFunction);
}

/**
* Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the key
* The value is extracted from the input using the value function and is used as an input to the aggregating function
*
* @param valueFunction the value that will be stored in the groupBy
* @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance
* @param keyFunction key accessor
* @param keyFunctions multi arg key accessors
* @param <V> Value type extracted from the incoming data flow
* @param <A> The return type of the aggregating function
* @param <F> The aggregating function type
* @return A GroupByFlowBuilder for the aggregated flow
* @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier)
*/
@SafeVarargs
public final <V, A, F extends AggregateFlowFunction<V, A, F>> GroupByFlowBuilder<GroupByKey<T>, A> groupByFieldsGetAndAggregate(
SerializableFunction<T, V> valueFunction,
SerializableSupplier<F> aggregateFunctionSupplier,
SerializableFunction<T, ?> keyFunction,
SerializableFunction<T, ?>... keyFunctions) {
return groupBy(GroupByKey.build(keyFunction, keyFunctions), valueFunction, aggregateFunctionSupplier);
}

public <K> GroupByFlowBuilder<K, List<T>> groupByToList(SerializableFunction<T, K> keyFunction) {
return groupBy(keyFunction, Mappers::identity, Collectors.toList());
return groupBy(keyFunction, Mappers::identity, Collectors.listFactory());
}

public <K, V> GroupByFlowBuilder<K, List<V>> groupByToList(
SerializableFunction<T, K> keyFunction, SerializableFunction<T, V> valueFunction) {
return groupBy(keyFunction, valueFunction, Collectors.listFactory());
}

public <K> GroupByFlowBuilder<K, Set<T>> groupByToSet(SerializableFunction<T, K> keyFunction) {
return groupBy(keyFunction, Mappers::identity, Collectors.toSet());
return groupBy(keyFunction, Mappers::identity, Collectors.setFactory());
}

public <K, V> GroupByFlowBuilder<K, Set<V>> groupByToSet(SerializableFunction<T, K> keyFunction, SerializableFunction<T, V> valueFunction) {
return groupBy(keyFunction, valueFunction, Collectors.setFactory());
}

public <K> GroupByFlowBuilder<K, List<T>> groupByToList(
SerializableFunction<T, K> keyFunction,
int maxElementsInList) {
return groupBy(keyFunction, Mappers::identity, Collectors.toList(maxElementsInList));
return groupBy(keyFunction, Mappers::identity, Collectors.listFactory(maxElementsInList));
}

public <V, K, A, F extends AggregateFlowFunction<V, A, F>> GroupByFlowBuilder<K, A>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ public FlowBuilderBase<T> merge(FlowBuilderBase<? extends T> streamToMerge) {

public <V, K> GroupByFlowBuilder<K, List<T>>
groupByAsList(SerializableFunction<T, K> keyFunction) {
return groupBy(keyFunction, Mappers::identity, Collectors.toList());
return groupBy(keyFunction, Mappers::identity, Collectors.listFactory());
}

public <V, K> GroupByFlowBuilder<K, List<T>>
groupByAsList(SerializableFunction<T, K> keyFunction, int maxElementsInList) {
return groupBy(keyFunction, Mappers::identity, Collectors.toList(maxElementsInList));
return groupBy(keyFunction, Mappers::identity, Collectors.listFactory(maxElementsInList));
}

public <V, K, A, F extends AggregateFlowFunction<V, A, F>> GroupByFlowBuilder<K, A>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fluxtion.compiler.generation.GenerationContext;
import com.fluxtion.compiler.generation.model.Field;
import com.fluxtion.compiler.generation.util.ClassUtils;
import com.fluxtion.runtime.dataflow.groupby.GroupByKey;
import com.fluxtion.runtime.dataflow.helpers.GroupingFactory;
import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -184,6 +185,10 @@ public String buildTypeDeclaration(Field field, Function<Class<?>, String> class
String genericDeclaration = "<" + inputClass + ", " + returnType + ", ?, ?>";
return genericDeclaration;
}
if (instance instanceof GroupByKey) {
GroupByKey groupByKey = (GroupByKey) instance;
return "<" + classNameConverter.apply(groupByKey.getValueClass()) + ">";
}
return "";
}

Expand Down
Loading

0 comments on commit f3d16f1

Please sign in to comment.