Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix issues 498: Added streaming with time units to the influx db interface #509

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions src/main/java/org/influxdb/InfluxDB.java
Original file line number Diff line number Diff line change
@@ -439,6 +439,37 @@ public void write(final String database, final String retentionPolicy,
*/
public void query(Query query, int chunkSize, Consumer<QueryResult> consumer);

/**
* Execute a streaming postQuery against a database.
*
* @param query
* the query to execute.
* @param chunkSize
* the number of QueryResults to process in one chunk.
* @param consumer
* the consumer to invoke for each received QueryResult
* @param timeUnit
* the time unit of the results.
*/
public void query(Query query, TimeUnit timeUnit, int chunkSize, Consumer<QueryResult> consumer);

/**
* Execute a streaming postQuery against a database.
*
* @param query
* the query to execute.
* @param timeUnit
* the time unit of the results.
* @param chunkSize
* the number of QueryResults to process in one chunk.
* @param onSuccess
* the consumer to invoke when result is received
* @param onFailure
* the consumer to invoke when error is thrown
*/
public void query(Query query, TimeUnit timeUnit, int chunkSize,
Consumer<QueryResult> onSuccess, Consumer<Throwable> onFailure);

/**
* Execute a query against a database.
*
73 changes: 56 additions & 17 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
@@ -524,42 +524,81 @@ public void onFailure(final Call<QueryResult> call, final Throwable throwable) {
* {@inheritDoc}
*/
@Override
public void query(final Query query, final int chunkSize, final Consumer<QueryResult> consumer) {
Call<ResponseBody> call = null;
if (query instanceof BoundParameterQuery) {
BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;
call = this.influxDBService.query(query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize,
boundParameterQuery.getParameterJsonWithUrlEncoded());
} else {
call = this.influxDBService.query(query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize);
}
public void query(final Query query, final int chunkSize, final Consumer<QueryResult> consumer) {
query(query, null, chunkSize, consumer, null);
}

/**
* {@inheritDoc}
*/
@Override
public void query(final Query query, final TimeUnit timeUnit,
final int chunkSize, final Consumer<QueryResult> consumer) {
query(query, timeUnit, chunkSize, consumer, null);
}

/**
* {@inheritDoc}
*/
@Override
public void query(final Query query, final TimeUnit timeUnit, final int chunkSize,
final Consumer<QueryResult> onSuccess, final Consumer<Throwable> onFailure) {
Call<ResponseBody> call = makeResponseBodyCall(query, timeUnit, chunkSize);
call.enqueue(new Callback<ResponseBody>() {
@Override
public void onResponse(final Call<ResponseBody> call, final Response<ResponseBody> response) {
try {
if (response.isSuccessful()) {
ResponseBody chunkedBody = response.body();
chunkProccesor.process(chunkedBody, consumer);
chunkProccesor.process(chunkedBody, onSuccess);
} else {
// REVIEW: must be handled consistently with IOException.
ResponseBody errorBody = response.errorBody();
if (errorBody != null) {
throw new InfluxDBException(errorBody.string());
InfluxDBException errorBodyException = new InfluxDBException(errorBody.string());
if (onFailure != null) {
onFailure.accept(errorBodyException);
} else {
throw errorBodyException;
}
}
}
} catch (IOException e) {
QueryResult queryResult = new QueryResult();
queryResult.setError(e.toString());
consumer.accept(queryResult);
onSuccess.accept(queryResult);
}
}

@Override
public void onFailure(final Call<ResponseBody> call, final Throwable t) {
throw new InfluxDBException(t);
}
});
@Override
public void onFailure(final Call<ResponseBody> call, final Throwable t) {
if (onFailure != null) {
onFailure.accept(t);
} else {
throw new InfluxDBException(t);
}
}
});
}

private Call<ResponseBody> makeResponseBodyCall(final Query query, final TimeUnit timeUnit, final int chunkSize) {
if (query instanceof BoundParameterQuery) {
BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;
if (timeUnit == null) {
return this.influxDBService.query(query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize,
boundParameterQuery.getParameterJsonWithUrlEncoded());
}
return this.influxDBService.query(query.getDatabase(), query.getCommandWithUrlEncoded(),
TimeUtil.toTimePrecision(timeUnit),
chunkSize, boundParameterQuery.getParameterJsonWithUrlEncoded());
} else {
if (timeUnit == null) {
return this.influxDBService.query(query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize);
}

return this.influxDBService.query(query.getDatabase(), query.getCommandWithUrlEncoded(),
TimeUtil.toTimePrecision(timeUnit), chunkSize);
}
}

/**
11 changes: 11 additions & 0 deletions src/main/java/org/influxdb/impl/InfluxDBService.java
Original file line number Diff line number Diff line change
@@ -75,8 +75,19 @@ public Call<QueryResult> postQuery(@Query(DB) String db,
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query,
@Query(CHUNK_SIZE) int chunkSize);

@Streaming
@GET("query?chunked=true")
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query,
@Query(EPOCH) String epoch, @Query(CHUNK_SIZE) int chunkSize);

@Streaming
@POST("query?chunked=true")
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query,
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);

@Streaming
@POST("query?chunked=true")
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query,
@Query(EPOCH) String precision, @Query(CHUNK_SIZE) int chunkSize,
@Query(value = PARAMS, encoded = true) String params);
}
150 changes: 132 additions & 18 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package org.influxdb;

import okhttp3.OkHttpClient;
import org.influxdb.InfluxDB.LogLevel;
import org.influxdb.InfluxDB.ResponseFormat;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.*;
import org.influxdb.dto.BoundParameterQuery.QueryBuilder;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.dto.QueryResult.Series;
import org.influxdb.impl.InfluxDBImpl;
import org.junit.jupiter.api.AfterEach;
@@ -18,8 +15,6 @@
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;

import okhttp3.OkHttpClient;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
@@ -28,14 +23,9 @@
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
@@ -47,10 +37,13 @@
@RunWith(JUnitPlatform.class)
public class InfluxDBTest {

InfluxDB influxDB;
private final static long DEFAULT_OPERATION_TIMEOUT = 5000L;
private final static int DEFAULT_QUERY_CHUNK_SIZE = 10;
private final static int UDP_PORT = 8089;
final static String UDP_DATABASE = "udp";

InfluxDB influxDB;

/**
* Create a influxDB connection before all tests start.
*
@@ -137,12 +130,13 @@ public void testBoundParameterQuery() throws InterruptedException {
waitForTestresults.notifyAll();
}
};
this.influxDB.query(query, 10, check);
this.influxDB.query(query, DEFAULT_QUERY_CHUNK_SIZE, check);
synchronized (waitForTestresults) {
waitForTestresults.wait(2000);
waitForTestresults.wait(DEFAULT_OPERATION_TIMEOUT);
}
}


/**
* Tests for callback query.
*/
@@ -162,6 +156,97 @@ public void accept(QueryResult queryResult) {
result.result();
}

/**
* Tests for the chunk stream query with specified time unit
*/
@Test
public void testChunkTimeUnitCallbackQuery() throws Throwable {
this.influxDB.setDatabase(UDP_DATABASE);
final int chunkCount = 40;

for (int i = 0; i < chunkCount; i++) {
Point point = Point.measurement("cpu")
.tag("atag", "test")
.addField("idle", 90L + i)
.addField("usertime", 9L + i)
.addField("system", 1L + i)
.build();
this.influxDB.write(point);
}

// test
Query query = QueryBuilder.newQuery("SELECT * FROM cpu WHERE atag = $atag")
.forDatabase(UDP_DATABASE)
.bind("atag", "test")
.create();

Object waitForTestresults = new Object();
AtomicInteger callBackCalled = new AtomicInteger(0);
final int expectedCallBackCalled = chunkCount / DEFAULT_QUERY_CHUNK_SIZE;

Consumer<QueryResult> check = (queryResult) -> {
if (queryResult.getResults() == null) {
return;
}

callBackCalled.getAndIncrement();
List<List<Object>> values = queryResult.getResults().get(0).getSeries().get(0).getValues();
Assertions.assertEquals(DEFAULT_QUERY_CHUNK_SIZE, values.size());
Assertions.assertTrue(values.get(0).get(0) instanceof Number);

if (callBackCalled.get() == expectedCallBackCalled) {
synchronized (waitForTestresults) {
waitForTestresults.notifyAll();
}
}
};

this.influxDB.query(query, TimeUnit.MILLISECONDS, DEFAULT_QUERY_CHUNK_SIZE, check);

synchronized (waitForTestresults) {
waitForTestresults.wait(DEFAULT_OPERATION_TIMEOUT);
Assertions.assertEquals(expectedCallBackCalled, callBackCalled.get());
}
}

/**
* Test for the failure callback for a stream query
*/
@Test
public void testFailureCallbackQuery() throws Throwable {
this.influxDB.setDatabase(UDP_DATABASE);
Query query = QueryBuilder.newQuery("Invalid query")
.forDatabase(UDP_DATABASE)
.bind("atag", "test")
.create();

AtomicBoolean onSuccessCalled = new AtomicBoolean(false);
AtomicBoolean onFailureCalled = new AtomicBoolean(false);
Object waitForTestResults = new Object();

Consumer<QueryResult> onSuccess = (queryResult) -> {
onSuccessCalled.set(true);
synchronized (waitForTestResults) {
waitForTestResults.notifyAll();
}
};

Consumer<Throwable> onFailure = (throwable) -> {
onFailureCalled.set(true);
synchronized (waitForTestResults) {
waitForTestResults.notifyAll();
}
};

this.influxDB.query(query, TimeUnit.MILLISECONDS, DEFAULT_QUERY_CHUNK_SIZE, onSuccess, onFailure);

synchronized (waitForTestResults) {
waitForTestResults.wait(DEFAULT_OPERATION_TIMEOUT);
Assertions.assertFalse(onSuccessCalled.get());
Assertions.assertTrue(onFailureCalled.get());
}
}

/**
* Test that describe Databases works.
*/
@@ -790,6 +875,35 @@ public void accept(QueryResult result) {
Assertions.assertFalse(countDownLatch.await(10, TimeUnit.SECONDS));
}

/**
* Test chunking edge case.
* @throws InterruptedException
*/
@Test
public void testChunkingFailureWithCallback() throws InterruptedException {
if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) {
// do not test version 0.13 and 1.0
return;
}
String dbName = "write_unittest_" + System.currentTimeMillis();
this.influxDB.createDatabase(dbName);
final CountDownLatch countDownLatch = new CountDownLatch(1);
Query query = new Query("UNKNOWN_QUERY", dbName);
this.influxDB.query(query, TimeUnit.NANOSECONDS, 10, new Consumer<QueryResult>() {
@Override
public void accept(QueryResult result) {
countDownLatch.countDown();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
countDownLatch.countDown();
}
});
this.influxDB.deleteDatabase(dbName);
Assertions.assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
}

/**
* Test chunking on 0.13 and 1.0.
* @throws InterruptedException