Skip to content

Commit

Permalink
Chaos App tuning and additional testing while porting. (#1080)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Feb 19, 2024
1 parent f13128b commit f9f34e2
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 52 deletions.
28 changes: 16 additions & 12 deletions src/examples/java/io/nats/examples/chaosTestApp/ChaosTestApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,26 @@

public class ChaosTestApp {

public static final String APP_LABEL = "APP";

public static String[] MANUAL_ARGS = (
// "--servers nats://192.168.50.99:4222"
"--servers nats://localhost:4222"
+ " --stream app-stream"
+ " --subject app-subject"
+ " --stream jchaos-stream"
+ " --subject jchaos-subject"
// + " --runtime 3600 // 1 hour in seconds
+ " --screen left"
// + " --work"
+ " --create"
+ " --r3"
// + " --r3"
+ " --publish"
+ " --pubjitter 30"
// + " --simple ordered,100,5000"
// + " --simple durable 100 5000" // space or commas work, the parser figures it out
// + " --simple ephemeral,100,5000"
+ " --simple ordered 100 5000"
+ " --simple durable 100 5000" // space or commas work, the parser figures it out
+ " --fetch durable,100,5000"
// + " --push ordered"
// + " --push durable"
+ " --push ordered"
+ " --push durable"
// + " --logdir c:\\temp"
).split(" ");

Expand All @@ -58,7 +62,7 @@ public static void main(String[] args) throws Exception {

try {
Output.start(cmd);
Output.controlMessage("APP", cmd.toString().replace(" --", " \n--"));
Output.controlMessage(APP_LABEL, cmd.toString().replace(" --", " \n--"));
CountDownLatch waiter = new CountDownLatch(1);

Publisher publisher = null;
Expand All @@ -72,7 +76,7 @@ public static void main(String[] args) throws Exception {
createOrReplaceStream(cmd, jsm);
}
catch (Exception e) {
Output.errorMessage("APP", e.getMessage());
Output.errorMessage(APP_LABEL, e.getMessage());
}
}

Expand All @@ -93,7 +97,7 @@ public static void main(String[] args) throws Exception {
default:
throw new IllegalArgumentException("Unsupported consumer type: " + clc.consumerType);
}
Output.errorMessage("APP", con.label);
Output.controlMessage(APP_LABEL, con.label);
cons.add(con);
}
}
Expand Down Expand Up @@ -140,10 +144,10 @@ public static void createOrReplaceStream(CommandLine cmd, JetStreamManagement js
.replicas(cmd.r3 ? 3 : 1)
.build();
StreamInfo si = jsm.addStream(sc);
Output.controlMessage("APP", "Create Stream\n" + Output.formatted(si.getConfiguration()));
Output.controlMessage(APP_LABEL, "Create Stream\n" + Output.formatted(si.getConfiguration()));
}
catch (Exception e) {
Output.errorMessage("FATAL", "Failed creating stream: '" + cmd.stream + "' " + e);
Output.fatalMessage(APP_LABEL, "Failed creating stream: '" + cmd.stream + "' " + e);
System.exit(-1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public ConnectableConsumer(CommandLine cmd, String initials, ConsumerKind consum
break;
}
this.initials = initials;
updateNameAndLabel(name);
label = name + " (" + consumerKind.name() + ")";

errorListener = new OutputErrorListener(label);

Expand All @@ -70,8 +70,9 @@ public ConnectableConsumer(CommandLine cmd, String initials, ConsumerKind consum
public void onMessage(Message m) throws InterruptedException {
m.ack();
long seq = m.metaData().streamSequence();
long lastSeq = lastReceivedSequence.get();
lastReceivedSequence.set(seq);
Output.workMessage(label, "Last Received Seq: " + seq);
Output.workMessage(label, "Last Received Seq: " + seq + "(" + lastSeq + ")");
}

public abstract void refreshInfo();
Expand All @@ -82,12 +83,11 @@ public void connectionEvent(Connection conn, Events type) {
refreshInfo();
}

protected void updateNameAndLabel(String updatedName) {
name = updatedName;
if (updatedName == null) {
label = consumerKind.name();
}
else {
protected void updateLabel(String conName) {
if (!name.contains(conName))
{
int at = name.lastIndexOf("-");
name = name.substring(0, at + 1) + conName;
label = name + " (" + consumerKind.name() + ")";
}
}
Expand Down
16 changes: 7 additions & 9 deletions src/examples/java/io/nats/examples/chaosTestApp/Monitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

public class Monitor implements Runnable, java.util.function.Consumer<String> {

static final String LABEL = "MONITOR";
static final String MONITOR_LABEL = "MONITOR";
static final long REPORT_FREQUENCY = 5000;
static final int SHORT_REPORTS = 50;

Expand All @@ -47,7 +47,6 @@ public Monitor(CommandLine cmd, Publisher publisher, List<ConnectableConsumer> c
@Override
public void accept(String s) {
reportFull.set(true);
// Output.print(LABEL, s);
}

@Override
Expand All @@ -57,10 +56,9 @@ public void run() {
.connectionListener((c, t) -> {
reportFull.set(true);
String s = "Connection: " + c.getServerInfo().getPort() + " " + t;
Output.controlMessage(LABEL, s);
// Output.print(LABEL, s);
Output.controlMessage(MONITOR_LABEL, s);
})
.errorListener(new OutputErrorListener(LABEL, this) {})
.errorListener(new OutputErrorListener(MONITOR_LABEL, this) {})
.maxReconnects(-1)
.build();

Expand All @@ -78,7 +76,7 @@ public void run() {
StreamInfo si = jsm.getStreamInfo(cmd.stream);
String message = "Stream\n" + formatted(si.getConfiguration())
+ "\n" + formatted(si.getClusterInfo());
Output.controlMessage(LABEL, message);
Output.controlMessage(MONITOR_LABEL, message);
reportFull.set(false);
if (consumers != null) {
for (ConnectableConsumer con : consumers) {
Expand Down Expand Up @@ -108,13 +106,13 @@ public void run() {

String pubReport = "";
if (publisher != null) {
pubReport = "| Publisher: " + publisher.getLastSeqno() +
pubReport = " | Publisher: " + publisher.getLastSeqno() +
(publisher.isInErrorState() ? " (Paused)" : " (Running)");
}
Output.controlMessage(LABEL, "Uptime: " + uptime(started) + pubReport + conReport);
Output.controlMessage(MONITOR_LABEL, "Uptime: " + uptime(started) + pubReport + conReport);
}
catch (Exception e) {
Output.controlMessage(LABEL, e.getMessage());
Output.controlMessage(MONITOR_LABEL, e.getMessage());
reportFull.set(true);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/examples/java/io/nats/examples/chaosTestApp/Output.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ public static void errorMessage(String label, String s) {
consoleMessage("ERROR", label, s, System.out);
}

public static void errorMessage(String label, String s, PrintStream out) {
consoleMessage("ERROR", label, s, out);
public static void fatalMessage(String label, String s) {
consoleMessage("FATAL", label, s, System.out);
}

public static void consoleMessage(String area, String label, String s) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ public PushConsumer(CommandLine cmd, ConsumerKind consumerKind) throws IOExcepti

@Override
public void refreshInfo() {
updateNameAndLabel(sub.getConsumerName());
updateLabel(sub.getConsumerName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ public SimpleConsumer(CommandLine cmd, ConsumerKind consumerKind, int batchSize,

@Override
public void refreshInfo() {
updateNameAndLabel(mc.getConsumerName());
updateLabel(mc.getConsumerName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
public class SimpleFetchConsumer extends ConnectableConsumer implements Runnable {
final StreamContext sc;
final ConsumerContext cc;
FetchConsumer fc;
final int batchSize;
final long expiresIn;
Thread t;
final Thread t;

FetchConsumer fc;

public SimpleFetchConsumer(CommandLine cmd, ConsumerKind consumerKind, int batchSize, long expiresIn) throws IOException, InterruptedException, JetStreamApiException {
super(cmd, "fc", consumerKind);
Expand All @@ -38,32 +39,29 @@ public SimpleFetchConsumer(CommandLine cmd, ConsumerKind consumerKind, int batch

sc = nc.getStreamContext(cmd.stream);

ConsumeOptions co = ConsumeOptions.builder()
.batchSize(batchSize)
.expiresIn(expiresIn)
.build();

cc = sc.createOrUpdateConsumer(newCreateConsumer().build());
Output.controlMessage(label, cc.getConsumerName());
Thread t = new Thread(this);
t = new Thread(this);
t.start();
}

@Override
public void run() {
FetchConsumeOptions fco = FetchConsumeOptions.builder().maxMessages(batchSize).expiresIn(expiresIn).build();
Output.controlMessage(label, toString(fco));

//noinspection InfiniteLoopStatement
while (true) {
FetchConsumeOptions fco = FetchConsumeOptions.builder().maxMessages(batchSize).expiresIn(expiresIn).build();
try (FetchConsumer fc = cc.fetch(fco)) {
try (FetchConsumer autoCloseableFc = cc.fetch(fco)) {
fc = autoCloseableFc;
Message m = fc.nextMessage();
while (m != null) {
onMessage(m);
m = fc.nextMessage();
}
}
catch (Exception e) {
// do we care if the autocloseable FetchConsumer errors on close?
// probably not, but maybe log it.
// if there was an error, just try again
}

// simulating some work to be done between fetches
Expand All @@ -81,7 +79,16 @@ public void run() {
@Override
public void refreshInfo() {
if (fc != null) {
updateNameAndLabel(fc.getConsumerName());
updateLabel(fc.getConsumerName());
}
}

public static String toString(FetchConsumeOptions fco) {
return "FetchConsumeOptions" +
"\nMax Messages: " + fco.getMaxMessages() +
"\nMax Bytes: " + fco.getMaxBytes() +
"\nExpires In: " + fco.getExpiresInMillis() +
"\nIdleHeartbeat: " + fco.getIdleHeartbeat() +
"\nThreshold Percent: " + fco.getThresholdPercent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,4 @@ public void run() {
}
}
}
}
}
11 changes: 6 additions & 5 deletions src/test/java/io/nats/client/impl/SimplificationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -748,23 +748,24 @@ public void testOrderedConsumeMultipleSubjects() throws Exception {
OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubjects(tsc.subject(0), tsc.subject(1));
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);

int count0 = 0;
int count1 = 0;
int count2 = 0;
try (FetchConsumer fc = occtx.fetch(FetchConsumeOptions.builder().maxMessages(20).expiresIn(2000).build())) {
Message m = fc.nextMessage();
while (m != null) {
if (m.getSubject().equals(tsc.subject(0))) {
count1++;
count0++;
}
else {
count2++;
count1++;
}
m.ack();
m = fc.nextMessage();
}
}

assertEquals(10, count1);
assertEquals(5, count2);
assertEquals(10, count0);
assertEquals(5, count1);
});
}

Expand Down

0 comments on commit f9f34e2

Please sign in to comment.