diff --git a/README.md b/README.md index 8ef6a5e..72e09fd 100644 --- a/README.md +++ b/README.md @@ -79,7 +79,7 @@ for (int i=0; i < iterations; i++) { * [slf4j][slf4j] * [trendrr-oss][trendrr-oss] -Note: the trendrr-oss dependancy can easily be swapped out by implementing the com.trendrr.nsq.NSQLookup interface using a different json parser +Note: the trendrr-oss dependancy can easily be swapped out by implementing the com.trendrr.nsq.lookup.NSQLookup interface using a different json parser [nsq]: https://github.com/bitly/nsq diff --git a/pom.xml b/pom.xml index fd07982..f505ac7 100644 --- a/pom.xml +++ b/pom.xml @@ -6,9 +6,15 @@ com.github.dustismo trendrr-nsq-client - 1.2-SNAPSHOT + 1.3-SNAPSHOT + + com.trendrr.oss + trendrr-oss + 1.0 + + ch.qos.logback logback-classic @@ -32,23 +38,78 @@ slf4j-api 1.6.4 + + + com.fasterxml.jackson.core + jackson-annotations + 2.2.2 + + + + com.fasterxml.jackson.core + jackson-core + 2.2.2 + + + + com.fasterxml.jackson.core + jackson-databind + 2.2.2 + + + + com.google.guava + guava + r09 + + + + junit + junit + 4.11 + test + + + + org.mockito + mockito-all + 1.9.5 + test + + + + - - - - maven-source-plugin - - - attach-sources - verify - - jar - - - - - - + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.10 + + always + + file:${basedir}/src/test/resources/log4j-surefire.xml + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.2.1 + + + attach-sources + verify + + jar-no-fork + + + + + + diff --git a/src/main/java/com/trendrr/nsq/AbstractNSQClient.java b/src/main/java/com/trendrr/nsq/AbstractNSQClient.java index a422102..d6b0a72 100644 --- a/src/main/java/com/trendrr/nsq/AbstractNSQClient.java +++ b/src/main/java/com/trendrr/nsq/AbstractNSQClient.java @@ -35,29 +35,27 @@ */ public abstract class AbstractNSQClient { - protected static Logger log = LoggerFactory.getLogger(AbstractNSQClient.class); + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNSQClient.class); + private static final int CLEAN_UP_FREQUENCY = 1000 * 60 * 2; /** * Protocol version sent to nsqd on initial connect */ - public static byte[] MAGIC_PROTOCOL_VERSION = " V2".getBytes(); + private static byte[] MAGIC_PROTOCOL_VERSION = " V2".getBytes(); + private volatile long lookupPeriod = 60 * 1000; //how often to recheck for new nodes (and clean up non responsive nodes) - private int messagesPerBatch = 200; - private long lookupPeriod = 60 * 1000; // how often to recheck for new nodes (and clean up non responsive nodes) - - - Connections connections = new Connections(); + private int messagesPerBatch = 200; + private final Connections connections = new Connections(); // Configure the client. - protected ClientBootstrap bootstrap = null; - protected Timer timer = null; + private ClientBootstrap bootstrap = null; + private Timer timer = null; + + + //this executor is where the callback code is handled + protected Executor executor = Executors.newSingleThreadExecutor(); - //this executor is where the callback code is handled - protected Executor executor = Executors.newSingleThreadExecutor(); - /** - * connects, ready to produce. - */ public synchronized void start() { this.connect(); @@ -65,6 +63,8 @@ public synchronized void start() { timer.cancel(); } timer = new Timer(); + + //TODO Use a scheduled executor service instead of a timer here timer.schedule(new TimerTask() { @Override public void run() { @@ -113,28 +113,27 @@ public synchronized void setNettyExecutors(Executor boss, Executor worker) { * Handles connection and sending magic protocol * @param address * @param port - * @return + * @return the created connection; null if no connection could be created */ protected Connection createConnection(String address, int port) { - // Start the connection attempt. - ChannelFuture future = bootstrap.connect(new InetSocketAddress(address, port)); - - // Wait until the connection attempt succeeds or fails. - Channel channel = future.awaitUninterruptibly().getChannel(); - if (!future.isSuccess()) { - log.error("Caught", future.getCause()); - return null; - } - log.info("Creating connection: " + address + " : " + port); - Connection conn = new Connection(address, port, channel, this); - conn.setMessagesPerBatch(this.messagesPerBatch); - - ChannelBuffer buf = ChannelBuffers.dynamicBuffer(); - buf.writeBytes(MAGIC_PROTOCOL_VERSION); - channel.write(buf); - - //indentify + ChannelFuture future = bootstrap.connect(new InetSocketAddress(address, port)); + + // Wait until the connection attempt succeeds or fails. + Channel channel = future.awaitUninterruptibly().getChannel(); + if (!future.isSuccess()) { + LOGGER.error("Unable to create connection, caught: ", future.getCause()); + return null; + } + + LOGGER.info("Creating connection: " + address + " : " + port); + Connection conn = new Connection(address, port, channel, this); + conn.setMessagesPerBatch(this.messagesPerBatch); + ChannelBuffer buf = ChannelBuffers.dynamicBuffer(); + buf.writeBytes(MAGIC_PROTOCOL_VERSION); + channel.write(buf); + + //indentify try { String identJson = "{" + "\"short_id\":\"" + InetAddress.getLocalHost().getHostName() + "\"" + @@ -145,7 +144,7 @@ protected Connection createConnection(String address, int port) { conn.command(ident); } catch (UnknownHostException e) { - log.error("Caught", e); + LOGGER.error("Caught", e); } return conn; @@ -168,10 +167,11 @@ protected synchronized void connect() { for (ConnectionAddress addr : addresses ) { - int num = addr.getPoolsize() - this.connections.connectionSize(addr.getHost(), addr.getPort()); + int num = addr.getPoolsize() - connections.connectionSize(addr.getHost(), addr.getPort()); for (int i=0; i < num; i++) { - Connection conn = this.createConnection(addr.getHost(), addr.getPort()); - this.connections.addConnection(conn); + Connection conn = createConnection(addr.getHost(), addr.getPort()); + connections.addConnection(conn); + } //TODO: handle negative num? (i.e. if user lowered the poolsize we should kill some connections) } @@ -179,23 +179,27 @@ protected synchronized void connect() { } /** - * will run through and remove any connections that have not recieved a ping in the last 2 minutes. + * will run through and remove any connections that have not recieved a ping in the last N milliseconds */ public synchronized void cleanupOldConnections() { - Date cutoff = new Date(new Date().getTime() - (1000*60*2)); + Date cutoff = new Date(new Date().getTime() - CLEAN_UP_FREQUENCY); try { for (Connection c : this.connections.getConnections()) { if (cutoff.after(c.getLastHeartbeat())) { - log.warn("Removing dead connection: " + c.getHost() + ":" + c.getPort()); + LOGGER.warn("Removing dead connection [host={}, port={}]", c.getHost(), c.getPort()); c.close(); connections.remove(c); } } } catch (NoConnectionsException e) { - //ignore + //ignore - TODO: is it a good idea to swallow this exception? } } + public Connections getConnections(){ + return connections; + } + public void setMessagesPerBatch(int messagesPerBatch) { this.messagesPerBatch = messagesPerBatch; } @@ -209,7 +213,7 @@ public void setLookupPeriod(long periodMillis) { * @param connection */ public synchronized void _disconnected(Connection connection) { - log.warn("Disconnected!" + connection); + LOGGER.warn("Client disconnected [connection={}]", connection); this.connections.remove(connection); } @@ -217,6 +221,5 @@ public void close() { this.timer.cancel(); this.connections.close(); this.bootstrap.releaseExternalResources(); - } } diff --git a/src/main/java/com/trendrr/nsq/Connection.java b/src/main/java/com/trendrr/nsq/Connection.java index 8c96576..b053185 100644 --- a/src/main/java/com/trendrr/nsq/Connection.java +++ b/src/main/java/com/trendrr/nsq/Connection.java @@ -29,24 +29,23 @@ * */ public class Connection { + private static final Logger LOGGER = LoggerFactory.getLogger(Connection.class); - protected static Logger log = LoggerFactory.getLogger(Connection.class); + private Channel channel; + private int heartbeats = 0; + private Date lastHeartbeat = new Date(); - Channel channel; - int heartbeats = 0; - Date lastHeartbeat = new Date(); + private NSQMessageCallback callback = null; + private AtomicLong totalMessages = new AtomicLong(0l); + private int messagesPerBatch = 200; - NSQMessageCallback callback = null; - AtomicLong totalMessages = new AtomicLong(0l); - int messagesPerBatch = 200; + private AbstractNSQClient client = null; - AbstractNSQClient client = null; + private String host = null; + private int port; - String host = null; - int port; - - LinkedBlockingQueue requests = new LinkedBlockingQueue(1); - LinkedBlockingQueue responses = new LinkedBlockingQueue(1); + private LinkedBlockingQueue requests = new LinkedBlockingQueue(1); + private LinkedBlockingQueue responses = new LinkedBlockingQueue(1); public Connection(String host, int port, Channel channel, AbstractNSQClient client) { @@ -94,7 +93,7 @@ public void incoming(NSQFrame frame) { try { this.responses.offer(frame, 20, TimeUnit.SECONDS); } catch (InterruptedException e) { - log.error("Incoming frame error", e); + LOGGER.error("Incoming frame error", e); //TODO: what to do here? we should probably disconnect! this.close(); } @@ -123,19 +122,20 @@ public void incoming(NSQFrame frame) { message.setMessage(msg.getMessageBody()); message.setTimestamp(new Date(TimeUnit.NANOSECONDS.toMillis(msg.getTimestamp()))); if (this.callback == null) { - log.warn("NO CAllback, dropping message: " + message); + LOGGER.warn("NO Callback, dropping message: " + message); } else { this.callback.message(message); } return; } - log.warn("Unknown frame type: " + frame); + LOGGER.warn("Unknown frame type: " + frame); } void heartbeat() { - log.info("HEARTBEAT!"); + // This should be logged at debug level - it's more useful for troubleshooting and should not interfere with actual INFO logs + LOGGER.debug("HEARTBEAT!"); this.heartbeats++; this.lastHeartbeat = new Date(); //send NOP here. @@ -184,9 +184,9 @@ public void close() { try { channel.close().await(10000); } catch (Exception x) { - log.error("Caught", x); + LOGGER.error("Caught", x); } - log.warn("Close called on connection: " + this); + LOGGER.warn("Close called on connection: " + this); this._disconnected(); } diff --git a/src/main/java/com/trendrr/nsq/ConnectionAddress.java b/src/main/java/com/trendrr/nsq/ConnectionAddress.java index 526fb5a..9475763 100644 --- a/src/main/java/com/trendrr/nsq/ConnectionAddress.java +++ b/src/main/java/com/trendrr/nsq/ConnectionAddress.java @@ -8,40 +8,64 @@ /** + * Holds the details of a connection * @author Dustin Norlander * @created Jan 22, 2013 * */ public class ConnectionAddress { - protected static Logger log = LoggerFactory.getLogger(ConnectionAddress.class); - - private int poolsize = 1; - - /** + private static Logger log = LoggerFactory.getLogger(ConnectionAddress.class); + + private static final int DEFAULT_POOL_SIZE = 1; + + private final String host; + private final int port; + private final int poolsize; + + public ConnectionAddress(String host, int port) { + this(host, port, DEFAULT_POOL_SIZE); + } + + public ConnectionAddress(String host, int port, int poolsize) { + this.host = host; + this.port = port; + this.poolsize = poolsize; + } + + /** * How many connections should we have in place? * @return */ public int getPoolsize() { return poolsize; } - public void setPoolsize(int poolsize) { - this.poolsize = poolsize; - } - - + public String getHost() { return host; } - public void setHost(String host) { - this.host = host; - } + public int getPort() { return port; } - public void setPort(int port) { - this.port = port; - } - private String host; - private int port; + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ConnectionAddress that = (ConnectionAddress) o; + + if (port != that.port) return false; + if (!host.equals(that.host)) return false; + + return true; + } + + @Override + public int hashCode() { + int result = host.hashCode(); + result = 31 * result + port; + return result; + } } diff --git a/src/main/java/com/trendrr/nsq/NSQConsumer.java b/src/main/java/com/trendrr/nsq/NSQConsumer.java index 7fcbef6..c187a86 100644 --- a/src/main/java/com/trendrr/nsq/NSQConsumer.java +++ b/src/main/java/com/trendrr/nsq/NSQConsumer.java @@ -5,6 +5,7 @@ import java.util.List; +import com.trendrr.nsq.lookup.NSQLookup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,7 +18,7 @@ */ public class NSQConsumer extends AbstractNSQClient { - protected static Logger log = LoggerFactory.getLogger(NSQConsumer.class); + private static final Logger LOGGER = LoggerFactory.getLogger(NSQConsumer.class); NSQLookup lookup; String topic = null; @@ -34,17 +35,31 @@ public NSQConsumer(NSQLookup lookup, String topic, String channel, NSQMessageCal @Override protected Connection createConnection(String address, int port) { - Connection conn = super.createConnection(address, port); - - conn.setCallback(callback); + Connection conn = super.createConnection(address, port); + return addCommands(conn); + } + + /** + * Adds all required commands to a subscriber connection + * @param conn the connection + * @return the passed connection + */ + protected Connection addCommands(Connection conn){ + if(conn == null){ + //Connection could be null here + LOGGER.error("Connection returned is null - cannot go further"); + return null; + } + + conn.setCallback(callback); /* * subscribe */ - conn.command(NSQCommand.instance("SUB " + topic + " " + this.channel)); - conn.command(NSQCommand.instance("RDY " + conn.getMessagesPerBatch())); - return conn; - - } + conn.command(NSQCommand.instance("SUB " + topic + " " + this.channel)); + conn.command(NSQCommand.instance("RDY " + conn.getMessagesPerBatch())); + return conn; + } + /* (non-Javadoc) * @see com.trendrr.nsq.AbstractNSQClient#lookupAddresses() */ diff --git a/src/main/java/com/trendrr/nsq/NSQProducer.java b/src/main/java/com/trendrr/nsq/NSQProducer.java index 24694e5..1505dc0 100644 --- a/src/main/java/com/trendrr/nsq/NSQProducer.java +++ b/src/main/java/com/trendrr/nsq/NSQProducer.java @@ -118,7 +118,7 @@ protected synchronized Connection getConn() throws NoConnectionsException { NoConnectionsException ex = new NoConnectionsException("no connections", null); for (int i=0; i < this.connectionRetries; i++) { try { - return this.connections.next(); + return getConnections().next(); } catch (NoConnectionsException x) { ex = x; try { @@ -211,16 +211,12 @@ public void produce(String topic, byte[] message) throws DisconnectedException, /** * Adds a new connection. - * @param host - * @param port - * @param poolsize + * @param host the host + * @param port the port + * @param poolsize the connection pool size */ public synchronized NSQProducer addAddress(String host, int port, int poolsize) { - ConnectionAddress addr = new ConnectionAddress(); - addr.setHost(host); - addr.setPoolsize(poolsize); - addr.setPort(port); - this.addresses.add(addr); + this.addresses.add(new ConnectionAddress(host, port, poolsize)); return this; } diff --git a/src/main/java/com/trendrr/nsq/NSQLookup.java b/src/main/java/com/trendrr/nsq/lookup/NSQLookup.java similarity index 66% rename from src/main/java/com/trendrr/nsq/NSQLookup.java rename to src/main/java/com/trendrr/nsq/lookup/NSQLookup.java index 83629d5..7d34136 100644 --- a/src/main/java/com/trendrr/nsq/NSQLookup.java +++ b/src/main/java/com/trendrr/nsq/lookup/NSQLookup.java @@ -1,15 +1,13 @@ -package com.trendrr.nsq; -/** - * - */ +package com.trendrr.nsq.lookup; +import com.trendrr.nsq.ConnectionAddress; import java.util.List; /** * An interface to the nsq lookup. We keep this as an interface because it depends on - * some json parsing library and we dont want to force a dependancy on a specific lib. + * some json parsing library and we don't want to force a dependency on a specific lib. * * * @author Dustin Norlander @@ -27,7 +25,7 @@ public interface NSQLookup { /** * Lookup topic addresses * @param topic - * @return + * @return the list of nsq servers where this topic is available */ public List lookup(String topic); } diff --git a/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java b/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java index 02ba65f..9367963 100644 --- a/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java +++ b/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java @@ -1,5 +1,5 @@ /** - * + * */ package com.trendrr.nsq.lookup; @@ -17,82 +17,93 @@ import org.slf4j.LoggerFactory; import com.trendrr.nsq.ConnectionAddress; -import com.trendrr.nsq.NSQLookup; import com.trendrr.oss.DynMap; /** * Lookup implementation based on trendrr-oss DynMap - * - * + * * @author Dustin Norlander * @created Jan 23, 2013 - * */ public class NSQLookupDynMapImpl implements NSQLookup { + private static Logger LOGGER = LoggerFactory.getLogger(NSQLookupDynMapImpl.class); - protected static Logger log = LoggerFactory.getLogger(NSQLookupDynMapImpl.class); - - Set addresses = new HashSet (); - - - public void addAddr(String addr, int port) { - if (!addr.startsWith("http")) { - addr = "http://" + addr; - } - addr = addr + ":" + port; - this.addresses.add(addr); - } - - public List lookup(String topic) { - HashMap addresses = new HashMap(); - - for (String addr : this.addresses) { - DynMap mp = DynMap.instance(this.getHTML(addr + "/lookup?topic=" + topic), new DynMap()); - for (DynMap node : mp.getListOrEmpty(DynMap.class, "data.producers")) { - String host = node.getString("broadcast_address", node.getString("address")); - String key = host + ":" + node.getInteger("tcp_port"); - ConnectionAddress address = new ConnectionAddress(); - address.setHost(host); - address.setPort(node.getInteger("tcp_port")); - addresses.put(key, address); - } - } - return new ArrayList(addresses.values()); - } - - public String getHTML(String url) { - URL u; - HttpURLConnection conn = null; - BufferedReader rd = null; - String line; - String result = ""; - try { - u = new URL(url); - conn = (HttpURLConnection) u.openConnection(); - conn.setRequestMethod("GET"); - rd = new BufferedReader(new InputStreamReader(conn.getInputStream())); - while ((line = rd.readLine()) != null) { - result += line; - } - - } catch (Exception e) { - log.error("Caught", e); - } finally { - try { - if (rd != null){ - rd.close(); - } - } catch (Exception e) { - log.error("Caught", e); - } - - // Release memory and underlying resources on the HttpURLConnection otherwise we may run out of file descriptors and leak memory - if (conn != null){ - conn.disconnect(); - } - } - return result; - } - + //By default log errors every five minutes otherwise logs become terribly verbose + private volatile long errorLoggingInterval = 5 * 60 * 1000; + + private final Set addresses = new HashSet(); + + private volatile long lastError = 0; + + public void addAddr(String addr, int port) { + if (!addr.startsWith("http")) { + addr = "http://" + addr; + } + addr = addr + ":" + port; + this.addresses.add(addr); + } + + public List lookup(String topic) { + HashMap addresses = new HashMap(); + + for (String addr : this.addresses) { + DynMap mp = DynMap.instance(this.getHTML(addr + "/lookup?topic=" + topic), new DynMap()); + for (DynMap node : mp.getListOrEmpty(DynMap.class, "data.producers")) { + String host = node.getString("broadcast_address", node.getString("address")); + String key = host + ":" + node.getInteger("tcp_port"); + + addresses.put(key, new ConnectionAddress(host, node.getInteger("tcp_port"))); + } + } + return new ArrayList(addresses.values()); + } + + public String getHTML(String url) { + URL u; + HttpURLConnection conn = null; + BufferedReader rd = null; + String line; + String result = ""; + try { + u = new URL(url); + conn = (HttpURLConnection) u.openConnection(); + conn.setRequestMethod("GET"); + rd = new BufferedReader(new InputStreamReader(conn.getInputStream())); + while ((line = rd.readLine()) != null) { + result += line; + } + } catch (Exception e) { + logException("Caught an exception when trying to get nsq instances from lookup mechanism [url=" + url + "]", e); + } finally { + try { + if (rd != null){ + rd.close(); + } + } catch (Exception e) { + logException("Caught an exception when trying to close buffered reader", e); + } + //Make sure to close the HTTP connection as well, otherwise the file descriptors will remain open + if (conn != null){ + conn.disconnect(); + } + } + return result; + } + + private void logException(String message, Exception e) { + final long now = System.currentTimeMillis(); + if (now - lastError > errorLoggingInterval) { + LOGGER.error(message, e); + lastError = now; + } + } + + public long getErrorLoggingInterval() { + return errorLoggingInterval; + } + + public void setErrorLoggingInterval(long errorLoggingInterval) { + this.errorLoggingInterval = errorLoggingInterval; + } } diff --git a/src/main/java/com/trendrr/nsq/ExampleMain.java b/src/test/java/com/java/trendrr/nsq/ExampleMain.java similarity index 93% rename from src/main/java/com/trendrr/nsq/ExampleMain.java rename to src/test/java/com/java/trendrr/nsq/ExampleMain.java index e1dafb0..54d61de 100644 --- a/src/main/java/com/trendrr/nsq/ExampleMain.java +++ b/src/test/java/com/java/trendrr/nsq/ExampleMain.java @@ -1,4 +1,4 @@ -package com.trendrr.nsq; +package com.java.trendrr.nsq; /** * */ @@ -9,6 +9,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; +import com.trendrr.nsq.NSQConsumer; +import com.trendrr.nsq.NSQMessage; +import com.trendrr.nsq.NSQMessageCallback; +import com.trendrr.nsq.NSQProducer; +import com.trendrr.nsq.lookup.NSQLookup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +41,7 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc * PRODUCER. produce 50k messages */ //producer - NSQProducer producer = new NSQProducer().addAddress("localhost", 4150, 1); + NSQProducer producer = new NSQProducer().addAddress("localhost", 4150, 1); producer.start(); start = new Date(); String msg = StringHelper.randomString(10); diff --git a/src/main/java/SpeedTest.java b/src/test/java/com/java/trendrr/nsq/SpeedTest.java similarity index 97% rename from src/main/java/SpeedTest.java rename to src/test/java/com/java/trendrr/nsq/SpeedTest.java index 8101ffe..c8ce112 100644 --- a/src/main/java/SpeedTest.java +++ b/src/test/java/com/java/trendrr/nsq/SpeedTest.java @@ -1,4 +1,4 @@ -/** +package com.java.trendrr.nsq; /** * */ @@ -13,7 +13,7 @@ import com.trendrr.nsq.BatchCallback; import com.trendrr.nsq.NSQMessageCallback; import com.trendrr.nsq.NSQConsumer; -import com.trendrr.nsq.NSQLookup; +import com.trendrr.nsq.lookup.NSQLookup; import com.trendrr.nsq.NSQMessage; import com.trendrr.nsq.NSQProducer; import com.trendrr.nsq.lookup.NSQLookupDynMapImpl; diff --git a/src/test/java/com/trendrr/nsq/NSQConsumerTest.java b/src/test/java/com/trendrr/nsq/NSQConsumerTest.java new file mode 100644 index 0000000..2d32669 --- /dev/null +++ b/src/test/java/com/trendrr/nsq/NSQConsumerTest.java @@ -0,0 +1,46 @@ +package com.trendrr.nsq; + + +import com.trendrr.nsq.lookup.NSQLookup; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class NSQConsumerTest { + + private NSQConsumer nsqConsumer; + + private final String topic = "myTopic"; + + private final String channel = "myChannel"; + + private NSQLookup nsqLookup; + + private NSQMessageCallback callback; + + @Before + public void setUp() throws Exception { + nsqLookup = mock(NSQLookup.class); + callback = mock(NSQMessageCallback.class); + + nsqConsumer = new NSQConsumer(nsqLookup, topic, channel, callback); + } + + @Test + public void testAddCommandsToNullConnectionReturnsNull() throws Exception { + Assert.assertNull(nsqConsumer.addCommands(null)); + } + + @Test + public void testAddCommandsToNoNullConnectionReturnsOriginalConnection() throws Exception { + Connection mockConn = mock(Connection.class); + Assert.assertEquals(mockConn, nsqConsumer.addCommands(mockConn)); + + verify(mockConn, times(2)).command(any(NSQCommand.class)); + } +} diff --git a/src/test/resources/log4j-surefire.xml b/src/test/resources/log4j-surefire.xml new file mode 100644 index 0000000..b406525 --- /dev/null +++ b/src/test/resources/log4j-surefire.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file