Skip to content

Commit

Permalink
refactor: extract SharedRegistry class
Browse files Browse the repository at this point in the history
  • Loading branch information
halber committed Oct 30, 2024
1 parent 8edcf9c commit 2d79ea4
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 58 deletions.
118 changes: 118 additions & 0 deletions src/main/java/io/neonbee/internal/SharedRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.neonbee.internal;

import static io.vertx.core.Future.succeededFuture;

import io.neonbee.NeonBee;
import io.neonbee.logging.LoggingFacade;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.SharedData;

/**
* A registry to manage values in the {@link SharedData} shared map.
* <p>
* This class is a generic implementation of a registry that can be used to store values in a shared map.
*
* @param <T>
*/
public class SharedRegistry<T> implements Registry<T> {

private final LoggingFacade logger = LoggingFacade.create();

private final SharedData sharedData;

private final String registryName;

/**
* Create a new {@link WriteSafeRegistry}.
*
* @param vertx the {@link Vertx} instance
* @param registryName the name of the map registry
*/
public SharedRegistry(Vertx vertx, String registryName) {
this(registryName, new SharedDataAccessor(vertx, SharedRegistry.class));
}

/**
* Create a new {@link WriteSafeRegistry}.
*
* @param registryName the name of the map registry
* @param sharedData the shared data
*/
public SharedRegistry(String registryName, SharedData sharedData) {
this.registryName = registryName;
this.sharedData = sharedData;
}

/**
* Register a value in the async shared map of {@link NeonBee} by key.
*
* @param sharedMapKey the shared map key
* @param value the value to register
* @return the future
*/
@Override
public Future<Void> register(String sharedMapKey, T value) {
logger.info("register value: \"{}\" in shared map: \"{}\"", sharedMapKey, value);

Future<AsyncMap<String, Object>> sharedMap = getSharedMap();

return sharedMap.compose(map -> map.get(sharedMapKey))
.map(valueOrNull -> valueOrNull != null ? (JsonArray) valueOrNull : new JsonArray())
.compose(valueArray -> {
if (!valueArray.contains(value)) {
valueArray.add(value);
}

if (logger.isInfoEnabled()) {
logger.info("Registered verticle {} in shared map.", value);
}

return sharedMap.compose(map -> map.put(sharedMapKey, valueArray));
});
}

/**
* Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey.
*
* @param sharedMapKey the shared map key
* @param value the value to unregister
* @return the future
*/
@Override
public Future<Void> unregister(String sharedMapKey, T value) {
logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value);

Future<AsyncMap<String, Object>> sharedMap = getSharedMap();

return sharedMap.compose(map -> map.get(sharedMapKey)).map(jsonArray -> (JsonArray) jsonArray)
.compose(values -> {
if (values == null) {
return succeededFuture();
}

if (logger.isInfoEnabled()) {
logger.info("Unregistered verticle {} in shared map.", value);
}

values.remove(value);
return sharedMap.compose(map -> map.put(sharedMapKey, values));
});
}

@Override
public Future<JsonArray> get(String sharedMapKey) {
return getSharedMap().compose(map -> map.get(sharedMapKey)).map(o -> (JsonArray) o);
}

/**
* Shared map that is used as registry.
*
* @return Future to the shared map
*/
public Future<AsyncMap<String, Object>> getSharedMap() {
return sharedData.getAsyncMap(registryName);
}
}
106 changes: 48 additions & 58 deletions src/main/java/io/neonbee/internal/WriteSafeRegistry.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.neonbee.internal;

import static io.vertx.core.Future.succeededFuture;

import java.util.function.Supplier;

import io.neonbee.NeonBee;
Expand All @@ -10,6 +8,7 @@
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.SharedData;

/**
* A registry to manage values in the {@link SharedDataAccessor} shared map.
Expand All @@ -22,7 +21,9 @@ public class WriteSafeRegistry<T> implements Registry<T> {

private final LoggingFacade logger = LoggingFacade.create();

private final SharedDataAccessor sharedDataAccessor;
private final SharedData sharedData;

private final Registry<T> sharedRegistry;

private final String registryName;

Expand All @@ -33,8 +34,30 @@ public class WriteSafeRegistry<T> implements Registry<T> {
* @param registryName the name of the map registry
*/
public WriteSafeRegistry(Vertx vertx, String registryName) {
this(registryName, new SharedDataAccessor(vertx, WriteSafeRegistry.class));
}

/**
* Create a new {@link WriteSafeRegistry}.
*
* @param registryName the name of the map registry
* @param sharedData the shared data
*/
public WriteSafeRegistry(String registryName, SharedData sharedData) {
this(registryName, sharedData, new SharedRegistry<>(registryName, sharedData));
}

/**
* Create a new {@link WriteSafeRegistry}.
*
* @param registryName the name of the map registry
* @param sharedData the shared data
* @param registry the shared registry
*/
WriteSafeRegistry(String registryName, SharedData sharedData, Registry<T> registry) {
this.registryName = registryName;
this.sharedDataAccessor = new SharedDataAccessor(vertx, this.getClass());
this.sharedData = sharedData;
this.sharedRegistry = registry;
}

/**
Expand All @@ -48,12 +71,26 @@ public WriteSafeRegistry(Vertx vertx, String registryName) {
public Future<Void> register(String sharedMapKey, T value) {
logger.info("register value: \"{}\" in shared map: \"{}\"", sharedMapKey, value);

return lock(sharedMapKey, () -> addValue(sharedMapKey, value));
return lock(sharedMapKey, () -> sharedRegistry.register(sharedMapKey, value));
}

@Override
public Future<JsonArray> get(String sharedMapKey) {
return getSharedMap().compose(map -> map.get(sharedMapKey)).map(o -> (JsonArray) o);
return sharedRegistry.get(sharedMapKey);
}

/**
* Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey.
*
* @param sharedMapKey the shared map key
* @param value the value to unregister
* @return the future
*/
@Override
public Future<Void> unregister(String sharedMapKey, T value) {
logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value);

return lock(sharedMapKey, () -> sharedRegistry.unregister(sharedMapKey, value));
}

/**
Expand All @@ -65,7 +102,7 @@ public Future<JsonArray> get(String sharedMapKey) {
*/
protected Future<Void> lock(String sharedMapKey, Supplier<Future<Void>> futureSupplier) {
logger.debug("Get lock for {}", sharedMapKey);
return sharedDataAccessor.getLock(sharedMapKey).onFailure(throwable -> {
return sharedData.getLock(sharedMapKey).onFailure(throwable -> {
logger.error("Error acquiring lock for {}", sharedMapKey, throwable);
}).compose(lock -> Future.<Void>future(event -> futureSupplier.get().onComplete(event))
.onComplete(anyResult -> {
Expand All @@ -74,62 +111,15 @@ protected Future<Void> lock(String sharedMapKey, Supplier<Future<Void>> futureSu
}));
}

private Future<Void> addValue(String sharedMapKey, Object value) {
Future<AsyncMap<String, Object>> sharedMap = getSharedMap();

return sharedMap.compose(map -> map.get(sharedMapKey))
.map(valueOrNull -> valueOrNull != null ? (JsonArray) valueOrNull : new JsonArray())
.compose(valueArray -> {
if (!valueArray.contains(value)) {
valueArray.add(value);
}

if (logger.isInfoEnabled()) {
logger.info("Registered verticle {} in shared map.", value);
}

return sharedMap.compose(map -> map.put(sharedMapKey, valueArray));
});
}

/**
* Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey.
*
* @param sharedMapKey the shared map key
* @param value the value to unregister
* @return the future
*/
@Override
public Future<Void> unregister(String sharedMapKey, T value) {
logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value);

return lock(sharedMapKey, () -> removeValue(sharedMapKey, value));
}

private Future<Void> removeValue(String sharedMapKey, Object value) {
Future<AsyncMap<String, Object>> sharedMap = getSharedMap();

return sharedMap.compose(map -> map.get(sharedMapKey)).map(jsonArray -> (JsonArray) jsonArray)
.compose(values -> {
if (values == null) {
return succeededFuture();
}

if (logger.isInfoEnabled()) {
logger.info("Unregistered verticle {} in shared map.", value);
}

values.remove(value);
return sharedMap.compose(map -> map.put(sharedMapKey, values));
});
}

/**
* Shared map that is used as registry.
* <p>
* It is not safe to write to the shared map directly. Use the {@link WriteSafeRegistry#register(String, Object)}
* and {@link WriteSafeRegistry#unregister(String, Object)} methods.
*
* @return Future to the shared map
*/
public Future<AsyncMap<String, Object>> getSharedMap() {
return sharedDataAccessor.getAsyncMap(registryName);
return sharedData.getAsyncMap(registryName);
}
}

0 comments on commit 2d79ea4

Please sign in to comment.