Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add solver job start event #1084

Merged
merged 8 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ default SolverJobBuilder<Solution_, ProblemId_> withProblem(Solution_ problem) {
SolverJobBuilder<Solution_, ProblemId_>
withFirstInitializedSolutionConsumer(Consumer<? super Solution_> firstInitializedSolutionConsumer);

/**
* Sets the consumer for when the solver starts its solving process.
*
* @param solverJobStartedConsumer never null, called only once when the solver is starting the solving process
* @return this, never null
*/
SolverJobBuilder<Solution_, ProblemId_> withSolverJobStartedConsumer(Consumer<? super Solution_> solverJobStartedConsumer);

/**
* Sets the custom exception handler.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,31 @@ final class ConsumerSupport<Solution_, ProblemId_> implements AutoCloseable {
private final Consumer<? super Solution_> bestSolutionConsumer;
private final Consumer<? super Solution_> finalBestSolutionConsumer;
private final Consumer<? super Solution_> firstInitializedSolutionConsumer;
private final Consumer<? super Solution_> solverJobStartedConsumer;
private final BiConsumer<? super ProblemId_, ? super Throwable> exceptionHandler;
private final Semaphore activeConsumption = new Semaphore(1);
private final Semaphore firstSolutionConsumption = new Semaphore(1);
private final Semaphore startSolverJobConsumption = new Semaphore(1);
private final BestSolutionHolder<Solution_> bestSolutionHolder;
private final ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
private Solution_ firstInitializedSolution;
private Solution_ initialSolution;

public ConsumerSupport(ProblemId_ problemId, Consumer<? super Solution_> bestSolutionConsumer,
Consumer<? super Solution_> finalBestSolutionConsumer, Consumer<? super Solution_> firstInitializedSolutionConsumer,
Consumer<? super Solution_> solverJobStartedConsumer,
BiConsumer<? super ProblemId_, ? super Throwable> exceptionHandler,
BestSolutionHolder<Solution_> bestSolutionHolder) {
this.problemId = problemId;
this.bestSolutionConsumer = bestSolutionConsumer;
this.finalBestSolutionConsumer = finalBestSolutionConsumer == null ? finalBestSolution -> {
} : finalBestSolutionConsumer;
this.firstInitializedSolutionConsumer = firstInitializedSolutionConsumer;
this.solverJobStartedConsumer = solverJobStartedConsumer;
this.exceptionHandler = exceptionHandler;
this.bestSolutionHolder = bestSolutionHolder;
this.firstInitializedSolution = null;
this.initialSolution = null;
}

// Called on the Solver thread.
Expand Down Expand Up @@ -62,14 +68,25 @@ void consumeFirstInitializedSolution(Solution_ firstInitializedSolution) {
scheduleFirstInitializedSolutionConsumption();
}

// Called on the consumer thread
void consumeStartSolverJob(Solution_ initialSolution) {
try {
// Called on the solver thread
// During the solving process, this lock is called once, and it won't block the Solver thread
startSolverJobConsumption.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted when waiting for the start solver job consumption.");
}
// called on the Consumer thread
this.initialSolution = initialSolution;
scheduleStartJobConsumption();
}

// Called on the Solver thread after Solver#solve() returns.
void consumeFinalBestSolution(Solution_ finalBestSolution) {
try {
// Wait for the previous consumption to complete.
// As the solver has already finished, holding the solver thread is not an issue.
activeConsumption.acquire();
// Wait for the first solution consumption to complete
firstSolutionConsumption.acquire();
acquireAll();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted when waiting for the final best solution consumption.");
Expand All @@ -88,12 +105,14 @@ void consumeFinalBestSolution(Solution_ finalBestSolution) {
} finally {
// If there is no intermediate best solution consumer, complete the problem changes now.
if (bestSolutionConsumer == null) {
bestSolutionHolder.take().completeProblemChanges();
var solutionHolder = bestSolutionHolder.take();
if (solutionHolder != null) {
solutionHolder.completeProblemChanges();
}
}
// Cancel problem changes that arrived after the solver terminated.
bestSolutionHolder.cancelPendingChanges();
activeConsumption.release();
firstSolutionConsumption.release();
releaseAll();
disposeConsumerThread();
}
});
Expand Down Expand Up @@ -135,29 +154,66 @@ private CompletableFuture<Void> scheduleIntermediateBestSolutionConsumption() {

/**
* Called on the Consumer thread.
* Don't call without locking firstSolutionConsumption, because the consumption may not be executed before the final best
* solution is executed.
* Don't call without locking firstSolutionConsumption,
* because the consumption may not be executed before the final best solution is executed.
*/
private void scheduleFirstInitializedSolutionConsumption() {
scheduleConsumption(firstSolutionConsumption, firstInitializedSolutionConsumer, firstInitializedSolution);
}

/**
* Called on the Consumer thread.
* Don't call without locking startSolverJobConsumption,
* because the consumption may not be executed before the final best solution is executed.
*/
private void scheduleStartJobConsumption() {
scheduleConsumption(startSolverJobConsumption, solverJobStartedConsumer, initialSolution);
}

private void scheduleConsumption(Semaphore semaphore, Consumer<? super Solution_> consumer, Solution_ solution) {
CompletableFuture.runAsync(() -> {
try {
if (firstInitializedSolutionConsumer != null && firstInitializedSolution != null) {
firstInitializedSolutionConsumer.accept(firstInitializedSolution);
if (consumer != null && solution != null) {
consumer.accept(solution);
}
} catch (Throwable throwable) {
if (exceptionHandler != null) {
exceptionHandler.accept(problemId, throwable);
}
} finally {
firstSolutionConsumption.release();
semaphore.release();
}
}, consumerExecutor);
}

private void acquireAll() throws InterruptedException {
// Wait for the previous consumption to complete.
// As the solver has already finished, holding the solver thread is not an issue.
activeConsumption.acquire();
// Wait for the start job event to complete
startSolverJobConsumption.acquire();
// Wait for the first solution consumption to complete
firstSolutionConsumption.acquire();
}

private void releaseAll() {
activeConsumption.release();
startSolverJobConsumption.release();
firstSolutionConsumption.release();
}

@Override
public void close() {
disposeConsumerThread();
bestSolutionHolder.cancelPendingChanges();
try {
acquireAll();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted when waiting for closing the consumer.");
} finally {
disposeConsumerThread();
bestSolutionHolder.cancelPendingChanges();
releaseAll();
}
}

private void disposeConsumerThread() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public final class DefaultSolverJob<Solution_, ProblemId_> implements SolverJob<
private final Consumer<? super Solution_> bestSolutionConsumer;
private final Consumer<? super Solution_> finalBestSolutionConsumer;
private final Consumer<? super Solution_> firstInitializedSolutionConsumer;
private final Consumer<? super Solution_> solverJobStartedConsumer;
private final BiConsumer<? super ProblemId_, ? super Throwable> exceptionHandler;

private volatile SolverStatus solverStatus;
Expand All @@ -63,6 +64,7 @@ public DefaultSolverJob(
Consumer<? super Solution_> bestSolutionConsumer,
Consumer<? super Solution_> finalBestSolutionConsumer,
Consumer<? super Solution_> firstInitializedSolutionConsumer,
Consumer<? super Solution_> solverJobStartedConsumer,
BiConsumer<? super ProblemId_, ? super Throwable> exceptionHandler) {
this.solverManager = solverManager;
this.problemId = problemId;
Expand All @@ -75,6 +77,7 @@ public DefaultSolverJob(
this.bestSolutionConsumer = bestSolutionConsumer;
this.finalBestSolutionConsumer = finalBestSolutionConsumer;
this.firstInitializedSolutionConsumer = firstInitializedSolutionConsumer;
this.solverJobStartedConsumer = solverJobStartedConsumer;
this.exceptionHandler = exceptionHandler;
solverStatus = SolverStatus.SOLVING_SCHEDULED;
terminatedLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -108,13 +111,15 @@ public Solution_ call() {
solverStatus = SolverStatus.SOLVING_ACTIVE;
// Create the consumer thread pool only when this solver job is active.
consumerSupport = new ConsumerSupport<>(getProblemId(), bestSolutionConsumer, finalBestSolutionConsumer,
firstInitializedSolutionConsumer, exceptionHandler, bestSolutionHolder);
firstInitializedSolutionConsumer, solverJobStartedConsumer, exceptionHandler, bestSolutionHolder);

Solution_ problem = problemFinder.apply(problemId);
// add a phase lifecycle listener that unlock the solver status lock when solving started
solver.addPhaseLifecycleListener(new UnlockLockPhaseLifecycleListener());
// add a phase lifecycle listener that consumes the first initialized solution
solver.addPhaseLifecycleListener(new FirstInitializedSolutionPhaseLifecycleListener(consumerSupport));
// add a phase lifecycle listener once when the solver starts its execution
solver.addPhaseLifecycleListener(new StartSolverJobPhaseLifecycleListener(consumerSupport));
solver.addEventListener(this::onBestSolutionChangedEvent);
final Solution_ finalBestSolution = solver.solve(problem);
consumerSupport.consumeFinalBestSolution(finalBestSolution);
Expand Down Expand Up @@ -146,6 +151,7 @@ private void solvingTerminated() {
solverStatus = SolverStatus.NOT_SOLVING;
solverManager.unregisterSolverJob(problemId);
terminatedLatch.countDown();
close();
}

// TODO Future features
Expand Down Expand Up @@ -306,4 +312,21 @@ public void phaseEnded(AbstractPhaseScope<Solution_> phaseScope) {
}
}
}

/**
* A listener that is triggered once when the solver starts the solving process.
*/
private final class StartSolverJobPhaseLifecycleListener extends PhaseLifecycleListenerAdapter<Solution_> {

private final ConsumerSupport<Solution_, ProblemId_> consumerSupport;

public StartSolverJobPhaseLifecycleListener(ConsumerSupport<Solution_, ProblemId_> consumerSupport) {
this.consumerSupport = consumerSupport;
}

@Override
public void solvingStarted(SolverScope<Solution_> solverScope) {
consumerSupport.consumeStartSolverJob(solverScope.getWorkingSolution());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public final class DefaultSolverJobBuilder<Solution_, ProblemId_> implements Sol
private Consumer<? super Solution_> bestSolutionConsumer;
private Consumer<? super Solution_> finalBestSolutionConsumer;
private Consumer<? super Solution_> initializedSolutionConsumer;
private Consumer<? super Solution_> solverJobStartedConsumer;
private BiConsumer<? super ProblemId_, ? super Throwable> exceptionHandler;
private SolverConfigOverride<Solution_> solverConfigOverride;

Expand Down Expand Up @@ -66,6 +67,14 @@ public SolverJobBuilder<Solution_, ProblemId_> withBestSolutionConsumer(Consumer
return this;
}

@Override
public SolverJobBuilder<Solution_, ProblemId_>
withSolverJobStartedConsumer(Consumer<? super Solution_> solverJobStartedConsumer) {
this.solverJobStartedConsumer = Objects.requireNonNull(solverJobStartedConsumer,
"Invalid startSolverJobHandler (null) given to SolverJobBuilder.");
return this;
}

@Override
public SolverJobBuilder<Solution_, ProblemId_>
withExceptionHandler(BiConsumer<? super ProblemId_, ? super Throwable> exceptionHandler) {
Expand All @@ -90,10 +99,10 @@ public SolverJob<Solution_, ProblemId_> run() {

if (this.bestSolutionConsumer == null) {
return solverManager.solve(problemId, problemFinder, null, finalBestSolutionConsumer,
initializedSolutionConsumer, exceptionHandler, solverConfigOverride);
initializedSolutionConsumer, solverJobStartedConsumer, exceptionHandler, solverConfigOverride);
} else {
return solverManager.solveAndListen(problemId, problemFinder, bestSolutionConsumer, finalBestSolutionConsumer,
initializedSolutionConsumer, exceptionHandler, solverConfigOverride);
initializedSolutionConsumer, solverJobStartedConsumer, exceptionHandler, solverConfigOverride);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,22 @@ protected SolverJob<Solution_, ProblemId_> solveAndListen(ProblemId_ problemId,
Consumer<? super Solution_> bestSolutionConsumer,
Consumer<? super Solution_> finalBestSolutionConsumer,
Consumer<? super Solution_> initializedSolutionConsumer,
Consumer<? super Solution_> solverJobStartedConsumer,
BiConsumer<? super ProblemId_, ? super Throwable> exceptionHandler,
SolverConfigOverride<Solution_> solverConfigOverride) {
if (bestSolutionConsumer == null) {
throw new IllegalStateException("The consumer bestSolutionConsumer is required.");
}
return solve(getProblemIdOrThrow(problemId), problemFinder, bestSolutionConsumer, finalBestSolutionConsumer,
initializedSolutionConsumer, exceptionHandler, solverConfigOverride);
initializedSolutionConsumer, solverJobStartedConsumer, exceptionHandler, solverConfigOverride);
}

protected SolverJob<Solution_, ProblemId_> solve(ProblemId_ problemId,
Function<? super ProblemId_, ? extends Solution_> problemFinder,
Consumer<? super Solution_> bestSolutionConsumer,
Consumer<? super Solution_> finalBestSolutionConsumer,
Consumer<? super Solution_> initializedSolutionConsumer,
Consumer<? super Solution_> solverJobStartedConsumer,
BiConsumer<? super ProblemId_, ? super Throwable> exceptionHandler,
SolverConfigOverride<Solution_> configOverride) {
Solver<Solution_> solver = solverFactory.buildSolver(configOverride);
Expand All @@ -111,7 +113,8 @@ protected SolverJob<Solution_, ProblemId_> solve(ProblemId_ problemId,
throw new IllegalStateException("The problemId (" + problemId + ") is already solving.");
} else {
return new DefaultSolverJob<>(this, solver, problemId, problemFinder, bestSolutionConsumer,
finalBestSolutionConsumer, initializedSolutionConsumer, finalExceptionHandler);
finalBestSolutionConsumer, initializedSolutionConsumer, solverJobStartedConsumer,
finalExceptionHandler);
}
});
Future<Solution_> future = solverThreadPool.submit(solverJob);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import ai.timefold.solver.core.impl.testdata.util.PlannerTestUtils;

import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -461,6 +462,28 @@ void firstInitializedSolutionConsumerWith2Custom() throws ExecutionException, In
assertThat(hasInitializedSolution.booleanValue()).isFalse();
}

@Test
@Timeout(60)
void testStartJobConsumer() throws ExecutionException, InterruptedException {
SolverConfig solverConfig = PlannerTestUtils
.buildSolverConfig(TestdataSolution.class, TestdataEntity.class);
solverManager = SolverManager
.create(solverConfig, new SolverManagerConfig());

Function<Object, TestdataUnannotatedExtendedSolution> problemFinder = o -> new TestdataUnannotatedExtendedSolution(
PlannerTestUtils.generateTestdataSolution("s1"));

MutableInt started = new MutableInt(0);

SolverJob<TestdataSolution, Long> solverJob = solverManager.solveBuilder()
.withProblemId(1L)
.withProblemFinder(problemFinder)
.withSolverJobStartedConsumer(solution -> started.increment())
.run();
solverJob.getFinalBestSolution();
assertThat(started.getValue()).isOne();
}

@Test
void solveWithOverride() {
// Default spent limit is 1L
Expand Down
Loading
Loading