Skip to content

Commit

Permalink
HollowObjectTypeDataElements utilities for splitting and joining
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Oct 4, 2023
1 parent 59c4083 commit 740545b
Show file tree
Hide file tree
Showing 11 changed files with 640 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ public static FixedLengthData get(HollowBlobInput in, MemoryMode memoryMode, Arr
}
}

/**
 * Creates a {@code FixedLengthData} with capacity for the given number of bits.
 *
 * @param numBits the number of bits the returned structure must be able to hold
 * @param memoryMode the memory mode to allocate in; only {@code ON_HEAP} is supported here
 * @param memoryRecycler pools array segments for reuse by the on-heap allocation
 * @return a new {@code FixedLengthElementArray} sized to {@code numBits}
 * @throws UnsupportedOperationException if {@code memoryMode} is not {@code ON_HEAP}
 */
public static FixedLengthData get(long numBits, MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
    // Enums are singletons: == is the idiomatic comparison and, unlike equals(),
    // cannot NPE if memoryMode is null.
    if (memoryMode == MemoryMode.ON_HEAP) {
        return new FixedLengthElementArray(memoryRecycler, numBits);
    } else {
        throw new UnsupportedOperationException("Memory mode " + memoryMode.name() + " not supported");
    }
}

public static void destroy(FixedLengthData fld, ArraySegmentRecycler memoryRecycler) {
if (fld instanceof FixedLengthElementArray) {
((FixedLengthElementArray) fld).destroy(memoryRecycler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@
import com.netflix.hollow.core.memory.ByteDataArray;
import com.netflix.hollow.core.memory.SegmentedByteArray;
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;
import com.netflix.hollow.core.memory.pool.WastefulRecycler;
import com.netflix.hollow.core.read.HollowBlobInput;
import com.netflix.hollow.core.util.IOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

public class GapEncodedVariableLengthIntegerReader {

Expand Down Expand Up @@ -140,4 +144,105 @@ public static GapEncodedVariableLengthIntegerReader combine(GapEncodedVariableLe

return new GapEncodedVariableLengthIntegerReader(arr.getUnderlyingArray(), (int)arr.length());
}

/**
 * Splits this {@code GapEncodedVariableLengthIntegerReader} into {@code numSplits} new instances.
 * An ordinal {@code o} is routed to split {@code o & (numSplits-1)} and stored there as
 * {@code o >> log2(numSplits)}. The original data is not cleaned up.
 *
 * @param numSplits the number of instances to split into, must be a positive power of 2
 * @return an array of {@code GapEncodedVariableLengthIntegerReader} instances populated with the results of the split
 * @throws IllegalStateException if {@code numSplits} is not a positive power of 2
 */
public GapEncodedVariableLengthIntegerReader[] split(int numSplits) {
    if (numSplits <= 0 || (numSplits & (numSplits - 1)) != 0) {
        throw new IllegalStateException("Split should only be called with powers of 2, it was called with " + numSplits);
    }
    final int toMask = numSplits - 1;
    final int toOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSplits);
    GapEncodedVariableLengthIntegerReader[] to = new GapEncodedVariableLengthIntegerReader[numSplits];

    // Decode every gap-encoded ordinal up front. Read nextElement() once per iteration
    // instead of twice (the original called it both in the condition and the body).
    List<Integer> ordinals = new ArrayList<>();
    reset();
    int current;
    while ((current = nextElement()) != Integer.MAX_VALUE) {
        ordinals.add(current);
        advance();
    }

    // Re-encode each ordinal as a gap (delta from the previous ordinal) in its destination split.
    ByteDataArray[] splitOrdinals = new ByteDataArray[numSplits];
    int[] previousSplitOrdinal = new int[numSplits];
    for (int ordinal : ordinals) {
        int toIndex = ordinal & toMask;
        int toOrdinal = ordinal >> toOrdinalShift;
        if (splitOrdinals[toIndex] == null) {
            splitOrdinals[toIndex] = new ByteDataArray(WastefulRecycler.DEFAULT_INSTANCE);
        }
        VarInt.writeVInt(splitOrdinals[toIndex], toOrdinal - previousSplitOrdinal[toIndex]);
        previousSplitOrdinal[toIndex] = toOrdinal;
    }

    // Splits that received no ordinals share the EMPTY_READER singleton.
    for (int i = 0; i < numSplits; i++) {
        to[i] = (splitOrdinals[i] == null)
                ? EMPTY_READER
                : new GapEncodedVariableLengthIntegerReader(splitOrdinals[i].getUnderlyingArray(), (int) splitOrdinals[i].length());
    }

    return to;
}

/**
 * Join an array of {@code GapEncodedVariableLengthIntegerReader} instances into one.
 * A value {@code v} present in split {@code i} maps back to joined ordinal
 * {@code v * from.length + i}. The original data is not cleaned up.
 *
 * @param from the array of readers to join; length must be a positive power of 2
 *             (individual elements may be {@code null} and are treated as empty)
 * @return an instance of {@code GapEncodedVariableLengthIntegerReader} with the joined result
 * @throws IllegalStateException if {@code from} is null or its length is not a positive power of 2
 */
public static GapEncodedVariableLengthIntegerReader join(GapEncodedVariableLengthIntegerReader[] from) {
    if (from == null) {
        throw new IllegalStateException("Join invoked on a null input array");
    }
    if (from.length <= 0 || (from.length & (from.length - 1)) != 0) {
        throw new IllegalStateException("Join should only be called with powers of 2, it was called with " + from.length);
    }

    int numSplits = from.length;
    final int fromMask = numSplits - 1;
    final int fromOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSplits);
    int joinedMaxOrdinal = -1;

    // Collect the ordinals present in each split, tracking the largest joined ordinal seen.
    // Generic arrays cannot be created directly; suppress the unavoidable unchecked
    // warning at the narrowest possible scope instead of using a raw type.
    @SuppressWarnings("unchecked")
    HashSet<Integer>[] fromOrdinals = new HashSet[from.length];
    for (int i = 0; i < from.length; i++) {
        fromOrdinals[i] = new HashSet<>();
        if (from[i] == null) {
            continue;
        }
        from[i].reset();

        // Read nextElement() once per iteration (the original called it twice).
        int splitOrdinal;
        while ((splitOrdinal = from[i].nextElement()) != Integer.MAX_VALUE) {
            fromOrdinals[i].add(splitOrdinal);
            joinedMaxOrdinal = Math.max(joinedMaxOrdinal, splitOrdinal * numSplits + i);
            from[i].advance();
        }
    }

    // Walk the joined ordinal space in ascending order, gap-encoding every ordinal
    // whose source split contains the corresponding split-local ordinal.
    ByteDataArray toRemovals = null;
    int previousOrdinal = 0;
    for (int ordinal = 0; ordinal <= joinedMaxOrdinal; ordinal++) {
        int fromIndex = ordinal & fromMask;
        int fromOrdinal = ordinal >> fromOrdinalShift;
        if (fromOrdinals[fromIndex].contains(fromOrdinal)) {
            if (toRemovals == null) {
                toRemovals = new ByteDataArray(WastefulRecycler.DEFAULT_INSTANCE);
            }
            VarInt.writeVInt(toRemovals, ordinal - previousOrdinal);
            previousOrdinal = ordinal;
        }
    }

    return (toRemovals == null)
            ? EMPTY_READER
            : new GapEncodedVariableLengthIntegerReader(toRemovals.getUnderlyingArray(), (int) toRemovals.length());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.netflix.hollow.core.read.engine.object;

import static com.netflix.hollow.core.HollowConstants.ORDINAL_NONE;
import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.copyRecord;
import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.varLengthSize;

import com.netflix.hollow.core.memory.SegmentedByteArray;
import com.netflix.hollow.core.memory.encoding.FixedLengthElementArray;
Expand Down Expand Up @@ -71,7 +73,11 @@ public void populateHistory() {
int ordinal = iter.next();
while(ordinal != ORDINAL_NONE) {
ordinalMapping.put(ordinal, nextOrdinal);
copyRecord(ordinal);

int shard = ordinal & shardNumberMask;
int shardOrdinal = ordinal >> shardOrdinalShift;
copyRecord(historicalDataElements, nextOrdinal, stateEngineDataElements[shard], shardOrdinal, currentWriteVarLengthDataPointers);
nextOrdinal++;

ordinal = iter.next();
}
Expand Down Expand Up @@ -109,7 +115,9 @@ private void populateStats() {

for(int i=0;i<totalVarLengthSizes.length;i++) {
if(stateEngineDataElements[0].varLengthData[i] != null) {
totalVarLengthSizes[i] += varLengthSize(ordinal, i);
int shard = ordinal & shardNumberMask;
int shardOrdinal = ordinal >> shardOrdinalShift;
totalVarLengthSizes[i] += varLengthSize(stateEngineDataElements[shard], shardOrdinal, i);
}
}

Expand All @@ -125,66 +133,11 @@ private void populateStats() {
historicalDataElements.bitsPerField[i] = (64 - Long.numberOfLeadingZeros(totalVarLengthSizes[i] + 1)) + 1;
}

historicalDataElements.nullValueForField[i] = (1L << historicalDataElements.bitsPerField[i]) - 1;
historicalDataElements.nullValueForField[i] = historicalDataElements.bitsPerField[i] == 64 ? -1L : (1L << historicalDataElements.bitsPerField[i]) - 1;
historicalDataElements.bitOffsetPerField[i] = historicalDataElements.bitsPerRecord;
historicalDataElements.bitsPerRecord += historicalDataElements.bitsPerField[i];
}

ordinalMapping = new IntMap(removedEntryCount);
}

/**
 * Computes the var-length payload size (in bytes) of one field of one record, by subtracting
 * the previous record's end pointer (0 for the shard's first record) from this record's end
 * pointer. The top bit of each stored pointer is masked off — presumably a null-flag bit;
 * TODO confirm against the writer.
 */
private long varLengthSize(int ordinal, int fieldIdx) {
    int shard = ordinal & shardNumberMask;
    int shardOrdinal = ordinal >> shardOrdinalShift;
    HollowObjectTypeDataElements elements = stateEngineDataElements[shard];

    int fieldBits = elements.bitsPerField[fieldIdx];
    long pointerMask = (1L << (fieldBits - 1)) - 1;
    long recordBitOffset = ((long) elements.bitsPerRecord * shardOrdinal) + elements.bitOffsetPerField[fieldIdx];

    long end = elements.fixedLengthData.getElementValue(recordBitOffset, fieldBits) & pointerMask;
    long start = (shardOrdinal == 0)
            ? 0
            : elements.fixedLengthData.getElementValue(recordBitOffset - elements.bitsPerRecord, fieldBits) & pointerMask;

    return end - start;
}

/**
 * Copies one record (identified by its composite ordinal) from its source shard into the
 * historical data elements at position {@code nextOrdinal}, then increments {@code nextOrdinal}.
 * Fixed-length fields are copied verbatim; var-length fields are appended to the historical
 * var-length stream and {@code currentWriteVarLengthDataPointers} is advanced per field.
 */
private void copyRecord(int ordinal) {
// Composite ordinal = (shardOrdinal << shardOrdinalShift) | shard
int shard = ordinal & shardNumberMask;
int shardOrdinal = ordinal >> shardOrdinalShift;

for(int i=0;i<historicalDataElements.schema.numFields();i++) {
if(historicalDataElements.varLengthData[i] == null) {
// Fixed-length field: read the raw bits from the source shard and write them
// at the destination record's slot.
long value = stateEngineDataElements[shard].fixedLengthData.getLargeElementValue(((long)shardOrdinal * stateEngineDataElements[shard].bitsPerRecord) + stateEngineDataElements[shard].bitOffsetPerField[i], stateEngineDataElements[shard].bitsPerField[i]);
historicalDataElements.fixedLengthData.setElementValue(((long)nextOrdinal * historicalDataElements.bitsPerRecord) + historicalDataElements.bitOffsetPerField[i], historicalDataElements.bitsPerField[i], value);
} else {
// Var-length field: locate the payload byte range in the source shard...
long fromStartByte = varLengthStartByte(shard, shardOrdinal, i);
long fromEndByte = varLengthEndByte(shard, shardOrdinal, i);
long size = fromEndByte - fromStartByte;

// ...record the new end pointer in the destination's fixed-length data, then
// append the payload bytes at the current write position for this field.
historicalDataElements.fixedLengthData.setElementValue(((long)nextOrdinal * historicalDataElements.bitsPerRecord) + historicalDataElements.bitOffsetPerField[i], historicalDataElements.bitsPerField[i], currentWriteVarLengthDataPointers[i] + size);
historicalDataElements.varLengthData[i].copy(stateEngineDataElements[shard].varLengthData[i], fromStartByte, currentWriteVarLengthDataPointers[i], size);

currentWriteVarLengthDataPointers[i] += size;
}
}

// Destination ordinals are assigned densely in copy order.
nextOrdinal++;
}

/**
 * Returns the start byte of a record's var-length field: the end pointer written by the
 * previous record in the same shard, or 0 for the shard's first record. The pointer's top
 * bit is masked off — presumably a null-flag bit; TODO confirm against the writer.
 */
private long varLengthStartByte(int shard, int translatedOrdinal, int fieldIdx) {
    if (translatedOrdinal == 0) {
        return 0;
    }

    HollowObjectTypeDataElements elements = stateEngineDataElements[shard];
    int fieldBits = elements.bitsPerField[fieldIdx];
    long pointerMask = (1L << (fieldBits - 1)) - 1;
    long recordBitOffset = ((long) elements.bitsPerRecord * translatedOrdinal) + elements.bitOffsetPerField[fieldIdx];

    // Previous record's slot = this record's slot minus one full record width.
    return elements.fixedLengthData.getElementValue(recordBitOffset - elements.bitsPerRecord, fieldBits) & pointerMask;
}

/**
 * Returns the end byte pointer of a record's var-length field, read from the shard's
 * fixed-length data. The pointer's top bit is masked off — presumably a null-flag bit;
 * TODO confirm against the writer.
 */
private long varLengthEndByte(int shard, int translatedOrdinal, int fieldIdx) {
    HollowObjectTypeDataElements elements = stateEngineDataElements[shard];
    int fieldBits = elements.bitsPerField[fieldIdx];
    long pointerMask = (1L << (fieldBits - 1)) - 1;
    long recordBitOffset = ((long) elements.bitsPerRecord * translatedOrdinal) + elements.bitOffsetPerField[fieldIdx];

    return elements.fixedLengthData.getElementValue(recordBitOffset, fieldBits) & pointerMask;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public class HollowObjectTypeDataElements {
final long nullValueForField[];
int bitsPerRecord;

private int bitsPerUnfilteredField[];
private boolean unfilteredFieldIsIncluded[];
int bitsPerUnfilteredField[];
boolean unfilteredFieldIsIncluded[];

final ArraySegmentRecycler memoryRecycler;
final MemoryMode memoryMode;
Expand Down Expand Up @@ -211,4 +211,48 @@ public void destroy() {
}
}

/**
 * Start byte of the var-length value of {@code fieldIdx} for {@code ordinal} in {@code from}:
 * the previous record's end pointer, or 0 for ordinal 0. The pointer's top bit is masked
 * off — presumably a null-flag bit; TODO confirm against the writer.
 */
static long varLengthStartByte(HollowObjectTypeDataElements from, int ordinal, int fieldIdx) {
    if (ordinal == 0) {
        return 0;
    }

    int fieldBits = from.bitsPerField[fieldIdx];
    long pointerMask = (1L << (fieldBits - 1)) - 1;
    long recordBitOffset = ((long) from.bitsPerRecord * ordinal) + from.bitOffsetPerField[fieldIdx];

    // Previous record's slot = this record's slot minus one full record width.
    return from.fixedLengthData.getElementValue(recordBitOffset - from.bitsPerRecord, fieldBits) & pointerMask;
}

/**
 * End byte pointer of the var-length value of {@code fieldIdx} for {@code ordinal} in
 * {@code from}, read from the fixed-length data. The pointer's top bit is masked off —
 * presumably a null-flag bit; TODO confirm against the writer.
 */
static long varLengthEndByte(HollowObjectTypeDataElements from, int ordinal, int fieldIdx) {
    int fieldBits = from.bitsPerField[fieldIdx];
    long pointerMask = (1L << (fieldBits - 1)) - 1;
    long recordBitOffset = ((long) from.bitsPerRecord * ordinal) + from.bitOffsetPerField[fieldIdx];

    return from.fixedLengthData.getElementValue(recordBitOffset, fieldBits) & pointerMask;
}

/**
 * Size in bytes of the var-length value of {@code fieldIdx} for {@code ordinal} in {@code from}.
 * Expressed via the sibling helpers rather than re-implementing the same pointer reads inline,
 * so the masking/offset logic lives in exactly one place.
 *
 * @param from the data elements to read from
 * @param ordinal the record ordinal within {@code from}
 * @param fieldIdx the index of the var-length field
 * @return the byte length of the field's var-length payload
 */
static long varLengthSize(HollowObjectTypeDataElements from, int ordinal, int fieldIdx) {
    return varLengthEndByte(from, ordinal, fieldIdx) - varLengthStartByte(from, ordinal, fieldIdx);
}

/**
 * Copies one record from {@code from} (at {@code fromOrdinal}) into {@code to}
 * (at {@code toOrdinal}). Fixed-length fields are copied bit-for-bit; var-length fields are
 * appended at the per-field write positions in {@code currentWriteVarLengthDataPointers},
 * which this method advances as a side effect. Callers must therefore copy records in
 * ascending {@code toOrdinal} order with a single shared pointer array.
 */
static void copyRecord(HollowObjectTypeDataElements to, int toOrdinal, HollowObjectTypeDataElements from, int fromOrdinal, long[] currentWriteVarLengthDataPointers) {
for(int fieldIndex=0;fieldIndex<to.schema.numFields();fieldIndex++) {
if(to.varLengthData[fieldIndex] == null) {
// Fixed-length field: raw bit copy from the source record slot to the destination slot.
long value = from.fixedLengthData.getLargeElementValue(((long)fromOrdinal * from.bitsPerRecord) + from.bitOffsetPerField[fieldIndex], from.bitsPerField[fieldIndex]);
to.fixedLengthData.setElementValue(((long)toOrdinal * to.bitsPerRecord) + to.bitOffsetPerField[fieldIndex], to.bitsPerField[fieldIndex], value);
} else {
// Var-length field: locate the payload byte range in the source...
long fromStartByte = varLengthStartByte(from, fromOrdinal, fieldIndex);
long fromEndByte = varLengthEndByte(from, fromOrdinal, fieldIndex);
long size = fromEndByte - fromStartByte;

// ...write the new end pointer into the destination's fixed-length data, then
// append the payload at the current write position for this field.
to.fixedLengthData.setElementValue(((long)toOrdinal * to.bitsPerRecord) + to.bitOffsetPerField[fieldIndex], to.bitsPerField[fieldIndex], currentWriteVarLengthDataPointers[fieldIndex] + size);
to.varLengthData[fieldIndex].copy(from.varLengthData[fieldIndex], fromStartByte, currentWriteVarLengthDataPointers[fieldIndex], size);

currentWriteVarLengthDataPointers[fieldIndex] += size;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.netflix.hollow.core.read.engine.object;

import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.copyRecord;
import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.varLengthSize;

import com.netflix.hollow.core.memory.FixedLengthDataFactory;
import com.netflix.hollow.core.memory.VariableLengthDataFactory;
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;

/**
* Join multiple {@code HollowObjectTypeDataElements}s into 1 {@code HollowObjectTypeDataElements}.
* Ordinals are remapped and corresponding data is copied over.
* The original data elements are not destroyed.
* The no. of passed data elements must be a power of 2.
*/
public class HollowObjectTypeDataElementsJoiner {

/**
 * Joins the given data elements into a single new {@code HollowObjectTypeDataElements}.
 * A record at ordinal {@code o} in split {@code i} lands at joined ordinal
 * {@code o * from.length + i}. Encoded removals are joined alongside the record data;
 * encoded additions must not be present. The inputs are not destroyed.
 */
HollowObjectTypeDataElements join(HollowObjectTypeDataElements[] from) {
// from.length is validated as a power of 2 below, so mask/shift arithmetic is well-defined.
final int fromMask = from.length - 1;
final int fromOrdinalShift = 31 - Integer.numberOfLeadingZeros(from.length);
long[] currentWriteVarLengthDataPointers;

if (from.length<=0 || !((from.length&(from.length-1))==0)) {
throw new IllegalStateException("No. of DataElements to be joined must be a power of 2");
}

// NOTE(review): assumes all inputs share from[0]'s schema/memory mode — confirm at call sites.
HollowObjectTypeDataElements to = new HollowObjectTypeDataElements(from[0].schema, from[0].memoryMode, from[0].memoryRecycler);
currentWriteVarLengthDataPointers = new long[from[0].schema.numFields()];

// Sizes the destination: maxOrdinal, bits per field/record, null sentinels.
populateStats(to, from);

// Join the per-split gap-encoded removal sets into one (null entries are treated as empty).
GapEncodedVariableLengthIntegerReader[] fromRemovals = new GapEncodedVariableLengthIntegerReader[from.length];
for (int i=0;i<from.length;i++) {
fromRemovals[i] = from[i].encodedRemovals;
}
to.encodedRemovals = GapEncodedVariableLengthIntegerReader.join(fromRemovals);

for (HollowObjectTypeDataElements elements : from) {
if (elements.encodedAdditions != null) {
throw new IllegalStateException("Encountered encodedAdditions in data elements joiner- this is not expected " +
"since encodedAdditions only exist on delta data elements and they dont carry over to target data elements, " +
"delta data elements are never split/joined");
}
}

// Allocate destination storage; a var-length field in from[0] implies one in every split.
to.fixedLengthData = FixedLengthDataFactory.get((long)to.bitsPerRecord * (to.maxOrdinal + 1), to.memoryMode, to.memoryRecycler);
for(int fieldIdx=0;fieldIdx<to.schema.numFields();fieldIdx++) {
if(from[0].varLengthData[fieldIdx] != null) {
to.varLengthData[fieldIdx] = VariableLengthDataFactory.get(to.memoryMode, to.memoryRecycler);
}
}

// Copy records in ascending joined-ordinal order; copyRecord advances the shared
// var-length write pointers, so this order is required.
for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) {
int fromIndex = ordinal & fromMask;
int fromOrdinal = ordinal >> fromOrdinalShift;
copyRecord(to, ordinal, from[fromIndex], fromOrdinal, currentWriteVarLengthDataPointers);
}

return to;
}

/**
 * Populates {@code to}'s sizing stats from the inputs: joined {@code maxOrdinal}, total
 * var-length sizes (to size end-pointer fields), per-field bit widths/offsets, null
 * sentinels, and bits per record.
 */
void populateStats(HollowObjectTypeDataElements to, HollowObjectTypeDataElements[] from) {
long[] varLengthSizes = new long[to.schema.numFields()];

to.maxOrdinal = -1;
for(int fromIndex=0;fromIndex<from.length;fromIndex++) {
for(int ordinal=0;ordinal<=from[fromIndex].maxOrdinal;ordinal++) {
for(int fieldIdx=0;fieldIdx<to.schema.numFields();fieldIdx++) {
if(from[fromIndex].varLengthData[fieldIdx] != null) {
varLengthSizes[fieldIdx] += varLengthSize(from[fromIndex], ordinal, fieldIdx);
}
}
}
// Summing per-split record counts yields the joined maxOrdinal. NOTE(review): this is only
// correct when the splits are near-balanced, as produced by the corresponding splitter —
// confirm no other producers feed this joiner.
to.maxOrdinal+= from[fromIndex].maxOrdinal + 1;
}

for(int fieldIdx=0;fieldIdx<to.schema.numFields();fieldIdx++) {
if(from[0].varLengthData[fieldIdx] == null) {
to.bitsPerField[fieldIdx] = from[0].bitsPerField[fieldIdx];
} else {
// End-pointer width: enough bits to address the total var-length byte count, plus one
// extra bit (presumably the null flag — see nullValueForField below).
to.bitsPerField[fieldIdx] = (64 - Long.numberOfLeadingZeros(varLengthSizes[fieldIdx] + 1)) + 1;
}
// All-ones sentinel marks null; the 64-bit case needs -1L since 1L<<64 overflows.
to.nullValueForField[fieldIdx] = to.bitsPerField[fieldIdx] == 64 ? -1L : (1L << to.bitsPerField[fieldIdx]) - 1;
to.bitOffsetPerField[fieldIdx] = to.bitsPerRecord;
to.bitsPerRecord += to.bitsPerField[fieldIdx];
}

// Filtering metadata is taken from from[0]; assumes all splits were filtered identically.
to.bitsPerUnfilteredField = from[0].bitsPerUnfilteredField;
to.unfilteredFieldIsIncluded = from[0].unfilteredFieldIsIncluded;
}
}
Loading

0 comments on commit 740545b

Please sign in to comment.