Skip to content

Commit

Permalink
Use separate columns for ids.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed Sep 5, 2024
1 parent b767220 commit 5a28c88
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class Compactor {
final KvinParquet kvinParquet;
final File compactionFolder;
String archiveLocation;
int dataFileCompactionTrigger = 2, mappingFileCompactionTrigger = 3;
int dataFileCompactionTrigger = 3, mappingFileCompactionTrigger = 3;

public Compactor(KvinParquet kvinParquet) {
this.archiveLocation = kvinParquet.archiveLocation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,26 +764,14 @@ private FilterPredicate generateFetchFilter(long[] itemIds, long[] propertyIds,

private FilterPredicate createIdFilter(long itemId, long propertyId, long contextId) {
if (itemId != 0L && propertyId != 0L && contextId != 0L) {
ByteBuffer keyBuffer = ByteBuffer.allocate(Long.BYTES * 3);
keyBuffer.putLong(itemId);
keyBuffer.putLong(contextId);
keyBuffer.putLong(propertyId);
return eq(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(keyBuffer.array()));
return and(eq(FilterApi.longColumn("propertyId"), propertyId),
and(eq(FilterApi.longColumn("itemId"), itemId),
eq(FilterApi.longColumn("contextId"), contextId)));
} else if (contextId != 0L) {
ByteBuffer keyBuffer = ByteBuffer.allocate(Long.BYTES * 2);
keyBuffer.putLong(itemId);
keyBuffer.putLong(contextId);
return and(gt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(keyBuffer.array())),
lt(FilterApi.binaryColumn("id"),
Binary.fromConstantByteArray(ByteBuffer.allocate(Long.BYTES * 2)
.putLong(itemId).putLong(contextId + 1).array())));
return and(eq(FilterApi.longColumn("itemId"), itemId),
eq(FilterApi.longColumn("contextId"), contextId));
} else {
ByteBuffer keyBuffer = ByteBuffer.allocate(Long.BYTES);
keyBuffer.putLong(itemId);
return and(gt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(keyBuffer.array())),
lt(FilterApi.binaryColumn("id"),
Binary.fromConstantByteArray(ByteBuffer.allocate(Long.BYTES)
.putLong(itemId + 1).array())));
return eq(FilterApi.longColumn("itemId"), itemId);
}
}

Expand Down Expand Up @@ -881,12 +869,8 @@ public ColumnIndexStore getColumnIndexStore(int blockIndex) {
return indexCache.get(new Pair<>(fileInfo.path, block.getOrdinal()), () -> {
Map<ColumnPath, ColumnIndex> columnIndexes = new HashMap<>();
Map<ColumnPath, OffsetIndex> offsetIndexes = new HashMap<>();
int i = 0;
for (ColumnChunkMetaData columnChunkMetaData : block.getColumns()) {
if (i++ < kvinTupleFirstField) {
// do not load indexes for value fields
columnIndexes.put(columnChunkMetaData.getPath(), readColumnIndex(columnChunkMetaData));
}
columnIndexes.put(columnChunkMetaData.getPath(), readColumnIndex(columnChunkMetaData));
offsetIndexes.put(columnChunkMetaData.getPath(), readOffsetIndex(columnChunkMetaData));
}
return new ColumnIndexStore() {
Expand Down Expand Up @@ -1306,16 +1290,8 @@ public IExtendedIterator<URI> descendants(URI item, URI context, long limit) {

private List<URI> getProperties(long itemId, long contextId) {
try {
ByteBuffer lowKey = ByteBuffer.allocate(Long.BYTES * 2);
lowKey.putLong(itemId);
lowKey.putLong(contextId);
ByteBuffer highKey = ByteBuffer.allocate(Long.BYTES * 3);
highKey.putLong(itemId);
highKey.putLong(contextId);
highKey.putLong(Long.MAX_VALUE);
FilterPredicate filter = and(eq(FilterApi.booleanColumn("first"), true), and(
gt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(lowKey.array())),
lt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(highKey.array()))));
FilterPredicate filter = and(eq(FilterApi.booleanColumn("first"), true),
createIdFilter(itemId, 0L, contextId));

List<java.nio.file.Path> dataFolders = getDataFolders(new long[]{itemId});
Set<Long> propertyIds = new LinkedHashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.*;
import java.util.regex.Matcher;
Expand All @@ -54,17 +55,17 @@ public class ParquetHelpers {
static final int PAGE_SIZE = 8192; // 8 KB
static final int DICT_PAGE_SIZE = 1048576; // 1 MB
static final int ZSTD_COMPRESSION_LEVEL = 12; // 1 - 22

// mapping file schema
static Schema idMappingSchema = SchemaBuilder.record("SimpleMapping").namespace(IdMapping.class.getPackageName()).fields()
.name("id").type().longType().noDefault()
.name("value").type().stringType().noDefault().endRecord();

public static MessageType kvinTupleType = new MessageType("KvinTupleInternal",
new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "id"),
// new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "id"),

new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "itemId"),
new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "contextId"),
new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "propertyId"),
new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "time"),
new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, "seqNr"),
new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.BOOLEAN, "first"),

new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BOOLEAN, "first"),

new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.INT32, "valueInt"),
new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.INT64, "valueLong"),
new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.FLOAT, "valueFloat"),
Expand All @@ -73,8 +74,10 @@ public class ParquetHelpers {
new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.BOOLEAN, "valueBool"),
new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.BINARY, "valueObject")
);

static int kvinTupleFirstField = 4;
// mapping file schema
static Schema idMappingSchema = SchemaBuilder.record("SimpleMapping").namespace(IdMapping.class.getPackageName()).fields()
.name("id").type().longType().noDefault()
.name("value").type().stringType().noDefault().endRecord();

static Pattern fileWithSeqNr = Pattern.compile("^([^.].*)__([0-9]+)\\..*$");
static Pattern fileOrDotFileWithSeqNr = Pattern.compile("^\\.?([^.].*)__([0-9]+)\\..*$");
Expand All @@ -89,7 +92,6 @@ static ParquetWriter<KvinRecord> getKvinRecordWriter(Path dataFile) throws IOExc
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
.withConf(configuration)
.withDictionaryEncoding(true)
.withDictionaryEncoding("id", false)
.withDictionaryEncoding("valueObject", false)
//.withCompressionCodec(CompressionCodecName.ZSTD)
.withCompressionCodec(CompressionCodecName.SNAPPY)
Expand Down Expand Up @@ -129,8 +131,9 @@ public boolean hasNext() {
}
pages = r.readNextFilteredRowGroup();
if (pages != null && pages.getRowCount() > 0) {
recordReader = columnIO.getRecordReader(pages,
new KvinRecordConverter(), filter);
recordReader = filter == null ? columnIO.getRecordReader(pages,
new KvinRecordConverter()) : columnIO.getRecordReader(pages,
new KvinRecordConverter());
}
}
if (recordReader != null) {
Expand Down Expand Up @@ -246,8 +249,8 @@ public static void deleteMappingFiles(java.nio.file.Path folder, Set<String> typ
public static KvinTuple recordToTuple(URI item, URI property, URI context, KvinRecord record) throws IOException {
Object value = record.value;
if (value != null) {
if (value instanceof Binary) {
value = decodeRecord(((Binary) value).toByteBuffer());
if (value instanceof ByteBuffer) {
value = decodeRecord((ByteBuffer) value);
}
}
return new KvinTuple(item, property, context, record.time, record.seqNr, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ public class KvinRecordConverter extends RecordMaterializer<KvinRecord> {
@Override
public Converter getConverter(int fieldIndex) {
switch (fieldIndex) {
case 0: return idConverter;
case 1: return timeConverter;
case 2: return seqNrConverter;
case 8: return stringValueConverter;
case 0: return itemIdConverter;
case 1: return contextIdConverter;
case 2: return propertyIdConverter;
case 3: return timeConverter;
case 4: return seqNrConverter;
case 5: return firstConverter;
case 10: return stringValueConverter;
default: return valueConverter;
}
}
Expand All @@ -29,6 +32,13 @@ public void end() {
}
};

private final PrimitiveConverter firstConverter = new PrimitiveConverter() {
@Override
public void addBoolean(boolean value) {
// ignore first value
}
};

private final PrimitiveConverter timeConverter = new PrimitiveConverter() {
@Override
public void addLong(long value) {
Expand All @@ -43,15 +53,24 @@ public void addInt(int value) {
}
};

private final PrimitiveConverter idConverter = new PrimitiveConverter() {
private final PrimitiveConverter itemIdConverter = new PrimitiveConverter() {
@Override
public void addBinary(Binary value) {
var bb = value.toByteBuffer();
currentRecord.itemId = bb.getLong(bb.position());
// skip item id
currentRecord.contextId = bb.getLong(bb.position() + Long.BYTES);
// skip item and context ids
currentRecord.propertyId = bb.getLong(bb.position() + Long.BYTES * 2);
public void addLong(long value) {
currentRecord.itemId = value;
}
};

private final PrimitiveConverter contextIdConverter = new PrimitiveConverter() {
@Override
public void addLong(long value) {
currentRecord.contextId = value;
}
};

private final PrimitiveConverter propertyIdConverter = new PrimitiveConverter() {
@Override
public void addLong(long value) {
currentRecord.propertyId = value;
}
};

Expand All @@ -69,7 +88,7 @@ void addObject(Object value) {

@Override
public void addBinary(Binary value) {
addObject(value);
addObject(value.toByteBuffer());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,58 +38,68 @@ public void prepareForWrite(RecordConsumer recordConsumer) {
@Override
public void write(KvinRecord r) {
rc.startMessage();
rc.startField("id", 0);
ByteBuffer idBb = ByteBuffer.allocate(Long.BYTES * 3);
idBb.putLong(r.itemId).putLong(r.contextId).putLong(r.propertyId);
rc.addBinary(Binary.fromConstantByteBuffer(idBb.flip()));
rc.endField("id", 0);
rc.startField("itemId", 0);
rc.addLong(r.itemId);
rc.endField("itemId", 0);

rc.startField("time", 1);
rc.startField("contextId", 1);
rc.addLong(r.contextId);
rc.endField("contextId", 1);

rc.startField("propertyId", 2);
rc.addLong(r.propertyId);
rc.endField("propertyId", 2);

rc.startField("time", 3);
rc.addLong(r.time);
rc.endField("time", 1);
rc.endField("time", 3);

rc.startField("seqNr", 2);
rc.startField("seqNr", 4);
rc.addInteger(r.seqNr);
rc.endField("seqNr", 2);
rc.endField("seqNr", 4);

boolean first = prevRecord == null || prevRecord.itemId != r.itemId || prevRecord.contextId != r.contextId ||
prevRecord.propertyId != r.propertyId;
if (first) {
rc.startField("first", 3);
rc.addBoolean(true);
rc.endField("first", 3);
}
rc.startField("first", 5);
rc.addBoolean(first);
rc.endField("first", 5);

Object value = r.value;
if (value instanceof Integer) {
rc.startField("valueInt", 4);
rc.startField("valueInt", 6);
rc.addInteger((Integer) value);
rc.endField("valueInt", 4);
rc.endField("valueInt", 6);
} else if (value instanceof Long) {
rc.startField("valueLong", 5);
rc.startField("valueLong", 7);
rc.addLong((Long) value);
rc.endField("valueLong", 5);
rc.endField("valueLong", 7);
} else if (value instanceof Float) {
rc.startField("valueFloat", 6);
rc.startField("valueFloat", 8);
rc.addFloat((Float) value);
rc.endField("valueFloat", 6);
rc.endField("valueFloat", 8);
} else if (value instanceof Double) {
rc.startField("valueDouble", 7);
rc.startField("valueDouble", 9);
rc.addDouble((Double) value);
rc.endField("valueDouble", 7);
rc.endField("valueDouble", 9);
} else if (value instanceof String) {
rc.startField("valueString", 8);
rc.startField("valueString", 10);
rc.addBinary(Binary.fromString((String) value));
rc.endField("valueString", 8);
rc.endField("valueString", 10);
} else if (value instanceof Boolean) {
rc.startField("valueBool", 9);
rc.startField("valueBool", 11);
rc.addBoolean((Boolean) value);
rc.endField("valueBool", 9);
rc.endField("valueBool", 11);
} else if (value instanceof ByteBuffer) {
rc.startField("valueObject", 10);
rc.startField("valueObject", 12);
rc.addBinary(Binary.fromConstantByteBuffer((ByteBuffer) value));
rc.endField("valueObject", 10);
rc.endField("valueObject", 12);
} else if (value instanceof byte[]) {
rc.startField("valueObject", 12);
rc.addBinary(Binary.fromConstantByteArray((byte[]) value));
rc.endField("valueObject", 12);
}
rc.endMessage();
// store previous record
prevRecord = r;
}
}

0 comments on commit 5a28c88

Please sign in to comment.