diff --git a/vertx-infinispan/src/main/java/io/vertx/ext/cluster/infinispan/InfinispanClusterManager.java b/vertx-infinispan/src/main/java/io/vertx/ext/cluster/infinispan/InfinispanClusterManager.java index 66e91a6..02772fc 100644 --- a/vertx-infinispan/src/main/java/io/vertx/ext/cluster/infinispan/InfinispanClusterManager.java +++ b/vertx-infinispan/src/main/java/io/vertx/ext/cluster/infinispan/InfinispanClusterManager.java @@ -20,6 +20,7 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.impl.VertxInternal; +import io.vertx.core.impl.future.PromiseInternal; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.shareddata.AsyncMap; @@ -129,10 +130,10 @@ public BasicCacheContainer getCacheContainer() { @Override public void getAsyncMap(String name, Promise> promise) { - vertx.>executeBlocking(prom -> { + vertx.>executeBlocking(() -> { EmbeddedCacheManagerAdmin administration = cacheManager.administration().withFlags(CacheContainerAdmin.AdminFlag.VOLATILE); Cache cache = administration.getOrCreateCache(name, "__vertx.distributed.cache.configuration"); - prom.complete(new InfinispanAsyncMapImpl<>(vertx, cache)); + return new InfinispanAsyncMapImpl<>(vertx, cache); }, false).onComplete(promise); } @@ -143,7 +144,8 @@ public Map getSyncMap(String name) { @Override public void getLockWithTimeout(String name, long timeout, Promise prom) { - vertx.executeBlocking(promise -> { + vertx.executeBlocking(() -> { + PromiseInternal promise = vertx.promise(); if (!lockManager.isDefined(name)) { lockManager.defineLock(name); } @@ -159,16 +161,17 @@ public void getLockWithTimeout(String name, long timeout, Promise prom) { promise.fail(throwable); } }); - }, false).onComplete(prom); + return promise.future(); + }, false).compose(f -> f).onComplete(prom); } @Override public void getCounter(String name, Promise promise) { - vertx.executeBlocking(prom -> { + vertx.executeBlocking(() -> { if (!counterManager.isDefined(name)) { counterManager.defineCounter(name, CounterConfiguration.builder(CounterType.UNBOUNDED_STRONG).build()); } - prom.complete(new InfinispanCounter(vertx, counterManager.getStrongCounter(name).sync())); + return new InfinispanCounter(vertx, counterManager.getStrongCounter(name).sync()); }, false).onComplete(promise); } @@ -218,57 +221,47 @@ public void getNodeInfo(String nodeId, Promise promise) { @Override public void join(Promise promise) { - vertx.executeBlocking(prom -> { + vertx.executeBlocking(() -> { if (active) { - prom.complete(); - return; + return null; } active = true; if (!userProvidedCacheManager) { - try { - FileLookup fileLookup = FileLookupFactory.newInstance(); + FileLookup fileLookup = FileLookupFactory.newInstance(); - URL ispnConfig = fileLookup.lookupFileLocation(ispnConfigPath, getCTCCL()); - if (ispnConfig == null) { - log.warn("Cannot find Infinispan config '" + ispnConfigPath + "', using default"); - ispnConfig = fileLookup.lookupFileLocation(DEFAULT_INFINISPAN_XML, getCTCCL()); - } - ConfigurationBuilderHolder builderHolder = new ParserRegistry().parse(ispnConfig); - // Workaround Launcher in fatjar issue (context classloader may be null) - ClassLoader classLoader = getCTCCL(); - if (classLoader == null) { - classLoader = getClass().getClassLoader(); - } - builderHolder.getGlobalConfigurationBuilder().classLoader(classLoader); - - if (fileLookup.lookupFileLocation(jgroupsConfigPath, getCTCCL()) != null) { - log.warn("Forcing JGroups config to '" + jgroupsConfigPath + "'"); - builderHolder.getGlobalConfigurationBuilder().transport().defaultTransport() - .removeProperty(JGroupsTransport.CHANNEL_CONFIGURATOR) - .addProperty(JGroupsTransport.CONFIGURATION_FILE, jgroupsConfigPath); - } + URL ispnConfig = fileLookup.lookupFileLocation(ispnConfigPath, getCTCCL()); + if (ispnConfig == null) { + log.warn("Cannot find Infinispan config '" + ispnConfigPath + "', using default"); + ispnConfig = fileLookup.lookupFileLocation(DEFAULT_INFINISPAN_XML, getCTCCL()); + } + ConfigurationBuilderHolder builderHolder = new ParserRegistry().parse(ispnConfig); + // Workaround Launcher in fatjar issue (context classloader may be null) + ClassLoader classLoader = getCTCCL(); + if (classLoader == null) { + classLoader = getClass().getClassLoader(); + } + builderHolder.getGlobalConfigurationBuilder().classLoader(classLoader); - cacheManager = new DefaultCacheManager(builderHolder, true); - } catch (IOException e) { - prom.fail(e); - return; + if (fileLookup.lookupFileLocation(jgroupsConfigPath, getCTCCL()) != null) { + log.warn("Forcing JGroups config to '" + jgroupsConfigPath + "'"); + builderHolder.getGlobalConfigurationBuilder().transport().defaultTransport() + .removeProperty(JGroupsTransport.CHANNEL_CONFIGURATOR) + .addProperty(JGroupsTransport.CONFIGURATION_FILE, jgroupsConfigPath); } + + cacheManager = new DefaultCacheManager(builderHolder, true); } viewListener = new ClusterViewListener(); cacheManager.addListener(viewListener); - try { - subsCacheHelper = new SubsCacheHelper(vertx, cacheManager, nodeSelector); + subsCacheHelper = new SubsCacheHelper(vertx, cacheManager, nodeSelector); - nodeInfoCache = cacheManager.getCache("__vertx.nodeInfo").getAdvancedCache(); + nodeInfoCache = cacheManager.getCache("__vertx.nodeInfo").getAdvancedCache(); - lockManager = (EmbeddedClusteredLockManager) EmbeddedClusteredLockManagerFactory.from(cacheManager); - counterManager = EmbeddedCounterManagerFactory.asCounterManager(cacheManager); + lockManager = (EmbeddedClusteredLockManager) EmbeddedClusteredLockManagerFactory.from(cacheManager); + counterManager = EmbeddedCounterManagerFactory.asCounterManager(cacheManager); - prom.complete(); - } catch (Exception e) { - prom.fail(e); - } + return null; }, false).onComplete(promise); } @@ -278,10 +271,9 @@ private ClassLoader getCTCCL() { @Override public void leave(Promise promise) { - vertx.executeBlocking(prom -> { + vertx.executeBlocking(() -> { if (!active) { - prom.complete(); - return; + return null; } active = false; subsCacheHelper.close(); @@ -289,7 +281,7 @@ public void leave(Promise promise) { if (!userProvidedCacheManager) { cacheManager.stop(); } - prom.complete(); + return null; }, false).onComplete(promise); } diff --git a/vertx-infinispan/src/main/java/io/vertx/ext/cluster/infinispan/impl/CloseableIteratorCollectionStream.java b/vertx-infinispan/src/main/java/io/vertx/ext/cluster/infinispan/impl/CloseableIteratorCollectionStream.java index c5e0368..d12d81b 100644 --- a/vertx-infinispan/src/main/java/io/vertx/ext/cluster/infinispan/impl/CloseableIteratorCollectionStream.java +++ b/vertx-infinispan/src/main/java/io/vertx/ext/cluster/infinispan/impl/CloseableIteratorCollectionStream.java @@ -77,7 +77,7 @@ public synchronized CloseableIteratorCollectionStream handler(Handler h close(); } else { dataHandler = handler; - context.>executeBlocking(fut -> fut.complete(iterableSupplier.get()), false).onComplete(ar -> { + context.>executeBlocking(iterableSupplier::get, false).onComplete(ar -> { synchronized (this) { if (ar.succeeded()) { iterable = ar.result(); @@ -131,7 +131,7 @@ private synchronized void doRead() { } readInProgress = true; if (iterator == null) { - context.>executeBlocking(fut -> fut.complete(iterable.iterator()), false).onComplete(ar -> { + context.>executeBlocking(() -> iterable.iterator(), false).onComplete(ar -> { synchronized (this) { readInProgress = false; if (ar.succeeded()) { @@ -154,12 +154,12 @@ private synchronized void doRead() { context.runOnContext(v -> emitQueued()); return; } - context.>executeBlocking(fut -> { + context.>executeBlocking(() -> { List batch = new ArrayList<>(BATCH_SIZE); for (int i = 0; i < BATCH_SIZE && iterator.hasNext(); i++) { batch.add(iterator.next()); } - fut.complete(batch); + return batch; }, false).onComplete(ar -> { synchronized (this) { if (ar.succeeded()) { @@ -208,7 +208,7 @@ public synchronized CloseableIteratorCollectionStream endHandler(Handler> iteratorRef = new AtomicReference<>(); - context.executeBlocking(fut -> { + context.executeBlocking(() -> { synchronized (this) { iteratorRef.set(iterator); } @@ -216,7 +216,7 @@ private void close() { if (iter != null) { iter.close(); } - fut.complete(); + return null; }, false); } } diff --git a/vertx-infinispan/src/main/java/io/vertx/ext/cluster/infinispan/impl/InfinispanAsyncMapImpl.java b/vertx-infinispan/src/main/java/io/vertx/ext/cluster/infinispan/impl/InfinispanAsyncMapImpl.java index 60d1ecc..b7a7458 100644 --- a/vertx-infinispan/src/main/java/io/vertx/ext/cluster/infinispan/impl/InfinispanAsyncMapImpl.java +++ b/vertx-infinispan/src/main/java/io/vertx/ext/cluster/infinispan/impl/InfinispanAsyncMapImpl.java @@ -140,28 +140,28 @@ public Future clear() { @Override public Future size() { - return vertx.executeBlocking(future -> future.complete(cache.size()), false); + return vertx.executeBlocking(cache::size, false); } @Override public Future> keys() { - return vertx.executeBlocking(promise -> { + return vertx.executeBlocking(() -> { Set cacheKeys = cache.keySet().stream().collect(CacheCollectors.serializableCollector(Collectors::toSet)); - promise.complete(cacheKeys.stream().map(DataConverter::fromCachedObject).collect(toSet())); + return cacheKeys.stream().map(DataConverter::fromCachedObject).collect(toSet()); }, false); } @Override public Future> values() { - return vertx.executeBlocking(promise -> { + return vertx.executeBlocking(() -> { List cacheValues = cache.values().stream().collect(CacheCollectors.serializableCollector(Collectors::toList)); - promise.complete(cacheValues.stream().map(DataConverter::fromCachedObject).collect(toList())); + return cacheValues.stream().map(DataConverter::fromCachedObject).collect(toList()); }, false); } @Override public Future> entries() { - return vertx.executeBlocking(promise -> { + return vertx.executeBlocking(() -> { Map cacheEntries = cache.entrySet().stream() .collect(CacheCollectors.serializableCollector(() -> toMap(Entry::getKey, Entry::getValue))); Map result = new HashMap<>(); @@ -170,7 +170,7 @@ public Future> entries() { V v = DataConverter.fromCachedObject(entry.getValue()); result.put(k, v); } - promise.complete(result); + return result; }, false); } diff --git a/vertx-infinispan/src/main/java/io/vertx/ext/cluster/infinispan/impl/InfinispanCounter.java b/vertx-infinispan/src/main/java/io/vertx/ext/cluster/infinispan/impl/InfinispanCounter.java index d08fb2e..d1ca542 100644 --- a/vertx-infinispan/src/main/java/io/vertx/ext/cluster/infinispan/impl/InfinispanCounter.java +++ b/vertx-infinispan/src/main/java/io/vertx/ext/cluster/infinispan/impl/InfinispanCounter.java @@ -40,9 +40,7 @@ public InfinispanCounter(Vertx vertx, SyncStrongCounter strongCounter) { @Override public Future get() { - return vertx.executeBlocking(future -> { - future.complete(strongCounter.getValue()); - }, false); + return vertx.executeBlocking(strongCounter::getValue, false); } @Override @@ -62,22 +60,16 @@ public Future decrementAndGet() { @Override public Future addAndGet(long value) { - return vertx.executeBlocking(future -> { - future.complete(strongCounter.addAndGet(value)); - }, false); + return vertx.executeBlocking(() -> strongCounter.addAndGet(value), false); } @Override public Future getAndAdd(long value) { - return vertx.executeBlocking(future -> { - future.complete(strongCounter.addAndGet(value) - value); - }, false); + return vertx.executeBlocking(() -> strongCounter.addAndGet(value) - value, false); } @Override public Future compareAndSet(long expected, long value) { - return vertx.executeBlocking(future -> { - future.complete(strongCounter.compareAndSet(expected, value)); - }, false); + return vertx.executeBlocking(() -> strongCounter.compareAndSet(expected, value), false); } } diff --git a/vertx-infinispan/src/test/java/io/vertx/ext/cluster/infinispan/ClusterHealthCheckTest.java b/vertx-infinispan/src/test/java/io/vertx/ext/cluster/infinispan/ClusterHealthCheckTest.java index 57320f1..23d2f32 100644 --- a/vertx-infinispan/src/test/java/io/vertx/ext/cluster/infinispan/ClusterHealthCheckTest.java +++ b/vertx-infinispan/src/test/java/io/vertx/ext/cluster/infinispan/ClusterHealthCheckTest.java @@ -20,6 +20,7 @@ import io.vertx.core.*; import io.vertx.core.json.JsonObject; import io.vertx.core.spi.cluster.ClusterManager; +import io.vertx.ext.healthchecks.Status; import io.vertx.test.core.VertxTestBase; import org.junit.Rule; import org.junit.Test; @@ -80,7 +81,11 @@ public void testDetailedHealthCheck() { startNodes(2); ClusterHealthCheck healthCheck = ClusterHealthCheck.createProcedure(vertices[1], true); vertices[0].sharedData().getAsyncMap("foo").onComplete(onSuccess(asyncMap -> { - vertices[1].executeBlocking(healthCheck).onComplete(onSuccess(status -> { + vertices[1].executeBlocking(() -> { + Promise promise = Promise.promise(); + healthCheck.handle(promise); + return promise.future().toCompletionStage().toCompletableFuture().get(); + }).onComplete(onSuccess(status -> { JsonObject json = new JsonObject(status.toJson().encode()); // test serialization+deserialization assertTrue(json.getBoolean("ok")); assertEquals(Integer.valueOf(2), json.getJsonObject("data").getJsonObject("clusterHealth").getInteger("numberOfNodes"));