Skip to content

Commit

Permalink
Merge pull request #173 from tobiasschuerg/feat/query_params
Browse files Browse the repository at this point in the history
feat: Adding query params
  • Loading branch information
vlastahajek authored Jan 20, 2022
2 parents bfcbcf6 + 1e4d676 commit 4e0eb10
Show file tree
Hide file tree
Showing 16 changed files with 740 additions and 17 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
### Features
- [167](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/167) - Added `InfluxDBClient::writeRecord(const char *record)`.
- [167](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/167) - Added possibility to disable retrying by setting `maxRetryAttempts` to zero: `client.setWriteOptions(WriteOptions().maxRetryAttempts(0));`
- [172](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/170) - Added directly streaming batch for write. It can be enable by `InfluxDBClient::setStreamWrite(bool enable = true)`. Writing by streaming lines of batch saves RAM as it sends data without allocating a buffer. On the other hand, this way of writing is about half times slower than the classic way, when allocating the buffer for writing the whole batch.
- [172](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/170) - Allowing larger batch size, > 255.
- [172](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/172) - Added directly streaming batch for write. It can be enable by `InfluxDBClient::setStreamWrite(bool enable = true)`. Writing by streaming lines of batch saves RAM as it sends data without allocating a buffer. On the other hand, this way of writing is about half times slower than the classic way, when allocating the buffer for writing the whole batch.
- [172](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/172) - Allowing larger batch size, > 255.
- [173](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/173) - Added Flux query parameters. Now supported by InfluxDB Cloud only.

## 3.9.0 [2021-09-17]
### Features
Expand Down
82 changes: 82 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ This library doesn't support using those devices as a peripheral.
- [InfluxDb 1](#influxdb-1)
- [Skipping certificate validation](#skipping-certificate-validation)
- [Querying](#querying)
- [Parametrized Queries](#parametrized-queries)
- [Original API](#original-api)
- [Initialization](#initialization)
- [Sending a single measurement](#sending-a-single-measurement)
Expand Down Expand Up @@ -524,6 +525,87 @@ if(result.getError() != "") {
```
Complete source code is available in [QueryAggregated example](examples/QueryAggregated/QueryAggregated.ino).

### Parametrized Queries
InfluxDB Cloud supports [Parameterized Queries](https://docs.influxdata.com/influxdb/cloud/query-data/parameterized-queries/)
that let you dynamically change values in a query using the InfluxDB API. Parameterized queries make Flux queries more
reusable and can also be used to help prevent injection attacks.

InfluxDB Cloud inserts the params object into the Flux query as a Flux record named `params`. Use dot or bracket
notation to access parameters in the `params` record in your Flux query. Parameterized Flux queries support only `int`
, `float`, and `string` data types. To convert the supported data types into
other [Flux basic data types, use Flux type conversion functions](https://docs.influxdata.com/influxdb/cloud/query-data/parameterized-queries/#supported-parameter-data-types).

Parameterized query example:
> :warning: Parameterized Queries are supported only in InfluxDB Cloud, currently there is no support in InfluxDB OSS.
```cpp
// Prepare query parameters
QueryParams params;
params.add("bucket", INFLUXDB_BUCKET);
params.add("since", "-5m");
params.add("device", DEVICE);
params.add("rssiThreshold", -50);

// Construct a Flux query using parameters
// Parameters are accessed via the 'params' Flux object
// Flux only supports only string, float and int as parameters. Duration can be converted from string.
// Query will find RSSI less than defined threshold
String query = "from(bucket: params.bucket) |> range(start: duration(v: params.since)) \
|> filter(fn: (r) => r._measurement == \"wifi_status\") \
|> filter(fn: (r) => r._field == \"rssi\") \
|> filter(fn: (r) => r.device == params.device) \
|> filter(fn: (r) => r._value < params.rssiThreshold)";

// Print ouput header
// Print composed query
Serial.print("Querying with: ");
Serial.println(query);

// Send query to the server and get result
FluxQueryResult result = client.query(query, params);

//Print header
Serial.printf("%10s %20s %5s\n","Time","SSID","RSSI");

for(int i=0;i<37;i++) {
Serial.print('-');
}
Serial.println();

// Iterate over rows. Even there is just one row, next() must be called at least once.
int c = 0;
while (result.next()) {
// Get converted value for flux result column 'SSID'
String ssid = result.getValueByName("SSID").getString();

// Get converted value for flux result column '_value' where there is RSSI value
long rssi = result.getValueByName("_value").getLong();

// Get converted value for the _time column
FluxDateTime time = result.getValueByName("_time").getDateTime();

// Format date-time for printing
// Format string according to http://www.cplusplus.com/reference/ctime/strftime/
String timeStr = time.format("%F %T");
// Print formatted row
Serial.printf("%20s %10s %5d\n", timeStr.c_str(), ssid.c_str() ,rssi);
c++;
}
if(!c) {
Serial.println(" No data found");
}

// Check if there was an error
if(result.getError() != "") {
Serial.print("Query result error: ");
Serial.println(result.getError());
}

// Close the result
result.close();
```
Complete source code is available in [QueryParams example](examples/QueryParams/QueryParams.ino).

## Original API

### Initialization
Expand Down
154 changes: 154 additions & 0 deletions examples/QueryParams/QueryParams.ino
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/**
* QueryParams Example code for InfluxDBClient library for Arduino.
*
* This example demonstrates querying using parameters inserted into the Flux query. We select WiFi signal level values bellow a certain threshold.
* WiFi signal is measured and stored in BasicWrite and SecureWrite examples.
*
* Demonstrates connection to any InfluxDB instance accesible via:
* - InfluxDB 2 Cloud at https://cloud2.influxdata.com/ (certificate is preconfigured)
*
* Enter WiFi and InfluxDB parameters below
**/

#if defined(ESP32)
#include <WiFiMulti.h>
WiFiMulti wifiMulti;
#define DEVICE "ESP32"
#elif defined(ESP8266)
#include <ESP8266WiFiMulti.h>
ESP8266WiFiMulti wifiMulti;
#define DEVICE "ESP8266"
#endif

#include <InfluxDbClient.h>
#include <InfluxDbCloud.h>

// WiFi AP SSID
#define WIFI_SSID "SSID"
// WiFi password
#define WIFI_PASSWORD "PASSWORD"
// InfluxDB v2 server url, e.g. https://eu-central-1-1.aws.cloud2.influxdata.com (Use: InfluxDB UI -> Load Data -> Client Libraries)
// InfluxDB 1.8+ (v2 compatibility API) server url, e.g. http://192.168.1.48:8086
#define INFLUXDB_URL "server-url"
// InfluxDB v2 server or cloud API authentication token (Use: InfluxDB UI -> Load Data -> Tokens -> <select token>)
// InfluxDB 1.8+ (v2 compatibility API) use form user:password, eg. admin:adminpass
#define INFLUXDB_TOKEN "server token"
// InfluxDB v2 organization name or id (Use: InfluxDB UI -> Settings -> Profile -> <name under tile> )
// InfluxDB 1.8+ (v2 compatibility API) use any non empty string
#define INFLUXDB_ORG "org name/id"
// InfluxDB v2 bucket name (Use: InfluxDB UI -> Load Data -> Buckets)
// InfluxDB 1.8+ (v2 compatibility API) use database name
#define INFLUXDB_BUCKET "bucket name"

// Set timezone string according to https://www.gnu.org/software/libc/manual/html_node/TZ-Variable.html
// Examples:
// Pacific Time: "PST8PDT"
// Eastern: "EST5EDT"
// Japanesse: "JST-9"
// Central Europe: "CET-1CEST,M3.5.0,M10.5.0/3"
#define TZ_INFO "CET-1CEST,M3.5.0,M10.5.0/3"

// InfluxDB client instance with preconfigured InfluxCloud certificate
InfluxDBClient client(INFLUXDB_URL, INFLUXDB_ORG, INFLUXDB_BUCKET, INFLUXDB_TOKEN, InfluxDbCloud2CACert);

void setup() {
Serial.begin(115200);

// Setup wifi
WiFi.mode(WIFI_STA);
wifiMulti.addAP(WIFI_SSID, WIFI_PASSWORD);

Serial.print("Connecting to wifi");
while (wifiMulti.run() != WL_CONNECTED) {
Serial.print(".");
delay(500);
}
Serial.println();


// Accurate time is necessary for certificate validation
// For the fastest time sync find NTP servers in your area: https://www.pool.ntp.org/zone/
// Syncing progress and the time will be printed to Serial
timeSync(TZ_INFO, "pool.ntp.org", "time.nis.gov");

// Check server connection
if (client.validateConnection()) {
Serial.print("Connected to InfluxDB: ");
Serial.println(client.getServerUrl());
} else {
Serial.print("InfluxDB connection failed: ");
Serial.println(client.getLastErrorMessage());
}
}


// Queries WiFi signal level values bellow a certain threshold using parameters inserted into the Flux query
// Prints composed query and the result values.
void loop() {
// Prepare query parameters
QueryParams params;
params.add("bucket", INFLUXDB_BUCKET);
params.add("since", "-5m");
params.add("device", DEVICE);
params.add("rssiTreshold", -50);

// Construct a Flux query using parameters
// Parameters are accessed via the 'params' Flux object
// Flux only supports only string, float and int as parameters. Duration can be converted from string.
// Query will find RSSI less than defined treshold
String query = "from(bucket: params.bucket) |> range(start: duration(v: params.since)) \
|> filter(fn: (r) => r._measurement == \"wifi_status\") \
|> filter(fn: (r) => r._field == \"rssi\") \
|> filter(fn: (r) => r.device == params.device) \
|> filter(fn: (r) => r._value < params.rssiTreshold)";

// Print ouput header
// Print composed query
Serial.print("Querying with: ");
Serial.println(query);

// Send query to the server and get result
FluxQueryResult result = client.query(query, params);

//Print header
Serial.printf("%10s %20s %5s\n","Time","SSID","RSSI");

for(int i=0;i<37;i++) {
Serial.print('-');
}
Serial.println();

// Iterate over rows. Even there is just one row, next() must be called at least once.
int c = 0;
while (result.next()) {
// Get converted value for flux result column 'SSID'
String ssid = result.getValueByName("SSID").getString();

// Get converted value for flux result column '_value' where there is RSSI value
long rssi = result.getValueByName("_value").getLong();

// Get converted value for the _time column
FluxDateTime time = result.getValueByName("_time").getDateTime();

// Format date-time for printing
// Format string according to http://www.cplusplus.com/reference/ctime/strftime/
String timeStr = time.format("%F %T");
// Print formatted row
Serial.printf("%20s %10s %5d\n", timeStr.c_str(), ssid.c_str() ,rssi);
c++;
}
if(!c) {
Serial.println(" No data found");
}

// Check if there was an error
if(result.getError() != "") {
Serial.print("Query result error: ");
Serial.println(result.getError());
}

// Close the result
result.close();
// Wait 15s
delay(15000);
}
31 changes: 27 additions & 4 deletions src/InfluxDbClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,9 +572,16 @@ static const char QueryDialect[] PROGMEM = "\
\"header\": true,\
\"delimiter\": \",\",\
\"commentPrefix\": \"#\"\
}}";
}";

static const char Params[] PROGMEM = ",\
\"params\": {";

FluxQueryResult InfluxDBClient::query(String fluxQuery) {
return query(fluxQuery, QueryParams());
}

FluxQueryResult InfluxDBClient::query(String fluxQuery, QueryParams params) {
uint32_t rwt = getRemainingRetryTime();
if(rwt > 0) {
INFLUXDB_CLIENT_DEBUG("[W] Cannot query yet, pause %ds, %ds yet\n", _retryTime, rwt);
Expand All @@ -590,12 +597,28 @@ FluxQueryResult InfluxDBClient::query(String fluxQuery) {
INFLUXDB_CLIENT_DEBUG("[D] Query to %s\n", _queryUrl.c_str());
INFLUXDB_CLIENT_DEBUG("[D] JSON query:\n%s\n", fluxQuery.c_str());

String body = F("{\"type\":\"flux\",\"query\":\"");
body += escapeJSONString(fluxQuery) + "\",";
String queryEsc = escapeJSONString(fluxQuery);
String body;
body.reserve(150 + queryEsc.length() + params.size()*30);
body = F("{\"type\":\"flux\",\"query\":\"");
body += queryEsc;
body += "\",";
body += FPSTR(QueryDialect);

if(params.size()) {
body += FPSTR(Params);
body += params.jsonString(0);
for(int i=1;i<params.size();i++) {
body +=",";
char *js = params.jsonString(i);
body += js;
delete [] js;
}
body += '}';
}
body += '}';
CsvReader *reader = nullptr;
_retryTime = 0;
INFLUXDB_CLIENT_DEBUG("[D] Query: %s\n", body.c_str());
if(_service->doPOST(_queryUrl.c_str(), body.c_str(), PSTR("application/json"), 200, [&](HTTPClient *httpClient){
bool chunked = false;
if(httpClient->hasHeader(TransferEncoding)) {
Expand Down
5 changes: 5 additions & 0 deletions src/InfluxDbClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "Point.h"
#include "WritePrecision.h"
#include "query/FluxParser.h"
#include "query/Params.h"
#include "util/helpers.h"
#include "Options.h"
#include "BucketsClient.h"
Expand Down Expand Up @@ -130,6 +131,10 @@ class InfluxDBClient {
// Use FluxQueryResult::next() method to iterate over lines of the query result.
// Always call of FluxQueryResult::close() when reading is finished. Check FluxQueryResult doc for more info.
FluxQueryResult query(String fluxQuery);
// Sends Flux query with params and returns FluxQueryResult object for subsequently reading flux query response.
// Use FluxQueryResult::next() method to iterate over lines of the query result.
// Always call of FluxQueryResult::close() when reading is finished. Check FluxQueryResult doc for more info.
FluxQueryResult query(String fluxQuery, QueryParams params);
// Forces writing of all points in buffer, even the batch is not full.
// Returns true if successful, false in case of any error
bool flushBuffer();
Expand Down
4 changes: 2 additions & 2 deletions src/query/FluxParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ bool FluxQueryResult::next() {
return true;
}

FluxDateTime *FluxQueryResult::convertRfc3339(String value, const char *type) {
FluxDateTime *FluxQueryResult::convertRfc3339(String &value, const char *type) {
tm t = {0,0,0,0,0,0,0,0,0};
// has the time part
int zet = value.indexOf('Z');
Expand Down Expand Up @@ -240,7 +240,7 @@ FluxDateTime *FluxQueryResult::convertRfc3339(String value, const char *type) {
return new FluxDateTime(value, type, t, fracts);
}

FluxBase *FluxQueryResult::convertValue(String value, String dataType) {
FluxBase *FluxQueryResult::convertValue(String &value, String &dataType) {
FluxBase *ret = nullptr;
if(dataType.equals(FluxDatatypeDatetimeRFC3339) || dataType.equals(FluxDatatypeDatetimeRFC3339Nano)) {
const char *type = FluxDatatypeDatetimeRFC3339;
Expand Down
4 changes: 2 additions & 2 deletions src/query/FluxParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ class FluxQueryResult {
// Descructor
~FluxQueryResult();
protected:
FluxBase *convertValue(String value, String dataType);
static FluxDateTime *convertRfc3339(String value, const char *type);
FluxBase *convertValue(String &value, String &dataType);
static FluxDateTime *convertRfc3339(String &value, const char *type);
void clearValues();
void clearColumns();
private:
Expand Down
Loading

0 comments on commit 4e0eb10

Please sign in to comment.