From 58dc3cd7fa31bc627cd84f9524cbcfb24a53ff74 Mon Sep 17 00:00:00 2001 From: Pablo Saavedra Date: Mon, 6 May 2024 09:16:55 -0300 Subject: [PATCH] Addressed PR comments: Added an option to enable the new behavior, while still ignoring timeout for legacy clients --- PgTimeoutTester.java | 84 +++++++++++++++++++ .../java/io/vertx/pgclient/PgPoolTest.java | 2 +- .../java/io/vertx/sqlclient/PoolOptions.java | 28 +++++++ .../io/vertx/sqlclient/impl/PoolImpl.java | 7 +- 4 files changed, 119 insertions(+), 2 deletions(-) create mode 100644 PgTimeoutTester.java diff --git a/PgTimeoutTester.java b/PgTimeoutTester.java new file mode 100644 index 000000000..8340d96e6 --- /dev/null +++ b/PgTimeoutTester.java @@ -0,0 +1,84 @@ +package io.vertx.sqlclient.templates.impl; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.pgclient.PgConnectOptions; +import io.vertx.pgclient.PgPool; +import io.vertx.sqlclient.PoolOptions; + +import java.util.ArrayList; +import java.util.List; + +public class PgTimeoutTester { + public static void main(String[] args) { + Vertx vertx = Vertx.vertx(); + + PgConnectOptions dbConfig = new PgConnectOptions() + .setPort(5432) + .setConnectTimeout(2000) + .setHost("localhost") + .setDatabase("postgres") + .setUser("postgres") + .setPassword("postgres"); + + PoolOptions poolConfig = new PoolOptions() + .setMaxSize(1) // One connection in Pool + .setConnectionTimeout(2); // 2 seconds + + PgPool pool = PgPool.pool(vertx, dbConfig, poolConfig); + + //connectionTimeOut(pool, vertx); + poolTimeOut(pool, vertx); + } + + private static void connectionTimeOut(PgPool pool, Vertx vertx) { + //First query + pool.getConnection() + .onFailure(err -> { + err.printStackTrace(); + vertx.close(); + }) + .compose(conn0 -> + conn0.query("SELECT 1").execute() + .onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0))) + /*.eventually(ign -> conn0.close())*/); // Don't close connection to trigger timeout while getting one below + + //Second query + pool.getConnection() + .onFailure(err -> { + err.printStackTrace(); + vertx.close(); + }) + .compose(conn0 -> + conn0.query("SELECT 2").execute() + .onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0))) + .eventually(ign -> conn0.close())); + } + + private static void poolTimeOut(PgPool pool, Vertx vertx) { + //First query + pool.getConnection() + .onFailure(err -> { + err.printStackTrace(); + vertx.close(); + }) + .compose(conn0 -> + conn0.query("SELECT 1").execute() + .onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0))) + .eventually(ign -> conn0.close()));// Don't close connection to trigger timeout while getting one below + + List> futures = new ArrayList<>(); + //N queries + for (int i = 2; i < 10; i++) { + Future f = pool.query("SELECT " + i).execute() + .onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0))) + .onFailure(err -> { + err.printStackTrace(); + vertx.close(); + }); + futures.add(f); + } + + Future.all(futures).onComplete(c -> vertx.close()); + } +} diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java index ea508d13a..1a05cbbd0 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java @@ -596,7 +596,7 @@ private void testConnectionClosedInProvider(TestContext ctx, boolean immediately @Test public void testConnectionTimeoutWhenExecutingDirectly(TestContext ctx) { - PgPool pool = createPool(options, new PoolOptions().setConnectionTimeout(2).setMaxSize(2)); + PgPool pool = createPool(options, new PoolOptions().setConnectionTimeout(2).setMaxSize(2).setAlwaysUseTimeout(true)); final Async latch = ctx.async(2); pool.getConnection(ctx.asyncAssertSuccess(conn -> { conn diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java index d309cb67a..ecfd47677 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java @@ -94,6 +94,11 @@ public class PoolOptions { */ public static final int DEFAULT_EVENT_LOOP_SIZE = 0; + /** + * Default honor timeout when scheduling commands is false + */ + public static final boolean DEFAULT_ALWAYS_USE_TIMEOUT = false; + private int maxSize = DEFAULT_MAX_SIZE; private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE; private int idleTimeout = DEFAULT_IDLE_TIMEOUT; @@ -106,6 +111,7 @@ public class PoolOptions { private boolean shared = DEFAULT_SHARED_POOL; private String name = DEFAULT_NAME; private int eventLoopSize = DEFAULT_EVENT_LOOP_SIZE; + private boolean alwaysUseTimeout = DEFAULT_ALWAYS_USE_TIMEOUT; public PoolOptions() { } @@ -122,6 +128,7 @@ public PoolOptions(PoolOptions other) { shared= other.shared; name = other.name; eventLoopSize = other.eventLoopSize; + alwaysUseTimeout = other.alwaysUseTimeout; } /** @@ -360,6 +367,27 @@ public PoolOptions setEventLoopSize(int eventLoopSize) { return this; } + /** + * @return Whether the pool will always use timeout, even when sending commands directly to execute. + */ + public boolean isAlwaysUseTimeout() { return alwaysUseTimeout; } + + /** + * Sets whether always honor the pool's timeout. + *

+ * This basically affects the pool's schedule method, which will submit the command regardless of whether there's + * an available connection or not. This settings allows the caller to have a consistent max wait time across every + * method. + *

+ * The default is {@code false}. + * @param alwaysUseTimeout Whether to use the configured connection timeout when scheduling commands + * @return a reference to this, so the API can be used fluently + */ + public PoolOptions setAlwaysUseTimeout(boolean alwaysUseTimeout) { + this.alwaysUseTimeout = alwaysUseTimeout; + return this; + } + public JsonObject toJson() { JsonObject json = new JsonObject(); PoolOptionsConverter.toJson(this, json); diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java index a848e617d..ee02baf01 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java @@ -49,6 +49,7 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable { private volatile Handler connectionInitializer; private long timerID; private volatile Function> connectionProvider; + private final boolean alwaysUseTimeout; public static final String PROPAGATABLE_CONNECTION = "propagatable_connection"; @@ -65,6 +66,7 @@ public PoolImpl(VertxInternal vertx, this.connectionTimeout = MILLISECONDS.convert(poolOptions.getConnectionTimeout(), poolOptions.getConnectionTimeoutUnit()); this.maxLifetime = MILLISECONDS.convert(poolOptions.getMaxLifetime(), poolOptions.getMaxLifetimeUnit()); this.cleanerPeriod = poolOptions.getPoolCleanerPeriod(); + this.alwaysUseTimeout = poolOptions.isAlwaysUseTimeout(); this.timerID = -1L; this.pipelined = pipelined; this.vertx = vertx; @@ -169,6 +171,9 @@ public Future getConnection() { @Override public Future schedule(ContextInternal context, CommandBase cmd) { + if (alwaysUseTimeout) { + return pool.execute(context, cmd); + } PromiseInternal promise = context.promise(); //Acquires the connection honoring the pool's connection timeout acquire(context, connectionTimeout, promise); @@ -176,7 +181,7 @@ public Future schedule(ContextInternal context, CommandBase cmd) { //We need to 'init' the connection or close will fail. pooled.init(pooled); return pooled.schedule(context, cmd) - .eventually(v -> { + .eventually(() -> { Promise p = Promise.promise(); pooled.close(pooled, p); return p.future();