diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java index 706d792b91..8f68925cbe 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java @@ -26,17 +26,14 @@ import org.apache.synapse.MessageContext; import org.apache.synapse.SynapseConstants; import org.apache.synapse.SynapseException; -import org.apache.synapse.aspects.ComponentType; import org.apache.synapse.aspects.flow.statistics.StatisticsCloseEventListener; -import org.apache.synapse.aspects.flow.statistics.collectors.CloseEventCollector; -import org.apache.synapse.aspects.flow.statistics.collectors.OpenEventCollector; import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector; -import org.apache.synapse.aspects.flow.statistics.util.StatisticsConstants; import org.apache.synapse.carbonext.TenantInfoConfigurator; import org.apache.synapse.continuation.ContinuationStackManager; import org.apache.synapse.continuation.SeqContinuationState; import org.apache.synapse.debug.SynapseDebugManager; import org.apache.synapse.mediators.base.SequenceMediator; +import org.apache.synapse.mediators.v2.ScatterGatherUtils; import org.apache.synapse.util.logging.LoggingUtils; /** @@ -99,18 +96,14 @@ public void run() { boolean result = seq.mediate(synCtx); // If this is a scatter message, then we need to use the continuation state and continue the mediation - if (isScatterMessage(synCtx) && result) { + if (ScatterGatherUtils.isScatterMessage(synCtx) && result) { SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(synCtx); if (seqContinuationState == null) { return; } SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(synCtx, seqContinuationState); - - FlowContinuableMediator mediator = - (FlowContinuableMediator) sequenceMediator.getChild(seqContinuationState.getPosition()); - synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, true); - mediator.mediate(synCtx, seqContinuationState); + sequenceMediator.mediate(synCtx, seqContinuationState); } //((Axis2MessageContext)synCtx).getAxis2MessageContext().getEnvelope().discard(); @@ -150,7 +143,7 @@ public void run() { debugManager.advertiseMediationFlowTerminatePoint(synCtx); debugManager.releaseMediationFlowLock(); } - if (RuntimeStatisticCollector.isStatisticsEnabled() && !isScatterMessage(synCtx)) { + if (RuntimeStatisticCollector.isStatisticsEnabled() && !ScatterGatherUtils.isScatterMessage(synCtx)) { this.statisticsCloseEventListener.invokeCloseEventEntry(synCtx); } } @@ -175,16 +168,4 @@ private void warn(boolean traceOn, String msg, MessageContext msgContext) { public void setStatisticsCloseEventListener(StatisticsCloseEventListener statisticsCloseEventListener) { this.statisticsCloseEventListener = statisticsCloseEventListener; } - - /** - * Check whether the message is a scatter message or not - * - * @param synCtx MessageContext - * @return true if the message is a scatter message - */ - private static boolean isScatterMessage(MessageContext synCtx) { - - Boolean isScatterMessage = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES); - return isScatterMessage != null && isScatterMessage; - } } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java b/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java index ccf33c72ec..c357b31d49 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java @@ -32,6 +32,7 @@ import org.apache.synapse.aspects.flow.statistics.collectors.OpenEventCollector; import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector; import org.apache.synapse.aspects.flow.statistics.data.artifact.ArtifactHolder; +import org.apache.synapse.mediators.v2.ScatterGatherUtils; import org.apache.synapse.transport.customlogsetter.CustomLogSetter; import org.apache.synapse.aspects.ComponentType; import org.apache.synapse.continuation.ContinuationStackManager; @@ -157,7 +158,7 @@ public boolean mediate(MessageContext synCtx) { boolean result = super.mediate(synCtx); - if (result && !skipAddition) { + if (result && !skipAddition && !ScatterGatherUtils.isScatterMessage(synCtx)) { // if flow completed remove the previously added SeqContinuationState ContinuationStackManager.removeSeqContinuationState(synCtx, sequenceType); } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ForEachMediatorV2.java b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ForEachMediatorV2.java index 2ca6deede3..bbf7dd225e 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ForEachMediatorV2.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ForEachMediatorV2.java @@ -100,19 +100,6 @@ public ForEachMediatorV2() { id = String.valueOf(new Random().nextLong()); } - /** - * Check whether the message is a foreach message or not - * - * @param synCtx MessageContext - * @return true if the message is a foreach message - */ - private static boolean isContinuationTriggeredFromMediatorWorker(MessageContext synCtx) { - - Boolean isContinuationTriggeredMediatorWorker = - (Boolean) synCtx.getProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER); - return isContinuationTriggeredMediatorWorker != null && isContinuationTriggeredMediatorWorker; - } - @Override public boolean mediate(MessageContext synCtx) { @@ -262,44 +249,33 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat } boolean result; - // If the continuation is triggered from a mediator worker and has children, then mediate through the sub branch - // otherwise start aggregation - if (isContinuationTriggeredFromMediatorWorker(synCtx)) { - synLog.traceOrDebug("Continuation is triggered from a mediator worker"); - if (continuationState.hasChild()) { - SequenceMediator branchSequence = target.getSequence(); - boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled(); - FlowContinuableMediator mediator = - (FlowContinuableMediator) branchSequence.getChild(continuationState.getChildContState().getPosition()); - - result = mediator.mediate(synCtx, continuationState.getChildContState()); - if (isStatisticsEnabled) { - ((Mediator) mediator).reportCloseStatistics(synCtx, null); - } - } else { + SequenceMediator branchSequence = target.getSequence(); + boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled(); + // If there are no children and the continuation was triggered from a mediator worker start aggregation + // otherwise mediate through the sub branch sequence + if (!continuationState.hasChild()) { + if (ScatterGatherUtils.isContinuationTriggeredFromMediatorWorker(synCtx)) { + synLog.traceOrDebug("Continuation is triggered from a mediator worker"); result = true; + } else { + synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the sub branch sequence"); + result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1); } } else { - synLog.traceOrDebug("Continuation is triggered from a callback"); - // If the continuation is triggered from a callback, continue the mediation from the continuation state - SequenceMediator branchSequence = target.getSequence(); - boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled(); - if (!continuationState.hasChild()) { - result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1); - } else { - FlowContinuableMediator mediator = - (FlowContinuableMediator) branchSequence.getChild(continuationState.getPosition()); + synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the child continuation state"); + FlowContinuableMediator mediator = + (FlowContinuableMediator) branchSequence.getChild(continuationState.getPosition()); - result = mediator.mediate(synCtx, continuationState.getChildContState()); - if (isStatisticsEnabled) { - ((Mediator) mediator).reportCloseStatistics(synCtx, null); - } + result = mediator.mediate(synCtx, continuationState.getChildContState()); + if (isStatisticsEnabled) { + ((Mediator) mediator).reportCloseStatistics(synCtx, null); } + } + + if (result) { // If the mediation is completed, remove the child continuation state from the stack, so the aggregation // will continue the mediation from the parent continuation state ContinuationStackManager.removeReliantContinuationState(synCtx); - } - if (result) { return aggregateMessages(synCtx, synLog); } return false; @@ -432,7 +408,6 @@ private boolean completeAggregate(ForEachAggregate aggregate) { activeAggregates.remove(aggregate.getCorrelation()); // Update the continuation state to current mediator position as we are using the original message context ContinuationStackManager.updateSeqContinuationState(originalMessageContext, getMediatorPosition()); - SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(originalMessageContext); getLog(originalMessageContext).traceOrDebug("End : Foreach mediator"); boolean result = false; @@ -443,14 +418,19 @@ private boolean completeAggregate(ForEachAggregate aggregate) { CloseEventCollector.closeEntryEvent(originalMessageContext, getMediatorName(), ComponentType.MEDIATOR, statisticReportingIndex, isContentAltering()); } - - if (seqContinuationState != null) { - SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(originalMessageContext, seqContinuationState); - result = sequenceMediator.mediate(originalMessageContext, seqContinuationState); - if (RuntimeStatisticCollector.isStatisticsEnabled()) { - sequenceMediator.reportCloseStatistics(originalMessageContext, null); + do { + SeqContinuationState seqContinuationState = + (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(originalMessageContext); + if (seqContinuationState != null) { + SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(originalMessageContext, seqContinuationState); + result = sequenceMediator.mediate(originalMessageContext, seqContinuationState); + if (RuntimeStatisticCollector.isStatisticsEnabled()) { + sequenceMediator.reportCloseStatistics(originalMessageContext, null); + } + } else { + break; } - } + } while (result && !originalMessageContext.getContinuationStateStack().isEmpty()); CloseEventCollector.closeEventsAfterScatterGather(originalMessageContext); return result; } else { diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java index 5dbb70b9c9..9992ff7e10 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java @@ -23,7 +23,6 @@ import com.google.gson.JsonSyntaxException; import org.apache.axiom.om.OMAbstractFactory; import org.apache.axiom.om.OMElement; -import org.apache.axiom.om.OMNode; import org.apache.axiom.om.util.AXIOMUtil; import org.apache.axiom.soap.SOAP11Constants; import org.apache.axiom.soap.SOAPEnvelope; @@ -262,47 +261,34 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat } boolean result; - // If the continuation is triggered from a mediator worker and has children, then mediate through the sub branch - // otherwise start aggregation - if (isContinuationTriggeredFromMediatorWorker(synCtx)) { - synLog.traceOrDebug("Continuation is triggered from a mediator worker"); - if (continuationState.hasChild()) { - int subBranch = ((ReliantContinuationState) continuationState.getChildContState()).getSubBranch(); - SequenceMediator branchSequence = targets.get(subBranch).getSequence(); - boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled(); - FlowContinuableMediator mediator = - (FlowContinuableMediator) branchSequence.getChild(continuationState.getChildContState().getPosition()); - - result = mediator.mediate(synCtx, continuationState.getChildContState()); - if (isStatisticsEnabled) { - ((Mediator) mediator).reportCloseStatistics(synCtx, null); - } - } else { + int subBranch = ((ReliantContinuationState) continuationState).getSubBranch(); + + SequenceMediator branchSequence = targets.get(subBranch).getSequence(); + boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled(); + // If there are no children and the continuation was triggered from a mediator worker start aggregation + // otherwise mediate through the sub branch sequence + if (!continuationState.hasChild()) { + if (ScatterGatherUtils.isContinuationTriggeredFromMediatorWorker(synCtx)) { + synLog.traceOrDebug("Continuation is triggered from a mediator worker"); result = true; + } else { + synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the sub branch sequence"); + result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1); } } else { - synLog.traceOrDebug("Continuation is triggered from a callback"); - // If the continuation is triggered from a callback, continue the mediation from the continuation state - int subBranch = ((ReliantContinuationState) continuationState).getSubBranch(); + synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the child continuation state"); + FlowContinuableMediator mediator = + (FlowContinuableMediator) branchSequence.getChild(continuationState.getPosition()); - SequenceMediator branchSequence = targets.get(subBranch).getSequence(); - boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled(); - if (!continuationState.hasChild()) { - result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1); - } else { - FlowContinuableMediator mediator = - (FlowContinuableMediator) branchSequence.getChild(continuationState.getPosition()); - - result = mediator.mediate(synCtx, continuationState.getChildContState()); - if (isStatisticsEnabled) { - ((Mediator) mediator).reportCloseStatistics(synCtx, null); - } + result = mediator.mediate(synCtx, continuationState.getChildContState()); + if (isStatisticsEnabled) { + ((Mediator) mediator).reportCloseStatistics(synCtx, null); } + } + if (result) { // If the mediation is completed, remove the child continuation state from the stack, so the aggregation // will continue the mediation from the parent continuation state ContinuationStackManager.removeReliantContinuationState(synCtx); - } - if (result) { return aggregateMessages(synCtx, synLog); } return false; @@ -490,8 +476,6 @@ private boolean processAggregation(MessageContext messageContext, Aggregate aggr // Update the continuation state to current mediator position as we are using the original message context ContinuationStackManager.updateSeqContinuationState(messageContext, getMediatorPosition()); } - - SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(messageContext); messageContext.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true); if (RuntimeStatisticCollector.isStatisticsEnabled()) { @@ -501,13 +485,19 @@ private boolean processAggregation(MessageContext messageContext, Aggregate aggr getLog(messageContext).traceOrDebug("End : Scatter Gather mediator"); boolean result = false; - if (seqContinuationState != null) { - SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(messageContext, seqContinuationState); - result = sequenceMediator.mediate(messageContext, seqContinuationState); - if (RuntimeStatisticCollector.isStatisticsEnabled()) { - sequenceMediator.reportCloseStatistics(messageContext, null); + do { + SeqContinuationState seqContinuationState = + (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(messageContext); + if (seqContinuationState != null) { + SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(messageContext, seqContinuationState); + result = sequenceMediator.mediate(messageContext, seqContinuationState); + if (RuntimeStatisticCollector.isStatisticsEnabled()) { + sequenceMediator.reportCloseStatistics(messageContext, null); + } + } else { + break; } - } + } while (result && !messageContext.getContinuationStateStack().isEmpty()); CloseEventCollector.closeEventsAfterScatterGather(messageContext); return result; } @@ -556,29 +546,6 @@ private void setContentTypeHeader(Object resultValue, org.apache.axis2.context.M } } - private static void addChildren(List list, OMElement element) { - - for (Object item : list) { - if (item instanceof OMElement) { - element.addChild((OMElement) item); - } - } - } - - private static List getMatchingElements(MessageContext messageContext, SynapsePath expression) { - - Object o = expression.objectValueOf(messageContext); - if (o instanceof OMNode) { - List list = new ArrayList(); - list.add(o); - return list; - } else if (o instanceof List) { - return (List) o; - } else { - return new ArrayList(); - } - } - private void setAggregatedMessageAsVariable(MessageContext originalMessageContext, Aggregate aggregate) { Object variable = null; @@ -730,19 +697,6 @@ public void setMaxMessagesToComplete(Value maxMessagesToComplete) { this.maxMessagesToComplete = maxMessagesToComplete; } - /** - * Check whether the message is a scatter message or not - * - * @param synCtx MessageContext - * @return true if the message is a scatter message - */ - private static boolean isContinuationTriggeredFromMediatorWorker(MessageContext synCtx) { - - Boolean isContinuationTriggeredMediatorWorker = - (Boolean) synCtx.getProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER); - return isContinuationTriggeredMediatorWorker != null && isContinuationTriggeredMediatorWorker; - } - @Override public Integer reportOpenStatistics(MessageContext messageContext, boolean isContentAltering) { diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGatherUtils.java b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGatherUtils.java new file mode 100644 index 0000000000..8849923b63 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGatherUtils.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2025, WSO2 LLC. (https://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.mediators.v2; + +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseConstants; + +public class ScatterGatherUtils { + + /** + * Check whether the message is a scatter message or not + * + * @param synCtx MessageContext + * @return true if the message is a scatter message + */ + public static boolean isScatterMessage(MessageContext synCtx) { + + Boolean isScatterMessage = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES); + return isScatterMessage != null && isScatterMessage; + } + + /** + * Check whether the message is a foreach message or not + * + * @param synCtx MessageContext + * @return true if the message is a foreach message + */ + public static boolean isContinuationTriggeredFromMediatorWorker(MessageContext synCtx) { + + Boolean isContinuationTriggeredMediatorWorker = + (Boolean) synCtx.getProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER); + return isContinuationTriggeredMediatorWorker != null && isContinuationTriggeredMediatorWorker; + } +}