Skip to content

Commit

Permalink
Porting redis throttling improvement related changes
Browse files Browse the repository at this point in the history
  • Loading branch information
rmsamitha committed Sep 19, 2023
1 parent b067702 commit 7a6ad06
Show file tree
Hide file tree
Showing 13 changed files with 477 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.commons.throttle.core.internal.DistributedThrottleProcessor;
import org.apache.synapse.commons.throttle.core.internal.ThrottleServiceDataHolder;

import java.io.Serializable;
import java.util.UUID;
Expand All @@ -43,13 +45,22 @@ public abstract class CallerContext implements Serializable, Cloneable {
private long nextTimeWindow = 0;
/* The globalCount to keep track number of request */
private AtomicLong globalCount = new AtomicLong(0);

private long localQuota;
private String roleId;
private long unitTime;
/*
This is specific for each API EP
if this is true, then syncing of throttle parameter with global redis counters be synced will be done in sync mode
*/
private boolean isThrottleParamSyncingModeSync;
private ThrottleProperties throttleProperties;


/**
* Count to keep track of local (specific to this node) number of requests
*/
private AtomicLong localCount = new AtomicLong(0);
private AtomicLong localHits = new AtomicLong(0);

/**
* Used for debugging purposes. *
Expand Down Expand Up @@ -83,6 +94,10 @@ public CallerContext(String ID) {
throw new InstantiationError("Couldn't create a CallContext for an empty " +
"remote caller ID");
}
if (throttleProperties == null) {
throttleProperties = ThrottleServiceDataHolder.getInstance().getThrottleProperties();
}

this.id = ID.trim();
}

Expand Down Expand Up @@ -111,7 +126,10 @@ private void initAccess(CallerConfiguration configuration, ThrottleContext throt
this.roleId = configuration.getID();
//Also we need to pick counter value associated with time window.
throttleContext.addCallerContext(this, this.id);
throttleContext.replicateTimeWindow(this.id);

if (!ThrottleServiceDataHolder.getInstance().getThrottleProperties().isThrottleSyncAsyncHybridModeEnabled()) {
throttleContext.replicateTimeWindow(this.id);
}
}

/**
Expand Down Expand Up @@ -140,7 +158,6 @@ private boolean canAccessIfUnitTimeNotOver(CallerConfiguration configuration,
// Send the current state to others (clustered env)
throttleContext.flushCallerContext(this, id);
// can complete access

} else {
//else , if caller has not already prohibit
if (this.nextAccessTime == 0) {
Expand Down Expand Up @@ -190,7 +207,7 @@ private boolean canAccessIfUnitTimeNotOver(CallerConfiguration configuration,
throttleContext.replicateTimeWindow(this.id);
throttleContext.addAndFlushCallerContext(this, this.id);

if(log.isDebugEnabled()) {
if (log.isDebugEnabled()) {
log.debug("Caller=" + this.getId() + " has reset counters and added for replication when unit "
+ "time is not over");
}
Expand Down Expand Up @@ -356,6 +373,7 @@ public void cleanUpCallers(CallerConfiguration configuration,
*/
public boolean canAccess(ThrottleContext throttleContext, CallerConfiguration configuration,
long currentTime) throws ThrottleException {
RequestContext requestContext = new RequestContext(currentTime);
boolean canAccess;
if (configuration == null) {
if (log.isDebugEnabled()) {
Expand All @@ -374,16 +392,35 @@ public boolean canAccess(ThrottleContext throttleContext, CallerConfiguration co
initAccess(configuration, throttleContext, currentTime);
}
// if unit time period (session time) is not over
if (this.nextTimeWindow > currentTime) {
canAccess = canAccessIfUnitTimeNotOver(configuration, throttleContext, currentTime);
} else {
canAccess = canAccessIfUnitTimeOver(configuration, throttleContext, currentTime);
if (log.isDebugEnabled()) {
log.debug("### NEW REQUEST RECEIVED ! - currentTime: " + currentTime);
}

DistributedThrottleProcessor distributedThrottleProcessor = ThrottleServiceDataHolder.getInstance()
.getDistributedThrottleProcessor();
if (distributedThrottleProcessor != null && distributedThrottleProcessor.isEnable()) {
long startTime = System.currentTimeMillis();
canAccess = distributedThrottleProcessor.canAccessBasedOnUnitTime(this, configuration, throttleContext,
requestContext);
long duration = System.currentTimeMillis() - startTime;
if (log.isDebugEnabled()) {
log.debug("LATENCY FOR THROTTLE PROCESSING: " + duration + " ms");
}
} else {
canAccess = canAccessBasedOnUnitTime(configuration, throttleContext, currentTime);
}
return canAccess;
}

private boolean canAccessBasedOnUnitTime(CallerConfiguration configuration, ThrottleContext throttleContext, long currentTime) {
if (this.nextTimeWindow > currentTime) {
return canAccessIfUnitTimeNotOver(configuration, throttleContext, currentTime);
} else {
return canAccessIfUnitTimeOver(configuration, throttleContext, currentTime);
}
}


/**
* Returns the next time window
*
Expand All @@ -410,13 +447,28 @@ public void setGlobalCounter(long counter) {
}

public void setLocalCounter(long counter) {
if (log.isTraceEnabled()) {
log.trace("changing local counter from:" + localCount.get() + " to:" + counter);
}
localCount.set(counter);
}

public long getLocalCounter() {
return localCount.get();
}

public void setLocalHits(long counter) {
localHits.set(counter);
}

public long getLocalHits() {
return localHits.get();
}

public void incrementLocalHits() {
localHits.incrementAndGet();
}

public void resetLocalCounter() {
localCount.set(0);
}
Expand Down Expand Up @@ -459,4 +511,28 @@ public String getRoleId() {
public void setRoleId(String roleId) {
this.roleId = roleId;
}

public void setIsThrottleParamSyncingModeSync(boolean isThrottleParamSyncingModeSync) {
this.isThrottleParamSyncingModeSync = isThrottleParamSyncingModeSync;
}

public boolean isThrottleParamSyncingModeSync() {
return isThrottleParamSyncingModeSync;
}

public long getLocalQuota() {
return localQuota;
}

public void setLocalQuota(long localQuota) {
this.localQuota = localQuota;
}

public long getNextAccessTime() {
return nextAccessTime;
}

public void setNextAccessTime(long nextAccessTime) {
this.nextAccessTime = nextAccessTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ public interface DistributedCounterManager {
*/
public void setCounter(String key, long value);

/**
* Sets the Distributed counter with the given value while setting expiry time too.
*
* @param key counter key name
* @param value counter value
* @param expiryTime expiry time in milliseconds
*/
public void setCounterWithExpiry(String key, long value, long expiryTime);
/**
* This method used to add and return the distributed counter value.
*
Expand All @@ -55,14 +63,23 @@ public interface DistributedCounterManager {
public void removeCounter(String key);

/**
* This method used to update distributed counter asynchronously.
* This method is used to get and then increment distributed counter asynchronously.
*
* @param key key to check in distributed map.
* @param value value to add to distributed counter.
* @return the original distributed counter value.
*/
public long asyncGetAndAddCounter(String key, long value);

/**
* This method is used to increment distributed counter asynchronously.
*
* @param key key to update in distributed map.
* @param value value to increment
* @return the updated distributed counter value.
*/
public long asyncAddCounter(String key, long value);

/**
* This method used to alter the DistributedCounter.
*
Expand All @@ -73,11 +90,21 @@ public interface DistributedCounterManager {
public long asyncGetAndAlterCounter(String key, long value);

/**
* This method returns shared TimeStamp of distributed Key.
* This method is used to get and then alter and then set expiry time of the DistributedCounter.
*
* @param key key to check in distributed map.
* @return timestamp value of key.
* @param key key to alter in distributed counter.
* @param value value to alter in distributed counter.
* @param expiryTimeStamp expiry time to set.
* @return the original distributed counter value.
*/
public long asyncGetAlterAndSetExpiryOfCounter(String key, long value, long expiryTimeStamp);

/**
* This method returns shared TimeStamp of distributed Key.
*
* @param key key to check in distributed map.
* @return timestamp value of key.
*/
public long getTimestamp(String key);

/**
Expand All @@ -88,6 +115,15 @@ public interface DistributedCounterManager {
*/
public void setTimestamp(String key, long timeStamp);

/**
* This method set the Timestamp to distributed map with an expiry time.
*
* @param key key to add in distributed map.
* @param timeStamp timestamp to add.
* @param expiryTimeStamp expiry timestamp to set
*/
public void setTimestampWithExpiry(String key, long timeStamp, long expiryTimeStamp);

/**
* This method removes the timestamp relevant to key.
*
Expand All @@ -100,4 +136,14 @@ public interface DistributedCounterManager {
public String getType();

void setExpiry(String key, long expiryTimeStamp);

public long getTtl(String key);

public long setLock(String key, String value);

public boolean setLockWithExpiry(String key, String value, long expiryTimeStamp);

public long getKeyLockRetrievalTimeout();

public void removeLock(String key);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2023, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.synapse.commons.throttle.core;

public class RequestContext {
private long requestTime;

public RequestContext(long requestTime) {
this.requestTime = System.currentTimeMillis();
}

public long getRequestTime() {
return requestTime;
}

public void setRequestTime(long requestTime) {
this.requestTime = requestTime;
}
}
Loading

0 comments on commit 7a6ad06

Please sign in to comment.