From 38e02cb0eed2c5a7d3f14b508caca3aaa704f4af Mon Sep 17 00:00:00 2001 From: Ken Wenzel Date: Fri, 6 Sep 2024 10:23:55 +0200 Subject: [PATCH] Correctly handle bindings and improve exception handling. --- .../rdf4j/common/query/AsyncIterator.java | 43 ++++++++++++------- .../common/query/CompositeBindingSet.java | 6 ++- .../rdf4j/common/query/InnerJoinIterator.java | 16 ++++--- .../core/rdf4j/kvin/KvinEvaluationUtil.java | 5 ++- .../core/rdf4j/kvin/KvinFederatedService.java | 6 ++- .../core/kvin/leveldb/KvinBenchmarkBase.scala | 4 +- .../core/kvin/parquet/KvinParquetTest.java | 20 ++++++++- .../core/rdf4j/kvin/RemoteServiceTest.scala | 16 ++++--- 8 files changed, 80 insertions(+), 36 deletions(-) diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/common/query/AsyncIterator.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/common/query/AsyncIterator.java index f3a23157..b21ac860 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/common/query/AsyncIterator.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/common/query/AsyncIterator.java @@ -2,6 +2,8 @@ import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.query.QueryEvaluationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; @@ -11,6 +13,8 @@ import java.util.function.Supplier; public class AsyncIterator implements CloseableIteration { + static final Logger log = LoggerFactory.getLogger(AsyncIterator.class); + static final Object NULL_ELEMENT = new Object(); final BlockingQueue nextElements; volatile boolean closed = false; @@ -21,27 +25,36 @@ public AsyncIterator(Supplier> b var currentAsync = InnerJoinIterator.asyncDepth.get(); executorService.get().submit(() -> { InnerJoinIterator.asyncDepth.set(currentAsync != null ? currentAsync + 1 : 1); - var baseIt = base.get(); try { - while (baseIt.hasNext()) { - T element = baseIt.next(); - while (!nextElements.offer(element, 10, TimeUnit.MILLISECONDS)) { - if (closed) { - return; + var baseIt = base.get(); + try { + while (baseIt.hasNext()) { + T element = baseIt.next(); + while (!nextElements.offer(element, 10, TimeUnit.MILLISECONDS)) { + if (closed) { + return; + } } } + } catch (InterruptedException e) { + // just return + } finally { + baseIt.close(); } - baseIt.close(); - while (!nextElements.offer((T) NULL_ELEMENT, 10, TimeUnit.MILLISECONDS)) { - if (closed) { - return; + } catch (Exception e) { + log.error("Error while computing elements", e); + } finally { + try { + while (!nextElements.offer((T) NULL_ELEMENT, 10, TimeUnit.MILLISECONDS)) { + if (closed) { + return; + } } + } catch (InterruptedException e) { + // just return + } finally { + InnerJoinIterator.asyncDepth.remove(); } - } catch (InterruptedException e) { - // just return - } finally { - baseIt.close(); - InnerJoinIterator.asyncDepth.remove(); } }); } diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/common/query/CompositeBindingSet.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/common/query/CompositeBindingSet.java index e4b892d7..1b24a328 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/common/query/CompositeBindingSet.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/common/query/CompositeBindingSet.java @@ -26,12 +26,16 @@ public class CompositeBindingSet extends AbstractBindingSet { private final LinkedHashMap bindings; public CompositeBindingSet(BindingSet other) { + this(other, 2); + } + + public CompositeBindingSet(BindingSet other, int capacity) { if (other instanceof CompositeBindingSet) { // this ensures that not multiple levels are nested and some kind of linked list is build this.bindings = (LinkedHashMap) ((CompositeBindingSet) other).bindings.clone(); this.other = ((CompositeBindingSet) other).other; } else { - this.bindings = new LinkedHashMap<>(3); + this.bindings = new LinkedHashMap<>(capacity); this.other = other; } } diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/common/query/InnerJoinIterator.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/common/query/InnerJoinIterator.java index 89f3fe18..f57c9d37 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/common/query/InnerJoinIterator.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/common/query/InnerJoinIterator.java @@ -1,23 +1,25 @@ package io.github.linkedfactory.core.rdf4j.common.query; -import java.util.ArrayList; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.concurrent.*; -import java.util.function.Supplier; - import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.iteration.EmptyIteration; import org.eclipse.rdf4j.common.iteration.LookAheadIteration; import org.eclipse.rdf4j.query.BindingSet; import org.eclipse.rdf4j.query.QueryEvaluationException; -import org.eclipse.rdf4j.query.algebra.In; import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy; import org.eclipse.rdf4j.query.algebra.evaluation.QueryEvaluationStep; import org.eclipse.rdf4j.query.impl.EmptyBindingSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + public class InnerJoinIterator extends LookAheadIteration { private static final Logger log = LoggerFactory.getLogger(InnerJoinIterator.class); diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KvinEvaluationUtil.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KvinEvaluationUtil.java index 1fa27e42..09b6413f 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KvinEvaluationUtil.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KvinEvaluationUtil.java @@ -303,7 +303,7 @@ public boolean hasNext() throws QueryEvaluationException { var itemValue = getVarValue(subjectVar, bs); if (itemValue == null) { - it = new EmptyIteration<>(); + continue; } final Value predValue = getVarValue(predVar, bs); @@ -358,8 +358,9 @@ public boolean hasNext() throws QueryEvaluationException { it = evaluate(vf, items, properties, toKommaUri(contextValue), pv, params, baseBindings, stmt, dataset); } else { + final ParameterValues finalPv = pv; iterators.add(new AsyncIterator<>(() -> evaluate(vf, items, properties, - toKommaUri(contextValue), pv, params, baseBindings, stmt, dataset), + toKommaUri(contextValue), finalPv, params, baseBindings, stmt, dataset), executorService)); } } diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KvinFederatedService.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KvinFederatedService.java index 0fa31b98..0664761b 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KvinFederatedService.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KvinFederatedService.java @@ -15,6 +15,7 @@ import org.eclipse.rdf4j.query.algebra.Service; import org.eclipse.rdf4j.query.algebra.TupleExpr; import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy; +import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet; import org.eclipse.rdf4j.query.algebra.evaluation.QueryOptimizer; import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedService; import org.eclipse.rdf4j.query.algebra.evaluation.util.QueryEvaluationUtil; @@ -159,7 +160,10 @@ public boolean hasNext() throws QueryEvaluationException { @Override public BindingSet next() throws QueryEvaluationException { - CompositeBindingSet projected = new CompositeBindingSet(bindings); + // it is important to keep incoming bindings as KvinService expects subject variables to be bound + // if only projected vars are returned, then nested joins of multiple service calls might fail + // as bindings are "lost" + CompositeBindingSet projected = new CompositeBindingSet(bindings, projectionVars.size()); BindingSet result = iter.next(); for (String var : projectionVars) { Value v = result.getValue(var); diff --git a/bundles/io.github.linkedfactory.core/src/main/scala/io/github/linkedfactory/core/kvin/leveldb/KvinBenchmarkBase.scala b/bundles/io.github.linkedfactory.core/src/main/scala/io/github/linkedfactory/core/kvin/leveldb/KvinBenchmarkBase.scala index 58d2845c..c6be8b66 100644 --- a/bundles/io.github.linkedfactory.core/src/main/scala/io/github/linkedfactory/core/kvin/leveldb/KvinBenchmarkBase.scala +++ b/bundles/io.github.linkedfactory.core/src/main/scala/io/github/linkedfactory/core/kvin/leveldb/KvinBenchmarkBase.scala @@ -46,7 +46,7 @@ abstract class KvinBenchmarkBase extends App { (0 to writeValues).foreach { i => val randomNr = nrs(rand.nextInt(nrs.length)) val uri = URIs.createURI("http://linkedfactory.github.io/" + randomNr + "/e3fabrik/rollex/" + randomNr + "/measured-point-1") - val ctx = URIs.createURI("ctx:" + randomNr) + val ctx = URIs.createURI("ctx:" + nrs(rand.nextInt(nrs.length))) val value = if (randomNr % 2 == 0) rand.nextGaussian else rand.nextLong(100000) batch.addOne(new KvinTuple(uri, valueProperty, ctx, currentTime, value)) @@ -70,7 +70,7 @@ abstract class KvinBenchmarkBase extends App { for (i <- 0 to readValues) { val randomNr = nrs(rand.nextInt(nrs.length)) val uri = URIs.createURI("http://linkedfactory.github.io/" + randomNr + "/e3fabrik/rollex/" + randomNr + "/measured-point-1") - val ctx = URIs.createURI("ctx:" + randomNr) + val ctx = URIs.createURI("ctx:" + nrs(rand.nextInt(nrs.length))) val r = store.fetch(uri, valueProperty, ctx, startTimeFetch, startTimeValues, 2, 0, null).toList if (i % 1000 == 0) println(r) diff --git a/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/parquet/KvinParquetTest.java b/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/parquet/KvinParquetTest.java index 0f2c694f..33c36e71 100644 --- a/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/parquet/KvinParquetTest.java +++ b/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/parquet/KvinParquetTest.java @@ -2,6 +2,7 @@ import io.github.linkedfactory.core.kvin.Kvin; import io.github.linkedfactory.core.kvin.KvinTuple; +import io.github.linkedfactory.core.kvin.Record; import io.github.linkedfactory.core.kvin.util.KvinTupleGenerator; import net.enilink.commons.iterator.IExtendedIterator; import net.enilink.komma.core.URI; @@ -20,6 +21,7 @@ public class KvinParquetTest { KvinParquet kvinParquet; File tempDir; KvinTupleGenerator tupleGenerator; + final long startTime = 1696197600000L; @Before public void setup() throws IOException { @@ -27,7 +29,7 @@ public void setup() throws IOException { tupleGenerator = new KvinTupleGenerator(); kvinParquet = new KvinParquet(tempDir.toString()); // 02.10.2023 0:00 - kvinParquet.put(tupleGenerator.setStartTime(1696197600000L) + kvinParquet.put(tupleGenerator.setStartTime(startTime) .setItems(500) .setPropertiesPerItem(10) .setValuesPerProperty(10) @@ -145,4 +147,20 @@ public void shouldFetchProperties() { assertEquals(10, properties.toList().size()); properties.close(); } + + @Test + public void shouldFetchRecord() { + URI item = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/1"); + URI property = URIs.createURI("some:property"); + + var record = new Record(URIs.createURI("property:p1"), true) + .append(new Record(URIs.createURI("property:p2"), "value2")); + kvinParquet.put(new KvinTuple(item, property, Kvin.DEFAULT_CONTEXT, startTime, record)); + + IExtendedIterator tuples = kvinParquet.fetch(item, property, Kvin.DEFAULT_CONTEXT, 1); + List list = tuples.toList(); + assertEquals(1, list.size()); + assertEquals(record, list.get(0).value); + } + } diff --git a/bundles/io.github.linkedfactory.core/src/test/scala/io/github/linkedfactory/core/rdf4j/kvin/RemoteServiceTest.scala b/bundles/io.github.linkedfactory.core/src/test/scala/io/github/linkedfactory/core/rdf4j/kvin/RemoteServiceTest.scala index 9ac0a855..958dec06 100644 --- a/bundles/io.github.linkedfactory.core/src/test/scala/io/github/linkedfactory/core/rdf4j/kvin/RemoteServiceTest.scala +++ b/bundles/io.github.linkedfactory.core/src/test/scala/io/github/linkedfactory/core/rdf4j/kvin/RemoteServiceTest.scala @@ -175,14 +175,14 @@ class RemoteServiceTest { val queryStr = s""" |SELECT * { - | values ?item { } + | values ?item1 { } | SERVICE { - | ?item [ $time ; 1 ; + | ?item1 [ $time ; 1 ; | ?value ; ?time | ] | } | SERVICE { - | ?item [ $time ; 1 ; + | [ $time ; 1 ; | ?value1; ?time1 | ] | } @@ -192,11 +192,13 @@ class RemoteServiceTest { val query = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryStr, "http://example.org/") val r = query.evaluate + Assert.assertTrue(r.hasNext) while (r.hasNext) { val bs = r.next - Assert.assertTrue(bs.getValue("property").toString.equals("http://example.org/p1")) - Assert.assertTrue(bs.getValue("time").stringValue().equals("1619424246120")) - Assert.assertTrue(bs.getValue("value").stringValue().equals("57.934878949512196")) + if (bs.getValue("time1") != null) { + Assert.assertTrue(bs.getValue("time").stringValue().equals("1619424246120")) + Assert.assertTrue(bs.getValue("value").stringValue().equals("57.934878949512196")) + } } Assert.assertTrue(kvinHttpInstance.count == 2) Assert.assertTrue(kvinHttpInstance.fetchCall == 2) @@ -236,12 +238,12 @@ class RemoteServiceTest { val query = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryStr, "http://example.org/") val r = query.evaluate + Assert.assertTrue(r.hasNext) while (r.hasNext) { val bs = r.next Assert.assertEquals(115.869757899024392, bs.getValue("total").asInstanceOf[Literal].doubleValue(), 10e-6) } Assert.assertTrue(kvinHttpInstance.count == 2) - Assert.assertTrue(kvinHttpInstance.fetchCall == 4) r.close() } finally { conn.close()