Skip to content

Commit

Permalink
support 3 and 4 argument push helper in DataFlow
Browse files Browse the repository at this point in the history
  • Loading branch information
greg-higgins committed Jan 12, 2025
1 parent 94983e8 commit 466eb80
Show file tree
Hide file tree
Showing 7 changed files with 448 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@

import com.fluxtion.runtime.EventProcessorBuilderService;
import com.fluxtion.runtime.dataflow.FlowFunction;
import com.fluxtion.runtime.dataflow.FlowSupplier;
import com.fluxtion.runtime.dataflow.TriggeredFlowFunction;
import com.fluxtion.runtime.dataflow.aggregate.AggregateFlowFunction;
import com.fluxtion.runtime.dataflow.function.BiPushFunction;
import com.fluxtion.runtime.dataflow.function.*;
import com.fluxtion.runtime.dataflow.function.MapFlowFunction.MapRef2RefFlowFunction;
import com.fluxtion.runtime.dataflow.function.NodePropertyToFlowFunction;
import com.fluxtion.runtime.dataflow.function.NodeToFlowFunction;
import com.fluxtion.runtime.dataflow.groupby.GroupBy;
import com.fluxtion.runtime.dataflow.groupby.GroupByHashMap;
import com.fluxtion.runtime.dataflow.groupby.GroupByKey;
Expand All @@ -36,10 +35,7 @@
import com.fluxtion.runtime.node.NamedFeedEventHandlerNode;
import com.fluxtion.runtime.node.NamedFeedTopicFilteredEventHandlerNode;
import com.fluxtion.runtime.partition.LambdaReflection;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableBiConsumer;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableBiFunction;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableFunction;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableSupplier;
import com.fluxtion.runtime.partition.LambdaReflection.*;

import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -373,7 +369,7 @@ static LongFlowBuilder subscribeToLongSignal(String filterId, long defaultValue)
*
* @param builder The builder defining the merge operations
* @param <T> The output type of the merged stream
* @return An {@link FlowBuilder} that can used to construct stream processing logic
* @return An {@link FlowBuilder} wrapping merged output that can used to construct stream processing logic
*/
static <T> FlowBuilder<T> mergeMap(MergeAndMapFlowBuilder<T> builder) {
return new FlowBuilder<>(builder.build());
Expand All @@ -386,33 +382,31 @@ static <T> FlowBuilder<T> mergeMap(MergeAndMapFlowBuilder<T> builder) {
* {@link MergeAndMapFlowBuilder#optionalMergeInput(FlowBuilder, LambdaReflection.SerializableBiConsumer)}
* utility functions
*
* @param target Supplier of target instances that store the result of the push
* @param joinLegs The legs that supply the inputs to the join
* @param <K> The key class
* @param <T> The join target class
* @return The GroupByFlow with a new instance of the target allocated to every key
* @param target Supplier of target instances that store the result of the push
* @param mergeInputs The legs that supply the inputs to the merge consumer operations
* @param <T> The target class
* @return An {@link FlowBuilder} wrapping merged output that can used to construct stream processing logic
*/
@SafeVarargs
static <K, T> FlowBuilder<T> mergeMap(LambdaReflection.SerializableSupplier<T> target, MergeAndMapFlowBuilder.MergeInput<T, ?>... joinLegs) {
return MergeAndMapFlowBuilder.merge(target, joinLegs);
static <T> FlowBuilder<T> mergeMap(LambdaReflection.SerializableSupplier<T> target, MergeAndMapFlowBuilder.MergeInput<T, ?>... mergeInputs) {
return MergeAndMapFlowBuilder.merge(target, mergeInputs);
}

/**
* Builds a {@link FlowBuilder} that is formed from merging multiple {@link FlowBuilder} inputs of differernt types pushing
* Builds a {@link FlowBuilder} that is formed from merging multiple {@link FlowBuilder} inputs of different types pushing
* to a target instance. To create {@link com.fluxtion.compiler.builder.dataflow.MergeAndMapFlowBuilder.MergeInput}
* legs us the {@link MergeAndMapFlowBuilder#requiredMergeInput(FlowBuilder, LambdaReflection.SerializableBiConsumer)},
* {@link MergeAndMapFlowBuilder#optionalMergeInput(FlowBuilder, LambdaReflection.SerializableBiConsumer)}
* utility functions
*
* @param target target instance that store the result of the push
* @param joinLegs The legs that supply the inputs to the join
* @param <K> The key class
* @param <T> The join target class
* @return The GroupByFlow with a new instance of the target allocated to every key
* @param target target instance that store the result of the push
* @param mergeInputs The legs that supply the inputs to the merge consumer operations
* @param <T> The target class
* @return An {@link FlowBuilder} wrapping merged output that can used to construct stream processing logic
*/
@SafeVarargs
static <K, T> FlowBuilder<T> mergeMapToNode(T target, MergeAndMapFlowBuilder.MergeInput<T, ?>... joinLegs) {
return MergeAndMapFlowBuilder.mergeToNode(target, joinLegs);
static <K, T> FlowBuilder<T> mergeMapToNode(T target, MergeAndMapFlowBuilder.MergeInput<T, ?>... mergeInputs) {
return MergeAndMapFlowBuilder.mergeToNode(target, mergeInputs);
}

/**
Expand Down Expand Up @@ -585,26 +579,162 @@ static <K, T> GroupByFlowBuilder<K, T> multiJoin(LambdaReflection.SerializableSu
return MultiJoinBuilder.multiJoin(target, joinLegs);
}

/**
* Gathers two input {@link FlowSupplier} and pushes them to a single binary consumer method.
*
* @param instancePushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B> FlowBuilder<T> push(
SerializableBiConsumer<A, B> instancePushMethod,
FlowBuilder<A> flowBuilderA,
FlowBuilder<B> flowBuilderB) {
BiPushFunction<T, A, B> biPush = new BiPushFunction<>(
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB) {
BiPushFunction<T, A, B> pushFunction = new BiPushFunction<>(
instancePushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier());
return new FlowBuilder<>(biPush);
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers three input {@link FlowSupplier} and pushes them to a single 3 argument consumer method.
*
* @param instancePushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param flowBuilderC supply for argument 3
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @param <C> Type of argument 3
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B, C> FlowBuilder<T> push(
SerializableTriConsumer<A, B, C> instancePushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB,
FlowDataSupplier<? extends FlowSupplier<C>> flowBuilderC) {
TriPushFunction<T, A, B, C> pushFunction = new TriPushFunction<>(
instancePushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier(),
flowBuilderC.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers four input {@link FlowSupplier} and pushes them to a single 4 argument consumer method.
*
* @param instancePushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param flowBuilderC supply for argument 3
* @param flowBuilderD supply for argument 4
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @param <C> Type of argument 3
* @param <D> Type of argument 4
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B, C, D> FlowBuilder<T> push(
SerializableQuadConsumer<T, A, B, C> instancePushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB,
FlowDataSupplier<? extends FlowSupplier<C>> flowBuilderC,
FlowDataSupplier<? extends FlowSupplier<D>> flowBuilderD) {
QuadPushFunction pushFunction = new QuadPushFunction(
instancePushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier(),
flowBuilderC.flowSupplier(),
flowBuilderD.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers two input {@link FlowSupplier} and pushes them to a single binary consumer method.
* Implicitly creates a target instance from the
*
* @param classPushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 1
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B> FlowBuilder<T> push(
LambdaReflection.SerializableTriConsumer<T, A, B> classPushMethod,
FlowBuilder<A> flowBuilderA,
FlowBuilder<B> flowBuilderB) {
//add check for tri consumer instance method
BiPushFunction<T, A, B> biPush = new BiPushFunction<>(
SerializableTriConsumer<T, A, B> classPushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB) {
BiPushFunction<T, A, B> pushFunction = new BiPushFunction<>(
classPushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier());
return new FlowBuilder<>(biPush);
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers three input {@link FlowSupplier} and pushes them to a single 3 argument consumer method.
* Implicitly creates a target instance from the
*
* @param classPushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param flowBuilderC supply for argument 3
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @param <C> Type of argument 3
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B, C> FlowBuilder<T> push(
SerializableQuadConsumer<T, A, B, C> classPushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB,
FlowDataSupplier<? extends FlowSupplier<C>> flowBuilderC) {
TriPushFunction<T, A, B, C> pushFunction = new TriPushFunction<>(
classPushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier(),
flowBuilderC.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers four input {@link FlowSupplier} and pushes them to a single 4 argument consumer method.
* Implicitly creates a target instance from the
*
* @param classPushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param flowBuilderC supply for argument 3
* @param flowBuilderD supply for argument 4
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @param <C> Type of argument 3
* @param <D> Type of argument 4
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B, C, D> FlowBuilder<T> push(
SerializableQuinConsumer<T, A, B, C, D> classPushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB,
FlowDataSupplier<? extends FlowSupplier<C>> flowBuilderC,
FlowDataSupplier<? extends FlowSupplier<D>> flowBuilderD) {
QuadPushFunction<T, A, B, C, D> pushFunction = new QuadPushFunction<>(
classPushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier(),
flowBuilderC.flowSupplier(),
flowBuilderD.flowSupplier());
return new FlowBuilder<>(pushFunction);
}
}
Loading

0 comments on commit 466eb80

Please sign in to comment.