Skip to content

Commit

Permalink
Refactor object read state to apply consistency check on shards holde…
Browse files Browse the repository at this point in the history
…r instead of data elements
  • Loading branch information
Sunjeet committed Oct 18, 2023
1 parent 7c3e87f commit fee0eaa
Show file tree
Hide file tree
Showing 13 changed files with 430 additions and 449 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ private String readTypeStateSnapshot(HollowBlobInput in, TypeFilter filter) thro
} else {
HollowObjectSchema unfilteredSchema = (HollowObjectSchema)schema;
HollowObjectSchema filteredSchema = unfilteredSchema.filterSchema(filter);
populateTypeStateSnapshot(in, new HollowObjectTypeReadState(stateEngine, memoryMode, filteredSchema, unfilteredSchema, numShards));
populateTypeStateSnapshotWithNumShards(in, new HollowObjectTypeReadState(stateEngine, memoryMode, filteredSchema, unfilteredSchema), numShards);
}
} else if (schema instanceof HollowListSchema) {
if(!filter.includes(typeName)) {
Expand Down Expand Up @@ -361,6 +361,15 @@ private void populateTypeStateSnapshot(HollowBlobInput in, HollowTypeReadState t
typeState.readSnapshot(in, stateEngine.getMemoryRecycler());
}

private void populateTypeStateSnapshotWithNumShards(HollowBlobInput in, HollowTypeReadState typeState, int numShards) throws IOException {
if (numShards<=0 || ((numShards&(numShards-1))!=0)) {
throw new IllegalArgumentException("Number of shards must be a power of 2!");
}

stateEngine.addTypeState(typeState);
typeState.readSnapshot(in, stateEngine.getMemoryRecycler(), numShards);
}

private String readTypeStateDelta(HollowBlobInput in) throws IOException {
HollowSchema schema = HollowSchema.readFrom(in);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ public BitSet getPreviousOrdinals() {

public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler) throws IOException;

public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler, int numShards) throws IOException;

public abstract void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException;

protected boolean shouldReshard(int currNumShards, int deltaNumShards) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public HollowListTypeReadState(HollowReadStateEngine stateEngine, MemoryMode mem
this.shards = shards;
}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException {
throw new UnsupportedOperationException("This type does not yet support numShards specification when reading snapshot");
}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException {
if(shards.length > 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public HollowMapTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memo

}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException {
throw new UnsupportedOperationException("This type does not yet support numShards specification when reading snapshot");
}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException {
if(shards.length > 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import com.netflix.hollow.core.memory.encoding.FixedLengthElementArray;
import com.netflix.hollow.core.memory.pool.WastefulRecycler;
import com.netflix.hollow.core.read.engine.PopulatedOrdinalListener;
import com.netflix.hollow.core.schema.HollowObjectSchema;
import com.netflix.hollow.core.util.IntMap;
import com.netflix.hollow.core.util.RemovedOrdinalIterator;
import java.util.Arrays;

/**
* This class contains the logic for extracting the removed records from an OBJECT type state
Expand All @@ -37,24 +39,19 @@ public class HollowObjectDeltaHistoricalStateCreator {

private final HollowObjectTypeDataElements historicalDataElements;

private final int shardNumberMask;
private final int shardOrdinalShift;

private HollowObjectTypeReadState typeState;
private HollowObjectTypeDataElements stateEngineDataElements[];
private HollowObjectTypeReadState.ShardsHolder shardsHolder;
private RemovedOrdinalIterator iter;
private IntMap ordinalMapping;
private int nextOrdinal;
private final long currentWriteVarLengthDataPointers[];

public HollowObjectDeltaHistoricalStateCreator(HollowObjectTypeReadState typeState, boolean reverse) {
this.typeState = typeState;
this.stateEngineDataElements = typeState.currentDataElements();
this.historicalDataElements = new HollowObjectTypeDataElements(typeState.getSchema(), WastefulRecycler.DEFAULT_INSTANCE);
this.iter = new RemovedOrdinalIterator(typeState.getListener(PopulatedOrdinalListener.class), reverse);
this.currentWriteVarLengthDataPointers = new long[typeState.getSchema().numFields()];
this.shardNumberMask = stateEngineDataElements.length - 1;
this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(stateEngineDataElements.length);
this.shardsHolder = typeState.shardsVolatile;
}

public void populateHistory() {
Expand All @@ -63,7 +60,7 @@ public void populateHistory() {
historicalDataElements.fixedLengthData = new FixedLengthElementArray(historicalDataElements.memoryRecycler, (long)historicalDataElements.bitsPerRecord * (historicalDataElements.maxOrdinal + 1));

for(int i=0;i<historicalDataElements.schema.numFields();i++) {
if(stateEngineDataElements[0].varLengthData[i] != null) {
if(isVarLengthField(typeState.getSchema().getFieldType(i))) {
historicalDataElements.varLengthData[i] = new SegmentedByteArray(historicalDataElements.memoryRecycler);
}
}
Expand All @@ -74,9 +71,9 @@ public void populateHistory() {
while(ordinal != ORDINAL_NONE) {
ordinalMapping.put(ordinal, nextOrdinal);

int shard = ordinal & shardNumberMask;
int shardOrdinal = ordinal >> shardOrdinalShift;
copyRecord(historicalDataElements, nextOrdinal, stateEngineDataElements[shard], shardOrdinal, currentWriteVarLengthDataPointers);
int whichShard = ordinal & shardsHolder.shardNumberMask;
int shardOrdinal = ordinal >> shardsHolder.shards[whichShard].shardOrdinalShift;
copyRecord(historicalDataElements, nextOrdinal, shardsHolder.shards[whichShard].dataElements, shardOrdinal, currentWriteVarLengthDataPointers);
nextOrdinal++;

ordinal = iter.next();
Expand All @@ -89,7 +86,7 @@ public void populateHistory() {
*/
public void dereferenceTypeState() {
this.typeState = null;
this.stateEngineDataElements = null;
this.shardsHolder = null;
this.iter = null;
}

Expand All @@ -98,26 +95,26 @@ public IntMap getOrdinalMapping() {
}

public HollowObjectTypeReadState createHistoricalTypeReadState() {
HollowObjectTypeReadState historicalTypeState = new HollowObjectTypeReadState(null, typeState.getSchema());
historicalTypeState.setCurrentData(historicalDataElements);
HollowObjectTypeReadState historicalTypeState = new HollowObjectTypeReadState(typeState.getSchema(), historicalDataElements);

return historicalTypeState;
}

private void populateStats() {
iter.reset();
int removedEntryCount = 0;
long totalVarLengthSizes[] = new long[stateEngineDataElements[0].varLengthData.length];
long totalVarLengthSizes[] = new long[typeState.getSchema().numFields()];

int ordinal = iter.next();

while(ordinal != ORDINAL_NONE) {
removedEntryCount++;

for(int i=0;i<totalVarLengthSizes.length;i++) {
if(stateEngineDataElements[0].varLengthData[i] != null) {
int shard = ordinal & shardNumberMask;
int shardOrdinal = ordinal >> shardOrdinalShift;
totalVarLengthSizes[i] += varLengthSize(stateEngineDataElements[shard], shardOrdinal, i);
for(int i=0;i<typeState.getSchema().numFields();i++) {
if(isVarLengthField(typeState.getSchema().getFieldType(i))) {
int whichShard = ordinal & shardsHolder.shardNumberMask;
int shardOrdinal = ordinal >> shardsHolder.shards[whichShard].shardOrdinalShift;
totalVarLengthSizes[i] += varLengthSize(shardsHolder.shards[whichShard].dataElements, shardOrdinal, i);
}
}

Expand All @@ -126,9 +123,12 @@ private void populateStats() {

historicalDataElements.maxOrdinal = removedEntryCount - 1;

for(int i=0;i<stateEngineDataElements[0].bitsPerField.length;i++) {
if(stateEngineDataElements[0].varLengthData[i] == null) {
historicalDataElements.bitsPerField[i] = stateEngineDataElements[0].bitsPerField[i];
for(int i=0;i<typeState.getSchema().numFields();i++) {
if(!isVarLengthField(typeState.getSchema().getFieldType(i))) {
final int fieldIdx = i;
historicalDataElements.bitsPerField[i] = Arrays.stream(shardsHolder.shards)
.map(shard -> shard.dataElements.bitsPerField[fieldIdx])
.max(Integer::compare).get();
} else {
historicalDataElements.bitsPerField[i] = (64 - Long.numberOfLeadingZeros(totalVarLengthSizes[i] + 1)) + 1;
}
Expand All @@ -140,4 +140,8 @@ private void populateStats() {

ordinalMapping = new IntMap(removedEntryCount);
}

private boolean isVarLengthField(HollowObjectSchema.FieldType fieldType) {
return fieldType == HollowObjectSchema.FieldType.STRING || fieldType == HollowObjectSchema.FieldType.BYTES;
}
}
Loading

0 comments on commit fee0eaa

Please sign in to comment.