Skip to content

Commit

Permalink
Merge pull request #2301 from SanojPunchihewa/foreach-skip-aggregation
Browse files Browse the repository at this point in the history
Add option to continue the parent flow without waiting for aggregation in Foreach mediator V2
  • Loading branch information
SanojPunchihewa authored Jan 27, 2025
2 parents 73fddf3 + df5ae52 commit 8b9da60
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class ForEachMediatorFactory extends AbstractMediatorFactory {
private static final QName RESULT_TARGET_Q = new QName("result-target");
private static final QName RESULT_TYPE_Q = new QName("result-type");
private static final QName ATT_COUNTER_VARIABLE = new QName("counter-variable");
private static final QName ATT_CONTINUE_WITHOUT_AGGREGATION = new QName("continue-without-aggregation");

public QName getTagQName() {
return FOREACH_Q;
Expand Down Expand Up @@ -155,20 +156,26 @@ public Mediator createForEachMediatorV2(OMElement elem, Properties properties) {
}
mediator.setParallelExecution(asynchronousExe);

OMAttribute resultTargetAttr = elem.getAttribute(RESULT_TARGET_Q);
if (resultTargetAttr != null && StringUtils.isNotBlank(resultTargetAttr.getAttributeValue())) {
OMAttribute contentTypeAttr = elem.getAttribute(RESULT_TYPE_Q);
if (contentTypeAttr == null || StringUtils.isBlank(contentTypeAttr.getAttributeValue())) {
handleException("The 'result-type' attribute is required when the 'result-target' attribute is present");
} else {
if ("JSON".equals(contentTypeAttr.getAttributeValue())) {
mediator.setContentType(ForEachMediatorV2.JSON_TYPE);
} else if ("XML".equals(contentTypeAttr.getAttributeValue())) {
mediator.setContentType(ForEachMediatorV2.XML_TYPE);
String continueWithoutAggregationAttr = elem.getAttributeValue(ATT_CONTINUE_WITHOUT_AGGREGATION);
// If the continue-without-aggregation attribute is set to true, the mediator will not wait for the aggregation
if ("true".equalsIgnoreCase(continueWithoutAggregationAttr)) {
mediator.setContinueWithoutAggregation(true);
} else {
OMAttribute resultTargetAttr = elem.getAttribute(RESULT_TARGET_Q);
if (resultTargetAttr != null && StringUtils.isNotBlank(resultTargetAttr.getAttributeValue())) {
OMAttribute contentTypeAttr = elem.getAttribute(RESULT_TYPE_Q);
if (contentTypeAttr == null || StringUtils.isBlank(contentTypeAttr.getAttributeValue())) {
handleException("The 'result-type' attribute is required when the 'result-target' attribute is present");
} else {
handleException("The 'result-type' attribute should be either 'JSON' or 'XML'");
if ("JSON".equals(contentTypeAttr.getAttributeValue())) {
mediator.setContentType(ForEachMediatorV2.JSON_TYPE);
} else if ("XML".equals(contentTypeAttr.getAttributeValue())) {
mediator.setContentType(ForEachMediatorV2.XML_TYPE);
} else {
handleException("The 'result-type' attribute should be either 'JSON' or 'XML'");
}
mediator.setResultTarget(resultTargetAttr.getAttributeValue());
}
mediator.setResultTarget(resultTargetAttr.getAttributeValue());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,22 @@ protected OMElement serializeSpecificMediator(Mediator m) {

if (forEachMediatorV2.getCollectionExpression() != null) {
SynapsePathSerializer.serializePath(forEachMediatorV2.getCollectionExpression(),
forEachElem, "collection");
forEachMediatorV2.getCollectionExpression().getExpression(), forEachElem, "collection");
} else {
handleException("Missing collection of the ForEach which is required.");
}
forEachElem.addAttribute(fac.createOMAttribute(
"parallel-execution", nullNS, Boolean.toString(forEachMediatorV2.getParallelExecution())));
if (forEachMediatorV2.getResultTarget() != null) {
if (forEachMediatorV2.isContinueWithoutAggregation()) {
forEachElem.addAttribute(fac.createOMAttribute(
"result-target", nullNS, forEachMediatorV2.getResultTarget()));
forEachElem.addAttribute(fac.createOMAttribute(
"result-type", nullNS, forEachMediatorV2.getContentType()));
"continue-without-aggregation", nullNS, "true"));
} else {
if (forEachMediatorV2.getResultTarget() != null) {
forEachElem.addAttribute(fac.createOMAttribute(
"result-target", nullNS, forEachMediatorV2.getResultTarget()));
forEachElem.addAttribute(fac.createOMAttribute(
"result-type", nullNS, forEachMediatorV2.getContentType()));
}
}
if (forEachMediatorV2.getCounterVariable() != null) {
forEachElem.addAttribute(fac.createOMAttribute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class ForEachMediatorV2 extends AbstractMediator implements ManagedLifecy
private String contentType;
private String resultTarget = null;
private String counterVariableName = null;
private boolean continueWithoutAggregation = false;
private SynapseEnvironment synapseEnv;

public ForEachMediatorV2() {
Expand All @@ -118,9 +119,11 @@ public boolean mediate(MessageContext synCtx) {
}

try {
// Clone the original MessageContext and save it to continue the flow
MessageContext clonedMessageContext = MessageHelper.cloneMessageContext(synCtx);
synCtx.setProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id, new SharedDataHolder(clonedMessageContext));
if (!continueWithoutAggregation) {
// Clone the original MessageContext and save it to continue the flow after aggregation
MessageContext clonedMessageContext = MessageHelper.cloneMessageContext(synCtx);
synCtx.setProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id, new SharedDataHolder(clonedMessageContext));
}

Object collection = collectionExpression.objectValueOf(synCtx);

Expand All @@ -136,7 +139,7 @@ public boolean mediate(MessageContext synCtx) {
MessageContext iteratedMsgCtx = getIteratedMessage(synCtx, msgNumber++, msgCount, item);
ContinuationStackManager.addReliantContinuationState(iteratedMsgCtx, 0, getMediatorPosition());
boolean result = target.mediate(iteratedMsgCtx);
if (!parallelExecution && result) {
if (!parallelExecution && result && !continueWithoutAggregation) {
aggregationResult = aggregateMessages(iteratedMsgCtx, synLog);
}
}
Expand All @@ -152,7 +155,7 @@ public boolean mediate(MessageContext synCtx) {
MessageContext iteratedMsgCtx = getIteratedMessage(synCtx, msgNumber++, msgCount, item);
ContinuationStackManager.addReliantContinuationState(iteratedMsgCtx, 0, getMediatorPosition());
boolean result = target.mediate(iteratedMsgCtx);
if (!parallelExecution && result) {
if (!parallelExecution && result && !continueWithoutAggregation) {
aggregationResult = aggregateMessages(iteratedMsgCtx, synLog);
}
}
Expand All @@ -162,14 +165,17 @@ public boolean mediate(MessageContext synCtx) {
} catch (AxisFault e) {
handleException("Error executing Foreach mediator", e, synCtx);
}

OperationContext opCtx
= ((Axis2MessageContext) synCtx).getAxis2MessageContext().getOperationContext();
if (opCtx != null) {
opCtx.setProperty(Constants.RESPONSE_WRITTEN, "SKIP");
if (continueWithoutAggregation) {
return true;
} else {
OperationContext opCtx
= ((Axis2MessageContext) synCtx).getAxis2MessageContext().getOperationContext();
if (opCtx != null) {
opCtx.setProperty(Constants.RESPONSE_WRITTEN, "SKIP");
}
synCtx.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true);
return aggregationResult;
}
synCtx.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true);
return aggregationResult;
}

private MessageContext getIteratedMessage(MessageContext synCtx, int msgNumber, int msgCount, Object node) throws AxisFault {
Expand Down Expand Up @@ -273,7 +279,10 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat
((Mediator) mediator).reportCloseStatistics(synCtx, null);
}
}

// If this a continue without aggregation scenario, return false to end the mediation
if (continueWithoutAggregation) {
return false;
}
if (result) {
return aggregateMessages(synCtx, synLog);
}
Expand Down Expand Up @@ -739,4 +748,14 @@ public void setCounterVariable(String counterVariableName) {

this.counterVariableName = counterVariableName;
}

public void setContinueWithoutAggregation(boolean continueWithoutAggregation) {

this.continueWithoutAggregation = continueWithoutAggregation;
}

public boolean isContinueWithoutAggregation() {

return continueWithoutAggregation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,17 @@ public void testForEachMediatorSerialization_Sequence_ID_Comments() throws Excep
assertTrue(serialization(inputXml, foreachMediatorFactory, foreachMediatorSerializer));
assertTrue(serialization(inputXml, foreachMediatorSerializer));
}

public void testForEachMediatorV2_continueWithoutAggregation() throws Exception {
String inputXML = "<foreach collection=\"${payload.array}\" parallel-execution=\"true\" " +
"continue-without-aggregation=\"true\" xmlns=\"http://ws.apache.org/ns/synapse\">" +
"<sequence>" +
"<log>" +
"<message>Processing payload ${payload}</message>" +
"</log>" +
"</sequence>" +
"</foreach>";
assertTrue(serialization(inputXML, foreachMediatorFactory, foreachMediatorSerializer));
assertTrue(serialization(inputXML, foreachMediatorSerializer));
}
}

0 comments on commit 8b9da60

Please sign in to comment.