Skip to content

Commit

Permalink
Merge pull request #56 from thulab/dev_grafana
Browse files Browse the repository at this point in the history
Dev grafana
  • Loading branch information
zhuyuqing authored Jun 4, 2021
2 parents dc2d4b4 + 380d346 commit 52d0bc4
Show file tree
Hide file tree
Showing 10 changed files with 852 additions and 6 deletions.
83 changes: 81 additions & 2 deletions core/src/main/java/cn/edu/tsinghua/iginx/rest/MetricsResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import cn.edu.tsinghua.iginx.conf.ConfigDescriptor;
import cn.edu.tsinghua.iginx.metadata.IMetaManager;
import cn.edu.tsinghua.iginx.metadata.SortedListAbstractMetaManager;
import cn.edu.tsinghua.iginx.rest.insert.InsertAnnotationWorker;
import cn.edu.tsinghua.iginx.rest.insert.InsertWorker;
import cn.edu.tsinghua.iginx.rest.query.Query;
import cn.edu.tsinghua.iginx.rest.query.QueryExecutor;
import cn.edu.tsinghua.iginx.rest.query.QueryParser;
import cn.edu.tsinghua.iginx.rest.query.QueryResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,8 +48,11 @@
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand All @@ -55,7 +61,9 @@ public class MetricsResource {

private final IMetaManager metaManager = SortedListAbstractMetaManager.getInstance();
private static final String INSERT_URL = "api/v1/datapoints";
private static final String INSERT_ANNOTATION_URL = "api/v1/datapoints/annotations";
private static final String QUERY_URL = "api/v1/datapoints/query";
private static final String QUERY_ANNOTATION_URL = "api/v1/datapoints/query/annotations";
private static final String DELETE_URL = "api/v1/datapoints/delete";
private static final String DELETE_METRIC_URL = "api/v1/metric/{metricName}";
private static final String NO_CACHE = "no-cache";
Expand All @@ -71,6 +79,8 @@ public MetricsResource() {

static Response.ResponseBuilder setHeaders(Response.ResponseBuilder responseBuilder) {
responseBuilder.header("Access-Control-Allow-Origin", "*");
responseBuilder.header("Access-Control-Allow-Methods", "POST");
responseBuilder.header("Access-Control-Allow-Headers", "accept, content-type");
responseBuilder.header("Pragma", NO_CACHE);
responseBuilder.header("Cache-Control", NO_CACHE);
responseBuilder.header("Expires", 0);
Expand Down Expand Up @@ -99,11 +109,59 @@ public Response OK() {

@POST
@Path("query")
public Response Grafana_query(final InputStream stream) {
public Response grafanaQuery(String jsonStr) {
try
{
if (jsonStr == null)
{
throw new Exception("query json must not be null or empty");
}
QueryParser parser = new QueryParser();
Query query = parser.parseGrafanaQueryMetric(jsonStr);
QueryExecutor executor = new QueryExecutor(query);
QueryResult result = executor.execute(false);
String entity = parser.parseResultToGrafanaJson(result);
return setHeaders(Response.status(Status.OK).entity(entity + "\n")).build();

return setHeaders(Response.status(Status.OK)).build();
}
catch (Exception e)
{
LOGGER.error("Error occurred during execution ", e);
return setHeaders(Response.status(Status.BAD_REQUEST).entity("Error occurred during execution\n")).build();
}
}

@POST
@Path(INSERT_ANNOTATION_URL)
public void addAnnotation(@Context HttpHeaders httpheaders, final InputStream stream, @Suspended final AsyncResponse asyncResponse) {
    // Asynchronous annotation insert: hand the raw request stream to a worker
    // on the shared pool; the worker resumes the suspended response itself.
    InsertAnnotationWorker worker = new InsertAnnotationWorker(asyncResponse, httpheaders, stream);
    threadPool.execute(worker);
}

@POST
@Path("annotations")
public Response grafanaAnnotation(String jsonStr) {
    // Grafana annotation endpoint; delegates to the shared annotation-query
    // handler in Grafana mode (isGrafana = true).
    Response response;
    try {
        response = postAnnotationQuery(jsonStr, true);
    } catch (Exception e) {
        LOGGER.error("Error occurred during execution ", e);
        response = setHeaders(Response.status(Status.BAD_REQUEST).entity("Error occurred during execution\n")).build();
    }
    return response;
}

@POST
@Path(QUERY_ANNOTATION_URL)
public Response queryAnnotation(String jsonStr) {
    // REST annotation-query endpoint; delegates to the shared handler in
    // non-Grafana mode (isGrafana = false).
    Response response;
    try {
        response = postAnnotationQuery(jsonStr, false);
    } catch (Exception e) {
        LOGGER.error("Error occurred during execution ", e);
        response = setHeaders(Response.status(Status.BAD_REQUEST).entity("Error occurred during execution\n")).build();
    }
    return response;
}


@POST
@Path("{string : .+}")
public Response errorPath(@PathParam("string") String str) {
Expand Down Expand Up @@ -152,6 +210,25 @@ public Response postQuery(String jsonStr) {
}
}


public Response postAnnotationQuery(String jsonStr, boolean isGrafana) {
    // Shared implementation behind the Grafana ("annotations") and REST
    // (QUERY_ANNOTATION_URL) annotation-query endpoints.
    //
    // @param jsonStr   raw JSON request body; must not be null
    // @param isGrafana true when the caller is the Grafana endpoint, which
    //                  uses a different request/response JSON layout
    // @return 200 with the annotation result JSON, or 400 on any failure
    try {
        if (jsonStr == null) {
            // Specific unchecked exception instead of raw Exception; it is
            // caught below and mapped to 400 either way.
            throw new IllegalArgumentException("query json must not be null or empty");
        }
        QueryParser parser = new QueryParser();
        Query query = parser.parseAnnotationQueryMetric(jsonStr, isGrafana);
        QueryExecutor executor = new QueryExecutor(query);
        QueryResult result = executor.execute(false);
        String entity = parser.parseResultToAnnotationJson(result, isGrafana);
        return setHeaders(Response.status(Status.OK).entity(entity + "\n")).build();
    } catch (Exception e) {
        LOGGER.error("Error occurred during execution ", e);
        return setHeaders(Response.status(Status.BAD_REQUEST).entity("Error occurred during execution\n")).build();
    }
}

@POST
@Path(DELETE_URL)
public Response postDelete(final InputStream stream) {
Expand Down Expand Up @@ -209,4 +286,6 @@ void deleteMetric(String metricName) throws Exception {
restSession.deleteColumns(ins);
restSession.closeSession();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Iterator;
Expand All @@ -41,6 +40,7 @@
import java.util.concurrent.ConcurrentHashMap;

public class DataPointsParser {
public static final String ANNOTATION_SPLIT_STRING = "@@annotation";
private static final Logger LOGGER = LoggerFactory.getLogger(DataPointsParser.class);
private static Config config = ConfigDescriptor.getInstance().getConfig();
private final IMetaManager metaManager = SortedListAbstractMetaManager.getInstance();
Expand All @@ -59,7 +59,6 @@ public DataPointsParser(Reader stream) {

public void parse() throws Exception
{

try {
session.openSession();
} catch (SessionException e) {
Expand Down Expand Up @@ -90,6 +89,72 @@ public void parse() throws Exception
}
}

public void parseAnnotation() throws Exception
{
    // Parses an annotation-insert request body: opens a session, reads the
    // JSON payload (a single object or an array of objects), converts each
    // element into a Metric via getAnnotationMetricObject, then writes the
    // annotations. Any failure is logged and rethrown to the caller; the
    // session is always closed after the send phase.
    try {
        session.openSession();
    } catch (SessionException e) {
        LOGGER.error("Error occurred during opening session", e);
        throw e;
    }
    try {
        JsonNode node = mapper.readTree(inputStream);
        if (node.isArray()) {
            for (JsonNode objNode : node) {
                metricList.add(getAnnotationMetricObject(objNode));
            }
        } else {
            metricList.add(getAnnotationMetricObject(node));
        }
    } catch (Exception e) {
        LOGGER.error("Error occurred during parsing data ", e);
        throw e;
    }
    try {
        sendAnnotationMetricsData();
    } catch (Exception e) {
        // Was LOGGER.debug, which hid real insert failures in production
        // logs; use error level to match the other failure paths above.
        LOGGER.error("Exception occur for create and send ", e);
        throw e;
    } finally {
        session.closeSession();
    }
}

private Metric getAnnotationMetricObject(JsonNode node) {
    // Builds a Metric from one JSON object of an annotation-insert request.
    // "name" and "tags" are required; "timestamp"/"value" (as a pair),
    // "datapoints" and "annotation" are optional.
    Metric metric = new Metric();
    metric.setName(node.get("name").asText());
    Iterator<Map.Entry<String, JsonNode>> tagEntries = node.get("tags").fields();
    while (tagEntries.hasNext()) {
        Map.Entry<String, JsonNode> tag = tagEntries.next();
        metric.addTag(tag.getKey(), tag.getValue().textValue());
    }
    JsonNode timestamp = node.get("timestamp");
    JsonNode value = node.get("value");
    if (timestamp != null && value != null) {
        metric.addTimestamp(timestamp.asLong());
        metric.addValue(value.asText());
    }
    JsonNode datapoints = node.get("datapoints");
    if (datapoints != null && datapoints.isArray()) {
        for (JsonNode dataPoint : datapoints) {
            metric.addTimestamp(dataPoint.asLong());
        }
    }
    JsonNode annotation = node.get("annotation");
    if (annotation != null) {
        // Whitespace is stripped so the annotation JSON is stored compactly.
        metric.setAnnotation(annotation.toString().replace("\n", "")
                .replace("\t", "").replace(" ", ""));
    }
    return metric;
}


private Metric getMetricObject(JsonNode node) {
Metric ret = new Metric();
ret.setName(node.get("name").asText());
Expand All @@ -114,6 +179,12 @@ private Metric getMetricObject(JsonNode node) {
}
}
}
JsonNode anno = node.get("annotation");
if (anno != null)
{
ret.setAnnotation(anno.toString().replace("\n", "")
.replace("\t", "").replace(" ", ""));
}
return ret;
}

Expand Down Expand Up @@ -185,13 +256,77 @@ private void sendMetricsData() throws Exception
valuesList[0] = values;
try {
session.insertColumnRecords(paths, metric.getTimestamps().stream().mapToLong(t -> t.longValue()).toArray(), valuesList, type, null);
if (metric.getAnnotation() != null)
{
for (int i = 0; i < size; i++) {
values[i] = metric.getAnnotation().getBytes();
}
valuesList[0] = values;
path.append(ANNOTATION_SPLIT_STRING);
paths.set(0, path.toString());
type.set(0, DataType.BINARY);
session.insertColumnRecords(paths, metric.getTimestamps().stream().mapToLong(t -> t.longValue()).toArray(), valuesList, type, null);
}
} catch (ExecutionException e) {
LOGGER.error("Error occurred during insert ", e);
throw e;
}
}
}

private void sendAnnotationMetricsData() throws Exception
{
    // Writes each parsed metric's annotation as a BINARY series whose path is
    // "<tagValue1>.<tagValue2>...<metricName>@@annotation", one value per
    // timestamp. Tag positions in the path come from the shared schema
    // mapping, which is extended (and persisted via the meta manager) for any
    // tag keys seen here for the first time.
    for (Metric metric : metricList)
    {
        boolean needUpdate = false;
        Map<String, Integer> schema = metaManager.getSchemaMapping(metric.getName());
        if (schema == null) {
            needUpdate = true;
            schema = new ConcurrentHashMap<>();
        }
        // Assign the next free position to any tag key not yet in the schema.
        for (Map.Entry<String, String> tag : metric.getTags().entrySet()) {
            if (schema.get(tag.getKey()) == null) {
                needUpdate = true;
                schema.put(tag.getKey(), schema.size() + 1);
            }
        }
        if (needUpdate)
            metaManager.addOrUpdateSchemaMapping(metric.getName(), schema);
        // Invert the mapping so tag values can be emitted in position order.
        Map<Integer, String> pos2path = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : schema.entrySet())
            pos2path.put(entry.getValue(), entry.getKey());
        StringBuilder path = new StringBuilder();
        for (String tagKey : pos2path.values()) {
            String tagValue = metric.getTags().get(tagKey);
            // A missing tag is encoded literally as "null" so positions of
            // the remaining tags in the path stay stable.
            path.append(tagValue != null ? tagValue : "null").append('.');
        }
        path.append(metric.getName());
        path.append(ANNOTATION_SPLIT_STRING);
        List<String> paths = new ArrayList<>();
        paths.add(path.toString());
        List<DataType> type = new ArrayList<>();
        type.add(DataType.BINARY);
        int size = metric.getTimestamps().size();
        // NOTE(review): getBytes() uses the platform default charset; the
        // read side should agree — consider StandardCharsets.UTF_8. Kept
        // as-is to match sendMetricsData.
        byte[] annotationBytes = metric.getAnnotation().getBytes();
        Object[] values = new Object[size];
        for (int i = 0; i < size; i++)
        {
            values[i] = annotationBytes;
        }
        Object[] valuesList = new Object[1];
        valuesList[0] = values;
        session.insertColumnRecords(paths, metric.getTimestamps().stream().mapToLong(Long::longValue).toArray(), valuesList, type, null);
    }
}


Object getType(String str, DataType tp) {
switch (tp) {
case BINARY:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package cn.edu.tsinghua.iginx.rest.insert;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.GZIPInputStream;

public class InsertAnnotationWorker extends Thread
{
    // Background worker for the annotation-insert endpoint: optionally
    // gunzips the request body, parses it with DataPointsParser, and resumes
    // the suspended JAX-RS response with 200 or 400.
    // NOTE(review): only run() is used (callers submit to an executor), so
    // this could implement Runnable instead of extending Thread — confirm no
    // caller invokes start().
    private static final String NO_CACHE = "no-cache";
    private static final Logger LOGGER = LoggerFactory.getLogger(InsertAnnotationWorker.class);
    private final HttpHeaders httpheaders;
    private InputStream stream; // reassigned when the body is gzip-encoded
    private final AsyncResponse asyncResponse;

    public InsertAnnotationWorker(final AsyncResponse asyncResponse, HttpHeaders httpheaders,
                                  InputStream stream) {
        this.asyncResponse = asyncResponse;
        this.httpheaders = httpheaders;
        this.stream = stream;
    }

    // Adds the standard no-cache headers to a response under construction.
    static Response.ResponseBuilder setHeaders(Response.ResponseBuilder responseBuilder) {
        responseBuilder.header("Access-Control-Allow-Origin", "*");
        responseBuilder.header("Pragma", NO_CACHE);
        responseBuilder.header("Cache-Control", NO_CACHE);
        responseBuilder.header("Expires", 0);
        return (responseBuilder);
    }

    @Override
    public void run() {
        Response response;
        try {
            if (httpheaders != null) {
                List<String> requestHeader = httpheaders.getRequestHeader("Content-Encoding");
                if (requestHeader != null && requestHeader.contains("gzip")) {
                    stream = new GZIPInputStream(stream);
                }
            }
            DataPointsParser parser = new DataPointsParser(new InputStreamReader(stream, StandardCharsets.UTF_8));
            parser.parseAnnotation();
            // NOTE(review): the OK response skips setHeaders(), unlike the
            // error path — confirm whether that asymmetry is intentional.
            response = Response.status(Response.Status.OK).build();
        } catch (Exception e) {
            // Previously swallowed silently even though LOGGER existed; log
            // so insert failures are diagnosable.
            LOGGER.error("Error occurred during execution ", e);
            response = setHeaders(Response.status(Response.Status.BAD_REQUEST).entity("Error occurred during execution\n")).build();
        }
        asyncResponse.resume(response);
    }
}
Loading

0 comments on commit 52d0bc4

Please sign in to comment.