diff --git a/src/main/java/io/neonbee/internal/SharedRegistry.java b/src/main/java/io/neonbee/internal/SharedRegistry.java new file mode 100644 index 00000000..d6499fb9 --- /dev/null +++ b/src/main/java/io/neonbee/internal/SharedRegistry.java @@ -0,0 +1,118 @@ +package io.neonbee.internal; + +import static io.vertx.core.Future.succeededFuture; + +import io.neonbee.NeonBee; +import io.neonbee.logging.LoggingFacade; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonArray; +import io.vertx.core.shareddata.AsyncMap; +import io.vertx.core.shareddata.SharedData; + +/** + * A registry to manage values in the {@link SharedData} shared map. + *

+ * This class is a generic implementation of a registry that can be used to store values in a shared map. + * + * @param + */ +public class SharedRegistry implements Registry { + + private final LoggingFacade logger = LoggingFacade.create(); + + private final SharedData sharedData; + + private final String registryName; + + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param vertx the {@link Vertx} instance + * @param registryName the name of the map registry + */ + public SharedRegistry(Vertx vertx, String registryName) { + this(registryName, new SharedDataAccessor(vertx, SharedRegistry.class)); + } + + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param registryName the name of the map registry + * @param sharedData the shared data + */ + public SharedRegistry(String registryName, SharedData sharedData) { + this.registryName = registryName; + this.sharedData = sharedData; + } + + /** + * Register a value in the async shared map of {@link NeonBee} by key. + * + * @param sharedMapKey the shared map key + * @param value the value to register + * @return the future + */ + @Override + public Future register(String sharedMapKey, T value) { + logger.info("register value: \"{}\" in shared map: \"{}\"", sharedMapKey, value); + + Future> sharedMap = getSharedMap(); + + return sharedMap.compose(map -> map.get(sharedMapKey)) + .map(valueOrNull -> valueOrNull != null ? (JsonArray) valueOrNull : new JsonArray()) + .compose(valueArray -> { + if (!valueArray.contains(value)) { + valueArray.add(value); + } + + if (logger.isInfoEnabled()) { + logger.info("Registered verticle {} in shared map.", value); + } + + return sharedMap.compose(map -> map.put(sharedMapKey, valueArray)); + }); + } + + /** + * Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey. + * + * @param sharedMapKey the shared map key + * @param value the value to unregister + * @return the future + */ + @Override + public Future unregister(String sharedMapKey, T value) { + logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value); + + Future> sharedMap = getSharedMap(); + + return sharedMap.compose(map -> map.get(sharedMapKey)).map(jsonArray -> (JsonArray) jsonArray) + .compose(values -> { + if (values == null) { + return succeededFuture(); + } + + if (logger.isInfoEnabled()) { + logger.info("Unregistered verticle {} in shared map.", value); + } + + values.remove(value); + return sharedMap.compose(map -> map.put(sharedMapKey, values)); + }); + } + + @Override + public Future get(String sharedMapKey) { + return getSharedMap().compose(map -> map.get(sharedMapKey)).map(o -> (JsonArray) o); + } + + /** + * Shared map that is used as registry. + * + * @return Future to the shared map + */ + public Future> getSharedMap() { + return sharedData.getAsyncMap(registryName); + } +} diff --git a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java index e8cf2f80..c9b97964 100644 --- a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java +++ b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java @@ -1,7 +1,5 @@ package io.neonbee.internal; -import static io.vertx.core.Future.succeededFuture; - import java.util.function.Supplier; import io.neonbee.NeonBee; @@ -10,6 +8,7 @@ import io.vertx.core.Vertx; import io.vertx.core.json.JsonArray; import io.vertx.core.shareddata.AsyncMap; +import io.vertx.core.shareddata.SharedData; /** * A registry to manage values in the {@link SharedDataAccessor} shared map. @@ -22,7 +21,9 @@ public class WriteSafeRegistry implements Registry { private final LoggingFacade logger = LoggingFacade.create(); - private final SharedDataAccessor sharedDataAccessor; + private final SharedData sharedData; + + private final Registry sharedRegistry; private final String registryName; @@ -33,8 +34,30 @@ public class WriteSafeRegistry implements Registry { * @param registryName the name of the map registry */ public WriteSafeRegistry(Vertx vertx, String registryName) { + this(registryName, new SharedDataAccessor(vertx, WriteSafeRegistry.class)); + } + + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param registryName the name of the map registry + * @param sharedData the shared data + */ + public WriteSafeRegistry(String registryName, SharedData sharedData) { + this(registryName, sharedData, new SharedRegistry<>(registryName, sharedData)); + } + + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param registryName the name of the map registry + * @param sharedData the shared data + * @param registry the shared registry + */ + WriteSafeRegistry(String registryName, SharedData sharedData, Registry registry) { this.registryName = registryName; - this.sharedDataAccessor = new SharedDataAccessor(vertx, this.getClass()); + this.sharedData = sharedData; + this.sharedRegistry = registry; } /** @@ -48,12 +71,26 @@ public WriteSafeRegistry(Vertx vertx, String registryName) { public Future register(String sharedMapKey, T value) { logger.info("register value: \"{}\" in shared map: \"{}\"", sharedMapKey, value); - return lock(sharedMapKey, () -> addValue(sharedMapKey, value)); + return lock(sharedMapKey, () -> sharedRegistry.register(sharedMapKey, value)); } @Override public Future get(String sharedMapKey) { - return getSharedMap().compose(map -> map.get(sharedMapKey)).map(o -> (JsonArray) o); + return sharedRegistry.get(sharedMapKey); + } + + /** + * Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey. + * + * @param sharedMapKey the shared map key + * @param value the value to unregister + * @return the future + */ + @Override + public Future unregister(String sharedMapKey, T value) { + logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value); + + return lock(sharedMapKey, () -> sharedRegistry.unregister(sharedMapKey, value)); } /** @@ -65,7 +102,7 @@ public Future get(String sharedMapKey) { */ protected Future lock(String sharedMapKey, Supplier> futureSupplier) { logger.debug("Get lock for {}", sharedMapKey); - return sharedDataAccessor.getLock(sharedMapKey).onFailure(throwable -> { + return sharedData.getLock(sharedMapKey).onFailure(throwable -> { logger.error("Error acquiring lock for {}", sharedMapKey, throwable); }).compose(lock -> Future.future(event -> futureSupplier.get().onComplete(event)) .onComplete(anyResult -> { @@ -74,62 +111,15 @@ protected Future lock(String sharedMapKey, Supplier> futureSu })); } - private Future addValue(String sharedMapKey, Object value) { - Future> sharedMap = getSharedMap(); - - return sharedMap.compose(map -> map.get(sharedMapKey)) - .map(valueOrNull -> valueOrNull != null ? (JsonArray) valueOrNull : new JsonArray()) - .compose(valueArray -> { - if (!valueArray.contains(value)) { - valueArray.add(value); - } - - if (logger.isInfoEnabled()) { - logger.info("Registered verticle {} in shared map.", value); - } - - return sharedMap.compose(map -> map.put(sharedMapKey, valueArray)); - }); - } - - /** - * Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey. - * - * @param sharedMapKey the shared map key - * @param value the value to unregister - * @return the future - */ - @Override - public Future unregister(String sharedMapKey, T value) { - logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value); - - return lock(sharedMapKey, () -> removeValue(sharedMapKey, value)); - } - - private Future removeValue(String sharedMapKey, Object value) { - Future> sharedMap = getSharedMap(); - - return sharedMap.compose(map -> map.get(sharedMapKey)).map(jsonArray -> (JsonArray) jsonArray) - .compose(values -> { - if (values == null) { - return succeededFuture(); - } - - if (logger.isInfoEnabled()) { - logger.info("Unregistered verticle {} in shared map.", value); - } - - values.remove(value); - return sharedMap.compose(map -> map.put(sharedMapKey, values)); - }); - } - /** * Shared map that is used as registry. + *

+ * It is not safe to write to the shared map directly. Use the {@link WriteSafeRegistry#register(String, Object)} + * and {@link WriteSafeRegistry#unregister(String, Object)} methods. * * @return Future to the shared map */ public Future> getSharedMap() { - return sharedDataAccessor.getAsyncMap(registryName); + return sharedData.getAsyncMap(registryName); } }