From 700dd547c3581611f5c9b2f49d61e92ec963738e Mon Sep 17 00:00:00 2001 From: Ken Wenzel Date: Fri, 19 Jul 2024 09:57:46 +0200 Subject: [PATCH] Improve reading performance further. --- .../core/kvin/parquet/KvinParquet.java | 147 +++++++++--------- 1 file changed, 75 insertions(+), 72 deletions(-) diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinParquet.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinParquet.java index f048f03a..b6e082f8 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinParquet.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinParquet.java @@ -36,6 +36,7 @@ import org.eclipse.rdf4j.common.concurrent.locks.Lock; import org.eclipse.rdf4j.common.concurrent.locks.ReadPrefReadWriteLockManager; import org.eclipse.rdf4j.common.concurrent.locks.ReadWriteLockManager; +import org.eclipse.rdf4j.query.algebra.In; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +62,23 @@ public class KvinParquet implements Kvin { static final Logger log = LoggerFactory.getLogger(KvinParquet.class); - + static Comparator RECORD_COMPARATOR = (a, b) -> { + int diff = ((ByteBuffer) a.get(0)).compareTo((ByteBuffer) b.get(0)); + if (diff != 0) { + return diff; + } + diff = ((Comparable) a.get(1)).compareTo((Long) b.get(1)); + if (diff != 0) { + // time is reverse + return -diff; + } + diff = ((Comparable) a.get(2)).compareTo((Integer) b.get(2)); + if (diff != 0) { + // seqNr is reverse + return -diff; + } + return 0; + }; // used by reader final Cache itemIdCache = CacheBuilder.newBuilder().maximumSize(10000).build(); final Cache propertyIdCache = CacheBuilder.newBuilder().maximumSize(10000).build(); @@ -71,7 +88,6 @@ public class KvinParquet implements Kvin { Cache propertyIdReverseLookUpCache = CacheBuilder.newBuilder().maximumSize(10000).build(); Cache metaCache = CacheBuilder.newBuilder().maximumSize(10000).build(); String archiveLocation; - ReadWriteLockManager lockManager = new ReadPrefReadWriteLockManager(true, 5000); public KvinParquet(String archiveLocation) { @@ -614,6 +630,7 @@ protected KvinTuple createElement(URI item, URI property, URI context, long time } public URI getProperty(ByteBuffer idBuffer) throws IOException { + // skip item id idBuffer.getLong(); Long propertyId = idBuffer.getLong(); URI cachedProperty = propertyIdReverseLookUpCache.getIfPresent(propertyId); @@ -655,24 +672,6 @@ private ParquetReader createGenericReader(InputFile file, FilterC .build(); } - static Comparator TUPLE_COMPARATOR = (a, b) -> { - int diff = a.property.equals(b.property) ? 0 : a.toString().compareTo(b.toString()); - if (diff != 0) { - return diff; - } - diff = Long.compare(a.time, b.time); - if (diff != 0) { - // time is reverse - return -diff; - } - diff = Integer.compare(a.seqNr, b.seqNr); - if (diff != 0) { - // seqNr is reverse - return -diff; - } - return 0; - }; - private synchronized IExtendedIterator fetchInternal(URI item, URI property, URI context, Long end, Long begin, Long limit) throws IOException { Lock readLock = readLock(); try { @@ -702,9 +701,9 @@ private synchronized IExtendedIterator fetchInternal(URI item, URI pr return NiceIterator.emptyIterator(); } return new NiceIterator() { - final PriorityQueue>> nextTuples = - new PriorityQueue<>(Comparator.comparing(Pair::getFirst, TUPLE_COMPARATOR)); - KvinTuple prevTuple, nextTuple; + final PriorityQueue>> nextTuples = + new PriorityQueue<>(Comparator.comparing(Pair::getFirst, RECORD_COMPARATOR)); + GenericRecord prevRecord, nextRecord; long propertyValueCount; int folderIndex = -1; boolean closed; @@ -724,7 +723,7 @@ private synchronized IExtendedIterator fetchInternal(URI item, URI pr } } - KvinTuple selectNextTuple() throws IOException { + GenericRecord selectNextRecord() throws IOException { while (true) { while (nextTuples.isEmpty() && folderIndex < dataFolders.size() - 1) { nextReaders(); @@ -732,12 +731,11 @@ KvinTuple selectNextTuple() throws IOException { var min = nextTuples.isEmpty() ? null : nextTuples.poll(); if (min != null) { // omit duplicates in terms of id, time, and seqNr - boolean isDuplicate = prevTuple != null && TUPLE_COMPARATOR.compare(prevTuple, min.getFirst()) == 0; + boolean isDuplicate = prevRecord != null && RECORD_COMPARATOR.compare(prevRecord, min.getFirst()) == 0; var record = min.getSecond().read(); if (record != null) { - KvinTuple tuple = convert(record); - nextTuples.add(new Pair<>(tuple, min.getSecond())); + nextTuples.add(new Pair<>(record, min.getSecond())); } else { try { min.getSecond().close(); @@ -756,27 +754,27 @@ var record = min.getSecond().read(); @Override public boolean hasNext() { - if (nextTuple != null) { + if (nextRecord != null) { return true; } try { // skipping properties if limit is reached if (limit != 0 && propertyValueCount >= limit) { - while ((nextTuple = selectNextTuple()) != null) { - if (!nextTuple.property.equals(prevTuple.property)) { + while ((nextRecord = selectNextRecord()) != null) { + if (!nextRecord.get(0).equals(prevRecord.get(0))) { propertyValueCount = 0; break; } } } - if (nextTuple == null) { - nextTuple = selectNextTuple(); + if (nextRecord == null) { + nextRecord = selectNextRecord(); } propertyValueCount++; } catch (IOException e) { throw new UncheckedIOException(e); } - if (nextTuple != null) { + if (nextRecord != null) { return true; } else { close(); @@ -789,9 +787,14 @@ public KvinTuple next() { if (!hasNext()) { throw new NoSuchElementException(); } else { - KvinTuple tuple = nextTuple; - prevTuple = tuple; - nextTuple = null; + KvinTuple tuple; + try { + tuple = convert(nextRecord); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + prevRecord = nextRecord; + nextRecord = null; return tuple; } } @@ -826,7 +829,7 @@ void nextReaders() throws IOException { ParquetReader reader = createGenericReader(inputFile, FilterCompat.get(filterFinal)); GenericRecord record = reader.read(); if (record != null) { - nextTuples.add(new Pair<>(convert(record), reader)); + nextTuples.add(new Pair<>(record, reader)); } else { try { reader.close(); @@ -883,33 +886,33 @@ private List getDataFolders(IdMappings idMappings) throws IO throw new IOException(e); } return meta.entrySet().stream().flatMap(entry -> { - String idRange = (String) entry.getValue(); - long[] minMax = splitRange(idRange); - if (minMax != null && itemId >= minMax[0] && itemId <= minMax[1]) { - java.nio.file.Path yearFolder = Paths.get(archiveLocation, entry.getKey().toString()); - java.nio.file.Path yearMetaPath = yearFolder.resolve("meta.properties"); - try { - Properties yearMeta = metaCache.get(yearMetaPath, () -> { - Properties p = new Properties(); - if (Files.exists(yearMetaPath)) { - p.load(Files.newInputStream(yearMetaPath)); + String idRange = (String) entry.getValue(); + long[] minMax = splitRange(idRange); + if (minMax != null && itemId >= minMax[0] && itemId <= minMax[1]) { + java.nio.file.Path yearFolder = Paths.get(archiveLocation, entry.getKey().toString()); + java.nio.file.Path yearMetaPath = yearFolder.resolve("meta.properties"); + try { + Properties yearMeta = metaCache.get(yearMetaPath, () -> { + Properties p = new Properties(); + if (Files.exists(yearMetaPath)) { + p.load(Files.newInputStream(yearMetaPath)); + } + return p; + }); + return yearMeta.entrySet().stream().filter(weekEntry -> { + String weekIdRange = (String) weekEntry.getValue(); + long[] weekMinMax = splitRange(weekIdRange); + return weekMinMax != null && itemId >= weekMinMax[0] && itemId <= weekMinMax[1]; + }).map(weekEntry -> yearFolder.resolve(weekEntry.getKey().toString())); + } catch (Exception e) { + log.error("Error while loading meta data", e); } - return p; - }); - return yearMeta.entrySet().stream().filter(weekEntry -> { - String weekIdRange = (String) weekEntry.getValue(); - long[] weekMinMax = splitRange(weekIdRange); - return weekMinMax != null && itemId >= weekMinMax[0] && itemId <= weekMinMax[1]; - }).map(weekEntry -> yearFolder.resolve(weekEntry.getKey().toString())); - } catch (Exception e) { - log.error("Error while loading meta data", e); - } - } - return Stream.empty(); - }) - // sort by year and month descending (recent data first) - .sorted(Comparator.reverseOrder()) - .collect(Collectors.toList()); + } + return Stream.empty(); + }) + // sort by year and month descending (recent data first) + .sorted(Comparator.reverseOrder()) + .collect(Collectors.toList()); } @Override @@ -947,24 +950,24 @@ private KvinTupleMetadata getFirstTuple(URI item, Long itemId, Long propertyId, FilterPredicate filter = generateFetchFilter(idMappings); List dataFolders = getDataFolders(idMappings); - KvinTupleInternal firstTuple = null; + GenericRecord firstRecord = null; for (java.nio.file.Path dataFolder : dataFolders) { for (Path dataFile : getDataFiles(dataFolder.toString())) { - ParquetReader reader = createReader(getFile(dataFile), FilterCompat.get(filter)); - KvinTupleInternal tuple = reader.read(); - if (firstTuple == null || tuple != null && firstTuple != null && tuple.compareTo(firstTuple) < 0) { - firstTuple = tuple; + ParquetReader reader = createGenericReader(getFile(dataFile), FilterCompat.get(filter)); + GenericRecord record = reader.read(); + if (firstRecord == null || record != null && firstRecord != null && RECORD_COMPARATOR.compare(record, firstRecord) < 0) { + firstRecord = record; } reader.close(); } } - if (firstTuple != null) { - URI firstTupleProperty = getProperty(ByteBuffer.wrap(firstTuple.id)); + if (firstRecord != null) { + URI property = getProperty((ByteBuffer) firstRecord.get(0)); if (itemId == null) { - idMappings.propertyId = getId(firstTupleProperty, IdType.PROPERTY_ID); + idMappings.propertyId = getId(property, IdType.PROPERTY_ID); } - foundTuple = new KvinTupleMetadata(item.toString(), firstTupleProperty.toString(), + foundTuple = new KvinTupleMetadata(item.toString(), property.toString(), idMappings.itemId, idMappings.propertyId, idMappings.contextId); }