diff --git a/src/examples/java/io/nats/examples/chaosTestApp/ChaosTestApp.java b/src/examples/java/io/nats/examples/chaosTestApp/ChaosTestApp.java index 0888fc84d..293d20a87 100644 --- a/src/examples/java/io/nats/examples/chaosTestApp/ChaosTestApp.java +++ b/src/examples/java/io/nats/examples/chaosTestApp/ChaosTestApp.java @@ -30,22 +30,26 @@ public class ChaosTestApp { + public static final String APP_LABEL = "APP"; + public static String[] MANUAL_ARGS = ( // "--servers nats://192.168.50.99:4222" "--servers nats://localhost:4222" - + " --stream app-stream" - + " --subject app-subject" + + " --stream jchaos-stream" + + " --subject jchaos-subject" // + " --runtime 3600 // 1 hour in seconds + " --screen left" +// + " --work" + " --create" - + " --r3" +// + " --r3" + " --publish" + " --pubjitter 30" -// + " --simple ordered,100,5000" -// + " --simple durable 100 5000" // space or commas work, the parser figures it out +// + " --simple ephemeral,100,5000" + + " --simple ordered 100 5000" + + " --simple durable 100 5000" // space or commas work, the parser figures it out + " --fetch durable,100,5000" -// + " --push ordered" -// + " --push durable" + + " --push ordered" + + " --push durable" // + " --logdir c:\\temp" ).split(" "); @@ -58,7 +62,7 @@ public static void main(String[] args) throws Exception { try { Output.start(cmd); - Output.controlMessage("APP", cmd.toString().replace(" --", " \n--")); + Output.controlMessage(APP_LABEL, cmd.toString().replace(" --", " \n--")); CountDownLatch waiter = new CountDownLatch(1); Publisher publisher = null; @@ -72,7 +76,7 @@ public static void main(String[] args) throws Exception { createOrReplaceStream(cmd, jsm); } catch (Exception e) { - Output.errorMessage("APP", e.getMessage()); + Output.errorMessage(APP_LABEL, e.getMessage()); } } @@ -93,7 +97,7 @@ public static void main(String[] args) throws Exception { default: throw new IllegalArgumentException("Unsupported consumer type: " + clc.consumerType); } - Output.errorMessage("APP", con.label); + Output.controlMessage(APP_LABEL, con.label); cons.add(con); } } @@ -140,10 +144,10 @@ public static void createOrReplaceStream(CommandLine cmd, JetStreamManagement js .replicas(cmd.r3 ? 3 : 1) .build(); StreamInfo si = jsm.addStream(sc); - Output.controlMessage("APP", "Create Stream\n" + Output.formatted(si.getConfiguration())); + Output.controlMessage(APP_LABEL, "Create Stream\n" + Output.formatted(si.getConfiguration())); } catch (Exception e) { - Output.errorMessage("FATAL", "Failed creating stream: '" + cmd.stream + "' " + e); + Output.fatalMessage(APP_LABEL, "Failed creating stream: '" + cmd.stream + "' " + e); System.exit(-1); } } diff --git a/src/examples/java/io/nats/examples/chaosTestApp/ConnectableConsumer.java b/src/examples/java/io/nats/examples/chaosTestApp/ConnectableConsumer.java index 4ecd76c43..d4c9809f4 100644 --- a/src/examples/java/io/nats/examples/chaosTestApp/ConnectableConsumer.java +++ b/src/examples/java/io/nats/examples/chaosTestApp/ConnectableConsumer.java @@ -56,7 +56,7 @@ public ConnectableConsumer(CommandLine cmd, String initials, ConsumerKind consum break; } this.initials = initials; - updateNameAndLabel(name); + label = name + " (" + consumerKind.name() + ")"; errorListener = new OutputErrorListener(label); @@ -70,8 +70,9 @@ public ConnectableConsumer(CommandLine cmd, String initials, ConsumerKind consum public void onMessage(Message m) throws InterruptedException { m.ack(); long seq = m.metaData().streamSequence(); + long lastSeq = lastReceivedSequence.get(); lastReceivedSequence.set(seq); - Output.workMessage(label, "Last Received Seq: " + seq); + Output.workMessage(label, "Last Received Seq: " + seq + "(" + lastSeq + ")"); } public abstract void refreshInfo(); @@ -82,12 +83,11 @@ public void connectionEvent(Connection conn, Events type) { refreshInfo(); } - protected void updateNameAndLabel(String updatedName) { - name = updatedName; - if (updatedName == null) { - label = consumerKind.name(); - } - else { + protected void updateLabel(String conName) { + if (!name.contains(conName)) + { + int at = name.lastIndexOf("-"); + name = name.substring(0, at + 1) + conName; label = name + " (" + consumerKind.name() + ")"; } } diff --git a/src/examples/java/io/nats/examples/chaosTestApp/Monitor.java b/src/examples/java/io/nats/examples/chaosTestApp/Monitor.java index 86ce6231b..24c86a744 100644 --- a/src/examples/java/io/nats/examples/chaosTestApp/Monitor.java +++ b/src/examples/java/io/nats/examples/chaosTestApp/Monitor.java @@ -28,7 +28,7 @@ public class Monitor implements Runnable, java.util.function.Consumer { - static final String LABEL = "MONITOR"; + static final String MONITOR_LABEL = "MONITOR"; static final long REPORT_FREQUENCY = 5000; static final int SHORT_REPORTS = 50; @@ -47,7 +47,6 @@ public Monitor(CommandLine cmd, Publisher publisher, List c @Override public void accept(String s) { reportFull.set(true); - // Output.print(LABEL, s); } @Override @@ -57,10 +56,9 @@ public void run() { .connectionListener((c, t) -> { reportFull.set(true); String s = "Connection: " + c.getServerInfo().getPort() + " " + t; - Output.controlMessage(LABEL, s); - // Output.print(LABEL, s); + Output.controlMessage(MONITOR_LABEL, s); }) - .errorListener(new OutputErrorListener(LABEL, this) {}) + .errorListener(new OutputErrorListener(MONITOR_LABEL, this) {}) .maxReconnects(-1) .build(); @@ -78,7 +76,7 @@ public void run() { StreamInfo si = jsm.getStreamInfo(cmd.stream); String message = "Stream\n" + formatted(si.getConfiguration()) + "\n" + formatted(si.getClusterInfo()); - Output.controlMessage(LABEL, message); + Output.controlMessage(MONITOR_LABEL, message); reportFull.set(false); if (consumers != null) { for (ConnectableConsumer con : consumers) { @@ -108,13 +106,13 @@ public void run() { String pubReport = ""; if (publisher != null) { - pubReport = "| Publisher: " + publisher.getLastSeqno() + + pubReport = " | Publisher: " + publisher.getLastSeqno() + (publisher.isInErrorState() ? " (Paused)" : " (Running)"); } - Output.controlMessage(LABEL, "Uptime: " + uptime(started) + pubReport + conReport); + Output.controlMessage(MONITOR_LABEL, "Uptime: " + uptime(started) + pubReport + conReport); } catch (Exception e) { - Output.controlMessage(LABEL, e.getMessage()); + Output.controlMessage(MONITOR_LABEL, e.getMessage()); reportFull.set(true); } } diff --git a/src/examples/java/io/nats/examples/chaosTestApp/Output.java b/src/examples/java/io/nats/examples/chaosTestApp/Output.java index 4965105bd..ff7ed0acb 100644 --- a/src/examples/java/io/nats/examples/chaosTestApp/Output.java +++ b/src/examples/java/io/nats/examples/chaosTestApp/Output.java @@ -299,8 +299,8 @@ public static void errorMessage(String label, String s) { consoleMessage("ERROR", label, s, System.out); } - public static void errorMessage(String label, String s, PrintStream out) { - consoleMessage("ERROR", label, s, out); + public static void fatalMessage(String label, String s) { + consoleMessage("FATAL", label, s, System.out); } public static void consoleMessage(String area, String label, String s) { diff --git a/src/examples/java/io/nats/examples/chaosTestApp/PushConsumer.java b/src/examples/java/io/nats/examples/chaosTestApp/PushConsumer.java index 0638a07ca..c03616180 100644 --- a/src/examples/java/io/nats/examples/chaosTestApp/PushConsumer.java +++ b/src/examples/java/io/nats/examples/chaosTestApp/PushConsumer.java @@ -45,6 +45,6 @@ public PushConsumer(CommandLine cmd, ConsumerKind consumerKind) throws IOExcepti @Override public void refreshInfo() { - updateNameAndLabel(sub.getConsumerName()); + updateLabel(sub.getConsumerName()); } } diff --git a/src/examples/java/io/nats/examples/chaosTestApp/SimpleConsumer.java b/src/examples/java/io/nats/examples/chaosTestApp/SimpleConsumer.java index 8d3dcd4f8..ef039c589 100644 --- a/src/examples/java/io/nats/examples/chaosTestApp/SimpleConsumer.java +++ b/src/examples/java/io/nats/examples/chaosTestApp/SimpleConsumer.java @@ -52,6 +52,6 @@ public SimpleConsumer(CommandLine cmd, ConsumerKind consumerKind, int batchSize, @Override public void refreshInfo() { - updateNameAndLabel(mc.getConsumerName()); + updateLabel(mc.getConsumerName()); } } diff --git a/src/examples/java/io/nats/examples/chaosTestApp/SimpleFetchConsumer.java b/src/examples/java/io/nats/examples/chaosTestApp/SimpleFetchConsumer.java index 7321027d1..eac63af0e 100644 --- a/src/examples/java/io/nats/examples/chaosTestApp/SimpleFetchConsumer.java +++ b/src/examples/java/io/nats/examples/chaosTestApp/SimpleFetchConsumer.java @@ -22,10 +22,11 @@ public class SimpleFetchConsumer extends ConnectableConsumer implements Runnable { final StreamContext sc; final ConsumerContext cc; - FetchConsumer fc; final int batchSize; final long expiresIn; - Thread t; + final Thread t; + + FetchConsumer fc; public SimpleFetchConsumer(CommandLine cmd, ConsumerKind consumerKind, int batchSize, long expiresIn) throws IOException, InterruptedException, JetStreamApiException { super(cmd, "fc", consumerKind); @@ -38,23 +39,21 @@ public SimpleFetchConsumer(CommandLine cmd, ConsumerKind consumerKind, int batch sc = nc.getStreamContext(cmd.stream); - ConsumeOptions co = ConsumeOptions.builder() - .batchSize(batchSize) - .expiresIn(expiresIn) - .build(); - cc = sc.createOrUpdateConsumer(newCreateConsumer().build()); Output.controlMessage(label, cc.getConsumerName()); - Thread t = new Thread(this); + t = new Thread(this); t.start(); } @Override public void run() { + FetchConsumeOptions fco = FetchConsumeOptions.builder().maxMessages(batchSize).expiresIn(expiresIn).build(); + Output.controlMessage(label, toString(fco)); + //noinspection InfiniteLoopStatement while (true) { - FetchConsumeOptions fco = FetchConsumeOptions.builder().maxMessages(batchSize).expiresIn(expiresIn).build(); - try (FetchConsumer fc = cc.fetch(fco)) { + try (FetchConsumer autoCloseableFc = cc.fetch(fco)) { + fc = autoCloseableFc; Message m = fc.nextMessage(); while (m != null) { onMessage(m); @@ -62,8 +61,7 @@ public void run() { } } catch (Exception e) { - // do we care if the autocloseable FetchConsumer errors on close? - // probably not, but maybe log it. + // if there was an error, just try again } // simulating some work to be done between fetches @@ -81,7 +79,16 @@ public void run() { @Override public void refreshInfo() { if (fc != null) { - updateNameAndLabel(fc.getConsumerName()); + updateLabel(fc.getConsumerName()); } } + + public static String toString(FetchConsumeOptions fco) { + return "FetchConsumeOptions" + + "\nMax Messages: " + fco.getMaxMessages() + + "\nMax Bytes: " + fco.getMaxBytes() + + "\nExpires In: " + fco.getExpiresInMillis() + + "\nIdleHeartbeat: " + fco.getIdleHeartbeat() + + "\nThreshold Percent: " + fco.getThresholdPercent(); + } } diff --git a/src/examples/java/io/nats/examples/jetstream/simple/FetchResilientExample.java b/src/examples/java/io/nats/examples/jetstream/simple/FetchResilientExample.java index ae94313d9..514a4c68c 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/FetchResilientExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/FetchResilientExample.java @@ -139,4 +139,4 @@ public void run() { } } } -} \ No newline at end of file +} diff --git a/src/test/java/io/nats/client/impl/SimplificationTests.java b/src/test/java/io/nats/client/impl/SimplificationTests.java index a8cb1fb99..078a4fef1 100644 --- a/src/test/java/io/nats/client/impl/SimplificationTests.java +++ b/src/test/java/io/nats/client/impl/SimplificationTests.java @@ -748,23 +748,24 @@ public void testOrderedConsumeMultipleSubjects() throws Exception { OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubjects(tsc.subject(0), tsc.subject(1)); OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); + int count0 = 0; int count1 = 0; - int count2 = 0; try (FetchConsumer fc = occtx.fetch(FetchConsumeOptions.builder().maxMessages(20).expiresIn(2000).build())) { Message m = fc.nextMessage(); while (m != null) { if (m.getSubject().equals(tsc.subject(0))) { - count1++; + count0++; } else { - count2++; + count1++; } + m.ack(); m = fc.nextMessage(); } } - assertEquals(10, count1); - assertEquals(5, count2); + assertEquals(10, count0); + assertEquals(5, count1); }); }