Skip to content

Commit

Permalink
Merge branch 'wso2:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
shnrndk authored Nov 6, 2023
2 parents 97d23a3 + 218ed0c commit 3848c18
Show file tree
Hide file tree
Showing 73 changed files with 1,281 additions and 275 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ WSO2 developers can be contacted via the mailing lists:

| Branch | Build Status |
| :------------ |:-------------
| wso2-synapse master | [![Build Status](https://wso2.org/jenkins/view/wso2-dependencies/job/forked-dependencies/job/wso2-synapse/badge/icon)](https://wso2.org/jenkins/view/wso2-dependencies/job/forked-dependencies/job/wso2-synapse)
| wso2-synapse master | [![Build Status](https://wso2.org/jenkins/job/forked-dependencies/job/wso2-synapse/badge/icon)](https://wso2.org/jenkins/job/forked-dependencies/job/wso2-synapse/)
2 changes: 1 addition & 1 deletion modules/commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.synapse</groupId>
<artifactId>Apache-Synapse</artifactId>
<version>4.0.0-wso2v44-SNAPSHOT</version>
<version>4.0.0-wso2v61-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,15 @@ public static void transformElement(OMElement element, boolean processAttrbs,
}

private static void removeIndentations(OMElement elem) {
Iterator children = elem.getChildren();
Iterator children;
try {
children = elem.getChildren();
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Error while removing indentations from OMElement. Error>>> " + e.getLocalizedMessage());
}
return;
}
while (children.hasNext()) {
OMNode child = (OMNode) children.next();
if (child instanceof OMText) {
Expand All @@ -678,7 +686,15 @@ private static void removeIndentations(OMElement elem) {
* @param properties Required properties
*/
private static void removeIndentations(OMElement elem, Map properties) {
Iterator children = elem.getChildren();
Iterator children;
try {
children = elem.getChildren();
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Error while removing indentations from OMElement. Error>>> " + e.getLocalizedMessage());
}
return;
}
boolean preserveSpaces = properties.get(Constants.PRESERVE_SPACES) != null
&& Boolean.parseBoolean((String) properties.get(Constants.PRESERVE_SPACES));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ private void consumeName(ScopeInfo info) throws XMLStreamException, IOException
}
}
source.endObject();
} else if (source.peek() == JsonStreamToken.START_OBJECT) {
source.startObject();
while (source.peek() == JsonStreamToken.NAME) {
readAttrNsDecl(source.name(), source.value().text);
}
source.endObject();
} else {
throw new IllegalStateException("Expected attribute value");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.commons.throttle.core.internal.DistributedThrottleProcessor;
import org.apache.synapse.commons.throttle.core.internal.ThrottleServiceDataHolder;

import java.io.Serializable;
import java.util.UUID;
Expand All @@ -43,13 +45,22 @@ public abstract class CallerContext implements Serializable, Cloneable {
private long nextTimeWindow = 0;
/* The globalCount to keep track number of request */
private AtomicLong globalCount = new AtomicLong(0);

private long localQuota;
private String roleId;
private long unitTime;
/*
This is specific for each API EP
if this is true, then syncing of throttle parameter with global redis counters be synced will be done in sync mode
*/
private boolean isThrottleParamSyncingModeSync;
private ThrottleProperties throttleProperties;


/**
* Count to keep track of local (specific to this node) number of requests
*/
private AtomicLong localCount = new AtomicLong(0);
private AtomicLong localHits = new AtomicLong(0);

/**
* Used for debugging purposes. *
Expand Down Expand Up @@ -83,6 +94,10 @@ public CallerContext(String ID) {
throw new InstantiationError("Couldn't create a CallContext for an empty " +
"remote caller ID");
}
if (throttleProperties == null) {
throttleProperties = ThrottleServiceDataHolder.getInstance().getThrottleProperties();
}

this.id = ID.trim();
}

Expand Down Expand Up @@ -111,7 +126,10 @@ private void initAccess(CallerConfiguration configuration, ThrottleContext throt
this.roleId = configuration.getID();
//Also we need to pick counter value associated with time window.
throttleContext.addCallerContext(this, this.id);
throttleContext.replicateTimeWindow(this.id);

if (!ThrottleServiceDataHolder.getInstance().getThrottleProperties().isThrottleSyncAsyncHybridModeEnabled()) {
throttleContext.replicateTimeWindow(this.id);
}
}

/**
Expand Down Expand Up @@ -140,7 +158,6 @@ private boolean canAccessIfUnitTimeNotOver(CallerConfiguration configuration,
// Send the current state to others (clustered env)
throttleContext.flushCallerContext(this, id);
// can complete access

} else {
//else , if caller has not already prohibit
if (this.nextAccessTime == 0) {
Expand Down Expand Up @@ -190,7 +207,7 @@ private boolean canAccessIfUnitTimeNotOver(CallerConfiguration configuration,
throttleContext.replicateTimeWindow(this.id);
throttleContext.addAndFlushCallerContext(this, this.id);

if(log.isDebugEnabled()) {
if (log.isDebugEnabled()) {
log.debug("Caller=" + this.getId() + " has reset counters and added for replication when unit "
+ "time is not over");
}
Expand Down Expand Up @@ -356,6 +373,7 @@ public void cleanUpCallers(CallerConfiguration configuration,
*/
public boolean canAccess(ThrottleContext throttleContext, CallerConfiguration configuration,
long currentTime) throws ThrottleException {
RequestContext requestContext = new RequestContext(currentTime);
boolean canAccess;
if (configuration == null) {
if (log.isDebugEnabled()) {
Expand All @@ -374,16 +392,35 @@ public boolean canAccess(ThrottleContext throttleContext, CallerConfiguration co
initAccess(configuration, throttleContext, currentTime);
}
// if unit time period (session time) is not over
if (this.nextTimeWindow > currentTime) {
canAccess = canAccessIfUnitTimeNotOver(configuration, throttleContext, currentTime);
} else {
canAccess = canAccessIfUnitTimeOver(configuration, throttleContext, currentTime);
if (log.isDebugEnabled()) {
log.debug("### NEW REQUEST RECEIVED ! - currentTime: " + currentTime);
}

DistributedThrottleProcessor distributedThrottleProcessor = ThrottleServiceDataHolder.getInstance()
.getDistributedThrottleProcessor();
if (distributedThrottleProcessor != null && distributedThrottleProcessor.isEnable()) {
long startTime = System.currentTimeMillis();
canAccess = distributedThrottleProcessor.canAccessBasedOnUnitTime(this, configuration, throttleContext,
requestContext);
long duration = System.currentTimeMillis() - startTime;
if (log.isDebugEnabled()) {
log.debug("LATENCY FOR THROTTLE PROCESSING: " + duration + " ms");
}
} else {
canAccess = canAccessBasedOnUnitTime(configuration, throttleContext, currentTime);
}
return canAccess;
}

private boolean canAccessBasedOnUnitTime(CallerConfiguration configuration, ThrottleContext throttleContext, long currentTime) {
if (this.nextTimeWindow > currentTime) {
return canAccessIfUnitTimeNotOver(configuration, throttleContext, currentTime);
} else {
return canAccessIfUnitTimeOver(configuration, throttleContext, currentTime);
}
}


/**
* Returns the next time window
*
Expand All @@ -410,13 +447,28 @@ public void setGlobalCounter(long counter) {
}

public void setLocalCounter(long counter) {
if (log.isTraceEnabled()) {
log.trace("changing local counter from:" + localCount.get() + " to:" + counter);
}
localCount.set(counter);
}

public long getLocalCounter() {
return localCount.get();
}

public void setLocalHits(long counter) {
localHits.set(counter);
}

public long getLocalHits() {
return localHits.get();
}

public void incrementLocalHits() {
localHits.incrementAndGet();
}

public void resetLocalCounter() {
localCount.set(0);
}
Expand Down Expand Up @@ -459,4 +511,28 @@ public String getRoleId() {
public void setRoleId(String roleId) {
this.roleId = roleId;
}

public void setIsThrottleParamSyncingModeSync(boolean isThrottleParamSyncingModeSync) {
this.isThrottleParamSyncingModeSync = isThrottleParamSyncingModeSync;
}

public boolean isThrottleParamSyncingModeSync() {
return isThrottleParamSyncingModeSync;
}

public long getLocalQuota() {
return localQuota;
}

public void setLocalQuota(long localQuota) {
this.localQuota = localQuota;
}

public long getNextAccessTime() {
return nextAccessTime;
}

public void setNextAccessTime(long nextAccessTime) {
this.nextAccessTime = nextAccessTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ public interface DistributedCounterManager {
*/
public void setCounter(String key, long value);

/**
* Sets the Distributed counter with the given value while setting expiry time too.
*
* @param key counter key name
* @param value counter value
* @param expiryTime expiry time in milliseconds
*/
public void setCounterWithExpiry(String key, long value, long expiryTime);
/**
* This method used to add and return the distributed counter value.
*
Expand All @@ -55,14 +63,23 @@ public interface DistributedCounterManager {
public void removeCounter(String key);

/**
* This method used to update distributed counter asynchronously.
* This method is used to get and then increment distributed counter asynchronously.
*
* @param key key to check in distributed map.
* @param value value to add to distributed counter.
* @return the original distributed counter value.
*/
public long asyncGetAndAddCounter(String key, long value);

/**
* This method is used to increment distributed counter asynchronously.
*
* @param key key to update in distributed map.
* @param value value to increment
* @return the updated distributed counter value.
*/
public long asyncAddCounter(String key, long value);

/**
* This method used to alter the DistributedCounter.
*
Expand All @@ -73,11 +90,21 @@ public interface DistributedCounterManager {
public long asyncGetAndAlterCounter(String key, long value);

/**
* This method returns shared TimeStamp of distributed Key.
* This method is used to get and then alter and then set expiry time of the DistributedCounter.
*
* @param key key to check in distributed map.
* @return timestamp value of key.
* @param key key to alter in distributed counter.
* @param value value to alter in distributed counter.
* @param expiryTimeStamp expiry time to set.
* @return the original distributed counter value.
*/
public long asyncGetAlterAndSetExpiryOfCounter(String key, long value, long expiryTimeStamp);

/**
* This method returns shared TimeStamp of distributed Key.
*
* @param key key to check in distributed map.
* @return timestamp value of key.
*/
public long getTimestamp(String key);

/**
Expand All @@ -88,6 +115,15 @@ public interface DistributedCounterManager {
*/
public void setTimestamp(String key, long timeStamp);

/**
* This method set the Timestamp to distributed map with an expiry time.
*
* @param key key to add in distributed map.
* @param timeStamp timestamp to add.
* @param expiryTimeStamp expiry timestamp to set
*/
public void setTimestampWithExpiry(String key, long timeStamp, long expiryTimeStamp);

/**
* This method removes the timestamp relevant to key.
*
Expand All @@ -100,4 +136,14 @@ public interface DistributedCounterManager {
public String getType();

void setExpiry(String key, long expiryTimeStamp);

public long getTtl(String key);

public long setLock(String key, String value);

public boolean setLockWithExpiry(String key, String value, long expiryTimeStamp);

public long getKeyLockRetrievalTimeout();

public void removeLock(String key);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2023, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.synapse.commons.throttle.core;

public class RequestContext {
private long requestTime;

public RequestContext(long requestTime) {
this.requestTime = System.currentTimeMillis();
}

public long getRequestTime() {
return requestTime;
}

public void setRequestTime(long requestTime) {
this.requestTime = requestTime;
}
}
Loading

0 comments on commit 3848c18

Please sign in to comment.