Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support Dogstatsd 1.1 protocol for distributions #216

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/main/java/com/timgroup/statsd/NoOpStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,30 @@ public final class NoOpStatsDClient implements StatsDClient {

@Override public void recordDistributionValue(String aspect, long value, double sampleRate, String... tags) { }

@Override
public void recordDistributionValue(String aspect, double[] values, String... tags) { }

@Override
public void recordDistributionValue(String aspect, double[] values, double sampleRate, String... tags) { }

@Override
public void recordDistributionValue(String aspect, long[] values, String... tags) { }

@Override
public void recordDistributionValue(String aspect, long[] values, double sampleRate, String... tags) { }

@Override
public void distribution(String aspect, double[] values, String... tags) { }

@Override
public void distribution(String aspect, double[] values, double sampleRate, String... tags) { }

@Override
public void distribution(String aspect, long[] values, String... tags) { }

@Override
public void distribution(String aspect, long[] values, double sampleRate, String... tags) { }

@Override public void distribution(String aspect, double value, String... tags) { }

@Override public void distribution(String aspect, double value, double sampleRate, String... tags) { }
Expand Down
189 changes: 188 additions & 1 deletion src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;


/**
Expand Down Expand Up @@ -531,6 +530,80 @@ public boolean canAggregate() {
protected abstract void writeValue(StringBuilder builder);
}

abstract class MultiValuedStatsDMessage extends Message {
private final double sampleRate; // NaN for none
private final long timestamp; // zero for none

MultiValuedStatsDMessage(String aspect, Message.Type type, String[] tags, double sampleRate, long timestamp) {
super(aspect, type, tags);
this.sampleRate = sampleRate;
this.timestamp = timestamp;
}

@Override
public final boolean canAggregate() {
return false;
}

@Override
public final void aggregate(Message message) { }

@Override
public final void writeTo(StringBuilder builder, String containerID) {
builder.append(prefix).append(aspect);
writeValuesTo(builder);
builder.append('|').append(type);
if (!Double.isNaN(sampleRate)) {
builder.append('|').append('@').append(format(SAMPLE_RATE_FORMATTER, sampleRate));
}
if (timestamp != 0) {
builder.append("|T").append(timestamp);
}
tagString(tags, builder);
if (containerID != null && !containerID.isEmpty()) {
builder.append("|c:").append(containerID);
}

builder.append('\n');
}

protected abstract void writeValuesTo(StringBuilder builder);
}

final class LongsStatsDMessage extends MultiValuedStatsDMessage {
private final long[] values;

LongsStatsDMessage(String aspect, Message.Type type, long[] values, double sampleRate, long timestamp, String[] tags) {
super(aspect, type, tags, sampleRate, timestamp);
this.values = values;
}

@Override
protected void writeValuesTo(StringBuilder builder) {
for (long value: values) {
builder.append(':').append(value);
}
}
}

final class DoublesStatsDMessage extends MultiValuedStatsDMessage {
private final double[] values;

DoublesStatsDMessage(String aspect, Message.Type type, double[] values, double sampleRate, long timestamp,
String[] tags) {
super(aspect, type, tags, sampleRate,timestamp);
this.values = values;
}

@Override
protected void writeValuesTo(StringBuilder builder) {
for (double value: values) {
builder.append(':').append(value);
}
}
}



private boolean sendMetric(final Message message) {
return send(message);
Expand Down Expand Up @@ -609,6 +682,56 @@ private void send(String aspect, final long value, Message.Type type, String[] t
send(aspect, value, type, Double.NaN, 0, tags);
}

private void send(String aspect, long[] values, Message.Type type, double sampleRate, long timestamp, String[] tags) {
if (statsDProcessor.getAggregator().getFlushInterval() != 0 && !Double.isNaN(sampleRate)) {
switch (type) {
case COUNT:
sampleRate = Double.NaN;
break;
default:
break;
}
}

if ((Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) && values != null && values.length > 0) {
sendMetric(new LongsStatsDMessage(aspect, type, values, sampleRate, timestamp, tags));
}
}

private void send(String aspect, long[] values, Message.Type type, double sampleRate, String[] tags) {
send(aspect, values, type, sampleRate, 0, tags);
}

// send longs without sample rate
private void send(String aspect, long[] values, Message.Type type, String[] tags) {
send(aspect, values, type, Double.NaN, 0, tags);
}

private void send(String aspect, double[] values, Message.Type type, double sampleRate, long timestamp, String[] tags) {
if (statsDProcessor.getAggregator().getFlushInterval() != 0 && !Double.isNaN(sampleRate)) {
switch (type) {
case COUNT:
sampleRate = Double.NaN;
break;
default:
break;
}
}

if ((Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) && values != null && values.length > 0) {
sendMetric(new DoublesStatsDMessage(aspect, type, values, sampleRate, timestamp, tags));
}
}

private void send(String aspect, double[] values, Message.Type type, double sampleRate, String[] tags) {
send(aspect, values, type, sampleRate, 0, tags);
}

// send doubles without sample rate
private void send(String aspect, double[] values, Message.Type type, String[] tags) {
send(aspect, values, type, Double.NaN, 0, tags);
}

private void sendWithTimestamp(String aspect, final double value, Message.Type type, long timestamp, String[] tags) {
if (timestamp < MIN_TIMESTAMP) {
timestamp = MIN_TIMESTAMP;
Expand Down Expand Up @@ -1039,6 +1162,70 @@ public void recordDistributionValue(final String aspect, final long value, final
send(aspect, value, Message.Type.DISTRIBUTION, sampleRate, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void recordDistributionValue(String aspect, double[] values, String... tags) {
send(aspect, values, Message.Type.DISTRIBUTION, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void recordDistributionValue(String aspect, double[] values, double sampleRate, String... tags) {
send(aspect, values, Message.Type.DISTRIBUTION, sampleRate, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void recordDistributionValue(String aspect, long[] values, String... tags) {
send(aspect, values, Message.Type.DISTRIBUTION, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void recordDistributionValue(String aspect, long[] values, double sampleRate, String... tags) {
send(aspect, values, Message.Type.DISTRIBUTION, sampleRate, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void distribution(String aspect, double[] values, String... tags) {
recordDistributionValue(aspect, values, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void distribution(String aspect, double[] values, double sampleRate, String... tags) {
recordDistributionValue(aspect, values, sampleRate, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void distribution(String aspect, long[] values, String... tags) {
recordDistributionValue(aspect, values, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void distribution(String aspect, long[] values, double sampleRate, String... tags) {
recordDistributionValue(aspect, values, sampleRate, tags);
}

/**
* Convenience method equivalent to {@link #recordDistributionValue(String, double, String[])}.
*/
Expand Down
120 changes: 120 additions & 0 deletions src/main/java/com/timgroup/statsd/StatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,74 @@ public interface StatsDClient extends Closeable {
*/
void recordDistributionValue(String aspect, long value, double sampleRate, String... tags);

/**
* Records values for the specified named distribution.
*
* <p>This method is a DataDog extension, and may not work with other servers.</p>
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
* @param aspect
* the name of the distribution
* @param values
* the values to be incorporated in the distribution
* @param tags
* array of tags to be added to the data
*/
void recordDistributionValue(String aspect, double[] values, String... tags);

/**
* Records values for the specified named distribution.
*
* <p>This method is a DataDog extension, and may not work with other servers.</p>
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
* @param aspect
* the name of the distribution
* @param values
* the values to be incorporated in the distribution
* @param sampleRate
* percentage of time metric to be sent
* @param tags
* array of tags to be added to the data
*/
void recordDistributionValue(String aspect, double[] values, double sampleRate, String... tags);

/**
* Records values for the specified named distribution.
*
* <p>This method is a DataDog extension, and may not work with other servers.</p>
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
* @param aspect
* the name of the distribution
* @param values
* the values to be incorporated in the distribution
* @param tags
* array of tags to be added to the data
*/
void recordDistributionValue(String aspect, long[] values, String... tags);

/**
* Records values for the specified named distribution.
*
* <p>This method is a DataDog extension, and may not work with other servers.</p>
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
* @param aspect
* the name of the distribution
* @param values
* the values to be incorporated in the distribution
* @param sampleRate
* percentage of time metric to be sent
* @param tags
* array of tags to be added to the data
*/
void recordDistributionValue(String aspect, long[] values, double sampleRate, String... tags);

/**
* Convenience method equivalent to {@link #recordDistributionValue(String, double, String[])}.
*
Expand Down Expand Up @@ -702,6 +770,58 @@ public interface StatsDClient extends Closeable {
*/
void distribution(String aspect, long value, double sampleRate, String... tags);

/**
* Convenience method equivalent to {@link #recordDistributionValue(String, double[], String[])}.
*
* @param aspect
* the name of the distribution
* @param values
* the values to be incorporated in the distribution
* @param tags
* array of tags to be added to the data
*/
void distribution(String aspect, double[] values, String... tags);

/**
* Convenience method equivalent to {@link #recordDistributionValue(String, double[], double, String[])}.
*
* @param aspect
* the name of the distribution
* @param values
* the values to be incorporated in the distribution
* @param sampleRate
* percentage of time metric to be sent
* @param tags
* array of tags to be added to the data
*/
void distribution(String aspect, double[] values, double sampleRate, String... tags);

/**
* Convenience method equivalent to {@link #recordDistributionValue(String, long[], String[])}.
*
* @param aspect
* the name of the distribution
* @param values
* the values to be incorporated in the distribution
* @param tags
* array of tags to be added to the data
*/
void distribution(String aspect, long[] values, String... tags);

/**
* Convenience method equivalent to {@link #recordDistributionValue(String, long[], double, String[])}.
*
* @param aspect
* the name of the distribution
* @param values
* the values to be incorporated in the distribution
* @param sampleRate
* percentage of time metric to be sent
* @param tags
* array of tags to be added to the data
*/
void distribution(String aspect, long[] values, double sampleRate, String... tags);

/**
* Records an event.
*
Expand Down
Loading