Skip to content

Commit

Permalink
fix: prevent possible concurrent modification of a collection
Browse files Browse the repository at this point in the history
We have seen reports of the TreeMap getting concurrently accessed and throwing exceptions. Unfortunately, we have never seen any code to reproduce these failures, as concurrency issues are notoriously hard to reproduce. Therefore we refactor the mechanism to make any such issues impossible.

For the same reason as there is no reproducer, there is also no new test coverage. But existing test coverage still passes.
  • Loading branch information
triceo committed Jan 31, 2025
1 parent 82e7841 commit fe24fce
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public interface Solver<Solution_> {
* <p>
* To learn more about problem change semantics, please refer to the {@link ProblemChange} Javadoc.
*
* @see #addProblemChanges(List)
* @see ProblemChange Learn more about problem change semantics.
* @see #addProblemChanges(List) Submit multiple problem changes at once.
*/
void addProblemChange(@NonNull ProblemChange<Solution_> problemChange);

Expand All @@ -105,7 +106,7 @@ public interface Solver<Solution_> {
* <p>
* To learn more about problem change semantics, please refer to the {@link ProblemChange} Javadoc.
*
* @see #addProblemChange(ProblemChange)
* @see ProblemChange Learn more about problem change semantics.
*/
void addProblemChanges(@NonNull List<ProblemChange<Solution_>> problemChangeList);

Expand All @@ -131,7 +132,7 @@ public interface Solver<Solution_> {
*
* @deprecated Prefer {@link #addProblemChange(ProblemChange)}.
* @return true (as specified by {@link Collection#add})
* @see #addProblemFactChanges(List)
* @see #addProblemChanges(List)
*/
@Deprecated(forRemoval = true)
boolean addProblemFactChange(@NonNull ProblemFactChange<Solution_> problemFactChange);
Expand All @@ -149,7 +150,7 @@ public interface Solver<Solution_> {
*
* @deprecated Prefer {@link #addProblemChanges(List)}.
* @return true (as specified by {@link Collection#add})
* @see #addProblemFactChange(ProblemFactChange)
* @see #addProblemChange(ProblemChange)
*/
@Deprecated(forRemoval = true)
boolean addProblemFactChanges(@NonNull List<ProblemFactChange<Solution_>> problemFactChangeList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ public interface SolverManager<Solution_, ProblemId_> extends AutoCloseable {
* or {@link #solveAndListen(Object, Object, Consumer)}
* @return completes after the best solution containing this change has been consumed.
* @throws IllegalStateException if there is no solver actively solving the problem associated with the problemId
* @see ProblemChange Learn more about problem change semantics.
*/
@NonNull
CompletableFuture<Void> addProblemChange(@NonNull ProblemId_ problemId, @NonNull ProblemChange<Solution_> problemChange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,42 @@

import ai.timefold.solver.core.api.domain.entity.PlanningEntity;
import ai.timefold.solver.core.api.domain.solution.PlanningSolution;
import ai.timefold.solver.core.api.domain.variable.VariableListener;
import ai.timefold.solver.core.api.score.Score;
import ai.timefold.solver.core.api.solver.Solver;
import ai.timefold.solver.core.api.solver.event.BestSolutionChangedEvent;
import ai.timefold.solver.core.impl.heuristic.move.Move;

import org.jspecify.annotations.NonNull;

/**
* A ProblemChange represents a change in one or more {@link PlanningEntity planning entities} or problem facts
* of a {@link PlanningSolution}.
* <p>
* The {@link Solver} checks the presence of waiting problem changes after every
* {@link ai.timefold.solver.core.impl.heuristic.move.Move} evaluation. If there are waiting problem changes,
* the {@link Solver}:
* The {@link Solver} checks the presence of waiting problem changes after every {@link Move} evaluation.
* If there are waiting problem changes, the {@link Solver}:
* <ol>
* <li>clones the last {@link PlanningSolution best solution} and sets the clone
* as the new {@link PlanningSolution working solution}</li>
* <li>clones the last {@link PlanningSolution best solution}
* and sets the clone as the new {@link PlanningSolution working solution}</li>
* <li>applies every problem change keeping the order in which problem changes have been submitted;
* after every problem change, {@link ai.timefold.solver.core.api.domain.variable.VariableListener variable listeners}
* are triggered
* after every problem change, {@link VariableListener variable listeners} are triggered
* <li>calculates the score and makes the {@link PlanningSolution updated working solution}
* the new {@link PlanningSolution best solution}; note that this {@link PlanningSolution solution} is not published
* via the {@link ai.timefold.solver.core.api.solver.event.BestSolutionChangedEvent}, as it hasn't been initialized yet</li>
* the new {@link PlanningSolution best solution};
* note that this {@link PlanningSolution solution} is not published via the {@link BestSolutionChangedEvent},
* as it hasn't been initialized yet</li>
* <li>restarts solving to fill potential uninitialized {@link PlanningEntity planning entities}</li>
* </ol>
* <p>
* From the above, it follows that the solver will require some time
* to restart solving and produce the next best solution.
* For that reason, it is recommended to submit problem changes in batches, rather than one by one.
* If there is not enough time between problem changes,
* the solver may not have enough time to produce a new best solution
* and the barrage of problem changes will have effectively caused the optimization process to stop.
* It is impossible to say with certainty how much time is needed between problem changes,
* as it depends on the problem size, complexity, and the speed of the {@link Score} calculation.
* But in general, problem changes should be separated by at least a few seconds, if not minutes.
* <p>
* Note that the {@link Solver} clones a {@link PlanningSolution} at will.
* Any change must be done on the problem facts and planning entities referenced by the {@link PlanningSolution}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import ai.timefold.solver.core.impl.solver.scope.SolverScope;
import ai.timefold.solver.core.impl.solver.termination.Termination;

import org.jspecify.annotations.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -138,12 +137,12 @@ public void stepEnded(AbstractStepScope<Solution_> stepScope) {
// ************************************************************************

@Override
public void addEventListener(@NonNull SolverEventListener<Solution_> eventListener) {
public void addEventListener(SolverEventListener<Solution_> eventListener) {
solverEventSupport.addEventListener(eventListener);
}

@Override
public void removeEventListener(@NonNull SolverEventListener<Solution_> eventListener) {
public void removeEventListener(SolverEventListener<Solution_> eventListener) {
solverEventSupport.removeEventListener(eventListener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,106 @@
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

import ai.timefold.solver.core.api.solver.Solver;
import ai.timefold.solver.core.api.solver.change.ProblemChange;

import org.jspecify.annotations.NonNull;

import org.jspecify.annotations.NullMarked;
import org.jspecify.annotations.Nullable;

/**
* The goal of this class is to register problem changes and best solutions in a thread-safe way.
* Problem changes are {@link #addProblemChange(Solver, ProblemChange) put in a queue}
* and later associated with the best solution which contains them.
* The best solution is associated with a version number
* that is incremented each time a {@link #set new best solution is set}.
* The best solution is {@link #take() taken} together with all problem changes
* that were registered before the best solution was set.
*
* <p>
* This class needs to be thread-safe.
* Due to complicated interactions between the solver, solver manager and problem changes,
* it is best if we avoid explicit locking here,
* reducing cognitive complexity of the whole system.
* The core idea being to never modify the same data structure from multiple threads;
* instead, we replace the data structure with a new one atomically.
* The code contains comments throughout the class that explain the reasoning behind the design.
*
* @param <Solution_>
*/
@NullMarked
final class BestSolutionHolder<Solution_> {

private final Lock problemChangesLock = new ReentrantLock();
private final AtomicReference<VersionedBestSolution<Solution_>> versionedBestSolutionRef = new AtomicReference<>();
private final SortedMap<BigInteger, List<CompletableFuture<Void>>> problemChangesPerVersion =
new TreeMap<>();
private BigInteger currentVersion = BigInteger.ZERO;
private final AtomicReference<@Nullable VersionedBestSolution<Solution_>> versionedBestSolutionRef =
new AtomicReference<>();
private final AtomicReference<SortedMap<BigInteger, List<CompletableFuture<Void>>>> problemChangesPerVersionRef =
new AtomicReference<>(createNewProblemChangesMap());
// The version is BigInteger to avoid long overflow.
// The solver can run potentially forever, so long overflow is a (remote) possibility.
private final AtomicReference<BigInteger> currentVersion = new AtomicReference<>(BigInteger.ZERO);
private final AtomicReference<BigInteger> lastProcessedVersion = new AtomicReference<>(BigInteger.valueOf(-1));

private static SortedMap<BigInteger, List<CompletableFuture<Void>>> createNewProblemChangesMap() {
return createNewProblemChangesMap(Collections.emptySortedMap());
}

private static SortedMap<BigInteger, List<CompletableFuture<Void>>>
createNewProblemChangesMap(SortedMap<BigInteger, List<CompletableFuture<Void>>> map) {
return new TreeMap<>(map);
}

boolean isEmpty() {
return versionedBestSolutionRef.get() == null;
}

/**
* NOT thread-safe.
*
* @return the last best solution together with problem changes the solution contains.
* If there is no new best solution, returns null.
*/
@Nullable
BestSolutionContainingProblemChanges<Solution_> take() {
VersionedBestSolution<Solution_> versionedBestSolution = versionedBestSolutionRef.getAndSet(null);
var versionedBestSolution = versionedBestSolutionRef.getAndSet(null);
if (versionedBestSolution == null) {
return null;
}
SortedMap<BigInteger, List<CompletableFuture<Void>>> containedProblemChangesPerVersion =
problemChangesPerVersion.headMap(versionedBestSolution.getVersion().add(BigInteger.ONE));

List<CompletableFuture<Void>> containedProblemChanges = new ArrayList<>();
for (Map.Entry<BigInteger, List<CompletableFuture<Void>>> entry : containedProblemChangesPerVersion.entrySet()) {
containedProblemChanges.addAll(entry.getValue());
problemChangesPerVersion.remove(entry.getKey());
var bestSolutionVersion = versionedBestSolution.version();
var lastProcessedVersion = this.lastProcessedVersion.getAndUpdate(bestSolutionVersion::max);
if (lastProcessedVersion.compareTo(bestSolutionVersion) > 0) {
// Corner case: The best solution has already been taken,
// because a later take() was scheduled to run before an earlier take().
// This causes the later take() to return the latest best solution and all the problem changes,
// and the earlier best solution to be skipped entirely.
return null;
}

return new BestSolutionContainingProblemChanges<>(versionedBestSolution.getBestSolution(),
containedProblemChanges);
// The map is replaced by a map containing only the problem changes that are not contained in the best solution.
// This is done atomically, so no other thread can access the old map anymore.
// The old map can then be processed by the current thread without synchronization.
// The copying of maps is possibly expensive, but due to the nature of problem changes,
// we do not expect the map to ever get too big.
// It is not practical to submit a problem change every second, as that gives the solver no time to react.
// This limits the size of the map on input.
// The solver also finds new best solutions, which regularly trims the size of the map as well.
var boundaryVersion = bestSolutionVersion.add(BigInteger.ONE);
var oldProblemChangesPerVersion =
problemChangesPerVersionRef.getAndUpdate(map -> createNewProblemChangesMap(map.tailMap(boundaryVersion)));
// At this point, the old map is not accessible to any other thread.
// We also do not need to clear it, because this being the only reference,
// garbage collector will do it for us.
var containedProblemChanges = oldProblemChangesPerVersion.headMap(boundaryVersion)
.values()
.stream()
.flatMap(Collection::stream)
.toList();
return new BestSolutionContainingProblemChanges<>(versionedBestSolution.bestSolution(), containedProblemChanges);
}

/**
Expand All @@ -61,77 +113,56 @@ BestSolutionContainingProblemChanges<Solution_> take() {
* @param isEveryProblemChangeProcessed a supplier that tells if all problem changes have been processed
*/
void set(Solution_ bestSolution, BooleanSupplier isEveryProblemChangeProcessed) {
problemChangesLock.lock();
try {
/*
* The new best solution can be accepted only if there are no pending problem changes nor any additional
* changes may come during this operation. Otherwise, a race condition might occur that leads to associating
* problem changes with a solution that was created later, but does not contain them yet.
* As a result, CompletableFutures representing these changes would be completed too early.
*/
if (isEveryProblemChangeProcessed.getAsBoolean()) {
versionedBestSolutionRef.set(new VersionedBestSolution(bestSolution, currentVersion));
currentVersion = currentVersion.add(BigInteger.ONE);
}
} finally {
problemChangesLock.unlock();
/*
* The new best solution can be accepted only if there are no pending problem changes
* nor any additional changes may come during this operation.
* Otherwise, a race condition might occur
* that leads to associating problem changes with a solution that was created later,
* but does not contain them yet.
* As a result, CompletableFutures representing these changes would be completed too early.
*/
if (isEveryProblemChangeProcessed.getAsBoolean()) {
// This field is atomic, so we can safely set the new best solution without synchronization.
versionedBestSolutionRef.set(
new VersionedBestSolution<>(bestSolution, currentVersion.getAndUpdate(old -> old.add(BigInteger.ONE))));
}
}

/**
* Adds a new problem change to a solver and registers the problem change to be later retrieved together with
* a relevant best solution by the {@link #take()} method.
* Adds a new problem change to a solver and registers the problem change
* to be later retrieved together with a relevant best solution by the {@link #take()} method.
*
* @return CompletableFuture that will be completed after the best solution containing this change is passed to
* a user-defined Consumer.
*/
@NonNull
CompletableFuture<Void> addProblemChange(Solver<Solution_> solver, ProblemChange<Solution_> problemChange) {
problemChangesLock.lock();
try {
CompletableFuture<Void> futureProblemChange = new CompletableFuture<>();
problemChangesPerVersion.compute(currentVersion, (version, futureProblemChangeList) -> {
if (futureProblemChangeList == null) {
futureProblemChangeList = new ArrayList<>();
}
futureProblemChangeList.add(futureProblemChange);
return futureProblemChangeList;
});
var futureProblemChange = new CompletableFuture<Void>();
synchronized (this) {
// This actually needs to be synchronized,
// as we want the new problem change and its version to be linked.
var futureProblemChangeList =
problemChangesPerVersionRef.get().computeIfAbsent(currentVersion.get(), version -> new ArrayList<>());
futureProblemChangeList.add(futureProblemChange);
solver.addProblemChange(problemChange);
return futureProblemChange;
} finally {
problemChangesLock.unlock();
}
return futureProblemChange;
}

void cancelPendingChanges() {
problemChangesLock.lock();
try {
problemChangesPerVersion.values()
.stream()
.flatMap(Collection::stream)
.forEach(pendingProblemChange -> pendingProblemChange.cancel(false));
problemChangesPerVersion.clear();
} finally {
problemChangesLock.unlock();
}
// The map is an atomic reference.
// We first replace the reference with a new map atomically, avoiding synchronization issues.
// Then we process the old map, which is safe because no one can access it anymore.
// We do not need to clear it, because this being the only reference,
// the garbage collector will do it for us.
problemChangesPerVersionRef.getAndSet(createNewProblemChangesMap())
.values()
.stream()
.flatMap(Collection::stream)
.forEach(pendingProblemChange -> pendingProblemChange.cancel(false));
}

private static final class VersionedBestSolution<Solution_> {
final Solution_ bestSolution;
final BigInteger version;

public VersionedBestSolution(Solution_ bestSolution, BigInteger version) {
this.bestSolution = bestSolution;
this.version = version;
}

public Solution_ getBestSolution() {
return bestSolution;
}

public BigInteger getVersion() {
return version;
}
private record VersionedBestSolution<Solution_>(Solution_ bestSolution, BigInteger version) {
}

}
Loading

0 comments on commit fe24fce

Please sign in to comment.