Skip to content

Commit

Permalink
Remove hardcoded test recalc timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Andrea Lamparelli <[email protected]>
  • Loading branch information
lampajr authored and johnaohara committed Oct 25, 2024
1 parent 69bdf59 commit 08b7300
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;
Expand Down Expand Up @@ -103,8 +102,6 @@ public class ServiceMediator {

private Map<AsyncEventChannels, Map<Integer, BlockingQueue<Object>>> events = new ConcurrentHashMap<>();

private final ReentrantLock lock = new ReentrantLock();

public ServiceMediator() {
}

Expand Down Expand Up @@ -234,15 +231,6 @@ int transform(int runId, boolean isRecalculation) {
return runService.transform(runId, isRecalculation);
}

void withSharedLock(Runnable runnable) {
lock.lock();
try {
runnable.run();
} finally {
lock.unlock();
}
}

void newExperimentResult(ExperimentService.ExperimentResult result) {
actionService.onNewExperimentResult(result);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.hyperfoil.tools.horreum.svc;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -12,7 +11,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import jakarta.annotation.security.PermitAll;
Expand Down Expand Up @@ -687,21 +685,14 @@ public void recalculateDatasets(int testId) {
RecalculationStatus status = new RecalculationStatus(RunDAO.count("testid = ?1 AND trashed = false", testId));
// we don't have to care about races with new runs
RecalculationStatus prev = recalculations.putIfAbsent(testId, status);
while (prev != null) {
log.debugf("Recalculation for test %d (%s) already in progress", testId, test.name);
if (prev.timestamp < System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(10)) {
log.warnf("Recalculation for test %d (%s) from %s timed out after 10 minutes with %d/%d runs",
testId, test.name, Instant.ofEpochMilli(prev.timestamp), prev.finished, prev.totalRuns);
if (recalculations.replace(testId, prev, status)) {
log.debug("Continuing with recalculation.");
break;
} else {
prev = recalculations.get(testId);
}
} else {
return;
}
// ensure the recalculation is removed, with this approach we should guarantee
// it gets removed even if transaction-level exception occurs, e.g., timeout
Util.registerTxSynchronization(tm, txStatus -> recalculations.remove(testId, status));
if (prev != null) {
log.infof("Recalculation for test %d (%s) already in progress", testId, test.name);
return;
}

long deleted = em
.createNativeQuery(
"DELETE FROM dataset USING run WHERE run.id = dataset.runid AND run.trashed AND dataset.testid = ?1")
Expand All @@ -710,31 +701,32 @@ public void recalculateDatasets(int testId) {
log.debugf("Deleted %d datasets for trashed runs in test %s (%d)", deleted, test.name, (Object) testId);
}

ScrollableResults results = em.createNativeQuery("SELECT id FROM run WHERE testid = ?1 AND NOT trashed ORDER BY start")
try (ScrollableResults results = em
.createNativeQuery("SELECT id FROM run WHERE testid = ?1 AND NOT trashed ORDER BY start")
.setParameter(1, testId)
.unwrap(NativeQuery.class).setReadOnly(true).setFetchSize(100)
.scroll(ScrollMode.FORWARD_ONLY);
while (results.next()) {
int runId = (int) results.get();
log.debugf("Recalculate Datasets for run %d - forcing recalculation for test %d (%s)", runId, testId, test.name);
// transform will add proper roles anyway
// messageBus.executeForTest(testId, () -> datasetService.withRecalculationLock(() -> {
// mediator.executeBlocking(() -> mediator.transform(runId, true));
mediator.executeBlocking(() -> mediator.withSharedLock(() -> {
int newDatasets = 0;
try {
newDatasets = mediator.transform(runId, true);
// mediator.queueRunRecalculation(runId);
} finally {
synchronized (status) {
status.finished++;
status.datasets += newDatasets;
if (status.finished == status.totalRuns) {
recalculations.remove(testId, status);
.scroll(ScrollMode.FORWARD_ONLY)) {
while (results.next()) {
int runId = (int) results.get();
log.debugf("Recalculate Datasets for run %d - forcing recalculation for test %d (%s)", runId, testId,
test.name);

mediator.executeBlocking(() -> {
int newDatasets = 0;
try {
newDatasets = mediator.transform(runId, true);
// mediator.queueRunRecalculation(runId);
} finally {
synchronized (status) {
status.finished++;
status.datasets += newDatasets;
if (status.finished == status.totalRuns) {
recalculations.remove(testId, status);
}
}
}
}
}));
});
}
}
}

Expand Down

0 comments on commit 08b7300

Please sign in to comment.