Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce retries when pinging servers. #155

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public class EVCacheImpl implements EVCache, EVCacheImplMBean {
this._pool = poolManager.getEVCacheClientPool(_appName);
});

_pool.pingServers();
_pool.pingServers(true);

setupMonitoring();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ public String getStatusCode(StatusCode sc) {
public static final String INTERNAL_POOL_SG_CONFIG = "internal.evc.client.pool.asg.config";
public static final String INTERNAL_POOL_CONFIG = "internal.evc.client.pool.config";
public static final String INTERNAL_POOL_REFRESH = "internal.evc.client.pool.refresh";
public static final String INTERNAL_PING_SERVER = "internal.evc.client.ping.server";
public static final String INTERNAL_PING_SERVER_FAILURES = "internal.evc.client.ping.server.failures";

public static final String INTERNAL_BOOTSTRAP_EUREKA = "internal.evc.client.pool.bootstrap.eureka";

Expand All @@ -309,7 +311,6 @@ public String getStatusCode(StatusCode sc) {
public static final String POOL_REFRESH_ASYNC = "refreshAsync";
public static final String POOL_OPERATIONS = "operations";


/**
* Metric Tags Names
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
Expand Down Expand Up @@ -979,7 +976,7 @@ private void updateMemcachedReadInstancesByZone() {
}

private void cleanupMemcachedInstances(boolean force) {
pingServers();
pingServers(false);
for (Iterator<Entry<ServerGroup, List<EVCacheClient>>> it = memcachedInstancesByServerGroup.entrySet().iterator(); it.hasNext();) {
final Entry<ServerGroup, List<EVCacheClient>> serverGroupEntry = it.next();
final List<EVCacheClient> instancesInAServerGroup = serverGroupEntry.getValue();
Expand Down Expand Up @@ -1109,7 +1106,7 @@ private synchronized void refresh(boolean force) throws IOException {
}
updateMemcachedReadInstancesByZone();
updateQueueStats();
if (_pingServers.get()) pingServers();
if (_pingServers.get()) pingServers(false);
} catch (Throwable t) {
log.error("Exception while refreshing the Server list", t);
} finally {
Expand Down Expand Up @@ -1167,23 +1164,91 @@ private void updateQueueStats() {
}
}

public void pingServers() {
public void pingServers(Boolean bootTimeCheck) {
final long start = System.currentTimeMillis();

try {
final Map<ServerGroup, List<EVCacheClient>> allServers = getAllInstancesByZone();

for (Entry<ServerGroup, List<EVCacheClient>> entry : allServers.entrySet()) {
final List<EVCacheClient> listOfClients = entry.getValue();
for (EVCacheClient client : listOfClients) {
final Map<SocketAddress, String> versions = client.getVersions();
for (Entry<SocketAddress, String> vEntry : versions.entrySet()) {
if (log.isDebugEnabled()) log.debug("Host : " + vEntry.getKey() + " : " + vEntry.getValue());
if (!bootTimeCheck) {
// Just log versions and continue if not a boot time check
try {
final Map<SocketAddress, String> versions = client.getVersions();
if (log.isDebugEnabled()) {
for (Entry<SocketAddress, String> vEntry : versions.entrySet()) {
log.debug("Host : {} Version : {}", vEntry.getKey(), vEntry.getValue());
}
}
continue;
} catch (Exception e) {
log.warn("Error getting versions for client: {}", client, e);
continue;
}
}

long startTime = System.currentTimeMillis();
long timeoutMs = 3000; // 3 seconds
boolean success = false;

while (System.currentTimeMillis() - startTime < timeoutMs && !success) {
try {
final Map<SocketAddress, String> versions = client.getVersions();
boolean allNodesOk = true;

for (Entry<SocketAddress, String> vEntry : versions.entrySet()) {
String version = vEntry.getValue();
if (!version.matches("\\d+\\.\\d+\\.\\d+")) {
allNodesOk = false;
log.warn("Node not ready or invalid version: {}, response: {}",
vEntry.getKey(), version);
break;
}
}

if (allNodesOk) {
if (log.isDebugEnabled()) {
for (Entry<SocketAddress, String> vEntry : versions.entrySet()) {
log.debug("Host : {} Version : {}", vEntry.getKey(), vEntry.getValue());
}
}
success = true;
break;
}

Thread.sleep(100); // 100ms delay between retries
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupted while pinging servers for client: {}", client);
break;
} catch (Exception e) {
log.warn("Error while pinging server for client: {}", client, e);
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}

if (!success && bootTimeCheck) {
log.warn("Failed to get valid version from client {} within timeout", client);
EVCacheMetricsFactory.getInstance().getCounter(EVCacheMetricsFactory.INTERNAL_PING_SERVER_FAILURES,tagList).increment();
}
}
}

if (duetClientPool != null)
duetClientPool.pingServers();
if (duetClientPool != null) {
duetClientPool.pingServers(false);
}
} catch (Throwable t) {
log.error("Error while pinging the servers", t);
log.warn("Error while pinging the servers", t);
EVCacheMetricsFactory.getInstance().getCounter(EVCacheMetricsFactory.INTERNAL_PING_SERVER_FAILURES,tagList).increment();
} finally {
EVCacheMetricsFactory.getInstance().getPercentileTimer(EVCacheMetricsFactory.INTERNAL_PING_SERVER, tagList, Duration.ofMillis(100)).record(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
}
}

Expand Down
Loading