Skip to content

Commit

Permalink
Further improve async evaluation.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed Aug 2, 2024
1 parent a93499c commit d835eeb
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ public class AsyncIterator<T> implements CloseableIteration<T, QueryEvaluationEx

public AsyncIterator(Supplier<CloseableIteration<T, QueryEvaluationException>> base, Supplier<ExecutorService> executorService) {
nextElements = new ArrayBlockingQueue<>(100);
var currentAsync = InnerJoinIterator.asyncDepth.get();
executorService.get().submit(() -> {
InnerJoinIterator.isAsync.set(true);
InnerJoinIterator.asyncDepth.set(currentAsync != null ? currentAsync + 1 : 1);
var baseIt = base.get();
try {
while (baseIt.hasNext()) {
Expand All @@ -40,7 +41,7 @@ public AsyncIterator(Supplier<CloseableIteration<T, QueryEvaluationException>> b
// just return
} finally {
baseIt.close();
InnerJoinIterator.isAsync.remove();
InnerJoinIterator.asyncDepth.remove();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ public class InnerJoinIterator extends LookAheadIteration<BindingSet, QueryEvalu
* Variables *
*-----------*/

public static final ThreadLocal<Boolean> isAsync = new ThreadLocal<>();
public static final ThreadLocal<Integer> asyncDepth = new ThreadLocal<>();
public static final int MAX_ASYNC_DEPTH = 3;
private static final BindingSet NULL_BINDINGS = new EmptyBindingSet();
private static final int BATCH_SIZE = 200;
private final EvaluationStrategy strategy;
Expand Down Expand Up @@ -51,7 +52,7 @@ public InnerJoinIterator(EvaluationStrategy strategy, Supplier<ExecutorService>
rightIter = new EmptyIteration<>();
leftIter = leftIt;

if (async && isAsync.get() == Boolean.TRUE) {
if (async && (asyncDepth.get() != null && asyncDepth.get() > InnerJoinIterator.MAX_ASYNC_DEPTH)) {
async = false;
}
joined = async ? new ArrayList<>() : null;
Expand Down Expand Up @@ -140,8 +141,9 @@ private void enqueueNext() {
nextLefts.add(leftIter.next());
}
}
var currentAsync = asyncDepth.get();
executorService.get().submit(() -> {
isAsync.set(true);
asyncDepth.set(currentAsync != null ? currentAsync + 1 : 1);;
var rightIt = useBatch ?
((BatchQueryEvaluationStep) preparedJoinArg).evaluate(nextLefts) :
preparedJoinArg.evaluate(nextLefts.get(0));
Expand All @@ -164,7 +166,7 @@ private void enqueueNext() {
// just return
} finally {
rightIt.close();
isAsync.remove();
asyncDepth.remove();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,19 @@ public static <V> CloseableIteration<BindingSet, QueryEvaluationException> getIn
QueryEvaluationStep leftPrepared,
QueryEvaluationStep preparedRight, BindingSet bindings, Comparator<V> cmp,
Function<BindingSet, V> value, QueryEvaluationContext context, Supplier<ExecutorService> executorService) {
CloseableIteration<BindingSet, QueryEvaluationException> leftIter;
if (InnerJoinIterator.isAsync.get() != Boolean.TRUE) {
leftIter = new AsyncIterator<>(() -> leftPrepared.evaluate(bindings), executorService);
} else {
leftIter = leftPrepared.evaluate(bindings);
if (leftIter == QueryEvaluationStep.EMPTY_ITERATION) {
return leftIter;
}
CloseableIteration<BindingSet, QueryEvaluationException> leftIter = leftPrepared.evaluate(bindings);
if (leftIter == QueryEvaluationStep.EMPTY_ITERATION) {
return leftIter;
}

CloseableIteration<BindingSet, QueryEvaluationException> rightIter = preparedRight.evaluate(bindings);
if (rightIter == QueryEvaluationStep.EMPTY_ITERATION) {
leftIter.close();
return rightIter;
CloseableIteration<BindingSet, QueryEvaluationException> rightIter;
if (InnerJoinIterator.asyncDepth.get() == null || InnerJoinIterator.asyncDepth.get() < InnerJoinIterator.MAX_ASYNC_DEPTH) {
rightIter = new AsyncIterator<>(() -> preparedRight.evaluate(bindings), executorService);
} else {
rightIter = preparedRight.evaluate(bindings);
if (rightIter == QueryEvaluationStep.EMPTY_ITERATION) {
return rightIter;
}
}

return new InnerMergeJoinIterator<>(new PeekMarkIterator<>(leftIter), new PeekMarkIterator<>(rightIter), cmp, value, context);
Expand Down

0 comments on commit d835eeb

Please sign in to comment.