Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release 9.6.0 #334

Merged
merged 12 commits into from
Jan 12, 2025
5 changes: 3 additions & 2 deletions compiler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
~ along with this program. If not, see
~ <http://www.mongodb.com/licensing/server-side-public-license>.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.fluxtion</groupId>
<artifactId>root-parent-pom</artifactId>
<version>9.5.2-SNAPSHOT</version>
<version>9.6.0-SNAPSHOT</version>
<relativePath>../parent-root/pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@

import com.fluxtion.runtime.EventProcessorBuilderService;
import com.fluxtion.runtime.dataflow.FlowFunction;
import com.fluxtion.runtime.dataflow.FlowSupplier;
import com.fluxtion.runtime.dataflow.TriggeredFlowFunction;
import com.fluxtion.runtime.dataflow.aggregate.AggregateFlowFunction;
import com.fluxtion.runtime.dataflow.function.*;
import com.fluxtion.runtime.dataflow.function.MapFlowFunction.MapRef2RefFlowFunction;
import com.fluxtion.runtime.dataflow.function.MergeMapFlowFunction;
import com.fluxtion.runtime.dataflow.function.NodePropertyToFlowFunction;
import com.fluxtion.runtime.dataflow.function.NodeToFlowFunction;
import com.fluxtion.runtime.dataflow.groupby.GroupBy;
import com.fluxtion.runtime.dataflow.groupby.GroupByHashMap;
import com.fluxtion.runtime.dataflow.groupby.GroupByKey;
Expand All @@ -36,9 +35,7 @@
import com.fluxtion.runtime.node.NamedFeedEventHandlerNode;
import com.fluxtion.runtime.node.NamedFeedTopicFilteredEventHandlerNode;
import com.fluxtion.runtime.partition.LambdaReflection;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableBiFunction;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableFunction;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableSupplier;
import com.fluxtion.runtime.partition.LambdaReflection.*;

import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -372,11 +369,44 @@ static LongFlowBuilder subscribeToLongSignal(String filterId, long defaultValue)
*
* @param builder The builder defining the merge operations
* @param <T> The output type of the merged stream
* @return An {@link FlowBuilder} that can used to construct stream processing logic
* @return An {@link FlowBuilder} wrapping merged output that can used to construct stream processing logic
*/
static <T> FlowBuilder<T> mergeMap(MergeAndMapFlowBuilder<T> builder) {
MergeMapFlowFunction<T> build = builder.build();
return new FlowBuilder<>(build);
return new FlowBuilder<>(builder.build());
}

/**
* Builds a {@link FlowBuilder} that is formed from merging multiple {@link FlowBuilder} inputs of differernt types pushing
* to a target instance. To create {@link com.fluxtion.compiler.builder.dataflow.MergeAndMapFlowBuilder.MergeInput}
* legs us the {@link MergeAndMapFlowBuilder#requiredMergeInput(FlowBuilder, LambdaReflection.SerializableBiConsumer)},
* {@link MergeAndMapFlowBuilder#optionalMergeInput(FlowBuilder, LambdaReflection.SerializableBiConsumer)}
* utility functions
*
* @param target Supplier of target instances that store the result of the push
* @param mergeInputs The legs that supply the inputs to the merge consumer operations
* @param <T> The target class
* @return An {@link FlowBuilder} wrapping merged output that can used to construct stream processing logic
*/
@SafeVarargs
static <T> FlowBuilder<T> mergeMap(LambdaReflection.SerializableSupplier<T> target, MergeAndMapFlowBuilder.MergeInput<T, ?>... mergeInputs) {
return MergeAndMapFlowBuilder.merge(target, mergeInputs);
}

/**
* Builds a {@link FlowBuilder} that is formed from merging multiple {@link FlowBuilder} inputs of different types pushing
* to a target instance. To create {@link com.fluxtion.compiler.builder.dataflow.MergeAndMapFlowBuilder.MergeInput}
* legs us the {@link MergeAndMapFlowBuilder#requiredMergeInput(FlowBuilder, LambdaReflection.SerializableBiConsumer)},
* {@link MergeAndMapFlowBuilder#optionalMergeInput(FlowBuilder, LambdaReflection.SerializableBiConsumer)}
* utility functions
*
* @param target target instance that store the result of the push
* @param mergeInputs The legs that supply the inputs to the merge consumer operations
* @param <T> The target class
* @return An {@link FlowBuilder} wrapping merged output that can used to construct stream processing logic
*/
@SafeVarargs
static <K, T> FlowBuilder<T> mergeMapToNode(T target, MergeAndMapFlowBuilder.MergeInput<T, ?>... mergeInputs) {
return MergeAndMapFlowBuilder.mergeToNode(target, mergeInputs);
}

/**
Expand Down Expand Up @@ -534,4 +564,177 @@ static <T, K, V> GroupByFlowBuilder<K, V> groupBy(
Class<T> classSubscription = (Class<T>) keyFunction.method().getDeclaringClass();
return subscribe(classSubscription).groupBy(keyFunction, valueFunction, aggregateFunctionSupplier);
}

/**
* Builds a GroupByFlowBuilder that is formed from multiple joins and pushed to a target instance.
*
* @param target Supplier of target instances that store the result of the join
* @param joinLegs The legs that supply the inputs to the join
* @param <K> The key class
* @param <T> The join target class
* @return The GroupByFlow with a new instance of the target allocated to every key
*/
@SafeVarargs
static <K, T> GroupByFlowBuilder<K, T> multiJoin(LambdaReflection.SerializableSupplier<T> target, MultiJoinBuilder.MultiJoinLeg<K, T, ?>... joinLegs) {
return MultiJoinBuilder.multiJoin(target, joinLegs);
}

/**
* Gathers two input {@link FlowSupplier} and pushes them to a single binary consumer method.
*
* @param instancePushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B> FlowBuilder<T> push(
SerializableBiConsumer<A, B> instancePushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB) {
BiPushFunction<T, A, B> pushFunction = new BiPushFunction<>(
instancePushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers three input {@link FlowSupplier} and pushes them to a single 3 argument consumer method.
*
* @param instancePushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param flowBuilderC supply for argument 3
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @param <C> Type of argument 3
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B, C> FlowBuilder<T> push(
SerializableTriConsumer<A, B, C> instancePushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB,
FlowDataSupplier<? extends FlowSupplier<C>> flowBuilderC) {
TriPushFunction<T, A, B, C> pushFunction = new TriPushFunction<>(
instancePushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier(),
flowBuilderC.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers four input {@link FlowSupplier} and pushes them to a single 4 argument consumer method.
*
* @param instancePushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param flowBuilderC supply for argument 3
* @param flowBuilderD supply for argument 4
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @param <C> Type of argument 3
* @param <D> Type of argument 4
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B, C, D> FlowBuilder<T> push(
SerializableQuadConsumer<T, A, B, C> instancePushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB,
FlowDataSupplier<? extends FlowSupplier<C>> flowBuilderC,
FlowDataSupplier<? extends FlowSupplier<D>> flowBuilderD) {
QuadPushFunction pushFunction = new QuadPushFunction(
instancePushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier(),
flowBuilderC.flowSupplier(),
flowBuilderD.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers two input {@link FlowSupplier} and pushes them to a single binary consumer method.
* Implicitly creates a target instance from the
*
* @param classPushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 1
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B> FlowBuilder<T> push(
SerializableTriConsumer<T, A, B> classPushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB) {
BiPushFunction<T, A, B> pushFunction = new BiPushFunction<>(
classPushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers three input {@link FlowSupplier} and pushes them to a single 3 argument consumer method.
* Implicitly creates a target instance from the
*
* @param classPushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param flowBuilderC supply for argument 3
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @param <C> Type of argument 3
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B, C> FlowBuilder<T> push(
SerializableQuadConsumer<T, A, B, C> classPushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB,
FlowDataSupplier<? extends FlowSupplier<C>> flowBuilderC) {
TriPushFunction<T, A, B, C> pushFunction = new TriPushFunction<>(
classPushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier(),
flowBuilderC.flowSupplier());
return new FlowBuilder<>(pushFunction);
}

/**
* Gathers four input {@link FlowSupplier} and pushes them to a single 4 argument consumer method.
* Implicitly creates a target instance from the
*
* @param classPushMethod consumer method
* @param flowBuilderA supply for argument 1
* @param flowBuilderB supply for argument 2
* @param flowBuilderC supply for argument 3
* @param flowBuilderD supply for argument 4
* @param <T> The target push instance type
* @param <A> Type of argument 1
* @param <B> Type of argument 2
* @param <C> Type of argument 3
* @param <D> Type of argument 4
* @return A {@link FlowBuilder} wrapping an instance of T that can used to construct stream processing logic
*/
static <T, A, B, C, D> FlowBuilder<T> push(
SerializableQuinConsumer<T, A, B, C, D> classPushMethod,
FlowDataSupplier<? extends FlowSupplier<A>> flowBuilderA,
FlowDataSupplier<? extends FlowSupplier<B>> flowBuilderB,
FlowDataSupplier<? extends FlowSupplier<C>> flowBuilderC,
FlowDataSupplier<? extends FlowSupplier<D>> flowBuilderD) {
QuadPushFunction<T, A, B, C, D> pushFunction = new QuadPushFunction<>(
classPushMethod,
flowBuilderA.flowSupplier(),
flowBuilderB.flowSupplier(),
flowBuilderC.flowSupplier(),
flowBuilderD.flowSupplier());
return new FlowBuilder<>(pushFunction);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
/*
* SPDX-FileCopyrightText: © 2024 Gregory Higgins <[email protected]>
* SPDX-License-Identifier: AGPL-3.0-only
* Copyright (c) 2025 gregory higgins.
* All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/

package com.fluxtion.compiler.builder.dataflow;
Expand Down Expand Up @@ -106,8 +119,6 @@ public <R> FlowBuilder<List<R>> mapToList(SerializableFunction<T, R> mapFunction

public <S, R> FlowBuilder<R> mapBiFunction(SerializableBiFunction<T, S, R> int2IntFunction,
FlowBuilder<S> stream2Builder) {

TriggeredFlowFunction<T> e1 = eventStream;
return new FlowBuilder<>(
new BinaryMapToRefFlowFunction<>(
eventStream, stream2Builder.eventStream, int2IntFunction)
Expand Down
Loading
Loading