Skip to content

Commit

Permalink
Release 9.6.0 (#334)
Browse files Browse the repository at this point in the history
* updating poms for 9.5.3-SNAPSHOT development

* updating poms for branch'release/9.5.2' with non-snapshot versions

* updating develop poms to master versions to avoid merge conflicts

* Updating develop poms back to pre merge state

* simplify api for using MultiJoinBuilder

* simplify api for using merge and Map. Merge and map now supports merging to an instance, previously was only a factory

* add correct @SafeVarargs suppression

* code tidy

* move next release version to 9.6.0

* Feature/multi push function (#333)

* initial work on pushing multiple data flows to a single node instance method with multiple parameters. Each flow maps to a method argument

* remove unused BiPushFunctionOLD.java

* remove debug from tests

* support 3 and 4 argument push helper in DataFlow

* Added test checking triggering of child node to a multi arg push method on parent

---------

Co-authored-by: greg <[email protected]>

---------

Co-authored-by: runner <runner@fv-az1501-311>
Co-authored-by: greg <[email protected]>
  • Loading branch information
3 people authored Jan 12, 2025
1 parent 6cda28e commit 69672c3
Show file tree
Hide file tree
Showing 25 changed files with 1,645 additions and 49 deletions.
5 changes: 3 additions & 2 deletions compiler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
~ along with this program. If not, see
~ <http://www.mongodb.com/licensing/server-side-public-license>.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.fluxtion</groupId>
<artifactId>root-parent-pom</artifactId>
<version>9.5.2-SNAPSHOT</version>
<version>9.6.0-SNAPSHOT</version>
<relativePath>../parent-root/pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@

import com.fluxtion.runtime.EventProcessorBuilderService;
import com.fluxtion.runtime.dataflow.FlowFunction;
import com.fluxtion.runtime.dataflow.FlowSupplier;
import com.fluxtion.runtime.dataflow.TriggeredFlowFunction;
import com.fluxtion.runtime.dataflow.aggregate.AggregateFlowFunction;
import com.fluxtion.runtime.dataflow.function.*;
import com.fluxtion.runtime.dataflow.function.MapFlowFunction.MapRef2RefFlowFunction;
import com.fluxtion.runtime.dataflow.function.MergeMapFlowFunction;
import com.fluxtion.runtime.dataflow.function.NodePropertyToFlowFunction;
import com.fluxtion.runtime.dataflow.function.NodeToFlowFunction;
import com.fluxtion.runtime.dataflow.groupby.GroupBy;
import com.fluxtion.runtime.dataflow.groupby.GroupByHashMap;
import com.fluxtion.runtime.dataflow.groupby.GroupByKey;
Expand All @@ -36,9 +35,7 @@
import com.fluxtion.runtime.node.NamedFeedEventHandlerNode;
import com.fluxtion.runtime.node.NamedFeedTopicFilteredEventHandlerNode;
import com.fluxtion.runtime.partition.LambdaReflection;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableBiFunction;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableFunction;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableSupplier;
import com.fluxtion.runtime.partition.LambdaReflection.*;

import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -372,11 +369,44 @@ static LongFlowBuilder subscribeToLongSignal(String filterId, long defaultValue)
*
* @param builder The builder defining the merge operations
* @param <T> The output type of the merged stream
* @return An {@link FlowBuilder} that can used to construct stream processing logic
* @return An {@link FlowBuilder} wrapping merged output that can used to construct stream processing logic
*/
static <T> FlowBuilder<T> mergeMap(MergeAndMapFlowBuilder<T> builder) {
MergeMapFlowFunction<T> build = builder.build();
return new FlowBuilder<>(build);
return new FlowBuilder<>(builder.build());
}

/**
* Builds a {@link FlowBuilder} that is formed from merging multiple {@link FlowBuilder} inputs of differernt types pushing
* to a target instance. To create {@link com.fluxtion.compiler.builder.dataflow.MergeAndMapFlowBuilder.MergeInput}
* legs us the {@link MergeAndMapFlowBuilder#requiredMergeInput(FlowBuilder, LambdaReflection.SerializableBiConsumer)},
* {@link MergeAndMapFlowBuilder#optionalMergeInput(FlowBuilder, LambdaReflection.SerializableBiConsumer)}
* utility functions
*
* @param target Supplier of target instances that store the result of the push
* @param mergeInputs The legs that supply the inputs to the merge consumer operations
* @param <T> The target class
* @return An {@link FlowBuilder} wrapping merged output that can used to construct stream processing logic
*/
@SafeVarargs
static <T> FlowBuilder<T> mergeMap(LambdaReflection.SerializableSupplier<T> target, MergeAndMapFlowBuilder.MergeInput<T, ?>... mergeInputs) {
return MergeAndMapFlowBuilder.merge(target, mergeInputs);
}

/**
* Builds a {@link FlowBuilder} that is formed from merging multiple {@link FlowBuilder} inputs of different types pushing
* to a target instance. To create {@link com.fluxtion.compiler.builder.dataflow.MergeAndMapFlowBuilder.MergeInput}
* legs us the {@link MergeAndMapFlowBuilder#requiredMergeInput(FlowBuilder, LambdaReflection.SerializableBiConsumer)},
* {@link MergeAndMapFlowBuilder#optionalMergeInput(FlowBuilder, LambdaReflection.SerializableBiConsumer)}
* utility functions
*
* @param target target instance that store the result of the push
* @param mergeInputs The legs that supply the inputs to the merge consumer operations
* @param <T> The target class
* @return An {@link FlowBuilder} wrapping merged output that can used to construct stream processing logic
*/
@SafeVarargs
static <K, T> FlowBuilder<T> mergeMapToNode(T target, MergeAndMapFlowBuilder.MergeInput<T, ?>... mergeInputs) {
return MergeAndMapFlowBuilder.mergeToNode(target, mergeInputs);
}

/**
Expand Down Expand Up @@ -534,4 +564,177 @@ static <T, K, V> GroupByFlowBuilder<K, V> groupBy(
Class<T> classSubscription = (Class<T>) keyFunction.method().getDeclaringClass();
return subscribe(classSubscription).groupBy(keyFunction, valueFunction, aggregateFunctionSupplier);
}

/**
* Builds a GroupByFlowBuilder that is formed from multiple joins and pushed to a target instance.
*
* @param target Supplier of target instances that store the result of the join
* @param joinLegs The legs that supply the inputs to the join
* @param <K> The key class
* @param <T> The join target class
* @return The GroupByFlow with a new instance of the target allocated to every key
*/
@SafeVarargs
static <K, T> GroupByFlowBuilder<K, T> multiJoin(LambdaReflection.SerializableSupplier<T> target, MultiJoinBuilder.MultiJoinLeg<K, T, ?>... joinLegs) {
return MultiJoinBuilder.multiJoin(target, joinLegs);
}

/**
* Gathers two input {@link FlowSupplier} and pushes them to a single binary consumer method.
*
* @param instancePushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B> FlowBuilder<T> push(
SerializableBiConsumer<A, B> instancePushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB) {
BiPushFunction<T, A, B> pushFunction = new BiPushFunction<>(
instancePushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers three input {@link FlowSupplier} and pushes them to a single 3 argument consumer method.
*
* @param instancePushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param flowBuilderC supply for argument 3
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @param <C> Type of argument 3
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B, C> FlowBuilder<T> push(
SerializableTriConsumer<A, B, C> instancePushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB,
FlowDataSupplier<? extends FlowSupplier<C>> flowBuilderC) {
TriPushFunction<T, A, B, C> pushFunction = new TriPushFunction<>(
instancePushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier(),
flowBuilderC.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers four input {@link FlowSupplier} and pushes them to a single 4 argument consumer method.
*
* @param instancePushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param flowBuilderC supply for argument 3
* @param flowBuilderD supply for argument 4
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @param <C> Type of argument 3
* @param <D> Type of argument 4
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B, C, D> FlowBuilder<T> push(
SerializableQuadConsumer<T, A, B, C> instancePushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB,
FlowDataSupplier<? extends FlowSupplier<C>> flowBuilderC,
FlowDataSupplier<? extends FlowSupplier<D>> flowBuilderD) {
QuadPushFunction pushFunction = new QuadPushFunction(
instancePushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier(),
flowBuilderC.flowSupplier(),
flowBuilderD.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers two input {@link FlowSupplier} and pushes them to a single binary consumer method.
* Implicitly creates a target instance from the
*
* @param classPushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 1
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B> FlowBuilder<T> push(
SerializableTriConsumer<T, A, B> classPushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB) {
BiPushFunction<T, A, B> pushFunction = new BiPushFunction<>(
classPushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers three input {@link FlowSupplier} and pushes them to a single 3 argument consumer method.
* Implicitly creates a target instance from the
*
* @param classPushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param flowBuilderC supply for argument 3
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @param <C> Type of argument 3
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B, C> FlowBuilder<T> push(
SerializableQuadConsumer<T, A, B, C> classPushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB,
FlowDataSupplier<? extends FlowSupplier<C>> flowBuilderC) {
TriPushFunction<T, A, B, C> pushFunction = new TriPushFunction<>(
classPushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier(),
flowBuilderC.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers four input {@link FlowSupplier} and pushes them to a single 4 argument consumer method.
* Implicitly creates a target instance from the
*
* @param classPushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param flowBuilderC supply for argument 3
* @param flowBuilderD supply for argument 4
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @param <C> Type of argument 3
* @param <D> Type of argument 4
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B, C, D> FlowBuilder<T> push(
SerializableQuinConsumer<T, A, B, C, D> classPushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB,
FlowDataSupplier<? extends FlowSupplier<C>> flowBuilderC,
FlowDataSupplier<? extends FlowSupplier<D>> flowBuilderD) {
QuadPushFunction<T, A, B, C, D> pushFunction = new QuadPushFunction<>(
classPushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier(),
flowBuilderC.flowSupplier(),
flowBuilderD.flowSupplier());
return new FlowBuilder<>(pushFunction);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
/*
* SPDX-FileCopyrightText: © 2024 Gregory Higgins <[email protected]>
* SPDX-License-Identifier: AGPL-3.0-only
* Copyright (c) 2025 gregory higgins.
* All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/

package com.fluxtion.compiler.builder.dataflow;
Expand Down Expand Up @@ -106,8 +119,6 @@ public <R> FlowBuilder<List<R>> mapToList(SerializableFunction<T, R> mapFunction

public <S, R> FlowBuilder<R> mapBiFunction(SerializableBiFunction<T, S, R> int2IntFunction,
FlowBuilder<S> stream2Builder) {

TriggeredFlowFunction<T> e1 = eventStream;
return new FlowBuilder<>(
new BinaryMapToRefFlowFunction<>(
eventStream, stream2Builder.eventStream, int2IntFunction)
Expand Down
Loading

0 comments on commit 69672c3

Please sign in to comment.