Add custom parquet reader.
kenwenzel committed Aug 7, 2024
1 parent 1260bd2 commit 0159ed9
Showing 7 changed files with 708 additions and 65 deletions.
@@ -6,20 +6,23 @@
import io.github.linkedfactory.core.kvin.KvinListener;
import io.github.linkedfactory.core.kvin.KvinTuple;
import io.github.linkedfactory.core.kvin.Record;
import io.github.linkedfactory.core.kvin.parquet.records.GroupRecordConverterExt;
import io.github.linkedfactory.core.kvin.parquet.records.SimpleGroupExt;
import io.github.linkedfactory.core.kvin.util.AggregatingIterator;
import net.enilink.commons.iterator.IExtendedIterator;
import net.enilink.commons.iterator.NiceIterator;
import net.enilink.commons.iterator.WrappedIterator;
import net.enilink.commons.util.Pair;
import net.enilink.komma.core.URI;
import net.enilink.komma.core.URIs;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
@@ -31,7 +34,10 @@
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.io.api.Binary;
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
import org.eclipse.rdf4j.common.concurrent.locks.ReadPrefReadWriteLockManager;
@@ -46,6 +52,7 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Timestamp;
@@ -62,8 +69,8 @@
public class KvinParquet implements Kvin {
static final Logger log = LoggerFactory.getLogger(KvinParquet.class);
static final long[] EMPTY_IDS = {0};
static Comparator<GenericRecord> RECORD_COMPARATOR = (a, b) -> {
int diff = ((ByteBuffer) a.get(0)).compareTo((ByteBuffer) b.get(0));
static Comparator<GroupRecord> GROUP_RECORD_COMPARATOR = (a, b) -> {
int diff = a.id.compareTo(b.id);
if (diff != 0) {
return diff;
}
@@ -79,12 +86,13 @@ public class KvinParquet implements Kvin {
}
return 0;
};

// used by reader
final Cache<URI, Long> itemIdCache = CacheBuilder.newBuilder().maximumSize(10000).build();
final Cache<URI, Long> propertyIdCache = CacheBuilder.newBuilder().maximumSize(10000).build();
final Cache<URI, Long> contextIdCache = CacheBuilder.newBuilder().maximumSize(10000).build();
// Lock
Map<Path, HadoopInputFile> inputFileCache = new HashMap<>(); // hadoop input file cache
Map<Path, InputFileInfo> inputFileCache = new HashMap<>(); // cache of input files with their parsed footer metadata
Cache<Long, URI> propertyIdReverseLookUpCache = CacheBuilder.newBuilder().maximumSize(10000).build();
Cache<java.nio.file.Path, Properties> metaCache = CacheBuilder.newBuilder().maximumSize(10000).build();
Cache<java.nio.file.Path, List<Path>> filesCache = CacheBuilder.newBuilder().maximumSize(10000).build();
@@ -115,7 +123,7 @@ static boolean anyBetween(long[] values, long min, long max) {

private List<IdMapping> fetchMappingIds(Path mappingFile, FilterPredicate filter) throws IOException {
List<IdMapping> mappings = null;
HadoopInputFile inputFile = getFile(mappingFile);
HadoopInputFile inputFile = getFile(mappingFile).file;
try (ParquetReader<IdMapping> reader = createReader(inputFile, FilterCompat.get(filter))) {
while (true) {
var mapping = reader.read();
@@ -138,13 +146,8 @@ private void readMaxIds(WriteContext writeContext, java.nio.file.Path metadataPa
for (Map.Entry<String, List<Pair<String, Integer>>> entry : mappingFiles.entrySet()) {
long maxId = 0L;
for (Pair<String, Integer> mappingFile : entry.getValue()) {
HadoopInputFile inputFile = getFile(new Path(metadataPath.resolve(mappingFile.getFirst()).toString()));
ParquetReadOptions readOptions = HadoopReadOptions
.builder(inputFile.getConfiguration(), inputFile.getPath())
.build();

ParquetMetadata meta = ParquetFileReader.readFooter(inputFile, readOptions, inputFile.newStream());
for (BlockMetaData blockMeta : meta.getBlocks()) {
InputFileInfo inputFile = getFile(new Path(metadataPath.resolve(mappingFile.getFirst()).toString()));
for (BlockMetaData blockMeta : inputFile.metadata.getBlocks()) {
for (ColumnChunkMetaData columnMeta : blockMeta.getColumns()) {
if (columnMeta.getPath().equals(idPath)) {
// get max id from statistics
@@ -167,20 +170,24 @@
}
}

private HadoopInputFile getFile(Path path) {
HadoopInputFile inputFile;
private InputFileInfo getFile(Path path) {
InputFileInfo inputFileInfo;
synchronized (inputFileCache) {
inputFile = inputFileCache.get(path);
if (inputFile == null) {
inputFileInfo = inputFileCache.get(path);
if (inputFileInfo == null) {
try {
inputFile = HadoopInputFile.fromPath(path, new Configuration());
HadoopInputFile inputFile = HadoopInputFile.fromPath(path, new Configuration());
ParquetReadOptions.Builder optionsBuilder = HadoopReadOptions.builder(configuration, path);
var options = optionsBuilder.build();
ParquetMetadata metadata = ParquetFileReader.readFooter(inputFile, options, inputFile.newStream());
inputFileInfo = new InputFileInfo(path, inputFile, metadata);
} catch (IOException e) {
throw new RuntimeException(e);
}
inputFileCache.put(path, inputFile);
inputFileCache.put(path, inputFileInfo);
}
}
return inputFile;
return inputFileInfo;
}

Lock writeLock() throws IOException {
@@ -849,12 +856,81 @@ private <T> ParquetReader<T> createReader(InputFile file, FilterCompat.Filter fi
.build();
}

private ParquetReader<GenericRecord> createGenericReader(InputFile file, FilterCompat.Filter filter) throws IOException {
return AvroParquetReader.<GenericRecord>builder(file)
.withDataModel(GenericData.get())
.useStatsFilter()
.withFilter(filter)
.build();
private IExtendedIterator<GroupRecord> createGroupReader(InputFileInfo fileInfo, FilterCompat.Filter filter) {
try {
ParquetReadOptions.Builder optionsBuilder = HadoopReadOptions.builder(configuration, fileInfo.path);
optionsBuilder.withAllocator(new HeapByteBufferAllocator());
optionsBuilder.withRecordFilter(filter);
ParquetReadOptions options = optionsBuilder.build();
ParquetFileReader r = new ParquetFileReader(configuration, fileInfo.path, fileInfo.metadata, options);
return new NiceIterator<>() {
RecordReader recordReader;
Group next;
PageReadStore pages;
long readRows;

@Override
public boolean hasNext() {
if (next == null) {
try {
while (next == null) {
if (pages == null || readRows == pages.getRowCount()) {
readRows = 0;
recordReader = null;
if (pages != null) {
pages.close();
}
pages = r.readNextFilteredRowGroup();
if (pages != null && pages.getRowCount() > 0) {
recordReader = fileInfo.columnIO.getRecordReader(pages,
new GroupRecordConverterExt(fileInfo.metadata.getFileMetaData().getSchema()), filter);
}
}
if (recordReader != null) {
while (readRows < pages.getRowCount()) {
next = (Group) recordReader.read();
readRows++;
if (!recordReader.shouldSkipCurrentRecord()) {
break;
}
}
} else {
r.close();
break;
}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
return next != null;
}

@Override
public GroupRecord next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
GroupRecord result = new GroupRecord(next, 0);
next = null;
return result;
}

@Override
public void close() {
if (pages != null) {
pages.close();
}
try {
r.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
};
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
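
For orientation, the iterator above wraps the standard parquet-mr page/record API (ParquetFileReader, MessageColumnIO, RecordReader). The following is a minimal standalone sketch of that reading pattern; the row-group filtering (readNextFilteredRowGroup / shouldSkipCurrentRecord) and the custom GroupRecordConverterExt/SimpleGroupExt classes are left out, so it illustrates the technique rather than this commit's implementation.

// Minimal sketch (not part of this commit): iterate row groups and
// materialize one Group record per row of each row group.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;

class RowGroupReadSketch {
    static void readAll(Path file) throws java.io.IOException {
        try (ParquetFileReader reader = ParquetFileReader.open(
                HadoopInputFile.fromPath(file, new Configuration()))) {
            MessageType schema = reader.getFooter().getFileMetaData().getSchema();
            MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
            PageReadStore pages;
            while ((pages = reader.readNextRowGroup()) != null) { // one row group at a time
                RecordReader<Group> records = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
                for (long row = 0; row < pages.getRowCount(); row++) {
                    Group group = records.read(); // assembled record
                    // convert 'group' to a domain object here
                }
            }
        }
    }
}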

private IExtendedIterator<KvinTuple> fetchInternal(List<URI> items, List<URI> properties, URI context, Long end, Long begin, Long limit) throws IOException {
@@ -894,9 +970,9 @@ private IExtendedIterator<KvinTuple> fetchInternal(List<URI> items, List<URI> pr
return NiceIterator.emptyIterator();
}
return new NiceIterator<KvinTuple>() {
final PriorityQueue<Pair<GenericRecord, ParquetReader<GenericRecord>>> nextTuples =
new PriorityQueue<>(Comparator.comparing(Pair::getFirst, RECORD_COMPARATOR));
GenericRecord prevRecord;
final PriorityQueue<Pair<GroupRecord, IExtendedIterator<GroupRecord>>> nextTuples =
new PriorityQueue<>(Comparator.comparing(Pair::getFirst, GROUP_RECORD_COMPARATOR));
GroupRecord prevRecord;
KvinTuple nextTuple;
long propertyValueCount;
int folderIndex = -1;
@@ -919,6 +995,9 @@ private IExtendedIterator<KvinTuple> fetchInternal(List<URI> items, List<URI> pr

KvinTuple selectNextTuple() throws IOException {
boolean skipAfterLimit = limit != 0 && propertyValueCount >= limit;
if (skipAfterLimit && propertyIds.length == 1) {
return null;
}

KvinTuple tuple = null;
while (tuple == null) {
@@ -928,11 +1007,11 @@ KvinTuple selectNextTuple() throws IOException {
var min = nextTuples.isEmpty() ? null : nextTuples.poll();
if (min != null) {
// omit duplicates in terms of id, time, and seqNr
boolean isDuplicate = prevRecord != null && RECORD_COMPARATOR.compare(prevRecord, min.getFirst()) == 0;
boolean isDuplicate = prevRecord != null && GROUP_RECORD_COMPARATOR.compare(prevRecord, min.getFirst()) == 0;
if (!isDuplicate) {
if (skipAfterLimit) {
// reset value count if property changes
if (!min.getFirst().get(0).equals(prevRecord.get(0))) {
if (!min.getFirst().id.equals(prevRecord.id)) {
skipAfterLimit = false;
propertyValueCount = 0;
}
@@ -943,14 +1022,10 @@ KvinTuple selectNextTuple() throws IOException {
propertyValueCount++;
}
}
var record = min.getSecond().read();
if (record != null) {
nextTuples.add(new Pair<>(record, min.getSecond()));
if (min.getSecond().hasNext()) {
nextTuples.add(new Pair<>(min.getSecond().next(), min.getSecond()));
} else {
try {
min.getSecond().close();
} catch (IOException e) {
}
min.getSecond().close();
}
} else {
break;
@@ -993,10 +1068,7 @@ public void close() {
if (!closed) {
try {
while (!nextTuples.isEmpty()) {
try {
nextTuples.poll().getSecond().close();
} catch (IOException e) {
}
nextTuples.poll().getSecond().close();
}
} finally {
readLock.release();
@@ -1005,10 +1077,11 @@
}
}

KvinTuple convert(GenericRecord record) throws IOException {
var itemId = ((ByteBuffer) record.get(0)).getLong(0);
KvinTuple convert(GroupRecord record) throws IOException {
var bb = record.id;
var itemId = bb.getLong(bb.position());
// skip item and context ids
var propertyId = ((ByteBuffer) record.get(0)).getLong(Long.BYTES * 2);
var propertyId = bb.getLong(bb.position() + Long.BYTES * 2);
URI item = null;
URI property = null;
for (int i = 0; i < itemIds.length; i++) {
@@ -1032,16 +1105,11 @@ void nextReaders() throws IOException {
folderIndex++;
List<Path> currentFiles = getDataFiles(dataFolders.get(folderIndex).toString());
for (Path file : currentFiles) {
HadoopInputFile inputFile = getFile(file);
ParquetReader<GenericRecord> reader = createGenericReader(inputFile, FilterCompat.get(filterFinal));
GenericRecord record = reader.read();
if (record != null) {
nextTuples.add(new Pair<>(record, reader));
IExtendedIterator<GroupRecord> reader = createGroupReader(getFile(file), FilterCompat.get(filterFinal));
if (reader.hasNext()) {
nextTuples.add(new Pair<>(reader.next(), reader));
} else {
try {
reader.close();
} catch (IOException e) {
}
reader.close();
}
}
}
@@ -1153,10 +1221,10 @@ private List<URI> getProperties(long itemId, long contextId) {

for (java.nio.file.Path dataFolder : dataFolders) {
for (Path dataFile : getDataFiles(dataFolder.toString())) {
ParquetReader<GenericRecord> reader = createGenericReader(getFile(dataFile), FilterCompat.get(filter));
GenericRecord record;
while ((record = reader.read()) != null) {
ByteBuffer idBb = (ByteBuffer) record.get(0);
var reader = createGroupReader(getFile(dataFile), FilterCompat.get(filter));
while (reader.hasNext()) {
var record = reader.next();
ByteBuffer idBb = ((Binary) record.get(0)).toByteBuffer();
// skip item id
idBb.getLong();
// skip context id
@@ -1212,6 +1280,44 @@ enum IdType {
CONTEXT_ID
}

static class GroupRecord {
final ByteBuffer id;
final Group group;
final int row;

GroupRecord(Group group, int row) {
this.group = group;
this.id = group.getBinary(0, 0).toByteBuffer();
this.row = row;
}

public Object get(int i) {
var value = ((SimpleGroupExt) group).getObject(i, row);
var type = group.getType().getType(i);
if (value instanceof Binary) {
var annotation = type.getLogicalTypeAnnotation();
if (annotation != null && "STRING".equals(annotation.toString())) {
return new String(((Binary) value).getBytes(), StandardCharsets.UTF_8);
}
}
return value;
}
}
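
The id column exposed by GroupRecord is read by convert() and getProperties() above as three longs packed into a single 24-byte key: item id, context id, property id, in that order. The write side is not shown in this excerpt, so the helper below is only an inferred illustration of that layout (assuming the ByteBuffer default big-endian byte order).

// Inferred layout of the packed id key (itemId | contextId | propertyId),
// matching how convert() and getProperties() decode it. Illustration only.
static ByteBuffer packId(long itemId, long contextId, long propertyId) {
    ByteBuffer bb = ByteBuffer.allocate(3 * Long.BYTES); // big-endian by default
    bb.putLong(itemId).putLong(contextId).putLong(propertyId);
    bb.flip();
    return bb;
}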

static class InputFileInfo {
final Path path;
final HadoopInputFile file;
final ParquetMetadata metadata;
final MessageColumnIO columnIO;

InputFileInfo(Path path, HadoopInputFile file, ParquetMetadata metadata) {
this.path = path;
this.file = file;
this.metadata = metadata;
this.columnIO = new ColumnIOFactory().getColumnIO(metadata.getFileMetaData().getSchema());
}
}
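
Taken together, a caller resolves the cached footer and column I/O once via getFile and then streams matching records, closing the iterator when done, as nextReaders() and getProperties() do above. A hypothetical usage sketch (dataFile and filter stand in for caller-supplied values):

// Hypothetical usage of the new reader; mirrors nextReaders()/getProperties().
IExtendedIterator<GroupRecord> it = createGroupReader(getFile(dataFile), FilterCompat.get(filter));
try {
    while (it.hasNext()) {
        GroupRecord record = it.next();
        // read record.id or record.get(i) here
    }
} finally {
    it.close();
}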

static class WriterState {
java.nio.file.Path file;
ParquetWriter<KvinTupleInternal> writer;