From b2602d5fc0eff04a3d423da50e1f8059fddb58de Mon Sep 17 00:00:00 2001 From: Ken Wenzel Date: Fri, 19 Jul 2024 14:34:38 +0200 Subject: [PATCH] Add initial support for sorted merge joins. --- .../rdf4j/kvin/KvinEvaluationStrategy.java | 77 ++++- .../kvin/query/InnerMergeJoinIterator.java | 286 ++++++++++++++++++ .../core/rdf4j/kvin/query/KvinFetch.java | 4 + .../rdf4j/kvin/query/PeekMarkIterator.java | 211 +++++++++++++ 4 files changed, 568 insertions(+), 10 deletions(-) create mode 100644 bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/query/InnerMergeJoinIterator.java create mode 100644 bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/query/PeekMarkIterator.java diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KvinEvaluationStrategy.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KvinEvaluationStrategy.java index 3b226b28..cc9d0277 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KvinEvaluationStrategy.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KvinEvaluationStrategy.java @@ -5,18 +5,17 @@ import io.github.linkedfactory.core.kvin.Record; import io.github.linkedfactory.core.rdf4j.common.Conversions; import io.github.linkedfactory.core.rdf4j.common.HasValue; -import io.github.linkedfactory.core.rdf4j.kvin.query.KvinFetch; -import io.github.linkedfactory.core.rdf4j.kvin.query.ParameterScanner; +import io.github.linkedfactory.core.rdf4j.kvin.query.*; import io.github.linkedfactory.core.rdf4j.common.query.CompositeBindingSet; import io.github.linkedfactory.core.rdf4j.common.query.InnerJoinIterator; -import io.github.linkedfactory.core.rdf4j.kvin.query.KvinFetchEvaluationStep; -import io.github.linkedfactory.core.rdf4j.kvin.query.Parameters; +import net.enilink.komma.rdf4j.RDF4JValueConverter; import net.enilink.vocab.rdf.RDF; import org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.iteration.EmptyIteration; import org.eclipse.rdf4j.common.iteration.SingletonIteration; import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Literal; import org.eclipse.rdf4j.model.Value; import org.eclipse.rdf4j.model.ValueFactory; import org.eclipse.rdf4j.query.BindingSet; @@ -33,7 +32,10 @@ import java.util.*; import java.util.concurrent.ExecutorService; +import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static io.github.linkedfactory.core.rdf4j.common.query.Helpers.*; @@ -55,7 +57,7 @@ public KvinEvaluationStrategy(Kvin kvin, Supplier executorServi @Override public CloseableIteration evaluate(StatementPattern stmt, final BindingSet bs) - throws QueryEvaluationException { + throws QueryEvaluationException { // System.out.println("Stmt: " + stmt); final Var subjectVar = stmt.getSubjectVar(); @@ -196,13 +198,68 @@ protected QueryEvaluationStep prepare(LeftJoin join, QueryEvaluationContext cont } } + static class LongArrayComparator implements Comparator { + final int[] signs; + + public LongArrayComparator(List signs) { + this.signs = new int[signs.size()]; + int i = 0; + for (Integer sign : signs) { + this.signs[i++] = sign; + } + } + + @Override + public int compare(long[] a, long[] b) { + for (int i = 0; i < a.length; i++) { + if (i >= b.length) { + return 1; + } + int diff = signs[i] * Long.compare(a[i], b[i]); + if (diff != 0) { + return diff; + } + } + return 0; + } + } + @Override protected QueryEvaluationStep prepare(Join join, QueryEvaluationContext context) throws QueryEvaluationException { QueryEvaluationStep leftPrepared = precompile(join.getLeftArg(), context); QueryEvaluationStep rightPrepared = precompile(join.getRightArg(), context); if (useHashJoin(join.getLeftArg(), join.getRightArg())) { String[] joinAttributes = HashJoinIteration.hashJoinAttributeNames(join); - return bindingSet -> new HashJoinIteration(leftPrepared, rightPrepared, bindingSet, false, joinAttributes, context); + Set joinAttributesSet = Stream.of(joinAttributes).collect(Collectors.toSet()); + + KvinFetch fetch = (KvinFetch) findFirstFetch(join.getLeftArg()); + Parameters params = fetch.getParams(); + final List compareParams = new ArrayList<>(); + final List compareSigns = new ArrayList<>(); + Var[] sortParams = new Var[]{params.index, params.time, params.seqNr}; + for (int i = 0; i < sortParams.length; i++) { + var v = sortParams[i]; + if (v != null && joinAttributesSet.contains(v.getName())) { + compareParams.add(v.getName()); + // index is ascending, time and seqNr are descending + compareSigns.add(i == 0 ? 1 : -1); + } + } + + if (!compareParams.isEmpty()) { + return bindingSet -> InnerMergeJoinIterator.getInstance(leftPrepared, rightPrepared, bindingSet, + new LongArrayComparator(compareSigns), + bs -> { + long[] values = new long[compareParams.size()]; + int i = 0; + for (String name : compareParams) { + values[i++] =((Literal)bs.getValue(name)).longValue(); + } + return values; + }, context); + } else { + return bindingSet -> new HashJoinIteration(leftPrepared, rightPrepared, bindingSet, false, joinAttributes, context); + } } else { // strictly use lateral joins if left arg contains a KVIN fetch as right arg probably depends on the results KvinFetch fetch = (KvinFetch) findFirstFetch(join.getLeftArg()); @@ -212,17 +269,17 @@ protected QueryEvaluationStep prepare(Join join, QueryEvaluationContext context) // switch join order if left depends on right Set assured = join.getRightArg().getAssuredBindingNames(); boolean leftDependsOnRight = fetch.getRequiredBindings().stream() - .anyMatch(name -> assured.contains(name)); + .anyMatch(name -> assured.contains(name)); if (leftDependsOnRight) { // swap left and right argument return bindingSet -> new InnerJoinIterator(KvinEvaluationStrategy.this, executorService, - rightPrepared, leftPrepared, bindingSet, true, true + rightPrepared, leftPrepared, bindingSet, true, true ); } } boolean async = findFirstFetch(join.getRightArg()) != null; return bindingSet -> new InnerJoinIterator(KvinEvaluationStrategy.this, executorService, - leftPrepared, rightPrepared, bindingSet, lateral, async + leftPrepared, rightPrepared, bindingSet, lateral, async ); } } @@ -259,7 +316,7 @@ public QueryEvaluationStep precompile(TupleExpr expr, QueryEvaluationContext con } public CloseableIteration evaluate(TupleExpr expr, BindingSet bindings) - throws QueryEvaluationException { + throws QueryEvaluationException { if (expr instanceof KvinFetch) { QueryEvaluationContext context = new Minimal(this.dataset, this.tripleSource.getValueFactory()); return precompile(expr, context).evaluate(bindings); diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/query/InnerMergeJoinIterator.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/query/InnerMergeJoinIterator.java new file mode 100644 index 00000000..38178a75 --- /dev/null +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/query/InnerMergeJoinIterator.java @@ -0,0 +1,286 @@ +/******************************************************************************* + * Copyright (c) 2023 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + ******************************************************************************/ + +package io.github.linkedfactory.core.rdf4j.kvin.query; + +import java.util.Comparator; +import java.util.NoSuchElementException; +import java.util.function.Function; + +import org.eclipse.rdf4j.common.annotation.Experimental; +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.query.Binding; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.MutableBindingSet; +import org.eclipse.rdf4j.query.QueryEvaluationException; +import org.eclipse.rdf4j.query.algebra.evaluation.QueryEvaluationStep; +import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext; + +/** + * @author Håvard M. Ottestad + */ +@Experimental +public class InnerMergeJoinIterator implements CloseableIteration { + + private final PeekMarkIterator leftIterator; + private final PeekMarkIterator rightIterator; + private final Comparator cmp; + private final Function valueFunction; + private final QueryEvaluationContext context; + + private BindingSet next; + private BindingSet currentLeft; + private V currentLeftValue; + private V leftPeekValue; + + // -1 for unset, 0 for equal, 1 for different + int currentLeftValueAndPeekEquals = -1; + + private boolean closed = false; + + InnerMergeJoinIterator(CloseableIteration leftIterator, CloseableIteration rightIterator, + Comparator cmp, Function valueFunction, QueryEvaluationContext context) + throws QueryEvaluationException { + + this.leftIterator = new PeekMarkIterator<>(leftIterator); + this.rightIterator = new PeekMarkIterator<>(rightIterator); + this.cmp = cmp; + this.valueFunction = valueFunction; + this.context = context; + } + + public static CloseableIteration getInstance(QueryEvaluationStep leftPrepared, + QueryEvaluationStep preparedRight, BindingSet bindings, Comparator cmp, + Function value, QueryEvaluationContext context) { + + CloseableIteration leftIter = leftPrepared.evaluate(bindings); + if (leftIter == QueryEvaluationStep.EMPTY_ITERATION) { + return leftIter; + } + + CloseableIteration rightIter = preparedRight.evaluate(bindings); + if (rightIter == QueryEvaluationStep.EMPTY_ITERATION) { + leftIter.close(); + return rightIter; + } + + return new InnerMergeJoinIterator(leftIter, rightIter, cmp, value, context); + } + + private BindingSet join(BindingSet left, BindingSet right, boolean createNewBindingSet) { + MutableBindingSet joined; + if (!createNewBindingSet && left instanceof MutableBindingSet) { + joined = (MutableBindingSet) left; + } else { + joined = context.createBindingSet(left); + } + + for (Binding binding : right) { + if (!joined.hasBinding(binding.getName())) { + joined.addBinding(binding); + } + } + return joined; + } + + private void calculateNext() { + if (next != null) { + return; + } + + if (currentLeft == null && leftIterator.hasNext()) { + currentLeft = leftIterator.next(); + currentLeftValue = null; + leftPeekValue = null; + currentLeftValueAndPeekEquals = -1; + + } + + if (currentLeft == null) { + return; + } + + loop(); + + } + + private void loop() { + while (next == null) { + if (rightIterator.hasNext()) { + BindingSet peekRight = rightIterator.peek(); + + if (currentLeftValue == null) { + currentLeftValue = valueFunction.apply(currentLeft); + leftPeekValue = null; + currentLeftValueAndPeekEquals = -1; + } + + int compare = compare(currentLeftValue, valueFunction.apply(peekRight)); + + if (compare == 0) { + equal(); + return; + } else if (compare < 0) { + // leftIterator is behind, or in other words, rightIterator is ahead + if (leftIterator.hasNext()) { + lessThan(); + } else { + close(); + return; + } + } else { + // rightIterator is behind, skip forward + rightIterator.next(); + + } + + } else if (rightIterator.isResettable() && leftIterator.hasNext()) { + rightIterator.reset(); + currentLeft = leftIterator.next(); + currentLeftValue = null; + leftPeekValue = null; + currentLeftValueAndPeekEquals = -1; + } else { + close(); + return; + } + + } + } + + private int compare(V left, V right) { + int compareTo; + + if (left == right) { + compareTo = 0; + } else { + compareTo = cmp.compare(left, right); + } + return compareTo; + } + + private void lessThan() { + V oldLeftValue = currentLeftValue; + currentLeft = leftIterator.next(); + if (leftPeekValue != null) { + currentLeftValue = leftPeekValue; + } else { + currentLeftValue = valueFunction.apply(currentLeft); + } + + leftPeekValue = null; + currentLeftValueAndPeekEquals = -1; + + if (oldLeftValue.equals(currentLeftValue)) { + // we have duplicate keys on the leftIterator and need to reset the rightIterator (if it + // is resettable) + if (rightIterator.isResettable()) { + rightIterator.reset(); + } + } else { + rightIterator.unmark(); + } + } + + private void equal() { + if (rightIterator.isResettable()) { + next = join(currentLeft, rightIterator.next(), true); + } else { + doLeftPeek(); + + if (currentLeftValueAndPeekEquals == 0) { + rightIterator.mark(); + next = join(currentLeft, rightIterator.next(), true); + } else { + next = join(rightIterator.next(), currentLeft, false); + } + } + } + + private void doLeftPeek() { + if (leftPeekValue == null) { + BindingSet leftPeek = leftIterator.peek(); + leftPeekValue = leftPeek != null ? valueFunction.apply(leftPeek) : null; + currentLeftValueAndPeekEquals = -1; + } + + if (currentLeftValueAndPeekEquals == -1) { + boolean equals = currentLeftValue.equals(leftPeekValue); + if (equals) { + currentLeftValue = leftPeekValue; + currentLeftValueAndPeekEquals = 0; + } else { + currentLeftValueAndPeekEquals = 1; + } + } + } + + @Override + public final boolean hasNext() { + if (isClosed()) { + return false; + } + + calculateNext(); + + return next != null; + } + + @Override + public final BindingSet next() { + if (isClosed()) { + throw new NoSuchElementException("The iteration has been closed."); + } + calculateNext(); + + if (next == null) { + close(); + } + + BindingSet result = next; + + if (result != null) { + next = null; + return result; + } else { + throw new NoSuchElementException(); + } + } + + /** + * Throws an {@link UnsupportedOperationException}. + */ + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + /** + * Checks whether this CloseableIteration has been closed. + * + * @return true if the CloseableIteration has been closed, false otherwise. + */ + public final boolean isClosed() { + return closed; + } + + @Override + public final void close() { + if (!closed) { + closed = true; + try { + leftIterator.close(); + } finally { + rightIterator.close(); + } + } + } +} \ No newline at end of file diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/query/KvinFetch.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/query/KvinFetch.java index 37215dcf..633e4397 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/query/KvinFetch.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/query/KvinFetch.java @@ -58,6 +58,10 @@ public Set getAssuredBindingNames() { return assuredBindingNames; } + public Parameters getParams() { + return params; + } + public Set getRequiredBindings() { return requiredBindings; } diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/query/PeekMarkIterator.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/query/PeekMarkIterator.java new file mode 100644 index 00000000..34fa982c --- /dev/null +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/query/PeekMarkIterator.java @@ -0,0 +1,211 @@ +/******************************************************************************* + * Copyright (c) 2023 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + ******************************************************************************/ + +package io.github.linkedfactory.core.rdf4j.kvin.query; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.eclipse.rdf4j.common.annotation.Experimental; +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.query.QueryEvaluationException; + +/** + * An iterator that allows to peek at the next element without consuming it. It also allows to mark the current position + * and reset to that position. + * + * @author Håvard M. Ottestad + */ +@Experimental +public class PeekMarkIterator implements CloseableIteration { + + private final CloseableIteration iterator; + private boolean mark; + private ArrayList buffer; + private Iterator bufferIterator = Collections.emptyIterator(); + private E next; + + // -1: reset not possible; 0: reset possible, but next must be saved; 1: reset possible + private int resetPossible; + + private boolean closed; + + PeekMarkIterator(CloseableIteration iterator) { + this.iterator = iterator; + } + + private void calculateNext() { + if (next != null) { + return; + } + + if (bufferIterator.hasNext()) { + next = bufferIterator.next(); + } else { + if (!mark && resetPossible > -1) { + resetPossible--; + } + if (iterator.hasNext()) { + next = iterator.next(); + } + } + + if (mark && next != null) { + assert resetPossible > 0; + buffer.add(next); + } + + } + + @Override + public boolean hasNext() { + if (closed) { + return false; + } + calculateNext(); + return next != null; + } + + @Override + public E next() { + if (closed) { + throw new NoSuchElementException("The iteration has been closed."); + } + calculateNext(); + E result = next; + next = null; + if (!mark && resetPossible == 0) { + resetPossible--; + } + if (result == null) { + throw new NoSuchElementException(); + } + + return result; + } + + /** + * @return the next element without consuming it, or null if there are no more elements + */ + public E peek() { + if (closed) { + return null; + } + calculateNext(); + return next; + } + + /** + * Mark the current position so that the iterator can be reset to the current state. This will cause elements to be + * stored in memory until one of {@link #reset()}, {@link #unmark()} or {@link #mark()} is called. + */ + public void mark() { + if (closed) { + throw new IllegalStateException("The iteration has been closed."); + } + mark = true; + resetPossible = 1; + + if (buffer != null && !bufferIterator.hasNext()) { + buffer.clear(); + bufferIterator = Collections.emptyIterator(); + } else { + buffer = new ArrayList<>(); + } + + if (next != null) { + buffer.add(next); + } + + } + + /** + * Reset the iterator to the marked position. Resetting an iterator multiple times will always reset to the same + * position. Resetting an iterator turns off marking. If the iterator was reset previously and the iterator has + * advanced beyond the point where reset was initially called, then the iterator can no longer be reset because + * there will be elements that were not stored while the iterator was marked and resetting will cause these elements + * to be lost. + */ + public void reset() { + if (closed) { + throw new IllegalStateException("The iteration has been closed."); + } + if (buffer == null) { + throw new IllegalStateException("Mark never set"); + } + if (resetPossible < 0) { + throw new IllegalStateException("Reset not possible"); + } + + if (mark && bufferIterator.hasNext()) { + while (bufferIterator.hasNext()) { + buffer.add(bufferIterator.next()); + } + } + + if (resetPossible == 0) { + assert !mark; + buffer.add(next); + next = null; + bufferIterator = buffer.iterator(); + } else if (resetPossible > 0) { + next = null; + bufferIterator = buffer.iterator(); + } + + mark = false; + resetPossible = 1; + } + + /** + * @return true if the iterator is marked + */ + boolean isMarked() { + return !closed && mark; + } + + /** + * @return true if {@link #reset()} can be called on this iterator + */ + boolean isResettable() { + return !closed && (mark || resetPossible >= 0); + } + + @Override + public void close() { + this.closed = true; + iterator.close(); + buffer = null; + } + + /** + * Unmark the iterator. This will cause the iterator to stop buffering elements. If the iterator was recently reset + * and there are still elements in the buffer, then these elements will still be returned by next(). + */ + public void unmark() { + mark = false; + resetPossible = -1; + if (bufferIterator.hasNext()) { + buffer = null; + } else if (buffer != null) { + + buffer.clear(); + bufferIterator = Collections.emptyIterator(); + } + } + + @Override + public void remove() throws QueryEvaluationException { + throw new UnsupportedOperationException("Not implemented"); + } +} \ No newline at end of file