Skip to content

Commit

Permalink
Correctly handle bindings and improve exception handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed Sep 6, 2024
1 parent 5a28c88 commit 38e02cb
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -11,6 +13,8 @@
import java.util.function.Supplier;

public class AsyncIterator<T> implements CloseableIteration<T, QueryEvaluationException> {
static final Logger log = LoggerFactory.getLogger(AsyncIterator.class);

static final Object NULL_ELEMENT = new Object();
final BlockingQueue<T> nextElements;
volatile boolean closed = false;
Expand All @@ -21,27 +25,36 @@ public AsyncIterator(Supplier<CloseableIteration<T, QueryEvaluationException>> b
var currentAsync = InnerJoinIterator.asyncDepth.get();
executorService.get().submit(() -> {
InnerJoinIterator.asyncDepth.set(currentAsync != null ? currentAsync + 1 : 1);
var baseIt = base.get();
try {
while (baseIt.hasNext()) {
T element = baseIt.next();
while (!nextElements.offer(element, 10, TimeUnit.MILLISECONDS)) {
if (closed) {
return;
var baseIt = base.get();
try {
while (baseIt.hasNext()) {
T element = baseIt.next();
while (!nextElements.offer(element, 10, TimeUnit.MILLISECONDS)) {
if (closed) {
return;
}
}
}
} catch (InterruptedException e) {
// just return
} finally {
baseIt.close();
}
baseIt.close();
while (!nextElements.offer((T) NULL_ELEMENT, 10, TimeUnit.MILLISECONDS)) {
if (closed) {
return;
} catch (Exception e) {
log.error("Error while computing elements", e);
} finally {
try {
while (!nextElements.offer((T) NULL_ELEMENT, 10, TimeUnit.MILLISECONDS)) {
if (closed) {
return;
}
}
} catch (InterruptedException e) {
// just return
} finally {
InnerJoinIterator.asyncDepth.remove();
}
} catch (InterruptedException e) {
// just return
} finally {
baseIt.close();
InnerJoinIterator.asyncDepth.remove();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@ public class CompositeBindingSet extends AbstractBindingSet {
private final LinkedHashMap<String, Value> bindings;

public CompositeBindingSet(BindingSet other) {
this(other, 2);
}

public CompositeBindingSet(BindingSet other, int capacity) {
if (other instanceof CompositeBindingSet) {
// this ensures that not multiple levels are nested and some kind of linked list is build
this.bindings = (LinkedHashMap<String, Value>) ((CompositeBindingSet) other).bindings.clone();
this.other = ((CompositeBindingSet) other).other;
} else {
this.bindings = new LinkedHashMap<>(3);
this.bindings = new LinkedHashMap<>(capacity);
this.other = other;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
package io.github.linkedfactory.core.rdf4j.common.query;

import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.*;
import java.util.function.Supplier;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.In;
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryEvaluationStep;
import org.eclipse.rdf4j.query.impl.EmptyBindingSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class InnerJoinIterator extends LookAheadIteration<BindingSet, QueryEvaluationException> {
private static final Logger log = LoggerFactory.getLogger(InnerJoinIterator.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public boolean hasNext() throws QueryEvaluationException {

var itemValue = getVarValue(subjectVar, bs);
if (itemValue == null) {
it = new EmptyIteration<>();
continue;
}

final Value predValue = getVarValue(predVar, bs);
Expand Down Expand Up @@ -358,8 +358,9 @@ public boolean hasNext() throws QueryEvaluationException {
it = evaluate(vf, items, properties,
toKommaUri(contextValue), pv, params, baseBindings, stmt, dataset);
} else {
final ParameterValues finalPv = pv;
iterators.add(new AsyncIterator<>(() -> evaluate(vf, items, properties,
toKommaUri(contextValue), pv, params, baseBindings, stmt, dataset),
toKommaUri(contextValue), finalPv, params, baseBindings, stmt, dataset),
executorService));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.eclipse.rdf4j.query.algebra.Service;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryOptimizer;
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedService;
import org.eclipse.rdf4j.query.algebra.evaluation.util.QueryEvaluationUtil;
Expand Down Expand Up @@ -159,7 +160,10 @@ public boolean hasNext() throws QueryEvaluationException {

@Override
public BindingSet next() throws QueryEvaluationException {
CompositeBindingSet projected = new CompositeBindingSet(bindings);
// it is important to keep incoming bindings as KvinService expects subject variables to be bound
// if only projected vars are returned, then nested joins of multiple service calls might fail
// as bindings are "lost"
CompositeBindingSet projected = new CompositeBindingSet(bindings, projectionVars.size());
BindingSet result = iter.next();
for (String var : projectionVars) {
Value v = result.getValue(var);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ abstract class KvinBenchmarkBase extends App {
(0 to writeValues).foreach { i =>
val randomNr = nrs(rand.nextInt(nrs.length))
val uri = URIs.createURI("http://linkedfactory.github.io/" + randomNr + "/e3fabrik/rollex/" + randomNr + "/measured-point-1")
val ctx = URIs.createURI("ctx:" + randomNr)
val ctx = URIs.createURI("ctx:" + nrs(rand.nextInt(nrs.length)))

val value = if (randomNr % 2 == 0) rand.nextGaussian else rand.nextLong(100000)
batch.addOne(new KvinTuple(uri, valueProperty, ctx, currentTime, value))
Expand All @@ -70,7 +70,7 @@ abstract class KvinBenchmarkBase extends App {
for (i <- 0 to readValues) {
val randomNr = nrs(rand.nextInt(nrs.length))
val uri = URIs.createURI("http://linkedfactory.github.io/" + randomNr + "/e3fabrik/rollex/" + randomNr + "/measured-point-1")
val ctx = URIs.createURI("ctx:" + randomNr)
val ctx = URIs.createURI("ctx:" + nrs(rand.nextInt(nrs.length)))

val r = store.fetch(uri, valueProperty, ctx, startTimeFetch, startTimeValues, 2, 0, null).toList
if (i % 1000 == 0) println(r)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.github.linkedfactory.core.kvin.Kvin;
import io.github.linkedfactory.core.kvin.KvinTuple;
import io.github.linkedfactory.core.kvin.Record;
import io.github.linkedfactory.core.kvin.util.KvinTupleGenerator;
import net.enilink.commons.iterator.IExtendedIterator;
import net.enilink.komma.core.URI;
Expand All @@ -20,14 +21,15 @@ public class KvinParquetTest {
KvinParquet kvinParquet;
File tempDir;
KvinTupleGenerator tupleGenerator;
final long startTime = 1696197600000L;

@Before
public void setup() throws IOException {
tempDir = Files.createTempDirectory("archive").toFile();
tupleGenerator = new KvinTupleGenerator();
kvinParquet = new KvinParquet(tempDir.toString());
// 02.10.2023 0:00
kvinParquet.put(tupleGenerator.setStartTime(1696197600000L)
kvinParquet.put(tupleGenerator.setStartTime(startTime)
.setItems(500)
.setPropertiesPerItem(10)
.setValuesPerProperty(10)
Expand Down Expand Up @@ -145,4 +147,20 @@ public void shouldFetchProperties() {
assertEquals(10, properties.toList().size());
properties.close();
}

@Test
public void shouldFetchRecord() {
URI item = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/1");
URI property = URIs.createURI("some:property");

var record = new Record(URIs.createURI("property:p1"), true)
.append(new Record(URIs.createURI("property:p2"), "value2"));
kvinParquet.put(new KvinTuple(item, property, Kvin.DEFAULT_CONTEXT, startTime, record));

IExtendedIterator<KvinTuple> tuples = kvinParquet.fetch(item, property, Kvin.DEFAULT_CONTEXT, 1);
List<KvinTuple> list = tuples.toList();
assertEquals(1, list.size());
assertEquals(record, list.get(0).value);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,14 @@ class RemoteServiceTest {
val queryStr =
s"""
|SELECT * {
| values ?item { <item1> }
| values ?item1 { <item1> }
| SERVICE <kvin:http://test1.com> {
| ?item <p1> [ <kvin:to> $time ; <kvin:limit> 1 ;
| ?item1 <p1> [ <kvin:to> $time ; <kvin:limit> 1 ;
| <kvin:value> ?value ; <kvin:time> ?time
| ]
| }
| SERVICE <kvin:http://test2.com> {
| ?item <p2> [ <kvin:to> $time ; <kvin:limit> 1 ;
| <item2> <p2> [ <kvin:to> $time ; <kvin:limit> 1 ;
| <kvin:value> ?value1; <kvin:time> ?time1
| ]
| }
Expand All @@ -192,11 +192,13 @@ class RemoteServiceTest {
val query = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryStr, "http://example.org/")
val r = query.evaluate

Assert.assertTrue(r.hasNext)
while (r.hasNext) {
val bs = r.next
Assert.assertTrue(bs.getValue("property").toString.equals("http://example.org/p1"))
Assert.assertTrue(bs.getValue("time").stringValue().equals("1619424246120"))
Assert.assertTrue(bs.getValue("value").stringValue().equals("57.934878949512196"))
if (bs.getValue("time1") != null) {
Assert.assertTrue(bs.getValue("time").stringValue().equals("1619424246120"))
Assert.assertTrue(bs.getValue("value").stringValue().equals("57.934878949512196"))
}
}
Assert.assertTrue(kvinHttpInstance.count == 2)
Assert.assertTrue(kvinHttpInstance.fetchCall == 2)
Expand Down Expand Up @@ -236,12 +238,12 @@ class RemoteServiceTest {
val query = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryStr, "http://example.org/")
val r = query.evaluate

Assert.assertTrue(r.hasNext)
while (r.hasNext) {
val bs = r.next
Assert.assertEquals(115.869757899024392, bs.getValue("total").asInstanceOf[Literal].doubleValue(), 10e-6)
}
Assert.assertTrue(kvinHttpInstance.count == 2)
Assert.assertTrue(kvinHttpInstance.fetchCall == 4)
r.close()
} finally {
conn.close()
Expand Down

0 comments on commit 38e02cb

Please sign in to comment.