Skip to content

Commit

Permalink
Merge pull request #58 from PolinaRodimova/master
Browse files Browse the repository at this point in the history
  • Loading branch information
gzussa authored Feb 14, 2019
2 parents 5124961 + 5f2b9e4 commit 90771e0
Show file tree
Hide file tree
Showing 4 changed files with 315 additions and 176 deletions.
100 changes: 27 additions & 73 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@
package com.timgroup.statsd;

import jnr.unixsocket.UnixDatagramChannel;
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixSocketOptions;

import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.Charset;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.NumberFormat;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixDatagramChannel;
import jnr.unixsocket.UnixSocketOptions;


/**
Expand Down Expand Up @@ -53,9 +50,8 @@
* @author Tom Denley
*
*/
public final class NonBlockingStatsDClient implements StatsDClient {
public class NonBlockingStatsDClient implements StatsDClient {

private static final int PACKET_SIZE_BYTES = 1400;
private static final int SOCKET_TIMEOUT_MS = 100;
private static final int SOCKET_BUFFER_BYTES = -1;

Expand Down Expand Up @@ -123,7 +119,7 @@ protected NumberFormat initialValue() {
}
});

private final BlockingQueue<String> queue;
private final StatsDSender statsDSender;

/**
* Create a new StatsD client communicating with a StatsD instance on the
Expand Down Expand Up @@ -420,8 +416,13 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String
} catch (final Exception e) {
throw new StatsDClientException("Failed to start StatsD client", e);
}
queue = new LinkedBlockingQueue<String>(queueSize);
executor.submit(new QueueConsumer(addressLookup));
statsDSender = createSender(addressLookup, queueSize, handler, clientChannel);
executor.submit(statsDSender);
}

protected StatsDSender createSender(final Callable<SocketAddress> addressLookup, final int queueSize,
final StatsDClientErrorHandler handler, final DatagramChannel clientChannel) {
return new StatsDSender(addressLookup, queueSize, handler, clientChannel);
}

/**
Expand All @@ -431,8 +432,19 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String
@Override
public void stop() {
try {
statsDSender.shutdown();
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
try {
executor.awaitTermination(30, TimeUnit.SECONDS);
if (!executor.isTerminated()) {
executor.shutdownNow();
}
} catch (Exception e) {
handler.handle(e);
if (!executor.isTerminated()) {
executor.shutdownNow();
}
}
}
catch (final Exception e) {
handler.handle(e);
Expand Down Expand Up @@ -1069,71 +1081,13 @@ public void recordSetValue(final String aspect, final String value, final String
}

private void send(final String message) {
queue.offer(message);
statsDSender.send(message);
}

private boolean isInvalidSample(double sampleRate) {
return sampleRate != 1 && ThreadLocalRandom.current().nextDouble() > sampleRate;
}

public static final Charset MESSAGE_CHARSET = Charset.forName("UTF-8");


private class QueueConsumer implements Runnable {
private final ByteBuffer sendBuffer = ByteBuffer.allocate(PACKET_SIZE_BYTES);
private final Callable<SocketAddress> addressLookup;



QueueConsumer(final Callable<SocketAddress> addressLookup) {
this.addressLookup = addressLookup;
}

@Override public void run() {
while(!executor.isShutdown()) {
try {
final String message = queue.poll(1, TimeUnit.SECONDS);
if(null != message) {
final SocketAddress address = addressLookup.call();
final byte[] data = message.getBytes(MESSAGE_CHARSET);
if(sendBuffer.remaining() < (data.length + 1)) {
blockingSend(address);
}
if(sendBuffer.position() > 0) {
sendBuffer.put( (byte) '\n');
}
sendBuffer.put(data);
if(null == queue.peek()) {
blockingSend(address);
}
}
} catch (final Exception e) {
handler.handle(e);
}
}
}

private void blockingSend(final SocketAddress address) throws IOException {
final int sizeOfBuffer = sendBuffer.position();
sendBuffer.flip();

final int sentBytes = clientChannel.send(sendBuffer, address);
sendBuffer.limit(sendBuffer.capacity());
sendBuffer.rewind();

if (sizeOfBuffer != sentBytes) {
handler.handle(
new IOException(
String.format(
"Could not send entirely stat %s to %s. Only sent %d bytes out of %d bytes",
sendBuffer.toString(),
address.toString(),
sentBytes,
sizeOfBuffer)));
}
}
}

/**
* Create dynamic lookup for the given host name and port.
*
Expand Down
108 changes: 108 additions & 0 deletions src/main/java/com/timgroup/statsd/StatsDSender.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.timgroup.statsd;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class StatsDSender implements Runnable {
private static final Charset MESSAGE_CHARSET = Charset.forName("UTF-8");
private static final int PACKET_SIZE_BYTES = 1400;


private final ByteBuffer sendBuffer = ByteBuffer.allocate(PACKET_SIZE_BYTES);
private final Callable<SocketAddress> addressLookup;
private final BlockingQueue<String> queue;
private final StatsDClientErrorHandler handler;
private final DatagramChannel clientChannel;

private volatile boolean shutdown;


StatsDSender(final Callable<SocketAddress> addressLookup, final int queueSize,
final StatsDClientErrorHandler handler, final DatagramChannel clientChannel) {
this(addressLookup, new LinkedBlockingQueue<String>(queueSize), handler, clientChannel);
}

StatsDSender(final Callable<SocketAddress> addressLookup, final BlockingQueue<String> queue,
final StatsDClientErrorHandler handler, final DatagramChannel clientChannel) {
this.addressLookup = addressLookup;
this.queue = queue;
this.handler = handler;
this.clientChannel = clientChannel;
}


boolean send(final String message) {
if (!shutdown) {
queue.offer(message);
return true;
}
return false;
}

@Override
public void run() {
while (!(queue.isEmpty() && shutdown)) {
try {
if (Thread.interrupted()) {
return;
}
final String message = queue.poll(1, TimeUnit.SECONDS);
if (null != message) {
final SocketAddress address = addressLookup.call();
final byte[] data = message.getBytes(MESSAGE_CHARSET);
if (sendBuffer.remaining() < (data.length + 1)) {
blockingSend(address);
}
if (sendBuffer.position() > 0) {
sendBuffer.put((byte) '\n');
}
sendBuffer.put(data);
if (null == queue.peek()) {
blockingSend(address);
}
}
} catch (final InterruptedException e) {
if (shutdown) {
return;
}
} catch (final Exception e) {
handler.handle(e);
}
}
}

private void blockingSend(final SocketAddress address) throws IOException {
final int sizeOfBuffer = sendBuffer.position();
sendBuffer.flip();

final int sentBytes = clientChannel.send(sendBuffer, address);
sendBuffer.limit(sendBuffer.capacity());
sendBuffer.rewind();

if (sizeOfBuffer != sentBytes) {
handler.handle(
new IOException(
String.format(
"Could not send entirely stat %s to %s. Only sent %d bytes out of %d bytes",
sendBuffer.toString(),
address.toString(),
sentBytes,
sizeOfBuffer)));
}
}

boolean isShutdown() {
return shutdown;
}

void shutdown() {
shutdown = true;
}
}
6 changes: 5 additions & 1 deletion src/test/java/com/timgroup/statsd/DummyStatsDServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ public void unfreeze() {
}

public void close() throws IOException {
server.close();
try {
server.close();
} catch (Exception e) {
//ignore
}
}

public void clear() {
Expand Down
Loading

0 comments on commit 90771e0

Please sign in to comment.