Skip to content

Commit

Permalink
Cache data files.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed Jul 19, 2024
1 parent 700dd54 commit 9893659
Showing 1 changed file with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import net.enilink.commons.util.Pair;
import net.enilink.komma.core.URI;
import net.enilink.komma.core.URIs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
Expand All @@ -36,7 +36,6 @@
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
import org.eclipse.rdf4j.common.concurrent.locks.ReadPrefReadWriteLockManager;
import org.eclipse.rdf4j.common.concurrent.locks.ReadWriteLockManager;
import org.eclipse.rdf4j.query.algebra.In;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,7 +55,6 @@
import java.util.stream.Stream;

import static io.github.linkedfactory.core.kvin.parquet.ParquetHelpers.*;
import static io.github.linkedfactory.core.kvin.parquet.Records.decodeRecord;
import static io.github.linkedfactory.core.kvin.parquet.Records.encodeRecord;
import static org.apache.parquet.filter2.predicate.FilterApi.*;

Expand Down Expand Up @@ -87,6 +85,7 @@ public class KvinParquet implements Kvin {
Map<Path, HadoopInputFile> inputFileCache = new HashMap<>(); // hadoop input file cache
// Bounded cache (at most 10000 entries) from a numeric id to a URI;
// presumably resolves property ids back to their URIs - confirm against the lookup code.
Cache<Long, URI> propertyIdReverseLookUpCache = CacheBuilder.newBuilder().maximumSize(10000).build();
// Bounded cache (at most 10000 entries) of meta data properties keyed by file system path;
// invalidated as a whole when the meta data cache is cleared after a write.
Cache<java.nio.file.Path, Properties> metaCache = CacheBuilder.newBuilder().maximumSize(10000).build();
// Bounded cache (at most 10000 entries) of the "data__" files found directly inside a folder,
// keyed by the folder path; filled by getDataFiles and invalidated together with metaCache.
Cache<java.nio.file.Path, List<Path>> filesCache = CacheBuilder.newBuilder().maximumSize(10000).build();
// NOTE(review): looks like the root location of the archive on disk - confirm where it is assigned.
String archiveLocation;
// Read/write lock manager for this store; the meaning of the constructor arguments (true, 5000)
// is defined by ReadPrefReadWriteLockManager and is not visible here.
ReadWriteLockManager lockManager = new ReadPrefReadWriteLockManager(true, 5000);

Expand Down Expand Up @@ -423,6 +422,7 @@ private synchronized void putInternal(Iterable<KvinTuple> tuples) throws IOExcep

// clear cache with meta data
metaCache.invalidateAll();
filesCache.invalidateAll();

// invalidate id caches - TODO could be improved by directly updating the caches
itemIdCache.invalidateAll();
Expand Down Expand Up @@ -864,10 +864,14 @@ private long[] splitRange(String range) {
}

/**
 * Returns the data files that are located directly inside the given folder.
 * <p>
 * Only direct children whose file name starts with {@code "data__"} are returned.
 * The result is cached in {@code filesCache} under the folder path, so the same
 * list instance is handed out on subsequent calls until the cache is invalidated;
 * callers should therefore treat the returned list as read-only.
 *
 * @param path the folder to scan
 * @return the matching files as Hadoop paths
 * @throws IOException if the folder cannot be listed
 */
private List<Path> getDataFiles(String path) throws IOException {
    java.nio.file.Path folder = Paths.get(path);
    try {
        return filesCache.get(folder, () -> {
            // Files.walk keeps a directory handle open until the stream is closed,
            // hence the try-with-resources around the whole pipeline.
            try (Stream<java.nio.file.Path> entries = Files.walk(folder, 1)) {
                // the first element of the walk is the folder itself
                return entries.skip(1)
                        .filter(p -> p.getFileName().toString().startsWith("data__"))
                        .map(p -> new Path(p.toString()))
                        .collect(Collectors.toList());
            }
        });
    } catch (ExecutionException e) {
        // surface the loader's own I/O failure instead of double-wrapping it
        if (e.getCause() instanceof IOException) {
            throw (IOException) e.getCause();
        }
        throw new IOException(e);
    }
}

private List<java.nio.file.Path> getDataFolders(IdMappings idMappings) throws IOException {
Expand Down

0 comments on commit 9893659

Please sign in to comment.