Skip to content

Commit

Permalink
HollowObjectTypeDataElements utilities for splitting and joining (#642)
Browse files Browse the repository at this point in the history
* HollowObjectTypeDataElements utilities for splitting and joining

* Assume non-uniform field width across shards, add tests for filtered fields

* Restore private modifier, cleanup, more tests

* Work in tandem with isSkipTypeShardUpdateWithNoAdditions

---------

Co-authored-by: Sunjeet Singh <[email protected]>
  • Loading branch information
Sunjeet and Sunjeet authored Oct 11, 2023
1 parent 59c4083 commit f72303e
Show file tree
Hide file tree
Showing 11 changed files with 905 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ public static FixedLengthData get(HollowBlobInput in, MemoryMode memoryMode, Arr
}
}

/**
 * Creates a new {@code FixedLengthData} from a bit count rather than from blob input.
 * Only {@code MemoryMode.ON_HEAP} is handled, in which case a {@code FixedLengthElementArray}
 * is constructed from {@code memoryRecycler} and {@code numBits}.
 *
 * @param numBits the bit count handed to the {@code FixedLengthElementArray} constructor
 * @param memoryMode the requested memory mode; must be {@code ON_HEAP}
 * @param memoryRecycler the recycler handed to the {@code FixedLengthElementArray} constructor
 * @return the newly constructed {@code FixedLengthElementArray}
 * @throws UnsupportedOperationException if {@code memoryMode} is anything other than {@code ON_HEAP}
 */
public static FixedLengthData get(long numBits, MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
    // Reject unsupported modes up front so the happy path reads straight through.
    if (!memoryMode.equals(MemoryMode.ON_HEAP)) {
        throw new UnsupportedOperationException("Memory mode " + memoryMode.name() + " not supported");
    }
    return new FixedLengthElementArray(memoryRecycler, numBits);
}

public static void destroy(FixedLengthData fld, ArraySegmentRecycler memoryRecycler) {
if (fld instanceof FixedLengthElementArray) {
((FixedLengthElementArray) fld).destroy(memoryRecycler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@
import com.netflix.hollow.core.memory.ByteDataArray;
import com.netflix.hollow.core.memory.SegmentedByteArray;
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;
import com.netflix.hollow.core.memory.pool.WastefulRecycler;
import com.netflix.hollow.core.read.HollowBlobInput;
import com.netflix.hollow.core.util.IOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

public class GapEncodedVariableLengthIntegerReader {

Expand Down Expand Up @@ -140,4 +144,105 @@ public static GapEncodedVariableLengthIntegerReader combine(GapEncodedVariableLe

return new GapEncodedVariableLengthIntegerReader(arr.getUnderlyingArray(), (int)arr.length());
}

/**
 * Splits this {@code GapEncodedVariableLengthIntegerReader} into {@code numSplits} new instances.
 * An ordinal {@code o} read from this reader is routed to split index {@code o & (numSplits - 1)}
 * and stored there as {@code o >> log2(numSplits)}, gap-encoded against the previous value
 * written to that same split.
 * The original data is not cleaned up; this reader is reset and then iterated to its end.
 *
 * @param numSplits the number of instances to split into, should be a power of 2.
 * @return an array of {@code GapEncodedVariableLengthIntegerReader} instances populated with the results of the split;
 *         a split that received no ordinals is {@code EMPTY_READER}.
 * @throws IllegalStateException if {@code numSplits} is not a positive power of 2.
 */
public GapEncodedVariableLengthIntegerReader[] split(int numSplits) {
    if (numSplits <= 0 || (numSplits & (numSplits - 1)) != 0) {
        throw new IllegalStateException("Split should only be called with powers of 2, it was called with " + numSplits);
    }
    final int toMask = numSplits - 1;
    final int toOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSplits);

    ByteDataArray[] splitOrdinals = new ByteDataArray[numSplits];
    int[] previousSplitOrdinal = new int[numSplits];

    // Route every ordinal to its split as it is read. This avoids materialising (and boxing)
    // the whole ordinal sequence in an intermediate list before the second pass.
    // NOTE(review): relies on nextElement() returning the same value until advance() is called,
    // as the previous two-call loop already did.
    reset();
    int ordinal;
    while ((ordinal = nextElement()) != Integer.MAX_VALUE) {
        int toIndex = ordinal & toMask;
        int toOrdinal = ordinal >> toOrdinalShift;
        if (splitOrdinals[toIndex] == null) {
            // Allocate lazily so that splits receiving nothing can share EMPTY_READER.
            splitOrdinals[toIndex] = new ByteDataArray(WastefulRecycler.DEFAULT_INSTANCE);
        }
        VarInt.writeVInt(splitOrdinals[toIndex], toOrdinal - previousSplitOrdinal[toIndex]);
        previousSplitOrdinal[toIndex] = toOrdinal;
        advance();
    }

    GapEncodedVariableLengthIntegerReader[] to = new GapEncodedVariableLengthIntegerReader[numSplits];
    for (int i = 0; i < numSplits; i++) {
        if (splitOrdinals[i] == null) {
            to[i] = EMPTY_READER;
        } else {
            to[i] = new GapEncodedVariableLengthIntegerReader(splitOrdinals[i].getUnderlyingArray(), (int) splitOrdinals[i].length());
        }
    }

    return to;
}

/**
 * Joins an array of {@code GapEncodedVariableLengthIntegerReader} instances into a single reader.
 * A value {@code s} read from {@code from[i]} contributes the joined ordinal
 * {@code s * from.length + i}; the joined ordinals are emitted in ascending order, gap-encoded.
 * The original data is not cleaned up; every non-null input reader is reset and iterated to its end.
 * {@code null} entries in {@code from} are treated as contributing no ordinals.
 *
 * @param from the array of {@code GapEncodedVariableLengthIntegerReader} to join, should have a power of 2 number of elements.
 * @return an instance of {@code GapEncodedVariableLengthIntegerReader} with the joined result,
 *         or {@code EMPTY_READER} when no input contributed any ordinal.
 * @throws IllegalStateException if {@code from} is null or its length is not a positive power of 2.
 */
public static GapEncodedVariableLengthIntegerReader join(GapEncodedVariableLengthIntegerReader[] from) {
    if (from == null) {
        throw new IllegalStateException("Join invoked on a null input array");
    }
    final int numSplits = from.length;
    if (numSplits <= 0 || (numSplits & (numSplits - 1)) != 0) {
        throw new IllegalStateException("Join should only be called with powers of 2, it was called with " + from.length);
    }

    final int indexMask = numSplits - 1;
    final int ordinalShift = 31 - Integer.numberOfLeadingZeros(numSplits);

    // Pass 1: collect the values held by each split and track the largest joined ordinal.
    HashSet<Integer>[] ordinalsBySplit = new HashSet[numSplits];
    int highestJoinedOrdinal = -1;
    for (int splitIndex = 0; splitIndex < numSplits; splitIndex++) {
        HashSet<Integer> seen = new HashSet<>();
        ordinalsBySplit[splitIndex] = seen;

        GapEncodedVariableLengthIntegerReader reader = from[splitIndex];
        if (reader == null) {
            continue;
        }
        reader.reset();
        while (reader.nextElement() != Integer.MAX_VALUE) {
            int splitOrdinal = reader.nextElement();
            seen.add(splitOrdinal);
            highestJoinedOrdinal = Math.max(highestJoinedOrdinal, splitOrdinal * numSplits + splitIndex);
            reader.advance();
        }
    }

    // Pass 2: walk the joined ordinal space in order and gap-encode every ordinal that is present.
    ByteDataArray joined = null;
    int lastWritten = 0;
    for (int candidate = 0; candidate <= highestJoinedOrdinal; candidate++) {
        if (!ordinalsBySplit[candidate & indexMask].contains(candidate >> ordinalShift)) {
            continue;
        }
        if (joined == null) {
            joined = new ByteDataArray(WastefulRecycler.DEFAULT_INSTANCE);
        }
        VarInt.writeVInt(joined, candidate - lastWritten);
        lastWritten = candidate;
    }

    return joined == null
            ? EMPTY_READER
            : new GapEncodedVariableLengthIntegerReader(joined.getUnderlyingArray(), (int) joined.length());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.netflix.hollow.core.read.engine.object;

import static com.netflix.hollow.core.HollowConstants.ORDINAL_NONE;
import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.copyRecord;
import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.varLengthSize;

import com.netflix.hollow.core.memory.SegmentedByteArray;
import com.netflix.hollow.core.memory.encoding.FixedLengthElementArray;
Expand Down Expand Up @@ -71,7 +73,11 @@ public void populateHistory() {
int ordinal = iter.next();
while(ordinal != ORDINAL_NONE) {
ordinalMapping.put(ordinal, nextOrdinal);
copyRecord(ordinal);

int shard = ordinal & shardNumberMask;
int shardOrdinal = ordinal >> shardOrdinalShift;
copyRecord(historicalDataElements, nextOrdinal, stateEngineDataElements[shard], shardOrdinal, currentWriteVarLengthDataPointers);
nextOrdinal++;

ordinal = iter.next();
}
Expand Down Expand Up @@ -109,7 +115,9 @@ private void populateStats() {

for(int i=0;i<totalVarLengthSizes.length;i++) {
if(stateEngineDataElements[0].varLengthData[i] != null) {
totalVarLengthSizes[i] += varLengthSize(ordinal, i);
int shard = ordinal & shardNumberMask;
int shardOrdinal = ordinal >> shardOrdinalShift;
totalVarLengthSizes[i] += varLengthSize(stateEngineDataElements[shard], shardOrdinal, i);
}
}

Expand All @@ -125,66 +133,11 @@ private void populateStats() {
historicalDataElements.bitsPerField[i] = (64 - Long.numberOfLeadingZeros(totalVarLengthSizes[i] + 1)) + 1;
}

historicalDataElements.nullValueForField[i] = (1L << historicalDataElements.bitsPerField[i]) - 1;
historicalDataElements.nullValueForField[i] = historicalDataElements.bitsPerField[i] == 64 ? -1L : (1L << historicalDataElements.bitsPerField[i]) - 1;
historicalDataElements.bitOffsetPerField[i] = historicalDataElements.bitsPerRecord;
historicalDataElements.bitsPerRecord += historicalDataElements.bitsPerField[i];
}

ordinalMapping = new IntMap(removedEntryCount);
}

/**
 * Computes the number of var-length bytes a record occupies for one field.
 * The ordinal is first resolved to a shard ({@code ordinal & shardNumberMask}) and an ordinal
 * within that shard ({@code ordinal >> shardOrdinalShift}). The size is the record's end pointer
 * minus the end pointer of the preceding record in the shard (or 0 for the shard's first record),
 * each read with the field's top bit masked off.
 *
 * @param ordinal the ordinal of the record, before shard translation
 * @param fieldIdx the index of the field
 * @return the difference between the record's end pointer and its start pointer
 */
private long varLengthSize(int ordinal, int fieldIdx) {
    final int shard = ordinal & shardNumberMask;
    final int shardOrdinal = ordinal >> shardOrdinalShift;

    final int fieldBits = stateEngineDataElements[shard].bitsPerField[fieldIdx];
    // Keeps every bit of the field except the top one.
    final long pointerMask = (1L << (fieldBits - 1)) - 1;
    final long recordBitOffset = ((long) stateEngineDataElements[shard].bitsPerRecord * shardOrdinal)
            + stateEngineDataElements[shard].bitOffsetPerField[fieldIdx];

    final long endByte = stateEngineDataElements[shard].fixedLengthData.getElementValue(recordBitOffset, fieldBits) & pointerMask;
    if (shardOrdinal == 0) {
        // First record in the shard: its data starts at byte 0.
        return endByte;
    }

    final long startByte = stateEngineDataElements[shard].fixedLengthData.getElementValue(
            recordBitOffset - stateEngineDataElements[shard].bitsPerRecord, fieldBits) & pointerMask;
    return endByte - startByte;
}

/**
 * Copies the record at {@code ordinal} from the state engine's sharded data elements into
 * {@code historicalDataElements} at position {@code nextOrdinal}, then increments {@code nextOrdinal}.
 * Fields for which {@code historicalDataElements.varLengthData} is null are copied as raw
 * fixed-length values; all other fields have their bytes appended to the historical var-length
 * data and a new end pointer written.
 *
 * @param ordinal the ordinal of the record to copy, before shard translation
 */
private void copyRecord(int ordinal) {
// Resolve the ordinal to a shard and to the record's position inside that shard.
int shard = ordinal & shardNumberMask;
int shardOrdinal = ordinal >> shardOrdinalShift;

for(int i=0;i<historicalDataElements.schema.numFields();i++) {
if(historicalDataElements.varLengthData[i] == null) {
// Fixed-length field: read with the shard's layout, write with the historical layout.
long value = stateEngineDataElements[shard].fixedLengthData.getLargeElementValue(((long)shardOrdinal * stateEngineDataElements[shard].bitsPerRecord) + stateEngineDataElements[shard].bitOffsetPerField[i], stateEngineDataElements[shard].bitsPerField[i]);
historicalDataElements.fixedLengthData.setElementValue(((long)nextOrdinal * historicalDataElements.bitsPerRecord) + historicalDataElements.bitOffsetPerField[i], historicalDataElements.bitsPerField[i], value);
} else {
// Var-length field: locate the source byte range for this record.
long fromStartByte = varLengthStartByte(shard, shardOrdinal, i);
long fromEndByte = varLengthEndByte(shard, shardOrdinal, i);
long size = fromEndByte - fromStartByte;

// The fixed-length slot stores the END pointer of the copied bytes, so it is written as
// (current write pointer + size) before the pointer itself is advanced below.
historicalDataElements.fixedLengthData.setElementValue(((long)nextOrdinal * historicalDataElements.bitsPerRecord) + historicalDataElements.bitOffsetPerField[i], historicalDataElements.bitsPerField[i], currentWriteVarLengthDataPointers[i] + size);
historicalDataElements.varLengthData[i].copy(stateEngineDataElements[shard].varLengthData[i], fromStartByte, currentWriteVarLengthDataPointers[i], size);

currentWriteVarLengthDataPointers[i] += size;
}
}

// The next copied record goes into the following historical slot.
nextOrdinal++;
}

/**
 * Returns the byte offset at which a record's var-length data begins for one field.
 * The start of a record is the end pointer stored by the preceding record in the same shard,
 * read with the field's top bit masked off; the first record of a shard starts at 0.
 *
 * @param shard the index into {@code stateEngineDataElements}
 * @param translatedOrdinal the ordinal of the record within that shard
 * @param fieldIdx the index of the field
 * @return the start byte, or 0 when {@code translatedOrdinal} is 0
 */
private long varLengthStartByte(int shard, int translatedOrdinal, int fieldIdx) {
    if (translatedOrdinal == 0) {
        return 0;
    }

    final int fieldBits = stateEngineDataElements[shard].bitsPerField[fieldIdx];
    // Keeps every bit of the field except the top one.
    final long pointerMask = (1L << (fieldBits - 1)) - 1;
    // Bit offset of this field in the record immediately before translatedOrdinal.
    final long previousRecordBitOffset = ((long) stateEngineDataElements[shard].bitsPerRecord * translatedOrdinal)
            + stateEngineDataElements[shard].bitOffsetPerField[fieldIdx]
            - stateEngineDataElements[shard].bitsPerRecord;

    return stateEngineDataElements[shard].fixedLengthData.getElementValue(previousRecordBitOffset, fieldBits) & pointerMask;
}

/**
 * Returns the end pointer stored for a record's var-length field, with the field's top bit masked off.
 *
 * @param shard the index into {@code stateEngineDataElements}
 * @param translatedOrdinal the ordinal of the record within that shard
 * @param fieldIdx the index of the field
 * @return the end byte of the record's data for this field
 */
private long varLengthEndByte(int shard, int translatedOrdinal, int fieldIdx) {
    final int fieldBits = stateEngineDataElements[shard].bitsPerField[fieldIdx];
    // Keeps every bit of the field except the top one.
    final long pointerMask = (1L << (fieldBits - 1)) - 1;
    final long recordBitOffset = ((long) stateEngineDataElements[shard].bitsPerRecord * translatedOrdinal)
            + stateEngineDataElements[shard].bitOffsetPerField[fieldIdx];

    return stateEngineDataElements[shard].fixedLengthData.getElementValue(recordBitOffset, fieldBits) & pointerMask;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,65 @@ public void destroy() {
}
}

/**
 * Returns the byte offset at which a record's var-length data begins for one field.
 * The start of a record is the end pointer stored by the preceding record, read with the
 * field's top bit masked off; the record at ordinal 0 starts at byte 0.
 *
 * @param from the data elements to read from
 * @param ordinal the ordinal of the record within {@code from}
 * @param fieldIdx the index of the field
 * @return the start byte, or 0 when {@code ordinal} is 0
 */
static long varLengthStartByte(HollowObjectTypeDataElements from, int ordinal, int fieldIdx) {
    if (ordinal == 0) {
        return 0;
    }

    final int fieldBits = from.bitsPerField[fieldIdx];
    // Keeps every bit of the field except the top one.
    final long pointerMask = (1L << (fieldBits - 1)) - 1;
    // Bit offset of this field in the record immediately before ordinal.
    final long previousRecordBitOffset = ((long) from.bitsPerRecord * ordinal)
            + from.bitOffsetPerField[fieldIdx]
            - from.bitsPerRecord;

    return from.fixedLengthData.getElementValue(previousRecordBitOffset, fieldBits) & pointerMask;
}

/**
 * Returns the end pointer stored for a record's var-length field, with the field's top bit masked off.
 *
 * @param from the data elements to read from
 * @param ordinal the ordinal of the record within {@code from}
 * @param fieldIdx the index of the field
 * @return the end byte of the record's data for this field
 */
static long varLengthEndByte(HollowObjectTypeDataElements from, int ordinal, int fieldIdx) {
    final int fieldBits = from.bitsPerField[fieldIdx];
    // Keeps every bit of the field except the top one.
    final long pointerMask = (1L << (fieldBits - 1)) - 1;
    final long recordBitOffset = ((long) from.bitsPerRecord * ordinal) + from.bitOffsetPerField[fieldIdx];

    return from.fixedLengthData.getElementValue(recordBitOffset, fieldBits) & pointerMask;
}

/**
 * Computes the number of var-length bytes a record occupies for one field: the record's end
 * pointer minus the end pointer of the preceding record (or 0 for ordinal 0), each read with
 * the field's top bit masked off.
 *
 * @param from the data elements to read from
 * @param ordinal the ordinal of the record within {@code from}
 * @param fieldIdx the index of the field
 * @return the difference between the record's end pointer and its start pointer
 */
static long varLengthSize(HollowObjectTypeDataElements from, int ordinal, int fieldIdx) {
    final int fieldBits = from.bitsPerField[fieldIdx];
    // Keeps every bit of the field except the top one.
    final long pointerMask = (1L << (fieldBits - 1)) - 1;
    final long recordBitOffset = ((long) from.bitsPerRecord * ordinal) + from.bitOffsetPerField[fieldIdx];

    final long endByte = from.fixedLengthData.getElementValue(recordBitOffset, fieldBits) & pointerMask;
    if (ordinal == 0) {
        // First record: its data starts at byte 0.
        return endByte;
    }

    final long startByte = from.fixedLengthData.getElementValue(recordBitOffset - from.bitsPerRecord, fieldBits) & pointerMask;
    return endByte - startByte;
}

/**
 * Copies the record at {@code fromOrdinal} in {@code from} into position {@code toOrdinal} in {@code to},
 * field by field, preserving nulls even when the two data elements use different bit widths for a field.
 * <p>
 * Fixed-length fields (those for which {@code to.varLengthData} is null): a null is stored as the
 * width-dependent sentinel {@code nullValueForField}. Copying the raw source bits into a target field
 * of a different width would turn that sentinel into an ordinary value, so a source null is rewritten
 * as the target's own null sentinel.
 * <p>
 * Var-length fields: the bytes are appended to {@code to.varLengthData} and the new end pointer is
 * written. A null is flagged by the top bit of the field (see {@code writeNullVarLengthField}); since the
 * start/end byte helpers mask that bit off, a zero-length source value with the flag set is rewritten
 * as a null in the target rather than as an empty value.
 *
 * @param to the data elements to write into; {@code to.nullValueForField} must be populated
 * @param toOrdinal the ordinal to write in {@code to}
 * @param from the data elements to read from; {@code from.nullValueForField} must be populated
 * @param fromOrdinal the ordinal to read in {@code from}
 * @param currentWriteVarLengthDataPointers per-field write positions into {@code to.varLengthData};
 *        advanced by the number of bytes copied for each var-length field
 */
static void copyRecord(HollowObjectTypeDataElements to, int toOrdinal, HollowObjectTypeDataElements from, int fromOrdinal, long[] currentWriteVarLengthDataPointers) {
    for (int fieldIndex = 0; fieldIndex < to.schema.numFields(); fieldIndex++) {
        final int fromBits = from.bitsPerField[fieldIndex];
        final long fromBitOffset = ((long) fromOrdinal * from.bitsPerRecord) + from.bitOffsetPerField[fieldIndex];
        final long toBitOffset = ((long) toOrdinal * to.bitsPerRecord) + to.bitOffsetPerField[fieldIndex];

        if (to.varLengthData[fieldIndex] == null) {
            long value = from.fixedLengthData.getLargeElementValue(fromBitOffset, fromBits);
            if (value == from.nullValueForField[fieldIndex]) {
                // Re-encode the null with the target's width instead of copying the source sentinel bits.
                writeNullFixedLengthField(to, fieldIndex, toBitOffset);
            } else {
                to.fixedLengthData.setElementValue(toBitOffset, to.bitsPerField[fieldIndex], value);
            }
        } else {
            long fromStartByte = varLengthStartByte(from, fromOrdinal, fieldIndex);
            long fromEndByte = varLengthEndByte(from, fromOrdinal, fieldIndex);
            long size = fromEndByte - fromStartByte;

            // The top bit of the stored pointer is the null flag; the helpers above mask it off.
            long rawEndPointer = from.fixedLengthData.getElementValue(fromBitOffset, fromBits);
            boolean sourceIsNull = size == 0 && (rawEndPointer & (1L << (fromBits - 1))) != 0;

            if (sourceIsNull) {
                // Nothing to copy and the write pointer does not move; only the null flag is carried over.
                writeNullVarLengthField(to, fieldIndex, toBitOffset, currentWriteVarLengthDataPointers);
            } else {
                to.fixedLengthData.setElementValue(toBitOffset, to.bitsPerField[fieldIndex], currentWriteVarLengthDataPointers[fieldIndex] + size);
                to.varLengthData[fieldIndex].copy(from.varLengthData[fieldIndex], fromStartByte, currentWriteVarLengthDataPointers[fieldIndex], size);

                currentWriteVarLengthDataPointers[fieldIndex] += size;
            }
        }
    }
}

/**
 * Writes a null for one field of {@code target}, dispatching on whether the field has var-length data.
 *
 * @param target the data elements to write into
 * @param fieldIndex the index of the field
 * @param currentWriteFixedLengthStartBit the bit offset in {@code target.fixedLengthData} at which the field is written
 * @param currentWriteVarLengthDataPointers per-field var-length write positions; only read for var-length fields
 */
static void writeNullField(HollowObjectTypeDataElements target, int fieldIndex, long currentWriteFixedLengthStartBit, long[] currentWriteVarLengthDataPointers) {
    if (target.varLengthData[fieldIndex] == null) {
        writeNullFixedLengthField(target, fieldIndex, currentWriteFixedLengthStartBit);
        return;
    }
    writeNullVarLengthField(target, fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
}

/**
 * Writes a null for a var-length field: the field's current var-length write position with the
 * field's top bit set. The write position itself is not advanced.
 *
 * @param target the data elements to write into
 * @param fieldIndex the index of the field
 * @param currentWriteFixedLengthStartBit the bit offset in {@code target.fixedLengthData} at which the field is written
 * @param currentWriteVarLengthDataPointers per-field var-length write positions
 */
static void writeNullVarLengthField(HollowObjectTypeDataElements target, int fieldIndex, long currentWriteFixedLengthStartBit, long[] currentWriteVarLengthDataPointers) {
    final int fieldBits = target.bitsPerField[fieldIndex];
    // Top bit of the field marks the value as null.
    final long nullFlag = 1L << (fieldBits - 1);
    final long nullMarkedPointer = nullFlag | currentWriteVarLengthDataPointers[fieldIndex];
    target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, fieldBits, nullMarkedPointer);
}

/**
 * Writes a null for a fixed-length field by storing {@code target.nullValueForField[fieldIndex]}
 * using the field's bit width.
 *
 * @param target the data elements to write into
 * @param fieldIndex the index of the field
 * @param currentWriteFixedLengthStartBit the bit offset in {@code target.fixedLengthData} at which the field is written
 */
static void writeNullFixedLengthField(HollowObjectTypeDataElements target, int fieldIndex, long currentWriteFixedLengthStartBit) {
    final int fieldBits = target.bitsPerField[fieldIndex];
    final long nullValue = target.nullValueForField[fieldIndex];
    target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, fieldBits, nullValue);
}
}
Loading

0 comments on commit f72303e

Please sign in to comment.