Skip to content

Commit

Permalink
Use async execution for iterators in values endpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed Aug 6, 2024
1 parent d0ada46 commit d3dd7de
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.github.linkedfactory.core.kvin.util;

import io.github.linkedfactory.core.rdf4j.common.query.InnerJoinIterator;
import net.enilink.commons.iterator.IExtendedIterator;
import net.enilink.commons.iterator.NiceIterator;
import org.eclipse.rdf4j.query.QueryEvaluationException;

import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Iterator that eagerly drains a base iterator on a background thread and
 * hands its elements to the consumer through a bounded blocking queue.
 * <p>
 * The producer task is submitted to the supplied executor immediately in the
 * constructor. It propagates the {@link InnerJoinIterator#asyncDepth} counter
 * into the worker thread so that nested async iterators can track their depth.
 * <p>
 * Thread-safety: intended for a single consumer thread; {@link #close()} is
 * expected to be called from that same consumer thread.
 */
public class AsyncExtendedIterator<T> extends NiceIterator<T> {
    /** Sentinel enqueued by the producer to signal end of iteration. */
    static final Object NULL_ELEMENT = new Object();
    /** Bounded hand-off queue between the producer task and the consumer. */
    final BlockingQueue<T> nextElements;
    /** Set by close(); tells the producer loop to stop offering elements. */
    volatile boolean closed = false;
    /** True once the end sentinel has been consumed; prevents hasNext() from
     * blocking on take() again after the iteration has ended. */
    boolean finished = false;
    /** The element pre-fetched by hasNext(), or null if none is buffered. */
    T next;

    public AsyncExtendedIterator(Supplier<IExtendedIterator<T>> base, Supplier<ExecutorService> executorService) {
        nextElements = new ArrayBlockingQueue<>(100);
        var currentAsync = InnerJoinIterator.asyncDepth.get();
        executorService.get().submit(() -> {
            InnerJoinIterator.asyncDepth.set(currentAsync != null ? currentAsync + 1 : 1);
            // created inside try/finally so that asyncDepth is removed and the
            // end sentinel is offered even if base.get() itself fails
            IExtendedIterator<T> baseIt = null;
            boolean interrupted = false;
            try {
                baseIt = base.get();
                while (baseIt.hasNext()) {
                    T element = baseIt.next();
                    // bounded offer with a short timeout so the producer can
                    // notice close() instead of blocking forever on a full queue
                    while (!nextElements.offer(element, 10, TimeUnit.MILLISECONDS)) {
                        if (closed) {
                            return;
                        }
                    }
                }
            } catch (InterruptedException e) {
                // remember the interrupt; it is restored in the finally block
                interrupted = true;
            } finally {
                if (baseIt != null) {
                    baseIt.close();
                }
                InnerJoinIterator.asyncDepth.remove();
                // ALWAYS try to signal the end of the iteration — also on
                // exceptions — otherwise the consumer would block forever
                // in take(); give up only when closed or interrupted again
                try {
                    while (!closed && !nextElements.offer((T) NULL_ELEMENT, 10, TimeUnit.MILLISECONDS)) {
                        // retry until the consumer drains the queue or closes
                    }
                } catch (InterruptedException e) {
                    interrupted = true;
                }
                if (interrupted) {
                    // restore the interrupt status for the executor's thread
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    @Override
    public boolean hasNext() {
        if (finished) {
            // end sentinel was already consumed; never block on take() again
            return false;
        }
        if (next == null) {
            try {
                T nextElement = nextElements.take();
                if (nextElement == NULL_ELEMENT) {
                    finished = true;
                } else {
                    next = nextElement;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return next != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = next;
        next = null;
        return result;
    }

    @Override
    public void remove() throws QueryEvaluationException {
        throw new UnsupportedOperationException("Remove is not supported");
    }

    @Override
    public void close() {
        // stops the producer (it polls this flag while offering) and marks the
        // consumer side as done so a later hasNext() cannot block
        closed = true;
        finished = true;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,30 @@
import org.osgi.service.component.annotations.*;

import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Component
public class FederatedServiceComponent {
private static ExecutorService executorService;
IModelSet ms;
Kvin kvin;
AbstractFederatedServiceResolver serviceResolver;
@Reference(cardinality = ReferenceCardinality.OPTIONAL)
volatile ContextProvider contextProvider;

/**
 * Returns the shared executor service, creating it lazily on first use.
 * Synchronized so concurrent first calls create only one pool; a JVM
 * shutdown hook is registered once to shut the pool down on exit.
 */
public static synchronized ExecutorService getExecutorService() {
	if (executorService != null) {
		return executorService;
	}
	ExecutorService created = Executors.newCachedThreadPool();
	executorService = created;
	// ensure worker threads do not keep the JVM from exiting cleanly
	Runtime.getRuntime().addShutdownHook(new Thread(created::shutdown));
	return executorService;
}

@Activate
void activate() {
// add custom SPARQL functions
Expand All @@ -42,15 +57,15 @@ void activate() {
protected FederatedService createService(String serviceUrl)
throws QueryEvaluationException {
if (serviceUrl.startsWith("aas-api:")) {
return new AasFederatedService(serviceUrl.replaceFirst("^aas-api:", ""), this::getExecutorService);
return new AasFederatedService(serviceUrl.replaceFirst("^aas-api:", ""), () -> getExecutorService());
} else if (serviceUrl.equals("kvin:")) {
return new KvinFederatedService(kvin,
this::getExecutorService,
() -> getExecutorService(),
() -> contextProvider == null ? Kvin.DEFAULT_CONTEXT : contextProvider.getContext(),
false);
} else if (getKvinServiceUrl(serviceUrl).isPresent()) {
String url = getKvinServiceUrl(serviceUrl).get();
return new KvinFederatedService(new KvinHttp(url), this::getExecutorService,
return new KvinFederatedService(new KvinHttp(url), () -> getExecutorService(),
() -> contextProvider == null ? Kvin.DEFAULT_CONTEXT : contextProvider.getContext(),
true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,16 @@
import io.github.linkedfactory.core.kvin.Kvin;
import io.github.linkedfactory.core.rdf4j.ContextProvider;
import io.github.linkedfactory.core.rdf4j.common.query.CompositeBindingSet;
import io.github.linkedfactory.core.rdf4j.kvin.query.KvinFetchOptimizer;
import io.github.linkedfactory.core.rdf4j.kvin.query.ParameterScanner;
import io.github.linkedfactory.core.rdf4j.common.query.QueryJoinOptimizer;
import io.github.linkedfactory.core.rdf4j.common.query.QueryModelPruner;

import io.github.linkedfactory.core.rdf4j.kvin.query.KvinFetchOptimizer;
import io.github.linkedfactory.core.rdf4j.kvin.query.ParameterScanner;
import org.eclipse.rdf4j.common.exception.RDF4JException;
import org.eclipse.rdf4j.common.iteration.*;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.Dataset;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.Service;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
*/
package io.github.linkedfactory.service

import io.github.linkedfactory.core.kvin.util.CsvFormatParser
import io.github.linkedfactory.core.kvin.util.{AsyncExtendedIterator, CsvFormatParser}
import io.github.linkedfactory.core.kvin.{Kvin, KvinTuple, Record}
import io.github.linkedfactory.core.rdf4j.FederatedServiceComponent
import io.github.linkedfactory.service.util.{JsonFormatParser, LineProtocolParser}
import net.enilink.commons.iterator.IExtendedIterator
import net.enilink.komma.core.{URI, URIs}
Expand All @@ -29,6 +30,7 @@ import net.liftweb.json.Extraction.decompose
import net.liftweb.json.JsonAST._
import net.liftweb.json.JsonDSL._
import net.liftweb.util.Helpers._
import net.liftweb.util.Schedule
import org.apache.commons.csv.{CSVFormat, CSVPrinter}

import java.io.{InputStream, OutputStream, OutputStreamWriter}
Expand Down Expand Up @@ -295,6 +297,8 @@ class KvinService(path: List[String], store: Kvin) extends RestHelper with Logga
val interval = S.param("interval") flatMap (v => tryo(v.toDouble.longValue)) openOr 0L
val op = S.param("op") map (_.trim)

val executorService = FederatedServiceComponent.getExecutorService()
val modelUri = contextModelUri
val results = items map { item =>
val itemData = for (
property <- {
Expand All @@ -305,9 +309,9 @@ class KvinService(path: List[String], store: Kvin) extends RestHelper with Logga
) yield {
val propertyData = (interval, op) match {
case (_, Full(op)) if interval > 0 =>
store.fetch(item, property, contextModelUri, end, begin, limit, interval, op)
new AsyncExtendedIterator(() => store.fetch(item, property, modelUri, end, begin, limit, interval, op), () => executorService)
case _ =>
store.fetch(item, property, contextModelUri, end, begin, limit, 0, null)
new AsyncExtendedIterator(() => store.fetch(item, property, modelUri, end, begin, limit, 0, null), () => executorService)
}
property.toString -> propertyData
}
Expand Down

0 comments on commit d3dd7de

Please sign in to comment.