Skip to content

Commit

Permalink
Numerous fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
Larry McQueary committed Nov 19, 2015
1 parent 532f4ca commit ccec12d
Show file tree
Hide file tree
Showing 27 changed files with 400 additions and 306 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ A [Java](http://www.java.com) client for the [NATS messaging system](https://nat
[![Javadoc](http://javadoc-badge.appspot.com/com.github.nats-io/jnats.svg?label=javadoc)](http://javadoc-badge.appspot.com/com.github.nats-io/jnats)
[![Coverage Status](https://coveralls.io/repos/nats-io/jnats/badge.svg?branch=master)](https://coveralls.io/r/nats-io/jnats?branch=master)

This is a WORK IN PROGRESS. It's not quite a stable release.
This is a WORK IN PROGRESS. It's not quite a stable release.
Test coverage is inadequate (currently around zero).
Documentation (javadoc) is in progress.

Please refer to the TODO.md for constantly updating information on what things are not complete or not working.
Watch this space for more info as it becomes available.

Expand All @@ -31,7 +34,7 @@ This compressed archive contains the jnats client jar, slf4j dependencies, and t

```java

ConnectionFactory cf = new ConnectionFactory(Cosntants.DEFAULT_URL)
ConnectionFactory cf = new ConnectionFactory(Constants.DEFAULT_URL)
Connection nc = cf.createConnection();

// Simple Publisher
Expand Down
7 changes: 4 additions & 3 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
### jnats TODO list
# General

- [] JavaDoc
- [] Simplify Exceptions
- [] Protobuf Encoding
- [] TLS support
- [] Simplify Exception handling
- [] EncodedConnection (e.g. protobuf)
2 changes: 1 addition & 1 deletion examples/Publisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void run(String[] args)
System.out.printf("(%d msgs/second).\n",
(int)(count / elapsed));
} else {
System.err.println("\nTest not long enough to produce meaningful stats. "
System.out.println("\nTest not long enough to produce meaningful stats. "
+ "Please increase the message count (-count n)");
}
printStats(c);
Expand Down
82 changes: 35 additions & 47 deletions examples/Replier.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,32 @@ public class Replier implements MessageHandler {
boolean sync = false;
int received = 0;
boolean verbose = false;

long start = 0L;
long end = 0L;

Lock testLock = new ReentrantLock();
Condition allDone = testLock.newCondition();
boolean done = false;

String replyText = "reply";
byte[] replyBytes = replyText.getBytes(Charset.forName("UTF-8"));


byte[] replyBytes = "reply".getBytes(Charset.forName("UTF-8"));

public void run(String[] args)
{
parseArgs(args);
banner();

ConnectionFactory cf = null;
Connection c = null;

try {
cf = new ConnectionFactory(url);
c = cf.createConnection();
} catch (NATSException e) {
System.err.println("Couldn't connect: " + e.getCause());
System.exit(-1);
}

long elapsed, seconds;

if (sync)
Expand All @@ -55,11 +54,11 @@ public void run(String[] args)
elapsed = receiveAsyncSubscriber(c);
}
seconds = TimeUnit.NANOSECONDS.toSeconds(elapsed);
System.out.printf("Replied to %d msgs in %d seconds ", count, seconds);
System.out.printf("Replied to %d msgs in %d seconds ", received, seconds);

if (seconds > 0) {
System.out.printf("(%d msgs/second).\n",
(count / seconds));
System.out.printf("(%d msgs/second).\n",
(received / seconds));
} else {
System.out.println();
System.out.println("Test not long enough to produce meaningful stats. "
Expand All @@ -73,20 +72,18 @@ private void printStats(Connection c)
Statistics s = c.getStats();
System.out.printf("Statistics: \n");
System.out.printf(" Incoming Payload Bytes: %d\n", s.getInBytes());
System.out.printf(" Incoming Messages: %d", s.getInMsgs());
System.out.printf(" Outgoing Payload Bytes: %d", s.getOutBytes());
System.out.printf(" Outgoing Messages: %d", s.getOutMsgs());
System.out.printf(" Incoming Messages: %d\n", s.getInMsgs());
System.out.printf(" Outgoing Payload Bytes: %d\n", s.getOutBytes());
System.out.printf(" Outgoing Messages: %d\n", s.getOutMsgs());
}

@Override
public void onMessage(Message msg) {
if (received == 0)
if (received++ == 0)
start = System.nanoTime();

received++;

if (verbose)
System.err.println("Received: " + msg);
System.out.println("Received: " + msg);

Connection c = msg.getSubscription().getConnection();
try {
Expand All @@ -102,6 +99,7 @@ public void onMessage(Message msg) {
testLock.lock();
try
{
System.out.println("I'M DONE");
done=true;
allDone.signal();
}
Expand All @@ -112,57 +110,47 @@ public void onMessage(Message msg) {
}
private long receiveAsyncSubscriber(Connection c)
{
long t0 = System.nanoTime();

c.subscribeAsync(subject, this);
AsyncSubscription s = c.subscribeAsync(subject, this);
// just wait to complete
testLock.lock();
try
{
s.start();
while (!done)
allDone.await();
} catch (InterruptedException e) {
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
finally {
testLock.unlock();
}

return System.nanoTime() - t0;
return end - start;
}


private long receiveSyncSubscriber(Connection c)
{
System.out.println("In receiveSyncSubscriber");
SyncSubscription s = c.subscribeSync(subject);
try {
s.nextMessage();
} catch (Exception e) {
e.printStackTrace();
}
received++;

start = System.nanoTime();

while (received < count)
{
received++;
Message m = null;
try {
System.out.println("nextMessage()");
Message m = null;
try {
while (received < count)
{
m = s.nextMessage();
System.out.println("Got message " + m);
} catch (Exception e) {
e.printStackTrace();
}
if (verbose)
System.out.println("Received: " + m);
if (received++ == 0)
start = System.nanoTime();

if (verbose)
System.out.println("Received: " + m);

try {
c.publish(m.getReplyTo(),replyBytes);
} catch (ConnectionClosedException e) {
e.printStackTrace();
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
end = System.nanoTime();
return end-start;
Expand All @@ -172,7 +160,7 @@ private void usage()
{
System.err.println(
"Usage: Publish [-url url] [-subject subject] " +
"-count [count] [-sync] [-verbose]");
"[-count count] [-sync] [-verbose]");

System.exit(-1);
}
Expand Down
23 changes: 17 additions & 6 deletions examples/Requestor.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,23 @@ public void run(String[] args)

start = System.nanoTime();

Message replyMsg = null;
int received = 0;

Message m = null;
byte[] reply = null;
try {
for (int i = 0; i < count; i++)
{
replyMsg = c.request(subject, payload);
System.out.print("Got reply: " + new String(replyMsg.getData()));
m = c.request(subject, payload, 10000);
if (m == null)
break;

received++;
reply = m.getData();
if (reply != null)
System.out.println("Got reply: " + new String(reply));
else
System.out.println("Got reply with null payload");
}
} catch (Exception e) {
e.printStackTrace();
Expand All @@ -51,10 +62,10 @@ public void run(String[] args)
end = System.nanoTime();
elapsed = TimeUnit.NANOSECONDS.toSeconds(end-start);

System.out.printf("Completed %d requests in %d seconds ", count, elapsed);
System.out.printf("Completed %d requests in %d seconds ", received, elapsed);
if (elapsed > 0) {
System.out.printf("(%d msgs/second).\n",
(count / elapsed));
(received / elapsed));
} else {
System.out.println();
System.out.println("Test not long enough to produce meaningful stats. "
Expand All @@ -78,7 +89,7 @@ private void usage()
{
System.err.println(
"Usage: Requestor [-url url] [-subject subject] " +
"-count [count] [-payload payload]");
"[-count count] [-payload payload]");

System.exit(-1);
}
Expand Down
Loading

0 comments on commit ccec12d

Please sign in to comment.