From 7021dce4500c5fc353713c241a0d4769890b9729 Mon Sep 17 00:00:00 2001 From: Kevin Wooten Date: Thu, 30 Mar 2023 14:06:29 -0700 Subject: [PATCH] Add minSize to pool to maintain a minimum # of connections --- .../vertx/sqlclient/PoolOptionsConverter.java | 6 ++++ .../java/io/vertx/sqlclient/PoolOptions.java | 31 +++++++++++++++++++ .../io/vertx/sqlclient/impl/PoolImpl.java | 13 +++++--- .../impl/pool/SqlConnectionPool.java | 25 ++++++++++++++- 4 files changed, 69 insertions(+), 6 deletions(-) diff --git a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java index 88cda6b73..baf0aa54a 100644 --- a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java +++ b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java @@ -65,6 +65,11 @@ public static void fromJson(Iterable> json, obj.setMaxWaitQueueSize(((Number)member.getValue()).intValue()); } break; + case "minSize": + if (member.getValue() instanceof Number) { + obj.setMinSize(((Number)member.getValue()).intValue()); + } + break; case "name": if (member.getValue() instanceof String) { obj.setName((String)member.getValue()); @@ -104,6 +109,7 @@ public static void toJson(PoolOptions obj, java.util.Map json) { } json.put("maxSize", obj.getMaxSize()); json.put("maxWaitQueueSize", obj.getMaxWaitQueueSize()); + json.put("minSize", obj.getMinSize()); if (obj.getName() != null) { json.put("name", obj.getName()); } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java index 0239dae8d..a26bd3438 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java @@ -32,6 +32,11 @@ @DataObject(generateConverter = true) public class PoolOptions { + /** + * The default minimum number of connections a client will keep open in the pool = 0 + */ + public static final int DEFAULT_MIN_SIZE = 0; + /** * The default maximum number of connections a client will pool = 4 */ @@ -92,6 +97,7 @@ public class PoolOptions { */ public static final int DEFAULT_EVENT_LOOP_SIZE = 0; + private int minSize = DEFAULT_MIN_SIZE; private int maxSize = DEFAULT_MAX_SIZE; private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE; private int idleTimeout = DEFAULT_IDLE_TIMEOUT; @@ -113,6 +119,7 @@ public PoolOptions(JsonObject json) { } public PoolOptions(PoolOptions other) { + minSize = other.minSize; maxSize = other.maxSize; maxWaitQueueSize = other.maxWaitQueueSize; idleTimeout = other.idleTimeout; @@ -122,6 +129,30 @@ public PoolOptions(PoolOptions other) { eventLoopSize = other.eventLoopSize; } + /** + * @return the minimum pool size + */ + public int getMinSize() { + return minSize; + } + + /** + * Set the minimum pool size + * + * @param minSize the minimum pool size + * @return a reference to this, so the API can be used fluently + */ + public PoolOptions setMinSize(int minSize) { + if (minSize < 0) { + throw new IllegalArgumentException("Min size cannot be negative"); + } + if (minSize > maxSize) { + throw new IllegalArgumentException("Min size cannot be greater than max size"); + } + this.minSize = minSize; + return this; + } + /** * @return the maximum pool size */ diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java index 7b28da062..93f4d125b 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java @@ -68,7 +68,9 @@ public PoolImpl(VertxInternal vertx, this.timerID = -1L; this.pipelined = pipelined; this.vertx = vertx; - this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer, afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize()); + this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer, + afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMinSize(), poolOptions.getMaxSize(), + pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize()); this.closeFuture = closeFuture; } @@ -77,10 +79,11 @@ public Pool init() { if ((idleTimeout > 0 || maxLifetime > 0) && cleanerPeriod > 0) { synchronized (this) { timerID = vertx.setTimer(cleanerPeriod, id -> { - runEviction(); + runInvariantsCheck(); }); } } + pool.checkMin(connectionTimeout); return this; } @@ -92,17 +95,17 @@ public Pool connectionProvider(Function> connecti return this; } - private void runEviction() { + private void runInvariantsCheck() { synchronized (this) { if (timerID == -1) { // Cancelled return; } timerID = vertx.setTimer(cleanerPeriod, id -> { - runEviction(); + runInvariantsCheck(); }); } - pool.evict(); + pool.checkInvariants(connectionTimeout); } @Override diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java index f8f3d8690..45eebdb55 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java @@ -54,6 +54,7 @@ public class SqlConnectionPool { private final boolean pipelined; private final long idleTimeout; private final long maxLifetime; + private final int minSize; private final int maxSize; public SqlConnectionPool(Function> connectionProvider, @@ -63,10 +64,17 @@ public SqlConnectionPool(Function> connectionProv VertxInternal vertx, long idleTimeout, long maxLifetime, + int minSize, int maxSize, boolean pipelined, int maxWaitQueueSize, int eventLoopSize) { + if (minSize < 0) { + throw new IllegalArgumentException("Pool min size must be > 0"); + } + if (minSize > maxSize) { + throw new IllegalArgumentException("Pool min size must be <= max size"); + } if (maxSize < 1) { throw new IllegalArgumentException("Pool max size must be > 0"); } @@ -78,6 +86,7 @@ public SqlConnectionPool(Function> connectionProv this.pipelined = pipelined; this.idleTimeout = idleTimeout; this.maxLifetime = maxLifetime; + this.minSize = minSize; this.maxSize = maxSize; this.hook = hook; this.connectionProvider = connectionProvider; @@ -145,7 +154,7 @@ public int size() { return pool.size(); } - public void evict() { + public void checkInvariants(long connectionTimeout) { long now = System.currentTimeMillis(); pool.evict(conn -> conn.shouldEvict(now), ar -> { if (ar.succeeded()) { @@ -153,10 +162,24 @@ public void evict() { for (PooledConnection conn : res) { conn.close(Promise.promise()); } + checkMin(connectionTimeout); } }); } + public void checkMin(long connectionTimeout) { + if (pool.size() < minSize) { + ContextInternal context = vertx.getOrCreateContext(); + for (int i = 0; i < minSize; ++i) { + acquire(context, connectionTimeout, (ar) -> { + if (ar.succeeded()) { + ar.result().cleanup(Promise.promise()); + } + }); + } + } + } + public Future execute(ContextInternal context, CommandBase cmd) { Promise> p = context.promise(); pool.acquire(context, 0, p);