Skip to content

Commit

Permalink
Move label values json aggregation into Java
Browse files Browse the repository at this point in the history
Signed-off-by: Andrea Lamparelli <[email protected]>
  • Loading branch information
lampajr authored and johnaohara committed Oct 6, 2024
1 parent dcfb2c0 commit 27295df
Show file tree
Hide file tree
Showing 9 changed files with 1,116 additions and 407 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
package io.hyperfoil.tools.horreum.api.data;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

import jakarta.validation.constraints.NotNull;

import org.eclipse.microprofile.openapi.annotations.enums.SchemaType;
import org.eclipse.microprofile.openapi.annotations.media.Schema;

import com.fasterxml.jackson.databind.node.ObjectNode;

@Schema(type = SchemaType.OBJECT, description = "A map of label names to label values with the associated datasetId and runId")
public class ExportedLabelValues {
@Schema
Expand All @@ -36,25 +32,5 @@ public ExportedLabelValues(LabelValueMap v, Integer runId, Integer datasetId, In
this.datasetId = datasetId;
this.start = start;
this.stop = stop;

}

public static List<ExportedLabelValues> parse(List<Object[]> nodes) {
if (nodes == null || nodes.isEmpty())
return new ArrayList<>();
List<ExportedLabelValues> fps = new ArrayList<>();
nodes.forEach(objects -> {
ObjectNode node = (ObjectNode) objects[0];
Integer runId = Integer.parseInt(objects[1] == null ? "-1" : objects[1].toString());
Integer datasetId = Integer.parseInt(objects[2] == null ? "-1" : objects[2].toString());
Instant start = (Instant) objects[3];
Instant stop = (Instant) objects[4];
if (node.isObject()) {
fps.add(new ExportedLabelValues(LabelValueMap.fromObjectNode(node), runId, datasetId, start, stop));
} else {
//TODO alert that something is wrong in the db response
}
});
return fps;
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
package io.hyperfoil.tools.horreum.svc;

import static com.fasterxml.jackson.databind.node.JsonNodeFactory.instance;
import static io.hyperfoil.tools.horreum.entity.data.SchemaDAO.*;
import static io.hyperfoil.tools.horreum.entity.data.SchemaDAO.QUERY_1ST_LEVEL_BY_RUNID_TRANSFORMERID_SCHEMA_ID;
import static io.hyperfoil.tools.horreum.entity.data.SchemaDAO.QUERY_2ND_LEVEL_BY_RUNID_TRANSFORMERID_SCHEMA_ID;
import static io.hyperfoil.tools.horreum.entity.data.SchemaDAO.QUERY_TRANSFORMER_TARGETS;

import java.io.IOException;
import java.time.Instant;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
Expand All @@ -14,9 +22,17 @@
import jakarta.annotation.security.RolesAllowed;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.persistence.*;
import jakarta.persistence.EntityManager;
import jakarta.persistence.NoResultException;
import jakarta.persistence.PersistenceException;
import jakarta.persistence.Query;
import jakarta.persistence.TransactionRequiredException;
import jakarta.transaction.*;
import jakarta.persistence.Tuple;
import jakarta.transaction.InvalidTransactionException;
import jakarta.transaction.SystemException;
import jakarta.transaction.Transaction;
import jakarta.transaction.TransactionManager;
import jakarta.transaction.Transactional;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
Expand All @@ -40,8 +56,12 @@
import com.fasterxml.jackson.databind.node.TextNode;

import io.hyperfoil.tools.horreum.api.SortDirection;
import io.hyperfoil.tools.horreum.api.data.*;
import io.hyperfoil.tools.horreum.api.data.Access;
import io.hyperfoil.tools.horreum.api.data.Dataset;
import io.hyperfoil.tools.horreum.api.data.ExportedLabelValues;
import io.hyperfoil.tools.horreum.api.data.JsonpathValidation;
import io.hyperfoil.tools.horreum.api.data.Run;
import io.hyperfoil.tools.horreum.api.data.ValidationError;
import io.hyperfoil.tools.horreum.api.services.RunService;
import io.hyperfoil.tools.horreum.api.services.SchemaService;
import io.hyperfoil.tools.horreum.bus.AsyncEventChannels;
Expand All @@ -51,9 +71,12 @@
import io.hyperfoil.tools.horreum.entity.PersistentLogDAO;
import io.hyperfoil.tools.horreum.entity.alerting.DataPointDAO;
import io.hyperfoil.tools.horreum.entity.alerting.TransformationLogDAO;
import io.hyperfoil.tools.horreum.entity.data.*;
import io.hyperfoil.tools.horreum.entity.data.DatasetDAO;
import io.hyperfoil.tools.horreum.entity.data.RunDAO;
import io.hyperfoil.tools.horreum.entity.data.SchemaDAO;
import io.hyperfoil.tools.horreum.entity.data.TestDAO;
import io.hyperfoil.tools.horreum.entity.data.TransformerDAO;
import io.hyperfoil.tools.horreum.hibernate.JsonBinaryType;
import io.hyperfoil.tools.horreum.hibernate.JsonbSetType;
import io.hyperfoil.tools.horreum.mapper.DatasetMapper;
import io.hyperfoil.tools.horreum.mapper.RunMapper;
import io.hyperfoil.tools.horreum.server.RoleManager;
Expand All @@ -67,15 +90,16 @@
@Startup
public class RunServiceImpl implements RunService {
private static final Logger log = Logger.getLogger(RunServiceImpl.class);

//@formatter:off
private static final String FIND_AUTOCOMPLETE = """
private static final String FIND_AUTOCOMPLETE = """
SELECT * FROM (
SELECT DISTINCT jsonb_object_keys(q) AS key
FROM run, jsonb_path_query(run.data, ? ::jsonpath) q
WHERE jsonb_typeof(q) = 'object') AS keys
WHERE keys.key LIKE CONCAT(?, '%');
""";
protected static final String FIND_RUNS_WITH_URI = """
protected static final String FIND_RUNS_WITH_URI = """
SELECT id, testid
FROM run
WHERE NOT trashed
Expand All @@ -88,7 +112,7 @@ WHEN jsonb_typeof(data) = 'array' THEN ?1 IN (SELECT jsonb_array_elements(data)-
OR (metadata IS NOT NULL AND ?1 IN (SELECT jsonb_array_elements(metadata)->>'$schema'))
)
""";
//@formatter:on
//@formatter:on
private static final String[] CONDITION_SELECT_TERMINAL = { "==", "!=", "<>", "<", "<=", ">", ">=", " " };
private static final String UPDATE_TOKEN = "UPDATE run SET token = ? WHERE id = ?";
private static final String CHANGE_ACCESS = "UPDATE run SET owner = ?, access = ? WHERE id = ?";
Expand All @@ -114,6 +138,9 @@ WHEN jsonb_typeof(data) = 'array' THEN ?1 IN (SELECT jsonb_array_elements(data)-
@Inject
TestServiceImpl testService;

@Inject
LabelValuesService labelValuesService;

@Inject
ObjectMapper mapper;

Expand Down Expand Up @@ -268,121 +295,17 @@ public Object getData(int id, String token, String schemaUri) {
@Override
public List<ExportedLabelValues> labelValues(int runId, String filter, String sort, String direction, int limit, int page,
List<String> include, List<String> exclude, boolean multiFilter) {
List<ExportedLabelValues> rtrn = new ArrayList<>();
Run run = getRun(runId, null);
if (run == null) {
throw ServiceException.serverError("Cannot find run " + runId);
throw ServiceException.notFound("Cannot find run " + runId);
}
Object filterObject = Util.getFilterObject(filter);

TestServiceImpl.FilterDef filterDef = TestServiceImpl.getFilterDef(filter, null, null, multiFilter,
(str) -> labelValues(runId, str, sort, direction, limit, page, include, exclude, false), em);

String filterSql = filterDef.sql();
if (filterDef.filterObject() != null) {
filterObject = filterDef.filterObject();
}

if (filterSql.isBlank() && filter != null && !filter.isBlank()) {
//TODO there was an error with the filter, do we return that info to the user?
}
String orderSql = "";

String orderDirection = direction.equalsIgnoreCase("ascending") ? "ASC" : "DESC";
if (!sort.isBlank()) {
Util.CheckResult jsonpathResult = Util.castCheck(sort, "jsonpath", em);
if (jsonpathResult.ok()) {
orderSql = "order by jsonb_path_query(combined.values,CAST( :orderBy as jsonpath)) " + orderDirection
+ ", combined.datasetId DESC";
} else {
orderSql = "order by combined.datasetId DESC";
}
}
String includeExcludeSql = "";
List<String> mutableInclude = new ArrayList<>(include);

if (include != null && !include.isEmpty()) {
if (exclude != null && !exclude.isEmpty()) {
mutableInclude.removeAll(exclude);
}
if (!mutableInclude.isEmpty()) {
includeExcludeSql = " AND label.name in :include";
}
}
//includeExcludeSql is empty if include did not contain entries after exclude removal
if (includeExcludeSql.isEmpty() && exclude != null && !exclude.isEmpty()) {
includeExcludeSql = " AND label.name NOT in :exclude";
}

String sql = """
WITH
combined as (
SELECT DISTINCT COALESCE(jsonb_object_agg(label.name, lv.value) FILTER (WHERE label.name IS NOT NULL INCLUDE_EXCLUDE_PLACEHOLDER), '{}'::jsonb) AS values, dataset.id AS datasetId, dataset.start AS start, dataset.stop AS stop
FROM dataset
LEFT JOIN label_values lv ON dataset.id = lv.dataset_id
LEFT JOIN label ON label.id = lv.label_id
WHERE runId = :runId
GROUP BY dataset.id
) select * from combined FILTER_PLACEHOLDER ORDER_PLACEHOLDER limit :limit offset :offset
"""
.replace("FILTER_PLACEHOLDER", filterSql)
.replace("INCLUDE_EXCLUDE_PLACEHOLDER", includeExcludeSql)
.replace("ORDER_PLACEHOLDER", orderSql);

NativeQuery query = ((NativeQuery) em.createNativeQuery(sql))
.setParameter("runId", runId);
if (!filterSql.isEmpty()) {
if (filterSql.contains(TestServiceImpl.LABEL_VALUES_FILTER_CONTAINS_JSON)) {
query.setParameter("filter", filterObject, JsonBinaryType.INSTANCE);
} else if (filterSql.contains(TestServiceImpl.LABEL_VALUES_FILTER_MATCHES_NOT_NULL)) {
query.setParameter("filter", filter);
}
}
if (!filterDef.multis().isEmpty() && filterDef.filterObject() != null) {
ObjectNode fullFilterObject = (ObjectNode) Util.getFilterObject(filter);
for (int i = 0; i < filterDef.multis().size(); i++) {
String key = filterDef.multis().get(i);
ArrayNode value = (ArrayNode) fullFilterObject.get(key);
query.setParameter("key" + i, "$." + key);
query.setParameter("value" + i, value, JsonbSetType.INSTANCE);
}
}
if (includeExcludeSql.contains(":include")) {
query.setParameter("include", mutableInclude);
} else if (includeExcludeSql.contains(":exclude")) {
query.setParameter("exclude", exclude);
}
if (orderSql.contains(":orderBy")) {
query.setParameter("orderBy", sort);
try {
return labelValuesService.labelValuesByRun(runId, filter, sort, direction, limit,
page, include, exclude, multiFilter);
} catch (IllegalArgumentException e) {
throw ServiceException.badRequest(e.getMessage());
}
query
.setParameter("limit", limit)
.setParameter("offset", limit * Math.max(0, page))
.unwrap(NativeQuery.class)
.addScalar("values", JsonBinaryType.INSTANCE)
.addScalar("datasetId", Integer.class)
.addScalar("start", StandardBasicTypes.INSTANT)
.addScalar("stop", StandardBasicTypes.INSTANT);
//casting because type inference cannot detect there will be two scalars in the result
//TODO replace this with strictly typed entries
((List<Object[]>) query.getResultList()).forEach(objects -> {
JsonNode node = (JsonNode) objects[0];
Integer datasetId = Integer.parseInt(objects[1] == null ? "-1" : objects[1].toString());
Instant start = (Instant) objects[2];
Instant stop = (Instant) objects[3];

if (node.isObject()) {
rtrn.add(new ExportedLabelValues(
LabelValueMap.fromObjectNode((ObjectNode) node),
runId,
datasetId,
start,
stop));
} else {
//TODO alert that something is wrong in the db response
}
});
return rtrn;
}

@PermitAll
Expand Down
Loading

0 comments on commit 27295df

Please sign in to comment.