diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java index 3b03a5293..1a05cbbd0 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java @@ -22,6 +22,7 @@ import io.vertx.core.Handler; import io.vertx.core.VertxOptions; import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.NoStackTraceThrowable; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.Repeat; @@ -592,4 +593,45 @@ private void testConnectionClosedInProvider(TestContext ctx, boolean immediately })); })); } + + @Test + public void testConnectionTimeoutWhenExecutingDirectly(TestContext ctx) { + PgPool pool = createPool(options, new PoolOptions().setConnectionTimeout(2).setMaxSize(2).setAlwaysUseTimeout(true)); + final Async latch = ctx.async(2); + pool.getConnection(ctx.asyncAssertSuccess(conn -> { + conn + .query("SELECT id, message from immutable") + .execute(ctx.asyncAssertSuccess(rows -> { + ctx.assertEquals(12, rows.size()); + latch.countDown(); + })); + })); + + pool.getConnection(ctx.asyncAssertSuccess(conn -> { + conn + .query("SELECT id, message from immutable") + .execute(ctx.asyncAssertSuccess(rows -> { + ctx.assertEquals(12, rows.size()); + latch.countDown(); + })); + })); + + latch.awaitSuccess(); + final long timerId = vertx.setTimer(10000L, id -> { + ctx.fail("Timeout exceeded without completing"); + }); + //Used both connections + Async async = ctx.async(10); + for (int i = 0; i < 10; i++) { + pool + .query("SELECT id, message from immutable") + .execute(ctx.asyncAssertFailure(t -> { + ctx.assertTrue(t instanceof NoStackTraceThrowable); + ctx.assertEquals("Timeout", t.getMessage()); + async.countDown(); + })); + } + + async.handler(v -> vertx.cancelTimer(timerId)); + } } diff --git a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java index 73a68fe91..1cd446d25 100644 --- a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java +++ b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java @@ -20,6 +20,11 @@ public class PoolOptionsConverter { static void fromJson(Iterable> json, PoolOptions obj) { for (java.util.Map.Entry member : json) { switch (member.getKey()) { + case "alwaysUseTimeout": + if (member.getValue() instanceof Boolean) { + obj.setAlwaysUseTimeout((Boolean)member.getValue()); + } + break; case "connectionTimeout": if (member.getValue() instanceof Number) { obj.setConnectionTimeout(((Number)member.getValue()).intValue()); @@ -89,6 +94,7 @@ static void toJson(PoolOptions obj, JsonObject json) { } static void toJson(PoolOptions obj, java.util.Map json) { + json.put("alwaysUseTimeout", obj.isAlwaysUseTimeout()); json.put("connectionTimeout", obj.getConnectionTimeout()); if (obj.getConnectionTimeoutUnit() != null) { json.put("connectionTimeoutUnit", obj.getConnectionTimeoutUnit().name()); diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java index d309cb67a..ecfd47677 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java @@ -94,6 +94,11 @@ public class PoolOptions { */ public static final int DEFAULT_EVENT_LOOP_SIZE = 0; + /** + * Default honor timeout when scheduling commands is false + */ + public static final boolean DEFAULT_ALWAYS_USE_TIMEOUT = false; + private int maxSize = DEFAULT_MAX_SIZE; private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE; private int idleTimeout = DEFAULT_IDLE_TIMEOUT; @@ -106,6 +111,7 @@ public class PoolOptions { private boolean shared = DEFAULT_SHARED_POOL; private String name = DEFAULT_NAME; private int eventLoopSize = DEFAULT_EVENT_LOOP_SIZE; + private boolean alwaysUseTimeout = DEFAULT_ALWAYS_USE_TIMEOUT; public PoolOptions() { } @@ -122,6 +128,7 @@ public PoolOptions(PoolOptions other) { shared= other.shared; name = other.name; eventLoopSize = other.eventLoopSize; + alwaysUseTimeout = other.alwaysUseTimeout; } /** @@ -360,6 +367,27 @@ public PoolOptions setEventLoopSize(int eventLoopSize) { return this; } + /** + * @return Whether the pool will always use timeout, even when sending commands directly to execute. + */ + public boolean isAlwaysUseTimeout() { return alwaysUseTimeout; } + + /** + * Sets whether always honor the pool's timeout. + *

+ * This basically affects the pool's schedule method, which will submit the command regardless of whether there's + * an available connection or not. This settings allows the caller to have a consistent max wait time across every + * method. + *

+ * The default is {@code false}. + * @param alwaysUseTimeout Whether to use the configured connection timeout when scheduling commands + * @return a reference to this, so the API can be used fluently + */ + public PoolOptions setAlwaysUseTimeout(boolean alwaysUseTimeout) { + this.alwaysUseTimeout = alwaysUseTimeout; + return this; + } + public JsonObject toJson() { JsonObject json = new JsonObject(); PoolOptionsConverter.toJson(this, json); diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java index 724913297..5c534143d 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java @@ -49,6 +49,7 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable { private volatile Handler connectionInitializer; private long timerID; private volatile Function> connectionProvider; + private final boolean alwaysUseTimeout; public static final String PROPAGATABLE_CONNECTION = "propagatable_connection"; @@ -65,6 +66,7 @@ public PoolImpl(VertxInternal vertx, this.connectionTimeout = MILLISECONDS.convert(poolOptions.getConnectionTimeout(), poolOptions.getConnectionTimeoutUnit()); this.maxLifetime = MILLISECONDS.convert(poolOptions.getMaxLifetime(), poolOptions.getMaxLifetimeUnit()); this.cleanerPeriod = poolOptions.getPoolCleanerPeriod(); + this.alwaysUseTimeout = poolOptions.isAlwaysUseTimeout(); this.timerID = -1L; this.pipelined = pipelined; this.vertx = vertx; @@ -169,7 +171,24 @@ public Future getConnection() { @Override public Future schedule(ContextInternal context, CommandBase cmd) { - return pool.execute(context, cmd); + if (!alwaysUseTimeout) { + return pool.execute(context, cmd); + } + PromiseInternal promise = context.promise(); + //Acquires the connection honoring the pool's connection timeout + acquire(context, connectionTimeout, promise); + return promise.future().compose(pooled -> { + //We need to 'init' the connection or close will fail. + pooled.init(pooled); + return pooled.schedule(context, cmd) + .eventually(() -> { + Promise p = Promise.promise(); + pooled.close(pooled, p); + return p.future(); + } + ); + } + ); } private void acquire(ContextInternal context, long timeout, Handler> completionHandler) {