Skip to content

Commit

Permalink
Improve reading performance further.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed Jul 19, 2024
1 parent 1b0976e commit 700dd54
Showing 1 changed file with 75 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
import org.eclipse.rdf4j.common.concurrent.locks.ReadPrefReadWriteLockManager;
import org.eclipse.rdf4j.common.concurrent.locks.ReadWriteLockManager;
import org.eclipse.rdf4j.query.algebra.In;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -61,7 +62,23 @@

public class KvinParquet implements Kvin {
static final Logger log = LoggerFactory.getLogger(KvinParquet.class);

static Comparator<GenericRecord> RECORD_COMPARATOR = (a, b) -> {
int diff = ((ByteBuffer) a.get(0)).compareTo((ByteBuffer) b.get(0));
if (diff != 0) {
return diff;
}
diff = ((Comparable<Long>) a.get(1)).compareTo((Long) b.get(1));
if (diff != 0) {
// time is reverse
return -diff;
}
diff = ((Comparable<Integer>) a.get(2)).compareTo((Integer) b.get(2));
if (diff != 0) {
// seqNr is reverse
return -diff;
}
return 0;
};
// used by reader
final Cache<URI, Long> itemIdCache = CacheBuilder.newBuilder().maximumSize(10000).build();
final Cache<URI, Long> propertyIdCache = CacheBuilder.newBuilder().maximumSize(10000).build();
Expand All @@ -71,7 +88,6 @@ public class KvinParquet implements Kvin {
Cache<Long, URI> propertyIdReverseLookUpCache = CacheBuilder.newBuilder().maximumSize(10000).build();
Cache<java.nio.file.Path, Properties> metaCache = CacheBuilder.newBuilder().maximumSize(10000).build();
String archiveLocation;

ReadWriteLockManager lockManager = new ReadPrefReadWriteLockManager(true, 5000);

public KvinParquet(String archiveLocation) {
Expand Down Expand Up @@ -614,6 +630,7 @@ protected KvinTuple createElement(URI item, URI property, URI context, long time
}

public URI getProperty(ByteBuffer idBuffer) throws IOException {
// skip item id
idBuffer.getLong();
Long propertyId = idBuffer.getLong();
URI cachedProperty = propertyIdReverseLookUpCache.getIfPresent(propertyId);
Expand Down Expand Up @@ -655,24 +672,6 @@ private ParquetReader<GenericRecord> createGenericReader(InputFile file, FilterC
.build();
}

static Comparator<KvinTuple> TUPLE_COMPARATOR = (a, b) -> {
int diff = a.property.equals(b.property) ? 0 : a.toString().compareTo(b.toString());
if (diff != 0) {
return diff;
}
diff = Long.compare(a.time, b.time);
if (diff != 0) {
// time is reverse
return -diff;
}
diff = Integer.compare(a.seqNr, b.seqNr);
if (diff != 0) {
// seqNr is reverse
return -diff;
}
return 0;
};

private synchronized IExtendedIterator<KvinTuple> fetchInternal(URI item, URI property, URI context, Long end, Long begin, Long limit) throws IOException {
Lock readLock = readLock();
try {
Expand Down Expand Up @@ -702,9 +701,9 @@ private synchronized IExtendedIterator<KvinTuple> fetchInternal(URI item, URI pr
return NiceIterator.emptyIterator();
}
return new NiceIterator<KvinTuple>() {
final PriorityQueue<Pair<KvinTuple, ParquetReader<GenericRecord>>> nextTuples =
new PriorityQueue<>(Comparator.comparing(Pair::getFirst, TUPLE_COMPARATOR));
KvinTuple prevTuple, nextTuple;
final PriorityQueue<Pair<GenericRecord, ParquetReader<GenericRecord>>> nextTuples =
new PriorityQueue<>(Comparator.comparing(Pair::getFirst, RECORD_COMPARATOR));
GenericRecord prevRecord, nextRecord;
long propertyValueCount;
int folderIndex = -1;
boolean closed;
Expand All @@ -724,20 +723,19 @@ private synchronized IExtendedIterator<KvinTuple> fetchInternal(URI item, URI pr
}
}

KvinTuple selectNextTuple() throws IOException {
GenericRecord selectNextRecord() throws IOException {
while (true) {
while (nextTuples.isEmpty() && folderIndex < dataFolders.size() - 1) {
nextReaders();
}
var min = nextTuples.isEmpty() ? null : nextTuples.poll();
if (min != null) {
// omit duplicates in terms of id, time, and seqNr
boolean isDuplicate = prevTuple != null && TUPLE_COMPARATOR.compare(prevTuple, min.getFirst()) == 0;
boolean isDuplicate = prevRecord != null && RECORD_COMPARATOR.compare(prevRecord, min.getFirst()) == 0;

var record = min.getSecond().read();
if (record != null) {
KvinTuple tuple = convert(record);
nextTuples.add(new Pair<>(tuple, min.getSecond()));
nextTuples.add(new Pair<>(record, min.getSecond()));
} else {
try {
min.getSecond().close();
Expand All @@ -756,27 +754,27 @@ var record = min.getSecond().read();

@Override
public boolean hasNext() {
if (nextTuple != null) {
if (nextRecord != null) {
return true;
}
try {
// skipping properties if limit is reached
if (limit != 0 && propertyValueCount >= limit) {
while ((nextTuple = selectNextTuple()) != null) {
if (!nextTuple.property.equals(prevTuple.property)) {
while ((nextRecord = selectNextRecord()) != null) {
if (!nextRecord.get(0).equals(prevRecord.get(0))) {
propertyValueCount = 0;
break;
}
}
}
if (nextTuple == null) {
nextTuple = selectNextTuple();
if (nextRecord == null) {
nextRecord = selectNextRecord();
}
propertyValueCount++;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
if (nextTuple != null) {
if (nextRecord != null) {
return true;
} else {
close();
Expand All @@ -789,9 +787,14 @@ public KvinTuple next() {
if (!hasNext()) {
throw new NoSuchElementException();
} else {
KvinTuple tuple = nextTuple;
prevTuple = tuple;
nextTuple = null;
KvinTuple tuple;
try {
tuple = convert(nextRecord);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
prevRecord = nextRecord;
nextRecord = null;
return tuple;
}
}
Expand Down Expand Up @@ -826,7 +829,7 @@ void nextReaders() throws IOException {
ParquetReader<GenericRecord> reader = createGenericReader(inputFile, FilterCompat.get(filterFinal));
GenericRecord record = reader.read();
if (record != null) {
nextTuples.add(new Pair<>(convert(record), reader));
nextTuples.add(new Pair<>(record, reader));
} else {
try {
reader.close();
Expand Down Expand Up @@ -883,33 +886,33 @@ private List<java.nio.file.Path> getDataFolders(IdMappings idMappings) throws IO
throw new IOException(e);
}
return meta.entrySet().stream().flatMap(entry -> {
String idRange = (String) entry.getValue();
long[] minMax = splitRange(idRange);
if (minMax != null && itemId >= minMax[0] && itemId <= minMax[1]) {
java.nio.file.Path yearFolder = Paths.get(archiveLocation, entry.getKey().toString());
java.nio.file.Path yearMetaPath = yearFolder.resolve("meta.properties");
try {
Properties yearMeta = metaCache.get(yearMetaPath, () -> {
Properties p = new Properties();
if (Files.exists(yearMetaPath)) {
p.load(Files.newInputStream(yearMetaPath));
String idRange = (String) entry.getValue();
long[] minMax = splitRange(idRange);
if (minMax != null && itemId >= minMax[0] && itemId <= minMax[1]) {
java.nio.file.Path yearFolder = Paths.get(archiveLocation, entry.getKey().toString());
java.nio.file.Path yearMetaPath = yearFolder.resolve("meta.properties");
try {
Properties yearMeta = metaCache.get(yearMetaPath, () -> {
Properties p = new Properties();
if (Files.exists(yearMetaPath)) {
p.load(Files.newInputStream(yearMetaPath));
}
return p;
});
return yearMeta.entrySet().stream().filter(weekEntry -> {
String weekIdRange = (String) weekEntry.getValue();
long[] weekMinMax = splitRange(weekIdRange);
return weekMinMax != null && itemId >= weekMinMax[0] && itemId <= weekMinMax[1];
}).map(weekEntry -> yearFolder.resolve(weekEntry.getKey().toString()));
} catch (Exception e) {
log.error("Error while loading meta data", e);
}
return p;
});
return yearMeta.entrySet().stream().filter(weekEntry -> {
String weekIdRange = (String) weekEntry.getValue();
long[] weekMinMax = splitRange(weekIdRange);
return weekMinMax != null && itemId >= weekMinMax[0] && itemId <= weekMinMax[1];
}).map(weekEntry -> yearFolder.resolve(weekEntry.getKey().toString()));
} catch (Exception e) {
log.error("Error while loading meta data", e);
}
}
return Stream.empty();
})
// sort by year and month descending (recent data first)
.sorted(Comparator.reverseOrder())
.collect(Collectors.toList());
}
return Stream.empty();
})
// sort by year and month descending (recent data first)
.sorted(Comparator.reverseOrder())
.collect(Collectors.toList());
}

@Override
Expand Down Expand Up @@ -947,24 +950,24 @@ private KvinTupleMetadata getFirstTuple(URI item, Long itemId, Long propertyId,
FilterPredicate filter = generateFetchFilter(idMappings);
List<java.nio.file.Path> dataFolders = getDataFolders(idMappings);

KvinTupleInternal firstTuple = null;
GenericRecord firstRecord = null;
for (java.nio.file.Path dataFolder : dataFolders) {
for (Path dataFile : getDataFiles(dataFolder.toString())) {
ParquetReader<KvinTupleInternal> reader = createReader(getFile(dataFile), FilterCompat.get(filter));
KvinTupleInternal tuple = reader.read();
if (firstTuple == null || tuple != null && firstTuple != null && tuple.compareTo(firstTuple) < 0) {
firstTuple = tuple;
ParquetReader<GenericRecord> reader = createGenericReader(getFile(dataFile), FilterCompat.get(filter));
GenericRecord record = reader.read();
if (firstRecord == null || record != null && firstRecord != null && RECORD_COMPARATOR.compare(record, firstRecord) < 0) {
firstRecord = record;
}
reader.close();
}
}

if (firstTuple != null) {
URI firstTupleProperty = getProperty(ByteBuffer.wrap(firstTuple.id));
if (firstRecord != null) {
URI property = getProperty((ByteBuffer) firstRecord.get(0));
if (itemId == null) {
idMappings.propertyId = getId(firstTupleProperty, IdType.PROPERTY_ID);
idMappings.propertyId = getId(property, IdType.PROPERTY_ID);
}
foundTuple = new KvinTupleMetadata(item.toString(), firstTupleProperty.toString(),
foundTuple = new KvinTupleMetadata(item.toString(), property.toString(),
idMappings.itemId, idMappings.propertyId, idMappings.contextId);
}

Expand Down

0 comments on commit 700dd54

Please sign in to comment.