diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/util/JsonFormatWriter.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/util/JsonFormatWriter.java index 87e5de35..7d0c517f 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/util/JsonFormatWriter.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/util/JsonFormatWriter.java @@ -23,13 +23,17 @@ public JsonFormatWriter(OutputStream outputStream, boolean prettyPrint) throws I if (prettyPrint) { this.generator.useDefaultPrettyPrinter(); } - this.generator.writeStartObject(); + initialStartObject(); } public JsonFormatWriter(OutputStream outputStream) throws IOException { this(outputStream, false); } + protected void initialStartObject() throws IOException { + this.generator.writeStartObject(); + } + public static String toJsonString(IExtendedIterator it) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); JsonFormatWriter writer = new JsonFormatWriter(baos); diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KVIN.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KVIN.java index 8b183b1c..0016b981 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KVIN.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KVIN.java @@ -13,6 +13,7 @@ public interface KVIN { IRI OP = SimpleValueFactory.getInstance().createIRI(KVIN + "op"); IRI VALUE = SimpleValueFactory.getInstance().createIRI(KVIN + "value"); + IRI VALUE_JSON = SimpleValueFactory.getInstance().createIRI(KVIN + "valueJson"); IRI TIME = SimpleValueFactory.getInstance().createIRI(KVIN + "time"); IRI SEQNR = SimpleValueFactory.getInstance().createIRI(KVIN + "seqNr"); diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KvinEvaluationStrategy.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KvinEvaluationStrategy.java index 6c7ca2b0..12589fd4 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KvinEvaluationStrategy.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/rdf4j/kvin/KvinEvaluationStrategy.java @@ -3,12 +3,14 @@ import io.github.linkedfactory.core.kvin.Kvin; import io.github.linkedfactory.core.kvin.KvinTuple; import io.github.linkedfactory.core.kvin.Record; +import io.github.linkedfactory.core.kvin.util.JsonFormatWriter; import io.github.linkedfactory.core.rdf4j.common.Conversions; import io.github.linkedfactory.core.rdf4j.common.HasValue; import io.github.linkedfactory.core.rdf4j.common.query.CompositeBindingSet; import io.github.linkedfactory.core.rdf4j.common.query.InnerJoinIteratorEvaluationStep; import io.github.linkedfactory.core.rdf4j.common.query.BatchQueryEvaluationStep; import io.github.linkedfactory.core.rdf4j.kvin.query.*; +import net.enilink.komma.core.URI; import net.enilink.vocab.rdf.RDF; import org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration; import org.eclipse.rdf4j.common.iteration.CloseableIteration; @@ -30,6 +32,10 @@ import org.eclipse.rdf4j.query.algebra.evaluation.impl.StrictEvaluationStrategy; import org.eclipse.rdf4j.query.algebra.evaluation.iterator.HashJoinIteration; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; @@ -40,6 +46,26 @@ import static io.github.linkedfactory.core.rdf4j.common.query.Helpers.findFirstFetch; public class KvinEvaluationStrategy extends StrictEvaluationStrategy { + static class InternalJsonFormatWriter extends JsonFormatWriter { + InternalJsonFormatWriter(OutputStream outputStream) throws IOException { + super(outputStream); + } + + @Override + protected void initialStartObject() { + // do nothing + } + + @Override + protected void writeValue(Object value) throws IOException { + super.writeValue(value); + } + + @Override + public void end() throws IOException { + // do nothing + } + } final Kvin kvin; final ParameterScanner scanner; @@ -79,6 +105,25 @@ public CloseableIteration evaluate(Stateme Var valueVar = stmt.getObjectVar(); Value rdfValue = Conversions.toRdfValue(tuple.value, vf); return compareAndBind(bs, valueVar, rdfValue); + } else if (KVIN.VALUE_JSON.equals(predValue)) { + Var valueVar = stmt.getObjectVar(); + Value rdfValue; + if (tuple.value instanceof Record || tuple.value instanceof Object[] || tuple.value instanceof URI) { + var baos = new ByteArrayOutputStream(); + try { + var writer = new InternalJsonFormatWriter(baos); + writer.writeValue(tuple.value); + writer.close(); + } catch (IOException e) { + throw new QueryEvaluationException(e); + } catch (Exception e) { + throw new QueryEvaluationException(e); + } + rdfValue = vf.createLiteral(baos.toString(StandardCharsets.UTF_8)); + } else { + rdfValue = Conversions.toRdfValue(tuple.value, vf); + } + return compareAndBind(bs, valueVar, rdfValue); } else if (KVIN.TIME.equals(predValue)) { Var timeVar = stmt.getObjectVar(); Value timeValue = Conversions.toRdfValue(tuple.time, vf); @@ -215,32 +260,6 @@ public CloseableIteration evaluate(List { - final int[] signs; - - public LongArrayComparator(List signs) { - this.signs = new int[signs.size()]; - int i = 0; - for (Integer sign : signs) { - this.signs[i++] = sign; - } - } - - @Override - public int compare(long[] a, long[] b) { - for (int i = 0; i < a.length; i++) { - if (i >= b.length) { - return 1; - } - int diff = signs[i] * Long.compare(a[i], b[i]); - if (diff != 0) { - return diff; - } - } - return 0; - } - } - @Override protected QueryEvaluationStep prepare(Join join, QueryEvaluationContext context) throws QueryEvaluationException { QueryEvaluationStep leftPrepared = precompile(join.getLeftArg(), context); @@ -275,21 +294,21 @@ protected QueryEvaluationStep prepare(Join join, QueryEvaluationContext context) return values; }, context, executorService); } else { - return new BatchQueryEvaluationStep() { - @Override - public CloseableIteration evaluate(BindingSet bindingSet) { - return new HashJoinIteration(leftPrepared, rightPrepared, bindingSet, false, joinAttributes, context); - } - - @Override - public CloseableIteration evaluate(List bindingSets) { - return new HashJoinIteration( - BatchQueryEvaluationStep.evaluate(leftPrepared, bindingSets), - join.getLeftArg().getBindingNames(), - BatchQueryEvaluationStep.evaluate(rightPrepared, bindingSets), - join.getRightArg().getBindingNames(), false); - } - }; + return new BatchQueryEvaluationStep() { + @Override + public CloseableIteration evaluate(BindingSet bindingSet) { + return new HashJoinIteration(leftPrepared, rightPrepared, bindingSet, false, joinAttributes, context); + } + + @Override + public CloseableIteration evaluate(List bindingSets) { + return new HashJoinIteration( + BatchQueryEvaluationStep.evaluate(leftPrepared, bindingSets), + join.getLeftArg().getBindingNames(), + BatchQueryEvaluationStep.evaluate(rightPrepared, bindingSets), + join.getRightArg().getBindingNames(), false); + } + }; } } else { // strictly use lateral joins if left arg contains a KVIN fetch as right arg probably depends on the results @@ -330,7 +349,7 @@ boolean useHashJoin(TupleExpr leftArg, TupleExpr rightArg) { // in case of projections with aggregates we just use the projected binding names Set leftAssured = leftArg instanceof Projection ? leftArg.getBindingNames() : leftArg.getAssuredBindingNames(); - return ! rightFetch.getRequiredBindings().stream().anyMatch(required -> leftAssured.contains(required)); + return !rightFetch.getRequiredBindings().stream().anyMatch(required -> leftAssured.contains(required)); } } return false; @@ -372,4 +391,30 @@ public ValueFactory getValueFactory() { public Supplier getExecutorService() { return executorService; } + + static class LongArrayComparator implements Comparator { + final int[] signs; + + public LongArrayComparator(List signs) { + this.signs = new int[signs.size()]; + int i = 0; + for (Integer sign : signs) { + this.signs[i++] = sign; + } + } + + @Override + public int compare(long[] a, long[] b) { + for (int i = 0; i < a.length; i++) { + if (i >= b.length) { + return 1; + } + int diff = signs[i] * Long.compare(a[i], b[i]); + if (diff != 0) { + return diff; + } + } + return 0; + } + } } \ No newline at end of file diff --git a/bundles/io.github.linkedfactory.core/src/test/scala/io/github/linkedfactory/core/rdf4j/kvin/ServiceTest.scala b/bundles/io.github.linkedfactory.core/src/test/scala/io/github/linkedfactory/core/rdf4j/kvin/ServiceTest.scala index fd6c6ea2..de39088b 100644 --- a/bundles/io.github.linkedfactory.core/src/test/scala/io/github/linkedfactory/core/rdf4j/kvin/ServiceTest.scala +++ b/bundles/io.github.linkedfactory.core/src/test/scala/io/github/linkedfactory/core/rdf4j/kvin/ServiceTest.scala @@ -27,7 +27,8 @@ import org.eclipse.rdf4j.repository.sail.SailRepository import org.eclipse.rdf4j.sail.memory.MemoryStore import org.junit.{After, Assert, Before, Test} -import java.io.{File, IOException} +import java.io.{ByteArrayOutputStream, File, IOException} +import java.nio.charset.StandardCharsets import java.nio.file.attribute.BasicFileAttributes import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor} import java.util.concurrent.Executors @@ -118,6 +119,48 @@ class ServiceTest { } } + @Test + def jsonTest { + val data = addRecords(2, 10) + + val conn = repository.getConnection + val vf = repository.getValueFactory + try { + val time = START_TIME + 20 + + val values = "values ?item { }" + val queryStr = + s"""select * where { $values service { + |?item ?v . ?v $time ; 1 . + |?v ?record ; ?time . + |} }""".stripMargin + val query = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryStr, "http://example.org/") + + val dataByItemAndTime = data.filter(_.time <= time).groupBy(_.item) + .view.mapValues(_.groupBy(_.time)) + + val r = query.evaluate + while (r.hasNext) { + val bs = r.next + val item = URIs.createURI(bs.getValue("item").toString) + val time = bs.getValue("time").asInstanceOf[Literal].longValue() + + val itemValue = dataByItemAndTime(item)(time).head.value + + val baos = new ByteArrayOutputStream(); + val writer = new KvinEvaluationStrategy.InternalJsonFormatWriter(baos); + writer.writeValue(itemValue) + writer.close() + + Assert.assertEquals(baos.toString(StandardCharsets.UTF_8), + bs.getValue("record").asInstanceOf[Literal].getLabel()) + } + r.close + } finally { + conn.close + } + } + @Test def arrayTest { val data = addArrays(2, 10)