Skip to content

Commit

Permalink
Support <kvin:valueJson> to convert values into JSON strings.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed Jan 14, 2025
1 parent fd09b60 commit 080b34d
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@ public JsonFormatWriter(OutputStream outputStream, boolean prettyPrint) throws I
if (prettyPrint) {
this.generator.useDefaultPrettyPrinter();
}
this.generator.writeStartObject();
initialStartObject();
}

public JsonFormatWriter(OutputStream outputStream) throws IOException {
this(outputStream, false);
}

protected void initialStartObject() throws IOException {
this.generator.writeStartObject();
}

public static String toJsonString(IExtendedIterator<KvinTuple> it) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
JsonFormatWriter writer = new JsonFormatWriter(baos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public interface KVIN {
IRI OP = SimpleValueFactory.getInstance().createIRI(KVIN + "op");

IRI VALUE = SimpleValueFactory.getInstance().createIRI(KVIN + "value");
IRI VALUE_JSON = SimpleValueFactory.getInstance().createIRI(KVIN + "valueJson");
IRI TIME = SimpleValueFactory.getInstance().createIRI(KVIN + "time");
IRI SEQNR = SimpleValueFactory.getInstance().createIRI(KVIN + "seqNr");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import io.github.linkedfactory.core.kvin.Kvin;
import io.github.linkedfactory.core.kvin.KvinTuple;
import io.github.linkedfactory.core.kvin.Record;
import io.github.linkedfactory.core.kvin.util.JsonFormatWriter;
import io.github.linkedfactory.core.rdf4j.common.Conversions;
import io.github.linkedfactory.core.rdf4j.common.HasValue;
import io.github.linkedfactory.core.rdf4j.common.query.CompositeBindingSet;
import io.github.linkedfactory.core.rdf4j.common.query.InnerJoinIteratorEvaluationStep;
import io.github.linkedfactory.core.rdf4j.common.query.BatchQueryEvaluationStep;
import io.github.linkedfactory.core.rdf4j.kvin.query.*;
import net.enilink.komma.core.URI;
import net.enilink.vocab.rdf.RDF;
import org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
Expand All @@ -30,6 +32,10 @@
import org.eclipse.rdf4j.query.algebra.evaluation.impl.StrictEvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.iterator.HashJoinIteration;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
Expand All @@ -40,6 +46,26 @@
import static io.github.linkedfactory.core.rdf4j.common.query.Helpers.findFirstFetch;

public class KvinEvaluationStrategy extends StrictEvaluationStrategy {
static class InternalJsonFormatWriter extends JsonFormatWriter {
InternalJsonFormatWriter(OutputStream outputStream) throws IOException {
super(outputStream);
}

@Override
protected void initialStartObject() {
// do nothing
}

@Override
protected void writeValue(Object value) throws IOException {
super.writeValue(value);
}

@Override
public void end() throws IOException {
// do nothing
}
}

final Kvin kvin;
final ParameterScanner scanner;
Expand Down Expand Up @@ -79,6 +105,25 @@ public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Stateme
Var valueVar = stmt.getObjectVar();
Value rdfValue = Conversions.toRdfValue(tuple.value, vf);
return compareAndBind(bs, valueVar, rdfValue);
} else if (KVIN.VALUE_JSON.equals(predValue)) {
Var valueVar = stmt.getObjectVar();
Value rdfValue;
if (tuple.value instanceof Record || tuple.value instanceof Object[] || tuple.value instanceof URI) {
var baos = new ByteArrayOutputStream();
try {
var writer = new InternalJsonFormatWriter(baos);
writer.writeValue(tuple.value);
writer.close();
} catch (IOException e) {
throw new QueryEvaluationException(e);
} catch (Exception e) {
throw new QueryEvaluationException(e);
}
rdfValue = vf.createLiteral(baos.toString(StandardCharsets.UTF_8));
} else {
rdfValue = Conversions.toRdfValue(tuple.value, vf);
}
return compareAndBind(bs, valueVar, rdfValue);
} else if (KVIN.TIME.equals(predValue)) {
Var timeVar = stmt.getObjectVar();
Value timeValue = Conversions.toRdfValue(tuple.time, vf);
Expand Down Expand Up @@ -215,32 +260,6 @@ public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(List<Bi
}
}

static class LongArrayComparator implements Comparator<long[]> {
final int[] signs;

public LongArrayComparator(List<Integer> signs) {
this.signs = new int[signs.size()];
int i = 0;
for (Integer sign : signs) {
this.signs[i++] = sign;
}
}

@Override
public int compare(long[] a, long[] b) {
for (int i = 0; i < a.length; i++) {
if (i >= b.length) {
return 1;
}
int diff = signs[i] * Long.compare(a[i], b[i]);
if (diff != 0) {
return diff;
}
}
return 0;
}
}

@Override
protected QueryEvaluationStep prepare(Join join, QueryEvaluationContext context) throws QueryEvaluationException {
QueryEvaluationStep leftPrepared = precompile(join.getLeftArg(), context);
Expand Down Expand Up @@ -275,21 +294,21 @@ protected QueryEvaluationStep prepare(Join join, QueryEvaluationContext context)
return values;
}, context, executorService);
} else {
return new BatchQueryEvaluationStep() {
@Override
public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindingSet) {
return new HashJoinIteration(leftPrepared, rightPrepared, bindingSet, false, joinAttributes, context);
}

@Override
public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(List<BindingSet> bindingSets) {
return new HashJoinIteration(
BatchQueryEvaluationStep.evaluate(leftPrepared, bindingSets),
join.getLeftArg().getBindingNames(),
BatchQueryEvaluationStep.evaluate(rightPrepared, bindingSets),
join.getRightArg().getBindingNames(), false);
}
};
return new BatchQueryEvaluationStep() {
@Override
public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindingSet) {
return new HashJoinIteration(leftPrepared, rightPrepared, bindingSet, false, joinAttributes, context);
}

@Override
public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(List<BindingSet> bindingSets) {
return new HashJoinIteration(
BatchQueryEvaluationStep.evaluate(leftPrepared, bindingSets),
join.getLeftArg().getBindingNames(),
BatchQueryEvaluationStep.evaluate(rightPrepared, bindingSets),
join.getRightArg().getBindingNames(), false);
}
};
}
} else {
// strictly use lateral joins if left arg contains a KVIN fetch as right arg probably depends on the results
Expand Down Expand Up @@ -330,7 +349,7 @@ boolean useHashJoin(TupleExpr leftArg, TupleExpr rightArg) {
// in case of projections with aggregates we just use the projected binding names
Set<String> leftAssured = leftArg instanceof Projection ? leftArg.getBindingNames() :
leftArg.getAssuredBindingNames();
return ! rightFetch.getRequiredBindings().stream().anyMatch(required -> leftAssured.contains(required));
return !rightFetch.getRequiredBindings().stream().anyMatch(required -> leftAssured.contains(required));
}
}
return false;
Expand Down Expand Up @@ -372,4 +391,30 @@ public ValueFactory getValueFactory() {
public Supplier<ExecutorService> getExecutorService() {
return executorService;
}

static class LongArrayComparator implements Comparator<long[]> {
final int[] signs;

public LongArrayComparator(List<Integer> signs) {
this.signs = new int[signs.size()];
int i = 0;
for (Integer sign : signs) {
this.signs[i++] = sign;
}
}

@Override
public int compare(long[] a, long[] b) {
for (int i = 0; i < a.length; i++) {
if (i >= b.length) {
return 1;
}
int diff = signs[i] * Long.compare(a[i], b[i]);
if (diff != 0) {
return diff;
}
}
return 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import org.eclipse.rdf4j.repository.sail.SailRepository
import org.eclipse.rdf4j.sail.memory.MemoryStore
import org.junit.{After, Assert, Before, Test}

import java.io.{File, IOException}
import java.io.{ByteArrayOutputStream, File, IOException}
import java.nio.charset.StandardCharsets
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor}
import java.util.concurrent.Executors
Expand Down Expand Up @@ -118,6 +119,48 @@ class ServiceTest {
}
}

@Test
def jsonTest {
val data = addRecords(2, 10)

val conn = repository.getConnection
val vf = repository.getValueFactory
try {
val time = START_TIME + 20

val values = "values ?item { <item-1> <item-2> }"
val queryStr =
s"""select * where { $values service <kvin:> {
|?item <property:value> ?v . ?v <kvin:to> $time ; <kvin:limit> 1 .
|?v <kvin:valueJson> ?record ; <kvin:time> ?time .
|} }""".stripMargin
val query = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryStr, "http://example.org/")

val dataByItemAndTime = data.filter(_.time <= time).groupBy(_.item)
.view.mapValues(_.groupBy(_.time))

val r = query.evaluate
while (r.hasNext) {
val bs = r.next
val item = URIs.createURI(bs.getValue("item").toString)
val time = bs.getValue("time").asInstanceOf[Literal].longValue()

val itemValue = dataByItemAndTime(item)(time).head.value

val baos = new ByteArrayOutputStream();
val writer = new KvinEvaluationStrategy.InternalJsonFormatWriter(baos);
writer.writeValue(itemValue)
writer.close()

Assert.assertEquals(baos.toString(StandardCharsets.UTF_8),
bs.getValue("record").asInstanceOf[Literal].getLabel())
}
r.close
} finally {
conn.close
}
}

@Test
def arrayTest {
val data = addArrays(2, 10)
Expand Down

0 comments on commit 080b34d

Please sign in to comment.