Skip to content

Commit

Permalink
Update to executeBlocking API change
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Jul 19, 2023
1 parent 4d7578b commit 3004c31
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 72 deletions.
70 changes: 31 additions & 39 deletions src/main/java/io/vertx/spi/cluster/ignite/IgniteClusterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public void nodeListener(NodeListener nodeListener) {

@Override
public <K, V> void getAsyncMap(String name, Promise<AsyncMap<K, V>> promise) {
vertx.<AsyncMap<K, V>>executeBlocking(prom -> prom.complete(new AsyncMapImpl<>(getCache(name), vertx))).onComplete(promise);
vertx.<AsyncMap<K, V>>executeBlocking(() -> new AsyncMapImpl<>(getCache(name), vertx)).onComplete(promise);
}

@Override
Expand All @@ -207,7 +207,7 @@ public <K, V> Map<K, V> getSyncMap(String name) {

@Override
public void getLockWithTimeout(String name, long timeout, Promise<Lock> promise) {
vertx.<Lock>executeBlocking(prom -> {
vertx.<Lock>executeBlocking(() -> {
IgniteSemaphore semaphore = ignite.semaphore(LOCK_SEMAPHORE_PREFIX + name, 1, true, true);
boolean locked;
long remaining = timeout;
Expand All @@ -217,16 +217,16 @@ public void getLockWithTimeout(String name, long timeout, Promise<Lock> promise)
remaining = remaining - TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS);
} while (!locked && remaining > 0);
if (locked) {
prom.complete(new LockImpl(semaphore, lockReleaseExec));
return new LockImpl(semaphore, lockReleaseExec);
} else {
throw new VertxException("Timed out waiting to get lock " + name);
throw new VertxException("Timed out waiting to get lock " + name, true);
}
}, false).onComplete(promise);
}

@Override
public void getCounter(String name, Promise<Counter> promise) {
vertx.<Counter>executeBlocking(prom -> prom.complete(new CounterImpl(ignite.atomicLong(name, 0, true)))).onComplete(promise);
vertx.<Counter>executeBlocking(() -> new CounterImpl(ignite.atomicLong(name, 0, true))).onComplete(promise);
}

@Override
Expand All @@ -240,9 +240,9 @@ public void setNodeInfo(NodeInfo nodeInfo, Promise<Void> promise) {
this.nodeInfo = nodeInfo;
}
IgniteNodeInfo value = new IgniteNodeInfo(nodeInfo);
vertx.<Void>executeBlocking(prom -> {
vertx.<Void>executeBlocking(() -> {
nodeInfoMap.put(nodeId, value);
prom.complete();
return null;
}, false).onComplete(promise);
}

Expand Down Expand Up @@ -280,7 +280,7 @@ public List<String> getNodes() {

@Override
public void join(Promise<Void> promise) {
vertx.<Void>executeBlocking(prom -> {
vertx.<Void>executeBlocking(() -> {
synchronized (monitor) {
if (!active) {
active = true;
Expand All @@ -303,34 +303,31 @@ public void join(Promise<Void> promise) {
return false;
}

vertx.<Void>executeBlocking(f -> {
vertx.<Void>executeBlocking(() -> {
String id = nodeId(((DiscoveryEvent) event).eventNode());
switch (event.type()) {
case EVT_NODE_JOINED:
notifyNodeListener(listener -> listener.nodeAdded(id));
log.debug("node " + id + " joined the cluster");
f.complete();
break;
return null;
case EVT_NODE_LEFT:
case EVT_NODE_FAILED:
if (cleanNodeInfos(id)) {
cleanSubs(id);
}
notifyNodeListener(listener -> listener.nodeLeft(id));
log.debug("node " + id + " left the cluster");
f.complete();
break;
return null;
case EVT_NODE_SEGMENTED:
if (customIgnite || !shutdownOnSegmentation) {
log.warn("node got segmented");
} else {
log.warn("node got segmented and will be shut down");
vertx.close();
}
f.fail(new IllegalStateException("node is stopped"));
break;
throw new IllegalStateException("node is stopped");
default:
f.fail("event not known");
throw new IllegalStateException("event not known");
}
});

Expand All @@ -341,20 +338,16 @@ public void join(Promise<Void> promise) {
subsMapHelper = new SubsMapHelper(ignite, nodeSelector, vertx);
nodeInfoMap = ignite.getOrCreateCache("__vertx.nodeInfo");

try {
MILLISECONDS.sleep(delayAfterStart);
prom.complete();
} catch (InterruptedException e) {
prom.fail(e);
}
MILLISECONDS.sleep(delayAfterStart);
}
return null;
}
}).onComplete(promise);
}

@Override
public void leave(Promise<Void> promise) {
vertx.<Void>executeBlocking(prom -> {
vertx.<Void>executeBlocking(() -> {
synchronized (monitor) {
if (active) {
active = false;
Expand All @@ -375,7 +368,7 @@ public void leave(Promise<Void> promise) {
}
}

prom.complete();
return null;
}).onComplete(promise);
}

Expand All @@ -386,24 +379,23 @@ public boolean isActive() {

@Override
public void addRegistration(String address, RegistrationInfo registrationInfo, Promise<Void> promise) {
vertx.<Void>executeBlocking(prom -> {
subsMapHelper.put(address, registrationInfo)
.onComplete(prom);
vertx.<Void>executeBlocking(() -> {
subsMapHelper.put(address, registrationInfo);
return null;
}, false).onComplete(promise);
}

@Override
public void removeRegistration(String address, RegistrationInfo registrationInfo, Promise<Void> promise) {
vertx.<Void>executeBlocking(prom -> {
subsMapHelper.remove(address, registrationInfo, prom);
vertx.<Void>executeBlocking(() -> {
subsMapHelper.remove(address, registrationInfo);
return null;
}, false).onComplete(promise);
}

@Override
public void getRegistrations(String address, Promise<List<RegistrationInfo>> promise) {
vertx.<List<RegistrationInfo>>executeBlocking(prom -> {
subsMapHelper.get(address, prom);
}, false).onComplete(promise);
vertx.executeBlocking(() -> subsMapHelper.get(address), false).onComplete(promise);
}

private void cleanSubs(String id) {
Expand Down Expand Up @@ -519,37 +511,37 @@ private CounterImpl(IgniteAtomicLong cnt) {

@Override
public Future<Long> get() {
return vertx.executeBlocking(fut -> fut.complete(cnt.get()));
return vertx.executeBlocking(() -> cnt.get());
}

@Override
public Future<Long> incrementAndGet() {
return vertx.executeBlocking(fut -> fut.complete(cnt.incrementAndGet()));
return vertx.executeBlocking(() -> cnt.incrementAndGet());
}

@Override
public Future<Long> getAndIncrement() {
return vertx.executeBlocking(fut -> fut.complete(cnt.getAndIncrement()));
return vertx.executeBlocking(() -> cnt.getAndIncrement());
}

@Override
public Future<Long> decrementAndGet() {
return vertx.executeBlocking(fut -> fut.complete(cnt.decrementAndGet()));
return vertx.executeBlocking(() -> cnt.decrementAndGet());
}

@Override
public Future<Long> addAndGet(long value) {
return vertx.executeBlocking(fut -> fut.complete(cnt.addAndGet(value)));
return vertx.executeBlocking(() -> cnt.addAndGet(value));
}

@Override
public Future<Long> getAndAdd(long value) {
return vertx.executeBlocking(fut -> fut.complete(cnt.getAndAdd(value)));
return vertx.executeBlocking(() -> cnt.getAndAdd(value));
}

@Override
public Future<Boolean> compareAndSet(long expected, long value) {
return vertx.executeBlocking(fut -> fut.complete(cnt.compareAndSet(expected, value)));
return vertx.executeBlocking(() -> cnt.compareAndSet(expected, value));
}
}
}
18 changes: 5 additions & 13 deletions src/main/java/io/vertx/spi/cluster/ignite/impl/AsyncMapImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,16 @@ public Future<List<V>> values() {
@Override
public Future<Map<K, V>> entries() {
return vertx.executeBlocking(
promise -> {
() -> {
try {
List<Cache.Entry<K, V>> all = cache.query(new ScanQuery<K, V>()).getAll();
Map<K, V> map = new HashMap<>(all.size());
for (Cache.Entry<K, V> entry : all) {
map.put(unmarshal(entry.getKey()), unmarshal(entry.getValue()));
}
promise.complete(map);
return map;
} catch (final RuntimeException cause) {
promise.fail(new VertxException(cause));
throw new VertxException(cause, true);
}
}
);
Expand All @@ -163,17 +163,9 @@ private <T> Future<T> executeWithTtl(Function<IgniteCache<K, V>, IgniteFuture<T>
: cache;

return vertx.executeBlocking(
promise -> {
() -> {
IgniteFuture<T> future = cacheOp.apply(cache0);
future.listen(
fut -> {
try {
promise.complete(unmarshal(future.get()));
} catch (final RuntimeException e) {
promise.fail(new VertxException(e));
}
}
);
return unmarshal(future.get());
}
);
}
Expand Down
34 changes: 14 additions & 20 deletions src/main/java/io/vertx/spi/cluster/ignite/impl/SubsMapHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,9 @@ public SubsMapHelper(Ignite ignite, NodeSelector nodeSelector, VertxInternal ver
.setLocalListener(l -> listen(l, vertxInternal)));
}

public void get(String address, Promise<List<RegistrationInfo>> promise) {
public List<RegistrationInfo> get(String address) {
if (shutdown) {
promise.complete(null);
return;
return null;
}
try {
List<RegistrationInfo> infos = map.query(new ScanQuery<IgniteRegistrationInfo, Boolean>((k, v) -> k.address().equals(address)))
Expand All @@ -78,25 +77,23 @@ public void get(String address, Promise<List<RegistrationInfo>> promise) {
synchronized (local) {
size += local.size();
if (size == 0) {
promise.complete(Collections.emptyList());
return;
return Collections.emptyList();
}
infos.addAll(local);
}
} else if (size == 0) {
promise.complete(Collections.emptyList());
return;
return Collections.emptyList();
}

promise.complete(infos);
return infos;
} catch (IllegalStateException | CacheException e) {
promise.fail(new VertxException(e));
throw new VertxException(e, true);
}
}

public Future<Void> put(String address, RegistrationInfo registrationInfo) {
public void put(String address, RegistrationInfo registrationInfo) {
if (shutdown) {
return Future.failedFuture(new VertxException("shutdown in progress"));
throw new VertxException("shutdown in progress");
}
try {
if (registrationInfo.localOnly()) {
Expand All @@ -106,9 +103,8 @@ public Future<Void> put(String address, RegistrationInfo registrationInfo) {
map.put(new IgniteRegistrationInfo(address, registrationInfo), Boolean.TRUE);
}
} catch (IllegalStateException | CacheException e) {
return Future.failedFuture(new VertxException(e));
throw new VertxException(e);
}
return Future.succeededFuture();
}

private Set<RegistrationInfo> addToSet(RegistrationInfo registrationInfo, Set<RegistrationInfo> curr) {
Expand All @@ -117,9 +113,8 @@ private Set<RegistrationInfo> addToSet(RegistrationInfo registrationInfo, Set<Re
return res;
}

public void remove(String address, RegistrationInfo registrationInfo, Promise<Void> promise) {
public void remove(String address, RegistrationInfo registrationInfo) {
if (shutdown) {
promise.complete();
return;
}
try {
Expand All @@ -129,9 +124,8 @@ public void remove(String address, RegistrationInfo registrationInfo, Promise<Vo
} else {
map.remove(new IgniteRegistrationInfo(address, registrationInfo));
}
promise.complete();
} catch (IllegalStateException | CacheException e) {
promise.fail(new VertxException(e));
throw new VertxException(e, true);
}
}

Expand Down Expand Up @@ -166,20 +160,20 @@ private Future<List<RegistrationInfo>> getAndUpdate(String address) {
prom.future().onSuccess(registrationInfos -> {
nodeSelector.registrationsUpdated(new RegistrationUpdateEvent(address, registrationInfos));
});
get(address, prom);
prom.complete(get(address));
} else {
prom.complete();
}
return prom.future();
}

private void listen(final Iterable<CacheEntryEvent<? extends IgniteRegistrationInfo, ? extends Boolean>> events, final VertxInternal vertxInternal) {
vertxInternal.<List<RegistrationInfo>>executeBlocking(promise -> {
vertxInternal.executeBlocking(() -> {
StreamSupport.stream(events.spliterator(), false)
.map(e -> e.getKey().address())
.distinct()
.forEach(this::fireRegistrationUpdateEvent);
promise.complete();
return null;
});
}
}

0 comments on commit 3004c31

Please sign in to comment.