From ccec12d9c4d26895bbc5cd7d48727cf2d96d8bd8 Mon Sep 17 00:00:00 2001 From: Larry McQueary Date: Thu, 19 Nov 2015 01:06:58 -0700 Subject: [PATCH] Numerous fixes. --- README.md | 7 +- TODO.md | 7 +- examples/Publisher.java | 2 +- examples/Replier.java | 82 +++--- examples/Requestor.java | 23 +- examples/Subscriber.java | 72 +++-- pom.xml | 2 +- .../io/nats/client/AsyncSubscriptionImpl.java | 3 +- src/main/java/io/nats/client/Channel.java | 39 ++- .../io/nats/client/ConnExceptionArgs.java | 32 --- .../io/nats/client/ConnExceptionHandler.java | 8 - src/main/java/io/nats/client/Connection.java | 4 +- .../java/io/nats/client/ConnectionEvent.java | 4 +- .../nats/client/ConnectionEventHandler.java | 2 +- .../io/nats/client/ConnectionFactory.java | 44 +-- .../java/io/nats/client/ConnectionImpl.java | 264 +++++++++++------- .../nats/client/DefaultExceptionHandler.java | 19 +- .../io/nats/client/ErrorEventHandler.java | 5 - .../java/io/nats/client/ExceptionHandler.java | 12 + src/main/java/io/nats/client/MessageImpl.java | 4 +- src/main/java/io/nats/client/Options.java | 12 +- src/main/java/io/nats/client/Parser.java | 8 +- ...rseException.java => ParserException.java} | 10 +- .../java/io/nats/client/Subscription.java | 4 + ...riptionImpl.java => SubscriptionImpl.java} | 30 +- .../io/nats/client/SyncSubscriptionImpl.java | 5 +- src/main/resources/jnats.properties | 2 +- 27 files changed, 400 insertions(+), 306 deletions(-) delete mode 100644 src/main/java/io/nats/client/ConnExceptionArgs.java delete mode 100644 src/main/java/io/nats/client/ConnExceptionHandler.java delete mode 100644 src/main/java/io/nats/client/ErrorEventHandler.java create mode 100644 src/main/java/io/nats/client/ExceptionHandler.java rename src/main/java/io/nats/client/{ParseException.java => ParserException.java} (62%) rename src/main/java/io/nats/client/{AbstractSubscriptionImpl.java => SubscriptionImpl.java} (83%) diff --git a/README.md b/README.md index a3b7230ec..fb1b32408 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,10 @@ A [Java](http://www.java.com) client for the [NATS messaging system](https://nat [![Javadoc](http://javadoc-badge.appspot.com/com.github.nats-io/jnats.svg?label=javadoc)](http://javadoc-badge.appspot.com/com.github.nats-io/jnats) [![Coverage Status](https://coveralls.io/repos/nats-io/jnats/badge.svg?branch=master)](https://coveralls.io/r/nats-io/jnats?branch=master) -This is a WORK IN PROGRESS. It's not quite a stable release. +This is a WORK IN PROGRESS. It's not quite a stable release. +Test coverage is inadequate (currently around zero). +Documentation (javadoc) is in progress. + Please refer to the TODO.md for constantly updating information on what things are not complete or not working. Watch this space for more info as it becomes available. @@ -31,7 +34,7 @@ This compressed archive contains the jnats client jar, slf4j dependencies, and t ```java -ConnectionFactory cf = new ConnectionFactory(Cosntants.DEFAULT_URL) +ConnectionFactory cf = new ConnectionFactory(Constants.DEFAULT_URL) Connection nc = cf.createConnection(); // Simple Publisher diff --git a/TODO.md b/TODO.md index adfbefa4a..3c622bf17 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,6 @@ -### jnats TODO list +# General - [] JavaDoc -- [] Simplify Exceptions -- [] Protobuf Encoding \ No newline at end of file +- [] TLS support +- [] Simplify Exception handling +- [] EncodedConnection (e.g. protobuf) diff --git a/examples/Publisher.java b/examples/Publisher.java index 899b097d6..971727637 100644 --- a/examples/Publisher.java +++ b/examples/Publisher.java @@ -66,7 +66,7 @@ public void run(String[] args) System.out.printf("(%d msgs/second).\n", (int)(count / elapsed)); } else { - System.err.println("\nTest not long enough to produce meaningful stats. " + System.out.println("\nTest not long enough to produce meaningful stats. " + "Please increase the message count (-count n)"); } printStats(c); diff --git a/examples/Replier.java b/examples/Replier.java index 340c4d299..ac6031e24 100644 --- a/examples/Replier.java +++ b/examples/Replier.java @@ -17,17 +17,16 @@ public class Replier implements MessageHandler { boolean sync = false; int received = 0; boolean verbose = false; - + long start = 0L; long end = 0L; Lock testLock = new ReentrantLock(); Condition allDone = testLock.newCondition(); boolean done = false; - - String replyText = "reply"; - byte[] replyBytes = replyText.getBytes(Charset.forName("UTF-8")); - + + byte[] replyBytes = "reply".getBytes(Charset.forName("UTF-8")); + public void run(String[] args) { parseArgs(args); @@ -35,7 +34,7 @@ public void run(String[] args) ConnectionFactory cf = null; Connection c = null; - + try { cf = new ConnectionFactory(url); c = cf.createConnection(); @@ -43,7 +42,7 @@ public void run(String[] args) System.err.println("Couldn't connect: " + e.getCause()); System.exit(-1); } - + long elapsed, seconds; if (sync) @@ -55,11 +54,11 @@ public void run(String[] args) elapsed = receiveAsyncSubscriber(c); } seconds = TimeUnit.NANOSECONDS.toSeconds(elapsed); - System.out.printf("Replied to %d msgs in %d seconds ", count, seconds); - + System.out.printf("Replied to %d msgs in %d seconds ", received, seconds); + if (seconds > 0) { - System.out.printf("(%d msgs/second).\n", - (count / seconds)); + System.out.printf("(%d msgs/second).\n", + (received / seconds)); } else { System.out.println(); System.out.println("Test not long enough to produce meaningful stats. " @@ -73,20 +72,18 @@ private void printStats(Connection c) Statistics s = c.getStats(); System.out.printf("Statistics: \n"); System.out.printf(" Incoming Payload Bytes: %d\n", s.getInBytes()); - System.out.printf(" Incoming Messages: %d", s.getInMsgs()); - System.out.printf(" Outgoing Payload Bytes: %d", s.getOutBytes()); - System.out.printf(" Outgoing Messages: %d", s.getOutMsgs()); + System.out.printf(" Incoming Messages: %d\n", s.getInMsgs()); + System.out.printf(" Outgoing Payload Bytes: %d\n", s.getOutBytes()); + System.out.printf(" Outgoing Messages: %d\n", s.getOutMsgs()); } @Override public void onMessage(Message msg) { - if (received == 0) + if (received++ == 0) start = System.nanoTime(); - received++; - if (verbose) - System.err.println("Received: " + msg); + System.out.println("Received: " + msg); Connection c = msg.getSubscription().getConnection(); try { @@ -102,6 +99,7 @@ public void onMessage(Message msg) { testLock.lock(); try { + System.out.println("I'M DONE"); done=true; allDone.signal(); } @@ -112,57 +110,47 @@ public void onMessage(Message msg) { } private long receiveAsyncSubscriber(Connection c) { - long t0 = System.nanoTime(); - - c.subscribeAsync(subject, this); + AsyncSubscription s = c.subscribeAsync(subject, this); // just wait to complete testLock.lock(); try { + s.start(); while (!done) allDone.await(); } catch (InterruptedException e) { + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); } finally { testLock.unlock(); } - return System.nanoTime() - t0; + return end - start; } private long receiveSyncSubscriber(Connection c) { - System.out.println("In receiveSyncSubscriber"); SyncSubscription s = c.subscribeSync(subject); - try { - s.nextMessage(); - } catch (Exception e) { - e.printStackTrace(); - } - received++; - start = System.nanoTime(); - - while (received < count) - { - received++; - Message m = null; - try { - System.out.println("nextMessage()"); + Message m = null; + try { + while (received < count) + { m = s.nextMessage(); - System.out.println("Got message " + m); - } catch (Exception e) { - e.printStackTrace(); - } - if (verbose) - System.out.println("Received: " + m); + if (received++ == 0) + start = System.nanoTime(); + + if (verbose) + System.out.println("Received: " + m); - try { c.publish(m.getReplyTo(),replyBytes); - } catch (ConnectionClosedException e) { - e.printStackTrace(); } + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); } end = System.nanoTime(); return end-start; @@ -172,7 +160,7 @@ private void usage() { System.err.println( "Usage: Publish [-url url] [-subject subject] " + - "-count [count] [-sync] [-verbose]"); + "[-count count] [-sync] [-verbose]"); System.exit(-1); } diff --git a/examples/Requestor.java b/examples/Requestor.java index 6c3403dbd..dddc8e995 100644 --- a/examples/Requestor.java +++ b/examples/Requestor.java @@ -37,12 +37,23 @@ public void run(String[] args) start = System.nanoTime(); - Message replyMsg = null; + int received = 0; + + Message m = null; + byte[] reply = null; try { for (int i = 0; i < count; i++) { - replyMsg = c.request(subject, payload); - System.out.print("Got reply: " + new String(replyMsg.getData())); + m = c.request(subject, payload, 10000); + if (m == null) + break; + + received++; + reply = m.getData(); + if (reply != null) + System.out.println("Got reply: " + new String(reply)); + else + System.out.println("Got reply with null payload"); } } catch (Exception e) { e.printStackTrace(); @@ -51,10 +62,10 @@ public void run(String[] args) end = System.nanoTime(); elapsed = TimeUnit.NANOSECONDS.toSeconds(end-start); - System.out.printf("Completed %d requests in %d seconds ", count, elapsed); + System.out.printf("Completed %d requests in %d seconds ", received, elapsed); if (elapsed > 0) { System.out.printf("(%d msgs/second).\n", - (count / elapsed)); + (received / elapsed)); } else { System.out.println(); System.out.println("Test not long enough to produce meaningful stats. " @@ -78,7 +89,7 @@ private void usage() { System.err.println( "Usage: Requestor [-url url] [-subject subject] " + - "-count [count] [-payload payload]"); + "[-count count] [-payload payload]"); System.exit(-1); } diff --git a/examples/Subscriber.java b/examples/Subscriber.java index 96d262882..376cdc904 100644 --- a/examples/Subscriber.java +++ b/examples/Subscriber.java @@ -2,32 +2,30 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.nats.client.AsyncSubscription; import io.nats.client.Connection; import io.nats.client.ConnectionFactory; import io.nats.client.Constants; -import io.nats.client.ConnExceptionArgs; -import io.nats.client.ErrorEventHandler; +import io.nats.client.ExceptionHandler; import io.nats.client.Message; import io.nats.client.MessageHandler; import io.nats.client.NATSException; +import io.nats.client.SlowConsumerException; import io.nats.client.Statistics; +import io.nats.client.Subscription; import io.nats.client.SyncSubscription; -public class Subscriber implements MessageHandler, ErrorEventHandler { +public class Subscriber implements MessageHandler, ExceptionHandler { Map parsedArgs = new HashMap(); int count = 1000000; String url = Constants.DEFAULT_URL; String subject = "foo"; boolean sync = false; - int received = 0; + long received = 0L; boolean verbose = false; - long startTime = 0L; - long elapsedTime = 0L; + long start = 0L; + long elapsed = 0L; final Object testLock = new Object(); @@ -45,31 +43,27 @@ public void run(String[] args) return; } - long elapsed=0L; - if (sync) { elapsed = receiveSyncSubscriber(c); } else { + c.setExceptionHandler(this); + elapsed = receiveAsyncSubscriber(c); } long elapsedSeconds = TimeUnit.SECONDS.convert(elapsed, TimeUnit.NANOSECONDS); - System.out.printf("Received %d msgs in %d seconds ", count, + System.out.printf("Received %d msgs in %d seconds ", received, elapsedSeconds); if (elapsedSeconds > 0) { System.out.printf("(%d msgs/second).\n", - (count / elapsedSeconds)); + (received / elapsedSeconds)); } else { System.out.println(); System.out.println("Test not long enough to produce meaningful stats. " + "Please increase the message count (-count n)"); } - try { - Thread.sleep(500); - } catch (InterruptedException e) { - } printStats(c); } @@ -84,8 +78,10 @@ private void printStats(Connection c) @Override public void onMessage(Message msg) { if (received == 0) - startTime = System.nanoTime(); + start = System.nanoTime(); + if (msg == null) + System.out.println("Message==null"); received++; if (verbose) @@ -93,7 +89,7 @@ public void onMessage(Message msg) { if (received >= count) { - elapsedTime = System.nanoTime() - startTime; + elapsed = System.nanoTime() - start; synchronized(testLock) { testLock.notify(); @@ -101,11 +97,6 @@ public void onMessage(Message msg) { } } - @Override - public void onError(ConnExceptionArgs error) { - System.err.println("Connection error encountered: " + error.getError()); - } - private long receiveAsyncSubscriber(Connection c) { AsyncSubscription s = c.subscribeAsync(subject, this); @@ -124,25 +115,21 @@ private long receiveAsyncSubscriber(Connection c) } } - return elapsedTime; + return elapsed; } private long receiveSyncSubscriber(Connection c) { - long t0 = 0L; - SyncSubscription s = (SyncSubscription) c.subscribeSync(subject); + try { - s.nextMessage(); - received++; - - t0 = System.nanoTime(); - while (received < count) { - received++; Message m = s.nextMessage(); + if (received++ == 0) { + start = System.nanoTime(); + } if (verbose) System.out.println("Received: " + m); } @@ -150,10 +137,9 @@ private long receiveSyncSubscriber(Connection c) } catch (Exception ne) { System.err.println("Error receiving synchronously:"); ne.printStackTrace(); - } finally { - elapsedTime = System.nanoTime()-t0; - } - return elapsedTime; + } + elapsed = System.nanoTime()-start; + return elapsed; } private void usage() @@ -232,5 +218,17 @@ public static void main(String[] args) System.exit(0); } } + + @Override + public void handleException(Connection conn, Subscription subscription, Throwable e) { + System.err.println("Exception encountered: " + e.getMessage()); + e.printStackTrace(); + } + + @Override + public void handleSlowConsumerException(Connection conn, Subscription sub, + SlowConsumerException e) { + System.err.println("Warning: SLOW CONSUMER"); + } } diff --git a/pom.xml b/pom.xml index 2e1df5daf..09f50f2d8 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 io.nats jnats - 1.0-alpha-SNAPSHOT + 0.2.0-alpha jnats Java client for NATS Messaging System diff --git a/src/main/java/io/nats/client/AsyncSubscriptionImpl.java b/src/main/java/io/nats/client/AsyncSubscriptionImpl.java index f1e204613..cf3e8b666 100644 --- a/src/main/java/io/nats/client/AsyncSubscriptionImpl.java +++ b/src/main/java/io/nats/client/AsyncSubscriptionImpl.java @@ -9,7 +9,7 @@ * This is the implementation of the AsyncSubscription interface. * */ -class AsyncSubscriptionImpl extends AbstractSubscriptionImpl implements AsyncSubscription { +class AsyncSubscriptionImpl extends SubscriptionImpl implements AsyncSubscription { private MessageHandler msgHandler; // private MsgHandlerEventArgs msgHandlerArgs = new MsgHandlerEventArgs(); @@ -76,7 +76,6 @@ protected boolean processMsg(Message m) { } } } - msgHandler.onMessage(m); return true; } diff --git a/src/main/java/io/nats/client/Channel.java b/src/main/java/io/nats/client/Channel.java index 3258d43a7..82c01ae20 100644 --- a/src/main/java/io/nats/client/Channel.java +++ b/src/main/java/io/nats/client/Channel.java @@ -11,9 +11,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -//class Channel extends LinkedBlockingQueue { class Channel { - + final static Logger logger = LoggerFactory.getLogger(Channel.class); /** * This channel class really a blocking queue, is named the way it is so the @@ -41,29 +40,49 @@ public Channel(Collection c) { T get(long timeout) throws TimeoutException { qLock.lock(); try { - if (finished) + if (logger.isDebugEnabled()) { + logger.debug("Channel.get(" + timeout +"): q.size()=" + q.size()); + } + if (finished) { + if (logger.isDebugEnabled()) { + logger.debug("Channel.get(" + timeout +"): returning defaultVal"); + } return this.defaultVal; - + } if (q.size() > 0) { return q.poll(); } else { if (timeout < 0) { + if (logger.isDebugEnabled()) { + logger.debug("Channel.get(" + timeout +"): waiting for >0 items"); + } while (q.size() == 0) { hasItems.await(); } - logger.debug("Done waiting in Channel.get(-1)"); + if (logger.isDebugEnabled()) { + logger.debug("Channel.get(" + timeout +"): queue now has " + + q.size() + " items"); + } + } else { if(hasItems.await(timeout, TimeUnit.MILLISECONDS)==false) { - throw new TimeoutException(); + if (logger.isDebugEnabled()) { + logger.debug("Channel.get(" + timeout +"): timed out waiting for >0 items"); + } + + throw new TimeoutException("Channel timed out waiting for items"); } } - if (finished) + if (finished) { + if (logger.isDebugEnabled()) { + logger.debug("Channel.get(" + timeout +"): returning defaultVal"); + } return this.defaultVal; - + } T item = q.poll(); - + return item; } @@ -78,6 +97,8 @@ T get(long timeout) throws TimeoutException { void add(T item) { + if (logger.isDebugEnabled()) + logger.debug("In Channel.add for {}", item); qLock.lock(); try { diff --git a/src/main/java/io/nats/client/ConnExceptionArgs.java b/src/main/java/io/nats/client/ConnExceptionArgs.java deleted file mode 100644 index 5a5d8feff..000000000 --- a/src/main/java/io/nats/client/ConnExceptionArgs.java +++ /dev/null @@ -1,32 +0,0 @@ -package io.nats.client; - -public class ConnExceptionArgs { - private ConnectionImpl c; - private Subscription s; - private String err; - - public ConnExceptionArgs(ConnectionImpl c, Subscription subscription, String err) - { - this.c = c; - this.s = subscription; - this.err = err; - } - - // Gets the connection associated with the event. - public ConnectionImpl getConnection() - { - return c; - } - - // Gets the Subscription associated with the event. - public Subscription getSubscription() - { - return s; - } - - // Gets the error associated with the event. - public String getError() - { - return err; - } -} diff --git a/src/main/java/io/nats/client/ConnExceptionHandler.java b/src/main/java/io/nats/client/ConnExceptionHandler.java deleted file mode 100644 index 76e4965bd..000000000 --- a/src/main/java/io/nats/client/ConnExceptionHandler.java +++ /dev/null @@ -1,8 +0,0 @@ -package io.nats.client; - -public interface ConnExceptionHandler { - public void handleException(ConnectionImpl conn, java.lang.Throwable e); - - public void onError(ConnExceptionArgs eventArgs); - -} diff --git a/src/main/java/io/nats/client/Connection.java b/src/main/java/io/nats/client/Connection.java index 935ad3b03..58373dcaf 100644 --- a/src/main/java/io/nats/client/Connection.java +++ b/src/main/java/io/nats/client/Connection.java @@ -38,8 +38,8 @@ public Message request(String subject, byte[] data, long timeout) void setDisconnectedEventHandler(ConnectionEventHandler disconnectedEventHandler); ConnectionEventHandler getReconnectedEventHandler(); void setReconnectedEventHandler(ConnectionEventHandler reconnectedEventHandler); - ConnExceptionHandler getExceptionHandler(); - void setExceptionHandler(ConnExceptionHandler exceptionHandler); + ExceptionHandler getExceptionHandler(); + void setExceptionHandler(ExceptionHandler exceptionHandler); ConnectionEventHandler getClosedEventHandler(); void setClosedEventHandler(ConnectionEventHandler closedEventHandler); ConnectionEventHandler getDisconnectedEventHandler(); diff --git a/src/main/java/io/nats/client/ConnectionEvent.java b/src/main/java/io/nats/client/ConnectionEvent.java index 1badddeec..259c788f8 100644 --- a/src/main/java/io/nats/client/ConnectionEvent.java +++ b/src/main/java/io/nats/client/ConnectionEvent.java @@ -1,13 +1,13 @@ package io.nats.client; public class ConnectionEvent { - ConnectionImpl nc; + Connection nc; public ConnectionEvent(ConnectionImpl c) { this.nc = c; } - public ConnectionImpl getConn() { + public Connection getConnection() { return nc; } diff --git a/src/main/java/io/nats/client/ConnectionEventHandler.java b/src/main/java/io/nats/client/ConnectionEventHandler.java index 32b0532e5..d720da681 100644 --- a/src/main/java/io/nats/client/ConnectionEventHandler.java +++ b/src/main/java/io/nats/client/ConnectionEventHandler.java @@ -1,5 +1,5 @@ package io.nats.client; public interface ConnectionEventHandler { - public void onEvent(ConnectionEvent eventInfo); + public void onEvent(ConnectionEvent event); } diff --git a/src/main/java/io/nats/client/ConnectionFactory.java b/src/main/java/io/nats/client/ConnectionFactory.java index c0f5bdb44..721f3db20 100644 --- a/src/main/java/io/nats/client/ConnectionFactory.java +++ b/src/main/java/io/nats/client/ConnectionFactory.java @@ -36,7 +36,7 @@ public class ConnectionFactory implements Cloneable { private int connectionTimeout = Constants.DEFAULT_TIMEOUT; private long pingInterval = Constants.DEFAULT_PING_INTERVAL; private int maxPingsOut = Constants.DEFAULT_MAX_PINGS_OUT; - private ConnExceptionHandler connExceptionHandler = new DefaultExceptionHandler(); + private ExceptionHandler exceptionHandler = new DefaultExceptionHandler(); private ConnectionEventHandler connectionEventHandler = null; // The size of the buffered channel used between the socket @@ -85,6 +85,11 @@ public ConnectionFactory(String url, String[] servers) } } + /** + * + * @return the Connection. + * @throws NATSException if a Connection cannot be established for some reason. + */ public ConnectionImpl createConnection() throws NATSException { ConnectionImpl conn = null; Options options = options(); @@ -122,7 +127,7 @@ private Options options() { result.setConnectionTimeout(connectionTimeout); result.setPingInterval(pingInterval); result.setMaxPingsOut(maxPingsOut); - result.setExceptionHandler(connExceptionHandler); + result.setExceptionHandler(exceptionHandler); result.setSubChanLen(subChanLen); // private ConnectionEventHandler connectionEventHandler = null; return null; return result; @@ -292,42 +297,45 @@ public void setNoRandomize(boolean noRandomize) { } /** - * @return the connectionName + * @return the connection name associated with this Connection. */ public String getConnectionName() { return this.connectionName; } /** - * @param connectionName the connectionName to set + * @param connectionName the name to set for this Connection. */ public void setConnectionName(String connectionName) { this.connectionName=connectionName; } /** - * @return the verbose + * @return whether or not the connection will require +OK/+ERR */ public boolean isVerbose() { return this.verbose; } /** - * @param verbose the verbose to set + * @param verbose whether or not this Connection should + * require protocol acks from the server (+OK/-ERR) */ public void setVerbose(boolean verbose) { this.verbose = verbose; } /** - * @return the pedantic + * @return whether strict server-side protocol checking + * is enabled */ public boolean isPedantic() { return this.pedantic; } /** - * @param pedantic the pedantic to set + * @param pedantic when true, strict + * server-side protocol checking occurs. */ public void setPedantic(boolean pedantic) { this.pedantic = pedantic; @@ -421,7 +429,8 @@ public void setPingInterval(long pingInterval) { } /** - * @return the maxPingsOut + * @return the maximum number of oustanding outbound pings before + * marking the Connection stale and triggering reconnection. */ public int getMaxPingsOut() { return this.maxPingsOut; @@ -435,19 +444,20 @@ public void setMaxPingsOut(int maxPingsOut) { } /** - * @return the connExceptionHandler + * @return the exceptionHandler */ - public ConnExceptionHandler getExceptionHandler() { - return connExceptionHandler; + public ExceptionHandler getExceptionHandler() { + return exceptionHandler; } /** - * @param connExceptionHandler the connExceptionHandler to set + * @param exceptionHandler the {@link ExceptionHandler} to set for + * connections. */ - public void setExceptionHandler(ConnExceptionHandler connExceptionHandler) { - if (connExceptionHandler == null) { - throw new IllegalArgumentException("ConnExceptionHandler cannot be null!"); + public void setExceptionHandler(ExceptionHandler exceptionHandler) { + if (exceptionHandler == null) { + throw new IllegalArgumentException("ExceptionHandler cannot be null!"); } - this.connExceptionHandler = connExceptionHandler; + this.exceptionHandler = exceptionHandler; } } diff --git a/src/main/java/io/nats/client/ConnectionImpl.java b/src/main/java/io/nats/client/ConnectionImpl.java index d0103cf68..90a3e0bea 100644 --- a/src/main/java/io/nats/client/ConnectionImpl.java +++ b/src/main/java/io/nats/client/ConnectionImpl.java @@ -5,8 +5,8 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; -import java.net.SocketException; import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.Charset; @@ -15,6 +15,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Queue; import java.util.Random; import java.util.Vector; @@ -24,7 +25,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -36,6 +37,8 @@ final class ConnectionImpl implements Connection { final Logger logger = LoggerFactory.getLogger(ConnectionImpl.class); + String version = null; + final static int DEFAULT_SCRATCH_SIZE = 512; @@ -85,7 +88,7 @@ public static enum ConnState { public static final String PONG_PROTO = "PONG" + _CRLF_; public static final String PUB_PROTO = "PUB %s %s %d" + _CRLF_; public static final String SUB_PROTO = "SUB %s %s %d" + _CRLF_; - public static final String UNSUB_PROTO = "UNSUB %d %s" + _CRLF_; + public static final String UNSUB_PROTO = "UNSUB %d %d" + _CRLF_; private ConnectionImpl nc = this; @@ -94,10 +97,10 @@ public static enum ConnState { protected ConnectionEventHandler closedEventHandler; protected ConnectionEventHandler disconnectedEventHandler; protected ConnectionEventHandler reconnectedEventHandler; - protected ConnExceptionHandler exceptionHandler = new DefaultExceptionHandler(); + protected ExceptionHandler exceptionHandler = new DefaultExceptionHandler(); - private AtomicInteger sidCounter = new AtomicInteger(); + private AtomicLong sidCounter = new AtomicLong(); private URI url = null; private Options opts = null; @@ -121,7 +124,7 @@ public static enum ConnState { private final Condition flusherKickedCondition = flusherLock.newCondition(); private boolean flusherDone = false; - private Map subs = null; + private Map subs = null; private List srvPool = null; private Exception lastEx = null; private ServerInfo info = null; @@ -136,8 +139,6 @@ public static enum ConnState { protected int pingProtoBytesLen = 0; protected byte[] pongProtoBytes = null; protected int pongProtoBytesLen = 0; - protected byte[] pubProtoBytes = null; - protected int pubProtoBytesLen = 0; protected byte[] pubPrimBytes = null; protected int pubPrimBytesLen = 0; @@ -149,13 +150,23 @@ public static enum ConnState { private static final int NUM_TASK_THREADS = 5; private final ExecutorService taskExec = Executors.newFixedThreadPool(NUM_TASK_THREADS, new NATSThreadFactory("natsthreadfactory")); - private Vector futures = new Vector(); private Random r = null; public ConnectionImpl(Options o) throws NoServersException { + Properties props = new Properties(); + InputStream inputStream = + getClass().getClassLoader().getResourceAsStream("jnats.properties"); + + try { + props.load(inputStream); + version = props.getProperty("client.version"); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } this.opts = o; this.pongs = createPongs(); - this.subs = new ConcurrentHashMap(); + this.subs = new ConcurrentHashMap(); this.stats = new Statistics(); this.ps = new Parser(this); this.msgArgs = new MsgArg(); @@ -166,8 +177,6 @@ public ConnectionImpl(Options o) throws NoServersException { pingProtoBytesLen = pingProtoBytes.length; pongProtoBytes = PONG_PROTO.getBytes(); pongProtoBytesLen = pongProtoBytes.length; - pubProtoBytes = PUB_PROTO.getBytes(Charset.forName("UTF-8")); - pubProtoBytesLen = pubProtoBytes.length; pubPrimBytes = _PUB_P_.getBytes(Charset.forName("UTF-8")); pubPrimBytesLen = pubPrimBytes.length; @@ -429,12 +438,10 @@ private void close(ConnState closeState, boolean invokeDelegates) // { // s.closeChannel(); // } - Iterator> it = subs.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry pair = (Map.Entry)it.next(); - AbstractSubscriptionImpl s = (AbstractSubscriptionImpl)pair.getValue(); + for (Long key : subs.keySet()) + { + SubscriptionImpl s = subs.get(key); s.closeChannel(); - it.remove(); // avoids a ConcurrentModificationException } subs.clear(); sidCounter.set(0); @@ -957,11 +964,16 @@ else if (c.op.startsWith("tls:")) } } + /* + * This method is only used by sendPong. It is also used in the Go + * client's tests. + */ private void sendProto(byte[] value, int length) throws IOException { mu.lock(); try { bw.write(value, 0, length); + kickFlusher(); } finally { mu.unlock(); } @@ -1064,7 +1076,7 @@ protected void spinUpSocketWatchers() { t.start(); socketWatchers.add(t); - t = new NATSThread(new Flusher(this), "flusher"); + t = new NATSThread(new Flusher(), "flusher"); t.start(); socketWatchers.add(t); @@ -1118,7 +1130,7 @@ class ConnectInfo { private Boolean ssl; private String name; private String lang = ConnectionImpl.LANG_STRING; - private String version = ConnectionImpl.VERSION; + private String version = ConnectionImpl.this.version; public ConnectInfo(boolean verbose, boolean pedantic, String username, String password, boolean secure, String connectionName) { @@ -1406,8 +1418,9 @@ public void run() { // It is used to deliver messages to asynchronous subscribers. protected void deliverMsgs(Channel ch) { - Message m = null; - + logger.debug("In deliverMsgs"); + MessageImpl m = null; + while (true) { mu.lock(); @@ -1421,12 +1434,13 @@ protected void deliverMsgs(Channel ch) try { logger.debug("Calling ch.get(-1)..."); - m = ch.get(-1); + m = (MessageImpl)ch.get(-1); logger.debug("ch.get(-1) returned " + m); } catch (TimeoutException e) { // TODO Auto-generated catch block e.printStackTrace(); - } + } + if (m == null) { // the channel has been closed, exit silently. @@ -1435,13 +1449,13 @@ protected void deliverMsgs(Channel ch) // Note, this seems odd message having the sub process itself, // but this is good for performance. - AbstractSubscriptionImpl s = (AbstractSubscriptionImpl) m.getSubscription(); - if (!s.processMsg(m)) +// SubscriptionImpl s = (SubscriptionImpl) m.getSubscription(); + if (!m.sub.processMsg(m)) { mu.lock(); try { - removeSub(s); + removeSub(m.sub); } finally { mu.unlock(); } @@ -1452,10 +1466,10 @@ protected void deliverMsgs(Channel ch) protected class Flusher implements Runnable { - - public Flusher(ConnectionImpl connectionImpl) { - // TODO Auto-generated constructor stub - } +// ConnectionImpl +// public Flusher(ConnectionImpl connectionImpl) { +// // TODO Auto-generated constructor stub +// } @Override public void run() { @@ -1464,7 +1478,7 @@ public void run() { } - protected void processMsgArgs(byte[] buffer, long length) throws ParseException + protected void processMsgArgs(byte[] buffer, long length) throws ParserException { String s = new String(buffer, 0, (int)length); String[] args = s.split(" "); @@ -1484,12 +1498,12 @@ protected void processMsgArgs(byte[] buffer, long length) throws ParseException msgArgs.size = Integer.parseInt(args[3]); break; default: - throw new ParseException("Unable to parse message arguments: " + s); + throw new ParserException("Unable to parse message arguments: " + s); } if (msgArgs.size < 0) { - throw new ParseException("Invalid MessageImpl - Bad or Missing Size: " + s); + throw new ParserException("Invalid MessageImpl - Bad or Missing Size: " + s); } } @@ -1504,8 +1518,7 @@ protected void processMsg(byte[] msg, long length) boolean maxReached = false; - Subscription sub; - AbstractSubscriptionImpl s; + SubscriptionImpl s; mu.lock(); @@ -1519,10 +1532,10 @@ protected void processMsg(byte[] msg, long length) // (as opposed to checking with Contains or TryGetValue) try { - sub = subs.get(msgArgs.sid); - s = (AbstractSubscriptionImpl)sub; + s = subs.get(msgArgs.sid); if (logger.isDebugEnabled()) - logger.debug("\tfound subscription sid:" + s.getSid()); + logger.debug("\tfound subscription sid:" + s.getSid() + + " subj: " + s.getSubject() ); } catch (Exception e) @@ -1540,7 +1553,7 @@ protected void processMsg(byte[] msg, long length) { if (logger.isDebugEnabled()) logger.debug("\tcreating message"); - Message m = new MessageImpl(msgArgs, sub, msg, length); + Message m = new MessageImpl(msgArgs, s, msg, length); if (logger.isDebugEnabled()) logger.debug("\tcreated message: " + m); if (!s.addMessage(m, opts.getSubChanLen())) @@ -1568,26 +1581,42 @@ protected void processMsg(byte[] msg, long length) removeSub(s); } void removeSub(Subscription s) { - // TODO Auto-generated method stub - + long key = s.getSid(); + if (subs.containsKey(key)) + { + subs.remove(key); + if (logger.isDebugEnabled()) + { + logger.debug("Removed sid=" + s.getSid() + " subj=" + + s.getSubject()); + } + } + SubscriptionImpl sub = (SubscriptionImpl) s; + + if (sub.mch != null) + { + if (logger.isDebugEnabled()) + { + logger.debug("Closed sid=" + s.getSid() + " subj=" + + s.getSubject()); + } + sub.mch.close(); + sub.mch = null; + } + + sub.conn = null; } // processSlowConsumer will set SlowConsumer state and fire the // async error handler if registered. - void processSlowConsumer(AbstractSubscriptionImpl s) + void processSlowConsumer(SubscriptionImpl s) { if (this.exceptionHandler != null && !s.sc) { - ConnExceptionArgs args = new ConnExceptionArgs(this, s, "Slow Consumer"); - //TODO fix this; reconcile opts with Connection params - taskExec.execute(new ExceptionHandlerTask(this.exceptionHandler, args)); - // new Task(() => - // { - // opts.AsyncErrorEventHandler(this, - // new ConnExceptionArgs(this, s, "Slow Consumer")); - // }).Start(); + taskExec.execute(new ExceptionHandlerTask(this.exceptionHandler, this, + s, new SlowConsumerException())); } s.sc = true; } @@ -1619,7 +1648,9 @@ public void run() { TimeUnit.MILLISECONDS.toSeconds(opts.getPingInterval()) + " seconds.");} sendPing(null); } catch (IOException e) { - // TODO Auto-generated catch block + if (opts.getExceptionHandler() != null) + getExceptionHandler().handleException( + ConnectionImpl.this, null, e); e.printStackTrace(); } } finally { @@ -1681,6 +1712,7 @@ private void sendPing(Channel ch) throws IOException protected void unsubscribe(Subscription sub, int max) throws ConnectionClosedException, IOException { mu.lock(); + try { if (isClosed()) throw new ConnectionClosedException(); @@ -1704,10 +1736,12 @@ protected void unsubscribe(Subscription sub, int max) throws ConnectionClosedExc // We will send all subscriptions when reconnecting // so that we can supress here. if (!isReconnecting()) { - String unsubMsg = String.format(UNSUB_PROTO, s.getSid(), max); - bw.writeBytes(unsubMsg); + byte[] unsub = String.format(UNSUB_PROTO, s.getSid(), max).getBytes(Charset.forName("UTF-8")); + bw.write(unsub); } + } finally { + mu.unlock(); } kickFlusher(); @@ -1715,7 +1749,9 @@ protected void unsubscribe(Subscription sub, int max) throws ConnectionClosedExc private void kickFlusher() { + logger.debug("kickFlusher(): acquiring flusherLock"); flusherLock.lock(); + logger.debug("kickFlusher(): acquired flusherLock"); try { if (!flusherKicked) { @@ -1724,6 +1760,7 @@ private void kickFlusher() flusherKicked = true; } finally { flusherLock.unlock(); + logger.debug("kickFlusher(): released flusherLock"); } } @@ -1738,13 +1775,10 @@ private boolean waitForFlusherKick() // if kicked before we get here meantime, skip // waiting. if (!flusherKicked) - { flusherKickedCondition.await(); - } flusherKicked = false; } catch (InterruptedException e) { - e.printStackTrace(); } finally { flusherLock.unlock(); } @@ -1791,25 +1825,33 @@ private void flusher() while (!isFlusherDone()) { + logger.debug("flusher(): waiting for kick..."); boolean val = waitForFlusherKick(); + if (val == false) return; + logger.debug("flusher(): I've been kicked!"); - mu.lock(); - try - { - if (!isConnected()) - return; + logger.debug("flusher(): acquiring conn mutex"); + if (mu.tryLock()) { + logger.debug("flusher(): acquired conn mutex"); + try + { + if (!isConnected()) + return; + logger.debug("flusher(): flushing buffer"); - // TODO: Check writability of underlying stream? - bw.flush(); - logger.trace("buffer flushed"); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } finally { - mu.unlock(); + // TODO: Check writability of underlying stream? + bw.flush(); + logger.trace("flusher(): buffer flushed"); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } finally { + mu.unlock(); + logger.debug("flusher(): released conn mutex"); + } } } } @@ -1868,17 +1910,13 @@ public void flush() throws Exception // server. Used in reconnects private void resendSubscriptions() throws IOException { - Iterator> it = subs.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry pair = (Map.Entry)it.next(); - Subscription s = (Subscription)pair.getValue(); - + for (Long key : subs.keySet()) + { + SubscriptionImpl s = subs.get(key); if (s instanceof AsyncSubscription) ((AsyncSubscriptionImpl)s).enable(); //enableAsyncProcessing() logger.debug("Resending subscriptions:"); sendSubscriptionMessage(s); - - it.remove(); // avoids a ConcurrentModificationException } bw.flush(); @@ -1887,36 +1925,40 @@ private void resendSubscriptions() throws IOException class ConnEventHandlerTask implements Runnable { private final ConnectionEventHandler cb; - private final ConnectionEvent eventArgs; + private final ConnectionEvent connEvent; ConnEventHandlerTask(ConnectionEventHandler cb, ConnectionEvent eventArgs) { this.cb = cb; - this.eventArgs = eventArgs; + this.connEvent = eventArgs; } public void run() { - cb.onEvent(eventArgs); + cb.onEvent(connEvent); } } class ExceptionHandlerTask implements Runnable { - private final ConnExceptionHandler cb; - private final ConnExceptionArgs eventArgs; + private final ExceptionHandler cb; + private final Connection conn; + private final Subscription sub; + private final Throwable ex; - ExceptionHandlerTask(ConnExceptionHandler exceptionHandler, ConnExceptionArgs eventArgs) { + ExceptionHandlerTask(ExceptionHandler exceptionHandler, ConnectionImpl conn, Subscription sub, + Throwable ex) { this.cb = exceptionHandler; - this.eventArgs = eventArgs; + this.conn = conn; + this.sub = sub; + this.ex = ex; } public void run() { - cb.onError(eventArgs); + cb.handleException(this.conn, this.sub, this.ex); } - } private Subscription subscribe(String subj, String queue, MessageHandler cb, int chanLen) { - Subscription sub = null; + SubscriptionImpl sub = null; boolean async = (cb != null); mu.lock(); try { @@ -1944,8 +1986,11 @@ private Subscription subscribe(String subj, String queue, MessageHandler cb, int @Override public Subscription subscribe(String subj, String queue, MessageHandler cb) { - Subscription s = subscribe(subj, queue, cb, opts.getSubChanLen()); + SubscriptionImpl s = + (SubscriptionImpl) subscribe(subj, queue, cb, opts.getSubChanLen()); addSubscription(s); + if (logger.isDebugEnabled()) + printSubs(this); return s; } @@ -1954,11 +1999,14 @@ public AsyncSubscription subscribeAsync(String subj, MessageHandler cb) { return (AsyncSubscription) subscribe(subj, null, cb, opts.getSubChanLen()); } - private void addSubscription(Subscription s) { - subs.put(new Long(sidCounter.getAndIncrement()), s); + private void addSubscription(SubscriptionImpl s) { + s.setSid(sidCounter.incrementAndGet()); + subs.put(s.getSid(), s); if (logger.isDebugEnabled()) logger.debug("Successfully added subscription to " + s.getSubject() + "[" + s.getSid() + "]"); + if (logger.isDebugEnabled()) + printSubs(this); } @Override @@ -2057,8 +2105,9 @@ public void publish(String subject, String reply, byte[] data) throws Connection } int msgSize = data != null ? data.length : 0; - + logger.debug("publish(): acquiring mutex"); mu.lock(); + logger.debug("publish(): acquired mutex"); try { // Proactively reject payloads over the threshold set by server. @@ -2096,7 +2145,7 @@ public void publish(String subject, String reply, byte[] data) throws Connection try { if (logger.isDebugEnabled()) - logger.debug("writing: " + new String(pubProtoBuf)); + logger.debug("publish(): writing " + new String(pubProtoBuf)); bw.write(pubProtoBuf, 0, pubProtoLen); if (msgSize > 0) @@ -2106,16 +2155,18 @@ public void publish(String subject, String reply, byte[] data) throws Connection bw.write(crlfProtoBytes, 0, crlfProtoBytesLen); } catch (IOException e) { - + //TODO remove this + e.printStackTrace(); } stats.incrementOutMsgs(); stats.incrementOutBytes(msgSize); - kickFlusher(); } finally { mu.unlock(); + logger.debug("publish(): released mutex"); } + kickFlusher(); } // publish @@ -2134,6 +2185,18 @@ public Message request(String subject, byte[] data, long timeout) throws ConnectionClosedException, BadSubscriptionException, SlowConsumerException, MaxMessagesException, IOException { Message m = null; + if (logger.isDebugEnabled()) { + String ds = null; + if (data != null) + ds = new String(data); + logger.debug("#########In request(" +subject+ "," + ds + "," + timeout + ")") ; + } + if (timeout <= 0) + { + throw new IllegalArgumentException( + "Timeout must be greater that 0."); + } + String inbox = newInbox(); SyncSubscription s = subscribeSync(inbox, null); @@ -2197,14 +2260,15 @@ protected void sendSubscriptionMessage(Subscription sub) { sub.getQueue(), sub.getSid()); try { bw.writeBytes(s); + bw.flush(); if (logger.isDebugEnabled()) {logger.debug("Sent [" + s + "]" );} } catch (IOException e) { } - // kickFlusher(); } } finally { mu.unlock(); } + kickFlusher(); } /** @@ -2259,7 +2323,7 @@ public void setReconnectedEventHandler(ConnectionEventHandler reconnectedEventHa * @return the exceptionHandler */ @Override - public ConnExceptionHandler getExceptionHandler() { + public ExceptionHandler getExceptionHandler() { return exceptionHandler; } @@ -2267,8 +2331,16 @@ public ConnExceptionHandler getExceptionHandler() { * @param exceptionHandler the exceptionHandler to set */ @Override - public void setExceptionHandler(ConnExceptionHandler exceptionHandler) { + public void setExceptionHandler(ExceptionHandler exceptionHandler) { this.exceptionHandler = exceptionHandler; } + + static void printSubs(ConnectionImpl c) { + c.logger.debug("SUBS:"); + for (long key : c.subs.keySet()) + { + c.logger.debug("\tkey: " + key + " value: " + c.subs.get(key)); + } + } } diff --git a/src/main/java/io/nats/client/DefaultExceptionHandler.java b/src/main/java/io/nats/client/DefaultExceptionHandler.java index 48d566122..257a75568 100644 --- a/src/main/java/io/nats/client/DefaultExceptionHandler.java +++ b/src/main/java/io/nats/client/DefaultExceptionHandler.java @@ -1,17 +1,22 @@ package io.nats.client; -public class DefaultExceptionHandler implements ConnExceptionHandler { +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +public class DefaultExceptionHandler implements ExceptionHandler { + private final static Logger logger = LoggerFactory.getLogger(DefaultExceptionHandler.class); + @Override - public void handleException(ConnectionImpl conn, Throwable e) { - // TODO Auto-generated method stub - + public void handleException(Connection conn, Subscription sub, Throwable e) { + System.err.println("Exception thrown on Subscription sid:" + sub.getSid() + + " subject: " + sub.getSubject()); + e.printStackTrace(); } @Override - public void onError(ConnExceptionArgs eventArgs) { - // TODO Auto-generated method stub - + public void handleSlowConsumerException(Connection conn, Subscription sub, + SlowConsumerException e) { + logger.error("SLOW CONSUMER subscription subject=" + sub.getSubject() + " sid:"+sub.getSid()); } } diff --git a/src/main/java/io/nats/client/ErrorEventHandler.java b/src/main/java/io/nats/client/ErrorEventHandler.java deleted file mode 100644 index e8c981ce9..000000000 --- a/src/main/java/io/nats/client/ErrorEventHandler.java +++ /dev/null @@ -1,5 +0,0 @@ -package io.nats.client; - -public interface ErrorEventHandler { - public void onError(ConnExceptionArgs error); -} diff --git a/src/main/java/io/nats/client/ExceptionHandler.java b/src/main/java/io/nats/client/ExceptionHandler.java new file mode 100644 index 000000000..2a09013cf --- /dev/null +++ b/src/main/java/io/nats/client/ExceptionHandler.java @@ -0,0 +1,12 @@ +package io.nats.client; + +public interface ExceptionHandler { + public void handleException(Connection conn, Subscription subscription, Throwable e); + +// public void handleSlowConsumerException(Connection conn, +// Subscription sub, +// Throwable e); + + void handleSlowConsumerException(Connection conn, Subscription sub, SlowConsumerException e); + +} diff --git a/src/main/java/io/nats/client/MessageImpl.java b/src/main/java/io/nats/client/MessageImpl.java index cad06f6b9..cf2e3d77e 100644 --- a/src/main/java/io/nats/client/MessageImpl.java +++ b/src/main/java/io/nats/client/MessageImpl.java @@ -13,12 +13,12 @@ final class MessageImpl implements Message { private String subject; private String replyTo; private byte[] data; - protected Subscription sub; + protected SubscriptionImpl sub; protected MessageImpl() { } - public MessageImpl(MsgArg msgArgs, Subscription sub, byte[] msg, long length) { + public MessageImpl(MsgArg msgArgs, SubscriptionImpl sub, byte[] msg, long length) { this.subject = msgArgs.subject; this.replyTo = msgArgs.reply; this.data = msg; diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index e973e0e49..8a961a358 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -24,7 +24,7 @@ public class Options { private int connectionTimeout; private long pingInterval; private int maxPingsOut; - private ConnExceptionHandler connExceptionHandler; + private ExceptionHandler exceptionHandler; private ConnectionEventHandler connectionEventHandler; /** * @return the subChanLen @@ -42,7 +42,7 @@ public void setSubChanLen(int subChanLen) { public ConnectionEventHandler disconnectedEventHandler; public ConnectionEventHandler closedEventHandler; public ConnectionEventHandler reconnectedEventHandler; - public ErrorEventHandler asyncErrorEventHandler; + public ExceptionHandler asyncErrorEventHandler; public URI getUrl() { return url; @@ -198,11 +198,11 @@ public int getMaxPingsOut() { public void setMaxPingsOut(int maxPingsOut) { this.maxPingsOut = maxPingsOut; } - public ConnExceptionHandler getExceptionHandler() { - return connExceptionHandler; + public ExceptionHandler getExceptionHandler() { + return exceptionHandler; } - public void setExceptionHandler(ConnExceptionHandler connExceptionHandler) { - this.connExceptionHandler = connExceptionHandler; + public void setExceptionHandler(ExceptionHandler exceptionHandler) { + this.exceptionHandler = exceptionHandler; } public ConnectionEventHandler getConnectionListener() { return connectionEventHandler; diff --git a/src/main/java/io/nats/client/Parser.java b/src/main/java/io/nats/client/Parser.java index f91209924..58d7d4dae 100644 --- a/src/main/java/io/nats/client/Parser.java +++ b/src/main/java/io/nats/client/Parser.java @@ -170,7 +170,7 @@ public Parser(ConnectionImpl connectionImpl) { this.conn.ps = null; } - protected void parse(byte[] buffer, int len) throws ParseException { + protected void parse(byte[] buffer, int len) throws ParserException { int i; char b; @@ -496,15 +496,15 @@ protected void parse(byte[] buffer, int len) throws ParseException { } break; default: - throw new ParseException("Unable to parse."); + throw new ParserException("Unable to parse."); } // switch(state) } // for } - private void parseError(byte[] buffer, int position) throws ParseException { - throw new ParseException(String.format("Parse Error [%s], %s", + private void parseError(byte[] buffer, int position) throws ParserException { + throw new ParserException(String.format("Parse Error [%s], %s", NatsOp.names.get(state), Arrays.toString(buffer))); } diff --git a/src/main/java/io/nats/client/ParseException.java b/src/main/java/io/nats/client/ParserException.java similarity index 62% rename from src/main/java/io/nats/client/ParseException.java rename to src/main/java/io/nats/client/ParserException.java index c212a35a0..d5a20e720 100644 --- a/src/main/java/io/nats/client/ParseException.java +++ b/src/main/java/io/nats/client/ParserException.java @@ -1,27 +1,27 @@ package io.nats.client; -public class ParseException extends Exception { +public class ParserException extends Exception { /** * */ private static final long serialVersionUID = 6250978396296475549L; - public ParseException() { + public ParserException() { // TODO Auto-generated constructor stub } - public ParseException(String msg) { + public ParserException(String msg) { super(msg); // TODO Auto-generated constructor stub } - public ParseException(Throwable e) { + public ParserException(Throwable e) { super(e); // TODO Auto-generated constructor stub } - public ParseException(String string, Exception e) { + public ParserException(String string, Exception e) { super(string, e); // TODO Auto-generated constructor stub } diff --git a/src/main/java/io/nats/client/Subscription.java b/src/main/java/io/nats/client/Subscription.java index 18cd3e1c3..0eacf165b 100644 --- a/src/main/java/io/nats/client/Subscription.java +++ b/src/main/java/io/nats/client/Subscription.java @@ -59,6 +59,10 @@ void autoUnsubscribe(int max) */ int getQueuedMessageCount(); + /** + * Get the Subscription ID for this subscription. + * @return the Subscription ID. + */ long getSid(); void setMax(long max); diff --git a/src/main/java/io/nats/client/AbstractSubscriptionImpl.java b/src/main/java/io/nats/client/SubscriptionImpl.java similarity index 83% rename from src/main/java/io/nats/client/AbstractSubscriptionImpl.java rename to src/main/java/io/nats/client/SubscriptionImpl.java index fe9a734fe..1aea86953 100644 --- a/src/main/java/io/nats/client/AbstractSubscriptionImpl.java +++ b/src/main/java/io/nats/client/SubscriptionImpl.java @@ -8,11 +8,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -abstract class AbstractSubscriptionImpl implements Subscription { +abstract class SubscriptionImpl implements Subscription { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); - // public AbstractSubscriptionImpl() {} + // public SubscriptionImpl() {} final Lock mu = new ReentrantLock(); @@ -39,11 +39,11 @@ abstract class AbstractSubscriptionImpl implements Subscription { ConnectionImpl conn = null; Channel mch = new Channel(); - AbstractSubscriptionImpl() { + SubscriptionImpl() { } - public AbstractSubscriptionImpl(ConnectionImpl conn, String subject, String queue) { + public SubscriptionImpl(ConnectionImpl conn, String subject, String queue) { this.conn = conn; this.subject = subject; this.queue = queue; @@ -98,6 +98,9 @@ boolean addMessage(Message m, int maxCount) { if (mch.getCount() >= maxCount) { + if (logger.isDebugEnabled()) + logger.debug("MAXIMUM COUNT " + maxCount + + " REACHED for sid:" + getSid()); return false; } else @@ -107,7 +110,7 @@ boolean addMessage(Message m, int maxCount) if (logger.isDebugEnabled()) logger.debug("Added message to channel: " + m); } - } + } // mch != null return true; } @@ -152,10 +155,10 @@ public long getSid() { } /** - * @param sid the sid to set + * @param l the sid to set */ - public void setSid(long sid) { - this.sid = sid; + public void setSid(long l) { + this.sid = l; } /** @@ -197,7 +200,16 @@ public void autoUnsubscribe(int max) throws BadSubscriptionException, Connection } public int getQueuedMessageCount() { - return this.mch.getCount(); + if (this.mch != null) + return this.mch.getCount(); + else + return 0; + } + + public String toString() { + String s = String.format("{subject=%s, sid=%d, queued=%d, max=%d}", + getSubject(), getSid(), getQueuedMessageCount(), getMax()); + return s; } protected abstract boolean processMsg(Message msg); diff --git a/src/main/java/io/nats/client/SyncSubscriptionImpl.java b/src/main/java/io/nats/client/SyncSubscriptionImpl.java index 4bfdbb3d1..d37b3d403 100644 --- a/src/main/java/io/nats/client/SyncSubscriptionImpl.java +++ b/src/main/java/io/nats/client/SyncSubscriptionImpl.java @@ -2,7 +2,7 @@ import java.util.concurrent.TimeUnit; -final class SyncSubscriptionImpl extends AbstractSubscriptionImpl implements SyncSubscription { +final class SyncSubscriptionImpl extends SubscriptionImpl implements SyncSubscription { public SyncSubscriptionImpl(ConnectionImpl nc, String subj, String queue) { super(nc, subj, queue); @@ -48,6 +48,9 @@ public Message nextMessage(long timeout) msg = localChannel.get(-1); } } catch (TimeoutException e) { + if (localConn.exceptionHandler != null) { + localConn.exceptionHandler.handleException(localConn, this, e); + } } if (msg != null) { diff --git a/src/main/resources/jnats.properties b/src/main/resources/jnats.properties index 6b227788d..95cdb7b74 100644 --- a/src/main/resources/jnats.properties +++ b/src/main/resources/jnats.properties @@ -1,4 +1,4 @@ application.name = jnats -application.version = ${project.version} +client.version = ${project.version} #application.copyright = (c) nobody #application.title = ${application.name} ${application.version}\n${application.copyright} \ No newline at end of file