From 16b4e9d02ebf4769dcc4dc051e163edb5c8db2bf Mon Sep 17 00:00:00 2001 From: Radim Vansa Date: Tue, 18 Apr 2023 09:37:55 +0200 Subject: [PATCH] Suspend SqlConnectionPool during CRaC snapshotting Signed-off-by: Radim Vansa --- pom.xml | 1 + vertx-sql-client/pom.xml | 5 ++++ .../impl/pool/SqlConnectionPool.java | 28 ++++++++++++++++++- 3 files changed, 33 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 3de7f2a8a..960cd34c2 100644 --- a/pom.xml +++ b/pom.xml @@ -60,6 +60,7 @@ true 1.17.6 + 0.1.3 diff --git a/vertx-sql-client/pom.xml b/vertx-sql-client/pom.xml index 332e294a8..acb7d0fc5 100644 --- a/vertx-sql-client/pom.xml +++ b/vertx-sql-client/pom.xml @@ -56,6 +56,11 @@ + + org.crac + crac + ${org.crac.version} + io.vertx vertx-core diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java index b1335518c..af3d34483 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java @@ -32,10 +32,14 @@ import io.vertx.sqlclient.spi.DatabaseMetadata; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import org.crac.Core; +import org.crac.Resource; + /** * Todo : * @@ -43,7 +47,7 @@ * * @author Julien Viet */ -public class SqlConnectionPool { +public class SqlConnectionPool implements Resource { private final Function> connectionProvider; private final VertxInternal vertx; @@ -101,6 +105,7 @@ public EventLoopContext apply(ContextInternal contextInternal) { } else { pool.contextProvider(ctx -> ctx.owner().createEventLoopContext(ctx.nettyEventLoop(), null, Thread.currentThread().getContextClassLoader())); } + Core.getGlobalContext().register(this); } private final PoolConnector connector = new PoolConnector() { @@ -258,6 +263,27 @@ public Future close() { return promise.future(); } + @Override + public void beforeCheckpoint(org.crac.Context ctx) throws Exception { + CompletableFuture cf = new CompletableFuture<>(); + pool.suspend(result -> { + if (result.failed()) { + cf.completeExceptionally(result.cause()); + } + CompositeFuture.join(result.result().stream().map( + f -> f.compose(pooled -> Future.future(p -> pooled.conn.close(pooled, p))) + ).collect(Collectors.toList())).mapEmpty() + .onSuccess(ignored -> cf.complete(null)) + .onFailure(cf::completeExceptionally); + }); + cf.get(); + } + + @Override + public void afterRestore(org.crac.Context ctx) { + pool.resume(); + } + public class PooledConnection implements Connection, Connection.Holder { private final ConnectionFactory factory;