Don't use lock for labelValues computation
As part of this, the following changes have been made:
- Manage concurrent run transformations
- Fix the test recalculation modal

Signed-off-by: Andrea Lamparelli <[email protected]>
lampajr authored and johnaohara committed Oct 24, 2024
1 parent 23f4c78 commit fe237f7
Showing 6 changed files with 69 additions and 68 deletions.
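
The gist of the change: instead of serializing every run-to-dataset transformation and label-value calculation behind a single global ReentrantLock, each transformation now claims its runId in a ConcurrentHashMap, so only concurrent transformations of the same run back off while different runs proceed in parallel. A minimal sketch of that pattern (illustrative names; the commit itself keys the map on TestService.RecalculationStatus and releases the entry via a transaction-synchronization hook rather than a finally block):

import java.util.concurrent.ConcurrentHashMap;

class PerRunGuard {
    // one entry per run currently being transformed
    private final ConcurrentHashMap<Integer, Object> inFlight = new ConcurrentHashMap<>();

    // Runs `work` unless a transformation for this runId is already in flight;
    // returns false when it backs off.
    boolean tryTransform(int runId, Runnable work) {
        Object token = new Object();
        if (inFlight.putIfAbsent(runId, token) != null) {
            return false; // another caller claimed this runId first
        }
        try {
            work.run();
        } finally {
            inFlight.remove(runId, token); // two-arg remove: only clears our own token
        }
        return true;
    }
}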
DatasetServiceImpl.java
@@ -3,7 +3,6 @@
import java.time.Instant;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -53,7 +52,7 @@ public class DatasetServiceImpl implements DatasetService {
private static final Logger log = Logger.getLogger(DatasetServiceImpl.class);

//@formatter:off
private static final String LABEL_QUERY = """
WITH
used_labels AS (
SELECT label.id AS label_id, label.name, ds.schema_id, count(le) AS count
@@ -86,7 +85,7 @@ lvalues AS (
JOIN used_labels ul ON label.id = ul.label_id
GROUP BY lvalues.label_id, ul.name, function, ul.count
""";
protected static final String LABEL_PREVIEW = """
WITH
le AS (
SELECT * FROM jsonb_populate_recordset(NULL::extractor, (?1)::jsonb)
@@ -110,7 +109,7 @@ WHEN jsonb_array_length((?1)::jsonb) = 1 THEN jsonb_agg(lvalues.value) -> 0
FROM lvalues
""";

private static final String SCHEMAS_SELECT = """
SELECT dataset_id,
jsonb_agg(
jsonb_build_object('id', schema.id, 'uri', ds.uri, 'name', schema.name, 'source', 0, 'type', 2, 'key', ds.index::text, 'hasJsonSchema', schema.schema IS NOT NULL)
@@ -119,13 +118,13 @@ WHEN jsonb_array_length((?1)::jsonb) = 1 THEN jsonb_agg(lvalues.value) -> 0
JOIN dataset ON dataset.id = ds.dataset_id
JOIN schema ON schema.id = ds.schema_id
""";
private static final String VALIDATION_SELECT = """
validation AS (
SELECT dataset_id, jsonb_agg(jsonb_build_object('schemaId', schema_id, 'error', error)) AS errors
FROM dataset_validationerrors GROUP BY dataset_id
)
""";
private static final String DATASET_SUMMARY_SELECT = """
SELECT ds.id, ds.runid AS runId,
ds.ordinal, ds.testid AS testId,
test.name AS testname, ds.description,
@@ -140,7 +139,7 @@ SELECT dataset_id, jsonb_agg(jsonb_build_object('schemaId', schema_id, 'error',
LEFT JOIN validation ON validation.dataset_id = ds.id
LEFT JOIN dataset_view dv ON dv.dataset_id = ds.id AND dv.view_id =
""";
private static final String LIST_SCHEMA_DATASETS = """
WITH ids AS (
SELECT dataset_id AS id FROM dataset_schemas WHERE uri = ?1
),
@@ -162,15 +161,15 @@ WHERE dataset_id IN (SELECT id FROM ids) GROUP BY dataset_id
LEFT JOIN dataset_view dv ON dv.dataset_id = ds.id
WHERE ds.id IN (SELECT id FROM ids)
""";
private static final String ALL_LABELS_SELECT = """
SELECT dataset.id as dataset_id,
COALESCE(jsonb_object_agg(label.name, lv.value) FILTER (WHERE label.name IS NOT NULL), '{}'::jsonb) AS values
FROM dataset
LEFT JOIN label_values lv ON dataset.id = lv.dataset_id
LEFT JOIN label ON label.id = label_id
""";

//@formatter:on
@Inject
EntityManager em;

@@ -183,12 +182,6 @@ WHERE ds.id IN (SELECT id FROM ids)
@Inject
TransactionManager tm;

-    // This is a nasty hack that will serialize all run -> dataset transformations and label calculations
-    // The problem is that PostgreSQL's SSI will for some (unknown) reason rollback some transactions,
-    // probably due to false sharing of locks. For some reason even using advisory locks in DB does not
-    // solve the issue so we have to serialize this even outside the problematic transactions.
-    private final ReentrantLock recalculationLock = new ReentrantLock();
-
@PermitAll
@WithRoles
@Override
@@ -562,20 +555,7 @@ void updateFingerprints(int testId) {
}
}

-    void withRecalculationLock(Runnable runnable) {
-        recalculationLock.lock();
-        try {
-            runnable.run();
-        } finally {
-            recalculationLock.unlock();
-        }
-    }
-
    public void onNewDataset(Dataset.EventNew event) {
-        withRecalculationLock(() -> calculateLabelValues(event.testId, event.datasetId, event.labelId, event.isRecalculation));
-    }
-
-    public void onNewDatasetNoLock(Dataset.EventNew event) {
calculateLabelValues(event.testId, event.datasetId, event.labelId, event.isRecalculation);
}

RunServiceImpl.java
@@ -14,6 +14,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -64,6 +65,7 @@
import io.hyperfoil.tools.horreum.api.data.ValidationError;
import io.hyperfoil.tools.horreum.api.services.RunService;
import io.hyperfoil.tools.horreum.api.services.SchemaService;
+import io.hyperfoil.tools.horreum.api.services.TestService;
import io.hyperfoil.tools.horreum.bus.AsyncEventChannels;
import io.hyperfoil.tools.horreum.datastore.BackendResolver;
import io.hyperfoil.tools.horreum.datastore.Datastore;
@@ -152,6 +154,8 @@ WHEN jsonb_typeof(data) = 'array' THEN ?1 IN (SELECT jsonb_array_elements(data)-
@Inject
Session session;

+    private final ConcurrentHashMap<Integer, TestService.RecalculationStatus> transformations = new ConcurrentHashMap<>();

@Transactional
@WithRoles(extras = Roles.HORREUM_SYSTEM)
void onTestDeleted(int testId) {
@@ -1118,14 +1122,38 @@ public void recalculateAll(String fromStr, String toStr) {
}
}

+    /**
+     * Transforms the data for a given run by applying applicable schemas and transformers.
+     * It ensures any existing datasets for the run are removed before creating new ones,
+     * handles timeouts for ongoing transformations, and creates datasets with the transformed data.
+     *
+     * @param runId the ID of the run to transform
+     * @param isRecalculation flag indicating if this is a recalculation
+     * @return the number of datasets created, or 0 if the run is invalid, not found, or already being transformed
+     */
@WithRoles(extras = Roles.HORREUM_SYSTEM)
@Transactional
int transform(int runId, boolean isRecalculation) {
if (runId < 1) {
log.errorf("Transformation parameters error: run %s", runId);
return 0;
}

log.debugf("Transforming run ID %d, recalculation? %s", runId, Boolean.toString(isRecalculation));
+        int numDatasets = 0;

+        // check whether there is an ongoing transformation on the same runId
+        TestService.RecalculationStatus status = new TestService.RecalculationStatus(1);
+        TestService.RecalculationStatus prev = transformations.putIfAbsent(runId, status);
+        // ensure the transformation entry is removed on transaction completion; this way
+        // it is cleared even if a transaction-level exception occurs, e.g., a timeout
+        Util.registerTxSynchronization(tm, txStatus -> transformations.remove(runId, status));
+        if (prev != null) {
+            // there is an ongoing transformation that has recently been initiated
+            log.warnf("Transformation for run %d already in progress", runId);
+            return numDatasets;
+        }

// We need to make sure all old datasets are gone before creating new; otherwise we could
// break the runid,ordinal uniqueness constraint
for (DatasetDAO old : DatasetDAO.<DatasetDAO> list("run.id", runId)) {
@@ -1138,9 +1166,8 @@ int transform(int runId, boolean isRecalculation) {
RunDAO run = RunDAO.findById(runId);
if (run == null) {
log.errorf("Cannot load run ID %d for transformation", runId);
-            return 0;
+            return numDatasets; // this is 0
}
-        int ordinal = 0;
Map<Integer, JsonNode> transformerResults = new TreeMap<>();
// naked nodes (those produced by implicit identity transformers) are all added to each dataset
List<JsonNode> nakedNodes = new ArrayList<>();
@@ -1247,7 +1274,8 @@ int transform(int runId, boolean isRecalculation) {
}
} else if (!result.isContainerNode() || (result.isObject() && !result.has("$schema")) ||
(result.isArray()
-                    && StreamSupport.stream(result.spliterator(), false).anyMatch(item -> !item.has("$schema")))) {
+                    && StreamSupport.stream(result.spliterator(), false)
+                            .anyMatch(item -> !item.has("$schema")))) {
logMessage(run, PersistentLogDAO.WARN, "Dataset will contain element without a schema.");
}
JsonNode existing = transformerResults.get(transformerId);
@@ -1285,12 +1313,14 @@ int transform(int runId, boolean isRecalculation) {
}
nakedNodes.add(node);
logMessage(run, PersistentLogDAO.DEBUG,
"This test (%d) does not use any transformer for schema %s (key %s), passing as-is.", run.testid, uri,
"This test (%d) does not use any transformer for schema %s (key %s), passing as-is.", run.testid,
uri,
key);
}
}
if (schemasAndTransformers > 0) {
-            int max = transformerResults.values().stream().filter(JsonNode::isArray).mapToInt(JsonNode::size).max().orElse(1);
+            int max = transformerResults.values().stream().filter(JsonNode::isArray).mapToInt(JsonNode::size).max()
+                    .orElse(1);

for (int position = 0; position < max; position += 1) {
ArrayNode all = instance.arrayNode(max + nakedNodes.size());
@@ -1305,7 +1335,7 @@ int transform(int runId, boolean isRecalculation) {
String message = String.format(
"Transformer %d produced an array of %d elements but other transformer " +
"produced %d elements; dataset %d/%d might be missing some data.",
-                            entry.getKey(), node.size(), max, run.id, ordinal);
+                            entry.getKey(), node.size(), max, run.id, numDatasets);
logMessage(run, PersistentLogDAO.WARN, "%s", message);
log.warnf(message);
}
@@ -1316,18 +1346,18 @@ int transform(int runId, boolean isRecalculation) {
}
}
nakedNodes.forEach(all::add);
-            createDataset(new DatasetDAO(run, ordinal++, run.description, all), isRecalculation);
+            createDataset(new DatasetDAO(run, numDatasets++, run.description, all), isRecalculation);
}
mediator.validateRun(run.id);
-            return ordinal;
} else {
+            numDatasets = 1;
logMessage(run, PersistentLogDAO.INFO, "No applicable schema, dataset will be empty.");
createDataset(new DatasetDAO(
run, 0, "Empty Dataset for run data without any schema.",
instance.arrayNode()), isRecalculation);
mediator.validateRun(run.id);
-            return 1;
}
+        return numDatasets;
}

private String limitLength(String str) {
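
A note on the transformations guard added above: the map entry cannot simply be released in a finally block, because a @Transactional method can still fail after the method body returns (e.g., on a commit timeout). Tying the removal to transaction completion covers that case. A hedged sketch of what a helper like Util.registerTxSynchronization plausibly does under JTA (the real Horreum helper may differ):

import jakarta.transaction.Synchronization;
import jakarta.transaction.TransactionManager;

final class TxCleanup {
    // Run `cleanup` when the current transaction completes, whether it commits
    // or rolls back; if no transaction is active, run it immediately.
    static void onCompletion(TransactionManager tm, Runnable cleanup) {
        try {
            tm.getTransaction().registerSynchronization(new Synchronization() {
                @Override
                public void beforeCompletion() {
                    // nothing to do before completion
                }

                @Override
                public void afterCompletion(int status) {
                    // status is one of the jakarta.transaction.Status constants
                    cleanup.run();
                }
            });
        } catch (Exception e) {
            // no active transaction (or registration failed): clean up now
            cleanup.run();
        }
    }
}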
ServiceMediator.java
@@ -6,6 +6,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantLock;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;
@@ -102,6 +103,8 @@ public class ServiceMediator {

private Map<AsyncEventChannels, Map<Integer, BlockingQueue<Object>>> events = new ConcurrentHashMap<>();

+    private final ReentrantLock lock = new ReentrantLock();

public ServiceMediator() {
}

@@ -151,7 +154,6 @@ void updateLabels(Dataset.LabelsUpdatedEvent event) {
}

void newDataset(Dataset.EventNew eventNew) {
-        //Note: should we call onNewDataset which will enable a lock?
datasetService.onNewDataset(eventNew);
}

@@ -166,7 +168,7 @@ void newChange(Change.Event event) {
@ActivateRequestContext
@WithRoles(extras = Roles.HORREUM_SYSTEM)
public void processDatasetEvents(Dataset.EventNew newEvent) {
-        datasetService.onNewDatasetNoLock(newEvent);
+        newDataset(newEvent);
validateDataset(newEvent.datasetId);
}

@@ -232,8 +234,13 @@ int transform(int runId, boolean isRecalculation) {
return runService.transform(runId, isRecalculation);
}

-    void withRecalculationLock(Runnable run) {
-        datasetService.withRecalculationLock(run);
+    void withSharedLock(Runnable runnable) {
+        lock.lock();
+        try {
+            runnable.run();
+        } finally {
+            lock.unlock();
+        }
}

void newExperimentResult(ExperimentService.ExperimentResult result) {
TestServiceImpl.java
@@ -720,7 +720,7 @@ public void recalculateDatasets(int testId) {
// transform will add proper roles anyway
// messageBus.executeForTest(testId, () -> datasetService.withRecalculationLock(() -> {
// mediator.executeBlocking(() -> mediator.transform(runId, true));
-        mediator.executeBlocking(() -> mediator.withRecalculationLock(() -> {
+        mediator.executeBlocking(() -> mediator.withSharedLock(() -> {
int newDatasets = 0;
try {
newDatasets = mediator.transform(runId, true);
8 changes: 0 additions & 8 deletions horreum-web/src/api.tsx
@@ -342,14 +342,6 @@ export function updateChangeDetection(

}

-export function updateRunsAndDatasetsAction(
-    testId: number,
-    runs: number,
-    datasets: number
-): any {
-    return {type: "Tests/UPDATE_RUNS_AND_DATASETS", testId, runs, datasets}
-}

///Runs
export function fetchRunSummary(id: number, token: string | undefined, alerting: AlertContextType): Promise<RunSummary> {
return apiCall(runApi.getRunSummary(id, token), alerting, "FETCH_RUN_SUMMARY", "Failed to fetch data for run " + id + ", try uploading a Run and use new Run id instead.");
28 changes: 10 additions & 18 deletions horreum-web/src/domain/tests/RecalculateDatasetsModal.tsx
@@ -1,6 +1,6 @@
-import {useCallback, useEffect, useState, useRef, useContext} from "react"
+import { useCallback, useEffect, useState, useRef, useContext } from "react"
import { Bullseye, Button, Modal, Progress, Spinner } from "@patternfly/react-core"
-import {fetchTest, RecalculationStatus, testApi, TestStorage, updateRunsAndDatasetsAction} from "../../api"
+import { fetchTest, RecalculationStatus, testApi, TestStorage } from "../../api"
import {AppContext} from "../../context/appContext";
import {AppContextType} from "../../context/@types/appContextTypes";

@@ -13,7 +13,7 @@ type RecalculateDatasetsModalProps = {

export default function RecalculateDatasetsModal(props: RecalculateDatasetsModalProps) {
const { alerting } = useContext(AppContext) as AppContextType;
-    const [test, setTest] = useState<TestStorage | undefined>( undefined)
+    const [test, setTest] = useState<TestStorage | undefined>(undefined)
const [progress, setProgress] = useState(-1)
const [status, setStatus] = useState<RecalculationStatus>()
const timerId = useRef<number>()
@@ -32,19 +32,16 @@ export default function RecalculateDatasetsModal(props: RecalculateDatasetsModal
if (!props.isOpen) {
return
}

// fetch the current test
fetchTest(props.testId, alerting).then(setTest)
}, [props.testId]);

useEffect(() => {
if (!props.isOpen) {
return
}
// fetch the latest recalculation status
if (test?.runs === undefined) {
-            testApi.getRecalculationStatus(props.testId).then(status => {
-                updateRunsAndDatasetsAction(props.testId, status.totalRuns, status.datasets)
-            })
+            testApi.getRecalculationStatus(props.testId).then(setStatus)
}
-    }, [test, props.isOpen])
+    }, [props.isOpen]);

return (
<Modal
title={`Re-transform datasets for test ${test?.name || "<unknown test>"}`}
@@ -66,11 +63,6 @@ export default function RecalculateDatasetsModal(props: RecalculateDatasetsModal
.then(status => {
setStatus(status)
setProgress(status.finished)
-                    updateRunsAndDatasetsAction(
-                        props.testId,
-                        status.totalRuns,
-                        status.datasets
-                    )
if (status.finished === status.totalRuns) {
onClose()
}
@@ -112,7 +104,7 @@ export default function RecalculateDatasetsModal(props: RecalculateDatasetsModal
)}
{progress < 0 && (
<div style={{ marginBottom: "16px" }}>
-                        This test has {test?.runs || "<unknown number>"} of runs; do you want to recalculate all datasets?
+                        This test has {status?.totalRuns || "<unknown number>"} of runs; do you want to recalculate all datasets?
</div>
)}
{progress >= 0 && (
Expand Down
