Skip to content

Commit

Permalink
Add initial support for sorted merge joins.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed Jul 19, 2024
1 parent 7e02b55 commit b2602d5
Show file tree
Hide file tree
Showing 4 changed files with 568 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,17 @@
import io.github.linkedfactory.core.kvin.Record;
import io.github.linkedfactory.core.rdf4j.common.Conversions;
import io.github.linkedfactory.core.rdf4j.common.HasValue;
import io.github.linkedfactory.core.rdf4j.kvin.query.KvinFetch;
import io.github.linkedfactory.core.rdf4j.kvin.query.ParameterScanner;
import io.github.linkedfactory.core.rdf4j.kvin.query.*;
import io.github.linkedfactory.core.rdf4j.common.query.CompositeBindingSet;
import io.github.linkedfactory.core.rdf4j.common.query.InnerJoinIterator;
import io.github.linkedfactory.core.rdf4j.kvin.query.KvinFetchEvaluationStep;
import io.github.linkedfactory.core.rdf4j.kvin.query.Parameters;
import net.enilink.komma.rdf4j.RDF4JValueConverter;
import net.enilink.vocab.rdf.RDF;
import org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
import org.eclipse.rdf4j.common.iteration.SingletonIteration;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.query.BindingSet;
Expand All @@ -33,7 +32,10 @@

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.github.linkedfactory.core.rdf4j.common.query.Helpers.*;

Expand All @@ -55,7 +57,7 @@ public KvinEvaluationStrategy(Kvin kvin, Supplier<ExecutorService> executorServi

@Override
public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(StatementPattern stmt, final BindingSet bs)
throws QueryEvaluationException {
throws QueryEvaluationException {
// System.out.println("Stmt: " + stmt);

final Var subjectVar = stmt.getSubjectVar();
Expand Down Expand Up @@ -196,13 +198,68 @@ protected QueryEvaluationStep prepare(LeftJoin join, QueryEvaluationContext cont
}
}

/**
 * Lexicographic comparator for {@code long[]} keys in which every position
 * carries its own sort direction.
 *
 * <p>Position {@code i} is compared with {@link Long#compare(long, long)} and the
 * result is multiplied by {@code signs[i]}: a positive sign keeps the natural
 * (ascending) order, a negative sign reverses it, and a sign of {@code 0} makes
 * the position irrelevant for the ordering.
 *
 * <p>If all positions of the common prefix are equal, the shorter array sorts
 * first. This keeps the ordering antisymmetric, i.e.
 * {@code sgn(compare(a, b)) == -sgn(compare(b, a))}, as required by the
 * {@link Comparator} contract.
 */
static class LongArrayComparator implements Comparator<long[]> {
    /** Per-position multiplier applied to the element-wise comparison result. */
    final int[] signs;

    /**
     * Creates a comparator from a list of per-position direction multipliers.
     *
     * @param signs one multiplier per key position, in key order; the values are
     *              copied into a primitive array so comparisons need no unboxing
     * @throws NullPointerException if {@code signs} is {@code null} or contains a
     *                              {@code null} element
     */
    public LongArrayComparator(List<Integer> signs) {
        this.signs = new int[signs.size()];
        int i = 0;
        for (Integer sign : signs) {
            this.signs[i++] = sign;
        }
    }

    /**
     * Compares two keys position by position, returning at the first position
     * whose signed comparison is non-zero.
     *
     * @param a the first key
     * @param b the second key
     * @return a negative value, zero, or a positive value if {@code a} sorts
     *         before, equal to, or after {@code b}
     * @throws ArrayIndexOutOfBoundsException if both keys have more positions than
     *                                        there are configured signs and the
     *                                        comparison reaches such a position
     */
    @Override
    public int compare(long[] a, long[] b) {
        // only positions present in both arrays can be compared element-wise
        int common = Math.min(a.length, b.length);
        for (int i = 0; i < common; i++) {
            int diff = signs[i] * Long.compare(a[i], b[i]);
            if (diff != 0) {
                return diff;
            }
        }
        // equal common prefix: order by length so that compare(a, b) and
        // compare(b, a) always have opposite signs
        return Integer.compare(a.length, b.length);
    }
}

@Override
protected QueryEvaluationStep prepare(Join join, QueryEvaluationContext context) throws QueryEvaluationException {
QueryEvaluationStep leftPrepared = precompile(join.getLeftArg(), context);
QueryEvaluationStep rightPrepared = precompile(join.getRightArg(), context);
if (useHashJoin(join.getLeftArg(), join.getRightArg())) {
String[] joinAttributes = HashJoinIteration.hashJoinAttributeNames(join);
return bindingSet -> new HashJoinIteration(leftPrepared, rightPrepared, bindingSet, false, joinAttributes, context);
Set<String> joinAttributesSet = Stream.of(joinAttributes).collect(Collectors.toSet());

KvinFetch fetch = (KvinFetch) findFirstFetch(join.getLeftArg());
Parameters params = fetch.getParams();
final List<String> compareParams = new ArrayList<>();
final List<Integer> compareSigns = new ArrayList<>();
Var[] sortParams = new Var[]{params.index, params.time, params.seqNr};
for (int i = 0; i < sortParams.length; i++) {
var v = sortParams[i];
if (v != null && joinAttributesSet.contains(v.getName())) {
compareParams.add(v.getName());
// index is ascending, time and seqNr are descending
compareSigns.add(i == 0 ? 1 : -1);
}
}

if (!compareParams.isEmpty()) {
return bindingSet -> InnerMergeJoinIterator.getInstance(leftPrepared, rightPrepared, bindingSet,
new LongArrayComparator(compareSigns),
bs -> {
long[] values = new long[compareParams.size()];
int i = 0;
for (String name : compareParams) {
values[i++] =((Literal)bs.getValue(name)).longValue();
}
return values;
}, context);
} else {
return bindingSet -> new HashJoinIteration(leftPrepared, rightPrepared, bindingSet, false, joinAttributes, context);
}
} else {
// strictly use lateral joins if left arg contains a KVIN fetch as right arg probably depends on the results
KvinFetch fetch = (KvinFetch) findFirstFetch(join.getLeftArg());
Expand All @@ -212,17 +269,17 @@ protected QueryEvaluationStep prepare(Join join, QueryEvaluationContext context)
// switch join order if left depends on right
Set<String> assured = join.getRightArg().getAssuredBindingNames();
boolean leftDependsOnRight = fetch.getRequiredBindings().stream()
.anyMatch(name -> assured.contains(name));
.anyMatch(name -> assured.contains(name));
if (leftDependsOnRight) {
// swap left and right argument
return bindingSet -> new InnerJoinIterator(KvinEvaluationStrategy.this, executorService,
rightPrepared, leftPrepared, bindingSet, true, true
rightPrepared, leftPrepared, bindingSet, true, true
);
}
}
boolean async = findFirstFetch(join.getRightArg()) != null;
return bindingSet -> new InnerJoinIterator(KvinEvaluationStrategy.this, executorService,
leftPrepared, rightPrepared, bindingSet, lateral, async
leftPrepared, rightPrepared, bindingSet, lateral, async
);
}
}
Expand Down Expand Up @@ -259,7 +316,7 @@ public QueryEvaluationStep precompile(TupleExpr expr, QueryEvaluationContext con
}

public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(TupleExpr expr, BindingSet bindings)
throws QueryEvaluationException {
throws QueryEvaluationException {
if (expr instanceof KvinFetch) {
QueryEvaluationContext context = new Minimal(this.dataset, this.tripleSource.getValueFactory());
return precompile(expr, context).evaluate(bindings);
Expand Down
Loading

0 comments on commit b2602d5

Please sign in to comment.