From b05b87ec9c146330e3bf6a6cbdbc8296935aa138 Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Tue, 19 Dec 2023 12:28:48 -0500 Subject: [PATCH] Chaos Testing Tuning (#1055) --- .../examples/chaosTestApp/ChaosTestApp.java | 150 ++++++++ .../chaosTestApp/ConnectableConsumer.java | 111 ++++++ .../nats/examples/chaosTestApp/Monitor.java | 131 +++++++ .../io/nats/examples/chaosTestApp/Output.java | 356 ++++++++++++++++++ .../chaosTestApp/OutputErrorListener.java | 132 +++++++ .../nats/examples/chaosTestApp/Publisher.java | 89 +++++ .../examples/chaosTestApp/PushConsumer.java | 50 +++ .../examples/chaosTestApp/SimpleConsumer.java | 57 +++ .../chaosTestApp/SimpleFetchConsumer.java | 87 +++++ .../chaosTestApp/support/CommandLine.java | 290 ++++++++++++++ .../support/CommandLineConsumer.java | 53 +++ .../chaosTestApp/support/ConsumerKind.java | 27 ++ .../chaosTestApp/support/ConsumerType.java | 27 ++ .../java/io/nats/client/ConsumerContext.java | 6 - .../io/nats/client/JetStreamSubscription.java | 2 +- .../java/io/nats/client/MessageConsumer.java | 4 +- .../nats/client/api/StreamConfiguration.java | 4 +- .../nats/client/impl/NatsConsumerContext.java | 46 ++- .../nats/client/impl/NatsFetchConsumer.java | 9 +- .../io/nats/client/impl/NatsJetStream.java | 2 +- .../nats/client/impl/NatsJetStreamImpl.java | 14 +- .../nats/client/impl/NatsMessageConsumer.java | 7 +- .../client/impl/NatsMessageConsumerBase.java | 4 - .../client/impl/OrderedMessageManager.java | 3 +- .../nats/client/impl/PullMessageManager.java | 28 +- ...er.java => PullOrderedMessageManager.java} | 4 +- .../impl/SimplifiedSubscriptionMaker.java | 5 +- .../nats/client/impl/SimplificationTests.java | 16 +- 28 files changed, 1640 insertions(+), 74 deletions(-) create mode 100644 src/examples/java/io/nats/examples/chaosTestApp/ChaosTestApp.java create mode 100644 src/examples/java/io/nats/examples/chaosTestApp/ConnectableConsumer.java create mode 100644 src/examples/java/io/nats/examples/chaosTestApp/Monitor.java create mode 100644 src/examples/java/io/nats/examples/chaosTestApp/Output.java create mode 100644 src/examples/java/io/nats/examples/chaosTestApp/OutputErrorListener.java create mode 100644 src/examples/java/io/nats/examples/chaosTestApp/Publisher.java create mode 100644 src/examples/java/io/nats/examples/chaosTestApp/PushConsumer.java create mode 100644 src/examples/java/io/nats/examples/chaosTestApp/SimpleConsumer.java create mode 100644 src/examples/java/io/nats/examples/chaosTestApp/SimpleFetchConsumer.java create mode 100644 src/examples/java/io/nats/examples/chaosTestApp/support/CommandLine.java create mode 100644 src/examples/java/io/nats/examples/chaosTestApp/support/CommandLineConsumer.java create mode 100644 src/examples/java/io/nats/examples/chaosTestApp/support/ConsumerKind.java create mode 100644 src/examples/java/io/nats/examples/chaosTestApp/support/ConsumerType.java rename src/main/java/io/nats/client/impl/{OrderedPullMessageManager.java => PullOrderedMessageManager.java} (95%) diff --git a/src/examples/java/io/nats/examples/chaosTestApp/ChaosTestApp.java b/src/examples/java/io/nats/examples/chaosTestApp/ChaosTestApp.java new file mode 100644 index 000000000..0888fc84d --- /dev/null +++ b/src/examples/java/io/nats/examples/chaosTestApp/ChaosTestApp.java @@ -0,0 +1,150 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.examples.chaosTestApp; + +import io.nats.client.Connection; +import io.nats.client.JetStreamManagement; +import io.nats.client.Nats; +import io.nats.client.Options; +import io.nats.client.api.StorageType; +import io.nats.client.api.StreamConfiguration; +import io.nats.client.api.StreamInfo; +import io.nats.examples.chaosTestApp.support.CommandLine; +import io.nats.examples.chaosTestApp.support.CommandLineConsumer; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class ChaosTestApp { + + public static String[] MANUAL_ARGS = ( +// "--servers nats://192.168.50.99:4222" + "--servers nats://localhost:4222" + + " --stream app-stream" + + " --subject app-subject" +// + " --runtime 3600 // 1 hour in seconds + + " --screen left" + + " --create" + + " --r3" + + " --publish" + + " --pubjitter 30" +// + " --simple ordered,100,5000" +// + " --simple durable 100 5000" // space or commas work, the parser figures it out + + " --fetch durable,100,5000" +// + " --push ordered" +// + " --push durable" +// + " --logdir c:\\temp" + ).split(" "); + + public static void main(String[] args) throws Exception { + + args = MANUAL_ARGS; // comment out for real command line + + CommandLine cmd = new CommandLine(args); + Monitor monitor; + + try { + Output.start(cmd); + Output.controlMessage("APP", cmd.toString().replace(" --", " \n--")); + CountDownLatch waiter = new CountDownLatch(1); + + Publisher publisher = null; + List cons = null; + + if (cmd.create) { + Options options = cmd.makeManagmentOptions(); + try (Connection nc = Nats.connect(options)) { + System.out.println(nc.getServerInfo()); + JetStreamManagement jsm = nc.jetStreamManagement(); + createOrReplaceStream(cmd, jsm); + } + catch (Exception e) { + Output.errorMessage("APP", e.getMessage()); + } + } + + if (!cmd.commandLineConsumers.isEmpty()) { + cons = new ArrayList<>(); + for (CommandLineConsumer clc : cmd.commandLineConsumers) { + ConnectableConsumer con; + switch (clc.consumerType) { + case Push: + con = new PushConsumer(cmd, clc.consumerKind); + break; + case Simple: + con = new SimpleConsumer(cmd, clc.consumerKind, clc.batchSize, clc.expiresIn); + break; + case Fetch: + con = new SimpleFetchConsumer(cmd, clc.consumerKind, clc.batchSize, clc.expiresIn); + break; + default: + throw new IllegalArgumentException("Unsupported consumer type: " + clc.consumerType); + } + Output.errorMessage("APP", con.label); + cons.add(con); + } + } + + if (cmd.publish) { + publisher = new Publisher(cmd, cmd.pubjitter); + Thread pubThread = new Thread(publisher); + pubThread.start(); + } + + // just creating the stream? + if (publisher == null && cons == null) { + return; + } + + monitor = new Monitor(cmd, publisher, cons); + Thread monThread = new Thread(monitor); + monThread.start(); + + long runtime = cmd.runtime < 1 ? Long.MAX_VALUE : cmd.runtime; + //noinspection ResultOfMethodCallIgnored + waiter.await(runtime, TimeUnit.MILLISECONDS); + } + catch (Exception e) { + //noinspection CallToPrintStackTrace + e.printStackTrace(); + } + finally { + Output.dumpControl(); + System.exit(0); + } + } + + public static void createOrReplaceStream(CommandLine cmd, JetStreamManagement jsm) { + try { + jsm.deleteStream(cmd.stream); + } + catch (Exception ignore) {} + try { + StreamConfiguration sc = StreamConfiguration.builder() + .name(cmd.stream) + .storageType(StorageType.File) + .subjects(cmd.subject) + .replicas(cmd.r3 ? 3 : 1) + .build(); + StreamInfo si = jsm.addStream(sc); + Output.controlMessage("APP", "Create Stream\n" + Output.formatted(si.getConfiguration())); + } + catch (Exception e) { + Output.errorMessage("FATAL", "Failed creating stream: '" + cmd.stream + "' " + e); + System.exit(-1); + } + } +} diff --git a/src/examples/java/io/nats/examples/chaosTestApp/ConnectableConsumer.java b/src/examples/java/io/nats/examples/chaosTestApp/ConnectableConsumer.java new file mode 100644 index 000000000..4ecd76c43 --- /dev/null +++ b/src/examples/java/io/nats/examples/chaosTestApp/ConnectableConsumer.java @@ -0,0 +1,111 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.examples.chaosTestApp; + +import io.nats.client.*; +import io.nats.client.api.ConsumerConfiguration; +import io.nats.client.api.DeliverPolicy; +import io.nats.examples.chaosTestApp.support.CommandLine; +import io.nats.examples.chaosTestApp.support.ConsumerKind; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +public abstract class ConnectableConsumer implements ConnectionListener { + + protected final Connection nc; + protected final JetStream js; + protected final OutputErrorListener errorListener; + protected final AtomicLong lastReceivedSequence; + protected final MessageHandler handler; + protected final ConsumerKind consumerKind; + + protected final CommandLine cmd; + protected String initials; + protected String name; + protected String durableName; + protected String label; + + public ConnectableConsumer(CommandLine cmd, String initials, ConsumerKind consumerKind) throws IOException, InterruptedException, JetStreamApiException { + this.cmd = cmd; + lastReceivedSequence = new AtomicLong(0); + this.consumerKind = consumerKind; + switch (consumerKind) { + case Durable: + durableName = initials + "-dur-" + new NUID().nextSequence(); + name = durableName; + break; + case Ephemeral: + durableName = null; + name = initials + "-eph-" + new NUID().nextSequence(); + break; + case Ordered: + durableName = null; + name = initials + "-ord-" + new NUID().nextSequence(); + break; + } + this.initials = initials; + updateNameAndLabel(name); + + errorListener = new OutputErrorListener(label); + + Options options = cmd.makeOptions(this, errorListener); + nc = Nats.connect(options); + js = nc.jetStream(); + + handler = this::onMessage; + } + + public void onMessage(Message m) throws InterruptedException { + m.ack(); + long seq = m.metaData().streamSequence(); + lastReceivedSequence.set(seq); + Output.workMessage(label, "Last Received Seq: " + seq); + } + + public abstract void refreshInfo(); + + @Override + public void connectionEvent(Connection conn, Events type) { + Output.controlMessage(label, "Connection: " + conn.getServerInfo().getPort() + " " + type.name().toLowerCase()); + refreshInfo(); + } + + protected void updateNameAndLabel(String updatedName) { + name = updatedName; + if (updatedName == null) { + label = consumerKind.name(); + } + else { + label = name + " (" + consumerKind.name() + ")"; + } + } + + public long getLastReceivedSequence() { + return lastReceivedSequence.get(); + } + + protected ConsumerConfiguration.Builder newCreateConsumer() { + return recreateConsumer(0); + } + + private ConsumerConfiguration.Builder recreateConsumer(long last) { + return ConsumerConfiguration.builder() + .name(consumerKind == ConsumerKind.Ordered ? null : name) + .durable(durableName) + .deliverPolicy(last == 0 ? DeliverPolicy.All : DeliverPolicy.ByStartSequence) + .startSequence(last == 0 ? -1 : last + 1) + .filterSubject(cmd.subject); + } +} diff --git a/src/examples/java/io/nats/examples/chaosTestApp/Monitor.java b/src/examples/java/io/nats/examples/chaosTestApp/Monitor.java new file mode 100644 index 000000000..86ce6231b --- /dev/null +++ b/src/examples/java/io/nats/examples/chaosTestApp/Monitor.java @@ -0,0 +1,131 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.examples.chaosTestApp; + +import io.nats.client.Connection; +import io.nats.client.JetStreamManagement; +import io.nats.client.Nats; +import io.nats.client.Options; +import io.nats.client.api.StreamInfo; +import io.nats.examples.chaosTestApp.support.CommandLine; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import static io.nats.examples.chaosTestApp.Output.formatted; + +public class Monitor implements Runnable, java.util.function.Consumer { + + static final String LABEL = "MONITOR"; + static final long REPORT_FREQUENCY = 5000; + static final int SHORT_REPORTS = 50; + + final CommandLine cmd; + final Publisher publisher; + final List consumers; + final AtomicBoolean reportFull; + + public Monitor(CommandLine cmd, Publisher publisher, List consumers) { + this.cmd = cmd; + this.publisher = publisher; + this.consumers = consumers; + reportFull = new AtomicBoolean(true); + } + + @Override + public void accept(String s) { + reportFull.set(true); + // Output.print(LABEL, s); + } + + @Override + public void run() { + Options options = new Options.Builder() + .servers(cmd.servers) + .connectionListener((c, t) -> { + reportFull.set(true); + String s = "Connection: " + c.getServerInfo().getPort() + " " + t; + Output.controlMessage(LABEL, s); + // Output.print(LABEL, s); + }) + .errorListener(new OutputErrorListener(LABEL, this) {}) + .maxReconnects(-1) + .build(); + + long started = System.currentTimeMillis(); + int shortReportsOwed = 0; + try (Connection nc = Nats.connect(options)) { + JetStreamManagement jsm = nc.jetStreamManagement(); + //noinspection InfiniteLoopStatement + while (true) { + //noinspection BusyWait + Thread.sleep(REPORT_FREQUENCY); + try { + StringBuilder conReport = new StringBuilder(); + if (reportFull.get()) { + StreamInfo si = jsm.getStreamInfo(cmd.stream); + String message = "Stream\n" + formatted(si.getConfiguration()) + + "\n" + formatted(si.getClusterInfo()); + Output.controlMessage(LABEL, message); + reportFull.set(false); + if (consumers != null) { + for (ConnectableConsumer con : consumers) { + con.refreshInfo(); + } + } + } + if (shortReportsOwed < 1) { + shortReportsOwed = SHORT_REPORTS; + if (consumers != null) { + for (ConnectableConsumer con : consumers) { + conReport.append("\n").append(con.label).append(" | Last Sequence: ").append(con.getLastReceivedSequence()); + } + } + } + else { + shortReportsOwed--; + if (consumers != null) { + for (ConnectableConsumer con : consumers) { + conReport.append(" | ") + .append(con.name) + .append(": ") + .append(con.getLastReceivedSequence()); + } + } + } + + String pubReport = ""; + if (publisher != null) { + pubReport = "| Publisher: " + publisher.getLastSeqno() + + (publisher.isInErrorState() ? " (Paused)" : " (Running)"); + } + Output.controlMessage(LABEL, "Uptime: " + uptime(started) + pubReport + conReport); + } + catch (Exception e) { + Output.controlMessage(LABEL, e.getMessage()); + reportFull.set(true); + } + } + } + catch (Exception e) { + e.printStackTrace(); + System.exit(-1); + } + } + + private static String uptime(long started) { + return Duration.ofMillis(System.currentTimeMillis() - started).toString().replace("PT", ""); + } +} diff --git a/src/examples/java/io/nats/examples/chaosTestApp/Output.java b/src/examples/java/io/nats/examples/chaosTestApp/Output.java new file mode 100644 index 000000000..4965105bd --- /dev/null +++ b/src/examples/java/io/nats/examples/chaosTestApp/Output.java @@ -0,0 +1,356 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.examples.chaosTestApp; + +import io.nats.client.support.JsonSerializable; +import io.nats.client.support.JsonValue; +import io.nats.examples.chaosTestApp.support.CommandLine; + +import javax.swing.*; +import javax.swing.text.BadLocationException; +import javax.swing.text.Document; +import java.awt.*; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class Output extends JPanel { + public enum Screen {Left, Main, Console} + + static final Object workLock = new Object(); + static final Object controlLock = new Object(); + static final Object debugLock = new Object(); + + static boolean console; + static boolean work; + static boolean debug; + + static boolean started; + static Output workInstance; + static Output controlInstance; + static Output debugInstance; + static PrintStream workLog; + static PrintStream controlLog; + static PrintStream debugLog; + static String controlConsoleAreaLabel = null; + + static final int HEIGHT_REDUCTION = 45; + + static final Font DISPLAY_FONT; + static final int SCREEN_AVAILABLE_WIDTH; + static final int SCREEN_AVAILABLE_HEIGHT; + static final int ROWS; + + JTextArea area; + + static { + // figure UI_FONT + String fontName = Font.MONOSPACED; + GraphicsEnvironment localEnv; + localEnv= GraphicsEnvironment.getLocalGraphicsEnvironment(); + String allfonts[] = localEnv.getAvailableFontFamilyNames(); + for (String allfont : allfonts) { + if (allfont.equals("JetBrains Mono")) { + fontName = allfont; + break; + } + } + DISPLAY_FONT = new Font(fontName, Font.PLAIN, 14); + Dimension screenSize = Toolkit.getDefaultToolkit().getScreenSize(); + SCREEN_AVAILABLE_WIDTH = (int)screenSize.getWidth(); + SCREEN_AVAILABLE_HEIGHT = (int)screenSize.getHeight() - HEIGHT_REDUCTION; + + ROWS = SCREEN_AVAILABLE_HEIGHT / 21; + } + + public static void start(CommandLine cmd) { + if (started) { + return; + } + + started = true; + console = cmd.uiScreen == Screen.Console; + work = cmd.work; + debug = cmd.debug; + + if (console && (work || debug)) { + controlConsoleAreaLabel = "CTRL"; + } + + // LOG FILES + if (cmd.logdir != null) { + File f = new File(cmd.logdir); + if (!f.exists() && !f.mkdirs()) { + errorMessage("OUTPUT", "Unable to create logdir: " + cmd.logdir); + System.exit(-1); + } + String template = "applog-which.txt"; + try { + String fn = template.replace("which", "control"); + Path p = Paths.get(f.getAbsolutePath(), fn); + controlLog = new PrintStream(new FileOutputStream(p.toFile())); + + if (debug) { + fn = template.replace("which", "debug"); + p = Paths.get(f.getAbsolutePath(), fn); + debugLog = new PrintStream(new FileOutputStream(p.toFile())); + } + + if (work) { + fn = template.replace("which", "work"); + p = Paths.get(f.getAbsolutePath(), fn); + workLog = new PrintStream(new FileOutputStream(p.toFile())); + } + } + catch (FileNotFoundException e) { + errorMessage("OUTPUT", "Unable to create log file: " + e); + System.exit(-1); + } + } + + // SCREEN + int debugWidth = (int)(SCREEN_AVAILABLE_WIDTH * 0.42); + int workWidth = (int)(SCREEN_AVAILABLE_WIDTH * 0.24); + int controlWidth = SCREEN_AVAILABLE_WIDTH - debugWidth - workWidth; + int offset = 0; + if (cmd.uiScreen == Screen.Left) { + if (debug || work) { + if (debug) { + debugInstance = newUi("Debug", -debugWidth, debugWidth, SCREEN_AVAILABLE_HEIGHT); + offset = -debugWidth; + } + if (work) { + offset -= controlWidth; + controlInstance = newUi("Control", offset, controlWidth, SCREEN_AVAILABLE_HEIGHT); + workInstance = newUi("Work", offset - workWidth, workWidth, SCREEN_AVAILABLE_HEIGHT); + } + else { + offset -= debugWidth; + controlInstance = newUi("Control", offset, debugWidth, SCREEN_AVAILABLE_HEIGHT); + } + } + else { + offset = -SCREEN_AVAILABLE_WIDTH / 2; + controlInstance = newUi("Control", offset, -offset, SCREEN_AVAILABLE_HEIGHT); + } + } + else if (cmd.uiScreen == Screen.Main) { + if (debug || work) { + if (debug) { + debugInstance = newUi("Debug", 0, debugWidth, SCREEN_AVAILABLE_HEIGHT); + offset = debugWidth; + } + if (work) { + controlInstance = newUi("Control", offset, controlWidth, SCREEN_AVAILABLE_HEIGHT); + offset += controlWidth; + workInstance = newUi("Work", offset, workWidth, SCREEN_AVAILABLE_HEIGHT); + } + else { + controlInstance = newUi("Control", offset, debugWidth, SCREEN_AVAILABLE_HEIGHT); + } + } + else { + offset = SCREEN_AVAILABLE_WIDTH / 2; + controlInstance = newUi("Control", 0, offset, SCREEN_AVAILABLE_HEIGHT); + } + } + } + + private static Output newUi(String name, int xLoc, int width, int height) { + //Create and set up the window. + JFrame frame = new JFrame(name); + frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); + + //Add contents to the window. + Output output = new Output(); + frame.add(output); + + //Display the window. + frame.setLocation(xLoc, 0); + frame.setPreferredSize(new Dimension(width, height)); + frame.pack(); + frame.setVisible(true); + return output; + } + + private Output() { + super(new GridLayout(1, 1)); + area = new JTextArea(ROWS, 40); + area.setEditable(false); + area.setFont(DISPLAY_FONT); + add(new JScrollPane(area)); + } + + private static String time() { + String t = "" + System.currentTimeMillis(); + return t.substring(t.length() - 9); + } + + public static void workMessage(String label, String s) { + if (work) { + synchronized (workLock) { + if (console) { + consoleMessage("WORK", label, s); + } + else { + append(label, s, workInstance.area); + } + if (workLog != null) { + consoleMessage(null, label, s, workLog); + } + } + } + } + + public static void controlMessage(String label, JsonSerializable j) { + controlMessage(label, formatted(j)); + } + + public static void controlMessage(String label, String jvLabel, JsonValue jv) { + controlMessage(label, formatted(jv).replace("JsonValue", jvLabel)); + } + + public static void controlMessage(String label, String s) { + synchronized (controlLock) { + if (console) { + consoleMessage(controlConsoleAreaLabel, label, s); + } + else { + append(label, s, controlInstance.area); + } + if (workLog != null) { + consoleMessage(null, label, s, controlLog); + } + } + } + + public static void dumpControl() { + dump("Control", controlInstance.area.getDocument()); + } + + private static void dump(String label, Document document) { + try { + System.out.println("----------------------------------------------------------------------------------------------------"); + System.out.println("UI-" + label); + System.out.println("----------------------------------------------------------------------------------------------------"); + System.out.println(document.getText(0, document.getLength())); + System.out.println("----------------------------------------------------------------------------------------------------"); + } + catch (BadLocationException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + public static void debugMessage(String label, String s) { + if (debug) { + synchronized (debugLock) { + if (console) { + consoleMessage("DEBUG", label, s); + } + else { + debugInstance.area.append(s); + debugInstance.area.append("\n"); + afterAppend(debugInstance.area); + } + if (debugLog != null) { + consoleMessage("DEBUG", label, s + "\n", controlLog); + } + } + } + } + + static final String NLINDENT = "\n "; + private static void append(String label, String s, JTextArea area) { + if (s.contains("\n")) { + String timeLabel = time() + " | " + label; + area.append(timeLabel); + if (!s.startsWith("\n")) { + area.append(" | "); + } + area.append(s.replace("\n", NLINDENT)); + } + else { + area.append(time()); + area.append(" | "); + area.append(label); + area.append(" | "); + area.append(s); + } + area.append("\n"); + afterAppend(area); + } + + public static void errorMessage(String label, String s) { + consoleMessage("ERROR", label, s, System.out); + } + + public static void errorMessage(String label, String s, PrintStream out) { + consoleMessage("ERROR", label, s, out); + } + + public static void consoleMessage(String area, String label, String s) { + consoleMessage(area, label, s, System.out); + } + + public static void consoleMessage(String area, String label, String s, PrintStream out) { + out.print(time()); + String llabel = label == null ? "" : " | " + label; + out.print(area == null ? llabel : " | " + area + llabel); + + if (s.contains("\n")) { + if (!s.startsWith("\n")) { + out.print(" | "); + } + out.print(s.replace("\n", NLINDENT)); + } + else { + out.print(" | "); + out.print(s); + } + out.println(); + } + + private static void afterAppend(JTextArea area) { + int c = area.getLineCount(); + while (c > ROWS) { + try { + int end = area.getLineEndOffset(1); + area.getDocument().remove(0, end); + c = area.getLineCount(); + } + catch (BadLocationException e) { + throw new RuntimeException(e); + } + } + } + + public static String FN = "\n "; + public static String FBN = "{\n "; + public static String formatted(JsonSerializable j) { + return j.getClass().getSimpleName() + j.toJson() + .replace("{\"", FBN + "\"").replace(",", "," + FN); + } + + public static String formatted(Object o) { + return formatted(o.toString()); + } + + public static String formatted(String s) { + return s.replace("{", FBN).replace(", ", "," + FN); + } +} diff --git a/src/examples/java/io/nats/examples/chaosTestApp/OutputErrorListener.java b/src/examples/java/io/nats/examples/chaosTestApp/OutputErrorListener.java new file mode 100644 index 000000000..6fabc7595 --- /dev/null +++ b/src/examples/java/io/nats/examples/chaosTestApp/OutputErrorListener.java @@ -0,0 +1,132 @@ +// Copyright 2021 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.examples.chaosTestApp; + +import io.nats.client.*; +import io.nats.client.api.ServerInfo; +import io.nats.client.support.Status; + +public class OutputErrorListener implements ErrorListener { + String id; + java.util.function.Consumer watcher; + + public OutputErrorListener(String id) { + this(id, null); + } + + public OutputErrorListener(String id, java.util.function.Consumer watcher) { + this.id = id; + this.watcher = watcher; + } + + private String supplyMessage(String label, Connection conn, Consumer consumer, Subscription sub, Object... pairs) { + StringBuilder sb = new StringBuilder(label); + if (conn != null) { + ServerInfo si = conn.getServerInfo(); + if (si != null) { + sb.append(", CONN: ").append(conn.getServerInfo().getClientId()); + } + } + if (consumer != null) { + sb.append(", CON: ").append(consumer.hashCode()); + } + if (sub != null) { + sb.append(", SUB: ").append(sub.hashCode()); + if (sub instanceof JetStreamSubscription) { + JetStreamSubscription jssub = (JetStreamSubscription)sub; + sb.append(", CON: ").append(jssub.getConsumerName()); + } + } + for (int x = 0; x < pairs.length; x++) { + sb.append(", ").append(pairs[x]).append(pairs[++x]); + } + if (watcher != null) { + watcher.accept(sb.toString()); + } + return sb.toString(); + } + + /** + * {@inheritDoc} + */ + @Override + public void errorOccurred(final Connection conn, final String error) { + Output.controlMessage(id, supplyMessage("SEVERE errorOccurred", conn, null, null, "Error: ", error)); + } + + /** + * {@inheritDoc} + */ + @Override + public void exceptionOccurred(final Connection conn, final Exception exp) { + Output.controlMessage(id, supplyMessage("SEVERE exceptionOccurred", conn, null, null, "EX: ", exp)); + } + + /** + * {@inheritDoc} + */ + @Override + public void slowConsumerDetected(final Connection conn, final Consumer consumer) { + Output.controlMessage(id, supplyMessage("WARN slowConsumerDetected", conn, consumer, null)); + } + + /** + * {@inheritDoc} + */ + @Override + public void messageDiscarded(final Connection conn, final Message msg) { + Output.controlMessage(id, supplyMessage("INFO messageDiscarded", conn, null, null, "Message: ", msg)); + } + + /** + * {@inheritDoc} + */ + @Override + public void heartbeatAlarm(final Connection conn, final JetStreamSubscription sub, + final long lastStreamSequence, final long lastConsumerSequence) { + Output.controlMessage(id, supplyMessage("SEVERE HB Alarm", conn, null, sub, "lastStreamSeq: ", lastStreamSequence, "lastConsumerSeq: ", lastConsumerSequence)); + } + + /** + * {@inheritDoc} + */ + @Override + public void unhandledStatus(final Connection conn, final JetStreamSubscription sub, final Status status) { + Output.controlMessage(id, supplyMessage("WARN unhandledStatus", conn, null, sub, "Status:", status)); + } + + /** + * {@inheritDoc} + */ + @Override + public void pullStatusWarning(Connection conn, JetStreamSubscription sub, Status status) { +// Output.controlMessage(id, supplyMessage("WARN pullStatusWarning", conn, null, sub, "Status:", status)); + } + + /** + * {@inheritDoc} + */ + @Override + public void pullStatusError(Connection conn, JetStreamSubscription sub, Status status) { + Output.controlMessage(id, supplyMessage("SEVERE pullStatusError", conn, null, sub, "Status:", status)); + } + + /** + * {@inheritDoc} + */ + @Override + public void flowControlProcessed(Connection conn, JetStreamSubscription sub, String id, FlowControlSource source) { + Output.controlMessage(this.id, supplyMessage("INFO flowControlProcessed", conn, null, sub, "FlowControlSource:", source)); + } +} diff --git a/src/examples/java/io/nats/examples/chaosTestApp/Publisher.java b/src/examples/java/io/nats/examples/chaosTestApp/Publisher.java new file mode 100644 index 000000000..fcc408605 --- /dev/null +++ b/src/examples/java/io/nats/examples/chaosTestApp/Publisher.java @@ -0,0 +1,89 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.examples.chaosTestApp; + +import io.nats.client.Connection; +import io.nats.client.JetStream; +import io.nats.client.Nats; +import io.nats.client.Options; +import io.nats.client.api.PublishAck; +import io.nats.examples.chaosTestApp.support.CommandLine; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +public class Publisher implements Runnable { + + static final String LABEL = "PUBLISHER"; + + final CommandLine cmd; + final long pubDelay; + final AtomicLong lastSeqno = new AtomicLong(-1); + final AtomicLong errorRun = new AtomicLong(0); + + public Publisher(CommandLine cmd, long pubDelay) { + this.cmd = cmd; + this.pubDelay = pubDelay; + } + + public long getLastSeqno() { + return lastSeqno.get(); + } + public boolean isInErrorState() { + return errorRun.get() > 0; + } + + @Override + public void run() { + Options options = new Options.Builder() + .servers(cmd.servers) + .connectionListener((c, t) -> Output.controlMessage(LABEL, "Connection: " + c.getServerInfo().getPort() + " " + t)) + .errorListener(new OutputErrorListener(LABEL) {}) + .maxReconnects(-1) + .build(); + + try (Connection nc = Nats.connect(options)) { + JetStream js = nc.jetStream(); + //noinspection InfiniteLoopStatement + while (true) { + if (lastSeqno.get() == -1) { + Output.controlMessage(LABEL, "Starting Publish"); + lastSeqno.set(0); + } + try { + PublishAck pa = js.publish(cmd.subject, null); + lastSeqno.set(pa.getSeqno()); + if (errorRun.get() > 0) { + Output.controlMessage(LABEL, "Restarting Publish"); + } + errorRun.set(0); + } + catch (Exception e) { + if (errorRun.incrementAndGet() == 1) { + Output.controlMessage(LABEL, e.getMessage()); + } + } + try { + //noinspection BusyWait + Thread.sleep(ThreadLocalRandom.current().nextLong(pubDelay)); + } + catch (InterruptedException ignore) {} + } + } + catch (Exception e) { + e.printStackTrace(); + System.exit(-1); + } + } +} diff --git a/src/examples/java/io/nats/examples/chaosTestApp/PushConsumer.java b/src/examples/java/io/nats/examples/chaosTestApp/PushConsumer.java new file mode 100644 index 000000000..0638a07ca --- /dev/null +++ b/src/examples/java/io/nats/examples/chaosTestApp/PushConsumer.java @@ -0,0 +1,50 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.examples.chaosTestApp; + +import io.nats.client.Dispatcher; +import io.nats.client.JetStreamApiException; +import io.nats.client.JetStreamSubscription; +import io.nats.client.PushSubscribeOptions; +import io.nats.examples.chaosTestApp.support.CommandLine; +import io.nats.examples.chaosTestApp.support.ConsumerKind; + +import java.io.IOException; + +public class PushConsumer extends ConnectableConsumer { + final Dispatcher d; + final JetStreamSubscription sub; + + public PushConsumer(CommandLine cmd, ConsumerKind consumerKind) throws IOException, InterruptedException, JetStreamApiException { + super(cmd, "pu", consumerKind); + + d = nc.createDispatcher(); + + PushSubscribeOptions pso = PushSubscribeOptions.builder() + .stream(cmd.stream) + .configuration(newCreateConsumer() + .idleHeartbeat(1000) + .build()) + .ordered(consumerKind == ConsumerKind.Ordered) + .build(); + + sub = js.subscribe(cmd.subject, d, handler, false, pso); + Output.controlMessage(label, sub.getConsumerName()); + } + + @Override + public void refreshInfo() { + updateNameAndLabel(sub.getConsumerName()); + } +} diff --git a/src/examples/java/io/nats/examples/chaosTestApp/SimpleConsumer.java b/src/examples/java/io/nats/examples/chaosTestApp/SimpleConsumer.java new file mode 100644 index 000000000..8d3dcd4f8 --- /dev/null +++ b/src/examples/java/io/nats/examples/chaosTestApp/SimpleConsumer.java @@ -0,0 +1,57 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.examples.chaosTestApp; + +import io.nats.client.*; +import io.nats.client.api.OrderedConsumerConfiguration; +import io.nats.examples.chaosTestApp.support.CommandLine; +import io.nats.examples.chaosTestApp.support.ConsumerKind; + +import java.io.IOException; + +public class SimpleConsumer extends ConnectableConsumer { + final StreamContext sc; + final ConsumerContext cc; + final OrderedConsumerContext occ; + final MessageConsumer mc; + + public SimpleConsumer(CommandLine cmd, ConsumerKind consumerKind, int batchSize, long expiresIn) throws IOException, InterruptedException, JetStreamApiException { + super(cmd, "sc", consumerKind); + + sc = nc.getStreamContext(cmd.stream); + + ConsumeOptions co = ConsumeOptions.builder() + .batchSize(batchSize) + .expiresIn(expiresIn) + .build(); + + if (consumerKind == ConsumerKind.Ordered) { + OrderedConsumerConfiguration ocConfig = new OrderedConsumerConfiguration().filterSubjects(cmd.subject); + cc = null; + occ = sc.createOrderedConsumer(ocConfig); + mc = occ.consume(co, handler); + } + else { + occ = null; + cc = sc.createOrUpdateConsumer(newCreateConsumer().build()); + mc = cc.consume(co, handler); + } + Output.controlMessage(label, mc.getConsumerName()); + } + + @Override + public void refreshInfo() { + updateNameAndLabel(mc.getConsumerName()); + } +} diff --git a/src/examples/java/io/nats/examples/chaosTestApp/SimpleFetchConsumer.java b/src/examples/java/io/nats/examples/chaosTestApp/SimpleFetchConsumer.java new file mode 100644 index 000000000..7321027d1 --- /dev/null +++ b/src/examples/java/io/nats/examples/chaosTestApp/SimpleFetchConsumer.java @@ -0,0 +1,87 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.examples.chaosTestApp; + +import io.nats.client.*; +import io.nats.examples.chaosTestApp.support.CommandLine; +import io.nats.examples.chaosTestApp.support.ConsumerKind; + +import java.io.IOException; + +public class SimpleFetchConsumer extends ConnectableConsumer implements Runnable { + final StreamContext sc; + final ConsumerContext cc; + FetchConsumer fc; + final int batchSize; + final long expiresIn; + Thread t; + + public SimpleFetchConsumer(CommandLine cmd, ConsumerKind consumerKind, int batchSize, long expiresIn) throws IOException, InterruptedException, JetStreamApiException { + super(cmd, "fc", consumerKind); + if (consumerKind == ConsumerKind.Ordered) { + throw new IllegalArgumentException("Ordered Consumer not supported for App Simple Fetch"); + } + + this.batchSize = batchSize; + this.expiresIn = expiresIn; + + sc = nc.getStreamContext(cmd.stream); + + ConsumeOptions co = ConsumeOptions.builder() + .batchSize(batchSize) + .expiresIn(expiresIn) + .build(); + + cc = sc.createOrUpdateConsumer(newCreateConsumer().build()); + Output.controlMessage(label, cc.getConsumerName()); + Thread t = new Thread(this); + t.start(); + } + + @Override + public void run() { + //noinspection InfiniteLoopStatement + while (true) { + FetchConsumeOptions fco = FetchConsumeOptions.builder().maxMessages(batchSize).expiresIn(expiresIn).build(); + try (FetchConsumer fc = cc.fetch(fco)) { + Message m = fc.nextMessage(); + while (m != null) { + onMessage(m); + m = fc.nextMessage(); + } + } + catch (Exception e) { + // do we care if the autocloseable FetchConsumer errors on close? + // probably not, but maybe log it. + } + + // simulating some work to be done between fetches + try { + Output.workMessage(label, "Fetch Batch Completed, Last Received Seq: " + lastReceivedSequence.get()); + //noinspection BusyWait + Thread.sleep(10); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void refreshInfo() { + if (fc != null) { + updateNameAndLabel(fc.getConsumerName()); + } + } +} diff --git a/src/examples/java/io/nats/examples/chaosTestApp/support/CommandLine.java b/src/examples/java/io/nats/examples/chaosTestApp/support/CommandLine.java new file mode 100644 index 000000000..8f3c66d67 --- /dev/null +++ b/src/examples/java/io/nats/examples/chaosTestApp/support/CommandLine.java @@ -0,0 +1,290 @@ +// Copyright 2021-2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.examples.chaosTestApp.support; + +import io.nats.client.ConnectionListener; +import io.nats.client.ErrorListener; +import io.nats.client.Options; +import io.nats.examples.chaosTestApp.Output; + +import java.util.ArrayList; +import java.util.List; + +import static java.lang.Integer.parseInt; + +public class CommandLine { + + private void usage() { + System.out.println( + "----------------------------------------------------------------------------------------------------\n" + + "APP COMMAND LINE\n" + + "----------------------------------------------------------------------------------------------------\n" + + "--servers [,]*\n" + + " * i.e. --servers nats://localhost:4000,nats://localhost:4001,nats://localhost:4002\n" + + " * not supplied uses \"ats://localhost:4222\"\n" + + "--stream \n" + + " * not supplied uses \"app-stream\"\n" + + "--subject \n" + + " * not supplied uses \"app-subject\"\n" + + "--runtime <>m|<>s|<>ms|<>\n" + + " * not supplied or zero or negative infinite" + + " * m minute, s second ms millseconds no suffix milliseonds\n" + + "--debug\n" + + " * show the debug window\n" + + "--work\n" + + " * work the work window\n" + + "--screen console is default\n" + + "--simple ,batchSize,expiresInMs\n" + + "--simple batchSize expiresInMs\n" + + " * Simple Consumer.\n" + + "--fetch, batchSize expiresInMs\n" + + " * Simple Fetch Consumer.\n" + + "--push,\n" + + " * Push Consumer.\n" + + "*** One or more consumers is required.\n" + + "--create\n" + + " * (Re)create the stream.\n" + + "--r3\n" + + " * Make the stream R3 when (Re)create the stream.\n" + + "--publish\n" + + " * Turns on publishing.\n" + + "--pubjitter\n" + + " * publish jitter in milliseconds, amount of time to pause between publish\n" + + " * not supplied uses 50ms\n" + + "--logdir\n" + + " * Directory to log to. Only logs if supplied\n" + + "----------------------------------------------------------------------------------------------------\n" + ); + } + + public final String[] servers; + public final String stream; + public final String subject; + public final String logdir; + public final long runtime; + public final long pubjitter; + public final boolean create; + public final boolean r3; + public final boolean publish; + public final boolean debug; + public final boolean work; + public final Output.Screen uiScreen; + public final List commandLineConsumers; + + public Options makeManagmentOptions() { + return makeOptions((conn, event) -> {}, new ErrorListener() {}, 0); + } + + public Options makeOptions(ConnectionListener cl, ErrorListener el) { + return makeOptions(cl, el, -1); + } + + public Options makeOptions(ConnectionListener cl, ErrorListener el, int maxReconnects) { + return new Options.Builder() + .servers(servers) + .connectionListener(cl) + .errorListener(el) + .maxReconnects(maxReconnects) + .build(); + } + + // ---------------------------------------------------------------------------------------------------- + // ToString + // ---------------------------------------------------------------------------------------------------- + private void append(StringBuilder sb, String label, Object value, boolean test) { + if (test) { + sb.append("--").append(label).append(" ").append(value).append(" "); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("Chaos Test App Config "); + append(sb, "servers", String.join(",", servers), true); + append(sb, "stream", stream, true); + append(sb, "subject", subject, true); + append(sb, "logdir", logdir, logdir != null); + append(sb, "runtime", runtime, true); + append(sb, "create", create, create); + append(sb, "R3", r3, r3); + append(sb, "publish", publish, publish); + append(sb, "pubjitter", pubjitter, publish); + append(sb, "debug", debug, debug); + append(sb, "work", work, work); + append(sb, "screen", uiScreen, true); + for (CommandLineConsumer cc : commandLineConsumers) { + append(sb, "consumer", cc, true); + } + return sb.toString().trim(); + } + + // ---------------------------------------------------------------------------------------------------- + // Construction + // ---------------------------------------------------------------------------------------------------- + public CommandLine(String[] args) { + try { + String[] _servers = new String[]{Options.DEFAULT_URL}; + String _stream = "app-stream"; + String _subject = "app-subject"; + String _logdir = null; + long _runtime = -1; + long _publishJitter = 50; + boolean _debug = false; + boolean _create = false; + boolean _r3 = false; + boolean _publish = false; + boolean _work = false; + Output.Screen _uiScreen = Output.Screen.Console; + List _commandLineConsumers = new ArrayList<>(); + + if (args != null && args.length > 0) { + try { + for (int x = 0; x < args.length; x++) { + String arg = args[x].trim(); + if (arg.isEmpty()) { + continue; + } + switch (arg) { + case "--servers": + _servers = asString(args[++x]).split(","); + break; + case "--stream": + _stream = asString(args[++x]); + break; + case "--subject": + _subject = asString(args[++x]); + break; + case "--logdir": + _logdir = asString(args[++x]); + break; + case "--runtime": + _runtime = (long) asNumber("runtime", args[++x], -1) * 1000; + break; + case "--pubjitter": + _publishJitter = asNumber("pubjitter", args[++x], -1); + break; + case "--create": + _create = true; + break; + case "--r3": + _r3 = true; + break; + case "--publish": + _publish = true; + break; + case "--debug": + _debug = true; + break; + case "--work": + _work = true; + break; + case "--screen": + String screen = asString(args[++x]).toLowerCase(); + if (screen.equals("left")) { + _uiScreen = Output.Screen.Left; + } + else if (screen.equals("center")) { + _uiScreen = Output.Screen.Main; + } + else { + throw new IllegalArgumentException("Unknown Screen"); + } + break; + case "--simple": + case "--fetch": + String temp = args[++x]; + if (temp.contains(",")) { + String[] split = temp.split(","); + _commandLineConsumers.add(new CommandLineConsumer( + arg.substring(2), + split[0], + asNumber("batchSize", split[1], -1), + asNumber("expiresInMs", split[2], -1) + )); + } + else { + _commandLineConsumers.add(new CommandLineConsumer( + arg.substring(2), + temp, + asNumber("batchSize", args[++x], -1), + asNumber("expiresInMs", args[++x], -1) + )); + } + break; + case "--push": + _commandLineConsumers.add(new CommandLineConsumer(args[++x])); + break; + default: + throw new IllegalArgumentException("Unknown argument: " + arg); + } + } + } + catch (Exception e) { + e.printStackTrace(); + throw new IllegalArgumentException("Exception while parsing, most likely missing an argument value."); + } + } + + if (!_create && !_publish && _commandLineConsumers.isEmpty()) { + throw new IllegalArgumentException("Consumer commands are required if not creating or publishing"); + } + + servers = _servers; + stream = _stream; + subject = _subject; + logdir = _logdir; + runtime = _runtime; + create = _create; + r3 =_r3; + publish = _publish; + pubjitter = _publishJitter; + debug = _debug; + work = _work; + uiScreen = _uiScreen; + commandLineConsumers = _commandLineConsumers; + } + catch (RuntimeException e) { + usage(); + throw e; + } + } + + private String asString(String val) { + return val.trim(); + } + + private int asNumber(String name, String val, int upper) { + int v = parseInt(val); + if (upper == -2 && v < 1) { + return Integer.MAX_VALUE; + } + if (upper > 0) { + if (v > upper) { + throw new IllegalArgumentException("Value for " + name + " cannot exceed " + upper); + } + } + return v; + } + + private int asNumber(String name, String val, int lower, int upper) { + int v = parseInt(val); + if (v < lower) { + throw new IllegalArgumentException("Value for " + name + " cannot be less than " + lower); + } + if (v > upper) { + throw new IllegalArgumentException("Value for " + name + " cannot exceed " + upper); + } + return v; + } +} diff --git a/src/examples/java/io/nats/examples/chaosTestApp/support/CommandLineConsumer.java b/src/examples/java/io/nats/examples/chaosTestApp/support/CommandLineConsumer.java new file mode 100644 index 000000000..a51240375 --- /dev/null +++ b/src/examples/java/io/nats/examples/chaosTestApp/support/CommandLineConsumer.java @@ -0,0 +1,53 @@ +// Copyright 2021-2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.examples.chaosTestApp.support; + +public class CommandLineConsumer { + public final ConsumerType consumerType; + public final ConsumerKind consumerKind; + public final int batchSize; + public final long expiresIn; + + public CommandLineConsumer(String consumerKind) { + this.consumerType = ConsumerType.Push; + this.consumerKind = ConsumerKind.instance(consumerKind); + batchSize = 0; + expiresIn = 0; + } + + public CommandLineConsumer(String consumerType, String consumerKind, int batchSize, long expiresIn) { + this.consumerType = ConsumerType.instance(consumerType); + this.consumerKind = ConsumerKind.instance(consumerKind); + if (batchSize < 1) { + throw new IllegalArgumentException("Invalid Batch Size:" + batchSize); + } + this.batchSize = batchSize; + if (expiresIn < 1_000) { + throw new IllegalArgumentException("Expires must be >= 1000ms"); + } + this.expiresIn = expiresIn; + } + + @Override + public String toString() { + if (consumerType == ConsumerType.Simple) { + return consumerType.toString().toLowerCase() + + " " + consumerKind.toString().toLowerCase() + + " " + batchSize + + " " + expiresIn; + } + return consumerType.toString().toLowerCase() + + " " + consumerKind.toString().toLowerCase(); + } +} diff --git a/src/examples/java/io/nats/examples/chaosTestApp/support/ConsumerKind.java b/src/examples/java/io/nats/examples/chaosTestApp/support/ConsumerKind.java new file mode 100644 index 000000000..58ad4217a --- /dev/null +++ b/src/examples/java/io/nats/examples/chaosTestApp/support/ConsumerKind.java @@ -0,0 +1,27 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.examples.chaosTestApp.support; + +public enum ConsumerKind { + Ephemeral, Durable, Ordered; + + public static ConsumerKind instance(String text) { + for (ConsumerKind k : ConsumerKind.values()) { + if (k.name().equalsIgnoreCase(text)) { + return k; + } + } + throw new IllegalArgumentException("Invalid ConsumerKind: " + text); + } +} diff --git a/src/examples/java/io/nats/examples/chaosTestApp/support/ConsumerType.java b/src/examples/java/io/nats/examples/chaosTestApp/support/ConsumerType.java new file mode 100644 index 000000000..8447e4ea7 --- /dev/null +++ b/src/examples/java/io/nats/examples/chaosTestApp/support/ConsumerType.java @@ -0,0 +1,27 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.examples.chaosTestApp.support; + +public enum ConsumerType { + Simple, Fetch, Push; + + public static ConsumerType instance(String text) { + for (ConsumerType t : ConsumerType.values()) { + if (t.name().equalsIgnoreCase(text)) { + return t; + } + } + throw new IllegalArgumentException("Invalid ConsumerType: " + text); + } +} diff --git a/src/main/java/io/nats/client/ConsumerContext.java b/src/main/java/io/nats/client/ConsumerContext.java index fad8ac553..9147d5e8e 100644 --- a/src/main/java/io/nats/client/ConsumerContext.java +++ b/src/main/java/io/nats/client/ConsumerContext.java @@ -21,12 +21,6 @@ * The Consumer Context provides a convenient interface around a defined JetStream Consumer */ public interface ConsumerContext extends BaseConsumerContext { - /** - * Gets the consumer name that was used to create the context. - * @return the consumer name - */ - String getConsumerName(); - /** * Gets information about the consumer behind this subscription. * @return consumer information diff --git a/src/main/java/io/nats/client/JetStreamSubscription.java b/src/main/java/io/nats/client/JetStreamSubscription.java index e57072bb5..c12000f80 100644 --- a/src/main/java/io/nats/client/JetStreamSubscription.java +++ b/src/main/java/io/nats/client/JetStreamSubscription.java @@ -26,7 +26,7 @@ public interface JetStreamSubscription extends Subscription { /** - * Gets the consumer name that was used to create the subscription. + * Gets the consumer name associated with the subscription. * @return the consumer name */ String getConsumerName(); diff --git a/src/main/java/io/nats/client/MessageConsumer.java b/src/main/java/io/nats/client/MessageConsumer.java index a1f29257d..1ef6bec42 100644 --- a/src/main/java/io/nats/client/MessageConsumer.java +++ b/src/main/java/io/nats/client/MessageConsumer.java @@ -23,8 +23,8 @@ */ public interface MessageConsumer extends AutoCloseable { /** - * Gets the consumer name that was used to create the message consumer - * since in some cases the consumer info is not available. + * Gets the consumer name associated with the subscription. + * Some simplified consumer types do not support this, so it might be null. * @return the consumer name */ String getConsumerName(); diff --git a/src/main/java/io/nats/client/api/StreamConfiguration.java b/src/main/java/io/nats/client/api/StreamConfiguration.java index 4cecb867d..20ee6b175 100644 --- a/src/main/java/io/nats/client/api/StreamConfiguration.java +++ b/src/main/java/io/nats/client/api/StreamConfiguration.java @@ -145,9 +145,9 @@ static StreamConfiguration instance(JsonValue v) { * Returns a StreamConfiguration deserialized from its JSON form. * * @see #toJson() - * @param json + * @param json the json representing the Stream Configuration * @return StreamConfiguration for the given json - * @throws JsonParseException + * @throws JsonParseException thrown if the parsing fails for invalid json */ public static StreamConfiguration instance(String json) throws JsonParseException { return instance(JsonParser.parse(json)); diff --git a/src/main/java/io/nats/client/impl/NatsConsumerContext.java b/src/main/java/io/nats/client/impl/NatsConsumerContext.java index 1bfa2ebf4..d2e37857a 100644 --- a/src/main/java/io/nats/client/impl/NatsConsumerContext.java +++ b/src/main/java/io/nats/client/impl/NatsConsumerContext.java @@ -81,15 +81,14 @@ static class OrderedPullSubscribeOptionsBuilder extends PullSubscribeOptions.Bui } @Override - public NatsJetStreamPullSubscription subscribe(MessageHandler messageHandler, Dispatcher userDispatcher, PullMessageManager optionalPmm) throws IOException, JetStreamApiException { + public NatsJetStreamPullSubscription subscribe(MessageHandler messageHandler, Dispatcher userDispatcher, PullMessageManager optionalPmm, Long optionalInactiveThreshold) throws IOException, JetStreamApiException { PullSubscribeOptions pso; if (ordered) { if (lastConsumer != null) { highestSeq = Math.max(highestSeq, lastConsumer.pmm.lastStreamSeq); } - ConsumerConfiguration cc = lastConsumer == null - ? originalOrderedCc - : streamCtx.js.consumerConfigurationForOrdered(originalOrderedCc, highestSeq, null, null); + ConsumerConfiguration cc = streamCtx.js.consumerConfigurationForOrdered( + originalOrderedCc, highestSeq, null, null, optionalInactiveThreshold); pso = new OrderedPullSubscribeOptionsBuilder(streamCtx.streamName, cc).build(); } else { @@ -177,29 +176,42 @@ public Message next(Duration maxWait) throws IOException, InterruptedException, */ @Override public Message next(long maxWaitMillis) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException { - NatsMessageConsumerBase con; + if (maxWaitMillis < MIN_EXPIRES_MILLS) { + throw new IllegalArgumentException("Max wait must be at least " + MIN_EXPIRES_MILLS + " milliseconds."); + } + + NatsMessageConsumerBase nmcb = null; synchronized (stateLock) { checkState(); - if (maxWaitMillis < MIN_EXPIRES_MILLS) { - throw new IllegalArgumentException("Max wait must be at least " + MIN_EXPIRES_MILLS + " milliseconds."); - } - con = new NatsMessageConsumerBase(cachedConsumerInfo); - con.initSub(subscribe(null, null, null)); - con.sub._pull(PullRequestOptions.builder(1) - .expiresIn(maxWaitMillis - EXPIRE_ADJUSTMENT) - .build(), false, null); - trackConsume(con); + try { + long inactiveThreshold = maxWaitMillis * 110 / 100; // 10% longer than the wait + nmcb = new NatsMessageConsumerBase(cachedConsumerInfo); + nmcb.initSub(subscribe(null, null, null, inactiveThreshold)); + nmcb.sub._pull(PullRequestOptions.builder(1) + .expiresIn(maxWaitMillis - EXPIRE_ADJUSTMENT) + .build(), false, null); + trackConsume(nmcb); + } + catch (Exception e) { + if (nmcb != null) { + try { + nmcb.close(); + } + catch (Exception ignore) {} + } + return null; + } } // intentionally outside of lock try { - return con.sub.nextMessage(maxWaitMillis); + return nmcb.sub.nextMessage(maxWaitMillis); } finally { try { - con.finished.set(true); - con.close(); + nmcb.finished.set(true); + nmcb.close(); } catch (Exception e) { // from close/autocloseable, but we know it doesn't actually throw diff --git a/src/main/java/io/nats/client/impl/NatsFetchConsumer.java b/src/main/java/io/nats/client/impl/NatsFetchConsumer.java index 0e214ec40..61374b056 100644 --- a/src/main/java/io/nats/client/impl/NatsFetchConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsFetchConsumer.java @@ -29,20 +29,21 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer { super(cachedConsumerInfo); - maxWaitNanos = fetchConsumeOptions.getExpiresInMillis() * 1_000_000; + long expiresInMillis = fetchConsumeOptions.getExpiresInMillis(); + maxWaitNanos = expiresInMillis * 1_000_000; + long inactiveThreshold = expiresInMillis * 110 / 100; // ten % longer than the wait PullRequestOptions pro = PullRequestOptions.builder(fetchConsumeOptions.getMaxMessages()) .maxBytes(fetchConsumeOptions.getMaxBytes()) .expiresIn(fetchConsumeOptions.getExpiresInMillis()) .idleHeartbeat(fetchConsumeOptions.getIdleHeartbeat()) .build(); - initSub(subscriptionMaker.subscribe(null, null, null)); + initSub(subscriptionMaker.subscribe(null, null, null, inactiveThreshold)); pullSubject = sub._pull(pro, false, this); startNanos = -1; } @Override - public void pendingUpdated() { - } + public void pendingUpdated() {} @Override public void heartbeatError() { diff --git a/src/main/java/io/nats/client/impl/NatsJetStream.java b/src/main/java/io/nats/client/impl/NatsJetStream.java index be1567c9f..ad152d1d7 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStream.java +++ b/src/main/java/io/nats/client/impl/NatsJetStream.java @@ -235,7 +235,7 @@ MessageManager createMessageManager( MessageManagerFactory _pullMessageManagerFactory = (mmConn, mmJs, mmStream, mmSo, mmCc, mmQueueMode, mmSyncMode) -> new PullMessageManager(mmConn, mmSo, mmSyncMode); MessageManagerFactory _pullOrderedMessageManagerFactory = - (mmConn, mmJs, mmStream, mmSo, mmCc, mmQueueMode, mmSyncMode) -> new OrderedPullMessageManager(mmConn, mmJs, mmStream, mmSo, mmCc, mmSyncMode); + (mmConn, mmJs, mmStream, mmSo, mmCc, mmQueueMode, mmSyncMode) -> new PullOrderedMessageManager(mmConn, mmJs, mmStream, mmSo, mmCc, mmSyncMode); JetStreamSubscription createSubscription(String userSubscribeSubject, PushSubscribeOptions pushSubscribeOptions, diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java index fee8acd14..78743e26f 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java @@ -183,19 +183,27 @@ String generateConsumerName() { ConsumerConfiguration consumerConfigurationForOrdered( ConsumerConfiguration originalCc, long lastStreamSeq, - String newDeliverSubject, String consumerName) + String newDeliverSubject, + String consumerName, + Long inactiveThreshold) { ConsumerConfiguration.Builder builder = ConsumerConfiguration.builder(originalCc) - .deliverPolicy(DeliverPolicy.ByStartSequence) .deliverSubject(newDeliverSubject) - .startSequence(Math.max(1, lastStreamSeq + 1)) .startTime(null); // clear start time in case it was originally set + if (lastStreamSeq > 0) { + builder.deliverPolicy(DeliverPolicy.ByStartSequence) + .startSequence(Math.max(1, lastStreamSeq + 1)); + } + if (consumerName != null && consumerCreate290Available) { builder.name(consumerName); } + if (inactiveThreshold != null) { + builder.inactiveThreshold(inactiveThreshold); + } return builder.build(); } diff --git a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java index bdd8afcab..aa7987631 100644 --- a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java @@ -51,10 +51,6 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements PullManager @Override public void heartbeatError() { - restart(); - } - - private void restart() { try { // just close the current sub and make another one. // this could go on endlessly @@ -74,9 +70,10 @@ void doSub() throws JetStreamApiException, IOException { } }; try { - super.initSub(subscriptionMaker.subscribe(mh, userDispatcher, pmm)); + super.initSub(subscriptionMaker.subscribe(mh, userDispatcher, pmm, null)); repull(); stopped.set(false); + finished.set(false); } catch (JetStreamApiException | IOException e) { setupHbAlarmToTrigger(); diff --git a/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java b/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java index fd1f4218f..76fddc40c 100644 --- a/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java +++ b/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java @@ -52,10 +52,6 @@ public boolean isFinished() { return finished.get(); } - protected void setCachedConsumerInfo(ConsumerInfo cachedConsumerInfo) { - this.cachedConsumerInfo = cachedConsumerInfo; - } - /** * {@inheritDoc} */ diff --git a/src/main/java/io/nats/client/impl/OrderedMessageManager.java b/src/main/java/io/nats/client/impl/OrderedMessageManager.java index 85687bc99..55c81743f 100644 --- a/src/main/java/io/nats/client/impl/OrderedMessageManager.java +++ b/src/main/java/io/nats/client/impl/OrderedMessageManager.java @@ -97,7 +97,7 @@ private void handleErrorCondition() { // 3. make a new consumer using the same deliver subject but // with a new starting point - ConsumerConfiguration userCC = js.consumerConfigurationForOrdered(originalCc, lastStreamSeq, newDeliverSubject, actualConsumerName); + ConsumerConfiguration userCC = js.consumerConfigurationForOrdered(originalCc, lastStreamSeq, newDeliverSubject, actualConsumerName, null); ConsumerInfo ci = js._createConsumer(stream, userCC); // this can fail when a server is down. sub.setConsumerName(ci.getName()); @@ -107,7 +107,6 @@ private void handleErrorCondition() { catch (Exception e) { // don't want this doubly failing for any reason try { - e.printStackTrace(); js.conn.processException(e); } catch (Exception ignore) {} diff --git a/src/main/java/io/nats/client/impl/PullMessageManager.java b/src/main/java/io/nats/client/impl/PullMessageManager.java index 520869546..ea3a01184 100644 --- a/src/main/java/io/nats/client/impl/PullMessageManager.java +++ b/src/main/java/io/nats/client/impl/PullMessageManager.java @@ -69,10 +69,9 @@ protected void handleHeartbeatError() { } } - private void trackIncoming(int m, long b, String pullsubject) { + private void trackIncoming(int m, long b) { synchronized (stateChangeLock) { // message time used for heartbeat tracking - // subjects used to detect multiple failed heartbeats updateLastMessageReceived(); if (m != Integer.MIN_VALUE) { @@ -105,32 +104,29 @@ protected Boolean beforeQueueProcessorImpl(NatsMessage msg) { // normal js message if (status == null) { - trackIncoming(1, msg.consumeByteCount(), null); + trackIncoming(1, msg.consumeByteCount()); return true; } // heartbeat just needed to be recorded if (status.isHeartbeat()) { - trackIncoming(Integer.MIN_VALUE, -1, msg.subject); + trackIncoming(Integer.MIN_VALUE, Integer.MIN_VALUE); return false; } - Headers h = msg.getHeaders(); int m = Integer.MIN_VALUE; - long b = -1; + long b = Long.MIN_VALUE; + Headers h = msg.getHeaders(); if (h != null) { - String s = h.getFirst(NATS_PENDING_MESSAGES); - if (s != null) { - try { - m = Integer.parseInt(s); - b = Long.parseLong(h.getFirst(NATS_PENDING_BYTES)); - } - catch (NumberFormatException ignore) { - m = Integer.MIN_VALUE; // shouldn't happen but don't fail; make sure don't track m/b - } + try { + m = Integer.parseInt(h.getFirst(NATS_PENDING_MESSAGES)); + b = Long.parseLong(h.getFirst(NATS_PENDING_BYTES)); + } + catch (NumberFormatException ignore) { + m = Integer.MIN_VALUE; // shouldn't happen but don't fail; make sure don't track m/b } } - trackIncoming(m, b, msg.subject); + trackIncoming(m, b); return true; } diff --git a/src/main/java/io/nats/client/impl/OrderedPullMessageManager.java b/src/main/java/io/nats/client/impl/PullOrderedMessageManager.java similarity index 95% rename from src/main/java/io/nats/client/impl/OrderedPullMessageManager.java rename to src/main/java/io/nats/client/impl/PullOrderedMessageManager.java index cf4beeb4d..d225f8557 100644 --- a/src/main/java/io/nats/client/impl/OrderedPullMessageManager.java +++ b/src/main/java/io/nats/client/impl/PullOrderedMessageManager.java @@ -22,7 +22,7 @@ import static io.nats.client.impl.MessageManager.ManageResult.MESSAGE; import static io.nats.client.impl.MessageManager.ManageResult.STATUS_HANDLED; -class OrderedPullMessageManager extends PullMessageManager { +class PullOrderedMessageManager extends PullMessageManager { protected final ConsumerConfiguration originalCc; protected final NatsJetStream js; @@ -30,7 +30,7 @@ class OrderedPullMessageManager extends PullMessageManager { protected long expectedExternalConsumerSeq; protected final AtomicReference targetSid; - protected OrderedPullMessageManager(NatsConnection conn, + protected PullOrderedMessageManager(NatsConnection conn, NatsJetStream js, String stream, SubscribeOptions so, ConsumerConfiguration originalCc, boolean syncMode) { diff --git a/src/main/java/io/nats/client/impl/SimplifiedSubscriptionMaker.java b/src/main/java/io/nats/client/impl/SimplifiedSubscriptionMaker.java index 43e13f358..6761bc74d 100644 --- a/src/main/java/io/nats/client/impl/SimplifiedSubscriptionMaker.java +++ b/src/main/java/io/nats/client/impl/SimplifiedSubscriptionMaker.java @@ -20,5 +20,8 @@ import java.io.IOException; interface SimplifiedSubscriptionMaker { - NatsJetStreamPullSubscription subscribe(MessageHandler optionalMessageHandler, Dispatcher optionalDispatcher, PullMessageManager optionalPmm) throws IOException, JetStreamApiException; + NatsJetStreamPullSubscription subscribe(MessageHandler optionalMessageHandler, + Dispatcher optionalDispatcher, + PullMessageManager optionalPmm, + Long optionalInactiveThreshold) throws IOException, JetStreamApiException; } diff --git a/src/test/java/io/nats/client/impl/SimplificationTests.java b/src/test/java/io/nats/client/impl/SimplificationTests.java index 1d4ca6d95..a8cb1fb99 100644 --- a/src/test/java/io/nats/client/impl/SimplificationTests.java +++ b/src/test/java/io/nats/client/impl/SimplificationTests.java @@ -545,9 +545,9 @@ public void testConsumeOptionsBuilder() { } // this sim is different from the other sim b/c next has a new sub every message - public static class OrderedPullNextTestDropSimulator extends OrderedPullMessageManager { + public static class PullOrderedNextTestDropSimulator extends PullOrderedMessageManager { @SuppressWarnings("ClassEscapesDefinedScope") - public OrderedPullNextTestDropSimulator(NatsConnection conn, NatsJetStream js, String stream, SubscribeOptions so, ConsumerConfiguration serverCC, boolean queueMode, boolean syncMode) { + public PullOrderedNextTestDropSimulator(NatsConnection conn, NatsJetStream js, String stream, SubscribeOptions so, ConsumerConfiguration serverCC, boolean queueMode, boolean syncMode) { super(conn, js, stream, so, serverCC, syncMode); } @@ -581,7 +581,7 @@ public void testOrderedBehaviorNext() throws Exception { JetStreamManagement jsm = nc.jetStreamManagement(); // Get this in place before subscriptions are made - ((NatsJetStream)js)._pullOrderedMessageManagerFactory = OrderedPullNextTestDropSimulator::new; + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new; TestingStreamContainer tsc = new TestingStreamContainer(jsm); StreamContext sctx = js.getStreamContext(tsc.stream); @@ -602,9 +602,9 @@ public void testOrderedBehaviorNext() throws Exception { }); } - public static class OrderedPullTestDropSimulator extends OrderedPullMessageManager { + public static class PullOrderedTestDropSimulator extends PullOrderedMessageManager { @SuppressWarnings("ClassEscapesDefinedScope") - public OrderedPullTestDropSimulator(NatsConnection conn, NatsJetStream js, String stream, SubscribeOptions so, ConsumerConfiguration serverCC, boolean queueMode, boolean syncMode) { + public PullOrderedTestDropSimulator(NatsConnection conn, NatsJetStream js, String stream, SubscribeOptions so, ConsumerConfiguration serverCC, boolean queueMode, boolean syncMode) { super(conn, js, stream, so, serverCC, syncMode); } @@ -629,7 +629,7 @@ public void testOrderedBehaviorFetch() throws Exception { JetStreamManagement jsm = nc.jetStreamManagement(); // Get this in place before subscriptions are made - ((NatsJetStream)js)._pullOrderedMessageManagerFactory = OrderedPullTestDropSimulator::new; + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; TestingStreamContainer tsc = new TestingStreamContainer(jsm); StreamContext sctx = js.getStreamContext(tsc.stream); @@ -671,7 +671,7 @@ public void testOrderedBehaviorIterable() throws Exception { JetStreamManagement jsm = nc.jetStreamManagement(); // Get this in place before subscriptions are made - ((NatsJetStream)js)._pullOrderedMessageManagerFactory = OrderedPullTestDropSimulator::new; + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; TestingStreamContainer tsc = new TestingStreamContainer(jsm); StreamContext sctx = js.getStreamContext(tsc.stream); @@ -703,7 +703,7 @@ public void testOrderedConsume() throws Exception { StreamContext sctx = js.getStreamContext(tsc.stream); // Get this in place before subscriptions are made - ((NatsJetStream)js)._pullOrderedMessageManagerFactory = OrderedPullTestDropSimulator::new; + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; CountDownLatch msgLatch = new CountDownLatch(6); AtomicInteger received = new AtomicInteger();