Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Honor pool connection timeout when executing queries directly in the pool #1338

Open
wants to merge 8 commits into
base: 4.x
Choose a base branch
from
42 changes: 42 additions & 0 deletions vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.vertx.core.Handler;
import io.vertx.core.VertxOptions;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.Repeat;
Expand Down Expand Up @@ -592,4 +593,45 @@ private void testConnectionClosedInProvider(TestContext ctx, boolean immediately
}));
}));
}

@Test
public void testConnectionTimeoutWhenExecutingDirectly(TestContext ctx) {
PgPool pool = createPool(options, new PoolOptions().setConnectionTimeout(2).setMaxSize(2).setAlwaysUseTimeout(true));
final Async latch = ctx.async(2);
pool.getConnection(ctx.asyncAssertSuccess(conn -> {
conn
.query("SELECT id, message from immutable")
.execute(ctx.asyncAssertSuccess(rows -> {
ctx.assertEquals(12, rows.size());
latch.countDown();
}));
}));

pool.getConnection(ctx.asyncAssertSuccess(conn -> {
conn
.query("SELECT id, message from immutable")
.execute(ctx.asyncAssertSuccess(rows -> {
ctx.assertEquals(12, rows.size());
latch.countDown();
}));
}));

latch.awaitSuccess();
final long timerId = vertx.setTimer(10000L, id -> {
ctx.fail("Timeout exceeded without completing");
});
//Used both connections
Async async = ctx.async(10);
for (int i = 0; i < 10; i++) {
pool
.query("SELECT id, message from immutable")
.execute(ctx.asyncAssertFailure(t -> {
ctx.assertTrue(t instanceof NoStackTraceThrowable);
ctx.assertEquals("Timeout", t.getMessage());
async.countDown();
}));
}

async.handler(v -> vertx.cancelTimer(timerId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ public class PoolOptionsConverter {
static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, PoolOptions obj) {
for (java.util.Map.Entry<String, Object> member : json) {
switch (member.getKey()) {
case "alwaysUseTimeout":
if (member.getValue() instanceof Boolean) {
obj.setAlwaysUseTimeout((Boolean)member.getValue());
}
break;
case "connectionTimeout":
if (member.getValue() instanceof Number) {
obj.setConnectionTimeout(((Number)member.getValue()).intValue());
Expand Down Expand Up @@ -89,6 +94,7 @@ static void toJson(PoolOptions obj, JsonObject json) {
}

static void toJson(PoolOptions obj, java.util.Map<String, Object> json) {
json.put("alwaysUseTimeout", obj.isAlwaysUseTimeout());
json.put("connectionTimeout", obj.getConnectionTimeout());
if (obj.getConnectionTimeoutUnit() != null) {
json.put("connectionTimeoutUnit", obj.getConnectionTimeoutUnit().name());
Expand Down
28 changes: 28 additions & 0 deletions vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ public class PoolOptions {
*/
public static final int DEFAULT_EVENT_LOOP_SIZE = 0;

/**
* Default honor timeout when scheduling commands is false
*/
public static final boolean DEFAULT_ALWAYS_USE_TIMEOUT = false;

private int maxSize = DEFAULT_MAX_SIZE;
private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE;
private int idleTimeout = DEFAULT_IDLE_TIMEOUT;
Expand All @@ -106,6 +111,7 @@ public class PoolOptions {
private boolean shared = DEFAULT_SHARED_POOL;
private String name = DEFAULT_NAME;
private int eventLoopSize = DEFAULT_EVENT_LOOP_SIZE;
private boolean alwaysUseTimeout = DEFAULT_ALWAYS_USE_TIMEOUT;

public PoolOptions() {
}
Expand All @@ -122,6 +128,7 @@ public PoolOptions(PoolOptions other) {
shared= other.shared;
name = other.name;
eventLoopSize = other.eventLoopSize;
alwaysUseTimeout = other.alwaysUseTimeout;
}

/**
Expand Down Expand Up @@ -360,6 +367,27 @@ public PoolOptions setEventLoopSize(int eventLoopSize) {
return this;
}

/**
* @return Whether the pool will always use timeout, even when sending commands directly to execute.
*/
public boolean isAlwaysUseTimeout() { return alwaysUseTimeout; }

/**
* Sets whether always honor the pool's timeout.
* <p>
* This basically affects the pool's schedule method, which will submit the command regardless of whether there's
* an available connection or not. This settings allows the caller to have a consistent max wait time across every
* method.
* </p>
* The default is {@code false}.
* @param alwaysUseTimeout Whether to use the configured connection timeout when scheduling commands
* @return a reference to this, so the API can be used fluently
*/
public PoolOptions setAlwaysUseTimeout(boolean alwaysUseTimeout) {
this.alwaysUseTimeout = alwaysUseTimeout;
return this;
}

public JsonObject toJson() {
JsonObject json = new JsonObject();
PoolOptionsConverter.toJson(this, json);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable {
private volatile Handler<SqlConnectionPool.PooledConnection> connectionInitializer;
private long timerID;
private volatile Function<Context, Future<SqlConnection>> connectionProvider;
private final boolean alwaysUseTimeout;

public static final String PROPAGATABLE_CONNECTION = "propagatable_connection";

Expand All @@ -65,6 +66,7 @@ public PoolImpl(VertxInternal vertx,
this.connectionTimeout = MILLISECONDS.convert(poolOptions.getConnectionTimeout(), poolOptions.getConnectionTimeoutUnit());
this.maxLifetime = MILLISECONDS.convert(poolOptions.getMaxLifetime(), poolOptions.getMaxLifetimeUnit());
this.cleanerPeriod = poolOptions.getPoolCleanerPeriod();
this.alwaysUseTimeout = poolOptions.isAlwaysUseTimeout();
this.timerID = -1L;
this.pipelined = pipelined;
this.vertx = vertx;
Expand Down Expand Up @@ -169,7 +171,24 @@ public Future<SqlConnection> getConnection() {

@Override
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
return pool.execute(context, cmd);
if (!alwaysUseTimeout) {
return pool.execute(context, cmd);
}
PromiseInternal<SqlConnectionPool.PooledConnection> promise = context.promise();
//Acquires the connection honoring the pool's connection timeout
acquire(context, connectionTimeout, promise);
return promise.future().compose(pooled -> {
//We need to 'init' the connection or close will fail.
pooled.init(pooled);
return pooled.schedule(context, cmd)
.eventually(() -> {
Promise<Void> p = Promise.promise();
pooled.close(pooled, p);
return p.future();
}
);
}
);
}

private void acquire(ContextInternal context, long timeout, Handler<AsyncResult<SqlConnectionPool.PooledConnection>> completionHandler) {
Expand Down
Loading