Skip to content

Commit

Permalink
Support 409 Leadership Change (#1144)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored May 17, 2024
1 parent 54ca97f commit 43bfd1b
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 18 deletions.
11 changes: 7 additions & 4 deletions src/main/java/io/nats/client/impl/PullMessageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,23 +161,26 @@ protected ManageResult manageStatus(Message msg) {
case CONFLICT_CODE:
// sometimes just a warning
String statMsg = status.getMessage();
if (statMsg.startsWith("Exceeded Max")) {
if (statMsg.startsWith("Exceeded Max")
|| statMsg.equals(SERVER_SHUTDOWN)
|| statMsg.equals(LEADERSHIP_CHANGE)
) {
if (raiseStatusWarnings) {
conn.executeCallback((c, el) -> el.pullStatusWarning(c, sub, status));
}
return STATUS_HANDLED;
}

if (statMsg.equals(BATCH_COMPLETED) ||
statMsg.equals(MESSAGE_SIZE_EXCEEDS_MAX_BYTES) ||
statMsg.equals(SERVER_SHUTDOWN))
statMsg.equals(MESSAGE_SIZE_EXCEEDS_MAX_BYTES))
{
return STATUS_TERMINUS;
}
break;
}

// all others are errors
// All unknown 409s are errors, since that basically means the client is not aware of them.
// These known ones are also errors: "Consumer Deleted" and "Consumer is push based"
conn.executeCallback((c, el) -> el.pullStatusError(c, sub, status));
return STATUS_ERROR;
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/support/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class Status {

public static String BATCH_COMPLETED = "Batch Completed"; // 409 informational
public static String SERVER_SHUTDOWN = "Server Shutdown"; // 409 informational with headers
public static String LEADERSHIP_CHANGE = "Leadership Change"; // 409

private final int code;
private final String message;
Expand Down
29 changes: 15 additions & 14 deletions src/test/java/io/nats/client/impl/JetStreamPullTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -947,20 +947,21 @@ public void testConsumerDeletedSyncSub() throws Exception {
});
}

// This just flaps. It's a timing thing? Already spent too much time, IWOMM and it should work as is.
// @Test
// public void testConsumerDeletedAsyncSub() throws Exception {
// testConflictStatus(409, CONSUMER_DELETED, TYPE_ERROR, "2.9.6", (nc, jsm, js, tsc, handler) -> {
// jsm.addOrUpdateConsumer(tsc.stream, builder().durable(durable(1)).ackPolicy(AckPolicy.None).build());
// Dispatcher d = nc.createDispatcher();
// PullSubscribeOptions so = PullSubscribeOptions.bind(tsc.stream, durable(1));
// JetStreamSubscription sub = js.subscribe(null, d, m -> {}, so);
// sub.pullExpiresIn(1, 30000);
// jsm.deleteConsumer(tsc.stream, durable(1));
// js.publish(tsc.subject(), null);
// return sub;
// });
// }
// This just flaps. It's a timing thing? Already spent too much time, IWOMM and it should work as is.
@Test
@Disabled
public void testConsumerDeletedAsyncSub() throws Exception {
testConflictStatus(409, CONSUMER_DELETED, TYPE_ERROR, "2.9.6", (nc, jsm, js, tsc, handler) -> {
jsm.addOrUpdateConsumer(tsc.stream, builder().durable(durable(1)).ackPolicy(AckPolicy.None).build());
Dispatcher d = nc.createDispatcher();
PullSubscribeOptions so = PullSubscribeOptions.bind(tsc.stream, durable(1));
JetStreamSubscription sub = js.subscribe(null, d, m -> {}, so);
sub.pullExpiresIn(1, 30000);
jsm.deleteConsumer(tsc.stream, durable(1));
js.publish(tsc.subject(), null);
return sub;
});
}

static class BadPullRequestOptions extends PullRequestOptions {
public BadPullRequestOptions() {
Expand Down
79 changes: 79 additions & 0 deletions src/test/java/io/nats/client/utils/TestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -94,6 +95,10 @@ public interface TwoServerTest {
void test(Connection nc1, Connection nc2) throws Exception;
}

public interface ThreeServerTest {
void test(Connection nc1, Connection nc2, Connection nc3) throws Exception;
}

public interface VersionCheck {
boolean runTest(ServerInfo si);
}
Expand Down Expand Up @@ -319,6 +324,80 @@ public static void runInJsHubLeaf(TwoServerTest twoServerTest) throws Exception
}
}

public static void runInJsCluster(ThreeServerTest threeServerTest) throws Exception {
int port1 = NatsTestServer.nextPort();
int port2 = NatsTestServer.nextPort();
int port3 = NatsTestServer.nextPort();
int listen1 = NatsTestServer.nextPort();
int listen2 = NatsTestServer.nextPort();
int listen3 = NatsTestServer.nextPort();
String path1 = Files.createTempDirectory(variant()).toString().replace("\\", "\\\\");
String path2 = Files.createTempDirectory(variant()).toString().replace("\\", "\\\\");
String path3 = Files.createTempDirectory(variant()).toString().replace("\\", "\\\\");
String cluster = variant();
String serverPrefix = variant();

String[] server1Inserts = new String[] {
"jetstream {",
" store_dir=" + path1,
"}",
"server_name=" + serverPrefix + "1",
"cluster {",
" name: " + cluster,
" listen: 127.0.0.1:" + listen1,
" routes: [",
" nats-route://127.0.0.1:" + listen2,
" nats-route://127.0.0.1:" + listen3,
" ]",
"}",
};

String[] server2Inserts = new String[] {
"jetstream {",
" store_dir=" + path2,
"}",
"server_name=" + serverPrefix + "2",
"cluster {",
" name: " + cluster,
" listen: 127.0.0.1:" + listen2,
" routes: [",
" nats-route://127.0.0.1:" + listen1,
" nats-route://127.0.0.1:" + listen3,
" ]",
"}",
};

String[] server3Inserts = new String[] {
"jetstream {",
" store_dir=" + path3,
"}",
"server_name=" + serverPrefix + "3",
"cluster {",
" name: " + cluster,
" listen: 127.0.0.1:" + listen3,
" routes: [",
" nats-route://127.0.0.1:" + listen1,
" nats-route://127.0.0.1:" + listen2,
" ]",
"}",
};

try (NatsTestServer srv1 = new NatsTestServer(port1, false, true, null, server1Inserts, null);
Connection nc1 = standardConnection(srv1.getURI());
NatsTestServer srv2 = new NatsTestServer(port2, false, true, null, server2Inserts, null);
Connection nc2 = standardConnection(srv2.getURI());
NatsTestServer srv3 = new NatsTestServer(port3, false, true, null, server3Inserts, null);
Connection nc3 = standardConnection(srv3.getURI())
) {
try {
threeServerTest.test(nc1, nc2, nc3);
}
finally {
cleanupJs(nc1);
}
}
}

private static void cleanupJs(Connection c)
{
try {
Expand Down

0 comments on commit 43bfd1b

Please sign in to comment.