diff --git a/modules/core/src/main/java/org/apache/synapse/SequenceFlowObserver.java b/modules/core/src/main/java/org/apache/synapse/SequenceFlowObserver.java new file mode 100644 index 0000000000..15e17edd7d --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/SequenceFlowObserver.java @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse; + +public interface SequenceFlowObserver { + + /** + * Set the observer name + * + * @param name handler name + */ + void setName(String name); + + /** + * This method should implement the logic to run at the start of the flow + */ + void start(MessageContext synCtx, String seqName); + + /** + * This method should implement the logic to run at the end of the flow + */ + void complete(MessageContext synCtx, String seqName); + +} diff --git a/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java b/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java index 93f02bf81e..07c061e189 100644 --- a/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java +++ b/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java @@ -171,6 +171,8 @@ public static final class Axis2Param { /** The name of the synapse handlers file */ public static final String SYNAPSE_HANDLER_FILE = "synapse-handlers.xml"; + public static final String SEQUENCE_OBSERVERS_FILE = "sequence-observers.xml"; + /** the name of the property used for synapse library based class loading */ public static final String SYNAPSE_LIB_LOADER = "synapse.lib.classloader"; /** conf directory name **/ diff --git a/modules/core/src/main/java/org/apache/synapse/config/SequenceFlowObserversLoader.java b/modules/core/src/main/java/org/apache/synapse/config/SequenceFlowObserversLoader.java new file mode 100644 index 0000000000..94ef9cadc1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/config/SequenceFlowObserversLoader.java @@ -0,0 +1,111 @@ +/** + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.config; + +import org.apache.axiom.om.OMElement; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.synapse.SequenceFlowObserver; +import org.apache.synapse.SynapseConstants; +import org.apache.synapse.SynapseException; +import org.apache.synapse.commons.util.MiscellaneousUtil; + +import javax.xml.namespace.QName; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class SequenceFlowObserversLoader { + + private static final QName ROOT_Q = new QName("observers"); + private static final QName OBSERVER_Q = new QName("observer"); + private static final QName CLASS_Q = new QName("class"); + private static final QName NAME_ATT = new QName("name"); + + private static Log log = LogFactory.getLog(SequenceFlowObserversLoader.class); + + public static List loadObservers() { + List observers = new ArrayList<>(); + OMElement observersConfig = + MiscellaneousUtil.loadXMLConfig(SynapseConstants.SEQUENCE_OBSERVERS_FILE); + if (observersConfig != null) { + + if (!ROOT_Q.equals(observersConfig.getQName())) { + handleException("Invalid sequence observer configuration file"); + } + + Iterator iterator = observersConfig.getChildrenWithName(OBSERVER_Q); + while (iterator.hasNext()) { + OMElement observerElem = (OMElement) iterator.next(); + + String name = null; + if (observerElem.getAttribute(NAME_ATT) != null) { + name = observerElem.getAttributeValue(NAME_ATT); + } else { + handleException("Name not defined in one or more sequence observer"); + } + + if (observerElem.getAttribute(CLASS_Q) != null) { + String className = observerElem.getAttributeValue(CLASS_Q); + if (StringUtils.isNotBlank(className)) { + SequenceFlowObserver observer = createObserver(className); + if (observer != null) { + observers.add(observer); + observer.setName(name); + } + } else { + handleException("Class name is null for sequence observer name : " + name); + } + } else { + handleException("Class name not defined for sequence observer named : " + name); + } + + } + } + return observers; + } + + private static SequenceFlowObserver createObserver(String classFQName) { + Object obj = null; + try { + obj = Class.forName(classFQName).newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + handleException("Error creating Sequence observer for class name : " + classFQName, e); + } + + if (obj instanceof SequenceFlowObserver) { + return (SequenceFlowObserver) obj; + } else { + handleException("Error creating Sequence observer. The Sequence observer should be of type " + + "org.apache.synapse.SequenceFlowObserver"); + } + return null; + } + + private static void handleException(String msg) { + log.error(msg); + throw new SynapseException(msg); + } + + private static void handleException(String msg, Exception ex) { + log.error(msg, ex); + throw new SynapseException(msg, ex); + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java b/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java index e5761fb54c..b2572617a8 100644 --- a/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java +++ b/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java @@ -21,6 +21,7 @@ import org.apache.axiom.util.blob.OverflowBlob; import org.apache.synapse.MessageContext; +import org.apache.synapse.SequenceFlowObserver; import org.apache.synapse.ServerContextInformation; import org.apache.synapse.SynapseHandler; import org.apache.synapse.aspects.flow.statistics.store.MessageDataStore; @@ -232,6 +233,20 @@ public interface SynapseEnvironment { */ public void registerSynapseHandler(SynapseHandler handler); + /** + * Get all sequence observers + * + * @return list of sequence observers + */ + public List getSequenceObservers(); + + /** + * Register a sequence observer to the synapse environment + * + * @param observer sequence observer + */ + public void registerSequenceObservers(SequenceFlowObserver observer); + /** * Get the global timeout interval for callbacks * diff --git a/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java b/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java index 00d8dd0ec9..5c63c1efa1 100644 --- a/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java +++ b/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java @@ -33,6 +33,7 @@ import org.apache.synapse.ContinuationState; import org.apache.synapse.Mediator; import org.apache.synapse.MessageContext; +import org.apache.synapse.SequenceFlowObserver; import org.apache.synapse.ServerContextInformation; import org.apache.synapse.SynapseConstants; import org.apache.synapse.SynapseException; @@ -46,6 +47,7 @@ import org.apache.synapse.carbonext.TenantInfoConfigurator; import org.apache.synapse.commons.json.JsonUtil; import org.apache.synapse.commons.util.ext.TenantInfoInitiator; +import org.apache.synapse.config.SequenceFlowObserversLoader; import org.apache.synapse.config.SynapseConfigUtils; import org.apache.synapse.config.SynapseConfiguration; import org.apache.synapse.config.SynapseHandlersLoader; @@ -100,6 +102,7 @@ public class Axis2SynapseEnvironment implements SynapseEnvironment { private SynapseTaskManager taskManager; private RESTRequestHandler restHandler; private List synapseHandlers; + private List sequenceObservers; private long globalTimeout = SynapseConstants.DEFAULT_GLOBAL_TIMEOUT; private SynapseDebugManager synapseDebugManager; @@ -207,6 +210,7 @@ public Axis2SynapseEnvironment(SynapseConfiguration synCfg) { restHandler = new RESTRequestHandler(); synapseHandlers = SynapseHandlersLoader.loadHandlers(); + sequenceObservers = SequenceFlowObserversLoader.loadObservers(); this.globalTimeout = SynapseConfigUtils.getGlobalTimeoutInterval(); @@ -1053,6 +1057,26 @@ public void registerSynapseHandler(SynapseHandler handler) { synapseHandlers.add(handler); } + /** + * Get all sequence observers + * + * @return list of sequence observers + */ + @Override + public List getSequenceObservers() { + return sequenceObservers; + } + + /** + * Register a sequence observer to the synapse environment + * + * @param observer sequence observer + */ + @Override + public void registerSequenceObservers(SequenceFlowObserver observer) { + sequenceObservers.add(observer); + } + @Override public long getGlobalTimeout() { return globalTimeout; diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/AbstractListMediator.java b/modules/core/src/main/java/org/apache/synapse/mediators/AbstractListMediator.java index 9ae14b1951..ead1f90a90 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/AbstractListMediator.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/AbstractListMediator.java @@ -26,6 +26,7 @@ import org.apache.synapse.ManagedLifecycle; import org.apache.synapse.Mediator; import org.apache.synapse.MessageContext; +import org.apache.synapse.SequenceFlowObserver; import org.apache.synapse.SynapseConstants; import org.apache.synapse.SynapseException; import org.apache.synapse.SynapseLog; @@ -34,6 +35,7 @@ import org.apache.synapse.config.SynapsePropertiesLoader; import org.apache.synapse.core.SynapseEnvironment; import org.apache.synapse.core.axis2.Axis2MessageContext; +import org.apache.synapse.mediators.base.SequenceMediator; import org.apache.synapse.transport.passthru.PassThroughConstants; import org.apache.synapse.transport.passthru.util.RelayUtils; import org.apache.synapse.transport.util.MessageHandlerProvider; @@ -80,6 +82,12 @@ public boolean mediate(MessageContext synCtx, int mediatorPosition) { // to pass it on; else, do nothing -> i.e. let the parents state flow setEffectiveTraceState(synCtx); int myEffectiveTraceState = synCtx.getTracingState(); + if (this instanceof SequenceMediator & mediatorPosition == 0) { + List observers = synCtx.getEnvironment().getSequenceObservers(); + for (SequenceFlowObserver observer : observers) { + observer.start(synCtx, ((SequenceMediator) this).getName()); + } + } try { SynapseLog synLog = getLog(synCtx); if (synLog.isTraceOrDebugEnabled()) { @@ -104,6 +112,14 @@ public boolean mediate(MessageContext synCtx, int mediatorPosition) { returnVal = false; break; } + if (i == mediators.size() - 1) { + if (this instanceof SequenceMediator) { + List observers = synCtx.getEnvironment().getSequenceObservers(); + for (SequenceFlowObserver observer : observers) { + observer.complete(synCtx, ((SequenceMediator) this).getName()); + } + } + } mediator.reportCloseStatistics(synCtx, statisticReportingIndex); } else { synCtx.setTracingState(myEffectiveTraceState); @@ -111,6 +127,14 @@ public boolean mediate(MessageContext synCtx, int mediatorPosition) { returnVal = false; break; } + if (i == mediators.size() - 1) { + if (this instanceof SequenceMediator) { + List observers = synCtx.getEnvironment().getSequenceObservers(); + for (SequenceFlowObserver observer : observers) { + observer.complete(synCtx, ((SequenceMediator) this).getName()); + } + } + } } } } catch (SynapseException synEx) {