From 8edcf9c799b5725b274b7f091a32595b5c4be51a Mon Sep 17 00:00:00 2001 From: Michael Halberstadt Date: Mon, 28 Oct 2024 13:22:35 +0100 Subject: [PATCH] fix: improve WriteSafeRegistry lock method If the supplier of the lock method fails with an exception, the lock is not released. --- .../neonbee/internal/WriteSafeRegistry.java | 11 ++- .../internal/WriteSafeRegistryTest.java | 95 +++++++++++++++++++ 2 files changed, 101 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java index dcd3a3fb..e8cf2f80 100644 --- a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java +++ b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java @@ -63,14 +63,15 @@ public Future get(String sharedMapKey) { * @param futureSupplier supplier for the future to be secured by the lock * @return the futureSupplier */ - private Future lock(String sharedMapKey, Supplier> futureSupplier) { + protected Future lock(String sharedMapKey, Supplier> futureSupplier) { logger.debug("Get lock for {}", sharedMapKey); return sharedDataAccessor.getLock(sharedMapKey).onFailure(throwable -> { logger.error("Error acquiring lock for {}", sharedMapKey, throwable); - }).compose(lock -> futureSupplier.get().onComplete(anyResult -> { - logger.debug("Releasing lock for {}", sharedMapKey); - lock.release(); - })); + }).compose(lock -> Future.future(event -> futureSupplier.get().onComplete(event)) + .onComplete(anyResult -> { + logger.debug("Releasing lock for {}", sharedMapKey); + lock.release(); + })); } private Future addValue(String sharedMapKey, Object value) { diff --git a/src/test/java/io/neonbee/internal/WriteSafeRegistryTest.java b/src/test/java/io/neonbee/internal/WriteSafeRegistryTest.java index c232e95a..25ddeafe 100644 --- a/src/test/java/io/neonbee/internal/WriteSafeRegistryTest.java +++ b/src/test/java/io/neonbee/internal/WriteSafeRegistryTest.java @@ -2,12 +2,18 @@ import static com.google.common.truth.Truth.assertThat; +import java.util.concurrent.TimeUnit; + import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import io.vertx.core.Future; import io.vertx.core.Vertx; +import io.vertx.core.impl.NoStackTraceThrowable; import io.vertx.core.json.JsonArray; +import io.vertx.junit5.Checkpoint; +import io.vertx.junit5.Timeout; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; @@ -58,4 +64,93 @@ void get(Vertx vertx, VertxTestContext context) { context.completeNow(); })).onFailure(context::failNow); } + + @Test + @DisplayName("test lock method") + void lock(Vertx vertx, VertxTestContext context) { + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME); + + Checkpoint checkpoints = context.checkpoint(4); + String lockedname = "test-lock-key1"; + + // execute the lockTest twice to make sure that you can acquire the lock multiple times + lockTest(lockedname, context, registry, checkpoints) + .compose(unused -> lockTest(lockedname, context, registry, checkpoints)); + } + + @Test + // The used timeout for the lock is set to 10 seconds + // @see io.vertx.core.shareddata.impl.SharedDataImpl#DEFAULT_LOCK_TIMEOUT + @Timeout(value = 12, timeUnit = TimeUnit.SECONDS) + @DisplayName("test acquire lock twice") + void acquireLockTwice(Vertx vertx, VertxTestContext context) { + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME); + + Checkpoint checkpoints = context.checkpoint(5); + + String lockedname = "test-lock-key2"; + registry.lock(lockedname, () -> { + checkpoints.flag(); + // try to acquire the lock to make sure that it is locked + return registry.lock(lockedname, () -> { + context.failNow("should not be called because lock cannot be acquired"); + return Future.succeededFuture(); + }) + .onSuccess(unused -> context.failNow("should not be successful")) + .onFailure(cause -> context.verify(() -> { + assertThat(cause).isInstanceOf(NoStackTraceThrowable.class); + assertThat(cause).hasMessageThat().isEqualTo("Timed out waiting to get lock"); + checkpoints.flag(); + })) + .recover(throwable -> Future.succeededFuture()); + }) + .onSuccess(unused -> checkpoints.flag()) + .onFailure(context::failNow) + // execute the lockTest to make sure that you can acquire the lock again + .compose(unused -> lockTest(lockedname, context, registry, checkpoints)); + } + + private static Future lockTest(String lockName, VertxTestContext context, WriteSafeRegistry registry, + Checkpoint checkpoints) { + return registry.lock(lockName, () -> { + checkpoints.flag(); + return Future.succeededFuture(); + }) + .onSuccess(unused -> checkpoints.flag()) + .onFailure(context::failNow); + } + + @Test + @DisplayName("test lock supplier retuning null") + void lockNPE(Vertx vertx, VertxTestContext context) { + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME); + + Checkpoint checkpoints = context.checkpoint(2); + + registry.lock("lockNPE", () -> { + checkpoints.flag(); + return null; + }).onSuccess(unused -> context.failNow("should not be successful")) + .onFailure(cause -> context.verify(() -> { + assertThat(cause).isInstanceOf(NullPointerException.class); + checkpoints.flag(); + })); + } + + @Test + @DisplayName("test lock supplier throws exception") + void lockISE(Vertx vertx, VertxTestContext context) { + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME); + + Checkpoint checkpoints = context.checkpoint(2); + + registry.lock("lockISE", () -> { + checkpoints.flag(); + throw new IllegalStateException("Illegal state"); + }).onSuccess(unused -> context.failNow("should not be successful")) + .onFailure(cause -> context.verify(() -> { + assertThat(cause).isInstanceOf(IllegalStateException.class); + checkpoints.flag(); + })); + } }