From 3db422e89dc9688acd0f1b80a18c87dc13dd2508 Mon Sep 17 00:00:00 2001 From: Eddie Forson Date: Sun, 6 Oct 2013 01:09:23 +0100 Subject: [PATCH 1/7] Added Jackson Json parsing and started refactoring --- README.md | 2 +- pom.xml | 49 +++++ .../com/trendrr/nsq/AbstractNSQClient.java | 2 + src/main/java/com/trendrr/nsq/Connection.java | 9 +- .../com/trendrr/nsq/ConnectionAddress.java | 60 ++++-- .../java/com/trendrr/nsq/NSQConsumer.java | 1 + .../java/com/trendrr/nsq/NSQProducer.java | 12 +- .../nsq/lookup/CachingNSQLookupImpl.java | 116 +++++++++++ .../trendrr/nsq/{ => lookup}/NSQLookup.java | 10 +- .../nsq/lookup/NSQLookupDynMapImpl.java | 7 +- .../com/trendrr/nsq/model/HostAndPort.java | 56 ++++++ .../com/trendrr/nsq/model/LookupResponse.java | 181 ++++++++++++++++++ .../com/java}/trendrr/nsq/ExampleMain.java | 9 +- .../java/com/java/trendrr/nsq}/SpeedTest.java | 4 +- 14 files changed, 469 insertions(+), 49 deletions(-) create mode 100644 src/main/java/com/trendrr/nsq/lookup/CachingNSQLookupImpl.java rename src/main/java/com/trendrr/nsq/{ => lookup}/NSQLookup.java (66%) create mode 100644 src/main/java/com/trendrr/nsq/model/HostAndPort.java create mode 100644 src/main/java/com/trendrr/nsq/model/LookupResponse.java rename src/{main/java/com => test/java/com/java}/trendrr/nsq/ExampleMain.java (93%) rename src/{main/java => test/java/com/java/trendrr/nsq}/SpeedTest.java (97%) diff --git a/README.md b/README.md index 3f98eec..00257a1 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,7 @@ for (int i=0; i < iterations; i++) { * [slf4j][slf4j] * [trendrr-oss][trendrr-oss] -Note: the trendrr-oss dependancy can easily be swapped out by implementing the com.trendrr.nsq.NSQLookup interface using a different json parser +Note: the trendrr-oss dependancy can easily be swapped out by implementing the com.trendrr.nsq.lookup.NSQLookup interface using a different json parser [nsq]: https://github.com/bitly/nsq diff --git a/pom.xml b/pom.xml index 730b42d..6e294c7 100644 --- a/pom.xml +++ b/pom.xml @@ -9,6 +9,12 @@ 1.1-SNAPSHOT + + com.trendrr.oss + trendrr-oss + 1.0 + + ch.qos.logback logback-classic @@ -32,6 +38,49 @@ slf4j-api 1.6.4 + + + com.fasterxml.jackson.core + jackson-annotations + 2.2.2 + + + + com.fasterxml.jackson.core + jackson-core + 2.2.2 + + + + com.fasterxml.jackson.core + jackson-databind + 2.2.2 + + + + com.google.guava + guava + r09 + + + + + + + h2o_repo + H2O releases repo + http://nexus01.elasticride.com:8081/nexus/content/repositories/releases + + + + h2o_repo + H2O snapshots repo + http://nexus01.elasticride.com:8081/nexus/content/repositories/snapshots + + + + + diff --git a/src/main/java/com/trendrr/nsq/AbstractNSQClient.java b/src/main/java/com/trendrr/nsq/AbstractNSQClient.java index 55a0076..d9ff1c1 100644 --- a/src/main/java/com/trendrr/nsq/AbstractNSQClient.java +++ b/src/main/java/com/trendrr/nsq/AbstractNSQClient.java @@ -67,6 +67,8 @@ public synchronized void start() { timer.cancel(); } timer = new Timer(); + + //TODO Use a scheduled executor service instead of a timer here timer.schedule(new TimerTask() { @Override public void run() { diff --git a/src/main/java/com/trendrr/nsq/Connection.java b/src/main/java/com/trendrr/nsq/Connection.java index 92675ab..3b51c9d 100644 --- a/src/main/java/com/trendrr/nsq/Connection.java +++ b/src/main/java/com/trendrr/nsq/Connection.java @@ -29,8 +29,7 @@ * */ public class Connection { - - protected static Logger log = LoggerFactory.getLogger(Connection.class); + private static Logger log = LoggerFactory.getLogger(Connection.class); Channel channel; int heartbeats = 0; @@ -87,9 +86,6 @@ public void setMessagesPerBatch(int messagesPerBatch) { } - - - public void incoming(NSQFrame frame) { if (frame instanceof ResponseFrame) { if ("_heartbeat_".equals(((ResponseFrame) frame).getMessage())) { @@ -127,7 +123,7 @@ public void incoming(NSQFrame frame) { message.setMessage(((MessageFrame) frame).getMessageBody()); message.setTimestamp(new Date(((MessageFrame) frame).getTimestamp())); if (this.callback == null) { - log.warn("NO CAllback, dropping message: " + message); + log.warn("NO Callback, dropping message: " + message); } else { this.callback.message(message); } @@ -138,7 +134,6 @@ public void incoming(NSQFrame frame) { void heartbeat() { - System.out.println("HEARTBEAT!"); this.heartbeats++; this.lastHeartbeat = new Date(); //send NOP here. diff --git a/src/main/java/com/trendrr/nsq/ConnectionAddress.java b/src/main/java/com/trendrr/nsq/ConnectionAddress.java index 526fb5a..9475763 100644 --- a/src/main/java/com/trendrr/nsq/ConnectionAddress.java +++ b/src/main/java/com/trendrr/nsq/ConnectionAddress.java @@ -8,40 +8,64 @@ /** + * Holds the details of a connection * @author Dustin Norlander * @created Jan 22, 2013 * */ public class ConnectionAddress { - protected static Logger log = LoggerFactory.getLogger(ConnectionAddress.class); - - private int poolsize = 1; - - /** + private static Logger log = LoggerFactory.getLogger(ConnectionAddress.class); + + private static final int DEFAULT_POOL_SIZE = 1; + + private final String host; + private final int port; + private final int poolsize; + + public ConnectionAddress(String host, int port) { + this(host, port, DEFAULT_POOL_SIZE); + } + + public ConnectionAddress(String host, int port, int poolsize) { + this.host = host; + this.port = port; + this.poolsize = poolsize; + } + + /** * How many connections should we have in place? * @return */ public int getPoolsize() { return poolsize; } - public void setPoolsize(int poolsize) { - this.poolsize = poolsize; - } - - + public String getHost() { return host; } - public void setHost(String host) { - this.host = host; - } + public int getPort() { return port; } - public void setPort(int port) { - this.port = port; - } - private String host; - private int port; + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ConnectionAddress that = (ConnectionAddress) o; + + if (port != that.port) return false; + if (!host.equals(that.host)) return false; + + return true; + } + + @Override + public int hashCode() { + int result = host.hashCode(); + result = 31 * result + port; + return result; + } } diff --git a/src/main/java/com/trendrr/nsq/NSQConsumer.java b/src/main/java/com/trendrr/nsq/NSQConsumer.java index 2899e25..abba8b7 100644 --- a/src/main/java/com/trendrr/nsq/NSQConsumer.java +++ b/src/main/java/com/trendrr/nsq/NSQConsumer.java @@ -5,6 +5,7 @@ import java.util.List; +import com.trendrr.nsq.lookup.NSQLookup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/trendrr/nsq/NSQProducer.java b/src/main/java/com/trendrr/nsq/NSQProducer.java index 8a045c6..e95d94f 100644 --- a/src/main/java/com/trendrr/nsq/NSQProducer.java +++ b/src/main/java/com/trendrr/nsq/NSQProducer.java @@ -213,16 +213,12 @@ public void produce(String topic, byte[] message) throws DisconnectedException, /** * Adds a new connection. - * @param host - * @param port - * @param poolsize + * @param host the host + * @param port the port + * @param poolsize the connection pool size */ public synchronized NSQProducer addAddress(String host, int port, int poolsize) { - ConnectionAddress addr = new ConnectionAddress(); - addr.setHost(host); - addr.setPoolsize(poolsize); - addr.setPort(port); - this.addresses.add(addr); + this.addresses.add(new ConnectionAddress(host, port, poolsize)); return this; } diff --git a/src/main/java/com/trendrr/nsq/lookup/CachingNSQLookupImpl.java b/src/main/java/com/trendrr/nsq/lookup/CachingNSQLookupImpl.java new file mode 100644 index 0000000..d45d65d --- /dev/null +++ b/src/main/java/com/trendrr/nsq/lookup/CachingNSQLookupImpl.java @@ -0,0 +1,116 @@ +package com.trendrr.nsq.lookup; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.trendrr.nsq.ConnectionAddress; +import com.trendrr.nsq.model.HostAndPort; +import com.trendrr.nsq.model.LookupResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; + +//TODO Need refactor AbstractNSQClient... +/** + * This implementation of {@link NSQLookup} uses Jackson as the underlying Json parser. + * Moreover, it caches the the connection details of producer of a given topic for a specified time. + * If no time is supplied, it will cache such details for 5 minutes + */ +public class CachingNSQLookupImpl implements NSQLookup{ + + private static Logger LOGGER = LoggerFactory.getLogger( CachingNSQLookupImpl.class); + + private static final long DEFAULT_LOOKUP_REFRESH_TIME = 5 * 60 * 1000; + + private static final String URL_SUFFIX = "/lookup?topic="; + + private static final String PROTOCOL = "http"; + + private final Set lookUpAddresses; + + private final ConcurrentMap> topicToConnections; + + private final long refreshInterval; + + public CachingNSQLookupImpl(){ + this(DEFAULT_LOOKUP_REFRESH_TIME); + } + + public CachingNSQLookupImpl(long refreshInterval) { + lookUpAddresses = new CopyOnWriteArraySet(); + this.refreshInterval = refreshInterval; + this.topicToConnections = new ConcurrentHashMap>(); + } + + /** + * Initialise the lookup refresher thread. For now it is not doing anything as I need to clean up more code in + * the AbstractNSQClient + */ + public void init(){ + //TODO Work on AbstractNSQClient so that lookup refresh logic is removed from this class. + } + + @Override + public void addAddr(String addr, int port) { + lookUpAddresses.add(new HostAndPort(addr, port)); + } + + @Override + public List lookup(String topic) { + InputStream is = null; + Set connectionAddresses = new HashSet(); + for(HostAndPort lookupAddress : lookUpAddresses){ + try{ + URL url = new URL(PROTOCOL, lookupAddress.getHost(), lookupAddress.getPort(), URL_SUFFIX + topic); + HttpURLConnection con = (HttpURLConnection) url.openConnection(); + con.setRequestMethod("GET"); + int responseCode = con.getResponseCode(); + + if(responseCode == HttpURLConnection.HTTP_OK){ + ObjectMapper mapper = new ObjectMapper(); + is = con.getInputStream(); + LookupResponse response = mapper.readValue(is, LookupResponse.class); + populateConnections(connectionAddresses, response.getLookupData()); + is.close(); + }else{ + LOGGER.warn("Received a response code that is not 200 [response-code=" + responseCode + ", lookup-address=" + + lookupAddress + ", topic=" + topic + "]"); + } + }catch(Exception e){ + LOGGER.error("An error occurred when trying to get producers for topic [lookup-addresses=" + lookupAddress + + ", topic=" + topic + "]", e); + }finally { + try{ + if(is != null){ + is.close(); + } + }catch(Exception e){ + LOGGER.error("An error occurred while trying to close input stream [lookup-address=" + lookupAddress + + ", topic=" + topic + "]", e); + } + } + } + + topicToConnections.put(topic, connectionAddresses); + return Lists.newArrayList(connectionAddresses); + } + + private void populateConnections(Set connections, + LookupResponse.LookupData lookupData){ + LookupResponse.ProducerDetails[] producers = lookupData.getProducers(); + if(producers == null){ + return; + } + for(LookupResponse.ProducerDetails producerDetails : producers){ + connections.add(new ConnectionAddress(producerDetails.getBroadcastAddress(), producerDetails.getTcpPort())); + } + } +} diff --git a/src/main/java/com/trendrr/nsq/NSQLookup.java b/src/main/java/com/trendrr/nsq/lookup/NSQLookup.java similarity index 66% rename from src/main/java/com/trendrr/nsq/NSQLookup.java rename to src/main/java/com/trendrr/nsq/lookup/NSQLookup.java index 83629d5..7d34136 100644 --- a/src/main/java/com/trendrr/nsq/NSQLookup.java +++ b/src/main/java/com/trendrr/nsq/lookup/NSQLookup.java @@ -1,15 +1,13 @@ -package com.trendrr.nsq; -/** - * - */ +package com.trendrr.nsq.lookup; +import com.trendrr.nsq.ConnectionAddress; import java.util.List; /** * An interface to the nsq lookup. We keep this as an interface because it depends on - * some json parsing library and we dont want to force a dependancy on a specific lib. + * some json parsing library and we don't want to force a dependency on a specific lib. * * * @author Dustin Norlander @@ -27,7 +25,7 @@ public interface NSQLookup { /** * Lookup topic addresses * @param topic - * @return + * @return the list of nsq servers where this topic is available */ public List lookup(String topic); } diff --git a/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java b/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java index 781451a..835dd7a 100644 --- a/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java +++ b/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java @@ -17,7 +17,6 @@ import org.slf4j.LoggerFactory; import com.trendrr.nsq.ConnectionAddress; -import com.trendrr.nsq.NSQLookup; import com.trendrr.oss.DynMap; @@ -52,10 +51,8 @@ public List lookup(String topic) { for (DynMap node : mp.getListOrEmpty(DynMap.class, "data.producers")) { String host = node.getString("broadcast_address", node.getString("address")); String key = host + ":" + node.getInteger("tcp_port"); - ConnectionAddress address = new ConnectionAddress(); - address.setHost(host); - address.setPort(node.getInteger("tcp_port")); - addresses.put(key, address); + + addresses.put(key, new ConnectionAddress(host, node.getInteger("tcp_port"))); } } return new ArrayList(addresses.values()); diff --git a/src/main/java/com/trendrr/nsq/model/HostAndPort.java b/src/main/java/com/trendrr/nsq/model/HostAndPort.java new file mode 100644 index 0000000..be22b11 --- /dev/null +++ b/src/main/java/com/trendrr/nsq/model/HostAndPort.java @@ -0,0 +1,56 @@ +package com.trendrr.nsq.model; + +import com.google.common.base.Preconditions; + +/** + * Simple container a host and port. + */ +public class HostAndPort { + + private String host; + + private int port; + + public HostAndPort(String host, int port) { + Preconditions.checkArgument(host != null && host.length() > 0, "Supplied host is invalid [host=" + host + "]"); + Preconditions.checkArgument(port > 0, "Supplied port is invalid [port=" + port + "]"); + this.host = host; + this.port = port; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + HostAndPort that = (HostAndPort) o; + + if (port != that.port) return false; + if (host != null ? !host.equals(that.host) : that.host != null) return false; + + return true; + } + + @Override + public int hashCode() { + int result = host != null ? host.hashCode() : 0; + result = 31 * result + port; + return result; + } + + @Override + public String toString() { + return "HostAndPort{" + + "host='" + host + '\'' + + ", port=" + port + + '}'; + } +} diff --git a/src/main/java/com/trendrr/nsq/model/LookupResponse.java b/src/main/java/com/trendrr/nsq/model/LookupResponse.java new file mode 100644 index 0000000..af80b68 --- /dev/null +++ b/src/main/java/com/trendrr/nsq/model/LookupResponse.java @@ -0,0 +1,181 @@ +package com.trendrr.nsq.model; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Arrays; + +/** + * Models the response sent my NSQ upon lookup on a given topic. + * The class is annotated so that Jackson can easily populate it with + * json obtained from nsqlookupd + */ +public class LookupResponse { + + private int statusCode; + + private String statusText; + + private LookupData lookupData; + + @JsonProperty("status_code") + public int getStatusCode() { + return statusCode; + } + + public void setStatusCode(int statusCode) { + this.statusCode = statusCode; + } + + @JsonProperty("status_txt") + public String getStatusText() { + return statusText; + } + + public void setStatusText(String statusText) { + this.statusText = statusText; + } + + @JsonProperty("data") + public LookupData getLookupData() { + return lookupData; + } + + public void setLookupData(LookupData lookupData) { + this.lookupData = lookupData; + } + + public static class LookupData{ + + private String[] channels; + + private ProducerDetails[] producers; + + @JsonProperty("channels") + private String[] getChannels() { + return channels; + } + + private void setChannels(String[] channels) { + this.channels = channels; + } + + @JsonProperty("producers") + public ProducerDetails[] getProducers() { + return producers; + } + + public void setProducers(ProducerDetails[] producers) { + this.producers = producers; + } + + @Override + public String toString() { + return "LookupData{" + + "channels=" + Arrays.toString(channels) + + ", producers=" + Arrays.toString(producers) + + '}'; + } + } + + public static class ProducerDetails{ + private String remoteAddress; + + private String address; + + private String hostName; + + private String broadcastAddress; + + private int tcpPort; + + private int httpPort; + + private String version; + + @JsonProperty("remote_address") + public String getRemoteAddress() { + return remoteAddress; + } + + public void setRemoteAddress(String remoteAddress) { + this.remoteAddress = remoteAddress; + } + + @JsonProperty("address") + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + @JsonProperty("hostname") + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + @JsonProperty("broadcast_address") + public String getBroadcastAddress() { + return broadcastAddress; + } + + public void setBroadcastAddress(String broadcastAddress) { + this.broadcastAddress = broadcastAddress; + } + + + @JsonProperty("tcp_port") + public int getTcpPort() { + return tcpPort; + } + + public void setTcpPort(int tcpPort) { + this.tcpPort = tcpPort; + } + + @JsonProperty("http_port") + public int getHttpPort() { + return httpPort; + } + + public void setHttpPort(int httpPort) { + this.httpPort = httpPort; + } + + @JsonProperty("version") + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + @Override + public String toString() { + return "ProducerDetails{" + + "remoteAddress='" + remoteAddress + '\'' + + ", address='" + address + '\'' + + ", hostName='" + hostName + '\'' + + ", broadcastAddress='" + broadcastAddress + '\'' + + ", tcpPort=" + tcpPort + + ", httpPort=" + httpPort + + ", version='" + version + '\'' + + '}'; + } + } + + @Override + public String toString() { + return "LookupResponse{" + + "statusCode=" + statusCode + + ", statusText='" + statusText + '\'' + + ", lookupData=" + lookupData + + '}'; + } +} diff --git a/src/main/java/com/trendrr/nsq/ExampleMain.java b/src/test/java/com/java/trendrr/nsq/ExampleMain.java similarity index 93% rename from src/main/java/com/trendrr/nsq/ExampleMain.java rename to src/test/java/com/java/trendrr/nsq/ExampleMain.java index e1dafb0..54d61de 100644 --- a/src/main/java/com/trendrr/nsq/ExampleMain.java +++ b/src/test/java/com/java/trendrr/nsq/ExampleMain.java @@ -1,4 +1,4 @@ -package com.trendrr.nsq; +package com.java.trendrr.nsq; /** * */ @@ -9,6 +9,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; +import com.trendrr.nsq.NSQConsumer; +import com.trendrr.nsq.NSQMessage; +import com.trendrr.nsq.NSQMessageCallback; +import com.trendrr.nsq.NSQProducer; +import com.trendrr.nsq.lookup.NSQLookup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +41,7 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc * PRODUCER. produce 50k messages */ //producer - NSQProducer producer = new NSQProducer().addAddress("localhost", 4150, 1); + NSQProducer producer = new NSQProducer().addAddress("localhost", 4150, 1); producer.start(); start = new Date(); String msg = StringHelper.randomString(10); diff --git a/src/main/java/SpeedTest.java b/src/test/java/com/java/trendrr/nsq/SpeedTest.java similarity index 97% rename from src/main/java/SpeedTest.java rename to src/test/java/com/java/trendrr/nsq/SpeedTest.java index 8101ffe..c8ce112 100644 --- a/src/main/java/SpeedTest.java +++ b/src/test/java/com/java/trendrr/nsq/SpeedTest.java @@ -1,4 +1,4 @@ -/** +package com.java.trendrr.nsq; /** * */ @@ -13,7 +13,7 @@ import com.trendrr.nsq.BatchCallback; import com.trendrr.nsq.NSQMessageCallback; import com.trendrr.nsq.NSQConsumer; -import com.trendrr.nsq.NSQLookup; +import com.trendrr.nsq.lookup.NSQLookup; import com.trendrr.nsq.NSQMessage; import com.trendrr.nsq.NSQProducer; import com.trendrr.nsq.lookup.NSQLookupDynMapImpl; From dd5f26b2787a91fb98c3e00fc7b3d469c35591d0 Mon Sep 17 00:00:00 2001 From: Eddie Forson Date: Thu, 12 Dec 2013 23:14:40 +0000 Subject: [PATCH 2/7] Making sure we don't repeatedly log the NSQ error --- pom.xml | 34 ++++- .../nsq/lookup/NSQLookupDynMapImpl.java | 135 ++++++++++-------- src/test/resources/log4j-surefire.xml | 32 +++++ 3 files changed, 140 insertions(+), 61 deletions(-) create mode 100644 src/test/resources/log4j-surefire.xml diff --git a/pom.xml b/pom.xml index 6e294c7..592a754 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.github.dustismo trendrr-nsq-client - 1.1-SNAPSHOT + 1.4-SNAPSHOT @@ -83,4 +83,36 @@ + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.10 + + always + + file:${basedir}/src/test/resources/log4j-surefire.xml + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.2.1 + + + attach-sources + verify + + jar-no-fork + + + + + + + diff --git a/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java b/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java index 835dd7a..456bb14 100644 --- a/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java +++ b/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java @@ -1,5 +1,5 @@ /** - * + * */ package com.trendrr.nsq.lookup; @@ -22,68 +22,83 @@ /** * Lookup implementation based on trendrr-oss DynMap - * - * + * * @author Dustin Norlander * @created Jan 23, 2013 - * */ public class NSQLookupDynMapImpl implements NSQLookup { + private static Logger LOGGER = LoggerFactory.getLogger(NSQLookupDynMapImpl.class); - protected static Logger log = LoggerFactory.getLogger(NSQLookupDynMapImpl.class); - - Set addresses = new HashSet (); - - - public void addAddr(String addr, int port) { - if (!addr.startsWith("http")) { - addr = "http://" + addr; - } - addr = addr + ":" + port; - this.addresses.add(addr); - } - - public List lookup(String topic) { - HashMap addresses = new HashMap(); - - for (String addr : this.addresses) { - DynMap mp = DynMap.instance(this.getHTML(addr + "/lookup?topic=" + topic), new DynMap()); - for (DynMap node : mp.getListOrEmpty(DynMap.class, "data.producers")) { - String host = node.getString("broadcast_address", node.getString("address")); - String key = host + ":" + node.getInteger("tcp_port"); - - addresses.put(key, new ConnectionAddress(host, node.getInteger("tcp_port"))); - } - } - return new ArrayList(addresses.values()); - } - - public String getHTML(String url) { - URL u; - HttpURLConnection conn; - BufferedReader rd = null; - String line; - String result = ""; - try { - u = new URL(url); - conn = (HttpURLConnection) u.openConnection(); - conn.setRequestMethod("GET"); - rd = new BufferedReader(new InputStreamReader(conn.getInputStream())); - while ((line = rd.readLine()) != null) { - result += line; - } - - } catch (Exception e) { - log.error("Caught", e); - } finally { - try { - if (rd != null) - rd.close(); - } catch (Exception e) { - log.error("Caught", e); - } - } - return result; - } - + //By default log errors every five minutes otherwise logs become terribly verbose + private long errorLoggingInterval = 5 * 60 * 1000; + + private final Set addresses = new HashSet(); + + private volatile long lastError = 0; + + public void addAddr(String addr, int port) { + if (!addr.startsWith("http")) { + addr = "http://" + addr; + } + addr = addr + ":" + port; + this.addresses.add(addr); + } + + public List lookup(String topic) { + HashMap addresses = new HashMap(); + + for (String addr : this.addresses) { + DynMap mp = DynMap.instance(this.getHTML(addr + "/lookup?topic=" + topic), new DynMap()); + for (DynMap node : mp.getListOrEmpty(DynMap.class, "data.producers")) { + String host = node.getString("broadcast_address", node.getString("address")); + String key = host + ":" + node.getInteger("tcp_port"); + + addresses.put(key, new ConnectionAddress(host, node.getInteger("tcp_port"))); + } + } + return new ArrayList(addresses.values()); + } + + public String getHTML(String url) { + URL u; + HttpURLConnection conn; + BufferedReader rd = null; + String line; + String result = ""; + try { + u = new URL(url); + conn = (HttpURLConnection) u.openConnection(); + conn.setRequestMethod("GET"); + rd = new BufferedReader(new InputStreamReader(conn.getInputStream())); + while ((line = rd.readLine()) != null) { + result += line; + } + } catch (Exception e) { + logException("Caught an exception when trying to get nsq instances from lookup mechanism [url=" + url + "]", e); + } finally { + try { + if (rd != null) + rd.close(); + } catch (Exception e) { + logException("Caught an exception when trying to close buffered reader", e); + } + } + return result; + } + + private void logException(String message, Exception e) { + final long now = System.currentTimeMillis(); + if (now - lastError > errorLoggingInterval) { + LOGGER.error(message, e); + lastError = now; + } + } + + public long getErrorLoggingInterval() { + return errorLoggingInterval; + } + + public void setErrorLoggingInterval(long errorLoggingInterval) { + this.errorLoggingInterval = errorLoggingInterval; + } } diff --git a/src/test/resources/log4j-surefire.xml b/src/test/resources/log4j-surefire.xml new file mode 100644 index 0000000..b406525 --- /dev/null +++ b/src/test/resources/log4j-surefire.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file From 07f72742dd733bbc4cb2bc4d95f63edcf1c0f323 Mon Sep 17 00:00:00 2001 From: Eddie Forson Date: Mon, 17 Feb 2014 22:16:38 +0000 Subject: [PATCH 3/7] Fixed memory leak due to non closing of http connection. Added Junit to deps. --- pom.xml | 9 ++++++++- .../java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java | 9 +++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 592a754..53f251b 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.github.dustismo trendrr-nsq-client - 1.4-SNAPSHOT + 1.5-SNAPSHOT @@ -63,6 +63,13 @@ r09 + + junit + junit + 4.11 + test + + diff --git a/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java b/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java index 456bb14..8eeae10 100644 --- a/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java +++ b/src/main/java/com/trendrr/nsq/lookup/NSQLookupDynMapImpl.java @@ -61,7 +61,7 @@ public List lookup(String topic) { public String getHTML(String url) { URL u; - HttpURLConnection conn; + HttpURLConnection conn = null; BufferedReader rd = null; String line; String result = ""; @@ -77,11 +77,16 @@ public String getHTML(String url) { logException("Caught an exception when trying to get nsq instances from lookup mechanism [url=" + url + "]", e); } finally { try { - if (rd != null) + if (rd != null){ rd.close(); + } } catch (Exception e) { logException("Caught an exception when trying to close buffered reader", e); } + //Make sure to close the HTTP connection as well, otherwise the file descriptors will remain open + if (conn != null){ + conn.disconnect(); + } } return result; } From 4cb179165cff2be2114841bf0a8735ebaf2adb47 Mon Sep 17 00:00:00 2001 From: Eddie Forson Date: Tue, 15 Apr 2014 15:24:46 +0100 Subject: [PATCH 4/7] Fixed potential NPE thrown in AbstractConsumer + some code improvements --- pom.xml | 10 +++- .../com/trendrr/nsq/AbstractNSQClient.java | 51 +++++++++++-------- .../java/com/trendrr/nsq/NSQConsumer.java | 32 ++++++++---- .../java/com/trendrr/nsq/NSQProducer.java | 2 +- .../java/com/trendrr/nsq/NSQConsumerTest.java | 46 +++++++++++++++++ 5 files changed, 108 insertions(+), 33 deletions(-) create mode 100644 src/test/java/com/trendrr/nsq/NSQConsumerTest.java diff --git a/pom.xml b/pom.xml index 53f251b..053f508 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.github.dustismo trendrr-nsq-client - 1.5-SNAPSHOT + 1.6-SNAPSHOT @@ -70,6 +70,14 @@ test + + org.mockito + mockito-all + 1.9.5 + test + + + diff --git a/src/main/java/com/trendrr/nsq/AbstractNSQClient.java b/src/main/java/com/trendrr/nsq/AbstractNSQClient.java index d9ff1c1..fa4d2d2 100644 --- a/src/main/java/com/trendrr/nsq/AbstractNSQClient.java +++ b/src/main/java/com/trendrr/nsq/AbstractNSQClient.java @@ -13,8 +13,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ScheduledExecutorService; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.buffer.ChannelBuffer; @@ -22,7 +21,6 @@ import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,20 +37,22 @@ */ public abstract class AbstractNSQClient { - protected static Logger log = LoggerFactory.getLogger(AbstractNSQClient.class); - - + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNSQClient.class); + + private static final int CLEAN_UP_FREQUENCY = 1000 * 60 * 2; + /** * Protocol version sent to nsqd on initial connect */ - public static byte[] MAGIC_PROTOCOL_VERSION = " V2".getBytes(); - public static long LOOKUP_PERIOD = 60*1000; //how often to recheck for new nodes (and clean up non responsive nodes) + private static byte[] MAGIC_PROTOCOL_VERSION = " V2".getBytes(); + private static long LOOKUP_PERIOD = 60*1000; //how often to recheck for new nodes (and clean up non responsive nodes) - Connections connections = new Connections(); + private final Connections connections = new Connections(); // Configure the client. - protected ClientBootstrap bootstrap = null; - protected Timer timer = null; + private ClientBootstrap bootstrap = null; + private Timer timer = null; + //this executor is where the callback code is handled protected Executor executor = Executors.newSingleThreadExecutor(); @@ -131,10 +131,11 @@ protected Connection createConnection(String address, int port) { // Wait until the connection attempt succeeds or fails. Channel channel = future.awaitUninterruptibly().getChannel(); if (!future.isSuccess()) { - log.error("Caught", future.getCause()); + LOGGER.error("Unable to create connection, caught: ", future.getCause()); return null; } - log.warn("Creating connection: " + address + " : " + port); + + LOGGER.warn("Creating connection: " + address + " : " + port); Connection conn = new Connection(address, port, channel, this); ChannelBuffer buf = ChannelBuffers.dynamicBuffer(); buf.writeBytes(MAGIC_PROTOCOL_VERSION); @@ -151,7 +152,7 @@ protected Connection createConnection(String address, int port) { conn.command(ident); } catch (UnknownHostException e) { - log.error("Caught", e); + LOGGER.error("Caught", e); } @@ -178,10 +179,11 @@ protected synchronized void connect() { for (ConnectionAddress addr : addresses ) { - int num = addr.getPoolsize() - this.connections.connectionSize(addr.getHost(), addr.getPort()); + int num = addr.getPoolsize() - connections.connectionSize(addr.getHost(), addr.getPort()); for (int i=0; i < num; i++) { - Connection conn = this.createConnection(addr.getHost(), addr.getPort()); - this.connections.addConnection(conn); + Connection conn = createConnection(addr.getHost(), addr.getPort()); + connections.addConnection(conn); + } //TODO: handle negative num? (i.e. if user lowered the poolsize we should kill some connections) } @@ -192,11 +194,11 @@ protected synchronized void connect() { * will run through and remove any connections that have not recieved a ping in the last 2 minutes. */ public synchronized void cleanupOldConnections() { - Date cutoff = new Date(new Date().getTime() - (1000*60*2)); + Date cutoff = new Date(new Date().getTime() - CLEAN_UP_FREQUENCY); try { for (Connection c : this.connections.getConnections()) { if (cutoff.after(c.getLastHeartbeat())) { - log.warn("Removing dead connection: " + c.getHost() + ":" + c.getPort()); + LOGGER.warn("Removing dead connection [host={}, port={}]", c.getHost(), c.getPort()); c.close(); connections.remove(c); } @@ -205,17 +207,22 @@ public synchronized void cleanupOldConnections() { //ignore } } - + + public Connections getConnections(){ + return connections; + } + + /** * for internal use. called when a connection is disconnected * @param connection */ public synchronized void _disconnected(Connection connection) { - log.warn("Disconnected!" + connection); + LOGGER.warn("Client disconnected [connection={}]", connection); this.connections.remove(connection); } - public void close() { + public synchronized void close() { this.timer.cancel(); this.connections.close(); this.bootstrap.releaseExternalResources(); diff --git a/src/main/java/com/trendrr/nsq/NSQConsumer.java b/src/main/java/com/trendrr/nsq/NSQConsumer.java index abba8b7..732445f 100644 --- a/src/main/java/com/trendrr/nsq/NSQConsumer.java +++ b/src/main/java/com/trendrr/nsq/NSQConsumer.java @@ -18,7 +18,7 @@ */ public class NSQConsumer extends AbstractNSQClient { - protected static Logger log = LoggerFactory.getLogger(NSQConsumer.class); + private static final Logger LOGGER = LoggerFactory.getLogger(NSQConsumer.class); NSQLookup lookup; String topic = null; @@ -35,17 +35,31 @@ public NSQConsumer(NSQLookup lookup, String topic, String channel, NSQMessageCal @Override protected Connection createConnection(String address, int port) { - Connection conn = super.createConnection(address, port); - - conn.setCallback(callback); + Connection conn = super.createConnection(address, port); + return addCommands(conn); + } + + /** + * Adds all required commands to a subscriber connection + * @param conn the connection + * @return the passed connection + */ + protected Connection addCommands(Connection conn){ + if(conn == null){ + //Connection could be null here + LOGGER.error("Connection returned is null - cannot go further"); + return null; + } + + conn.setCallback(callback); /* * subscribe */ - conn.command(NSQCommand.instance("SUB " + topic + " " + this.channel)); - conn.command(NSQCommand.instance("RDY " + conn.getMessagesPerBatch())); - return conn; - - } + conn.command(NSQCommand.instance("SUB " + topic + " " + this.channel)); + conn.command(NSQCommand.instance("RDY " + conn.getMessagesPerBatch())); + return conn; + } + /* (non-Javadoc) * @see com.trendrr.nsq.AbstractNSQClient#lookupAddresses() */ diff --git a/src/main/java/com/trendrr/nsq/NSQProducer.java b/src/main/java/com/trendrr/nsq/NSQProducer.java index e95d94f..b710ec1 100644 --- a/src/main/java/com/trendrr/nsq/NSQProducer.java +++ b/src/main/java/com/trendrr/nsq/NSQProducer.java @@ -120,7 +120,7 @@ protected synchronized Connection getConn() throws NoConnectionsException { NoConnectionsException ex = new NoConnectionsException("no connections", null); for (int i=0; i < this.connectionRetries; i++) { try { - return this.connections.next(); + return getConnections().next(); } catch (NoConnectionsException x) { ex = x; try { diff --git a/src/test/java/com/trendrr/nsq/NSQConsumerTest.java b/src/test/java/com/trendrr/nsq/NSQConsumerTest.java new file mode 100644 index 0000000..2d32669 --- /dev/null +++ b/src/test/java/com/trendrr/nsq/NSQConsumerTest.java @@ -0,0 +1,46 @@ +package com.trendrr.nsq; + + +import com.trendrr.nsq.lookup.NSQLookup; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class NSQConsumerTest { + + private NSQConsumer nsqConsumer; + + private final String topic = "myTopic"; + + private final String channel = "myChannel"; + + private NSQLookup nsqLookup; + + private NSQMessageCallback callback; + + @Before + public void setUp() throws Exception { + nsqLookup = mock(NSQLookup.class); + callback = mock(NSQMessageCallback.class); + + nsqConsumer = new NSQConsumer(nsqLookup, topic, channel, callback); + } + + @Test + public void testAddCommandsToNullConnectionReturnsNull() throws Exception { + Assert.assertNull(nsqConsumer.addCommands(null)); + } + + @Test + public void testAddCommandsToNoNullConnectionReturnsOriginalConnection() throws Exception { + Connection mockConn = mock(Connection.class); + Assert.assertEquals(mockConn, nsqConsumer.addCommands(mockConn)); + + verify(mockConn, times(2)).command(any(NSQCommand.class)); + } +} From 2a04caabc14b622bccf3a29ac9f65c95e592e51e Mon Sep 17 00:00:00 2001 From: Eddie Forson Date: Tue, 15 Apr 2014 16:35:08 +0100 Subject: [PATCH 5/7] Clean up --- pom.xml | 2 +- .../nsq/lookup/CachingNSQLookupImpl.java | 116 ------------------ 2 files changed, 1 insertion(+), 117 deletions(-) delete mode 100644 src/main/java/com/trendrr/nsq/lookup/CachingNSQLookupImpl.java diff --git a/pom.xml b/pom.xml index 053f508..c9f0bca 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.github.dustismo trendrr-nsq-client - 1.6-SNAPSHOT + 1.7-SNAPSHOT diff --git a/src/main/java/com/trendrr/nsq/lookup/CachingNSQLookupImpl.java b/src/main/java/com/trendrr/nsq/lookup/CachingNSQLookupImpl.java deleted file mode 100644 index d45d65d..0000000 --- a/src/main/java/com/trendrr/nsq/lookup/CachingNSQLookupImpl.java +++ /dev/null @@ -1,116 +0,0 @@ -package com.trendrr.nsq.lookup; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import com.trendrr.nsq.ConnectionAddress; -import com.trendrr.nsq.model.HostAndPort; -import com.trendrr.nsq.model.LookupResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.InputStream; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArraySet; - -//TODO Need refactor AbstractNSQClient... -/** - * This implementation of {@link NSQLookup} uses Jackson as the underlying Json parser. - * Moreover, it caches the the connection details of producer of a given topic for a specified time. - * If no time is supplied, it will cache such details for 5 minutes - */ -public class CachingNSQLookupImpl implements NSQLookup{ - - private static Logger LOGGER = LoggerFactory.getLogger( CachingNSQLookupImpl.class); - - private static final long DEFAULT_LOOKUP_REFRESH_TIME = 5 * 60 * 1000; - - private static final String URL_SUFFIX = "/lookup?topic="; - - private static final String PROTOCOL = "http"; - - private final Set lookUpAddresses; - - private final ConcurrentMap> topicToConnections; - - private final long refreshInterval; - - public CachingNSQLookupImpl(){ - this(DEFAULT_LOOKUP_REFRESH_TIME); - } - - public CachingNSQLookupImpl(long refreshInterval) { - lookUpAddresses = new CopyOnWriteArraySet(); - this.refreshInterval = refreshInterval; - this.topicToConnections = new ConcurrentHashMap>(); - } - - /** - * Initialise the lookup refresher thread. For now it is not doing anything as I need to clean up more code in - * the AbstractNSQClient - */ - public void init(){ - //TODO Work on AbstractNSQClient so that lookup refresh logic is removed from this class. - } - - @Override - public void addAddr(String addr, int port) { - lookUpAddresses.add(new HostAndPort(addr, port)); - } - - @Override - public List lookup(String topic) { - InputStream is = null; - Set connectionAddresses = new HashSet(); - for(HostAndPort lookupAddress : lookUpAddresses){ - try{ - URL url = new URL(PROTOCOL, lookupAddress.getHost(), lookupAddress.getPort(), URL_SUFFIX + topic); - HttpURLConnection con = (HttpURLConnection) url.openConnection(); - con.setRequestMethod("GET"); - int responseCode = con.getResponseCode(); - - if(responseCode == HttpURLConnection.HTTP_OK){ - ObjectMapper mapper = new ObjectMapper(); - is = con.getInputStream(); - LookupResponse response = mapper.readValue(is, LookupResponse.class); - populateConnections(connectionAddresses, response.getLookupData()); - is.close(); - }else{ - LOGGER.warn("Received a response code that is not 200 [response-code=" + responseCode + ", lookup-address=" + - lookupAddress + ", topic=" + topic + "]"); - } - }catch(Exception e){ - LOGGER.error("An error occurred when trying to get producers for topic [lookup-addresses=" + lookupAddress + - ", topic=" + topic + "]", e); - }finally { - try{ - if(is != null){ - is.close(); - } - }catch(Exception e){ - LOGGER.error("An error occurred while trying to close input stream [lookup-address=" + lookupAddress + - ", topic=" + topic + "]", e); - } - } - } - - topicToConnections.put(topic, connectionAddresses); - return Lists.newArrayList(connectionAddresses); - } - - private void populateConnections(Set connections, - LookupResponse.LookupData lookupData){ - LookupResponse.ProducerDetails[] producers = lookupData.getProducers(); - if(producers == null){ - return; - } - for(LookupResponse.ProducerDetails producerDetails : producers){ - connections.add(new ConnectionAddress(producerDetails.getBroadcastAddress(), producerDetails.getTcpPort())); - } - } -} From 9615bb98ebbc02f3e200fda23eef9fd0971a2356 Mon Sep 17 00:00:00 2001 From: Eddie Forson Date: Tue, 15 Apr 2014 16:36:51 +0100 Subject: [PATCH 6/7] Upgrading original trendrr library version --- pom.xml | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/pom.xml b/pom.xml index c9f0bca..4b5e6ff 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.github.dustismo trendrr-nsq-client - 1.7-SNAPSHOT + 1.3-SNAPSHOT @@ -81,23 +81,6 @@ - - - - h2o_repo - H2O releases repo - http://nexus01.elasticride.com:8081/nexus/content/repositories/releases - - - - h2o_repo - H2O snapshots repo - http://nexus01.elasticride.com:8081/nexus/content/repositories/snapshots - - - - - From 51d76c69af7d031fae43d36a804947fdf837575d Mon Sep 17 00:00:00 2001 From: Eddie Forson Date: Sun, 11 May 2014 17:09:38 +0100 Subject: [PATCH 7/7] Deleted model package. Not using it for now --- .../com/trendrr/nsq/model/HostAndPort.java | 56 ------ .../com/trendrr/nsq/model/LookupResponse.java | 181 ------------------ 2 files changed, 237 deletions(-) delete mode 100644 src/main/java/com/trendrr/nsq/model/HostAndPort.java delete mode 100644 src/main/java/com/trendrr/nsq/model/LookupResponse.java diff --git a/src/main/java/com/trendrr/nsq/model/HostAndPort.java b/src/main/java/com/trendrr/nsq/model/HostAndPort.java deleted file mode 100644 index be22b11..0000000 --- a/src/main/java/com/trendrr/nsq/model/HostAndPort.java +++ /dev/null @@ -1,56 +0,0 @@ -package com.trendrr.nsq.model; - -import com.google.common.base.Preconditions; - -/** - * Simple container a host and port. - */ -public class HostAndPort { - - private String host; - - private int port; - - public HostAndPort(String host, int port) { - Preconditions.checkArgument(host != null && host.length() > 0, "Supplied host is invalid [host=" + host + "]"); - Preconditions.checkArgument(port > 0, "Supplied port is invalid [port=" + port + "]"); - this.host = host; - this.port = port; - } - - public String getHost() { - return host; - } - - public int getPort() { - return port; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - HostAndPort that = (HostAndPort) o; - - if (port != that.port) return false; - if (host != null ? !host.equals(that.host) : that.host != null) return false; - - return true; - } - - @Override - public int hashCode() { - int result = host != null ? host.hashCode() : 0; - result = 31 * result + port; - return result; - } - - @Override - public String toString() { - return "HostAndPort{" + - "host='" + host + '\'' + - ", port=" + port + - '}'; - } -} diff --git a/src/main/java/com/trendrr/nsq/model/LookupResponse.java b/src/main/java/com/trendrr/nsq/model/LookupResponse.java deleted file mode 100644 index af80b68..0000000 --- a/src/main/java/com/trendrr/nsq/model/LookupResponse.java +++ /dev/null @@ -1,181 +0,0 @@ -package com.trendrr.nsq.model; - -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.Arrays; - -/** - * Models the response sent my NSQ upon lookup on a given topic. - * The class is annotated so that Jackson can easily populate it with - * json obtained from nsqlookupd - */ -public class LookupResponse { - - private int statusCode; - - private String statusText; - - private LookupData lookupData; - - @JsonProperty("status_code") - public int getStatusCode() { - return statusCode; - } - - public void setStatusCode(int statusCode) { - this.statusCode = statusCode; - } - - @JsonProperty("status_txt") - public String getStatusText() { - return statusText; - } - - public void setStatusText(String statusText) { - this.statusText = statusText; - } - - @JsonProperty("data") - public LookupData getLookupData() { - return lookupData; - } - - public void setLookupData(LookupData lookupData) { - this.lookupData = lookupData; - } - - public static class LookupData{ - - private String[] channels; - - private ProducerDetails[] producers; - - @JsonProperty("channels") - private String[] getChannels() { - return channels; - } - - private void setChannels(String[] channels) { - this.channels = channels; - } - - @JsonProperty("producers") - public ProducerDetails[] getProducers() { - return producers; - } - - public void setProducers(ProducerDetails[] producers) { - this.producers = producers; - } - - @Override - public String toString() { - return "LookupData{" + - "channels=" + Arrays.toString(channels) + - ", producers=" + Arrays.toString(producers) + - '}'; - } - } - - public static class ProducerDetails{ - private String remoteAddress; - - private String address; - - private String hostName; - - private String broadcastAddress; - - private int tcpPort; - - private int httpPort; - - private String version; - - @JsonProperty("remote_address") - public String getRemoteAddress() { - return remoteAddress; - } - - public void setRemoteAddress(String remoteAddress) { - this.remoteAddress = remoteAddress; - } - - @JsonProperty("address") - public String getAddress() { - return address; - } - - public void setAddress(String address) { - this.address = address; - } - - @JsonProperty("hostname") - public String getHostName() { - return hostName; - } - - public void setHostName(String hostName) { - this.hostName = hostName; - } - - @JsonProperty("broadcast_address") - public String getBroadcastAddress() { - return broadcastAddress; - } - - public void setBroadcastAddress(String broadcastAddress) { - this.broadcastAddress = broadcastAddress; - } - - - @JsonProperty("tcp_port") - public int getTcpPort() { - return tcpPort; - } - - public void setTcpPort(int tcpPort) { - this.tcpPort = tcpPort; - } - - @JsonProperty("http_port") - public int getHttpPort() { - return httpPort; - } - - public void setHttpPort(int httpPort) { - this.httpPort = httpPort; - } - - @JsonProperty("version") - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - @Override - public String toString() { - return "ProducerDetails{" + - "remoteAddress='" + remoteAddress + '\'' + - ", address='" + address + '\'' + - ", hostName='" + hostName + '\'' + - ", broadcastAddress='" + broadcastAddress + '\'' + - ", tcpPort=" + tcpPort + - ", httpPort=" + httpPort + - ", version='" + version + '\'' + - '}'; - } - } - - @Override - public String toString() { - return "LookupResponse{" + - "statusCode=" + statusCode + - ", statusText='" + statusText + '\'' + - ", lookupData=" + lookupData + - '}'; - } -}