Skip to content

Commit

Permalink
Switch to atomic methods for better node and subscription cleanup, ad…
Browse files Browse the repository at this point in the history
…d logging for failures
  • Loading branch information
zyclonite committed Aug 4, 2024
1 parent 5d6c38c commit dff2dcc
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 17 deletions.
53 changes: 49 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
<version>1.4.12</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -162,14 +162,44 @@
</argLine>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
<excludes>
<exclude>**/it/**/*Test.java</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<failIfNoSpecifiedTests>false</failIfNoSpecifiedTests>
<systemPropertyVariables>
<java.net.preferIPv4Stack>true</java.net.preferIPv4Stack>
<IGNITE_PERFORMANCE_SUGGESTIONS_DISABLED>true</IGNITE_PERFORMANCE_SUGGESTIONS_DISABLED>
<IGNITE_NO_ASCII>true</IGNITE_NO_ASCII>
<IGNITE_UPDATE_NOTIFIER>false</IGNITE_UPDATE_NOTIFIER>
<IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE>1000</IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE>
<vertx.logger-delegate-factory-class-name>io.vertx.core.logging.SLF4JLogDelegateFactory</vertx.logger-delegate-factory-class-name>
<io.netty.leakDetectionLevel>PARANOID</io.netty.leakDetectionLevel>
<buildDirectory>${project.build.directory}</buildDirectory>
<vertxVersion>${project.version}</vertxVersion>
</systemPropertyVariables>
<argLine>
<!-- Needs to be small enough to run in a EC2 1.7GB small instance -->
-Xms512M -Xmx1200M
</argLine>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/it/**/*Test.java</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
Expand Down Expand Up @@ -216,6 +246,21 @@
</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<argLine>
<!-- Needs to be small enough to run in a EC2 1.7GB small instance -->
-Xms512M -Xmx1200M
<!-- setup for JDK 17+ to replace illegal-access -->
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang=ALL-UNNAMED
</argLine>
</configuration>
</plugin>
</plugins>
</build>
</profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public void join(Promise<Void> promise) {
break;
case EVT_NODE_LEFT:
case EVT_NODE_FAILED:
if (cleanNodeInfos(id)) {
if(cleanNodeInfos(id)) {
cleanSubs(id);
}
notifyNodeListener(listener -> listener.nodeLeft(id));
Expand Down Expand Up @@ -407,18 +407,14 @@ public void getRegistrations(String address, Promise<List<RegistrationInfo>> pro
}

private void cleanSubs(String id) {
try {
subsMapHelper.removeAllForNode(id);
} catch (IllegalStateException | CacheException e) {
//ignore
}
subsMapHelper.removeAllForNode(id);
}

private boolean cleanNodeInfos(String nid) {
try {
return nodeInfoMap.remove(nid);
} catch (IllegalStateException | CacheException e) {
//ignore
log.warn("Could not remove nodeInfo (" + nid + "): " + e.getMessage());
}
return false;
}
Expand Down
16 changes: 11 additions & 5 deletions src/main/java/io/vertx/spi/cluster/ignite/impl/SubsMapHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.vertx.core.Promise;
import io.vertx.core.VertxException;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.spi.cluster.NodeSelector;
import io.vertx.core.spi.cluster.RegistrationInfo;
import io.vertx.core.spi.cluster.RegistrationUpdateEvent;
Expand All @@ -43,6 +45,8 @@
* @author Lukas Prettenthaler
*/
public class SubsMapHelper {
private static final Logger log = LoggerFactory.getLogger(SubsMapHelper.class);

private final IgniteCache<IgniteRegistrationInfo, Boolean> map;
private final NodeSelector nodeSelector;
private final ConcurrentMap<String, Set<RegistrationInfo>> localSubs = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -127,7 +131,7 @@ public void remove(String address, RegistrationInfo registrationInfo, Promise<Vo
localSubs.computeIfPresent(address, (add, curr) -> removeFromSet(registrationInfo, curr));
fireRegistrationUpdateEvent(address);
} else {
map.remove(new IgniteRegistrationInfo(address, registrationInfo));
map.remove(new IgniteRegistrationInfo(address, registrationInfo), Boolean.TRUE);
}
promise.complete();
} catch (IllegalStateException | CacheException e) {
Expand All @@ -145,10 +149,12 @@ public void removeAllForNode(String nodeId) {
.getAll().stream()
.map(Cache.Entry::getKey)
.collect(Collectors.toCollection(TreeSet::new));
try {
map.removeAll(toRemove);
} catch (IllegalStateException | CacheException t) {
//ignore
for (IgniteRegistrationInfo info : toRemove) {
try {
map.remove(info, Boolean.TRUE);
} catch (IllegalStateException | CacheException t) {
log.warn("Could not remove subscriber: " + t.getMessage());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testInterval() throws Exception {
Throttling throttling = new Throttling((VertxInternal) vertx, address -> {
events.compute(address, (k, v) -> {
if (v == null) {
v = Collections.synchronizedList(new LinkedList<>());
v = Collections.synchronizedList(Collections.synchronizedList(new LinkedList<>()));
}
v.add(System.nanoTime());
return v;
Expand Down
1 change: 1 addition & 0 deletions src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
</encoder>
</appender>
<logger name="org.apache.ignite.IgniteLogger" level="ERROR"/>
<logger name="io.vertx.core.eventbus.impl.clustered.ConnectionHolder" level="ERROR"/>
<root level="${testLogLevel:-INFO}">
<appender-ref ref="STDOUT"/>
</root>
Expand Down

0 comments on commit dff2dcc

Please sign in to comment.