perf: perform the getPrimary lookup only once during the bulk processing path

We can avoid making the (potentially expensive) getPrimary call twice per key on the bulk get path with
a small refactor. Functionality is unchanged, but the validation logic and error emission are pushed
into a callback that runs inside the lookup loop asyncGetBulk already performs, so the existing metrics
keep working as-is.
jasonk000 committed Nov 18, 2024
1 parent 11b47ec commit 72803eb
Showing 2 changed files with 47 additions and 41 deletions.
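In outline, the change replaces a pre-filtering pass over the key set with a per-node callback consulted during routing. Below is a minimal, self-contained sketch of that pattern; the Node interface, getPrimary, and selectKeys are simplified stand-ins for the EVCache types in the diff, not the project's actual API.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.BiPredicate;

public class BulkValidatorSketch {

    // Stand-in for MemcachedNode.
    interface Node {
        boolean isActive();
    }

    // Stand-in for locator.getPrimary(key): one potentially expensive lookup per key.
    static Node getPrimary(String key) {
        return () -> true;
    }

    // Before the refactor, callers pre-filtered keys with their own getPrimary()
    // pass, so each key paid for the lookup twice. After it, the bulk path accepts
    // a validator and consults it inside the single getPrimary() loop it already runs.
    static List<String> selectKeys(Collection<String> keys,
                                   BiPredicate<Node, String> nodeValidator) {
        final List<String> selected = new ArrayList<>();
        for (String key : keys) {
            final Node primaryNode = getPrimary(key);  // looked up exactly once
            if (primaryNode.isActive() && nodeValidator.test(primaryNode, key)) {
                selected.add(key);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Callers that need no extra checks pass an always-true predicate,
        // matching the delegating overload added in the diff.
        System.out.println(selectKeys(List.of("a", "b"), (node, key) -> true));
    }
}

Because validation now happens per node at the point the key is routed, the read-queue check and its READ_QUEUE_FULL metric fire in the same pass, and no separate pre-pass over the key set is needed.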
First changed file:

@@ -17,6 +17,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.*;
+import java.util.function.BiPredicate;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

@@ -187,39 +188,36 @@ public Integer getMaxHashLength() {
        return this.maxHashLength.get();
    }

-    private Collection<String> validateReadQueueSize(Collection<String> canonicalKeys, EVCache.Call call) {
-        if (evcacheMemcachedClient.getNodeLocator() == null) return canonicalKeys;
-        final Collection<String> retKeys = new ArrayList<>(canonicalKeys.size());
-        for (String key : canonicalKeys) {
-            final MemcachedNode node = evcacheMemcachedClient.getNodeLocator().getPrimary(key);
-            if (node instanceof EVCacheNode) {
-                final EVCacheNode evcNode = (EVCacheNode) node;
-                if (!evcNode.isAvailable(call)) {
-                    continue;
-                }
-                final int size = evcNode.getReadQueueSize();
-                final boolean canAddToOpQueue = size < (maxReadQueueSize.get() * 2);
-                // if (log.isDebugEnabled()) log.debug("Bulk Current Read Queue
-                // Size - " + size + " for app " + appName + " & zone " + zone +
-                // " ; node " + node);
-                if (!canAddToOpQueue) {
-                    final String hostName;
-                    if (evcNode.getSocketAddress() instanceof InetSocketAddress) {
-                        hostName = ((InetSocketAddress) evcNode.getSocketAddress()).getHostName();
-                    } else {
-                        hostName = evcNode.getSocketAddress().toString();
-                    }
-                    incrementFailure(EVCacheMetricsFactory.READ_QUEUE_FULL, call, hostName);
-                    if (log.isDebugEnabled()) log.debug("Read Queue Full on Bulk Operation for app : " + appName
-                            + "; zone : " + zone + "; Current Size : " + size + "; Max Size : " + maxReadQueueSize.get() * 2);
-                } else {
-                    retKeys.add(key);
-                }
-            }
-        }
-        return retKeys;
-    }
+    private boolean validateReadQueueSize(MemcachedNode node, String key, EVCache.Call call) {
+        if (!(node instanceof EVCacheNode)) {
+            return true;
+        }
+
+        final EVCacheNode evcNode = (EVCacheNode) node;
+        if (!evcNode.isAvailable(call)) {
+            return false;
+        }
+
+        final int size = evcNode.getReadQueueSize();
+        final boolean canAddToOpQueue = size < (maxReadQueueSize.get() * 2);
+
+        if (!canAddToOpQueue) {
+            final String hostName;
+            if (evcNode.getSocketAddress() instanceof InetSocketAddress) {
+                hostName = ((InetSocketAddress) evcNode.getSocketAddress()).getHostName();
+            } else {
+                hostName = evcNode.getSocketAddress().toString();
+            }
+
+            incrementFailure(EVCacheMetricsFactory.READ_QUEUE_FULL, call, hostName);
+            if (log.isDebugEnabled()) {
+                log.debug("Read Queue Full on Bulk Operation for app : " + appName
+                        + "; zone : " + zone + "; Current Size : " + size
+                        + "; Max Size : " + maxReadQueueSize.get() * 2);
+            }
+        }
+
+        return canAddToOpQueue;
+    }

    private void incrementFailure(String metric, EVCache.Call call) {
@@ -966,16 +964,16 @@ public <T> Single<T> getAndTouch(String key, Transcoder<T> transcoder, int timeT
}
}

-    public <T> Map<String, T> getBulk(Collection<String> _canonicalKeys, Transcoder<T> tc, boolean _throwException,
+    public <T> Map<String, T> getBulk(Collection<String> canonicalKeys, Transcoder<T> tc, boolean _throwException,
            boolean hasZF) throws Exception {
-        final Collection<String> canonicalKeys = validateReadQueueSize(_canonicalKeys, Call.BULK);
        final Map<String, T> returnVal;
        try {
            if (tc == null) tc = (Transcoder<T>) getTranscoder();
            if (enableChunking.get()) {
-                returnVal = assembleChunks(_canonicalKeys, tc, hasZF);
+                returnVal = assembleChunks(canonicalKeys, tc, hasZF);
            } else {
-                returnVal = evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null)
+                final BiPredicate<MemcachedNode, String> validator = (node, key) -> validateReadQueueSize(node, key, Call.BULK);
+                returnVal = evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null, validator)
                    .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF);
            }
        } catch (Exception e) {
@@ -985,24 +983,24 @@ public <T> Map<String, T> getBulk(Collection<String> _canonicalKeys, Transcoder<
        return returnVal;
    }

-    public <T> CompletableFuture<Map<String, T>> getAsyncBulk(Collection<String> _canonicalKeys, Transcoder<T> tc) {
-        final Collection<String> canonicalKeys = validateReadQueueSize(_canonicalKeys, Call.COMPLETABLE_FUTURE_GET_BULK);
+    public <T> CompletableFuture<Map<String, T>> getAsyncBulk(Collection<String> canonicalKeys, Transcoder<T> tc) {
+        final BiPredicate<MemcachedNode, String> validator = (node, key) -> validateReadQueueSize(node, key, Call.COMPLETABLE_FUTURE_GET_BULK);
        if (tc == null) tc = (Transcoder<T>) getTranscoder();
        return evcacheMemcachedClient
-                .asyncGetBulk(canonicalKeys, tc, null)
+                .asyncGetBulk(canonicalKeys, tc, null, validator)
                .getAsyncSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS);

    }

-    public <T> Single<Map<String, T>> getBulk(Collection<String> _canonicalKeys, final Transcoder<T> transcoder, boolean _throwException,
+    public <T> Single<Map<String, T>> getBulk(Collection<String> canonicalKeys, final Transcoder<T> transcoder, boolean _throwException,
            boolean hasZF, Scheduler scheduler) {
        try {
-            final Collection<String> canonicalKeys = validateReadQueueSize(_canonicalKeys, Call.BULK);
            final Transcoder<T> tc = (transcoder == null) ? (Transcoder<T>) getTranscoder() : transcoder;
            if (enableChunking.get()) {
-                return assembleChunks(_canonicalKeys, tc, hasZF, scheduler);
+                return assembleChunks(canonicalKeys, tc, hasZF, scheduler);
            } else {
-                return evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null)
+                final BiPredicate<MemcachedNode, String> validator = (node, key) -> validateReadQueueSize(node, key, Call.BULK);
+                return evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null, validator)
                    .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF, scheduler);
            }
        } catch (Throwable e) {
Second changed file:

@@ -18,6 +18,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiPredicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -192,6 +193,13 @@ public void complete() {
    public <T> EVCacheBulkGetFuture<T> asyncGetBulk(Collection<String> keys,
                                                    final Transcoder<T> tc,
                                                    EVCacheGetOperationListener<T> listener) {
+        return asyncGetBulk(keys, tc, listener, (node, key) -> true);
+    }
+
+    public <T> EVCacheBulkGetFuture<T> asyncGetBulk(Collection<String> keys,
+                                                    final Transcoder<T> tc,
+                                                    EVCacheGetOperationListener<T> listener,
+                                                    BiPredicate<MemcachedNode, String> nodeValidator) {
        final Map<String, Future<T>> m = new ConcurrentHashMap<String, Future<T>>();

        // Break the gets down into groups by key
@@ -202,7 +210,7 @@ public <T> EVCacheBulkGetFuture<T> asyncGetBulk(Collection<String> keys,
        for (String key : keys) {
            StringUtils.validateKey(key, opFact instanceof BinaryOperationFactory);
            final MemcachedNode primaryNode = locator.getPrimary(key);
-            if (primaryNode.isActive()) {
+            if (primaryNode.isActive() && nodeValidator.test(primaryNode, key)) {
                Collection<String> ks = chunks.computeIfAbsent(primaryNode, k -> new ArrayList<>());
                ks.add(key);
            }
