diff --git a/README.md b/README.md
index 8ef6a5e..72e09fd 100644
--- a/README.md
+++ b/README.md
@@ -79,7 +79,7 @@ for (int i=0; i < iterations; i++) {
* [slf4j][slf4j]
* [trendrr-oss][trendrr-oss]
-Note: the trendrr-oss dependancy can easily be swapped out by implementing the com.trendrr.nsq.NSQLookup interface using a different json parser
+Note: the trendrr-oss dependancy can easily be swapped out by implementing the com.trendrr.nsq.lookup.NSQLookup interface using a different json parser
[nsq]: https://github.com/bitly/nsq
diff --git a/pom.xml b/pom.xml
index fd07982..f505ac7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,9 +6,15 @@
com.github.dustismo
trendrr-nsq-client
- 1.2-SNAPSHOT
+ 1.3-SNAPSHOT
+
+ com.trendrr.oss
+ trendrr-oss
+ 1.0
+
+
ch.qos.logback
logback-classic
@@ -32,23 +38,78 @@
slf4j-api
1.6.4
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ 2.2.2
+
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+ 2.2.2
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.2.2
+
+
+
+ com.google.guava
+ guava
+ r09
+
+
+
+ junit
+ junit
+ 4.11
+ test
+
+
+
+ org.mockito
+ mockito-all
+ 1.9.5
+ test
+
+
+
+
-
-
-
- maven-source-plugin
-
-
- attach-sources
- verify
-
- jar
-
-
-
-
-
-
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.10
+
+ always
+
+ file:${basedir}/src/test/resources/log4j-surefire.xml
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 2.2.1
+
+
+ attach-sources
+ verify
+
+ jar-no-fork
+
+
+
+
+
+
diff --git a/src/main/java/com/trendrr/nsq/AbstractNSQClient.java b/src/main/java/com/trendrr/nsq/AbstractNSQClient.java
index a422102..d6b0a72 100644
--- a/src/main/java/com/trendrr/nsq/AbstractNSQClient.java
+++ b/src/main/java/com/trendrr/nsq/AbstractNSQClient.java
@@ -35,29 +35,27 @@
*/
public abstract class AbstractNSQClient {
- protected static Logger log = LoggerFactory.getLogger(AbstractNSQClient.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNSQClient.class);
+ private static final int CLEAN_UP_FREQUENCY = 1000 * 60 * 2;
/**
* Protocol version sent to nsqd on initial connect
*/
- public static byte[] MAGIC_PROTOCOL_VERSION = " V2".getBytes();
+ private static byte[] MAGIC_PROTOCOL_VERSION = " V2".getBytes();
+ private volatile long lookupPeriod = 60 * 1000; //how often to recheck for new nodes (and clean up non responsive nodes)
- private int messagesPerBatch = 200;
- private long lookupPeriod = 60 * 1000; // how often to recheck for new nodes (and clean up non responsive nodes)
-
-
- Connections connections = new Connections();
+ private int messagesPerBatch = 200;
+ private final Connections connections = new Connections();
// Configure the client.
- protected ClientBootstrap bootstrap = null;
- protected Timer timer = null;
+ private ClientBootstrap bootstrap = null;
+ private Timer timer = null;
+
+
+ //this executor is where the callback code is handled
+ protected Executor executor = Executors.newSingleThreadExecutor();
- //this executor is where the callback code is handled
- protected Executor executor = Executors.newSingleThreadExecutor();
- /**
- * connects, ready to produce.
- */
public synchronized void start() {
this.connect();
@@ -65,6 +63,8 @@ public synchronized void start() {
timer.cancel();
}
timer = new Timer();
+
+ //TODO Use a scheduled executor service instead of a timer here
timer.schedule(new TimerTask() {
@Override
public void run() {
@@ -113,28 +113,27 @@ public synchronized void setNettyExecutors(Executor boss, Executor worker) {
* Handles connection and sending magic protocol
* @param address
* @param port
- * @return
+ * @return the created connection; null if no connection could be created
*/
protected Connection createConnection(String address, int port) {
-
// Start the connection attempt.
- ChannelFuture future = bootstrap.connect(new InetSocketAddress(address, port));
-
- // Wait until the connection attempt succeeds or fails.
- Channel channel = future.awaitUninterruptibly().getChannel();
- if (!future.isSuccess()) {
- log.error("Caught", future.getCause());
- return null;
- }
- log.info("Creating connection: " + address + " : " + port);
- Connection conn = new Connection(address, port, channel, this);
- conn.setMessagesPerBatch(this.messagesPerBatch);
-
- ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
- buf.writeBytes(MAGIC_PROTOCOL_VERSION);
- channel.write(buf);
-
- //indentify
+ ChannelFuture future = bootstrap.connect(new InetSocketAddress(address, port));
+
+ // Wait until the connection attempt succeeds or fails.
+ Channel channel = future.awaitUninterruptibly().getChannel();
+ if (!future.isSuccess()) {
+ LOGGER.error("Unable to create connection, caught: ", future.getCause());
+ return null;
+ }
+
+ LOGGER.info("Creating connection: " + address + " : " + port);
+ Connection conn = new Connection(address, port, channel, this);
+ conn.setMessagesPerBatch(this.messagesPerBatch);
+ ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
+ buf.writeBytes(MAGIC_PROTOCOL_VERSION);
+ channel.write(buf);
+
+ //indentify
try {
String identJson = "{" +
"\"short_id\":\"" + InetAddress.getLocalHost().getHostName() + "\"" +
@@ -145,7 +144,7 @@ protected Connection createConnection(String address, int port) {
conn.command(ident);
} catch (UnknownHostException e) {
- log.error("Caught", e);
+ LOGGER.error("Caught", e);
}
return conn;
@@ -168,10 +167,11 @@ protected synchronized void connect() {
for (ConnectionAddress addr : addresses ) {
- int num = addr.getPoolsize() - this.connections.connectionSize(addr.getHost(), addr.getPort());
+ int num = addr.getPoolsize() - connections.connectionSize(addr.getHost(), addr.getPort());
for (int i=0; i < num; i++) {
- Connection conn = this.createConnection(addr.getHost(), addr.getPort());
- this.connections.addConnection(conn);
+ Connection conn = createConnection(addr.getHost(), addr.getPort());
+ connections.addConnection(conn);
+
}
//TODO: handle negative num? (i.e. if user lowered the poolsize we should kill some connections)
}
@@ -179,23 +179,27 @@ protected synchronized void connect() {
}
/**
- * will run through and remove any connections that have not recieved a ping in the last 2 minutes.
+ * will run through and remove any connections that have not recieved a ping in the last N milliseconds
*/
public synchronized void cleanupOldConnections() {
- Date cutoff = new Date(new Date().getTime() - (1000*60*2));
+ Date cutoff = new Date(new Date().getTime() - CLEAN_UP_FREQUENCY);
try {
for (Connection c : this.connections.getConnections()) {
if (cutoff.after(c.getLastHeartbeat())) {
- log.warn("Removing dead connection: " + c.getHost() + ":" + c.getPort());
+ LOGGER.warn("Removing dead connection [host={}, port={}]", c.getHost(), c.getPort());
c.close();
connections.remove(c);
}
}
} catch (NoConnectionsException e) {
- //ignore
+ //ignore - TODO: is it a good idea to swallow this exception?
}
}
+ public Connections getConnections(){
+ return connections;
+ }
+
public void setMessagesPerBatch(int messagesPerBatch) {
this.messagesPerBatch = messagesPerBatch;
}
@@ -209,7 +213,7 @@ public void setLookupPeriod(long periodMillis) {
* @param connection
*/
public synchronized void _disconnected(Connection connection) {
- log.warn("Disconnected!" + connection);
+ LOGGER.warn("Client disconnected [connection={}]", connection);
this.connections.remove(connection);
}
@@ -217,6 +221,5 @@ public void close() {
this.timer.cancel();
this.connections.close();
this.bootstrap.releaseExternalResources();
-
}
}
diff --git a/src/main/java/com/trendrr/nsq/Connection.java b/src/main/java/com/trendrr/nsq/Connection.java
index 8c96576..b053185 100644
--- a/src/main/java/com/trendrr/nsq/Connection.java
+++ b/src/main/java/com/trendrr/nsq/Connection.java
@@ -29,24 +29,23 @@
*
*/
public class Connection {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Connection.class);
- protected static Logger log = LoggerFactory.getLogger(Connection.class);
+ private Channel channel;
+ private int heartbeats = 0;
+ private Date lastHeartbeat = new Date();
- Channel channel;
- int heartbeats = 0;
- Date lastHeartbeat = new Date();
+ private NSQMessageCallback callback = null;
+ private AtomicLong totalMessages = new AtomicLong(0l);
+ private int messagesPerBatch = 200;
- NSQMessageCallback callback = null;
- AtomicLong totalMessages = new AtomicLong(0l);
- int messagesPerBatch = 200;
+ private AbstractNSQClient client = null;
- AbstractNSQClient client = null;
+ private String host = null;
+ private int port;
- String host = null;
- int port;
-
- LinkedBlockingQueue requests = new LinkedBlockingQueue(1);
- LinkedBlockingQueue responses = new LinkedBlockingQueue(1);
+ private LinkedBlockingQueue requests = new LinkedBlockingQueue(1);
+ private LinkedBlockingQueue responses = new LinkedBlockingQueue(1);
public Connection(String host, int port, Channel channel, AbstractNSQClient client) {
@@ -94,7 +93,7 @@ public void incoming(NSQFrame frame) {
try {
this.responses.offer(frame, 20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- log.error("Incoming frame error", e);
+ LOGGER.error("Incoming frame error", e);
//TODO: what to do here? we should probably disconnect!
this.close();
}
@@ -123,19 +122,20 @@ public void incoming(NSQFrame frame) {
message.setMessage(msg.getMessageBody());
message.setTimestamp(new Date(TimeUnit.NANOSECONDS.toMillis(msg.getTimestamp())));
if (this.callback == null) {
- log.warn("NO CAllback, dropping message: " + message);
+ LOGGER.warn("NO Callback, dropping message: " + message);
} else {
this.callback.message(message);
}
return;
}
- log.warn("Unknown frame type: " + frame);
+ LOGGER.warn("Unknown frame type: " + frame);
}
void heartbeat() {
- log.info("HEARTBEAT!");
+ // This should be logged at debug level - it's more useful for troubleshooting and should not interfere with actual INFO logs
+ LOGGER.debug("HEARTBEAT!");
this.heartbeats++;
this.lastHeartbeat = new Date();
//send NOP here.
@@ -184,9 +184,9 @@ public void close() {
try {
channel.close().await(10000);
} catch (Exception x) {
- log.error("Caught", x);
+ LOGGER.error("Caught", x);
}
- log.warn("Close called on connection: " + this);
+ LOGGER.warn("Close called on connection: " + this);
this._disconnected();
}
diff --git a/src/main/java/com/trendrr/nsq/ConnectionAddress.java b/src/main/java/com/trendrr/nsq/ConnectionAddress.java
index 526fb5a..9475763 100644
--- a/src/main/java/com/trendrr/nsq/ConnectionAddress.java
+++ b/src/main/java/com/trendrr/nsq/ConnectionAddress.java
@@ -8,40 +8,64 @@
/**
+ * Holds the details of a connection
* @author Dustin Norlander
* @created Jan 22, 2013
*
*/
public class ConnectionAddress {
- protected static Logger log = LoggerFactory.getLogger(ConnectionAddress.class);
-
- private int poolsize = 1;
-
- /**
+ private static Logger log = LoggerFactory.getLogger(ConnectionAddress.class);
+
+ private static final int DEFAULT_POOL_SIZE = 1;
+
+ private final String host;
+ private final int port;
+ private final int poolsize;
+
+ public ConnectionAddress(String host, int port) {
+ this(host, port, DEFAULT_POOL_SIZE);
+ }
+
+ public ConnectionAddress(String host, int port, int poolsize) {
+ this.host = host;
+ this.port = port;
+ this.poolsize = poolsize;
+ }
+
+ /**
* How many connections should we have in place?
* @return
*/
public int getPoolsize() {
return poolsize;
}
- public void setPoolsize(int poolsize) {
- this.poolsize = poolsize;
- }
-
-
+
public String getHost() {
return host;
}
- public void setHost(String host) {
- this.host = host;
- }
+
public int getPort() {
return port;
}
- public void setPort(int port) {
- this.port = port;
- }
- private String host;
- private int port;
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ConnectionAddress that = (ConnectionAddress) o;
+
+ if (port != that.port) return false;
+ if (!host.equals(that.host)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = host.hashCode();
+ result = 31 * result + port;
+ return result;
+ }
}
diff --git a/src/main/java/com/trendrr/nsq/NSQConsumer.java b/src/main/java/com/trendrr/nsq/NSQConsumer.java
index 7fcbef6..c187a86 100644
--- a/src/main/java/com/trendrr/nsq/NSQConsumer.java
+++ b/src/main/java/com/trendrr/nsq/NSQConsumer.java
@@ -5,6 +5,7 @@
import java.util.List;
+import com.trendrr.nsq.lookup.NSQLookup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -17,7 +18,7 @@
*/
public class NSQConsumer extends AbstractNSQClient {
- protected static Logger log = LoggerFactory.getLogger(NSQConsumer.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(NSQConsumer.class);
NSQLookup lookup;
String topic = null;
@@ -34,17 +35,31 @@ public NSQConsumer(NSQLookup lookup, String topic, String channel, NSQMessageCal
@Override
protected Connection createConnection(String address, int port) {
- Connection conn = super.createConnection(address, port);
-
- conn.setCallback(callback);
+ Connection conn = super.createConnection(address, port);
+ return addCommands(conn);
+ }
+
+ /**
+ * Adds all required commands to a subscriber connection
+ * @param conn the connection
+ * @return the passed connection
+ */
+ protected Connection addCommands(Connection conn){
+ if(conn == null){
+ //Connection could be null here
+ LOGGER.error("Connection returned is null - cannot go further");
+ return null;
+ }
+
+ conn.setCallback(callback);
/*
* subscribe
*/
- conn.command(NSQCommand.instance("SUB " + topic + " " + this.channel));
- conn.command(NSQCommand.instance("RDY " + conn.getMessagesPerBatch()));
- return conn;
-
- }
+ conn.command(NSQCommand.instance("SUB " + topic + " " + this.channel));
+ conn.command(NSQCommand.instance("RDY " + conn.getMessagesPerBatch()));
+ return conn;
+ }
+
/* (non-Javadoc)
* @see com.trendrr.nsq.AbstractNSQClient#lookupAddresses()
*/
diff --git a/src/main/java/com/trendrr/nsq/NSQProducer.java b/src/main/java/com/trendrr/nsq/NSQProducer.java
index 24694e5..1505dc0 100644
--- a/src/main/java/com/trendrr/nsq/NSQProducer.java
+++ b/src/main/java/com/trendrr/nsq/NSQProducer.java
@@ -118,7 +118,7 @@ protected synchronized Connection getConn() throws NoConnectionsException {
NoConnectionsException ex = new NoConnectionsException("no connections", null);
for (int i=0; i < this.connectionRetries; i++) {
try {
- return this.connections.next();
+ return getConnections().next();
} catch (NoConnectionsException x) {
ex = x;
try {
@@ -211,16 +211,12 @@ public void produce(String topic, byte[] message) throws DisconnectedException,
/**
* Adds a new connection.
- * @param host
- * @param port
- * @param poolsize
+ * @param host the host
+ * @param port the port
+ * @param poolsize the connection pool size
*/
public synchronized NSQProducer addAddress(String host, int port, int poolsize) {
- ConnectionAddress addr = new ConnectionAddress();
- addr.setHost(host);
- addr.setPoolsize(poolsize);
- addr.setPort(port);
- this.addresses.add(addr);
+ this.addresses.add(new ConnectionAddress(host, port, poolsize));
return this;
}
diff --git a/src/main/java/com/trendrr/nsq/NSQLookup.java b/src/main/java/com/trendrr/nsq/lookup/NSQLookup.java
similarity index 66%
rename from src/main/java/com/trendrr/nsq/NSQLookup.java
rename to src/main/java/com/trendrr/nsq/lookup/NSQLookup.java
index 83629d5..7d34136 100644
--- a/src/main/java/com/trendrr/nsq/NSQLookup.java
+++ b/src/main/java/com/trendrr/nsq/lookup/NSQLookup.java
@@ -1,15 +1,13 @@
-package com.trendrr.nsq;
-/**
- *
- */
+package com.trendrr.nsq.lookup;
+import com.trendrr.nsq.ConnectionAddress;
import java.util.List;
/**
* An interface to the nsq lookup. We keep this as an interface because it depends on
- * some json parsing library and we dont want to force a dependancy on a specific lib.
+ * some json parsing library and we don't want to force a dependency on a specific lib.
*
*
* @author Dustin Norlander
@@ -27,7 +25,7 @@ public interface NSQLookup {
/**
* Lookup topic addresses
* @param topic
- * @return
+ * @return the list of nsq servers where this topic is available
*/
public List lookup(String topic);
}
diff --git a/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java b/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java
index 02ba65f..9367963 100644
--- a/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java
+++ b/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java
@@ -1,5 +1,5 @@
/**
- *
+ *
*/
package com.trendrr.nsq.lookup;
@@ -17,82 +17,93 @@
import org.slf4j.LoggerFactory;
import com.trendrr.nsq.ConnectionAddress;
-import com.trendrr.nsq.NSQLookup;
import com.trendrr.oss.DynMap;
/**
* Lookup implementation based on trendrr-oss DynMap
- *
- *
+ *
* @author Dustin Norlander
* @created Jan 23, 2013
- *
*/
public class NSQLookupDynMapImpl implements NSQLookup {
+ private static Logger LOGGER = LoggerFactory.getLogger(NSQLookupDynMapImpl.class);
- protected static Logger log = LoggerFactory.getLogger(NSQLookupDynMapImpl.class);
-
- Set addresses = new HashSet ();
-
-
- public void addAddr(String addr, int port) {
- if (!addr.startsWith("http")) {
- addr = "http://" + addr;
- }
- addr = addr + ":" + port;
- this.addresses.add(addr);
- }
-
- public List lookup(String topic) {
- HashMap addresses = new HashMap();
-
- for (String addr : this.addresses) {
- DynMap mp = DynMap.instance(this.getHTML(addr + "/lookup?topic=" + topic), new DynMap());
- for (DynMap node : mp.getListOrEmpty(DynMap.class, "data.producers")) {
- String host = node.getString("broadcast_address", node.getString("address"));
- String key = host + ":" + node.getInteger("tcp_port");
- ConnectionAddress address = new ConnectionAddress();
- address.setHost(host);
- address.setPort(node.getInteger("tcp_port"));
- addresses.put(key, address);
- }
- }
- return new ArrayList(addresses.values());
- }
-
- public String getHTML(String url) {
- URL u;
- HttpURLConnection conn = null;
- BufferedReader rd = null;
- String line;
- String result = "";
- try {
- u = new URL(url);
- conn = (HttpURLConnection) u.openConnection();
- conn.setRequestMethod("GET");
- rd = new BufferedReader(new InputStreamReader(conn.getInputStream()));
- while ((line = rd.readLine()) != null) {
- result += line;
- }
-
- } catch (Exception e) {
- log.error("Caught", e);
- } finally {
- try {
- if (rd != null){
- rd.close();
- }
- } catch (Exception e) {
- log.error("Caught", e);
- }
-
- // Release memory and underlying resources on the HttpURLConnection otherwise we may run out of file descriptors and leak memory
- if (conn != null){
- conn.disconnect();
- }
- }
- return result;
- }
-
+ //By default log errors every five minutes otherwise logs become terribly verbose
+ private volatile long errorLoggingInterval = 5 * 60 * 1000;
+
+ private final Set addresses = new HashSet();
+
+ private volatile long lastError = 0;
+
+ public void addAddr(String addr, int port) {
+ if (!addr.startsWith("http")) {
+ addr = "http://" + addr;
+ }
+ addr = addr + ":" + port;
+ this.addresses.add(addr);
+ }
+
+ public List lookup(String topic) {
+ HashMap addresses = new HashMap();
+
+ for (String addr : this.addresses) {
+ DynMap mp = DynMap.instance(this.getHTML(addr + "/lookup?topic=" + topic), new DynMap());
+ for (DynMap node : mp.getListOrEmpty(DynMap.class, "data.producers")) {
+ String host = node.getString("broadcast_address", node.getString("address"));
+ String key = host + ":" + node.getInteger("tcp_port");
+
+ addresses.put(key, new ConnectionAddress(host, node.getInteger("tcp_port")));
+ }
+ }
+ return new ArrayList(addresses.values());
+ }
+
+ public String getHTML(String url) {
+ URL u;
+ HttpURLConnection conn = null;
+ BufferedReader rd = null;
+ String line;
+ String result = "";
+ try {
+ u = new URL(url);
+ conn = (HttpURLConnection) u.openConnection();
+ conn.setRequestMethod("GET");
+ rd = new BufferedReader(new InputStreamReader(conn.getInputStream()));
+ while ((line = rd.readLine()) != null) {
+ result += line;
+ }
+ } catch (Exception e) {
+ logException("Caught an exception when trying to get nsq instances from lookup mechanism [url=" + url + "]", e);
+ } finally {
+ try {
+ if (rd != null){
+ rd.close();
+ }
+ } catch (Exception e) {
+ logException("Caught an exception when trying to close buffered reader", e);
+ }
+ //Make sure to close the HTTP connection as well, otherwise the file descriptors will remain open
+ if (conn != null){
+ conn.disconnect();
+ }
+ }
+ return result;
+ }
+
+ private void logException(String message, Exception e) {
+ final long now = System.currentTimeMillis();
+ if (now - lastError > errorLoggingInterval) {
+ LOGGER.error(message, e);
+ lastError = now;
+ }
+ }
+
+ public long getErrorLoggingInterval() {
+ return errorLoggingInterval;
+ }
+
+ public void setErrorLoggingInterval(long errorLoggingInterval) {
+ this.errorLoggingInterval = errorLoggingInterval;
+ }
}
diff --git a/src/main/java/com/trendrr/nsq/ExampleMain.java b/src/test/java/com/java/trendrr/nsq/ExampleMain.java
similarity index 93%
rename from src/main/java/com/trendrr/nsq/ExampleMain.java
rename to src/test/java/com/java/trendrr/nsq/ExampleMain.java
index e1dafb0..54d61de 100644
--- a/src/main/java/com/trendrr/nsq/ExampleMain.java
+++ b/src/test/java/com/java/trendrr/nsq/ExampleMain.java
@@ -1,4 +1,4 @@
-package com.trendrr.nsq;
+package com.java.trendrr.nsq;
/**
*
*/
@@ -9,6 +9,11 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
+import com.trendrr.nsq.NSQConsumer;
+import com.trendrr.nsq.NSQMessage;
+import com.trendrr.nsq.NSQMessageCallback;
+import com.trendrr.nsq.NSQProducer;
+import com.trendrr.nsq.lookup.NSQLookup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +41,7 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc
* PRODUCER. produce 50k messages
*/
//producer
- NSQProducer producer = new NSQProducer().addAddress("localhost", 4150, 1);
+ NSQProducer producer = new NSQProducer().addAddress("localhost", 4150, 1);
producer.start();
start = new Date();
String msg = StringHelper.randomString(10);
diff --git a/src/main/java/SpeedTest.java b/src/test/java/com/java/trendrr/nsq/SpeedTest.java
similarity index 97%
rename from src/main/java/SpeedTest.java
rename to src/test/java/com/java/trendrr/nsq/SpeedTest.java
index 8101ffe..c8ce112 100644
--- a/src/main/java/SpeedTest.java
+++ b/src/test/java/com/java/trendrr/nsq/SpeedTest.java
@@ -1,4 +1,4 @@
-/**
+package com.java.trendrr.nsq; /**
*
*/
@@ -13,7 +13,7 @@
import com.trendrr.nsq.BatchCallback;
import com.trendrr.nsq.NSQMessageCallback;
import com.trendrr.nsq.NSQConsumer;
-import com.trendrr.nsq.NSQLookup;
+import com.trendrr.nsq.lookup.NSQLookup;
import com.trendrr.nsq.NSQMessage;
import com.trendrr.nsq.NSQProducer;
import com.trendrr.nsq.lookup.NSQLookupDynMapImpl;
diff --git a/src/test/java/com/trendrr/nsq/NSQConsumerTest.java b/src/test/java/com/trendrr/nsq/NSQConsumerTest.java
new file mode 100644
index 0000000..2d32669
--- /dev/null
+++ b/src/test/java/com/trendrr/nsq/NSQConsumerTest.java
@@ -0,0 +1,46 @@
+package com.trendrr.nsq;
+
+
+import com.trendrr.nsq.lookup.NSQLookup;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class NSQConsumerTest {
+
+ private NSQConsumer nsqConsumer;
+
+ private final String topic = "myTopic";
+
+ private final String channel = "myChannel";
+
+ private NSQLookup nsqLookup;
+
+ private NSQMessageCallback callback;
+
+ @Before
+ public void setUp() throws Exception {
+ nsqLookup = mock(NSQLookup.class);
+ callback = mock(NSQMessageCallback.class);
+
+ nsqConsumer = new NSQConsumer(nsqLookup, topic, channel, callback);
+ }
+
+ @Test
+ public void testAddCommandsToNullConnectionReturnsNull() throws Exception {
+ Assert.assertNull(nsqConsumer.addCommands(null));
+ }
+
+ @Test
+ public void testAddCommandsToNoNullConnectionReturnsOriginalConnection() throws Exception {
+ Connection mockConn = mock(Connection.class);
+ Assert.assertEquals(mockConn, nsqConsumer.addCommands(mockConn));
+
+ verify(mockConn, times(2)).command(any(NSQCommand.class));
+ }
+}
diff --git a/src/test/resources/log4j-surefire.xml b/src/test/resources/log4j-surefire.xml
new file mode 100644
index 0000000..b406525
--- /dev/null
+++ b/src/test/resources/log4j-surefire.xml
@@ -0,0 +1,32 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file