From 0ccc7e5b9f1098accbb588180a1d9a040c4143ea Mon Sep 17 00:00:00 2001 From: Sanoj Punchihewa Date: Mon, 4 Nov 2024 10:15:29 +0530 Subject: [PATCH 1/6] Add scatter gather mediator --- .../org/apache/synapse/SynapseConstants.java | 2 + .../collectors/CloseEventCollector.java | 13 + .../handling/event/CloseEventHandler.java | 7 + .../management/handling/span/SpanHandler.java | 10 + .../config/xml/MediatorFactoryFinder.java | 3 +- .../config/xml/MediatorSerializerFinder.java | 3 +- .../xml/ScatterGatherMediatorFactory.java | 136 ++++ .../xml/ScatterGatherMediatorSerializer.java | 84 +++ .../synapse/mediators/MediatorWorker.java | 45 +- .../mediators/base/SequenceMediator.java | 12 + .../mediators/eip/aggregator/Aggregate.java | 41 +- .../synapse/mediators/v2/ScatterGather.java | 710 ++++++++++++++++++ ...catterGatherMediatorSerializationTest.java | 51 ++ 13 files changed, 1108 insertions(+), 9 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java create mode 100755 modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java create mode 100644 modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java create mode 100644 modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java diff --git a/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java b/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java index 4058874e53..edcdbeb1b0 100644 --- a/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java +++ b/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java @@ -626,4 +626,6 @@ public enum ENDPOINT_TIMEOUT_TYPE { ENDPOINT_TIMEOUT, GLOBAL_TIMEOUT, HTTP_CONNE public static final String ANALYTICS_METADATA = "ANALYTICS_METADATA"; + public static final String SCATTER_MESSAGES = "SCATTER_MESSAGES"; + public static final String CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER = "CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER"; } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/CloseEventCollector.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/CloseEventCollector.java index 4c7dfc361e..d6b197bd45 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/CloseEventCollector.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/CloseEventCollector.java @@ -148,4 +148,17 @@ public static void tryEndFlow(MessageContext messageContext, String componentNam // closeFlowForcefully(messageContext); } } + + /** + * This method will close the event collector and finish the flow when a Scatter Gather mediator is used. + * + * @param messageContext synapse message context. + */ + public static void closeEventsAfterScatterGather(MessageContext messageContext) { + + if (isOpenTelemetryEnabled()) { + OpenTelemetryManagerHolder.getOpenTelemetryManager().getHandler() + .handleScatterGatherFinishEvent(messageContext); + } + } } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/CloseEventHandler.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/CloseEventHandler.java index 16522cbeb3..10b861417b 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/CloseEventHandler.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/CloseEventHandler.java @@ -45,4 +45,11 @@ public interface CloseEventHandler { * @param synCtx Message context. */ void handleTryEndFlow(BasicStatisticDataUnit basicStatisticDataUnit, MessageContext synCtx); + + /** + * Handles a close flow event. + * + * @param synCtx Message context. + */ + void handleScatterGatherFinishEvent(MessageContext synCtx); } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java index cbf5bf1218..9a5c1f57b9 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java @@ -430,6 +430,16 @@ private void handleCallbackFinishEvent(MessageContext messageContext) { } } + public void handleScatterGatherFinishEvent(MessageContext messageContext) { + TracingScope tracingScope = tracingScopeManager.getTracingScope(messageContext); + synchronized (tracingScope.getSpanStore()) { + cleanupContinuationStateSequences(tracingScope.getSpanStore(), messageContext); + SpanWrapper outerLevelSpanWrapper = tracingScope.getSpanStore().getOuterLevelSpanWrapper(); + tracingScope.getSpanStore().finishSpan(outerLevelSpanWrapper, messageContext); + tracingScopeManager.cleanupTracingScope(tracingScope.getTracingScopeId()); + } + } + @Override public void handleStateStackInsertion(MessageContext synCtx, String seqName, SequenceType seqType) { TracingScope tracingScope = tracingScopeManager.getTracingScope(synCtx); diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java b/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java index 49b3a3ff27..cda214f563 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java @@ -103,7 +103,8 @@ public class MediatorFactoryFinder implements XMLToObjectMapper { ForEachMediatorFactory.class, JSONTransformMediatorFactory.class, NTLMMediatorFactory.class, - VariableMediatorFactory.class + VariableMediatorFactory.class, + ScatterGatherMediatorFactory.class }; private final static MediatorFactoryFinder instance = new MediatorFactoryFinder(); diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java b/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java index 174d90511c..eb12240762 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java @@ -77,7 +77,8 @@ public class MediatorSerializerFinder { ForEachMediatorSerializer.class, JSONTransformMediatorSerializer.class, NTLMMediatorSerializer.class, - VariableMediatorSerializer.class + VariableMediatorSerializer.class, + ScatterGatherMediatorSerializer.class }; private final static MediatorSerializerFinder instance = new MediatorSerializerFinder(); diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java new file mode 100644 index 0000000000..b630259471 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.config.xml; + +import org.apache.axiom.om.OMAttribute; +import org.apache.axiom.om.OMElement; +import org.apache.synapse.Mediator; +import org.apache.synapse.mediators.eip.Target; +import org.apache.synapse.mediators.v2.ScatterGather; +import org.jaxen.JaxenException; + +import java.util.Iterator; +import java.util.Properties; +import javax.xml.namespace.QName; + +/** + * The <scatter-gather> mediator is used to copy messages in Synapse to similar messages but with + * different message contexts and aggregate the responses back. + * + *
+ * <scatter-gather parallel-execution=(true | false)>
+ *   <aggregation value-to-aggregate="expression" condition="expression" timeout="long"
+ *     min-messages="expression" max-messages="expression"/>
+ *   <target>
+ *     <sequence>
+ *       (mediator)+
+ *     </sequence>
+ *   </target>+
+ * </scatter-gather>
+ * 
+ */ +public class ScatterGatherMediatorFactory extends AbstractMediatorFactory { + + /** + * This will hold the QName of the clone mediator element in the xml configuration + */ + private static final QName SCATTER_GATHER_Q + = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "scatter-gather"); + private static final QName ELEMENT_AGGREGATE_Q + = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "aggregation"); + private static final QName ATT_VALUE_TO_AGGREGATE = new QName("value-to-aggregate"); + private static final QName ATT_CONDITION = new QName("condition"); + private static final QName ATT_TIMEOUT = new QName("timeout"); + private static final QName ATT_MIN_MESSAGES = new QName("min-messages"); + private static final QName ATT_MAX_MESSAGES = new QName("max-messages"); + private static final QName TARGET_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "target"); + private static final QName PARALLEL_EXEC_Q = new QName("parallel-execution"); + + public Mediator createSpecificMediator(OMElement elem, Properties properties) { + + boolean asynchronousExe = true; + + ScatterGather mediator = new ScatterGather(); + processAuditStatus(mediator, elem); + + OMAttribute parallelExecAttr = elem.getAttribute(PARALLEL_EXEC_Q); + if (parallelExecAttr != null && parallelExecAttr.getAttributeValue().equals("false")) { + asynchronousExe = false; + } + + mediator.setParallelExecution(asynchronousExe); + + Iterator targetElements = elem.getChildrenWithName(TARGET_Q); + while (targetElements.hasNext()) { + Target target = TargetFactory.createTarget((OMElement) targetElements.next(), properties); + target.setAsynchronous(asynchronousExe); + mediator.addTarget(target); + } + + OMElement aggregateElement = elem.getFirstChildWithName(ELEMENT_AGGREGATE_Q); + if (aggregateElement != null) { + OMAttribute aggregateExpr = aggregateElement.getAttribute(ATT_VALUE_TO_AGGREGATE); + if (aggregateExpr != null) { + try { + mediator.setAggregationExpression( + SynapsePathFactory.getSynapsePath(aggregateElement, ATT_VALUE_TO_AGGREGATE)); + } catch (JaxenException e) { + handleException("Unable to load the aggregating expression", e); + } + } + + OMAttribute conditionExpr = aggregateElement.getAttribute(ATT_CONDITION); + if (conditionExpr != null) { + try { + mediator.setCorrelateExpression( + SynapsePathFactory.getSynapsePath(aggregateElement, ATT_CONDITION)); + } catch (JaxenException e) { + handleException("Unable to load the condition expression", e); + } + } + + OMAttribute completeTimeout = aggregateElement.getAttribute(ATT_TIMEOUT); + if (completeTimeout != null) { + mediator.setCompletionTimeoutMillis(Long.parseLong(completeTimeout.getAttributeValue())); + } + + OMAttribute minMessages = aggregateElement.getAttribute(ATT_MIN_MESSAGES); + if (minMessages != null) { + mediator.setMinMessagesToComplete(new ValueFactory().createValue("min-messages", aggregateElement)); + } + + OMAttribute maxMessages = aggregateElement.getAttribute(ATT_MAX_MESSAGES); + if (maxMessages != null) { + mediator.setMaxMessagesToComplete(new ValueFactory().createValue("max-messages", aggregateElement)); + } + } + addAllCommentChildrenToList(elem, mediator.getCommentsList()); + return mediator; + } + + /** + * This method will implement the getTagQName method of the MediatorFactory interface + * + * @return QName of the clone element in xml configuration + */ + public QName getTagQName() { + + return SCATTER_GATHER_Q; + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java new file mode 100755 index 0000000000..b585078f9a --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.config.xml; + +import org.apache.axiom.om.OMElement; +import org.apache.synapse.Mediator; +import org.apache.synapse.mediators.eip.Target; +import org.apache.synapse.mediators.v2.ScatterGather; + +/** + * Serializer for {@link ScatterGather} instances. + */ +public class ScatterGatherMediatorSerializer extends AbstractMediatorSerializer { + + public OMElement serializeSpecificMediator(Mediator m) { + + ScatterGather scatterGatherMediator = null; + if (!(m instanceof ScatterGather)) { + handleException("Unsupported mediator passed in for serialization : " + m.getType()); + } else { + scatterGatherMediator = (ScatterGather) m; + } + + assert scatterGatherMediator != null; + OMElement scatterGatherElement = fac.createOMElement("scatter-gather", synNS); + saveTracingState(scatterGatherElement, scatterGatherMediator); + + scatterGatherElement.addAttribute(fac.createOMAttribute( + "parallel-execution", nullNS, Boolean.toString(scatterGatherMediator.getParallelExecution()))); + OMElement aggregationElement = fac.createOMElement("aggregation", synNS); + + SynapsePathSerializer.serializePath( + scatterGatherMediator.getAggregationExpression(), aggregationElement, "value-to-aggregate"); + + if (scatterGatherMediator.getCorrelateExpression() != null) { + SynapsePathSerializer.serializePath( + scatterGatherMediator.getAggregationExpression(), aggregationElement, "condition"); + } + + if (scatterGatherMediator.getCompletionTimeoutMillis() != 0) { + aggregationElement.addAttribute(fac.createOMAttribute( + "timeout", nullNS, Long.toString(scatterGatherMediator.getCompletionTimeoutMillis()))); + } + if (scatterGatherMediator.getMinMessagesToComplete() != null) { + new ValueSerializer().serializeValue( + scatterGatherMediator.getMinMessagesToComplete(), "min-messages", aggregationElement); + } + if (scatterGatherMediator.getMaxMessagesToComplete() != null) { + new ValueSerializer().serializeValue( + scatterGatherMediator.getMaxMessagesToComplete(), "max-messages", aggregationElement); + } + scatterGatherElement.addChild(aggregationElement); + + for (Target target : scatterGatherMediator.getTargets()) { + if (target != null) { + scatterGatherElement.addChild(TargetSerializer.serializeTarget(target)); + } + } + serializeComments(scatterGatherElement, scatterGatherMediator.getCommentsList()); + + return scatterGatherElement; + } + + public String getMediatorClassName() { + + return ScatterGather.class.getName(); + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java index 1afe6f142c..ae7fffa435 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java @@ -19,13 +19,24 @@ package org.apache.synapse.mediators; -import org.apache.synapse.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.synapse.FaultHandler; +import org.apache.synapse.Mediator; +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseConstants; +import org.apache.synapse.SynapseException; +import org.apache.synapse.aspects.ComponentType; import org.apache.synapse.aspects.flow.statistics.StatisticsCloseEventListener; +import org.apache.synapse.aspects.flow.statistics.collectors.CloseEventCollector; +import org.apache.synapse.aspects.flow.statistics.collectors.OpenEventCollector; import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector; +import org.apache.synapse.aspects.flow.statistics.util.StatisticsConstants; import org.apache.synapse.carbonext.TenantInfoConfigurator; +import org.apache.synapse.continuation.ContinuationStackManager; +import org.apache.synapse.continuation.SeqContinuationState; import org.apache.synapse.debug.SynapseDebugManager; +import org.apache.synapse.mediators.base.SequenceMediator; import org.apache.synapse.util.logging.LoggingUtils; /** @@ -86,7 +97,25 @@ public void run() { debugManager.advertiseMediationFlowStartPoint(synCtx); } - seq.mediate(synCtx); + // If this is a scatter message, then we need to use the continuation state and continue the mediation + if (isScatterMessage(synCtx)) { + boolean result = seq.mediate(synCtx); + if (result) { + SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(synCtx); + if (seqContinuationState == null) { + return; + } + SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(synCtx, seqContinuationState); + + FlowContinuableMediator mediator = + (FlowContinuableMediator) sequenceMediator.getChild(seqContinuationState.getPosition()); + + synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, true); + mediator.mediate(synCtx, seqContinuationState); + } + } else { + seq.mediate(synCtx); + } //((Axis2MessageContext)synCtx).getAxis2MessageContext().getEnvelope().discard(); } catch (SynapseException syne) { @@ -150,4 +179,16 @@ private void warn(boolean traceOn, String msg, MessageContext msgContext) { public void setStatisticsCloseEventListener(StatisticsCloseEventListener statisticsCloseEventListener) { this.statisticsCloseEventListener = statisticsCloseEventListener; } + + /** + * Check whether the message is a scatter message or not + * + * @param synCtx MessageContext + * @return true if the message is a scatter message + */ + private static boolean isScatterMessage(MessageContext synCtx) { + + Boolean isSkipContinuationState = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES); + return isSkipContinuationState != null && isSkipContinuationState; + } } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java b/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java index ccf33c72ec..2c28222924 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java @@ -521,4 +521,16 @@ public void setComponentStatisticsId(ArtifactHolder holder) { StatisticIdentityGenerator.reportingFlowContinuableEndEvent(sequenceId, ComponentType.SEQUENCE, holder); } } + + /** + * Check whether the message is a scatter message or not + * + * @param synCtx MessageContext + * @return true if the message is a scatter message + */ + private static boolean isScatterMessage(MessageContext synCtx) { + + Boolean isSkipContinuationState = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES); + return isSkipContinuationState != null && isSkipContinuationState; + } } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java index 3a2f621511..b3f813521c 100755 --- a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java @@ -26,6 +26,7 @@ import org.apache.synapse.SynapseLog; import org.apache.synapse.core.SynapseEnvironment; import org.apache.synapse.mediators.eip.EIPConstants; +import org.apache.synapse.mediators.v2.ScatterGather; import java.util.ArrayList; import java.util.List; @@ -50,6 +51,7 @@ public class Aggregate extends TimerTask { private String correlation = null; /** The AggregateMediator that should be invoked on completion of the aggregation */ private AggregateMediator aggregateMediator = null; + private ScatterGather scatterGatherMediator = null; private List messages = new ArrayList(); private boolean locked = false; private boolean completed = false; @@ -87,6 +89,24 @@ public Aggregate(SynapseEnvironment synEnv, String corelation, long timeoutMilli this.aggregateMediator = mediator; } + public Aggregate(SynapseEnvironment synEnv, String corelation, long timeoutMillis, int min, + int max, ScatterGather scatterGatherMediator, FaultHandler faultHandler) { + + this.synEnv = synEnv; + this.correlation = corelation; + if (timeoutMillis > 0) { + expiryTimeMillis = System.currentTimeMillis() + timeoutMillis; + } + if (min > 0) { + minCount = min; + } + if (max > 0) { + maxCount = max; + } + this.faultHandler = faultHandler; + this.scatterGatherMediator = scatterGatherMediator; + } + /** * Add a message to the interlan message list * @@ -118,9 +138,15 @@ public synchronized boolean isComplete(SynapseLog synLog) { // get total messages for this group, from the first message we have collected MessageContext mc = messages.get(0); - Object prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE + - (aggregateMediator.getId() != null ? "." + aggregateMediator.getId() : "")); - + Object prop; + if (aggregateMediator != null) { + prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE + + (aggregateMediator.getId() != null ? "." + aggregateMediator.getId() : "")); + } else { + prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE + + (scatterGatherMediator.getId() != null ? "." + scatterGatherMediator.getId() : "")); + } + if (prop != null && prop instanceof String) { String[] msgSequence = prop.toString().split( EIPConstants.MESSAGE_SEQUENCE_DELEMITER); @@ -264,8 +290,13 @@ private class AggregateTimeout implements Runnable { public void run() { MessageContext messageContext = aggregate.getLastMessage(); try { - log.warn("Aggregate mediator timeout occurred."); - aggregateMediator.completeAggregate(aggregate); + if (aggregateMediator != null) { + log.warn("Aggregate mediator timeout occurred."); + aggregateMediator.completeAggregate(aggregate); + } else { + log.warn("Scatter Gather mediator timeout occurred."); + scatterGatherMediator.completeAggregate(aggregate); + } } catch (Exception ex) { if (faultHandler != null && messageContext != null) { faultHandler.handleFault(messageContext, ex); diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java new file mode 100644 index 0000000000..589ab2b7d5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java @@ -0,0 +1,710 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.mediators.v2; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonSyntaxException; +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.context.OperationContext; +import org.apache.synapse.ContinuationState; +import org.apache.synapse.ManagedLifecycle; +import org.apache.synapse.Mediator; +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseConstants; +import org.apache.synapse.SynapseException; +import org.apache.synapse.SynapseLog; +import org.apache.synapse.aspects.AspectConfiguration; +import org.apache.synapse.aspects.ComponentType; +import org.apache.synapse.aspects.flow.statistics.StatisticIdentityGenerator; +import org.apache.synapse.aspects.flow.statistics.collectors.CloseEventCollector; +import org.apache.synapse.aspects.flow.statistics.collectors.OpenEventCollector; +import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector; +import org.apache.synapse.aspects.flow.statistics.data.artifact.ArtifactHolder; +import org.apache.synapse.aspects.flow.statistics.util.StatisticDataCollectionHelper; +import org.apache.synapse.commons.json.JsonUtil; +import org.apache.synapse.config.xml.SynapsePath; +import org.apache.synapse.continuation.ContinuationStackManager; +import org.apache.synapse.continuation.ReliantContinuationState; +import org.apache.synapse.continuation.SeqContinuationState; +import org.apache.synapse.core.SynapseEnvironment; +import org.apache.synapse.core.axis2.Axis2MessageContext; +import org.apache.synapse.mediators.AbstractMediator; +import org.apache.synapse.mediators.FlowContinuableMediator; +import org.apache.synapse.mediators.Value; +import org.apache.synapse.mediators.base.SequenceMediator; +import org.apache.synapse.mediators.eip.EIPConstants; +import org.apache.synapse.mediators.eip.EIPUtils; +import org.apache.synapse.mediators.eip.SharedDataHolder; +import org.apache.synapse.mediators.eip.Target; +import org.apache.synapse.mediators.eip.aggregator.Aggregate; +import org.apache.synapse.transport.passthru.util.RelayUtils; +import org.apache.synapse.util.MessageHelper; +import org.apache.synapse.util.xpath.SynapseJsonPath; +import org.apache.synapse.util.xpath.SynapseXPath; +import org.jaxen.JaxenException; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Timer; +import javax.xml.stream.XMLStreamException; + +public class ScatterGather extends AbstractMediator implements ManagedLifecycle, FlowContinuableMediator { + + private final Object lock = new Object(); + private final Map activeAggregates = Collections.synchronizedMap(new HashMap<>()); + private String id; + private List targets = new ArrayList<>(); + private long completionTimeoutMillis = 0; + private Value maxMessagesToComplete; + private Value minMessagesToComplete; + private SynapsePath correlateExpression = null; + private SynapsePath aggregationExpression = null; + private boolean parallelExecution = true; + private Integer statisticReportingIndex; + + public ScatterGather() { + + id = String.valueOf(new Random().nextLong()); + } + + public void setParallelExecution(boolean parallelExecution) { + + this.parallelExecution = parallelExecution; + } + + public boolean getParallelExecution() { + + return this.parallelExecution; + } + + public String getId() { + + return id; + } + + @Override + public boolean mediate(MessageContext synCtx) { + + boolean aggregationResult = false; + + SynapseLog synLog = getLog(synCtx); + + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug("Start : Scatter Gather mediator"); + + if (synLog.isTraceTraceEnabled()) { + synLog.traceTrace("Message : " + synCtx.getEnvelope()); + } + } + + synCtx.setProperty(id != null ? EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id : + EIPConstants.EIP_SHARED_DATA_HOLDER, new SharedDataHolder()); + Iterator iter = targets.iterator(); + int i = 0; + while (iter.hasNext()) { + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug("Submitting " + (i + 1) + " of " + targets.size() + + " messages for " + (parallelExecution ? "parallel processing" : "sequential processing")); + } + + MessageContext clonedMsgCtx = getClonedMessageContext(synCtx, i++, targets.size()); + ContinuationStackManager.addReliantContinuationState(clonedMsgCtx, i - 1, getMediatorPosition()); + boolean result = iter.next().mediate(clonedMsgCtx); + if (!parallelExecution && result) { + aggregationResult = aggregateMessages(clonedMsgCtx, synLog); + } + } + OperationContext opCtx + = ((Axis2MessageContext) synCtx).getAxis2MessageContext().getOperationContext(); + if (opCtx != null) { + opCtx.setProperty(Constants.RESPONSE_WRITTEN, "SKIP"); + } + return aggregationResult; + } + + public void init(SynapseEnvironment se) { + + for (Target target : targets) { + ManagedLifecycle seq = target.getSequence(); + if (seq != null) { + seq.init(se); + } + } + } + + public void destroy() { + + for (Target target : targets) { + ManagedLifecycle seq = target.getSequence(); + if (seq != null) { + seq.destroy(); + } + } + } + + /** + * Clone the provided message context as a new message, and set the aggregation ID and the message sequence count + * + * @param synCtx - MessageContext which is subjected to the cloning + * @param messageSequence - the position of this message of the cloned set + * @param messageCount - total of cloned copies + * @return MessageContext the cloned message context + */ + private MessageContext getClonedMessageContext(MessageContext synCtx, int messageSequence, int messageCount) { + + MessageContext newCtx = null; + try { + newCtx = MessageHelper.cloneMessageContext(synCtx); + // Set isServerSide property in the cloned message context + ((Axis2MessageContext) newCtx).getAxis2MessageContext().setServerSide( + ((Axis2MessageContext) synCtx).getAxis2MessageContext().isServerSide()); + // Set the SCATTER_MESSAGES property to the cloned message context which will be used by the MediatorWorker + // to continue the mediation from the continuation state + newCtx.setProperty(SynapseConstants.SCATTER_MESSAGES, true); + if (id != null) { + newCtx.setProperty(EIPConstants.AGGREGATE_CORRELATION + "." + id, synCtx.getMessageID()); + newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE + "." + id, messageSequence + + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + messageCount); + } else { + newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE, messageSequence + + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + messageCount); + } + } catch (AxisFault axisFault) { + handleException("Error cloning the message context", axisFault, synCtx); + } + return newCtx; + } + + public List getTargets() { + + return targets; + } + + public void setTargets(List targets) { + + this.targets = targets; + } + + public void addTarget(Target target) { + + this.targets.add(target); + } + + public SynapsePath getAggregationExpression() { + + return aggregationExpression; + } + + public void setAggregationExpression(SynapsePath aggregationExpression) { + + this.aggregationExpression = aggregationExpression; + } + + @Override + public boolean mediate(MessageContext synCtx, ContinuationState continuationState) { + + SynapseLog synLog = getLog(synCtx); + + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug("Scatter Gather mediator : Mediating from ContinuationState"); + } + + boolean result; + // If the continuation is triggered from a mediator worker and has children, then mediate through the sub branch + // otherwise start aggregation + if (isContinuationTriggeredFromMediatorWorker(synCtx)) { + if (continuationState.hasChild()) { + int subBranch = ((ReliantContinuationState) continuationState.getChildContState()).getSubBranch(); + SequenceMediator branchSequence = targets.get(subBranch).getSequence(); + boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled(); + FlowContinuableMediator mediator = + (FlowContinuableMediator) branchSequence.getChild(continuationState.getChildContState().getPosition()); + + result = mediator.mediate(synCtx, continuationState.getChildContState()); + if (isStatisticsEnabled) { + ((Mediator) mediator).reportCloseStatistics(synCtx, null); + } + } else { + result = true; + } + } else { + // If the continuation is triggered from a callback, continue the mediation from the continuation state + int subBranch = ((ReliantContinuationState) continuationState).getSubBranch(); + + SequenceMediator branchSequence = targets.get(subBranch).getSequence(); + boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled(); + if (!continuationState.hasChild()) { + result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1); + } else { + FlowContinuableMediator mediator = + (FlowContinuableMediator) branchSequence.getChild(continuationState.getPosition()); + + result = mediator.mediate(synCtx, continuationState.getChildContState()); + if (isStatisticsEnabled) { + ((Mediator) mediator).reportCloseStatistics(synCtx, null); + } + } + // If the mediation is completed, remove the child continuation state from the stack, so the aggregation + // will continue the mediation from the parent continuation state + ContinuationStackManager.removeReliantContinuationState(synCtx); + } + if (result) { + return aggregateMessages(synCtx, synLog); + } + return false; + } + + private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) { + + Aggregate aggregate = null; + String correlationIdName = (id != null ? EIPConstants.AGGREGATE_CORRELATION + "." + id : + EIPConstants.AGGREGATE_CORRELATION); + + Object correlationID = synCtx.getProperty(correlationIdName); + String correlation; + + Object result = null; + if (correlateExpression != null) { + try { + result = correlateExpression instanceof SynapseXPath ? correlateExpression.evaluate(synCtx) : + ((SynapseJsonPath) correlateExpression).evaluate(synCtx); + } catch (JaxenException e) { + handleException("Unable to execute the XPATH over the message", e, synCtx); + } + if (result instanceof List) { + if (((List) result).isEmpty()) { + handleException("Failed to evaluate correlate expression: " + correlateExpression.toString(), synCtx); + } + } + if (result instanceof Boolean) { + if (!(Boolean) result) { + return true; + } + } + } + if (result != null) { + while (aggregate == null) { + synchronized (lock) { + if (activeAggregates.containsKey(correlateExpression.toString())) { + aggregate = activeAggregates.get(correlateExpression.toString()); + if (aggregate != null) { + if (!aggregate.getLock()) { + aggregate = null; + } + } + } else { + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug("Creating new Aggregator - " + + (completionTimeoutMillis > 0 ? "expires in : " + + (completionTimeoutMillis / 1000) + "secs" : + "without expiry time")); + } + if (isAggregationCompleted(synCtx)) { + return false; + } + + Double minMsg = -1.0; + if (minMessagesToComplete != null) { + minMsg = Double.parseDouble(minMessagesToComplete.evaluateValue(synCtx)); + } + Double maxMsg = -1.0; + if (maxMessagesToComplete != null) { + maxMsg = Double.parseDouble(maxMessagesToComplete.evaluateValue(synCtx)); + } + + aggregate = new Aggregate( + synCtx.getEnvironment(), + correlateExpression.toString(), + completionTimeoutMillis, + minMsg.intValue(), + maxMsg.intValue(), this, synCtx.getFaultStack().peek()); + + if (completionTimeoutMillis > 0) { + synCtx.getConfiguration().getSynapseTimer(). + schedule(aggregate, completionTimeoutMillis); + } + aggregate.getLock(); + activeAggregates.put(correlateExpression.toString(), aggregate); + } + } + } + } else if (correlationID instanceof String) { + correlation = (String) correlationID; + while (aggregate == null) { + synchronized (lock) { + if (activeAggregates.containsKey(correlation)) { + aggregate = activeAggregates.get(correlation); + if (aggregate != null) { + if (!aggregate.getLock()) { + aggregate = null; + } + } else { + break; + } + } else { + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug("Creating new Aggregator - " + + (completionTimeoutMillis > 0 ? "expires in : " + + (completionTimeoutMillis / 1000) + "secs" : + "without expiry time")); + } + if (isAggregationCompleted(synCtx)) { + return false; + } + + Double minMsg = -1.0; + if (minMessagesToComplete != null) { + minMsg = Double.parseDouble(minMessagesToComplete.evaluateValue(synCtx)); + } + Double maxMsg = -1.0; + if (maxMessagesToComplete != null) { + maxMsg = Double.parseDouble(maxMessagesToComplete.evaluateValue(synCtx)); + } + aggregate = new Aggregate( + synCtx.getEnvironment(), + correlation, + completionTimeoutMillis, + minMsg.intValue(), + maxMsg.intValue(), this, synCtx.getFaultStack().peek()); + + if (completionTimeoutMillis > 0) { + synchronized (aggregate) { + if (!aggregate.isCompleted()) { + try { + synCtx.getConfiguration().getSynapseTimer(). + schedule(aggregate, completionTimeoutMillis); + } catch (IllegalStateException e) { + log.warn("Synapse timer already cancelled. Resetting Synapse timer"); + synCtx.getConfiguration().setSynapseTimer(new Timer(true)); + synCtx.getConfiguration().getSynapseTimer(). + schedule(aggregate, completionTimeoutMillis); + } + } + } + } + aggregate.getLock(); + activeAggregates.put(correlation, aggregate); + } + } + } + } else { + synLog.traceOrDebug("Unable to find aggregation correlation property"); + return true; + } + // if there is an aggregate continue on aggregation + if (aggregate != null) { + boolean collected = aggregate.addMessage(synCtx); + if (synLog.isTraceOrDebugEnabled()) { + if (collected) { + synLog.traceOrDebug("Collected a message during aggregation"); + if (synLog.isTraceTraceEnabled()) { + synLog.traceTrace("Collected message : " + synCtx); + } + } + } + if (aggregate.isComplete(synLog)) { + synLog.traceOrDebug("Aggregation completed"); + boolean onCompleteSeqResult = completeAggregate(aggregate); + synLog.traceOrDebug("End : Scatter Gather mediator"); + return onCompleteSeqResult; + } else { + aggregate.releaseLock(); + } + } else { + synLog.traceOrDebug("Unable to find an aggregate for this message - skip"); + return true; + } + return false; + } + + private boolean isAggregationCompleted(MessageContext synCtx) { + + Object aggregateTimeoutHolderObj = + synCtx.getProperty(id != null ? EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id : + EIPConstants.EIP_SHARED_DATA_HOLDER); + + if (aggregateTimeoutHolderObj != null) { + SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateTimeoutHolderObj; + if (sharedDataHolder.isAggregationCompleted()) { + if (log.isDebugEnabled()) { + log.debug("Received a response for already completed Aggregate"); + } + return true; + } + } + return false; + } + + public boolean completeAggregate(Aggregate aggregate) { + + boolean markedCompletedNow = false; + boolean wasComplete = aggregate.isCompleted(); + if (wasComplete) { + return false; + } + + if (log.isDebugEnabled()) { + log.debug("Aggregation completed or timed out"); + } + + // cancel the timer + synchronized (this) { + if (!aggregate.isCompleted()) { + aggregate.cancel(); + aggregate.setCompleted(true); + + MessageContext lastMessage = aggregate.getLastMessage(); + if (lastMessage != null) { + Object aggregateTimeoutHolderObj = + lastMessage.getProperty(id != null ? EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id : + EIPConstants.EIP_SHARED_DATA_HOLDER); + + if (aggregateTimeoutHolderObj != null) { + SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateTimeoutHolderObj; + sharedDataHolder.markAggregationCompletion(); + } + } + markedCompletedNow = true; + } + } + + if (!markedCompletedNow) { + return false; + } + + MessageContext newSynCtx = getAggregatedMessage(aggregate); + + if (newSynCtx == null) { + log.warn("An aggregation of messages timed out with no aggregated messages", null); + return false; + } + aggregate.clear(); + activeAggregates.remove(aggregate.getCorrelation()); + newSynCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, false); + SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(newSynCtx); + boolean result = false; + + if (RuntimeStatisticCollector.isStatisticsEnabled()) { + CloseEventCollector.closeEntryEvent(newSynCtx, getMediatorName(), ComponentType.MEDIATOR, + statisticReportingIndex, isContentAltering()); + } + + if (seqContinuationState != null) { + SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(newSynCtx, seqContinuationState); + result = sequenceMediator.mediate(newSynCtx, seqContinuationState); + if (RuntimeStatisticCollector.isStatisticsEnabled()) { + sequenceMediator.reportCloseStatistics(newSynCtx, null); + } + } + CloseEventCollector.closeEventsAfterScatterGather(newSynCtx); + return result; + } + + private MessageContext getAggregatedMessage(Aggregate aggregate) { + + MessageContext newCtx = null; + JsonArray jsonArray = new JsonArray(); + JsonElement result; + boolean isJSONAggregation = aggregationExpression instanceof SynapseJsonPath; + + for (MessageContext synCtx : aggregate.getMessages()) { + if (newCtx == null) { + try { + newCtx = MessageHelper.cloneMessageContext(synCtx, true, false, true); + } catch (AxisFault axisFault) { + handleException(aggregate, "Error creating a copy of the message", axisFault, synCtx); + } + + if (log.isDebugEnabled()) { + log.debug("Generating Aggregated message from : " + newCtx.getEnvelope()); + } + if (isJSONAggregation) { + jsonArray.add(EIPUtils.getJSONElement(synCtx, (SynapseJsonPath) aggregationExpression)); + } else { + try { + EIPUtils.enrichEnvelope(newCtx.getEnvelope(), synCtx, (SynapseXPath) aggregationExpression); + } catch (JaxenException e) { + handleException(aggregate, "Error merging aggregation results using XPath : " + + aggregationExpression.toString(), e, synCtx); + } + } + } else { + try { + if (log.isDebugEnabled()) { + log.debug("Merging message : " + synCtx.getEnvelope() + " using XPath : " + + aggregationExpression); + } + // When the target sequences are not content aware, the message builder wont get triggered. + // Therefore, we need to build the message to do the aggregation. + RelayUtils.buildMessage(((Axis2MessageContext) synCtx).getAxis2MessageContext()); + if (isJSONAggregation) { + jsonArray.add(EIPUtils.getJSONElement(synCtx, (SynapseJsonPath) aggregationExpression)); + } else { + EIPUtils.enrichEnvelope(newCtx.getEnvelope(), synCtx.getEnvelope(), synCtx, (SynapseXPath) + aggregationExpression); + } + + if (log.isDebugEnabled()) { + log.debug("Merged result : " + newCtx.getEnvelope()); + } + } catch (JaxenException e) { + handleException(aggregate, "Error merging aggregation results using XPath : " + + aggregationExpression.toString(), e, synCtx); + } catch (SynapseException e) { + handleException(aggregate, "Error evaluating expression: " + aggregationExpression.toString(), e, synCtx); + } catch (JsonSyntaxException e) { + handleException(aggregate, "Error reading JSON element: " + aggregationExpression.toString(), e, synCtx); + } catch (IOException e) { + handleException(aggregate, "IO Error occurred while building the message", e, synCtx); + } catch (XMLStreamException e) { + handleException(aggregate, "XML Error occurred while building the message", e, synCtx); + } + } + } + + result = jsonArray; + + StatisticDataCollectionHelper.collectAggregatedParents(aggregate.getMessages(), newCtx); + if (isJSONAggregation) { + // setting the new JSON payload to the messageContext + try { + JsonUtil.getNewJsonPayload(((Axis2MessageContext) newCtx).getAxis2MessageContext(), new + ByteArrayInputStream(result.toString().getBytes()), true, true); + } catch (AxisFault axisFault) { + log.error("Error occurred while setting the new JSON payload to the msg context", axisFault); + } + } else { + // Removing the JSON stream after aggregated using XML path. + // This will fix inconsistent behaviour in logging the payload. + ((Axis2MessageContext) newCtx).getAxis2MessageContext() + .removeProperty(org.apache.synapse.commons.json.Constants.ORG_APACHE_SYNAPSE_COMMONS_JSON_JSON_INPUT_STREAM); + } + return newCtx; + } + + public SynapsePath getCorrelateExpression() { + + return correlateExpression; + } + + public void setCorrelateExpression(SynapsePath correlateExpression) { + + this.correlateExpression = correlateExpression; + this.id = null; + } + + public long getCompletionTimeoutMillis() { + + return completionTimeoutMillis; + } + + public void setCompletionTimeoutMillis(long completionTimeoutMillis) { + + this.completionTimeoutMillis = completionTimeoutMillis; + } + + public Value getMinMessagesToComplete() { + + return minMessagesToComplete; + } + + public void setMinMessagesToComplete(Value minMessagesToComplete) { + + this.minMessagesToComplete = minMessagesToComplete; + } + + public Value getMaxMessagesToComplete() { + + return maxMessagesToComplete; + } + + public void setMaxMessagesToComplete(Value maxMessagesToComplete) { + + this.maxMessagesToComplete = maxMessagesToComplete; + } + + /** + * Check whether the message is a scatter message or not + * + * @param synCtx MessageContext + * @return true if the message is a scatter message + */ + private static boolean isContinuationTriggeredFromMediatorWorker(MessageContext synCtx) { + + Boolean isContinuationTriggeredMediatorWorker = + (Boolean) synCtx.getProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER); + return isContinuationTriggeredMediatorWorker != null && isContinuationTriggeredMediatorWorker; + } + + @Override + public Integer reportOpenStatistics(MessageContext messageContext, boolean isContentAltering) { + + statisticReportingIndex = OpenEventCollector.reportFlowContinuableEvent(messageContext, getMediatorName(), + ComponentType.MEDIATOR, getAspectConfiguration(), isContentAltering() || isContentAltering); + return statisticReportingIndex; + } + + @Override + public void reportCloseStatistics(MessageContext messageContext, Integer currentIndex) { + + // Do nothing here as the close event is reported in the completeAggregate method + } + + @Override + public void setComponentStatisticsId(ArtifactHolder holder) { + + if (getAspectConfiguration() == null) { + configure(new AspectConfiguration(getMediatorName())); + } + String sequenceId = + StatisticIdentityGenerator.getIdForFlowContinuableMediator(getMediatorName(), ComponentType.MEDIATOR, holder); + getAspectConfiguration().setUniqueId(sequenceId); + for (Target target : targets) { + target.setStatisticIdForMediators(holder); + } + + StatisticIdentityGenerator.reportingFlowContinuableEndEvent(sequenceId, ComponentType.MEDIATOR, holder); + } + + @Override + public boolean isContentAltering() { + + return true; + } + + private void handleException(Aggregate aggregate, String msg, Exception exception, MessageContext msgContext) { + + aggregate.clear(); + activeAggregates.clear(); + if (exception != null) { + super.handleException(msg, exception, msgContext); + } else { + super.handleException(msg, msgContext); + } + } +} diff --git a/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java new file mode 100644 index 0000000000..bcef1aec66 --- /dev/null +++ b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.config.xml; + +/** + * Factory and Serializer tests for the ScatterGatherMediator + */ + +public class ScatterGatherMediatorSerializationTest extends AbstractTestCase { + + private ScatterGatherMediatorFactory scatterGatherMediatorFactory; + private ScatterGatherMediatorSerializer scatterGatherMediatorSerializer; + + public ScatterGatherMediatorSerializationTest() { + + super(ScatterGatherMediatorSerializer.class.getName()); + scatterGatherMediatorFactory = new ScatterGatherMediatorFactory(); + scatterGatherMediatorSerializer = new ScatterGatherMediatorSerializer(); + } + + public void testScatterGatherSerialization() { + + String inputXML = "" + + "" + + "{ \"pet\": { " + + "\"name\": \"pet1\", \"type\": \"dog\" }, " + + "\"status\": \"success\" }" + + "" + + "" + + "" + + ""; + + assertTrue(serialization(inputXML, scatterGatherMediatorFactory, scatterGatherMediatorSerializer)); + } +} From 94b201fb7e465afa16ce48e67b533e80d1d5cb65 Mon Sep 17 00:00:00 2001 From: Sanoj Punchihewa Date: Mon, 11 Nov 2024 08:55:02 +0530 Subject: [PATCH 2/6] Skip adding aggregates when the flow is completed --- .../org/apache/synapse/mediators/eip/aggregator/Aggregate.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java index b3f813521c..cbd7998886 100755 --- a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java @@ -115,6 +115,9 @@ public Aggregate(SynapseEnvironment synEnv, String corelation, long timeoutMilli */ public synchronized boolean addMessage(MessageContext synCtx) { if (maxCount <= 0 || (maxCount > 0 && messages.size() < maxCount)) { + if (messages == null) { + return false; + } messages.add(synCtx); return true; } else { From dad9df574ac6cc263d14d4a481fdea14e5cc1a45 Mon Sep 17 00:00:00 2001 From: Sanoj Punchihewa Date: Wed, 13 Nov 2024 15:18:29 +0530 Subject: [PATCH 3/6] Refactor scatter-gather synapse syntax --- .../xml/ScatterGatherMediatorFactory.java | 30 +++++++++++-------- .../xml/ScatterGatherMediatorSerializer.java | 7 +++-- ...catterGatherMediatorSerializationTest.java | 8 ++--- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java index b630259471..3af458912e 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java @@ -35,13 +35,11 @@ * *
  * <scatter-gather parallel-execution=(true | false)>
- *   <aggregation value-to-aggregate="expression" condition="expression" timeout="long"
+ *   <aggregation value="expression" condition="expression" timeout="long"
  *     min-messages="expression" max-messages="expression"/>
- *   <target>
- *     <sequence>
- *       (mediator)+
- *     </sequence>
- *   </target>+
+ *   <sequence>
+ *     (mediator)+
+ *   </sequence>+
  * </scatter-gather>
  * 
*/ @@ -54,14 +52,16 @@ public class ScatterGatherMediatorFactory extends AbstractMediatorFactory { = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "scatter-gather"); private static final QName ELEMENT_AGGREGATE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "aggregation"); - private static final QName ATT_VALUE_TO_AGGREGATE = new QName("value-to-aggregate"); + private static final QName ATT_VALUE_TO_AGGREGATE = new QName("value"); private static final QName ATT_CONDITION = new QName("condition"); private static final QName ATT_TIMEOUT = new QName("timeout"); private static final QName ATT_MIN_MESSAGES = new QName("min-messages"); private static final QName ATT_MAX_MESSAGES = new QName("max-messages"); - private static final QName TARGET_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "target"); + private static final QName SEQUENCE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "sequence"); private static final QName PARALLEL_EXEC_Q = new QName("parallel-execution"); + private static final SequenceMediatorFactory fac = new SequenceMediatorFactory(); + public Mediator createSpecificMediator(OMElement elem, Properties properties) { boolean asynchronousExe = true; @@ -76,11 +76,15 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) { mediator.setParallelExecution(asynchronousExe); - Iterator targetElements = elem.getChildrenWithName(TARGET_Q); - while (targetElements.hasNext()) { - Target target = TargetFactory.createTarget((OMElement) targetElements.next(), properties); - target.setAsynchronous(asynchronousExe); - mediator.addTarget(target); + Iterator sequenceListElements = elem.getChildrenWithName(SEQUENCE_Q); + while (sequenceListElements.hasNext()) { + OMElement sequence = (OMElement) sequenceListElements.next(); + if (sequence != null) { + Target target = new Target(); + target.setSequence(fac.createAnonymousSequence(sequence, properties)); + target.setAsynchronous(asynchronousExe); + mediator.addTarget(target); + } } OMElement aggregateElement = elem.getFirstChildWithName(ELEMENT_AGGREGATE_Q); diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java index b585078f9a..78e3e8e44d 100755 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java @@ -46,7 +46,7 @@ public OMElement serializeSpecificMediator(Mediator m) { OMElement aggregationElement = fac.createOMElement("aggregation", synNS); SynapsePathSerializer.serializePath( - scatterGatherMediator.getAggregationExpression(), aggregationElement, "value-to-aggregate"); + scatterGatherMediator.getAggregationExpression(), aggregationElement, "value"); if (scatterGatherMediator.getCorrelateExpression() != null) { SynapsePathSerializer.serializePath( @@ -68,8 +68,9 @@ public OMElement serializeSpecificMediator(Mediator m) { scatterGatherElement.addChild(aggregationElement); for (Target target : scatterGatherMediator.getTargets()) { - if (target != null) { - scatterGatherElement.addChild(TargetSerializer.serializeTarget(target)); + if (target != null && target.getSequence() != null) { + SequenceMediatorSerializer serializer = new SequenceMediatorSerializer(); + serializer.serializeAnonymousSequence(scatterGatherElement, target.getSequence()); } } serializeComments(scatterGatherElement, scatterGatherMediator.getCommentsList()); diff --git a/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java index bcef1aec66..a8572a5256 100644 --- a/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java +++ b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java @@ -37,14 +37,14 @@ public ScatterGatherMediatorSerializationTest() { public void testScatterGatherSerialization() { String inputXML = "" + - "" + + "" + "{ \"pet\": { " + "\"name\": \"pet1\", \"type\": \"dog\" }, " + "\"status\": \"success\" }" + - "" + - "" + + "" + + "" + "" + - ""; + ""; assertTrue(serialization(inputXML, scatterGatherMediatorFactory, scatterGatherMediatorSerializer)); } From b45b4cacefe009606f780aa41b4d77ead00e24a9 Mon Sep 17 00:00:00 2001 From: Sanoj Punchihewa Date: Tue, 19 Nov 2024 09:38:11 +0530 Subject: [PATCH 4/6] Add synapse expression support and option to set output to a variable --- .../xml/ScatterGatherMediatorFactory.java | 40 +- .../xml/ScatterGatherMediatorSerializer.java | 5 + .../synapse/mediators/MediatorWorker.java | 2 +- .../mediators/base/SequenceMediator.java | 12 - .../synapse/mediators/eip/EIPUtils.java | 4 +- .../mediators/eip/SharedDataHolder.java | 17 + .../synapse/mediators/v2/ScatterGather.java | 370 ++++++++++++++---- ...catterGatherMediatorSerializationTest.java | 3 +- 8 files changed, 365 insertions(+), 88 deletions(-) diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java index 3af458912e..81689a7edb 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java @@ -20,7 +20,9 @@ import org.apache.axiom.om.OMAttribute; import org.apache.axiom.om.OMElement; +import org.apache.commons.lang3.StringUtils; import org.apache.synapse.Mediator; +import org.apache.synapse.SynapseException; import org.apache.synapse.mediators.eip.Target; import org.apache.synapse.mediators.v2.ScatterGather; import org.jaxen.JaxenException; @@ -34,7 +36,7 @@ * different message contexts and aggregate the responses back. * *
- * <scatter-gather parallel-execution=(true | false)>
+ * <scatter-gather parallel-execution=(true | false) result-target=(body | variable) content-type=(JSON | XML)>
  *   <aggregation value="expression" condition="expression" timeout="long"
  *     min-messages="expression" max-messages="expression"/>
  *   <sequence>
@@ -59,6 +61,8 @@ public class ScatterGatherMediatorFactory extends AbstractMediatorFactory {
     private static final QName ATT_MAX_MESSAGES = new QName("max-messages");
     private static final QName SEQUENCE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "sequence");
     private static final QName PARALLEL_EXEC_Q = new QName("parallel-execution");
+    private static final QName RESULT_TARGET_Q = new QName("result-target");
+    private static final QName CONTENT_TYPE_Q = new QName("content-type");
 
     private static final SequenceMediatorFactory fac = new SequenceMediatorFactory();
 
@@ -73,10 +77,36 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
         if (parallelExecAttr != null && parallelExecAttr.getAttributeValue().equals("false")) {
             asynchronousExe = false;
         }
-
         mediator.setParallelExecution(asynchronousExe);
 
+        OMAttribute contentTypeAttr = elem.getAttribute(CONTENT_TYPE_Q);
+        if (contentTypeAttr == null || StringUtils.isBlank(contentTypeAttr.getAttributeValue())) {
+            String msg = "The 'content-type' attribute is required for the configuration of a Scatter Gather mediator";
+            throw new SynapseException(msg);
+        } else {
+            if ("JSON".equals(contentTypeAttr.getAttributeValue())) {
+                mediator.setContentType(ScatterGather.JSON_TYPE);
+            } else if ("XML".equals(contentTypeAttr.getAttributeValue())) {
+                mediator.setContentType(ScatterGather.XML_TYPE);
+            } else {
+                String msg = "The 'content-type' attribute should be either 'JSON' or 'XML'";
+                throw new SynapseException(msg);
+            }
+        }
+
+        OMAttribute resultTargetAttr = elem.getAttribute(RESULT_TARGET_Q);
+        if (resultTargetAttr == null || StringUtils.isBlank(resultTargetAttr.getAttributeValue())) {
+            String msg = "The 'result-target' attribute is required for the configuration of a Scatter Gather mediator";
+            throw new SynapseException(msg);
+        } else {
+            mediator.setResultTarget(resultTargetAttr.getAttributeValue());
+        }
+
         Iterator sequenceListElements = elem.getChildrenWithName(SEQUENCE_Q);
+        if (!sequenceListElements.hasNext()) {
+            String msg = "A 'sequence' element is required for the configuration of a Scatter Gather mediator";
+            throw new SynapseException(msg);
+        }
         while (sequenceListElements.hasNext()) {
             OMElement sequence = (OMElement) sequenceListElements.next();
             if (sequence != null) {
@@ -97,6 +127,9 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
                 } catch (JaxenException e) {
                     handleException("Unable to load the aggregating expression", e);
                 }
+            } else {
+                String msg = "The 'value' attribute is required for the configuration of a Scatter Gather mediator";
+                throw new SynapseException(msg);
             }
 
             OMAttribute conditionExpr = aggregateElement.getAttribute(ATT_CONDITION);
@@ -123,6 +156,9 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
             if (maxMessages != null) {
                 mediator.setMaxMessagesToComplete(new ValueFactory().createValue("max-messages", aggregateElement));
             }
+        } else {
+            String msg = "The 'aggregation' element is required for the configuration of a Scatter Gather mediator";
+            throw new SynapseException(msg);
         }
         addAllCommentChildrenToList(elem, mediator.getCommentsList());
         return mediator;
diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java
index 78e3e8e44d..e63bad4c85 100755
--- a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java
+++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java
@@ -43,6 +43,11 @@ public OMElement serializeSpecificMediator(Mediator m) {
 
         scatterGatherElement.addAttribute(fac.createOMAttribute(
                 "parallel-execution", nullNS, Boolean.toString(scatterGatherMediator.getParallelExecution())));
+        scatterGatherElement.addAttribute(fac.createOMAttribute(
+                "result-target", nullNS, scatterGatherMediator.getResultTarget()));
+        scatterGatherElement.addAttribute(fac.createOMAttribute(
+                "content-type", nullNS, scatterGatherMediator.getContentType()));
+
         OMElement aggregationElement = fac.createOMElement("aggregation", synNS);
 
         SynapsePathSerializer.serializePath(
diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
index ae7fffa435..2d4258f342 100644
--- a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
+++ b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
@@ -154,7 +154,7 @@ public void run() {
                 debugManager.advertiseMediationFlowTerminatePoint(synCtx);
                 debugManager.releaseMediationFlowLock();
             }
-            if (RuntimeStatisticCollector.isStatisticsEnabled()) {
+            if (RuntimeStatisticCollector.isStatisticsEnabled() && !isScatterMessage(synCtx)) {
                 this.statisticsCloseEventListener.invokeCloseEventEntry(synCtx);
             }
         }
diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java b/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java
index 2c28222924..ccf33c72ec 100644
--- a/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java
+++ b/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java
@@ -521,16 +521,4 @@ public void setComponentStatisticsId(ArtifactHolder holder) {
             StatisticIdentityGenerator.reportingFlowContinuableEndEvent(sequenceId, ComponentType.SEQUENCE, holder);
         }
     }
-
-    /**
-     * Check whether the message is a scatter message or not
-     *
-     * @param synCtx MessageContext
-     * @return true if the message is a scatter message
-     */
-    private static boolean isScatterMessage(MessageContext synCtx) {
-
-        Boolean isSkipContinuationState = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES);
-        return isSkipContinuationState != null && isSkipContinuationState;
-    }
 }
diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java b/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java
index f8b6b6f9f5..55870f16b2 100644
--- a/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java
+++ b/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java
@@ -189,7 +189,7 @@ public static void enrichEnvelope(SOAPEnvelope envelope, SOAPEnvelope enricher,
         }
     }
 
-    private static boolean isBody(OMElement body, OMElement enrichingElement) {
+    public static boolean isBody(OMElement body, OMElement enrichingElement) {
         try {
             return (body.getLocalName().equals(enrichingElement.getLocalName()) &&
                     body.getNamespace().getNamespaceURI().equals(enrichingElement.getNamespace().getNamespaceURI()));
@@ -198,7 +198,7 @@ private static boolean isBody(OMElement body, OMElement enrichingElement) {
         }
     }
 
-    private static boolean checkNotEmpty(List list) {
+    public static boolean checkNotEmpty(List list) {
         return list != null && !list.isEmpty();
     }
 
diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/eip/SharedDataHolder.java b/modules/core/src/main/java/org/apache/synapse/mediators/eip/SharedDataHolder.java
index f87c0fffef..e0f782005f 100644
--- a/modules/core/src/main/java/org/apache/synapse/mediators/eip/SharedDataHolder.java
+++ b/modules/core/src/main/java/org/apache/synapse/mediators/eip/SharedDataHolder.java
@@ -18,6 +18,8 @@
 
 package org.apache.synapse.mediators.eip;
 
+import org.apache.synapse.MessageContext;
+
 /**
  * This class is used to hold the shared data for a particular message set
  * For an example we can use this to share some data across all the spawned spilt messages in iterate mediator
@@ -29,6 +31,17 @@ public class SharedDataHolder {
      */
     private boolean isAggregationCompleted = false;
 
+    private MessageContext synCtx;
+
+    public SharedDataHolder() {
+
+    }
+
+    public SharedDataHolder(MessageContext synCtx) {
+
+        this.synCtx = synCtx;
+    }
+
     /**
      * Check whether aggregation has been completed.
      *
@@ -45,4 +58,8 @@ public void markAggregationCompletion() {
         isAggregationCompleted = true;
     }
 
+    public MessageContext getSynCtx() {
+
+        return synCtx;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java
index 589ab2b7d5..876183b7fe 100644
--- a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java
+++ b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java
@@ -20,11 +20,18 @@
 
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
 import com.google.gson.JsonSyntaxException;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMNode;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
 import org.apache.axis2.context.OperationContext;
+import org.apache.http.protocol.HTTP;
 import org.apache.synapse.ContinuationState;
+import org.apache.synapse.JSONObjectExtensionException;
 import org.apache.synapse.ManagedLifecycle;
 import org.apache.synapse.Mediator;
 import org.apache.synapse.MessageContext;
@@ -39,6 +46,7 @@
 import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector;
 import org.apache.synapse.aspects.flow.statistics.data.artifact.ArtifactHolder;
 import org.apache.synapse.aspects.flow.statistics.util.StatisticDataCollectionHelper;
+import org.apache.synapse.aspects.flow.statistics.util.StatisticsConstants;
 import org.apache.synapse.commons.json.JsonUtil;
 import org.apache.synapse.config.xml.SynapsePath;
 import org.apache.synapse.continuation.ContinuationStackManager;
@@ -56,10 +64,8 @@
 import org.apache.synapse.mediators.eip.Target;
 import org.apache.synapse.mediators.eip.aggregator.Aggregate;
 import org.apache.synapse.transport.passthru.util.RelayUtils;
+import org.apache.synapse.util.JSONMergeUtils;
 import org.apache.synapse.util.MessageHelper;
-import org.apache.synapse.util.xpath.SynapseJsonPath;
-import org.apache.synapse.util.xpath.SynapseXPath;
-import org.jaxen.JaxenException;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -69,12 +75,19 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.Timer;
+import javax.xml.namespace.QName;
 import javax.xml.stream.XMLStreamException;
 
+import static org.apache.synapse.SynapseConstants.XML_CONTENT_TYPE;
+import static org.apache.synapse.transport.passthru.PassThroughConstants.JSON_CONTENT_TYPE;
+
 public class ScatterGather extends AbstractMediator implements ManagedLifecycle, FlowContinuableMediator {
 
+    public static final String JSON_TYPE = "JSON";
+    public static final String XML_TYPE = "XML";
     private final Object lock = new Object();
     private final Map activeAggregates = Collections.synchronizedMap(new HashMap<>());
     private String id;
@@ -86,6 +99,9 @@ public class ScatterGather extends AbstractMediator implements ManagedLifecycle,
     private SynapsePath aggregationExpression = null;
     private boolean parallelExecution = true;
     private Integer statisticReportingIndex;
+    private String contentType;
+    private String resultTarget;
+    private SynapseEnvironment synapseEnv;
 
     public ScatterGather() {
 
@@ -122,8 +138,18 @@ public boolean mediate(MessageContext synCtx) {
             }
         }
 
-        synCtx.setProperty(id != null ? EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id :
-                EIPConstants.EIP_SHARED_DATA_HOLDER, new SharedDataHolder());
+        MessageContext orginalMessageContext = null;
+        if (!isTargetBody()) {
+            try {
+                // Clone the original MessageContext and save it to continue the flow using it when the scatter gather
+                // output is set to a variable
+                orginalMessageContext = MessageHelper.cloneMessageContext(synCtx);
+            } catch (AxisFault e) {
+                handleException("Error cloning the message context", e, synCtx);
+            }
+        }
+
+        synCtx.setProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id, new SharedDataHolder(orginalMessageContext));
         Iterator iter = targets.iterator();
         int i = 0;
         while (iter.hasNext()) {
@@ -144,17 +170,21 @@ public boolean mediate(MessageContext synCtx) {
         if (opCtx != null) {
             opCtx.setProperty(Constants.RESPONSE_WRITTEN, "SKIP");
         }
+        synCtx.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true);
         return aggregationResult;
     }
 
-    public void init(SynapseEnvironment se) {
+    public void init(SynapseEnvironment synapseEnv) {
 
+        this.synapseEnv = synapseEnv;
         for (Target target : targets) {
             ManagedLifecycle seq = target.getSequence();
             if (seq != null) {
-                seq.init(se);
+                seq.init(synapseEnv);
             }
         }
+        // Registering the mediator for enabling continuation
+        synapseEnv.updateCallMediatorCount(true);
     }
 
     public void destroy() {
@@ -165,6 +195,8 @@ public void destroy() {
                 seq.destroy();
             }
         }
+        // Unregistering the mediator for continuation
+        synapseEnv.updateCallMediatorCount(false);
     }
 
     /**
@@ -186,14 +218,9 @@ private MessageContext getClonedMessageContext(MessageContext synCtx, int messag
             // Set the SCATTER_MESSAGES property to the cloned message context which will be used by the MediatorWorker
             // to continue the mediation from the continuation state
             newCtx.setProperty(SynapseConstants.SCATTER_MESSAGES, true);
-            if (id != null) {
-                newCtx.setProperty(EIPConstants.AGGREGATE_CORRELATION + "." + id, synCtx.getMessageID());
-                newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE + "." + id, messageSequence +
-                        EIPConstants.MESSAGE_SEQUENCE_DELEMITER + messageCount);
-            } else {
-                newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE, messageSequence +
-                        EIPConstants.MESSAGE_SEQUENCE_DELEMITER + messageCount);
-            }
+            newCtx.setProperty(EIPConstants.AGGREGATE_CORRELATION + "." + id, synCtx.getMessageID());
+            newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE + "." + id, messageSequence +
+                    EIPConstants.MESSAGE_SEQUENCE_DELEMITER + messageCount);
         } catch (AxisFault axisFault) {
             handleException("Error cloning the message context", axisFault, synCtx);
         }
@@ -282,28 +309,35 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat
     private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) {
 
         Aggregate aggregate = null;
-        String correlationIdName = (id != null ? EIPConstants.AGGREGATE_CORRELATION + "." + id :
-                EIPConstants.AGGREGATE_CORRELATION);
+        String correlationIdName = EIPConstants.AGGREGATE_CORRELATION + "." + id;
 
         Object correlationID = synCtx.getProperty(correlationIdName);
         String correlation;
 
         Object result = null;
+        // When the target sequences are not content aware, the message builder wont get triggered.
+        // Therefore, we need to build the message to do the aggregation.
+        try {
+            RelayUtils.buildMessage(((Axis2MessageContext) synCtx).getAxis2MessageContext());
+        } catch (IOException | XMLStreamException e) {
+            handleException("Error building the message", e, synCtx);
+        }
+
         if (correlateExpression != null) {
-            try {
-                result = correlateExpression instanceof SynapseXPath ? correlateExpression.evaluate(synCtx) :
-                        ((SynapseJsonPath) correlateExpression).evaluate(synCtx);
-            } catch (JaxenException e) {
-                handleException("Unable to execute the XPATH over the message", e, synCtx);
-            }
+            result = correlateExpression.objectValueOf(synCtx);
             if (result instanceof List) {
                 if (((List) result).isEmpty()) {
                     handleException("Failed to evaluate correlate expression: " + correlateExpression.toString(), synCtx);
                 }
             }
+            if (result instanceof JsonPrimitive) {
+                if (!((JsonPrimitive) result).getAsBoolean()) {
+                    return false;
+                }
+            }
             if (result instanceof Boolean) {
                 if (!(Boolean) result) {
-                    return true;
+                    return false;
                 }
             }
         }
@@ -444,9 +478,7 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) {
 
     private boolean isAggregationCompleted(MessageContext synCtx) {
 
-        Object aggregateTimeoutHolderObj =
-                synCtx.getProperty(id != null ? EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id :
-                        EIPConstants.EIP_SHARED_DATA_HOLDER);
+        Object aggregateTimeoutHolderObj = synCtx.getProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id);
 
         if (aggregateTimeoutHolderObj != null) {
             SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateTimeoutHolderObj;
@@ -481,8 +513,7 @@ public boolean completeAggregate(Aggregate aggregate) {
                 MessageContext lastMessage = aggregate.getLastMessage();
                 if (lastMessage != null) {
                     Object aggregateTimeoutHolderObj =
-                            lastMessage.getProperty(id != null ? EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id :
-                                    EIPConstants.EIP_SHARED_DATA_HOLDER);
+                            lastMessage.getProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id);
 
                     if (aggregateTimeoutHolderObj != null) {
                         SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateTimeoutHolderObj;
@@ -497,40 +528,222 @@ public boolean completeAggregate(Aggregate aggregate) {
             return false;
         }
 
-        MessageContext newSynCtx = getAggregatedMessage(aggregate);
+        if (isTargetBody()) {
+            MessageContext newSynCtx = getAggregatedMessage(aggregate);
+            return processAggregation(newSynCtx, aggregate);
+        } else {
+            MessageContext originalMessageContext = getOriginalMessageContext(aggregate);
+            if (originalMessageContext != null) {
+                setAggregatedMessageAsVariable(originalMessageContext, aggregate);
+                return processAggregation(originalMessageContext, aggregate);
+            } else {
+                handleException(aggregate, "Error retrieving the original message context", null, aggregate.getLastMessage());
+                return false;
+            }
+        }
+    }
+
+    private boolean processAggregation(MessageContext messageContext, Aggregate aggregate) {
 
-        if (newSynCtx == null) {
+        if (messageContext == null) {
             log.warn("An aggregation of messages timed out with no aggregated messages", null);
             return false;
         }
         aggregate.clear();
         activeAggregates.remove(aggregate.getCorrelation());
-        newSynCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, false);
-        SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(newSynCtx);
-        boolean result = false;
+
+        if (isTargetBody()) {
+            // Set content type to the aggregated message
+            setContentType(messageContext);
+        } else {
+            // Update the continuation state to current mediator position as we are using the original message context
+            ContinuationStackManager.updateSeqContinuationState(messageContext, getMediatorPosition());
+        }
+
+        SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(messageContext);
+        messageContext.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true);
 
         if (RuntimeStatisticCollector.isStatisticsEnabled()) {
-            CloseEventCollector.closeEntryEvent(newSynCtx, getMediatorName(), ComponentType.MEDIATOR,
+            CloseEventCollector.closeEntryEvent(messageContext, getMediatorName(), ComponentType.MEDIATOR,
                     statisticReportingIndex, isContentAltering());
         }
 
+        boolean result = false;
         if (seqContinuationState != null) {
-            SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(newSynCtx, seqContinuationState);
-            result = sequenceMediator.mediate(newSynCtx, seqContinuationState);
+            SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(messageContext, seqContinuationState);
+            result = sequenceMediator.mediate(messageContext, seqContinuationState);
             if (RuntimeStatisticCollector.isStatisticsEnabled()) {
-                sequenceMediator.reportCloseStatistics(newSynCtx, null);
+                sequenceMediator.reportCloseStatistics(messageContext, null);
             }
         }
-        CloseEventCollector.closeEventsAfterScatterGather(newSynCtx);
+        CloseEventCollector.closeEventsAfterScatterGather(messageContext);
         return result;
     }
 
+    /**
+     * Return the original message context using the SharedDataHolder.
+     *
+     * @param aggregate Aggregate object
+     * @return original message context
+     */
+    private MessageContext getOriginalMessageContext(Aggregate aggregate) {
+
+        MessageContext lastMessage = aggregate.getLastMessage();
+        if (lastMessage != null) {
+            Object aggregateHolderObj = lastMessage.getProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id);
+            if (aggregateHolderObj != null) {
+                SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateHolderObj;
+                return sharedDataHolder.getSynCtx();
+            }
+        }
+        return null;
+    }
+
+    private void setContentType(MessageContext synCtx) {
+
+        org.apache.axis2.context.MessageContext a2mc = ((Axis2MessageContext) synCtx).getAxis2MessageContext();
+        if (Objects.equals(contentType, JSON_TYPE)) {
+            a2mc.setProperty(Constants.Configuration.MESSAGE_TYPE, JSON_CONTENT_TYPE);
+            a2mc.setProperty(Constants.Configuration.CONTENT_TYPE, JSON_CONTENT_TYPE);
+            setContentTypeHeader(JSON_CONTENT_TYPE, a2mc);
+        } else {
+            a2mc.setProperty(Constants.Configuration.MESSAGE_TYPE, XML_CONTENT_TYPE);
+            a2mc.setProperty(Constants.Configuration.CONTENT_TYPE, XML_CONTENT_TYPE);
+            setContentTypeHeader(XML_CONTENT_TYPE, a2mc);
+        }
+        a2mc.removeProperty("NO_ENTITY_BODY");
+    }
+
+    private void setContentTypeHeader(Object resultValue, org.apache.axis2.context.MessageContext axis2MessageCtx) {
+
+        axis2MessageCtx.setProperty(org.apache.axis2.Constants.Configuration.CONTENT_TYPE, resultValue);
+        Object o = axis2MessageCtx.getProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS);
+        Map headers = (Map) o;
+        if (headers != null) {
+            headers.put(HTTP.CONTENT_TYPE, resultValue);
+        }
+    }
+
+    private static void addChildren(List list, OMElement element) {
+
+        for (Object item : list) {
+            if (item instanceof OMElement) {
+                element.addChild((OMElement) item);
+            }
+        }
+    }
+
+    private static List getMatchingElements(MessageContext messageContext, SynapsePath expression) {
+
+        Object o = expression.objectValueOf(messageContext);
+        if (o instanceof OMNode) {
+            List list = new ArrayList();
+            list.add(o);
+            return list;
+        } else if (o instanceof List) {
+            return (List) o;
+        } else {
+            return new ArrayList();
+        }
+    }
+
+    private static void enrichEnvelope(MessageContext messageContext, SynapsePath expression) {
+
+        OMElement enrichingElement;
+        List elementList = getMatchingElements(messageContext, expression);
+        if (EIPUtils.checkNotEmpty(elementList)) {
+            // attach at parent of the first result from the XPath, or to the SOAPBody
+            Object o = elementList.get(0);
+            if (o instanceof OMElement &&
+                    ((OMElement) o).getParent() != null &&
+                    ((OMElement) o).getParent() instanceof OMElement) {
+                enrichingElement = (OMElement) ((OMElement) o).getParent();
+                OMElement body = messageContext.getEnvelope().getBody();
+                if (!EIPUtils.isBody(body, enrichingElement)) {
+                    OMElement nonBodyElem = enrichingElement;
+                    enrichingElement = messageContext.getEnvelope().getBody();
+                    addChildren(elementList, enrichingElement);
+                    while (!EIPUtils.isBody(body, (OMElement) nonBodyElem.getParent())) {
+                        nonBodyElem = (OMElement) nonBodyElem.getParent();
+                    }
+                    nonBodyElem.detach();
+                }
+            }
+        }
+    }
+
+    private void setAggregatedMessageAsVariable(MessageContext originalMessageContext, Aggregate aggregate) {
+
+        Object variable = originalMessageContext.getVariable(resultTarget);
+        if (variable == null) {
+            variable = createNewVariable(originalMessageContext, aggregate);
+            originalMessageContext.setVariable(resultTarget, variable);
+        }
+        if (Objects.equals(contentType, JSON_TYPE)) {
+            setJSONResultToVariable((JsonElement) variable, aggregate);
+        } else if (Objects.equals(contentType, XML_TYPE) && variable instanceof OMElement) {
+            setXMLResultToVariable((OMElement) variable, aggregate);
+        } else {
+            handleInvalidVariableType(variable, aggregate, originalMessageContext);
+        }
+        StatisticDataCollectionHelper.collectAggregatedParents(aggregate.getMessages(), originalMessageContext);
+    }
+
+    private void handleInvalidVariableType(Object variable, Aggregate aggregate, MessageContext synCtx) {
+
+        String expectedType = Objects.equals(contentType, JSON_TYPE) ? "JSON" : "OMElement";
+        String actualType = variable != null ? variable.getClass().getName() : "null";
+        handleException(aggregate, "Error merging aggregation results to variable: " + resultTarget +
+                " expected a " + expectedType + " but found " + actualType, null, synCtx);
+    }
+
+    private void setJSONResultToVariable(JsonElement variable, Aggregate aggregate) {
+
+        try {
+            for (MessageContext synCtx : aggregate.getMessages()) {
+                Object evaluatedResult = aggregationExpression.objectValueOf(synCtx);
+                if (variable instanceof JsonArray) {
+                    variable.getAsJsonArray().add((JsonElement) evaluatedResult);
+                } else if (variable instanceof JsonObject) {
+                    JSONMergeUtils.extendJSONObject((JsonObject) variable,
+                            JSONMergeUtils.ConflictStrategy.MERGE_INTO_ARRAY,
+                            ((JsonObject) evaluatedResult).getAsJsonObject());
+                } else {
+                    handleException(aggregate, "Error merging aggregation results to variable : " + resultTarget +
+                            " expected a JSON type variable but found " + variable.getClass().getName(), null, synCtx);
+                }
+            }
+        } catch (JSONObjectExtensionException e) {
+            handleException(aggregate, "Error merging aggregation results to JSON Object : " +
+                    aggregationExpression.toString(), e, aggregate.getLastMessage());
+        }
+    }
+
+    private void setXMLResultToVariable(OMElement variable, Aggregate aggregate) {
+
+        for (MessageContext synCtx : aggregate.getMessages()) {
+            List list = getMatchingElements(synCtx, aggregationExpression);
+            addChildren(list, variable);
+        }
+    }
+
+    private Object createNewVariable(MessageContext synCtx, Aggregate aggregate) {
+
+        if (Objects.equals(contentType, JSON_TYPE)) {
+            return new JsonArray();
+        } else if (Objects.equals(contentType, XML_TYPE)) {
+            return OMAbstractFactory.getOMFactory().createOMElement(new QName(resultTarget));
+        } else {
+            handleException(aggregate, "Error merging aggregation results to variable : " + resultTarget +
+                    " unknown content type : " + contentType, null, synCtx);
+            return null;
+        }
+    }
+
     private MessageContext getAggregatedMessage(Aggregate aggregate) {
 
         MessageContext newCtx = null;
         JsonArray jsonArray = new JsonArray();
-        JsonElement result;
-        boolean isJSONAggregation = aggregationExpression instanceof SynapseJsonPath;
 
         for (MessageContext synCtx : aggregate.getMessages()) {
             if (newCtx == null) {
@@ -543,58 +756,51 @@ private MessageContext getAggregatedMessage(Aggregate aggregate) {
                 if (log.isDebugEnabled()) {
                     log.debug("Generating Aggregated message from : " + newCtx.getEnvelope());
                 }
-                if (isJSONAggregation) {
-                    jsonArray.add(EIPUtils.getJSONElement(synCtx, (SynapseJsonPath) aggregationExpression));
-                } else {
-                    try {
-                        EIPUtils.enrichEnvelope(newCtx.getEnvelope(), synCtx, (SynapseXPath) aggregationExpression);
-                    } catch (JaxenException e) {
-                        handleException(aggregate, "Error merging aggregation results using XPath : " +
-                                aggregationExpression.toString(), e, synCtx);
+                if (Objects.equals(contentType, JSON_TYPE)) {
+                    Object evaluatedResult = aggregationExpression.objectValueOf(synCtx);
+                    if (evaluatedResult instanceof JsonElement) {
+                        jsonArray.add((JsonElement) evaluatedResult);
+                    } else {
+                        handleException(aggregate, "Error merging aggregation results as expression : " +
+                                aggregationExpression.toString() + " did not resolve to a JSON value", null, synCtx);
                     }
+                } else {
+                    enrichEnvelope(synCtx, aggregationExpression);
                 }
             } else {
                 try {
                     if (log.isDebugEnabled()) {
-                        log.debug("Merging message : " + synCtx.getEnvelope() + " using XPath : " +
+                        log.debug("Merging message : " + synCtx.getEnvelope() + " using expression : " +
                                 aggregationExpression);
                     }
-                    // When the target sequences are not content aware, the message builder wont get triggered.
-                    // Therefore, we need to build the message to do the aggregation.
-                    RelayUtils.buildMessage(((Axis2MessageContext) synCtx).getAxis2MessageContext());
-                    if (isJSONAggregation) {
-                        jsonArray.add(EIPUtils.getJSONElement(synCtx, (SynapseJsonPath) aggregationExpression));
+                    if (Objects.equals(contentType, JSON_TYPE)) {
+                        Object evaluatedResult = aggregationExpression.objectValueOf(synCtx);
+                        if (evaluatedResult instanceof JsonElement) {
+                            jsonArray.add((JsonElement) evaluatedResult);
+                        } else {
+                            jsonArray.add(evaluatedResult.toString());
+                        }
                     } else {
-                        EIPUtils.enrichEnvelope(newCtx.getEnvelope(), synCtx.getEnvelope(), synCtx, (SynapseXPath)
-                                aggregationExpression);
+                        enrichEnvelope(synCtx, aggregationExpression);
                     }
 
                     if (log.isDebugEnabled()) {
                         log.debug("Merged result : " + newCtx.getEnvelope());
                     }
-                } catch (JaxenException e) {
-                    handleException(aggregate, "Error merging aggregation results using XPath : " +
-                            aggregationExpression.toString(), e, synCtx);
                 } catch (SynapseException e) {
                     handleException(aggregate, "Error evaluating expression: " + aggregationExpression.toString(), e, synCtx);
                 } catch (JsonSyntaxException e) {
                     handleException(aggregate, "Error reading JSON element: " + aggregationExpression.toString(), e, synCtx);
-                } catch (IOException e) {
-                    handleException(aggregate, "IO Error occurred while building the message", e, synCtx);
-                } catch (XMLStreamException e) {
-                    handleException(aggregate, "XML Error occurred while building the message", e, synCtx);
                 }
             }
         }
 
-        result = jsonArray;
-
         StatisticDataCollectionHelper.collectAggregatedParents(aggregate.getMessages(), newCtx);
-        if (isJSONAggregation) {
+        if (Objects.equals(contentType, JSON_TYPE)) {
             // setting the new JSON payload to the messageContext
             try {
                 JsonUtil.getNewJsonPayload(((Axis2MessageContext) newCtx).getAxis2MessageContext(), new
-                        ByteArrayInputStream(result.toString().getBytes()), true, true);
+                        ByteArrayInputStream(jsonArray.toString().getBytes()), true, true);
             } catch (AxisFault axisFault) {
                 log.error("Error occurred while setting the new JSON payload to the msg context", axisFault);
             }
@@ -615,7 +821,6 @@ public SynapsePath getCorrelateExpression() {
     public void setCorrelateExpression(SynapsePath correlateExpression) {
 
         this.correlateExpression = correlateExpression;
-        this.id = null;
     }
 
     public long getCompletionTimeoutMillis() {
@@ -700,11 +905,36 @@ public boolean isContentAltering() {
     private void handleException(Aggregate aggregate, String msg, Exception exception, MessageContext msgContext) {
 
         aggregate.clear();
-        activeAggregates.clear();
+        activeAggregates.remove(aggregate.getCorrelation());
         if (exception != null) {
             super.handleException(msg, exception, msgContext);
         } else {
             super.handleException(msg, msgContext);
         }
     }
+
+    public String getContentType() {
+
+        return contentType;
+    }
+
+    public void setContentType(String contentType) {
+
+        this.contentType = contentType;
+    }
+
+    public String getResultTarget() {
+
+        return resultTarget;
+    }
+
+    public void setResultTarget(String resultTarget) {
+
+        this.resultTarget = resultTarget;
+    }
+
+    private boolean isTargetBody() {
+
+        return "body".equalsIgnoreCase(resultTarget);
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java
index a8572a5256..67da918d99 100644
--- a/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java
+++ b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java
@@ -36,7 +36,8 @@ public ScatterGatherMediatorSerializationTest() {
 
     public void testScatterGatherSerialization() {
 
-        String inputXML = "" +
+        String inputXML = "" +
                 "" +
                 "{                    \"pet\": {                        " +
                 "\"name\": \"pet1\",                        \"type\": \"dog\"                    },                    " +

From a85c6227c076d4d11eba21f30496e2cb7930175d Mon Sep 17 00:00:00 2001
From: Sanoj Punchihewa 
Date: Mon, 2 Dec 2024 19:00:00 +0530
Subject: [PATCH 5/6] Fix concurrency issue in aggregation

---
 .../mediators/eip/aggregator/Aggregate.java   | 25 +++++++++++--------
 .../synapse/mediators/v2/ScatterGather.java   |  3 +--
 2 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
index cbd7998886..337a693d57 100755
--- a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
+++ b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
@@ -115,9 +115,6 @@ public Aggregate(SynapseEnvironment synEnv, String corelation, long timeoutMilli
      */
     public synchronized boolean addMessage(MessageContext synCtx) {
         if (maxCount <= 0 || (maxCount > 0 && messages.size() < maxCount)) {
-            if (messages == null) {
-                return false;
-            }
             messages.add(synCtx);
             return true;
         } else {
@@ -261,12 +258,16 @@ public void run() {
                 break;
             }
             if (getLock()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Time : " + System.currentTimeMillis() + " and this aggregator " +
-                            "expired at : " + expiryTimeMillis);
+                try {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Time : " + System.currentTimeMillis() + " and this aggregator " +
+                                "expired at : " + expiryTimeMillis);
+                    }
+                    synEnv.getExecutorService().execute(new AggregateTimeout(this));
+                    break;
+                } finally {
+                    releaseLock();
                 }
-                synEnv.getExecutorService().execute(new AggregateTimeout(this));
-                break;
             }
         }
     }
@@ -312,10 +313,14 @@ public void run() {
     }
 
     public synchronized boolean getLock() {
-        return !locked;
+        if (!locked) {
+            locked = true;
+            return true;
+        }
+        return false;
     }
 
-    public void releaseLock() {
+    public synchronized void releaseLock() {
         locked = false;
     }
 
diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java
index 876183b7fe..c0e5e25f8f 100644
--- a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java
+++ b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java
@@ -448,7 +448,7 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) {
             }
         } else {
             synLog.traceOrDebug("Unable to find aggregation correlation property");
-            return true;
+            return false;
         }
         // if there is an aggregate continue on aggregation
         if (aggregate != null) {
@@ -471,7 +471,6 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) {
             }
         } else {
             synLog.traceOrDebug("Unable to find an aggregate for this message - skip");
-            return true;
         }
         return false;
     }

From e0c7d37a494e0114a90a15eee9d77d3ca34d6ce6 Mon Sep 17 00:00:00 2001
From: Sanoj Punchihewa 
Date: Wed, 4 Dec 2024 07:10:58 +0530
Subject: [PATCH 6/6] Fix review comments

---
 .../management/handling/span/SpanHandler.java |  21 +-
 .../xml/ScatterGatherMediatorFactory.java     |  19 +-
 .../xml/ScatterGatherMediatorSerializer.java  |   7 +-
 .../synapse/mediators/MediatorWorker.java     |  32 +-
 .../mediators/eip/aggregator/Aggregate.java   |  16 +-
 .../synapse/mediators/v2/ScatterGather.java   | 294 ++++++------------
 ...catterGatherMediatorSerializationTest.java |  22 +-
 7 files changed, 167 insertions(+), 244 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java
index 9a5c1f57b9..5a78b2cb1a 100644
--- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java
+++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java
@@ -432,12 +432,21 @@ private void handleCallbackFinishEvent(MessageContext messageContext) {
 
     public void handleScatterGatherFinishEvent(MessageContext messageContext) {
         TracingScope tracingScope = tracingScopeManager.getTracingScope(messageContext);
-            synchronized (tracingScope.getSpanStore()) {
-                cleanupContinuationStateSequences(tracingScope.getSpanStore(), messageContext);
-                SpanWrapper outerLevelSpanWrapper = tracingScope.getSpanStore().getOuterLevelSpanWrapper();
-                tracingScope.getSpanStore().finishSpan(outerLevelSpanWrapper, messageContext);
-                tracingScopeManager.cleanupTracingScope(tracingScope.getTracingScopeId());
-            }
+        synchronized (tracingScope.getSpanStore()) {
+            cleanupContinuationStateSequences(tracingScope.getSpanStore(), messageContext);
+            cleanUpActiveSpans(tracingScope.getSpanStore(), messageContext);
+            SpanWrapper outerLevelSpanWrapper = tracingScope.getSpanStore().getOuterLevelSpanWrapper();
+            tracingScope.getSpanStore().finishSpan(outerLevelSpanWrapper, messageContext);
+            tracingScopeManager.cleanupTracingScope(tracingScope.getTracingScopeId());
+        }
+    }
+
+    private void cleanUpActiveSpans(SpanStore spanStore, MessageContext messageContext) {
+        List activeSpanWrappers = spanStore.getActiveSpanWrappers();
+        for (int i = activeSpanWrappers.size() - 1; i > 0; i--) {
+            SpanWrapper spanWrapper = activeSpanWrappers.get(i);
+            spanStore.finishSpan(spanWrapper, messageContext);
+        }
     }
 
     @Override
diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java
index 81689a7edb..f6bbb54dc5 100644
--- a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java
+++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java
@@ -54,7 +54,7 @@ public class ScatterGatherMediatorFactory extends AbstractMediatorFactory {
             = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "scatter-gather");
     private static final QName ELEMENT_AGGREGATE_Q
             = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "aggregation");
-    private static final QName ATT_VALUE_TO_AGGREGATE = new QName("value");
+    private static final QName ATT_AGGREGATE_EXPRESSION = new QName("expression");
     private static final QName ATT_CONDITION = new QName("condition");
     private static final QName ATT_TIMEOUT = new QName("timeout");
     private static final QName ATT_MIN_MESSAGES = new QName("min-messages");
@@ -62,6 +62,7 @@ public class ScatterGatherMediatorFactory extends AbstractMediatorFactory {
     private static final QName SEQUENCE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "sequence");
     private static final QName PARALLEL_EXEC_Q = new QName("parallel-execution");
     private static final QName RESULT_TARGET_Q = new QName("result-target");
+    private static final QName ROOT_ELEMENT_Q = new QName("root-element");
     private static final QName CONTENT_TYPE_Q = new QName("content-type");
 
     private static final SequenceMediatorFactory fac = new SequenceMediatorFactory();
@@ -87,7 +88,15 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
             if ("JSON".equals(contentTypeAttr.getAttributeValue())) {
                 mediator.setContentType(ScatterGather.JSON_TYPE);
             } else if ("XML".equals(contentTypeAttr.getAttributeValue())) {
-                mediator.setContentType(ScatterGather.XML_TYPE);
+                OMAttribute rootElementAttr = elem.getAttribute(ROOT_ELEMENT_Q);
+                if (rootElementAttr != null && StringUtils.isNotBlank(rootElementAttr.getAttributeValue())) {
+                    mediator.setRootElementName(rootElementAttr.getAttributeValue());
+                    mediator.setContentType(ScatterGather.XML_TYPE);
+                } else {
+                    String msg = "The 'root-element' attribute is required for the configuration of a " +
+                            "Scatter Gather mediator when the 'content-type' is 'XML'";
+                    throw new SynapseException(msg);
+                }
             } else {
                 String msg = "The 'content-type' attribute should be either 'JSON' or 'XML'";
                 throw new SynapseException(msg);
@@ -119,16 +128,16 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
 
         OMElement aggregateElement = elem.getFirstChildWithName(ELEMENT_AGGREGATE_Q);
         if (aggregateElement != null) {
-            OMAttribute aggregateExpr = aggregateElement.getAttribute(ATT_VALUE_TO_AGGREGATE);
+            OMAttribute aggregateExpr = aggregateElement.getAttribute(ATT_AGGREGATE_EXPRESSION);
             if (aggregateExpr != null) {
                 try {
                     mediator.setAggregationExpression(
-                            SynapsePathFactory.getSynapsePath(aggregateElement, ATT_VALUE_TO_AGGREGATE));
+                            SynapsePathFactory.getSynapsePath(aggregateElement, ATT_AGGREGATE_EXPRESSION));
                 } catch (JaxenException e) {
                     handleException("Unable to load the aggregating expression", e);
                 }
             } else {
-                String msg = "The 'value' attribute is required for the configuration of a Scatter Gather mediator";
+                String msg = "The 'expression' attribute is required for the configuration of a Scatter Gather mediator";
                 throw new SynapseException(msg);
             }
 
diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java
index e63bad4c85..9585b59db6 100755
--- a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java
+++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java
@@ -19,6 +19,7 @@
 package org.apache.synapse.config.xml;
 
 import org.apache.axiom.om.OMElement;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.synapse.Mediator;
 import org.apache.synapse.mediators.eip.Target;
 import org.apache.synapse.mediators.v2.ScatterGather;
@@ -47,11 +48,15 @@ public OMElement serializeSpecificMediator(Mediator m) {
                 "result-target", nullNS, scatterGatherMediator.getResultTarget()));
         scatterGatherElement.addAttribute(fac.createOMAttribute(
                 "content-type", nullNS, scatterGatherMediator.getContentType()));
+        if (StringUtils.isNotBlank(scatterGatherMediator.getRootElementName())) {
+            scatterGatherElement.addAttribute(fac.createOMAttribute(
+                    "root-element", nullNS, scatterGatherMediator.getRootElementName()));
+        }
 
         OMElement aggregationElement = fac.createOMElement("aggregation", synNS);
 
         SynapsePathSerializer.serializePath(
-                scatterGatherMediator.getAggregationExpression(), aggregationElement, "value");
+                scatterGatherMediator.getAggregationExpression(), aggregationElement, "expression");
 
         if (scatterGatherMediator.getCorrelateExpression() != null) {
             SynapsePathSerializer.serializePath(
diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
index 2d4258f342..706d792b91 100644
--- a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
+++ b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
@@ -97,24 +97,20 @@ public void run() {
                 debugManager.advertiseMediationFlowStartPoint(synCtx);
             }
 
+            boolean result = seq.mediate(synCtx);
             // If this is a scatter message, then we need to use the continuation state and continue the mediation
-            if (isScatterMessage(synCtx)) {
-                boolean result = seq.mediate(synCtx);
-                if (result) {
-                    SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(synCtx);
-                    if (seqContinuationState == null) {
-                        return;
-                    }
-                    SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(synCtx, seqContinuationState);
-
-                    FlowContinuableMediator mediator =
-                            (FlowContinuableMediator) sequenceMediator.getChild(seqContinuationState.getPosition());
-
-                    synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, true);
-                    mediator.mediate(synCtx, seqContinuationState);
+            if (isScatterMessage(synCtx) && result) {
+                SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(synCtx);
+                if (seqContinuationState == null) {
+                    return;
                 }
-            } else {
-                seq.mediate(synCtx);
+                SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(synCtx, seqContinuationState);
+
+                FlowContinuableMediator mediator =
+                        (FlowContinuableMediator) sequenceMediator.getChild(seqContinuationState.getPosition());
+
+                synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, true);
+                mediator.mediate(synCtx, seqContinuationState);
             }
             //((Axis2MessageContext)synCtx).getAxis2MessageContext().getEnvelope().discard();
 
@@ -188,7 +184,7 @@ public void setStatisticsCloseEventListener(StatisticsCloseEventListener statist
      */
     private static boolean isScatterMessage(MessageContext synCtx) {
 
-        Boolean isSkipContinuationState = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES);
-        return isSkipContinuationState != null && isSkipContinuationState;
+        Boolean isScatterMessage = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES);
+        return isScatterMessage != null && isScatterMessage;
     }
 }
diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
index 337a693d57..d650b454e2 100755
--- a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
+++ b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
@@ -31,6 +31,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.TimerTask;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * An instance of this class is created to manage each aggregation group, and it holds
@@ -53,7 +54,7 @@ public class Aggregate extends TimerTask {
     private AggregateMediator aggregateMediator = null;
     private ScatterGather scatterGatherMediator = null;
     private List messages = new ArrayList();
-    private boolean locked = false;
+    private ReentrantLock lock = new ReentrantLock();
     private boolean completed = false;
     private SynapseEnvironment synEnv = null;
 
@@ -313,15 +314,15 @@ public void run() {
     }
 
     public synchronized boolean getLock() {
-        if (!locked) {
-            locked = true;
-            return true;
-        }
-        return false;
+
+        return lock.tryLock();
     }
 
     public synchronized void releaseLock() {
-        locked = false;
+
+        if (lock.isHeldByCurrentThread()) {
+            lock.unlock();
+        }
     }
 
     public boolean isCompleted() {
@@ -331,5 +332,4 @@ public boolean isCompleted() {
     public void setCompleted(boolean completed) {
         this.completed = completed;
     }
-
 }
diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java
index c0e5e25f8f..5dbb70b9c9 100644
--- a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java
+++ b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java
@@ -20,18 +20,19 @@
 
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
 import com.google.gson.JsonSyntaxException;
 import org.apache.axiom.om.OMAbstractFactory;
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.om.OMNode;
+import org.apache.axiom.om.util.AXIOMUtil;
+import org.apache.axiom.soap.SOAP11Constants;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
 import org.apache.axis2.context.OperationContext;
 import org.apache.http.protocol.HTTP;
 import org.apache.synapse.ContinuationState;
-import org.apache.synapse.JSONObjectExtensionException;
 import org.apache.synapse.ManagedLifecycle;
 import org.apache.synapse.Mediator;
 import org.apache.synapse.MessageContext;
@@ -59,12 +60,10 @@
 import org.apache.synapse.mediators.Value;
 import org.apache.synapse.mediators.base.SequenceMediator;
 import org.apache.synapse.mediators.eip.EIPConstants;
-import org.apache.synapse.mediators.eip.EIPUtils;
 import org.apache.synapse.mediators.eip.SharedDataHolder;
 import org.apache.synapse.mediators.eip.Target;
 import org.apache.synapse.mediators.eip.aggregator.Aggregate;
 import org.apache.synapse.transport.passthru.util.RelayUtils;
-import org.apache.synapse.util.JSONMergeUtils;
 import org.apache.synapse.util.MessageHelper;
 
 import java.io.ByteArrayInputStream;
@@ -100,6 +99,7 @@ public class ScatterGather extends AbstractMediator implements ManagedLifecycle,
     private boolean parallelExecution = true;
     private Integer statisticReportingIndex;
     private String contentType;
+    private String rootElementName;
     private String resultTarget;
     private SynapseEnvironment synapseEnv;
 
@@ -265,6 +265,7 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat
         // If the continuation is triggered from a mediator worker and has children, then mediate through the sub branch
         // otherwise start aggregation
         if (isContinuationTriggeredFromMediatorWorker(synCtx)) {
+            synLog.traceOrDebug("Continuation is triggered from a mediator worker");
             if (continuationState.hasChild()) {
                 int subBranch = ((ReliantContinuationState) continuationState.getChildContState()).getSubBranch();
                 SequenceMediator branchSequence = targets.get(subBranch).getSequence();
@@ -280,6 +281,7 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat
                 result = true;
             }
         } else {
+            synLog.traceOrDebug("Continuation is triggered from a callback");
             // If the continuation is triggered from a callback, continue the mediation from the continuation state
             int subBranch = ((ReliantContinuationState) continuationState).getSubBranch();
 
@@ -312,9 +314,10 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) {
         String correlationIdName = EIPConstants.AGGREGATE_CORRELATION + "." + id;
 
         Object correlationID = synCtx.getProperty(correlationIdName);
-        String correlation;
+        String correlation = (String) correlationID;
+        synLog.traceOrDebug("Aggregating messages started for correlation : " + correlation);
 
-        Object result = null;
+        boolean isAggregationConditionMet = false;
         // When the target sequences are not content aware, the message builder wont get triggered.
         // Therefore, we need to build the message to do the aggregation.
         try {
@@ -324,71 +327,12 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) {
         }
 
         if (correlateExpression != null) {
-            result = correlateExpression.objectValueOf(synCtx);
-            if (result instanceof List) {
-                if (((List) result).isEmpty()) {
-                    handleException("Failed to evaluate correlate expression: " + correlateExpression.toString(), synCtx);
-                }
-            }
-            if (result instanceof JsonPrimitive) {
-                if (!((JsonPrimitive) result).getAsBoolean()) {
-                    return false;
-                }
-            }
-            if (result instanceof Boolean) {
-                if (!(Boolean) result) {
-                    return false;
-                }
+            String expressionResult = correlateExpression.stringValueOf(synCtx);
+            if ("true".equalsIgnoreCase(expressionResult)) {
+                isAggregationConditionMet = true;
             }
         }
-        if (result != null) {
-            while (aggregate == null) {
-                synchronized (lock) {
-                    if (activeAggregates.containsKey(correlateExpression.toString())) {
-                        aggregate = activeAggregates.get(correlateExpression.toString());
-                        if (aggregate != null) {
-                            if (!aggregate.getLock()) {
-                                aggregate = null;
-                            }
-                        }
-                    } else {
-                        if (synLog.isTraceOrDebugEnabled()) {
-                            synLog.traceOrDebug("Creating new Aggregator - " +
-                                    (completionTimeoutMillis > 0 ? "expires in : "
-                                            + (completionTimeoutMillis / 1000) + "secs" :
-                                            "without expiry time"));
-                        }
-                        if (isAggregationCompleted(synCtx)) {
-                            return false;
-                        }
-
-                        Double minMsg = -1.0;
-                        if (minMessagesToComplete != null) {
-                            minMsg = Double.parseDouble(minMessagesToComplete.evaluateValue(synCtx));
-                        }
-                        Double maxMsg = -1.0;
-                        if (maxMessagesToComplete != null) {
-                            maxMsg = Double.parseDouble(maxMessagesToComplete.evaluateValue(synCtx));
-                        }
-
-                        aggregate = new Aggregate(
-                                synCtx.getEnvironment(),
-                                correlateExpression.toString(),
-                                completionTimeoutMillis,
-                                minMsg.intValue(),
-                                maxMsg.intValue(), this, synCtx.getFaultStack().peek());
-
-                        if (completionTimeoutMillis > 0) {
-                            synCtx.getConfiguration().getSynapseTimer().
-                                    schedule(aggregate, completionTimeoutMillis);
-                        }
-                        aggregate.getLock();
-                        activeAggregates.put(correlateExpression.toString(), aggregate);
-                    }
-                }
-            }
-        } else if (correlationID instanceof String) {
-            correlation = (String) correlationID;
+        if (correlateExpression == null || isAggregationConditionMet) {
             while (aggregate == null) {
                 synchronized (lock) {
                     if (activeAggregates.containsKey(correlation)) {
@@ -397,8 +341,6 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) {
                             if (!aggregate.getLock()) {
                                 aggregate = null;
                             }
-                        } else {
-                            break;
                         }
                     } else {
                         if (synLog.isTraceOrDebugEnabled()) {
@@ -446,9 +388,6 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) {
                     }
                 }
             }
-        } else {
-            synLog.traceOrDebug("Unable to find aggregation correlation property");
-            return false;
         }
         // if there is an aggregate continue on aggregation
         if (aggregate != null) {
@@ -463,9 +402,7 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) {
             }
             if (aggregate.isComplete(synLog)) {
                 synLog.traceOrDebug("Aggregation completed");
-                boolean onCompleteSeqResult = completeAggregate(aggregate);
-                synLog.traceOrDebug("End : Scatter Gather mediator");
-                return onCompleteSeqResult;
+                return completeAggregate(aggregate);
             } else {
                 aggregate.releaseLock();
             }
@@ -482,9 +419,7 @@ private boolean isAggregationCompleted(MessageContext synCtx) {
         if (aggregateTimeoutHolderObj != null) {
             SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateTimeoutHolderObj;
             if (sharedDataHolder.isAggregationCompleted()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Received a response for already completed Aggregate");
-                }
+                log.debug("Received a response for already completed Aggregate");
                 return true;
             }
         }
@@ -498,10 +433,7 @@ public boolean completeAggregate(Aggregate aggregate) {
         if (wasComplete) {
             return false;
         }
-
-        if (log.isDebugEnabled()) {
-            log.debug("Aggregation completed or timed out");
-        }
+        log.debug("Aggregation completed or timed out");
 
         // cancel the timer
         synchronized (this) {
@@ -567,6 +499,7 @@ private boolean processAggregation(MessageContext messageContext, Aggregate aggr
                     statisticReportingIndex, isContentAltering());
         }
 
+        getLog(messageContext).traceOrDebug("End : Scatter Gather mediator");
         boolean result = false;
         if (seqContinuationState != null) {
             SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(messageContext, seqContinuationState);
@@ -646,145 +579,64 @@ private static List getMatchingElements(MessageContext messageContext, SynapsePa
         }
     }
 
-    private static void enrichEnvelope(MessageContext messageContext, SynapsePath expression) {
-
-        OMElement enrichingElement;
-        List elementList = getMatchingElements(messageContext, expression);
-        if (EIPUtils.checkNotEmpty(elementList)) {
-            // attach at parent of the first result from the XPath, or to the SOAPBody
-            Object o = elementList.get(0);
-            if (o instanceof OMElement &&
-                    ((OMElement) o).getParent() != null &&
-                    ((OMElement) o).getParent() instanceof OMElement) {
-                enrichingElement = (OMElement) ((OMElement) o).getParent();
-                OMElement body = messageContext.getEnvelope().getBody();
-                if (!EIPUtils.isBody(body, enrichingElement)) {
-                    OMElement nonBodyElem = enrichingElement;
-                    enrichingElement = messageContext.getEnvelope().getBody();
-                    addChildren(elementList, enrichingElement);
-                    while (!EIPUtils.isBody(body, (OMElement) nonBodyElem.getParent())) {
-                        nonBodyElem = (OMElement) nonBodyElem.getParent();
-                    }
-                    nonBodyElem.detach();
-                }
-            }
-        }
-    }
-
     private void setAggregatedMessageAsVariable(MessageContext originalMessageContext, Aggregate aggregate) {
 
-        Object variable = originalMessageContext.getVariable(resultTarget);
-        if (variable == null) {
-            variable = createNewVariable(originalMessageContext, aggregate);
-            originalMessageContext.setVariable(resultTarget, variable);
-        }
+        Object variable = null;
         if (Objects.equals(contentType, JSON_TYPE)) {
-            setJSONResultToVariable((JsonElement) variable, aggregate);
-        } else if (Objects.equals(contentType, XML_TYPE) && variable instanceof OMElement) {
-            setXMLResultToVariable((OMElement) variable, aggregate);
+            log.debug("Merging aggregated JSON responses to variable");
+            variable = new JsonArray();
+            setJSONResultToVariable((JsonArray) variable, aggregate);
+        } else if (Objects.equals(contentType, XML_TYPE)) {
+            log.debug("Merging aggregated XML responses to variable");
+            variable = OMAbstractFactory.getOMFactory().createOMElement(new QName(rootElementName));
+            setXMLResultToRootOMElement((OMElement) variable, aggregate);
         } else {
-            handleInvalidVariableType(variable, aggregate, originalMessageContext);
+            handleException(aggregate, "Error merging aggregation results to variable : " + resultTarget +
+                    " unknown content type : " + contentType, null, originalMessageContext);
         }
+        originalMessageContext.setVariable(resultTarget, variable);
         StatisticDataCollectionHelper.collectAggregatedParents(aggregate.getMessages(), originalMessageContext);
     }
 
-    private void handleInvalidVariableType(Object variable, Aggregate aggregate, MessageContext synCtx) {
+    private void setJSONResultToVariable(JsonArray variable, Aggregate aggregate) {
 
-        String expectedType = Objects.equals(contentType, JSON_TYPE) ? "JSON" : "OMElement";
-        String actualType = variable != null ? variable.getClass().getName() : "null";
-        handleException(aggregate, "Error merging aggregation results to variable: " + resultTarget +
-                " expected a " + expectedType + " but found " + actualType, null, synCtx);
+        for (MessageContext synCtx : aggregate.getMessages()) {
+            Object evaluatedResult = aggregationExpression.objectValueOf(synCtx);
+            variable.add((JsonElement) evaluatedResult);
+        }
     }
 
-    private void setJSONResultToVariable(JsonElement variable, Aggregate aggregate) {
+    private void setXMLResultToRootOMElement(OMElement element, Aggregate aggregate) {
 
         try {
             for (MessageContext synCtx : aggregate.getMessages()) {
-                Object evaluatedResult = aggregationExpression.objectValueOf(synCtx);
-                if (variable instanceof JsonArray) {
-                    variable.getAsJsonArray().add((JsonElement) evaluatedResult);
-                } else if (variable instanceof JsonObject) {
-                    JSONMergeUtils.extendJSONObject((JsonObject) variable,
-                            JSONMergeUtils.ConflictStrategy.MERGE_INTO_ARRAY,
-                            ((JsonObject) evaluatedResult).getAsJsonObject());
-                } else {
-                    handleException(aggregate, "Error merging aggregation results to variable : " + resultTarget +
-                            " expected a JSON type variable but found " + variable.getClass().getName(), null, synCtx);
-                }
+                OMElement cloneResult = AXIOMUtil.stringToOM(aggregationExpression.stringValueOf(synCtx));
+                cloneResult.buildWithAttachments();
+                element.addChild(cloneResult);
             }
-        } catch (JSONObjectExtensionException e) {
-            handleException(aggregate, "Error merging aggregation results to JSON Object : " +
-                    aggregationExpression.toString(), e, aggregate.getLastMessage());
-        }
-    }
-
-    private void setXMLResultToVariable(OMElement variable, Aggregate aggregate) {
-
-        for (MessageContext synCtx : aggregate.getMessages()) {
-            List list = getMatchingElements(synCtx, aggregationExpression);
-            addChildren(list, variable);
-        }
-    }
-
-    private Object createNewVariable(MessageContext synCtx, Aggregate aggregate) {
-
-        if (Objects.equals(contentType, JSON_TYPE)) {
-            return new JsonArray();
-        } else if (Objects.equals(contentType, XML_TYPE)) {
-            return OMAbstractFactory.getOMFactory().createOMElement(new QName(resultTarget));
-        } else {
-            handleException(aggregate, "Error merging aggregation results to variable : " + resultTarget +
-                    " unknown content type : " + contentType, null, synCtx);
-            return null;
+        } catch (XMLStreamException e) {
+            handleException(aggregate, "Error reading XML element: " + aggregationExpression.toString(), e,
+                    aggregate.getLastMessage());
         }
     }
 
     private MessageContext getAggregatedMessage(Aggregate aggregate) {
 
         MessageContext newCtx = null;
-        JsonArray jsonArray = new JsonArray();
-
-        for (MessageContext synCtx : aggregate.getMessages()) {
-            if (newCtx == null) {
-                try {
-                    newCtx = MessageHelper.cloneMessageContext(synCtx, true, false, true);
-                } catch (AxisFault axisFault) {
-                    handleException(aggregate, "Error creating a copy of the message", axisFault, synCtx);
-                }
-
-                if (log.isDebugEnabled()) {
-                    log.debug("Generating Aggregated message from : " + newCtx.getEnvelope());
-                }
-                if (Objects.equals(contentType, JSON_TYPE)) {
-                    Object evaluatedResult = aggregationExpression.objectValueOf(synCtx);
-                    if (evaluatedResult instanceof JsonElement) {
-                        jsonArray.add((JsonElement) evaluatedResult);
-                    } else {
-                        handleException(aggregate, "Error merging aggregation results as expression : " +
-                                aggregationExpression.toString() + " did not resolve to a JSON value", null, synCtx);
-                    }
-                } else {
-                    enrichEnvelope(synCtx, aggregationExpression);
-                }
-            } else {
+        if (Objects.equals(contentType, JSON_TYPE)) {
+            JsonArray jsonArray = new JsonArray();
+            log.debug("Merging aggregated JSON responses to body");
+            for (MessageContext synCtx : aggregate.getMessages()) {
                 try {
                     if (log.isDebugEnabled()) {
                         log.debug("Merging message : " + synCtx.getEnvelope() + " using expression : " +
                                 aggregationExpression);
                     }
-                    if (Objects.equals(contentType, JSON_TYPE)) {
-                        Object evaluatedResult = aggregationExpression.objectValueOf(synCtx);
-                        if (evaluatedResult instanceof JsonElement) {
-                            jsonArray.add((JsonElement) evaluatedResult);
-                        } else {
-                            jsonArray.add(evaluatedResult.toString());
-                        }
+                    Object evaluatedResult = aggregationExpression.objectValueOf(synCtx);
+                    if (evaluatedResult instanceof JsonElement) {
+                        jsonArray.add((JsonElement) evaluatedResult);
                     } else {
-                        enrichEnvelope(synCtx, aggregationExpression);
-                    }
-
-                    if (log.isDebugEnabled()) {
-                        log.debug("Merged result : " + newCtx.getEnvelope());
+                        jsonArray.add(evaluatedResult.toString());
                     }
                 } catch (SynapseException e) {
                     handleException(aggregate, "Error evaluating expression: " + aggregationExpression.toString(), e, synCtx);
@@ -792,26 +644,52 @@ private MessageContext getAggregatedMessage(Aggregate aggregate) {
                     handleException(aggregate, "Error reading JSON element: " + aggregationExpression.toString(), e, synCtx);
                 }
             }
-        }
-
-        StatisticDataCollectionHelper.collectAggregatedParents(aggregate.getMessages(), newCtx);
-        if (Objects.equals(contentType, JSON_TYPE)) {
             // setting the new JSON payload to the messageContext
             try {
+                newCtx = MessageHelper.cloneMessageContext(aggregate.getLastMessage(), false, false, true);
+                SOAPEnvelope newEnvelope = createNewSoapEnvelope(aggregate.getLastMessage().getEnvelope());
+                newCtx.setEnvelope(newEnvelope);
                 JsonUtil.getNewJsonPayload(((Axis2MessageContext) newCtx).getAxis2MessageContext(), new
                         ByteArrayInputStream(jsonArray.toString().getBytes()), true, true);
             } catch (AxisFault axisFault) {
-                log.error("Error occurred while setting the new JSON payload to the msg context", axisFault);
+                handleException(aggregate, "Error occurred while setting the new JSON payload to the message context",
+                        axisFault, newCtx);
+            }
+        } else if (Objects.equals(contentType, XML_TYPE)) {
+            log.debug("Merging aggregated XML responses to body");
+            OMElement rootElement = OMAbstractFactory.getOMFactory().createOMElement(new QName(rootElementName));
+            setXMLResultToRootOMElement(rootElement, aggregate);
+            try {
+                newCtx = MessageHelper.cloneMessageContext(aggregate.getLastMessage(), false, false, true);
+                SOAPEnvelope newEnvelope = createNewSoapEnvelope(aggregate.getLastMessage().getEnvelope());
+                newEnvelope.getBody().addChild(rootElement);
+                newCtx.setEnvelope(newEnvelope);
+            } catch (AxisFault axisFault) {
+                handleException(aggregate, "Error creating a copy of the message", axisFault, aggregate.getLastMessage());
             }
-        } else {
             // Removing the JSON stream after aggregated using XML path.
             // This will fix inconsistent behaviour in logging the payload.
             ((Axis2MessageContext) newCtx).getAxis2MessageContext()
                     .removeProperty(org.apache.synapse.commons.json.Constants.ORG_APACHE_SYNAPSE_COMMONS_JSON_JSON_INPUT_STREAM);
+        } else {
+            handleException(aggregate, "Error aggregating results. Unknown content type : " + contentType, null,
+                    aggregate.getLastMessage());
         }
+        StatisticDataCollectionHelper.collectAggregatedParents(aggregate.getMessages(), newCtx);
         return newCtx;
     }
 
+    private SOAPEnvelope createNewSoapEnvelope(SOAPEnvelope envelope) {
+
+        SOAPFactory fac;
+        if (SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(envelope.getBody().getNamespace().getNamespaceURI())) {
+            fac = OMAbstractFactory.getSOAP11Factory();
+        } else {
+            fac = OMAbstractFactory.getSOAP12Factory();
+        }
+        return fac.getDefaultEnvelope();
+    }
+
     public SynapsePath getCorrelateExpression() {
 
         return correlateExpression;
@@ -932,6 +810,16 @@ public void setResultTarget(String resultTarget) {
         this.resultTarget = resultTarget;
     }
 
+    public String getRootElementName() {
+
+        return rootElementName;
+    }
+
+    public void setRootElementName(String rootElementName) {
+
+        this.rootElementName = rootElementName;
+    }
+
     private boolean isTargetBody() {
 
         return "body".equalsIgnoreCase(resultTarget);
diff --git a/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java
index 67da918d99..9ecbd3f6f8 100644
--- a/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java
+++ b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java
@@ -34,11 +34,27 @@ public ScatterGatherMediatorSerializationTest() {
         scatterGatherMediatorSerializer = new ScatterGatherMediatorSerializer();
     }
 
-    public void testScatterGatherSerialization() {
+    public void testScatterGatherXMLTypeSerialization() {
 
         String inputXML = "" +
-                "" +
+                "content-type=\"XML\" root-element=\"result\" parallel-execution=\"true\">" +
+                "" +
+                "{                    \"pet\": {                        " +
+                "\"name\": \"pet1\",                        \"type\": \"dog\"                    },                    " +
+                "\"status\": \"success\"                    }" +
+                "" +
+                "" +
+                "" +
+                "";
+
+        assertTrue(serialization(inputXML, scatterGatherMediatorFactory, scatterGatherMediatorSerializer));
+    }
+
+    public void testScatterGatherJSONTypeSerialization() {
+
+        String inputXML = "" +
+                "" +
                 "{                    \"pet\": {                        " +
                 "\"name\": \"pet1\",                        \"type\": \"dog\"                    },                    " +
                 "\"status\": \"success\"                    }" +