Skip to content

Commit

Permalink
feat: Support Dogstatsd 1.1 protocol for distributions
Browse files Browse the repository at this point in the history
There is a new experimental Dogstatsd protocol (1.1) which supports sending
multiple values in a single datagram.

This is supported since agent 7.25.0/6.25.0.

This evolution in the protocol allows clients to buffer histogram and
distribution values and send them in fewer payload to the agent (providing a
behavior close to client-side aggregation for those types).

This commit add support for this new protocol for distribution.

Fix DataDog#215
  • Loading branch information
blemale committed Feb 24, 2023
1 parent 164cfa4 commit c0c0b80
Show file tree
Hide file tree
Showing 4 changed files with 389 additions and 1 deletion.
24 changes: 24 additions & 0 deletions src/main/java/com/timgroup/statsd/NoOpStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,30 @@ public final class NoOpStatsDClient implements StatsDClient {

@Override public void recordDistributionValue(String aspect, long value, double sampleRate, String... tags) { }

@Override
public void recordDistributionValue(String aspect, double[] values, String... tags) { }

@Override
public void recordDistributionValue(String aspect, double[] values, double sampleRate, String... tags) { }

@Override
public void recordDistributionValue(String aspect, long[] values, String... tags) { }

@Override
public void recordDistributionValue(String aspect, long[] values, double sampleRate, String... tags) { }

@Override
public void distribution(String aspect, double[] values, String... tags) { }

@Override
public void distribution(String aspect, double[] values, double sampleRate, String... tags) { }

@Override
public void distribution(String aspect, long[] values, String... tags) { }

@Override
public void distribution(String aspect, long[] values, double sampleRate, String... tags) { }

@Override public void distribution(String aspect, double value, String... tags) { }

@Override public void distribution(String aspect, double value, double sampleRate, String... tags) { }
Expand Down
189 changes: 188 additions & 1 deletion src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;


/**
Expand Down Expand Up @@ -531,6 +530,80 @@ public boolean canAggregate() {
protected abstract void writeValue(StringBuilder builder);
}

abstract class MultiValuedStatsDMessage extends Message {
private final double sampleRate; // NaN for none
private final long timestamp; // zero for none

MultiValuedStatsDMessage(String aspect, Message.Type type, String[] tags, double sampleRate, long timestamp) {
super(aspect, type, tags);
this.sampleRate = sampleRate;
this.timestamp = timestamp;
}

@Override
public final boolean canAggregate() {
return false;
}

@Override
public final void aggregate(Message message) { }

@Override
public final void writeTo(StringBuilder builder, String containerID) {
builder.append(prefix).append(aspect);
writeValuesTo(builder);
builder.append('|').append(type);
if (!Double.isNaN(sampleRate)) {
builder.append('|').append('@').append(format(SAMPLE_RATE_FORMATTER, sampleRate));
}
if (timestamp != 0) {
builder.append("|T").append(timestamp);
}
tagString(tags, builder);
if (containerID != null && !containerID.isEmpty()) {
builder.append("|c:").append(containerID);
}

builder.append('\n');
}

protected abstract void writeValuesTo(StringBuilder builder);
}

final class LongsStatsDMessage extends MultiValuedStatsDMessage {
private final long[] values;

LongsStatsDMessage(String aspect, Message.Type type, long[] values, double sampleRate, long timestamp, String[] tags) {
super(aspect, type, tags, sampleRate, timestamp);
this.values = values;
}

@Override
protected void writeValuesTo(StringBuilder builder) {
for (long value: values) {
builder.append(':').append(value);
}
}
}

final class DoublesStatsDMessage extends MultiValuedStatsDMessage {
private final double[] values;

DoublesStatsDMessage(String aspect, Message.Type type, double[] values, double sampleRate, long timestamp,
String[] tags) {
super(aspect, type, tags, sampleRate,timestamp);
this.values = values;
}

@Override
protected void writeValuesTo(StringBuilder builder) {
for (double value: values) {
builder.append(':').append(value);
}
}
}



private boolean sendMetric(final Message message) {
return send(message);
Expand Down Expand Up @@ -609,6 +682,56 @@ private void send(String aspect, final long value, Message.Type type, String[] t
send(aspect, value, type, Double.NaN, 0, tags);
}

private void send(String aspect, long[] values, Message.Type type, double sampleRate, long timestamp, String[] tags) {
if (statsDProcessor.getAggregator().getFlushInterval() != 0 && !Double.isNaN(sampleRate)) {
switch (type) {
case COUNT:
sampleRate = Double.NaN;
break;
default:
break;
}
}

if ((Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) && values != null && values.length > 0) {
sendMetric(new LongsStatsDMessage(aspect, type, values, sampleRate, timestamp, tags));
}
}

private void send(String aspect, long[] values, Message.Type type, double sampleRate, String[] tags) {
send(aspect, values, type, sampleRate, 0, tags);
}

// send longs without sample rate
private void send(String aspect, long[] values, Message.Type type, String[] tags) {
send(aspect, values, type, Double.NaN, 0, tags);
}

private void send(String aspect, double[] values, Message.Type type, double sampleRate, long timestamp, String[] tags) {
if (statsDProcessor.getAggregator().getFlushInterval() != 0 && !Double.isNaN(sampleRate)) {
switch (type) {
case COUNT:
sampleRate = Double.NaN;
break;
default:
break;
}
}

if ((Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) && values != null && values.length > 0) {
sendMetric(new DoublesStatsDMessage(aspect, type, values, sampleRate, timestamp, tags));
}
}

private void send(String aspect, double[] values, Message.Type type, double sampleRate, String[] tags) {
send(aspect, values, type, sampleRate, 0, tags);
}

// send doubles without sample rate
private void send(String aspect, double[] values, Message.Type type, String[] tags) {
send(aspect, values, type, Double.NaN, 0, tags);
}

private void sendWithTimestamp(String aspect, final double value, Message.Type type, long timestamp, String[] tags) {
if (timestamp < MIN_TIMESTAMP) {
timestamp = MIN_TIMESTAMP;
Expand Down Expand Up @@ -1039,6 +1162,70 @@ public void recordDistributionValue(final String aspect, final long value, final
send(aspect, value, Message.Type.DISTRIBUTION, sampleRate, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void recordDistributionValue(String aspect, double[] values, String... tags) {
send(aspect, values, Message.Type.DISTRIBUTION, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void recordDistributionValue(String aspect, double[] values, double sampleRate, String... tags) {
send(aspect, values, Message.Type.DISTRIBUTION, sampleRate, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void recordDistributionValue(String aspect, long[] values, String... tags) {
send(aspect, values, Message.Type.DISTRIBUTION, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void recordDistributionValue(String aspect, long[] values, double sampleRate, String... tags) {
send(aspect, values, Message.Type.DISTRIBUTION, sampleRate, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void distribution(String aspect, double[] values, String... tags) {
recordDistributionValue(aspect, values, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void distribution(String aspect, double[] values, double sampleRate, String... tags) {
recordDistributionValue(aspect, values, sampleRate, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void distribution(String aspect, long[] values, String... tags) {
recordDistributionValue(aspect, values, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void distribution(String aspect, long[] values, double sampleRate, String... tags) {
recordDistributionValue(aspect, values, sampleRate, tags);
}

/**
* Convenience method equivalent to {@link #recordDistributionValue(String, double, String[])}.
*/
Expand Down
120 changes: 120 additions & 0 deletions src/main/java/com/timgroup/statsd/StatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,74 @@ public interface StatsDClient extends Closeable {
*/
void recordDistributionValue(String aspect, long value, double sampleRate, String... tags);

/**
* Records values for the specified named distribution.
*
* <p>This method is a DataDog extension, and may not work with other servers.</p>
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
* @param aspect
* the name of the distribution
* @param values
* the values to be incorporated in the distribution
* @param tags
* array of tags to be added to the data
*/
void recordDistributionValue(String aspect, double[] values, String... tags);

/**
* Records values for the specified named distribution.
*
* <p>This method is a DataDog extension, and may not work with other servers.</p>
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
* @param aspect
* the name of the distribution
* @param values
* the values to be incorporated in the distribution
* @param sampleRate
* percentage of time metric to be sent
* @param tags
* array of tags to be added to the data
*/
void recordDistributionValue(String aspect, double[] values, double sampleRate, String... tags);

/**
* Records values for the specified named distribution.
*
* <p>This method is a DataDog extension, and may not work with other servers.</p>
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
* @param aspect
* the name of the distribution
* @param values
* the values to be incorporated in the distribution
* @param tags
* array of tags to be added to the data
*/
void recordDistributionValue(String aspect, long[] values, String... tags);

/**
* Records values for the specified named distribution.
*
* <p>This method is a DataDog extension, and may not work with other servers.</p>
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
* @param aspect
* the name of the distribution
* @param values
* the values to be incorporated in the distribution
* @param sampleRate
* percentage of time metric to be sent
* @param tags
* array of tags to be added to the data
*/
void recordDistributionValue(String aspect, long[] values, double sampleRate, String... tags);

/**
* Convenience method equivalent to {@link #recordDistributionValue(String, double, String[])}.
*
Expand Down Expand Up @@ -702,6 +770,58 @@ public interface StatsDClient extends Closeable {
*/
void distribution(String aspect, long value, double sampleRate, String... tags);

/**
* Convenience method equivalent to {@link #recordDistributionValue(String, double[], String[])}.
*
* @param aspect
* the name of the distribution
* @param values
* the values to be incorporated in the distribution
* @param tags
* array of tags to be added to the data
*/
void distribution(String aspect, double[] values, String... tags);

/**
* Convenience method equivalent to {@link #recordDistributionValue(String, double[], double, String[])}.
*
* @param aspect
* the name of the distribution
* @param values
* the values to be incorporated in the distribution
* @param sampleRate
* percentage of time metric to be sent
* @param tags
* array of tags to be added to the data
*/
void distribution(String aspect, double[] values, double sampleRate, String... tags);

/**
* Convenience method equivalent to {@link #recordDistributionValue(String, long[], String[])}.
*
* @param aspect
* the name of the distribution
* @param values
* the values to be incorporated in the distribution
* @param tags
* array of tags to be added to the data
*/
void distribution(String aspect, long[] values, String... tags);

/**
* Convenience method equivalent to {@link #recordDistributionValue(String, long[], double, String[])}.
*
* @param aspect
* the name of the distribution
* @param values
* the values to be incorporated in the distribution
* @param sampleRate
* percentage of time metric to be sent
* @param tags
* array of tags to be added to the data
*/
void distribution(String aspect, long[] values, double sampleRate, String... tags);

/**
* Records an event.
*
Expand Down
Loading

0 comments on commit c0c0b80

Please sign in to comment.