Skip to content

Commit

Permalink
AuroraClusterMonitor reliability changes
Browse files Browse the repository at this point in the history
This updates the transitive dependencies and attempts to change AuroraClusterMonitor logic for improved reliability.
There are several changes involved in this:
* First the connection is checked for isValid before it used to check master / slave status.  This hopefully prevents errors from invalid connections which might remove the server from selection

* Errors during connection are now delayed.  This allows the monitor to be constructed while some of the cluster is unhealthy.  The delayed error will still be found and logged quickly, it just wont prevent the monitor from being constructed (which may otherwise prevent an application from being usable / startup)

* Errors during regular scheduled state check are handled differently.  If an error occurs from an expedited state check the health status will still transition immediately.  This is desired because a connection has already witnessed an error so we should treat that seriously.  However if an error occurs from a regular check it will be ignored initially, with a new check scheduled quickly.  It's possible that new connections can't be established but existing ones are working, so we are attempting to delay impacting those existing connections use.  In the future we may want to inspect the error type to make even more decisions here.

* Added the ability to get notified on every cluster state check error, but only log when the error state changes
  • Loading branch information
jentfoo committed May 22, 2019
1 parent e189bb7 commit e1c650f
Show file tree
Hide file tree
Showing 14 changed files with 314 additions and 92 deletions.
2 changes: 1 addition & 1 deletion .classpath
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<classpathentry kind="lib" path="build/dependencies/byte-buddy-1.7.9.jar"/>
<classpathentry kind="lib" path="build/dependencies/byte-buddy-agent-1.7.9.jar"/>
<classpathentry kind="lib" path="build/dependencies/mockito-core-2.13.0.jar"/>
<classpathentry kind="lib" path="mysqlAuroraArc/build/dependencies/mysql-connector-java-8.0.15.jar"/>
<classpathentry kind="lib" path="arcCommon/build/dependencies/HikariCP-3.3.1.jar"/>
<classpathentry kind="lib" path="mysqlAuroraArc/build/dependencies/mysql-connector-java-8.0.16.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,22 @@
* The connection will appear valid (from {@link #isValid(int)} and non-closed UNTIL the exception
* is thrown. After that point the connection will appear as if it was closed.
*
* @since 0.9
* @since 0.10
*/
public class ErrorSqlConnection implements Connection {
abstract class AbstractErrorSqlConnection implements Connection {
private final Runnable errorThrownListener;
private final SQLException sqlError;
private final RuntimeException runtimeError;
private volatile boolean closed = false;
private volatile boolean errorThrown = false;

/**
* Construct a new {@link ErrorSqlConnection}.
* Construct a new {@link AbstractErrorSqlConnection}.
*
* @param errorThrownListener Listener to be invoked when error is realized (ie thrown)
* @param error Error to throw once Connection is attempted to be used
*/
public ErrorSqlConnection(Runnable errorThrownListener, SQLException error) {
public AbstractErrorSqlConnection(Runnable errorThrownListener, SQLException error) {
ArgumentVerifier.assertNotNull(error, "error");

if (errorThrownListener == null) {
Expand All @@ -57,12 +57,12 @@ public ErrorSqlConnection(Runnable errorThrownListener, SQLException error) {
}

/**
* Construct a new {@link ErrorSqlConnection}.
* Construct a new {@link AbstractErrorSqlConnection}.
*
* @param errorThrownListener Listener to be invoked when error is realized (ie thrown)
* @param error Error to throw once Connection is attempted to be used
*/
public ErrorSqlConnection(Runnable errorThrownListener, RuntimeException error) {
public AbstractErrorSqlConnection(Runnable errorThrownListener, RuntimeException error) {
ArgumentVerifier.assertNotNull(error, "error");

if (errorThrownListener == null) {
Expand Down Expand Up @@ -95,11 +95,6 @@ public boolean isClosed() {
return errorThrown || closed;
}

@Override
public boolean isValid(int timeout) {
return ! isClosed();
}

@Override
public void setClientInfo(Properties arg0) throws SQLClientInfoException {
// ignored
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.threadly.db;

import java.sql.Connection;
import java.sql.SQLException;

/**
* Implementation of {@link Connection} which is perpetually in a state of error. Any operation on
* this connection will result in an exception being thrown.
* <p>
* The connection will appear invalid when checked by {@link #isValid(int)}, always returning
* {@code false}.
*
* @since 0.10
*/
public class ErrorInvalidSqlConnection extends AbstractErrorSqlConnection {
/**
* Construct a new {@link ErrorInvalidSqlConnection}.
*
* @param errorThrownListener Listener to be invoked when error is realized (ie thrown)
* @param error Error to throw once Connection is attempted to be used
*/
public ErrorInvalidSqlConnection(Runnable errorThrownListener, SQLException error) {
super(errorThrownListener, error);
}

/**
* Construct a new {@link ErrorInvalidSqlConnection}.
*
* @param errorThrownListener Listener to be invoked when error is realized (ie thrown)
* @param error Error to throw once Connection is attempted to be used
*/
public ErrorInvalidSqlConnection(Runnable errorThrownListener, RuntimeException error) {
super(errorThrownListener, error);
}

@Override
public boolean isValid(int timeout) {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.threadly.db;

import java.sql.Connection;
import java.sql.SQLException;

/**
* Implementation of {@link Connection} which is perpetually in a state of error. Any operation on
* this connection will result in an exception being thrown.
* <p>
* The connection will appear valid (from {@link #isValid(int)} and non-closed UNTIL the exception
* is thrown. After that point the connection will appear as if it was closed.
*
* @since 0.10
*/
public class ErrorValidSqlConnection extends AbstractErrorSqlConnection {
/**
* Construct a new {@link ErrorValidSqlConnection}.
*
* @param errorThrownListener Listener to be invoked when error is realized (ie thrown)
* @param error Error to throw once Connection is attempted to be used
*/
public ErrorValidSqlConnection(Runnable errorThrownListener, SQLException error) {
super(errorThrownListener, error);
}

/**
* Construct a new {@link ErrorValidSqlConnection}.
*
* @param errorThrownListener Listener to be invoked when error is realized (ie thrown)
* @param error Error to throw once Connection is attempted to be used
*/
public ErrorValidSqlConnection(Runnable errorThrownListener, RuntimeException error) {
super(errorThrownListener, error);
}

@Override
public boolean isValid(int timeout) {
return ! isClosed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.threadly.concurrent.CentralThreadlyPool;
import org.threadly.concurrent.ReschedulingOperation;
import org.threadly.concurrent.SchedulerService;
import org.threadly.concurrent.SubmitterScheduler;
import org.threadly.db.ErrorInvalidSqlConnection;
import org.threadly.db.aurora.DelegateAuroraDriver.IllegalDriverStateException;
import org.threadly.util.ArgumentVerifier;
import org.threadly.util.ExceptionHandler;

/**
* Class which monitors a "cluster" of aurora servers. It is expected that for each given cluster
Expand All @@ -32,16 +36,19 @@
*/
public class AuroraClusterMonitor {
protected static final Logger LOG = Logger.getLogger(AuroraClusterMonitor.class.getSimpleName());
protected static final int CONNECTION_VALID_CHECK_TIMEOUT = 10_000;

protected static final int MAXIMUM_THREAD_POOL_SIZE = 64;
protected static final SchedulerService MONITOR_SCHEDULER;
protected static final ConcurrentMap<AuroraServersKey, AuroraClusterMonitor> MONITORS;
private static volatile long checkFrequencyMillis = 500;
private static volatile BiConsumer<AuroraServer, Throwable> errorExceptionHandler;

static {
MONITOR_SCHEDULER = CentralThreadlyPool.threadPool(MAXIMUM_THREAD_POOL_SIZE, "auroraMonitor");

MONITORS = new ConcurrentHashMap<>();
setExceptionHandler(null);
}

/**
Expand All @@ -66,6 +73,21 @@ public static void setServerCheckDelayMillis(long millis) {
}
}
}

/**
* Set an {@link ExceptionHandler} to be invoked on EVERY error when checking the cluster state.
* By default state changes will be logged, but due to potential high volumes logs will only
* occur on server state changes. This sets an {@link ExceptionHandler} which will be invoked on
* any error, even if the server is already in an unhealthy condition.
*
* @param handler Handler to be invoked or {@code null} if exceptions should be dropped.
*/
public static void setExceptionHandler(BiConsumer<AuroraServer, Throwable> handler) {
if (handler == null) {
handler = (server, error) -> { /* ignored */ };
}
errorExceptionHandler = handler;
}

/**
* Return a monitor instance for a given set of servers. This instance will be consistent as
Expand Down Expand Up @@ -263,11 +285,11 @@ protected ClusterChecker(SchedulerService scheduler, long checkIntervalMillis,
masterServer = new AtomicReference<>();

for (AuroraServer server : clusterServers) {
ServerMonitor monitor = new ServerMonitor(driver, server, this);
ServerMonitor monitor = new ServerMonitor(scheduler, driver, server, this);
allServers.put(server, monitor);

if (masterServer.get() == null) { // check in thread till we find the master
monitor.run();
monitor.run(true);
if (monitor.isHealthy()) {
if (monitor.isMasterServer()) {
masterServer.set(server);
Expand Down Expand Up @@ -335,7 +357,7 @@ protected void expediteServerCheck(ServerMonitor serverMonitor) {
if (serversWaitingExpeditiedCheck.addIfAbsent(serverMonitor.server)) {
scheduler.execute(() -> {
try {
serverMonitor.run();
serverMonitor.run(true);
} finally {
serversWaitingExpeditiedCheck.remove(serverMonitor.server);
}
Expand Down Expand Up @@ -402,26 +424,24 @@ protected void run() {
* queried.
*/
protected static class ServerMonitor implements Runnable {
protected final SubmitterScheduler scheduler;
protected final DelegateAuroraDriver driver;
protected final AuroraServer server;
protected final ReschedulingOperation clusterStateChecker;
protected final AtomicBoolean running;
protected Connection serverConnection;
protected volatile Throwable lastError;
protected Connection serverConnection; // guarded by running AtomicBoolean
protected Throwable lastError; // guarded by running AtomicBoolean
protected volatile Throwable stateError;
protected volatile boolean masterServer;

protected ServerMonitor(DelegateAuroraDriver driver, AuroraServer server,
ReschedulingOperation clusterStateChecker) {
protected ServerMonitor(SubmitterScheduler scheduler, DelegateAuroraDriver driver,
AuroraServer server, ReschedulingOperation clusterStateChecker) {
this.scheduler = scheduler;
this.driver = driver;
this.server = server;
this.clusterStateChecker = clusterStateChecker;
this.running = new AtomicBoolean(false);
try {
reconnect();
} catch (SQLException e) {
throw new RuntimeException("Could not connect to monitor cluster member: " +
server + ", error is fatal", e);
}
reconnect();
lastError = null;
masterServer = false;
}
Expand All @@ -431,12 +451,16 @@ public String toString() {
return (masterServer ? "m:" : "r:") + server;
}

protected void reconnect() throws SQLException {
Connection newConnection =
driver.connect(server.hostAndPortString() +
"/?connectTimeout=10000&socketTimeout=10000" +
"&serverTimezone=UTC&useSSL=false",
server.getProperties());
protected void reconnect() {
Connection newConnection;
try {
newConnection = driver.connect(server.hostAndPortString() +
"/?connectTimeout=10000&socketTimeout=10000" +
driver.getStatusConnectURLParams(),
server.getProperties());
} catch (SQLException e) {
newConnection = new ErrorInvalidSqlConnection(null, e);
}
if (serverConnection != null) { // only attempt to replace once we have a new connection without exception
try {
serverConnection.close();
Expand All @@ -448,7 +472,7 @@ protected void reconnect() throws SQLException {
}

public boolean isHealthy() {
return lastError == null;
return stateError == null;
}

public boolean isMasterServer() {
Expand All @@ -457,45 +481,77 @@ public boolean isMasterServer() {

@Override
public void run() {
run(false);
}

public void run(boolean expedited) {
// if already running ignore start
if (running.compareAndSet(false, true)) {
try {
updateState();
updateState(expedited);
} finally {
running.set(false);
}
}
}

protected void updateState() {
protected void updateState(boolean expedited) {
// we can't set our class state directly, as we need to indicate if it has changed
boolean currentlyMasterServer = false;
Throwable currentError = null;
try {
if (! serverConnection.isValid(CONNECTION_VALID_CHECK_TIMEOUT)) {
// try to avoid an error that will mark as unhealthy, if invalid because exception was
// thrown during reconnect the failure will be thrown when the connection is to be used
reconnect();
}
currentlyMasterServer = driver.isMasterServer(server, serverConnection);
} catch (IllegalDriverStateException e) {
// these exceptions are handled different
LOG.severe(e.getMessage());
currentlyMasterServer = false;
} catch (Throwable t) {
currentError = t;
if (! t.equals(lastError)) {
LOG.log(Level.WARNING,
"Setting aurora server " + server + " as unhealthy due to error checking state", t);
}
} finally {
if (currentlyMasterServer != masterServer || ((lastError == null) != (currentError == null))) {
lastError = currentError;
boolean updateClusterState = false;
if (currentlyMasterServer != masterServer) {
masterServer = currentlyMasterServer;
updateClusterState = true;
}
// NO else if, do the above checks in addition
if (currentError == null) {
if (lastError != null) {
LOG.log(Level.INFO, "Aurora server " + server + " is healthy again");
lastError = null;
if (stateError != null) {
stateError = null;
updateClusterState = true;
}
}
} else if (lastError == null && ! expedited) {
LOG.log(Level.WARNING,
"Scheduling aurora server " + server + " recheck after first error", currentError);
lastError = currentError;
// don't update state status yet, wait till next failure, but schedule a recheck soon
// schedule as a lambda so it wont be confused with potential .remove(Runnable) invocations
// TODO - we may not always want to expedite this check, ie if existing connections can work
scheduler.schedule(() -> run(true), checkFrequencyMillis / 2);
} else {
lastError = stateError = currentError;
if (stateError == null) {
LOG.log(Level.WARNING,
"Setting aurora server " + server + " as unhealthy due to error", currentError);
updateClusterState = true;
}
}

if (updateClusterState) {
clusterStateChecker.signalToRun();
}
// reconnect after state has been reflected, don't block till it is for sure known we are unhealthy
if (currentError != null) {
try {
reconnect();
} catch (SQLException e) {
// ignore exceptions here, will retry on the next go around
}
reconnect();
errorExceptionHandler.accept(server, currentError);
}
}
}
Expand Down
Loading

0 comments on commit e1c650f

Please sign in to comment.