Skip to content

Commit

Permalink
Feature/multi push function (#333)
Browse files Browse the repository at this point in the history
* initial work on pushing multiple data flows to a single node instance method with multiple parameters. Each flow maps to a method argument

* remove unused BiPushFunctionOLD.java

* remove debug from tests

* support 3 and 4 argument push helper in DataFlow

* Added test checking triggering of child node to a multi arg push method on parent

---------

Co-authored-by: greg <[email protected]>
  • Loading branch information
gregv12 and greg-higgins authored Jan 12, 2025
1 parent 43d5dd1 commit 3acb936
Show file tree
Hide file tree
Showing 13 changed files with 1,104 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@

import com.fluxtion.runtime.EventProcessorBuilderService;
import com.fluxtion.runtime.dataflow.FlowFunction;
import com.fluxtion.runtime.dataflow.FlowSupplier;
import com.fluxtion.runtime.dataflow.TriggeredFlowFunction;
import com.fluxtion.runtime.dataflow.aggregate.AggregateFlowFunction;
import com.fluxtion.runtime.dataflow.function.*;
import com.fluxtion.runtime.dataflow.function.MapFlowFunction.MapRef2RefFlowFunction;
import com.fluxtion.runtime.dataflow.function.NodePropertyToFlowFunction;
import com.fluxtion.runtime.dataflow.function.NodeToFlowFunction;
import com.fluxtion.runtime.dataflow.groupby.GroupBy;
import com.fluxtion.runtime.dataflow.groupby.GroupByHashMap;
import com.fluxtion.runtime.dataflow.groupby.GroupByKey;
Expand All @@ -35,9 +35,7 @@
import com.fluxtion.runtime.node.NamedFeedEventHandlerNode;
import com.fluxtion.runtime.node.NamedFeedTopicFilteredEventHandlerNode;
import com.fluxtion.runtime.partition.LambdaReflection;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableBiFunction;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableFunction;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableSupplier;
import com.fluxtion.runtime.partition.LambdaReflection.*;

import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -371,7 +369,7 @@ static LongFlowBuilder subscribeToLongSignal(String filterId, long defaultValue)
*
* @param builder The builder defining the merge operations
* @param <T> The output type of the merged stream
* @return An {@link FlowBuilder} that can used to construct stream processing logic
* @return An {@link FlowBuilder} wrapping merged output that can used to construct stream processing logic
*/
static <T> FlowBuilder<T> mergeMap(MergeAndMapFlowBuilder<T> builder) {
return new FlowBuilder<>(builder.build());
Expand All @@ -384,33 +382,31 @@ static <T> FlowBuilder<T> mergeMap(MergeAndMapFlowBuilder<T> builder) {
* {@link MergeAndMapFlowBuilder#optionalMergeInput(FlowBuilder, LambdaReflection.SerializableBiConsumer)}
* utility functions
*
* @param target Supplier of target instances that store the result of the push
* @param joinLegs The legs that supply the inputs to the join
* @param <K> The key class
* @param <T> The join target class
* @return The GroupByFlow with a new instance of the target allocated to every key
* @param target Supplier of target instances that store the result of the push
* @param mergeInputs The legs that supply the inputs to the merge consumer operations
* @param <T> The target class
* @return An {@link FlowBuilder} wrapping merged output that can used to construct stream processing logic
*/
@SafeVarargs
static <K, T> FlowBuilder<T> mergeMap(LambdaReflection.SerializableSupplier<T> target, MergeAndMapFlowBuilder.MergeInput<T, ?>... joinLegs) {
return MergeAndMapFlowBuilder.merge(target, joinLegs);
static <T> FlowBuilder<T> mergeMap(LambdaReflection.SerializableSupplier<T> target, MergeAndMapFlowBuilder.MergeInput<T, ?>... mergeInputs) {
return MergeAndMapFlowBuilder.merge(target, mergeInputs);
}

/**
* Builds a {@link FlowBuilder} that is formed from merging multiple {@link FlowBuilder} inputs of differernt types pushing
* Builds a {@link FlowBuilder} that is formed from merging multiple {@link FlowBuilder} inputs of different types pushing
* to a target instance. To create {@link com.fluxtion.compiler.builder.dataflow.MergeAndMapFlowBuilder.MergeInput}
* legs us the {@link MergeAndMapFlowBuilder#requiredMergeInput(FlowBuilder, LambdaReflection.SerializableBiConsumer)},
* {@link MergeAndMapFlowBuilder#optionalMergeInput(FlowBuilder, LambdaReflection.SerializableBiConsumer)}
* utility functions
*
* @param target target instance that store the result of the push
* @param joinLegs The legs that supply the inputs to the join
* @param <K> The key class
* @param <T> The join target class
* @return The GroupByFlow with a new instance of the target allocated to every key
* @param target target instance that store the result of the push
* @param mergeInputs The legs that supply the inputs to the merge consumer operations
* @param <T> The target class
* @return An {@link FlowBuilder} wrapping merged output that can used to construct stream processing logic
*/
@SafeVarargs
static <K, T> FlowBuilder<T> mergeMapToNode(T target, MergeAndMapFlowBuilder.MergeInput<T, ?>... joinLegs) {
return MergeAndMapFlowBuilder.mergeToNode(target, joinLegs);
static <K, T> FlowBuilder<T> mergeMapToNode(T target, MergeAndMapFlowBuilder.MergeInput<T, ?>... mergeInputs) {
return MergeAndMapFlowBuilder.mergeToNode(target, mergeInputs);
}

/**
Expand Down Expand Up @@ -579,7 +575,166 @@ static <T, K, V> GroupByFlowBuilder<K, V> groupBy(
* @return The GroupByFlow with a new instance of the target allocated to every key
*/
@SafeVarargs
public static <K, T> GroupByFlowBuilder<K, T> multiJoin(LambdaReflection.SerializableSupplier<T> target, MultiJoinBuilder.MultiJoinLeg<K, T, ?>... joinLegs) {
static <K, T> GroupByFlowBuilder<K, T> multiJoin(LambdaReflection.SerializableSupplier<T> target, MultiJoinBuilder.MultiJoinLeg<K, T, ?>... joinLegs) {
return MultiJoinBuilder.multiJoin(target, joinLegs);
}

/**
* Gathers two input {@link FlowSupplier} and pushes them to a single binary consumer method.
*
* @param instancePushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B> FlowBuilder<T> push(
SerializableBiConsumer<A, B> instancePushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB) {
BiPushFunction<T, A, B> pushFunction = new BiPushFunction<>(
instancePushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers three input {@link FlowSupplier} and pushes them to a single 3 argument consumer method.
*
* @param instancePushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param flowBuilderC supply for argument 3
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @param <C> Type of argument 3
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B, C> FlowBuilder<T> push(
SerializableTriConsumer<A, B, C> instancePushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB,
FlowDataSupplier<? extends FlowSupplier<C>> flowBuilderC) {
TriPushFunction<T, A, B, C> pushFunction = new TriPushFunction<>(
instancePushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier(),
flowBuilderC.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers four input {@link FlowSupplier} and pushes them to a single 4 argument consumer method.
*
* @param instancePushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param flowBuilderC supply for argument 3
* @param flowBuilderD supply for argument 4
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @param <C> Type of argument 3
* @param <D> Type of argument 4
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B, C, D> FlowBuilder<T> push(
SerializableQuadConsumer<T, A, B, C> instancePushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB,
FlowDataSupplier<? extends FlowSupplier<C>> flowBuilderC,
FlowDataSupplier<? extends FlowSupplier<D>> flowBuilderD) {
QuadPushFunction pushFunction = new QuadPushFunction(
instancePushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier(),
flowBuilderC.flowSupplier(),
flowBuilderD.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers two input {@link FlowSupplier} and pushes them to a single binary consumer method.
* Implicitly creates a target instance from the
*
* @param classPushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 1
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B> FlowBuilder<T> push(
SerializableTriConsumer<T, A, B> classPushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB) {
BiPushFunction<T, A, B> pushFunction = new BiPushFunction<>(
classPushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers three input {@link FlowSupplier} and pushes them to a single 3 argument consumer method.
* Implicitly creates a target instance from the
*
* @param classPushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param flowBuilderC supply for argument 3
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @param <C> Type of argument 3
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B, C> FlowBuilder<T> push(
SerializableQuadConsumer<T, A, B, C> classPushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB,
FlowDataSupplier<? extends FlowSupplier<C>> flowBuilderC) {
TriPushFunction<T, A, B, C> pushFunction = new TriPushFunction<>(
classPushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier(),
flowBuilderC.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers four input {@link FlowSupplier} and pushes them to a single 4 argument consumer method.
* Implicitly creates a target instance from the
*
* @param classPushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param flowBuilderC supply for argument 3
* @param flowBuilderD supply for argument 4
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @param <C> Type of argument 3
* @param <D> Type of argument 4
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B, C, D> FlowBuilder<T> push(
SerializableQuinConsumer<T, A, B, C, D> classPushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB,
FlowDataSupplier<? extends FlowSupplier<C>> flowBuilderC,
FlowDataSupplier<? extends FlowSupplier<D>> flowBuilderD) {
QuadPushFunction<T, A, B, C, D> pushFunction = new QuadPushFunction<>(
classPushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier(),
flowBuilderC.flowSupplier(),
flowBuilderD.flowSupplier());
return new FlowBuilder<>(pushFunction);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright (c) 2025 gregory higgins.
* All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/

package com.fluxtion.compiler.builder.dataflow;

import com.fluxtion.runtime.dataflow.FlowFunction;
import com.fluxtion.runtime.dataflow.TriggeredFlowFunction;
import com.fluxtion.runtime.dataflow.function.MergeMapToNodeFlowFunction;
import com.fluxtion.runtime.dataflow.function.MergeProperty;
import com.fluxtion.runtime.partition.LambdaReflection;
import lombok.Data;

import java.util.ArrayList;
import java.util.List;

/**
* A builder that merges and maps several {@link FlowFunction}'s into a single event stream of type T
*
* @param <T> The output type of the merged stream
*/
public class MultiPushBuilder<T> {

private final T resultInstance;
private final List<MergeProperty<T, ?>> required = new ArrayList<>();

private MultiPushBuilder(T resultInstance) {
this.resultInstance = resultInstance;
}


public static <T> MultiPushBuilder<T> of(T resultInstance) {
return new MultiPushBuilder<T>(resultInstance);
}

public <R> MultiPushBuilder<T> required(FlowBuilder<R> trigger, LambdaReflection.SerializableBiConsumer<T, R> setValue) {
required.add(new MergeProperty<T, R>(trigger.eventStream, setValue, true, true));
return this;
}

public <R> MultiPushBuilder<T> requiredNoTrigger(FlowBuilder<R> trigger, LambdaReflection.SerializableBiConsumer<T, R> setValue) {
required.add(new MergeProperty<T, R>(trigger.eventStream, setValue, false, true));
return this;
}

/**
* Builds a FlowBuilder that is formed from multiple inouts pushing to a target instance.
*
* @param target Supplier of target instances that store the result of the push
* @param joinLegs The legs that supply the inputs to the join
* @param <K> The key class
* @param <T> The join target class
* @return The GroupByFlow with a new instance of the target allocated to every key
*/
@SuppressWarnings("all")
public static <K, T> FlowBuilder<T> merge(LambdaReflection.SerializableSupplier<T> target, MergeInput<T, ?>... joinLegs) {
MultiPushBuilder multiJoinBuilder = new MultiPushBuilder(target);
for (MergeInput joinLeg : joinLegs) {
if (joinLeg.isTriggering()) {
multiJoinBuilder.required(joinLeg.flow, joinLeg.getSetter());
} else {
multiJoinBuilder.requiredNoTrigger(joinLeg.flow, joinLeg.getSetter());
}
}
return multiJoinBuilder.dataFlow();
}

@SuppressWarnings("all")
public static <K, T> FlowBuilder<T> mergeToNode(T target, MergeInput<T, ?>... joinLegs) {
MultiPushBuilder multiJoinBuilder = new MultiPushBuilder(target);
for (MergeInput joinLeg : joinLegs) {
if (joinLeg.isTriggering()) {
multiJoinBuilder.required(joinLeg.flow, joinLeg.getSetter());
} else {
multiJoinBuilder.requiredNoTrigger(joinLeg.flow, joinLeg.getSetter());
}
}
return multiJoinBuilder.dataFlow();
}

public static <T1, R> MergeInput<T1, R> requiredMergeInput(FlowBuilder<R> flow,
LambdaReflection.SerializableBiConsumer<T1, R> setter) {
return new MultiPushBuilder.MergeInput<>(true, flow, setter);
}

public static <T1, R> MergeInput<T1, R> optionalMergeInput(FlowBuilder<R> flow,
LambdaReflection.SerializableBiConsumer<T1, R> setter) {
return new MultiPushBuilder.MergeInput<>(false, flow, setter);
}

@Data
public static class MergeInput<T, R> {
private final boolean triggering;
private final FlowBuilder<R> flow;
private final LambdaReflection.SerializableBiConsumer<T, R> setter;
}

public TriggeredFlowFunction<T> build() {
TriggeredFlowFunction<T> flowFunction;
MergeMapToNodeFlowFunction<T> streamNode = new MergeMapToNodeFlowFunction<>(resultInstance);
required.forEach(streamNode::registerTrigger);
flowFunction = streamNode;
return flowFunction;
}

/**
* Merges and maps several {@link FlowFunction}'s into a single event stream of type T
*
* @return An {@link FlowBuilder} that can used to construct stream processing logic
*/
public FlowBuilder<T> dataFlow() {
return new FlowBuilder<>(build());
}
}
Loading

0 comments on commit 3acb936

Please sign in to comment.