Skip to content

Commit

Permalink
Update continuation stack usage in Scatter Gather mediator
Browse files Browse the repository at this point in the history
  • Loading branch information
SanojPunchihewa committed Jan 7, 2025
1 parent 44331a7 commit 477dfd6
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,14 @@
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.aspects.ComponentType;
import org.apache.synapse.aspects.flow.statistics.StatisticsCloseEventListener;
import org.apache.synapse.aspects.flow.statistics.collectors.CloseEventCollector;
import org.apache.synapse.aspects.flow.statistics.collectors.OpenEventCollector;
import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector;
import org.apache.synapse.aspects.flow.statistics.util.StatisticsConstants;
import org.apache.synapse.carbonext.TenantInfoConfigurator;
import org.apache.synapse.continuation.ContinuationStackManager;
import org.apache.synapse.continuation.SeqContinuationState;
import org.apache.synapse.debug.SynapseDebugManager;
import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.synapse.mediators.v2.ScatterGatherUtils;
import org.apache.synapse.util.logging.LoggingUtils;

/**
Expand Down Expand Up @@ -99,18 +96,14 @@ public void run() {

boolean result = seq.mediate(synCtx);
// If this is a scatter message, then we need to use the continuation state and continue the mediation
if (isScatterMessage(synCtx) && result) {
if (ScatterGatherUtils.isScatterMessage(synCtx) && result) {
SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(synCtx);
if (seqContinuationState == null) {
return;
}
SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(synCtx, seqContinuationState);

FlowContinuableMediator mediator =
(FlowContinuableMediator) sequenceMediator.getChild(seqContinuationState.getPosition());

synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, true);
mediator.mediate(synCtx, seqContinuationState);
sequenceMediator.mediate(synCtx, seqContinuationState);
}
//((Axis2MessageContext)synCtx).getAxis2MessageContext().getEnvelope().discard();

Expand Down Expand Up @@ -150,7 +143,7 @@ public void run() {
debugManager.advertiseMediationFlowTerminatePoint(synCtx);
debugManager.releaseMediationFlowLock();
}
if (RuntimeStatisticCollector.isStatisticsEnabled() && !isScatterMessage(synCtx)) {
if (RuntimeStatisticCollector.isStatisticsEnabled() && !ScatterGatherUtils.isScatterMessage(synCtx)) {
this.statisticsCloseEventListener.invokeCloseEventEntry(synCtx);
}
}
Expand All @@ -175,16 +168,4 @@ private void warn(boolean traceOn, String msg, MessageContext msgContext) {
public void setStatisticsCloseEventListener(StatisticsCloseEventListener statisticsCloseEventListener) {
this.statisticsCloseEventListener = statisticsCloseEventListener;
}

/**
* Check whether the message is a scatter message or not
*
* @param synCtx MessageContext
* @return true if the message is a scatter message
*/
private static boolean isScatterMessage(MessageContext synCtx) {

Boolean isScatterMessage = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES);
return isScatterMessage != null && isScatterMessage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.synapse.aspects.flow.statistics.collectors.OpenEventCollector;
import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector;
import org.apache.synapse.aspects.flow.statistics.data.artifact.ArtifactHolder;
import org.apache.synapse.mediators.v2.ScatterGatherUtils;
import org.apache.synapse.transport.customlogsetter.CustomLogSetter;
import org.apache.synapse.aspects.ComponentType;
import org.apache.synapse.continuation.ContinuationStackManager;
Expand Down Expand Up @@ -157,7 +158,7 @@ public boolean mediate(MessageContext synCtx) {

boolean result = super.mediate(synCtx);

if (result && !skipAddition) {
if (result && !skipAddition && !ScatterGatherUtils.isScatterMessage(synCtx)) {
// if flow completed remove the previously added SeqContinuationState
ContinuationStackManager.removeSeqContinuationState(synCtx, sequenceType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,6 @@ public ForEachMediatorV2() {
id = String.valueOf(new Random().nextLong());
}

/**
* Check whether the message is a foreach message or not
*
* @param synCtx MessageContext
* @return true if the message is a foreach message
*/
private static boolean isContinuationTriggeredFromMediatorWorker(MessageContext synCtx) {

Boolean isContinuationTriggeredMediatorWorker =
(Boolean) synCtx.getProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER);
return isContinuationTriggeredMediatorWorker != null && isContinuationTriggeredMediatorWorker;
}

@Override
public boolean mediate(MessageContext synCtx) {

Expand Down Expand Up @@ -262,44 +249,33 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat
}

boolean result;
// If the continuation is triggered from a mediator worker and has children, then mediate through the sub branch
// otherwise start aggregation
if (isContinuationTriggeredFromMediatorWorker(synCtx)) {
synLog.traceOrDebug("Continuation is triggered from a mediator worker");
if (continuationState.hasChild()) {
SequenceMediator branchSequence = target.getSequence();
boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled();
FlowContinuableMediator mediator =
(FlowContinuableMediator) branchSequence.getChild(continuationState.getChildContState().getPosition());

result = mediator.mediate(synCtx, continuationState.getChildContState());
if (isStatisticsEnabled) {
((Mediator) mediator).reportCloseStatistics(synCtx, null);
}
} else {
SequenceMediator branchSequence = target.getSequence();
boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled();
// If there are no children and the continuation was triggered from a mediator worker start aggregation
// otherwise mediate through the sub branch sequence
if (!continuationState.hasChild()) {
if (ScatterGatherUtils.isContinuationTriggeredFromMediatorWorker(synCtx)) {
synLog.traceOrDebug("Continuation is triggered from a mediator worker");
result = true;
} else {
synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the sub branch sequence");
result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1);
}
} else {
synLog.traceOrDebug("Continuation is triggered from a callback");
// If the continuation is triggered from a callback, continue the mediation from the continuation state
SequenceMediator branchSequence = target.getSequence();
boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled();
if (!continuationState.hasChild()) {
result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1);
} else {
FlowContinuableMediator mediator =
(FlowContinuableMediator) branchSequence.getChild(continuationState.getPosition());
synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the child continuation state");
FlowContinuableMediator mediator =
(FlowContinuableMediator) branchSequence.getChild(continuationState.getPosition());

result = mediator.mediate(synCtx, continuationState.getChildContState());
if (isStatisticsEnabled) {
((Mediator) mediator).reportCloseStatistics(synCtx, null);
}
result = mediator.mediate(synCtx, continuationState.getChildContState());
if (isStatisticsEnabled) {
((Mediator) mediator).reportCloseStatistics(synCtx, null);
}
}

if (result) {
// If the mediation is completed, remove the child continuation state from the stack, so the aggregation
// will continue the mediation from the parent continuation state
ContinuationStackManager.removeReliantContinuationState(synCtx);
}
if (result) {
return aggregateMessages(synCtx, synLog);
}
return false;
Expand Down Expand Up @@ -432,7 +408,6 @@ private boolean completeAggregate(ForEachAggregate aggregate) {
activeAggregates.remove(aggregate.getCorrelation());
// Update the continuation state to current mediator position as we are using the original message context
ContinuationStackManager.updateSeqContinuationState(originalMessageContext, getMediatorPosition());
SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(originalMessageContext);

getLog(originalMessageContext).traceOrDebug("End : Foreach mediator");
boolean result = false;
Expand All @@ -443,14 +418,19 @@ private boolean completeAggregate(ForEachAggregate aggregate) {
CloseEventCollector.closeEntryEvent(originalMessageContext, getMediatorName(), ComponentType.MEDIATOR,
statisticReportingIndex, isContentAltering());
}

if (seqContinuationState != null) {
SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(originalMessageContext, seqContinuationState);
result = sequenceMediator.mediate(originalMessageContext, seqContinuationState);
if (RuntimeStatisticCollector.isStatisticsEnabled()) {
sequenceMediator.reportCloseStatistics(originalMessageContext, null);
do {
SeqContinuationState seqContinuationState =
(SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(originalMessageContext);
if (seqContinuationState != null) {
SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(originalMessageContext, seqContinuationState);
result = sequenceMediator.mediate(originalMessageContext, seqContinuationState);
if (RuntimeStatisticCollector.isStatisticsEnabled()) {
sequenceMediator.reportCloseStatistics(originalMessageContext, null);
}
} else {
break;
}
}
} while (result && !originalMessageContext.getContinuationStateStack().isEmpty());
CloseEventCollector.closeEventsAfterScatterGather(originalMessageContext);
return result;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.gson.JsonSyntaxException;
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMNode;
import org.apache.axiom.om.util.AXIOMUtil;
import org.apache.axiom.soap.SOAP11Constants;
import org.apache.axiom.soap.SOAPEnvelope;
Expand Down Expand Up @@ -262,47 +261,34 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat
}

boolean result;
// If the continuation is triggered from a mediator worker and has children, then mediate through the sub branch
// otherwise start aggregation
if (isContinuationTriggeredFromMediatorWorker(synCtx)) {
synLog.traceOrDebug("Continuation is triggered from a mediator worker");
if (continuationState.hasChild()) {
int subBranch = ((ReliantContinuationState) continuationState.getChildContState()).getSubBranch();
SequenceMediator branchSequence = targets.get(subBranch).getSequence();
boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled();
FlowContinuableMediator mediator =
(FlowContinuableMediator) branchSequence.getChild(continuationState.getChildContState().getPosition());

result = mediator.mediate(synCtx, continuationState.getChildContState());
if (isStatisticsEnabled) {
((Mediator) mediator).reportCloseStatistics(synCtx, null);
}
} else {
int subBranch = ((ReliantContinuationState) continuationState).getSubBranch();

SequenceMediator branchSequence = targets.get(subBranch).getSequence();
boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled();
// If there are no children and the continuation was triggered from a mediator worker start aggregation
// otherwise mediate through the sub branch sequence
if (!continuationState.hasChild()) {
if (ScatterGatherUtils.isContinuationTriggeredFromMediatorWorker(synCtx)) {
synLog.traceOrDebug("Continuation is triggered from a mediator worker");
result = true;
} else {
synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the sub branch sequence");
result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1);
}
} else {
synLog.traceOrDebug("Continuation is triggered from a callback");
// If the continuation is triggered from a callback, continue the mediation from the continuation state
int subBranch = ((ReliantContinuationState) continuationState).getSubBranch();
synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the child continuation state");
FlowContinuableMediator mediator =
(FlowContinuableMediator) branchSequence.getChild(continuationState.getPosition());

SequenceMediator branchSequence = targets.get(subBranch).getSequence();
boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled();
if (!continuationState.hasChild()) {
result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1);
} else {
FlowContinuableMediator mediator =
(FlowContinuableMediator) branchSequence.getChild(continuationState.getPosition());

result = mediator.mediate(synCtx, continuationState.getChildContState());
if (isStatisticsEnabled) {
((Mediator) mediator).reportCloseStatistics(synCtx, null);
}
result = mediator.mediate(synCtx, continuationState.getChildContState());
if (isStatisticsEnabled) {
((Mediator) mediator).reportCloseStatistics(synCtx, null);
}
}
if (result) {
// If the mediation is completed, remove the child continuation state from the stack, so the aggregation
// will continue the mediation from the parent continuation state
ContinuationStackManager.removeReliantContinuationState(synCtx);
}
if (result) {
return aggregateMessages(synCtx, synLog);
}
return false;
Expand Down Expand Up @@ -490,8 +476,6 @@ private boolean processAggregation(MessageContext messageContext, Aggregate aggr
// Update the continuation state to current mediator position as we are using the original message context
ContinuationStackManager.updateSeqContinuationState(messageContext, getMediatorPosition());
}

SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(messageContext);
messageContext.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true);

if (RuntimeStatisticCollector.isStatisticsEnabled()) {
Expand All @@ -501,13 +485,19 @@ private boolean processAggregation(MessageContext messageContext, Aggregate aggr

getLog(messageContext).traceOrDebug("End : Scatter Gather mediator");
boolean result = false;
if (seqContinuationState != null) {
SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(messageContext, seqContinuationState);
result = sequenceMediator.mediate(messageContext, seqContinuationState);
if (RuntimeStatisticCollector.isStatisticsEnabled()) {
sequenceMediator.reportCloseStatistics(messageContext, null);
do {
SeqContinuationState seqContinuationState =
(SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(messageContext);
if (seqContinuationState != null) {
SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(messageContext, seqContinuationState);
result = sequenceMediator.mediate(messageContext, seqContinuationState);
if (RuntimeStatisticCollector.isStatisticsEnabled()) {
sequenceMediator.reportCloseStatistics(messageContext, null);
}
} else {
break;
}
}
} while (result && !messageContext.getContinuationStateStack().isEmpty());
CloseEventCollector.closeEventsAfterScatterGather(messageContext);
return result;
}
Expand Down Expand Up @@ -556,29 +546,6 @@ private void setContentTypeHeader(Object resultValue, org.apache.axis2.context.M
}
}

private static void addChildren(List list, OMElement element) {

for (Object item : list) {
if (item instanceof OMElement) {
element.addChild((OMElement) item);
}
}
}

private static List getMatchingElements(MessageContext messageContext, SynapsePath expression) {

Object o = expression.objectValueOf(messageContext);
if (o instanceof OMNode) {
List list = new ArrayList();
list.add(o);
return list;
} else if (o instanceof List) {
return (List) o;
} else {
return new ArrayList();
}
}

private void setAggregatedMessageAsVariable(MessageContext originalMessageContext, Aggregate aggregate) {

Object variable = null;
Expand Down Expand Up @@ -730,19 +697,6 @@ public void setMaxMessagesToComplete(Value maxMessagesToComplete) {
this.maxMessagesToComplete = maxMessagesToComplete;
}

/**
* Check whether the message is a scatter message or not
*
* @param synCtx MessageContext
* @return true if the message is a scatter message
*/
private static boolean isContinuationTriggeredFromMediatorWorker(MessageContext synCtx) {

Boolean isContinuationTriggeredMediatorWorker =
(Boolean) synCtx.getProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER);
return isContinuationTriggeredMediatorWorker != null && isContinuationTriggeredMediatorWorker;
}

@Override
public Integer reportOpenStatistics(MessageContext messageContext, boolean isContentAltering) {

Expand Down
Loading

0 comments on commit 477dfd6

Please sign in to comment.