diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/DatasetServiceImpl.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/DatasetServiceImpl.java
index 3f2ac30a5..aeb8f458c 100644
--- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/DatasetServiceImpl.java
+++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/DatasetServiceImpl.java
@@ -3,7 +3,6 @@
 import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -53,7 +52,7 @@ public class DatasetServiceImpl implements DatasetService {
     private static final Logger log = Logger.getLogger(DatasetServiceImpl.class);
 
     //@formatter:off
-    private static final String LABEL_QUERY = """
+    private static final String LABEL_QUERY = """
             WITH used_labels AS (
                 SELECT label.id AS label_id, label.name, ds.schema_id, count(le) AS count
@@ -86,7 +85,7 @@ lvalues AS (
             JOIN used_labels ul ON label.id = ul.label_id
             GROUP BY lvalues.label_id, ul.name, function, ul.count
             """;
-    protected static final String LABEL_PREVIEW = """
+    protected static final String LABEL_PREVIEW = """
             WITH le AS (
                 SELECT * FROM jsonb_populate_recordset(NULL::extractor, (?1)::jsonb)
@@ -110,7 +109,7 @@ WHEN jsonb_array_length((?1)::jsonb) = 1 THEN jsonb_agg(lvalues.value) -> 0
             FROM lvalues
             """;
-    private static final String SCHEMAS_SELECT = """
+    private static final String SCHEMAS_SELECT = """
             SELECT dataset_id, jsonb_agg(
                 jsonb_build_object('id', schema.id, 'uri', ds.uri, 'name', schema.name, 'source', 0, 'type', 2,
                 'key', ds.index::text, 'hasJsonSchema', schema.schema IS NOT NULL)
@@ -119,13 +118,13 @@ WHEN jsonb_array_length((?1)::jsonb) = 1 THEN jsonb_agg(lvalues.value) -> 0
             JOIN dataset ON dataset.id = ds.dataset_id
             JOIN schema ON schema.id = ds.schema_id
             """;
-    private static final String VALIDATION_SELECT = """
+    private static final String VALIDATION_SELECT = """
             validation AS (
                 SELECT dataset_id, jsonb_agg(jsonb_build_object('schemaId', schema_id, 'error', error)) AS errors
                 FROM dataset_validationerrors GROUP BY dataset_id
             )
             """;
-    private static final String DATASET_SUMMARY_SELECT = """
+    private static final String DATASET_SUMMARY_SELECT = """
             SELECT ds.id, ds.runid AS runId, ds.ordinal, ds.testid AS testId, test.name AS testname, ds.description,
@@ -140,7 +139,7 @@ SELECT dataset_id, jsonb_agg(jsonb_build_object('schemaId', schema_id, 'error',
             LEFT JOIN validation ON validation.dataset_id = ds.id
             LEFT JOIN dataset_view dv ON dv.dataset_id = ds.id AND dv.view_id =
-    private static final String LIST_SCHEMA_DATASETS = """
+    private static final String LIST_SCHEMA_DATASETS = """
             WITH ids AS (
                 SELECT dataset_id AS id FROM dataset_schemas WHERE uri = ?1
             ),
@@ -162,7 +161,7 @@ WHERE dataset_id IN (SELECT id FROM ids) GROUP BY dataset_id
             LEFT JOIN dataset_view dv ON dv.dataset_id = ds.id
             WHERE ds.id IN (SELECT id FROM ids)
-    private static final String ALL_LABELS_SELECT = """
+    private static final String ALL_LABELS_SELECT = """
             SELECT dataset.id as dataset_id,
                 COALESCE(jsonb_object_agg(label.name, lv.value) FILTER (WHERE label.name IS NOT NULL), '{}'::jsonb) AS values
             FROM dataset
@@ -170,7 +169,7 @@ WHERE ds.id IN (SELECT id FROM ids)
             LEFT JOIN label ON label.id = label_id
             """;
-    //@formatter:on
+    //@formatter:on
 
     @Inject
     EntityManager em;
@@ -183,12 +182,6 @@ WHERE ds.id IN (SELECT id FROM ids)
     @Inject
     TransactionManager tm;
 
-    // This is a nasty hack that will serialize all run -> dataset transformations and label calculations
-    // The problem is that PostgreSQL's SSI will for some (unknown) reason rollback some transactions,
-    // probably due to false sharing of locks. For some reason even using advisory locks in DB does not
-    // solve the issue so we have to serialize this even outside the problematic transactions.
-    private final ReentrantLock recalculationLock = new ReentrantLock();
-
     @PermitAll
     @WithRoles
     @Override
@@ -562,20 +555,7 @@ void updateFingerprints(int testId) {
         }
     }
 
-    void withRecalculationLock(Runnable runnable) {
-        recalculationLock.lock();
-        try {
-            runnable.run();
-        } finally {
-            recalculationLock.unlock();
-        }
-    }
-
     public void onNewDataset(Dataset.EventNew event) {
-        withRecalculationLock(() -> calculateLabelValues(event.testId, event.datasetId, event.labelId, event.isRecalculation));
-    }
-
-    public void onNewDatasetNoLock(Dataset.EventNew event) {
         calculateLabelValues(event.testId, event.datasetId, event.labelId, event.isRecalculation);
     }
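The hunks above drop the process-wide ReentrantLock that used to serialize every run-to-dataset transformation and label calculation. For reference, here is a minimal sketch of that coarse-grained pattern (illustrative class name, not Horreum's actual code): one lock shared by all callers means at most one recalculation runs at a time, regardless of which run or test it touches. The same lock shape reappears later in this patch as ServiceMediator.withSharedLock, but only around explicit bulk recalculations, while the per-run guard in RunServiceImpl replaces it on the hot path.

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only: a single process-wide lock serializing all recalculations.
class CoarseGrainedGuard {
    private final ReentrantLock recalculationLock = new ReentrantLock();

    // Every caller funnels through the same lock, so two unrelated runs
    // can never be transformed concurrently.
    void withRecalculationLock(Runnable runnable) {
        recalculationLock.lock();
        try {
            runnable.run();
        } finally {
            recalculationLock.unlock();
        }
    }
}
```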
diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/RunServiceImpl.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/RunServiceImpl.java
index 9e9bc4e9e..c27a3ad02 100644
--- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/RunServiceImpl.java
+++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/RunServiceImpl.java
@@ -14,6 +14,7 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -64,6 +65,7 @@
 import io.hyperfoil.tools.horreum.api.data.ValidationError;
 import io.hyperfoil.tools.horreum.api.services.RunService;
 import io.hyperfoil.tools.horreum.api.services.SchemaService;
+import io.hyperfoil.tools.horreum.api.services.TestService;
 import io.hyperfoil.tools.horreum.bus.AsyncEventChannels;
 import io.hyperfoil.tools.horreum.datastore.BackendResolver;
 import io.hyperfoil.tools.horreum.datastore.Datastore;
@@ -152,6 +154,8 @@ WHEN jsonb_typeof(data) = 'array' THEN ?1 IN (SELECT jsonb_array_elements(data)-
     @Inject
     Session session;
 
+    private final ConcurrentHashMap<Integer, TestService.RecalculationStatus> transformations = new ConcurrentHashMap<>();
+
     @Transactional
     @WithRoles(extras = Roles.HORREUM_SYSTEM)
     void onTestDeleted(int testId) {
@@ -1118,6 +1122,15 @@ public void recalculateAll(String fromStr, String toStr) {
         }
     }
 
+    /**
+     * Transforms the data of a given run by applying all applicable schemas and transformers.
+     * It removes any existing datasets for the run before creating new ones, guards against
+     * concurrent transformations of the same run (cleaning up its bookkeeping even when the
+     * transaction times out), and creates datasets from the transformed data.
+     *
+     * @param runId the ID of the run to transform
+     * @param isRecalculation flag indicating whether this is a recalculation
+     * @return the number of datasets created, or 0 if the run is invalid, not found, or already being transformed
+     */
 @WithRoles(extras = Roles.HORREUM_SYSTEM)
 @Transactional
 int transform(int runId, boolean isRecalculation) {
@@ -1125,7 +1138,22 @@ int transform(int runId, boolean isRecalculation) {
             log.errorf("Transformation parameters error: run %s", runId);
             return 0;
         }
+        log.debugf("Transforming run ID %d, recalculation? %s", runId, Boolean.toString(isRecalculation));
+        int numDatasets = 0;
+
+        // check whether there is an ongoing transformation on the same runId
+        TestService.RecalculationStatus status = new TestService.RecalculationStatus(1);
+        TestService.RecalculationStatus prev = transformations.putIfAbsent(runId, status);
+        // ensure the map entry is removed when the transaction completes; this way it is
+        // removed even if a transaction-level exception occurs, e.g., a timeout
+        Util.registerTxSynchronization(tm, txStatus -> transformations.remove(runId, status));
+        if (prev != null) {
+            // there is an ongoing transformation that was initiated earlier
+            log.warnf("Transformation for run %d already in progress", runId);
+            return numDatasets;
+        }
+
         // We need to make sure all old datasets are gone before creating new; otherwise we could
         // break the runid,ordinal uniqueness constraint
         for (DatasetDAO old : DatasetDAO.<DatasetDAO> list("run.id", runId)) {
@@ -1138,9 +1166,8 @@ int transform(int runId, boolean isRecalculation) {
         RunDAO run = RunDAO.findById(runId);
         if (run == null) {
             log.errorf("Cannot load run ID %d for transformation", runId);
-            return 0;
+            return numDatasets; // this is 0
         }
-        int ordinal = 0;
         Map<Integer, JsonNode> transformerResults = new TreeMap<>();
         // naked nodes (those produced by implicit identity transformers) are all added to each dataset
         List<JsonNode> nakedNodes = new ArrayList<>();
@@ -1247,7 +1274,8 @@ int transform(int runId, boolean isRecalculation) {
                     }
                 } else if (!result.isContainerNode() || (result.isObject() && !result.has("$schema")) || (result.isArray()
-                        && StreamSupport.stream(result.spliterator(), false).anyMatch(item -> !item.has("$schema")))) {
+                        && StreamSupport.stream(result.spliterator(), false)
+                                .anyMatch(item -> !item.has("$schema")))) {
                     logMessage(run, PersistentLogDAO.WARN, "Dataset will contain element without a schema.");
                 }
                 JsonNode existing = transformerResults.get(transformerId);
@@ -1285,12 +1313,14 @@ int transform(int runId, boolean isRecalculation) {
                 }
                 nakedNodes.add(node);
                 logMessage(run, PersistentLogDAO.DEBUG,
-                        "This test (%d) does not use any transformer for schema %s (key %s), passing as-is.", run.testid, uri,
+                        "This test (%d) does not use any transformer for schema %s (key %s), passing as-is.", run.testid,
+                        uri,
                         key);
             }
         }
         if (schemasAndTransformers > 0) {
-            int max = transformerResults.values().stream().filter(JsonNode::isArray).mapToInt(JsonNode::size).max().orElse(1);
+            int max = transformerResults.values().stream().filter(JsonNode::isArray).mapToInt(JsonNode::size).max()
+                    .orElse(1);
 
             for (int position = 0; position < max; position += 1) {
                 ArrayNode all = instance.arrayNode(max + nakedNodes.size());
@@ -1305,7 +1335,7 @@ int transform(int runId, boolean isRecalculation) {
                             String message = String.format(
                                     "Transformer %d produced an array of %d elements but other transformer "
                                             + "produced %d elements; dataset %d/%d might be missing some data.",
-                                    entry.getKey(), node.size(), max, run.id, ordinal);
+                                    entry.getKey(), node.size(), max, run.id, numDatasets);
                             logMessage(run, PersistentLogDAO.WARN, "%s", message);
                             log.warnf(message);
@@ -1316,18 +1346,18 @@ int transform(int runId, boolean isRecalculation) {
                     }
                 }
                 nakedNodes.forEach(all::add);
-                createDataset(new DatasetDAO(run, ordinal++, run.description, all), isRecalculation);
+                createDataset(new DatasetDAO(run, numDatasets++, run.description, all), isRecalculation);
             }
             mediator.validateRun(run.id);
-            return ordinal;
         } else {
+            numDatasets = 1;
             logMessage(run, PersistentLogDAO.INFO, "No applicable schema, dataset will be empty.");
             createDataset(new DatasetDAO(
                     run, 0, "Empty Dataset for run data without any schema.", instance.arrayNode()), isRecalculation);
             mediator.validateRun(run.id);
-            return 1;
         }
+        return numDatasets;
     }
 
     private String limitLength(String str) {
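The new transformations map gives transform() a cheap in-flight guard: ConcurrentHashMap.putIfAbsent atomically claims the runId, and the two-argument remove releases the claim only if this invocation still owns it. Below is a minimal sketch of the idea; the class and method names are illustrative, and where the patch hooks the removal to transaction completion via Util.registerTxSynchronization (so it also fires on rollback or timeout), the sketch uses a plain try/finally.

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the per-run in-flight guard used by transform().
class PerRunGuard {
    private final ConcurrentHashMap<Integer, Object> inFlight = new ConcurrentHashMap<>();

    int transform(int runId) {
        Object claim = new Object();
        // putIfAbsent returns the previous value, or null if we won the race
        if (inFlight.putIfAbsent(runId, claim) != null) {
            return 0; // another transformation of this run is already in progress
        }
        try {
            return doTransform(runId);
        } finally {
            // two-argument remove deletes the entry only if it still maps to our claim
            inFlight.remove(runId, claim);
        }
    }

    private int doTransform(int runId) {
        return 1; // stand-in for the real work
    }
}
```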
%s", runId, Boolean.toString(isRecalculation)); + int numDatasets = 0; + + // check whether there is an ongoing transformation on the same runId + TestService.RecalculationStatus status = new TestService.RecalculationStatus(1); + TestService.RecalculationStatus prev = transformations.putIfAbsent(runId, status); + // ensure the transformation is removed, with this approach we should be sure + // it gets removed even if transaction-level exception occurs, e.g., timeout + Util.registerTxSynchronization(tm, txStatus -> transformations.remove(runId, status)); + if (prev != null) { + // there is an ongoing transformation that has recently been initiated + log.warnf("Transformation for run %d already in progress", runId); + return numDatasets; + } + // We need to make sure all old datasets are gone before creating new; otherwise we could // break the runid,ordinal uniqueness constraint for (DatasetDAO old : DatasetDAO. list("run.id", runId)) { @@ -1138,9 +1166,8 @@ int transform(int runId, boolean isRecalculation) { RunDAO run = RunDAO.findById(runId); if (run == null) { log.errorf("Cannot load run ID %d for transformation", runId); - return 0; + return numDatasets; // this is 0 } - int ordinal = 0; Map transformerResults = new TreeMap<>(); // naked nodes (those produced by implicit identity transformers) are all added to each dataset List nakedNodes = new ArrayList<>(); @@ -1247,7 +1274,8 @@ int transform(int runId, boolean isRecalculation) { } } else if (!result.isContainerNode() || (result.isObject() && !result.has("$schema")) || (result.isArray() - && StreamSupport.stream(result.spliterator(), false).anyMatch(item -> !item.has("$schema")))) { + && StreamSupport.stream(result.spliterator(), false) + .anyMatch(item -> !item.has("$schema")))) { logMessage(run, PersistentLogDAO.WARN, "Dataset will contain element without a schema."); } JsonNode existing = transformerResults.get(transformerId); @@ -1285,12 +1313,14 @@ int transform(int runId, boolean isRecalculation) { } nakedNodes.add(node); logMessage(run, PersistentLogDAO.DEBUG, - "This test (%d) does not use any transformer for schema %s (key %s), passing as-is.", run.testid, uri, + "This test (%d) does not use any transformer for schema %s (key %s), passing as-is.", run.testid, + uri, key); } } if (schemasAndTransformers > 0) { - int max = transformerResults.values().stream().filter(JsonNode::isArray).mapToInt(JsonNode::size).max().orElse(1); + int max = transformerResults.values().stream().filter(JsonNode::isArray).mapToInt(JsonNode::size).max() + .orElse(1); for (int position = 0; position < max; position += 1) { ArrayNode all = instance.arrayNode(max + nakedNodes.size()); @@ -1305,7 +1335,7 @@ int transform(int runId, boolean isRecalculation) { String message = String.format( "Transformer %d produced an array of %d elements but other transformer " + "produced %d elements; dataset %d/%d might be missing some data.", - entry.getKey(), node.size(), max, run.id, ordinal); + entry.getKey(), node.size(), max, run.id, numDatasets); logMessage(run, PersistentLogDAO.WARN, "%s", message); log.warnf(message); } @@ -1316,18 +1346,18 @@ int transform(int runId, boolean isRecalculation) { } } nakedNodes.forEach(all::add); - createDataset(new DatasetDAO(run, ordinal++, run.description, all), isRecalculation); + createDataset(new DatasetDAO(run, numDatasets++, run.description, all), isRecalculation); } mediator.validateRun(run.id); - return ordinal; } else { + numDatasets = 1; logMessage(run, PersistentLogDAO.INFO, "No applicable schema, dataset will be 
empty."); createDataset(new DatasetDAO( run, 0, "Empty Dataset for run data without any schema.", instance.arrayNode()), isRecalculation); mediator.validateRun(run.id); - return 1; } + return numDatasets; } private String limitLength(String str) { diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ServiceMediator.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ServiceMediator.java index 036d8fafa..76259fd6a 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ServiceMediator.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ServiceMediator.java @@ -6,6 +6,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.ReentrantLock; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.control.ActivateRequestContext; @@ -102,6 +103,8 @@ public class ServiceMediator { private Map>> events = new ConcurrentHashMap<>(); + private final ReentrantLock lock = new ReentrantLock(); + public ServiceMediator() { } @@ -151,7 +154,6 @@ void updateLabels(Dataset.LabelsUpdatedEvent event) { } void newDataset(Dataset.EventNew eventNew) { - //Note: should we call onNewDataset which will enable a lock? datasetService.onNewDataset(eventNew); } @@ -166,7 +168,7 @@ void newChange(Change.Event event) { @ActivateRequestContext @WithRoles(extras = Roles.HORREUM_SYSTEM) public void processDatasetEvents(Dataset.EventNew newEvent) { - datasetService.onNewDatasetNoLock(newEvent); + newDataset(newEvent); validateDataset(newEvent.datasetId); } @@ -232,8 +234,13 @@ int transform(int runId, boolean isRecalculation) { return runService.transform(runId, isRecalculation); } - void withRecalculationLock(Runnable run) { - datasetService.withRecalculationLock(run); + void withSharedLock(Runnable runnable) { + lock.lock(); + try { + runnable.run(); + } finally { + lock.unlock(); + } } void newExperimentResult(ExperimentService.ExperimentResult result) { diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/TestServiceImpl.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/TestServiceImpl.java index 8fe486618..5a1ed4537 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/TestServiceImpl.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/TestServiceImpl.java @@ -720,7 +720,7 @@ public void recalculateDatasets(int testId) { // transform will add proper roles anyway // messageBus.executeForTest(testId, () -> datasetService.withRecalculationLock(() -> { // mediator.executeBlocking(() -> mediator.transform(runId, true)); - mediator.executeBlocking(() -> mediator.withRecalculationLock(() -> { + mediator.executeBlocking(() -> mediator.withSharedLock(() -> { int newDatasets = 0; try { newDatasets = mediator.transform(runId, true); diff --git a/horreum-web/src/api.tsx b/horreum-web/src/api.tsx index 1ae1ee27b..18e788a4c 100644 --- a/horreum-web/src/api.tsx +++ b/horreum-web/src/api.tsx @@ -342,14 +342,6 @@ export function updateChangeDetection( } -export function updateRunsAndDatasetsAction( - testId: number, - runs: number, - datasets: number -): any { - return {type: "Tests/UPDATE_RUNS_AND_DATASETS", testId, runs, datasets} -} - ///Runs export function fetchRunSummary(id: number, token: string | undefined, alerting: AlertContextType): Promise { return apiCall(runApi.getRunSummary(id, token), alerting, "FETCH_RUN_SUMMARY", 
"Failed to fetch data for run " + id + ", try uploading a Run and use new Run id instead."); diff --git a/horreum-web/src/domain/tests/RecalculateDatasetsModal.tsx b/horreum-web/src/domain/tests/RecalculateDatasetsModal.tsx index 76c9c45f4..fb98cd5f4 100644 --- a/horreum-web/src/domain/tests/RecalculateDatasetsModal.tsx +++ b/horreum-web/src/domain/tests/RecalculateDatasetsModal.tsx @@ -1,6 +1,6 @@ -import {useCallback, useEffect, useState, useRef, useContext} from "react" +import { useCallback, useEffect, useState, useRef, useContext } from "react" import { Bullseye, Button, Modal, Progress, Spinner } from "@patternfly/react-core" -import {fetchTest, RecalculationStatus, testApi, TestStorage, updateRunsAndDatasetsAction} from "../../api" +import { fetchTest, RecalculationStatus, testApi, TestStorage } from "../../api" import {AppContext} from "../../context/appContext"; import {AppContextType} from "../../context/@types/appContextTypes"; @@ -13,7 +13,7 @@ type RecalculateDatasetsModalProps = { export default function RecalculateDatasetsModal(props: RecalculateDatasetsModalProps) { const { alerting } = useContext(AppContext) as AppContextType; - const [test, setTest] = useState( undefined) + const [test, setTest] = useState(undefined) const [progress, setProgress] = useState(-1) const [status, setStatus] = useState() const timerId = useRef() @@ -32,19 +32,16 @@ export default function RecalculateDatasetsModal(props: RecalculateDatasetsModal if (!props.isOpen) { return } + + // fetch the current test fetchTest(props.testId, alerting).then(setTest) - }, [props.testId]); - useEffect(() => { - if (!props.isOpen) { - return - } + // fetch the latest recalculation status if (test?.runs === undefined) { - testApi.getRecalculationStatus(props.testId).then(status => { - updateRunsAndDatasetsAction(props.testId, status.totalRuns, status.datasets) - }) + testApi.getRecalculationStatus(props.testId).then(setStatus) } - }, [test, props.isOpen]) + }, [props.isOpen]); + return ( "}`} @@ -66,11 +63,6 @@ export default function RecalculateDatasetsModal(props: RecalculateDatasetsModal .then(status => { setStatus(status) setProgress(status.finished) - updateRunsAndDatasetsAction( - props.testId, - status.totalRuns, - status.datasets - ) if (status.finished === status.totalRuns) { onClose() } @@ -112,7 +104,7 @@ export default function RecalculateDatasetsModal(props: RecalculateDatasetsModal )} {progress < 0 && (
diff --git a/horreum-web/src/domain/tests/RecalculateDatasetsModal.tsx b/horreum-web/src/domain/tests/RecalculateDatasetsModal.tsx
index 76c9c45f4..fb98cd5f4 100644
--- a/horreum-web/src/domain/tests/RecalculateDatasetsModal.tsx
+++ b/horreum-web/src/domain/tests/RecalculateDatasetsModal.tsx
@@ -1,6 +1,6 @@
-import {useCallback, useEffect, useState, useRef, useContext} from "react"
+import { useCallback, useEffect, useState, useRef, useContext } from "react"
 import { Bullseye, Button, Modal, Progress, Spinner } from "@patternfly/react-core"
-import {fetchTest, RecalculationStatus, testApi, TestStorage, updateRunsAndDatasetsAction} from "../../api"
+import { fetchTest, RecalculationStatus, testApi, TestStorage } from "../../api"
 import {AppContext} from "../../context/appContext";
 import {AppContextType} from "../../context/@types/appContextTypes";
 
@@ -13,7 +13,7 @@ type RecalculateDatasetsModalProps = {
 
 export default function RecalculateDatasetsModal(props: RecalculateDatasetsModalProps) {
     const { alerting } = useContext(AppContext) as AppContextType;
-    const [test, setTest] = useState<TestStorage | undefined>( undefined)
+    const [test, setTest] = useState<TestStorage | undefined>(undefined)
     const [progress, setProgress] = useState(-1)
     const [status, setStatus] = useState<RecalculationStatus>()
     const timerId = useRef<number>()
@@ -32,19 +32,16 @@ export default function RecalculateDatasetsModal(props: RecalculateDatasetsModal
         if (!props.isOpen) {
             return
         }
+
+        // fetch the current test
         fetchTest(props.testId, alerting).then(setTest)
-    }, [props.testId]);
-    useEffect(() => {
-        if (!props.isOpen) {
-            return
-        }
+
+        // fetch the latest recalculation status
         if (test?.runs === undefined) {
-            testApi.getRecalculationStatus(props.testId).then(status => {
-                updateRunsAndDatasetsAction(props.testId, status.totalRuns, status.datasets)
-            })
+            testApi.getRecalculationStatus(props.testId).then(setStatus)
         }
-    }, [test, props.isOpen])
+    }, [props.isOpen]);
+
     return (
         "}`}
@@ -66,11 +63,6 @@ export default function RecalculateDatasetsModal(props: RecalculateDatasetsModal
                     .then(status => {
                         setStatus(status)
                         setProgress(status.finished)
-                        updateRunsAndDatasetsAction(
-                            props.testId,
-                            status.totalRuns,
-                            status.datasets
-                        )
                         if (status.finished === status.totalRuns) {
                             onClose()
                         }
@@ -112,7 +104,7 @@ export default function RecalculateDatasetsModal(props: RecalculateDatasetsModal
             )}
             {progress < 0 && (
-                This test has {test?.runs || ""} of runs; do you want to recalculate all datasets?
+                This test has {status?.totalRuns || ""} runs; do you want to recalculate all datasets?
             )}
             {progress >= 0 && (