Skip to content

Commit

Permalink
fix: improve WriteSafeRegistry lock method
Browse files Browse the repository at this point in the history
If the supplier of the lock method fails with an exception, the lock is not released.
  • Loading branch information
halber committed Oct 29, 2024
1 parent 2a54852 commit 8edcf9c
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 5 deletions.
11 changes: 6 additions & 5 deletions src/main/java/io/neonbee/internal/WriteSafeRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,15 @@ public Future<JsonArray> get(String sharedMapKey) {
* @param futureSupplier supplier for the future to be secured by the lock
* @return the futureSupplier
*/
private Future<Void> lock(String sharedMapKey, Supplier<Future<Void>> futureSupplier) {
protected Future<Void> lock(String sharedMapKey, Supplier<Future<Void>> futureSupplier) {
logger.debug("Get lock for {}", sharedMapKey);
return sharedDataAccessor.getLock(sharedMapKey).onFailure(throwable -> {
logger.error("Error acquiring lock for {}", sharedMapKey, throwable);
}).compose(lock -> futureSupplier.get().onComplete(anyResult -> {
logger.debug("Releasing lock for {}", sharedMapKey);
lock.release();
}));
}).compose(lock -> Future.<Void>future(event -> futureSupplier.get().onComplete(event))
.onComplete(anyResult -> {
logger.debug("Releasing lock for {}", sharedMapKey);
lock.release();
}));
}

private Future<Void> addValue(String sharedMapKey, Object value) {
Expand Down
95 changes: 95 additions & 0 deletions src/test/java/io/neonbee/internal/WriteSafeRegistryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@

import static com.google.common.truth.Truth.assertThat;

import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.core.json.JsonArray;
import io.vertx.junit5.Checkpoint;
import io.vertx.junit5.Timeout;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;

Expand Down Expand Up @@ -58,4 +64,93 @@ void get(Vertx vertx, VertxTestContext context) {
context.completeNow();
})).onFailure(context::failNow);
}

@Test
@DisplayName("test lock method")
void lock(Vertx vertx, VertxTestContext context) {
WriteSafeRegistry<String> registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME);

Checkpoint checkpoints = context.checkpoint(4);
String lockedname = "test-lock-key1";

// execute the lockTest twice to make sure that you can acquire the lock multiple times
lockTest(lockedname, context, registry, checkpoints)
.compose(unused -> lockTest(lockedname, context, registry, checkpoints));
}

@Test
// The used timeout for the lock is set to 10 seconds
// @see io.vertx.core.shareddata.impl.SharedDataImpl#DEFAULT_LOCK_TIMEOUT
@Timeout(value = 12, timeUnit = TimeUnit.SECONDS)
@DisplayName("test acquire lock twice")
void acquireLockTwice(Vertx vertx, VertxTestContext context) {
WriteSafeRegistry<String> registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME);

Checkpoint checkpoints = context.checkpoint(5);

String lockedname = "test-lock-key2";
registry.lock(lockedname, () -> {
checkpoints.flag();
// try to acquire the lock to make sure that it is locked
return registry.lock(lockedname, () -> {
context.failNow("should not be called because lock cannot be acquired");
return Future.succeededFuture();
})
.onSuccess(unused -> context.failNow("should not be successful"))
.onFailure(cause -> context.verify(() -> {
assertThat(cause).isInstanceOf(NoStackTraceThrowable.class);
assertThat(cause).hasMessageThat().isEqualTo("Timed out waiting to get lock");
checkpoints.flag();
}))
.recover(throwable -> Future.succeededFuture());
})
.onSuccess(unused -> checkpoints.flag())
.onFailure(context::failNow)
// execute the lockTest to make sure that you can acquire the lock again
.compose(unused -> lockTest(lockedname, context, registry, checkpoints));
}

private static Future<Void> lockTest(String lockName, VertxTestContext context, WriteSafeRegistry<String> registry,
Checkpoint checkpoints) {
return registry.lock(lockName, () -> {
checkpoints.flag();
return Future.succeededFuture();
})
.onSuccess(unused -> checkpoints.flag())
.onFailure(context::failNow);
}

@Test
@DisplayName("test lock supplier retuning null")
void lockNPE(Vertx vertx, VertxTestContext context) {
WriteSafeRegistry<String> registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME);

Checkpoint checkpoints = context.checkpoint(2);

registry.lock("lockNPE", () -> {
checkpoints.flag();
return null;
}).onSuccess(unused -> context.failNow("should not be successful"))
.onFailure(cause -> context.verify(() -> {
assertThat(cause).isInstanceOf(NullPointerException.class);
checkpoints.flag();
}));
}

@Test
@DisplayName("test lock supplier throws exception")
void lockISE(Vertx vertx, VertxTestContext context) {
WriteSafeRegistry<String> registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME);

Checkpoint checkpoints = context.checkpoint(2);

registry.lock("lockISE", () -> {
checkpoints.flag();
throw new IllegalStateException("Illegal state");
}).onSuccess(unused -> context.failNow("should not be successful"))
.onFailure(cause -> context.verify(() -> {
assertThat(cause).isInstanceOf(IllegalStateException.class);
checkpoints.flag();
}));
}
}

0 comments on commit 8edcf9c

Please sign in to comment.