From 5a28c88d08b19b8158d8b16d447d81d688f9ee36 Mon Sep 17 00:00:00 2001 From: Ken Wenzel Date: Thu, 5 Sep 2024 13:13:54 +0200 Subject: [PATCH] Use separate columns for ids. --- .../core/kvin/parquet/Compactor.java | 2 +- .../core/kvin/parquet/KvinParquet.java | 42 +++--------- .../core/kvin/parquet/ParquetHelpers.java | 33 +++++----- .../parquet/records/KvinRecordConverter.java | 45 +++++++++---- .../records/KvinRecordWriteSupport.java | 66 +++++++++++-------- 5 files changed, 98 insertions(+), 90 deletions(-) diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/Compactor.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/Compactor.java index b7719434..a83d15e9 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/Compactor.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/Compactor.java @@ -26,7 +26,7 @@ public class Compactor { final KvinParquet kvinParquet; final File compactionFolder; String archiveLocation; - int dataFileCompactionTrigger = 2, mappingFileCompactionTrigger = 3; + int dataFileCompactionTrigger = 3, mappingFileCompactionTrigger = 3; public Compactor(KvinParquet kvinParquet) { this.archiveLocation = kvinParquet.archiveLocation; diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinParquet.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinParquet.java index 5ec5f704..f0e964aa 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinParquet.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinParquet.java @@ -764,26 +764,14 @@ private FilterPredicate generateFetchFilter(long[] itemIds, long[] propertyIds, private FilterPredicate createIdFilter(long itemId, long propertyId, long contextId) { if (itemId != 0L && propertyId != 0L && contextId != 0L) { - ByteBuffer keyBuffer = ByteBuffer.allocate(Long.BYTES * 3); - keyBuffer.putLong(itemId); - keyBuffer.putLong(contextId); - keyBuffer.putLong(propertyId); - return eq(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(keyBuffer.array())); + return and(eq(FilterApi.longColumn("propertyId"), propertyId), + and(eq(FilterApi.longColumn("itemId"), itemId), + eq(FilterApi.longColumn("contextId"), contextId))); } else if (contextId != 0L) { - ByteBuffer keyBuffer = ByteBuffer.allocate(Long.BYTES * 2); - keyBuffer.putLong(itemId); - keyBuffer.putLong(contextId); - return and(gt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(keyBuffer.array())), - lt(FilterApi.binaryColumn("id"), - Binary.fromConstantByteArray(ByteBuffer.allocate(Long.BYTES * 2) - .putLong(itemId).putLong(contextId + 1).array()))); + return and(eq(FilterApi.longColumn("itemId"), itemId), + eq(FilterApi.longColumn("contextId"), contextId)); } else { - ByteBuffer keyBuffer = ByteBuffer.allocate(Long.BYTES); - keyBuffer.putLong(itemId); - return and(gt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(keyBuffer.array())), - lt(FilterApi.binaryColumn("id"), - Binary.fromConstantByteArray(ByteBuffer.allocate(Long.BYTES) - .putLong(itemId + 1).array()))); + return eq(FilterApi.longColumn("itemId"), itemId); } } @@ -881,12 +869,8 @@ public ColumnIndexStore getColumnIndexStore(int blockIndex) { return indexCache.get(new Pair<>(fileInfo.path, block.getOrdinal()), () -> { Map columnIndexes = new HashMap<>(); Map offsetIndexes = new HashMap<>(); - int i = 0; for (ColumnChunkMetaData columnChunkMetaData : block.getColumns()) { - if (i++ < kvinTupleFirstField) { - // do not load indexes for value fields - columnIndexes.put(columnChunkMetaData.getPath(), readColumnIndex(columnChunkMetaData)); - } + columnIndexes.put(columnChunkMetaData.getPath(), readColumnIndex(columnChunkMetaData)); offsetIndexes.put(columnChunkMetaData.getPath(), readOffsetIndex(columnChunkMetaData)); } return new ColumnIndexStore() { @@ -1306,16 +1290,8 @@ public IExtendedIterator descendants(URI item, URI context, long limit) { private List getProperties(long itemId, long contextId) { try { - ByteBuffer lowKey = ByteBuffer.allocate(Long.BYTES * 2); - lowKey.putLong(itemId); - lowKey.putLong(contextId); - ByteBuffer highKey = ByteBuffer.allocate(Long.BYTES * 3); - highKey.putLong(itemId); - highKey.putLong(contextId); - highKey.putLong(Long.MAX_VALUE); - FilterPredicate filter = and(eq(FilterApi.booleanColumn("first"), true), and( - gt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(lowKey.array())), - lt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(highKey.array())))); + FilterPredicate filter = and(eq(FilterApi.booleanColumn("first"), true), + createIdFilter(itemId, 0L, contextId)); List dataFolders = getDataFolders(new long[]{itemId}); Set propertyIds = new LinkedHashSet<>(); diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/ParquetHelpers.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/ParquetHelpers.java index a0683cb0..80492a28 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/ParquetHelpers.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/ParquetHelpers.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.util.*; import java.util.regex.Matcher; @@ -54,17 +55,17 @@ public class ParquetHelpers { static final int PAGE_SIZE = 8192; // 8 KB static final int DICT_PAGE_SIZE = 1048576; // 1 MB static final int ZSTD_COMPRESSION_LEVEL = 12; // 1 - 22 - - // mapping file schema - static Schema idMappingSchema = SchemaBuilder.record("SimpleMapping").namespace(IdMapping.class.getPackageName()).fields() - .name("id").type().longType().noDefault() - .name("value").type().stringType().noDefault().endRecord(); - public static MessageType kvinTupleType = new MessageType("KvinTupleInternal", - new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "id"), + // new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "id"), + + new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "itemId"), + new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "contextId"), + new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "propertyId"), new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "time"), new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, "seqNr"), - new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.BOOLEAN, "first"), + + new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BOOLEAN, "first"), + new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.INT32, "valueInt"), new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.INT64, "valueLong"), new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.FLOAT, "valueFloat"), @@ -73,8 +74,10 @@ public class ParquetHelpers { new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.BOOLEAN, "valueBool"), new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.BINARY, "valueObject") ); - - static int kvinTupleFirstField = 4; + // mapping file schema + static Schema idMappingSchema = SchemaBuilder.record("SimpleMapping").namespace(IdMapping.class.getPackageName()).fields() + .name("id").type().longType().noDefault() + .name("value").type().stringType().noDefault().endRecord(); static Pattern fileWithSeqNr = Pattern.compile("^([^.].*)__([0-9]+)\\..*$"); static Pattern fileOrDotFileWithSeqNr = Pattern.compile("^\\.?([^.].*)__([0-9]+)\\..*$"); @@ -89,7 +92,6 @@ static ParquetWriter getKvinRecordWriter(Path dataFile) throws IOExc .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) .withConf(configuration) .withDictionaryEncoding(true) - .withDictionaryEncoding("id", false) .withDictionaryEncoding("valueObject", false) //.withCompressionCodec(CompressionCodecName.ZSTD) .withCompressionCodec(CompressionCodecName.SNAPPY) @@ -129,8 +131,9 @@ public boolean hasNext() { } pages = r.readNextFilteredRowGroup(); if (pages != null && pages.getRowCount() > 0) { - recordReader = columnIO.getRecordReader(pages, - new KvinRecordConverter(), filter); + recordReader = filter == null ? columnIO.getRecordReader(pages, + new KvinRecordConverter()) : columnIO.getRecordReader(pages, + new KvinRecordConverter()); } } if (recordReader != null) { @@ -246,8 +249,8 @@ public static void deleteMappingFiles(java.nio.file.Path folder, Set typ public static KvinTuple recordToTuple(URI item, URI property, URI context, KvinRecord record) throws IOException { Object value = record.value; if (value != null) { - if (value instanceof Binary) { - value = decodeRecord(((Binary) value).toByteBuffer()); + if (value instanceof ByteBuffer) { + value = decodeRecord((ByteBuffer) value); } } return new KvinTuple(item, property, context, record.time, record.seqNr, value); diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/records/KvinRecordConverter.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/records/KvinRecordConverter.java index 9681c75c..9debe148 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/records/KvinRecordConverter.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/records/KvinRecordConverter.java @@ -11,10 +11,13 @@ public class KvinRecordConverter extends RecordMaterializer { @Override public Converter getConverter(int fieldIndex) { switch (fieldIndex) { - case 0: return idConverter; - case 1: return timeConverter; - case 2: return seqNrConverter; - case 8: return stringValueConverter; + case 0: return itemIdConverter; + case 1: return contextIdConverter; + case 2: return propertyIdConverter; + case 3: return timeConverter; + case 4: return seqNrConverter; + case 5: return firstConverter; + case 10: return stringValueConverter; default: return valueConverter; } } @@ -29,6 +32,13 @@ public void end() { } }; + private final PrimitiveConverter firstConverter = new PrimitiveConverter() { + @Override + public void addBoolean(boolean value) { + // ignore first value + } + }; + private final PrimitiveConverter timeConverter = new PrimitiveConverter() { @Override public void addLong(long value) { @@ -43,15 +53,24 @@ public void addInt(int value) { } }; - private final PrimitiveConverter idConverter = new PrimitiveConverter() { + private final PrimitiveConverter itemIdConverter = new PrimitiveConverter() { @Override - public void addBinary(Binary value) { - var bb = value.toByteBuffer(); - currentRecord.itemId = bb.getLong(bb.position()); - // skip item id - currentRecord.contextId = bb.getLong(bb.position() + Long.BYTES); - // skip item and context ids - currentRecord.propertyId = bb.getLong(bb.position() + Long.BYTES * 2); + public void addLong(long value) { + currentRecord.itemId = value; + } + }; + + private final PrimitiveConverter contextIdConverter = new PrimitiveConverter() { + @Override + public void addLong(long value) { + currentRecord.contextId = value; + } + }; + + private final PrimitiveConverter propertyIdConverter = new PrimitiveConverter() { + @Override + public void addLong(long value) { + currentRecord.propertyId = value; } }; @@ -69,7 +88,7 @@ void addObject(Object value) { @Override public void addBinary(Binary value) { - addObject(value); + addObject(value.toByteBuffer()); } @Override diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/records/KvinRecordWriteSupport.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/records/KvinRecordWriteSupport.java index 10d41394..d99a85f3 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/records/KvinRecordWriteSupport.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/records/KvinRecordWriteSupport.java @@ -38,58 +38,68 @@ public void prepareForWrite(RecordConsumer recordConsumer) { @Override public void write(KvinRecord r) { rc.startMessage(); - rc.startField("id", 0); - ByteBuffer idBb = ByteBuffer.allocate(Long.BYTES * 3); - idBb.putLong(r.itemId).putLong(r.contextId).putLong(r.propertyId); - rc.addBinary(Binary.fromConstantByteBuffer(idBb.flip())); - rc.endField("id", 0); + rc.startField("itemId", 0); + rc.addLong(r.itemId); + rc.endField("itemId", 0); - rc.startField("time", 1); + rc.startField("contextId", 1); + rc.addLong(r.contextId); + rc.endField("contextId", 1); + + rc.startField("propertyId", 2); + rc.addLong(r.propertyId); + rc.endField("propertyId", 2); + + rc.startField("time", 3); rc.addLong(r.time); - rc.endField("time", 1); + rc.endField("time", 3); - rc.startField("seqNr", 2); + rc.startField("seqNr", 4); rc.addInteger(r.seqNr); - rc.endField("seqNr", 2); + rc.endField("seqNr", 4); boolean first = prevRecord == null || prevRecord.itemId != r.itemId || prevRecord.contextId != r.contextId || prevRecord.propertyId != r.propertyId; - if (first) { - rc.startField("first", 3); - rc.addBoolean(true); - rc.endField("first", 3); - } + rc.startField("first", 5); + rc.addBoolean(first); + rc.endField("first", 5); Object value = r.value; if (value instanceof Integer) { - rc.startField("valueInt", 4); + rc.startField("valueInt", 6); rc.addInteger((Integer) value); - rc.endField("valueInt", 4); + rc.endField("valueInt", 6); } else if (value instanceof Long) { - rc.startField("valueLong", 5); + rc.startField("valueLong", 7); rc.addLong((Long) value); - rc.endField("valueLong", 5); + rc.endField("valueLong", 7); } else if (value instanceof Float) { - rc.startField("valueFloat", 6); + rc.startField("valueFloat", 8); rc.addFloat((Float) value); - rc.endField("valueFloat", 6); + rc.endField("valueFloat", 8); } else if (value instanceof Double) { - rc.startField("valueDouble", 7); + rc.startField("valueDouble", 9); rc.addDouble((Double) value); - rc.endField("valueDouble", 7); + rc.endField("valueDouble", 9); } else if (value instanceof String) { - rc.startField("valueString", 8); + rc.startField("valueString", 10); rc.addBinary(Binary.fromString((String) value)); - rc.endField("valueString", 8); + rc.endField("valueString", 10); } else if (value instanceof Boolean) { - rc.startField("valueBool", 9); + rc.startField("valueBool", 11); rc.addBoolean((Boolean) value); - rc.endField("valueBool", 9); + rc.endField("valueBool", 11); } else if (value instanceof ByteBuffer) { - rc.startField("valueObject", 10); + rc.startField("valueObject", 12); rc.addBinary(Binary.fromConstantByteBuffer((ByteBuffer) value)); - rc.endField("valueObject", 10); + rc.endField("valueObject", 12); + } else if (value instanceof byte[]) { + rc.startField("valueObject", 12); + rc.addBinary(Binary.fromConstantByteArray((byte[]) value)); + rc.endField("valueObject", 12); } rc.endMessage(); + // store previous record + prevRecord = r; } }