Skip to content

Commit

Permalink
Support simple async inner joins.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed Jul 17, 2024
1 parent c2adef2 commit f5abaaa
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,12 @@ protected QueryEvaluationStep prepare(Join join, QueryEvaluationContext context)
if (leftDependsOnRight) {
// swap left and right argument
return bindingSet -> new InnerJoinIterator(AasEvaluationStrategy.this,
rightPrepared, leftPrepared, bindingSet, true
rightPrepared, leftPrepared, bindingSet, true, false
);
}
}
return bindingSet -> new InnerJoinIterator(AasEvaluationStrategy.this,
leftPrepared, rightPrepared, bindingSet, lateral
leftPrepared, rightPrepared, bindingSet, lateral, false
);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,42 @@
package io.github.linkedfactory.core.rdf4j.common.query;

import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.*;

import io.github.linkedfactory.core.rdf4j.kvin.query.KvinFetchEvaluationStep;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.Join;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryEvaluationStep;
import org.eclipse.rdf4j.query.impl.EmptyBindingSet;

public class InnerJoinIterator extends LookAheadIteration<BindingSet, QueryEvaluationException> {

/*-----------*
* Variables *
*-----------*/

private static final BindingSet NULL_BINDINGS = new EmptyBindingSet();
private static final ExecutorService executor = Executors.newCachedThreadPool();
private static final ThreadLocal<Boolean> isAsync = new ThreadLocal<>();
private final EvaluationStrategy strategy;

private final CloseableIteration<BindingSet, QueryEvaluationException> leftIter;
private final QueryEvaluationStep preparedJoinArg;
private final List<BlockingQueue<BindingSet>> joined;
private volatile CloseableIteration<BindingSet, QueryEvaluationException> rightIter;

/*--------------*
* Constructors *
*--------------*/

public InnerJoinIterator(EvaluationStrategy strategy, QueryEvaluationStep leftPrepared,
QueryEvaluationStep rightPrepared, BindingSet bindings, boolean lateral) throws QueryEvaluationException {
QueryEvaluationStep rightPrepared, BindingSet bindings, boolean lateral, boolean async) throws QueryEvaluationException {
this.strategy = strategy;

CloseableIteration<BindingSet, QueryEvaluationException> leftIt = leftPrepared.evaluate(bindings);
Expand All @@ -42,6 +49,11 @@ public InnerJoinIterator(EvaluationStrategy strategy, QueryEvaluationStep leftPr
}
rightIter = new EmptyIteration<>();
leftIter = leftIt;

if (async && isAsync.get() == Boolean.TRUE) {
async = false;
}
joined = async ? new ArrayList<>() : null;
}

/*---------*
Expand All @@ -50,6 +62,14 @@ public InnerJoinIterator(EvaluationStrategy strategy, QueryEvaluationStep leftPr

@Override
protected BindingSet getNextElement() throws QueryEvaluationException {
if (joined == null) {
return getNextElementSync();
} else {
return getNextElementAsync();
}
}

protected BindingSet getNextElementSync() throws QueryEvaluationException {
try {
while (rightIter.hasNext() || leftIter.hasNext()) {
if (rightIter.hasNext()) {
Expand All @@ -71,6 +91,57 @@ protected BindingSet getNextElement() throws QueryEvaluationException {
return null;
}

protected BindingSet getNextElementAsync() throws QueryEvaluationException {
try {
while (!joined.isEmpty() || leftIter.hasNext()) {
enqueueNext();
if (!joined.isEmpty()) {
BlockingQueue<BindingSet> nextQueue = joined.get(0);
BindingSet next = nextQueue.take();
if (next == NULL_BINDINGS) {
joined.remove(0);
continue;
}
enqueueNext();
if (next != NULL_BINDINGS) {
return next;
}
}
}
} catch (NoSuchElementException ignore) {
// probably, one of the iterations has been closed concurrently in
// handleClose()
} catch (InterruptedException e) {
handleClose();
}

return null;
}

private void enqueueNext() {
while (joined.size() < 5 && leftIter.hasNext()) {
BlockingQueue<BindingSet> queue = new ArrayBlockingQueue<>(50);
joined.add(queue);
BindingSet next = leftIter.next();
executor.submit(() -> {
isAsync.set(true);
var rightIt = preparedJoinArg.evaluate(next);
try {
while (rightIt.hasNext()) {
queue.put(rightIt.next());
}
rightIt.close();
queue.put(NULL_BINDINGS);
} catch (InterruptedException e) {
rightIt.close();
return;
} finally {
isAsync.remove();
}
});
}
}

@Override
protected void handleClose() throws QueryEvaluationException {
super.handleClose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@

import java.util.*;

import static io.github.linkedfactory.core.rdf4j.common.query.Helpers.compareAndBind;
import static io.github.linkedfactory.core.rdf4j.common.query.Helpers.findFirstFetch;
import static io.github.linkedfactory.core.rdf4j.common.query.Helpers.*;

public class KvinEvaluationStrategy extends StrictEvaluationStrategy {

Expand Down Expand Up @@ -213,12 +212,13 @@ protected QueryEvaluationStep prepare(Join join, QueryEvaluationContext context)
if (leftDependsOnRight) {
// swap left and right argument
return bindingSet -> new InnerJoinIterator(KvinEvaluationStrategy.this,
rightPrepared, leftPrepared, bindingSet, true
rightPrepared, leftPrepared, bindingSet, true, true
);
}
}
boolean async = findFirstFetch(join.getRightArg()) != null;
return bindingSet -> new InnerJoinIterator(KvinEvaluationStrategy.this,
leftPrepared, rightPrepared, bindingSet, lateral
leftPrepared, rightPrepared, bindingSet, lateral, async
);
}
}
Expand Down

0 comments on commit f5abaaa

Please sign in to comment.