Skip to content

Commit

Permalink
Merge pull request #12 from Netflix/shard-large-types
Browse files Browse the repository at this point in the history
Shard large types
  • Loading branch information
dkoszewnik authored Jan 27, 2017
2 parents 8f0215f + 2341c73 commit 14adb52
Show file tree
Hide file tree
Showing 44 changed files with 3,194 additions and 1,571 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
*/
public class HollowBlobHeader {

public static final int HOLLOW_BLOB_OLD_FORMAT_VERSION_HEADER = 1029;
public static final int HOLLOW_BLOB_VERSION_HEADER = 1030;

private Map<String, String> headerTags = new HashMap<String, String>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public HollowBlobHeader readHeader(InputStream is) throws IOException {
DataInputStream dis = new DataInputStream(is);

int headerVersion = dis.readInt();
if(headerVersion != HollowBlobHeader.HOLLOW_BLOB_VERSION_HEADER && headerVersion != HollowBlobHeader.HOLLOW_BLOB_OLD_FORMAT_VERSION_HEADER) {
if(headerVersion != HollowBlobHeader.HOLLOW_BLOB_VERSION_HEADER) {
throw new IOException("The HollowBlob you are trying to read is incompatible. "
+ "The expected Hollow blob version was " + HollowBlobHeader.HOLLOW_BLOB_VERSION_HEADER + " but the actual version was " + headerVersion);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package com.netflix.hollow.core.read.engine;

import static com.netflix.hollow.core.HollowBlobHeader.HOLLOW_BLOB_OLD_FORMAT_VERSION_HEADER;

import com.netflix.hollow.core.memory.encoding.VarInt;

import com.netflix.hollow.core.schema.HollowListSchema;
Expand Down Expand Up @@ -160,34 +158,33 @@ private void notifyEndUpdate() {
private String readTypeStateSnapshot(DataInputStream is, HollowBlobHeader header, HollowFilterConfig filter) throws IOException {
HollowSchema schema = HollowSchema.readFrom(is);

if(header.getBlobFormatVersion() != HOLLOW_BLOB_OLD_FORMAT_VERSION_HEADER)
skipForwardsCompatibilityBytes(is);

int numShards = readNumShards(is);

if(schema instanceof HollowObjectSchema) {
if(!filter.doesIncludeType(schema.getName())) {
HollowObjectTypeReadState.discardSnapshot(is, (HollowObjectSchema)schema);
HollowObjectTypeReadState.discardSnapshot(is, (HollowObjectSchema)schema, numShards);
} else {
HollowObjectSchema unfilteredSchema = (HollowObjectSchema)schema;
HollowObjectSchema filteredSchema = unfilteredSchema.filterSchema(filter);
populateTypeStateSnapshot(is, new HollowObjectTypeReadState(stateEngine, filteredSchema, unfilteredSchema));
populateTypeStateSnapshot(is, new HollowObjectTypeReadState(stateEngine, filteredSchema, unfilteredSchema, numShards));
}
} else if (schema instanceof HollowListSchema) {
if(!filter.doesIncludeType(schema.getName())) {
HollowListTypeReadState.discardSnapshot(is);
HollowListTypeReadState.discardSnapshot(is, numShards);
} else {
populateTypeStateSnapshot(is, new HollowListTypeReadState(stateEngine, (HollowListSchema)schema));
populateTypeStateSnapshot(is, new HollowListTypeReadState(stateEngine, (HollowListSchema)schema, numShards));
}
} else if(schema instanceof HollowSetSchema) {
if(!filter.doesIncludeType(schema.getName())) {
HollowSetTypeReadState.discardSnapshot(is);
HollowSetTypeReadState.discardSnapshot(is, numShards);
} else {
populateTypeStateSnapshot(is, new HollowSetTypeReadState(stateEngine, (HollowSetSchema)schema));
populateTypeStateSnapshot(is, new HollowSetTypeReadState(stateEngine, (HollowSetSchema)schema, numShards));
}
} else if(schema instanceof HollowMapSchema) {
if(!filter.doesIncludeType(schema.getName())) {
HollowMapTypeReadState.discardSnapshot(is);
HollowMapTypeReadState.discardSnapshot(is, numShards);
} else {
populateTypeStateSnapshot(is, new HollowMapTypeReadState(stateEngine, (HollowMapSchema)schema));
populateTypeStateSnapshot(is, new HollowMapTypeReadState(stateEngine, (HollowMapSchema)schema, numShards));
}
}

Expand All @@ -202,19 +199,29 @@ private void populateTypeStateSnapshot(DataInputStream is, HollowTypeReadState t
private String readTypeStateDelta(DataInputStream is, HollowBlobHeader header) throws IOException {
HollowSchema schema = HollowSchema.readFrom(is);

if(header.getBlobFormatVersion() != HOLLOW_BLOB_OLD_FORMAT_VERSION_HEADER)
skipForwardsCompatibilityBytes(is);
int numShards = readNumShards(is);

HollowTypeReadState typeState = stateEngine.getTypeState(schema.getName());
if(typeState != null) {
typeState.applyDelta(is, schema, stateEngine.getMemoryRecycler());
} else {
discardDelta(is, schema);
discardDelta(is, schema, numShards);
}

return schema.getName();
}

private int readNumShards(DataInputStream is) throws IOException {
int backwardsCompatibilityBytes = VarInt.readVInt(is);

if(backwardsCompatibilityBytes == 0)
return 1; /// produced by a version of hollow prior to 2.1.0, always only 1 shard.

skipForwardsCompatibilityBytes(is);

return VarInt.readVInt(is);
}

private void skipForwardsCompatibilityBytes(DataInputStream is) throws IOException {
int bytesToSkip = VarInt.readVInt(is);
while(bytesToSkip > 0) {
Expand All @@ -226,15 +233,15 @@ private void skipForwardsCompatibilityBytes(DataInputStream is) throws IOExcepti
}


private void discardDelta(DataInputStream dis, HollowSchema schema) throws IOException {
private void discardDelta(DataInputStream dis, HollowSchema schema, int numShards) throws IOException {
if(schema instanceof HollowObjectSchema)
HollowObjectTypeReadState.discardDelta(dis, (HollowObjectSchema)schema);
HollowObjectTypeReadState.discardDelta(dis, (HollowObjectSchema)schema, numShards);
else if(schema instanceof HollowListSchema)
HollowListTypeReadState.discardDelta(dis);
HollowListTypeReadState.discardDelta(dis, numShards);
else if(schema instanceof HollowSetSchema)
HollowSetTypeReadState.discardDelta(dis);
HollowSetTypeReadState.discardDelta(dis, numShards);
else if(schema instanceof HollowMapSchema)
HollowMapTypeReadState.discardDelta(dis);
HollowMapTypeReadState.discardDelta(dis, numShards);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -147,20 +147,20 @@ public HollowReadStateEngine getStateEngine() {
return stateEngine;
}

protected void notifyListenerAboutDeltaChanges(GapEncodedVariableLengthIntegerReader removals, GapEncodedVariableLengthIntegerReader additions) {
protected void notifyListenerAboutDeltaChanges(GapEncodedVariableLengthIntegerReader removals, GapEncodedVariableLengthIntegerReader additions, int shardNumber, int numShards) {
for(HollowTypeStateListener stateListener : stateListeners) {
removals.reset();
int removedOrdinal = removals.nextElement();
while(removedOrdinal < Integer.MAX_VALUE) {
stateListener.removedOrdinal(removedOrdinal);
stateListener.removedOrdinal((removedOrdinal * numShards) + shardNumber);
removals.advance();
removedOrdinal = removals.nextElement();
}

additions.reset();
int addedOrdinal = additions.nextElement();
while(addedOrdinal < Integer.MAX_VALUE) {
stateListener.addedOrdinal(addedOrdinal);
stateListener.addedOrdinal((addedOrdinal * numShards) + shardNumber);
additions.advance();
addedOrdinal = additions.nextElement();
}
Expand Down Expand Up @@ -193,5 +193,10 @@ public HollowTypeReadState getTypeState() {
* @return an approximate accounting of the current cost of the "ordinal holes" in this type state.
*/
public abstract long getApproximateHoleCostInBytes();

/**
* @return The number of shards into which this type is split. Sharding is transparent, so this has no effect on normal usage.
*/
public abstract int numShards();

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.netflix.hollow.core.read.engine.list;

import com.netflix.hollow.core.memory.encoding.FixedLengthElementArray;

import com.netflix.hollow.core.util.RemovedOrdinalIterator;
import com.netflix.hollow.core.util.IntMap;
import com.netflix.hollow.core.memory.pool.WastefulRecycler;
Expand All @@ -33,10 +32,13 @@
public class HollowListDeltaHistoricalStateCreator {

private final HollowListTypeReadState typeState;
private final HollowListTypeDataElements stateEngineDataElements;
private final HollowListTypeDataElements stateEngineDataElements[];
private final HollowListTypeDataElements historicalDataElements;
private final RemovedOrdinalIterator iter;

private final int shardNumberMask;
private final int shardOrdinalShift;

private IntMap ordinalMapping;
private int nextOrdinal = 0;
private long nextStartElement = 0;
Expand All @@ -46,6 +48,8 @@ public HollowListDeltaHistoricalStateCreator(HollowListTypeReadState typeState)
this.stateEngineDataElements = typeState.currentDataElements();
this.historicalDataElements = new HollowListTypeDataElements(WastefulRecycler.DEFAULT_INSTANCE);
this.iter = new RemovedOrdinalIterator(typeState.getListener(PopulatedOrdinalListener.class));
this.shardNumberMask = stateEngineDataElements.length - 1;
this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(stateEngineDataElements.length);
}

public void populateHistory() {
Expand All @@ -70,7 +74,7 @@ public IntMap getOrdinalMapping() {
}

public HollowListTypeReadState createHistoricalTypeReadState() {
HollowListTypeReadState historicalTypeState = new HollowListTypeReadState(null, typeState.getSchema());
HollowListTypeReadState historicalTypeState = new HollowListTypeReadState(null, typeState.getSchema(), 1);
historicalTypeState.setCurrentData(historicalDataElements);
return historicalTypeState;
}
Expand All @@ -89,18 +93,21 @@ private void populateStats() {
historicalDataElements.maxOrdinal = removedEntryCount - 1;
historicalDataElements.totalNumberOfElements = totalElementCount;
historicalDataElements.bitsPerListPointer = totalElementCount == 0 ? 1 : 64 - Long.numberOfLeadingZeros(totalElementCount);
historicalDataElements.bitsPerElement = stateEngineDataElements.bitsPerElement;
historicalDataElements.bitsPerElement = stateEngineDataElements[0].bitsPerElement;

ordinalMapping = new IntMap(removedEntryCount);
}

private void copyRecord(int ordinal) {
long bitsPerElement = stateEngineDataElements.bitsPerElement;
long fromStartElement = ordinal == 0 ? 0 : stateEngineDataElements.listPointerArray.getElementValue((long)(ordinal - 1) * stateEngineDataElements.bitsPerListPointer, stateEngineDataElements.bitsPerListPointer);
long fromEndElement = stateEngineDataElements.listPointerArray.getElementValue((long)ordinal * stateEngineDataElements.bitsPerListPointer, stateEngineDataElements.bitsPerListPointer);
int shard = ordinal & shardNumberMask;
int shardOrdinal = ordinal >> shardOrdinalShift;

long bitsPerElement = stateEngineDataElements[shard].bitsPerElement;
long fromStartElement = shardOrdinal == 0 ? 0 : stateEngineDataElements[shard].listPointerArray.getElementValue((long)(shardOrdinal - 1) * stateEngineDataElements[shard].bitsPerListPointer, stateEngineDataElements[shard].bitsPerListPointer);
long fromEndElement = stateEngineDataElements[shard].listPointerArray.getElementValue((long)shardOrdinal * stateEngineDataElements[shard].bitsPerListPointer, stateEngineDataElements[shard].bitsPerListPointer);
long size = fromEndElement - fromStartElement;

historicalDataElements.elementArray.copyBits(stateEngineDataElements.elementArray, bitsPerElement * fromStartElement, bitsPerElement * nextStartElement, size * bitsPerElement);
historicalDataElements.elementArray.copyBits(stateEngineDataElements[shard].elementArray, bitsPerElement * fromStartElement, bitsPerElement * nextStartElement, size * bitsPerElement);
historicalDataElements.listPointerArray.setElementValue(historicalDataElements.bitsPerListPointer * nextOrdinal, historicalDataElements.bitsPerListPointer, nextStartElement + size);

ordinalMapping.put(ordinal, nextOrdinal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,28 @@ private void readFromStream(DataInputStream dis, boolean isDelta) throws IOExcep
elementArray = FixedLengthElementArray.deserializeFrom(dis, memoryRecycler);
}

static void discardFromStream(DataInputStream dis, boolean isDelta) throws IOException {
VarInt.readVInt(dis); /// max ordinal

if(isDelta) {
/// addition/removal ordinals
GapEncodedVariableLengthIntegerReader.discardEncodedDeltaOrdinals(dis);
GapEncodedVariableLengthIntegerReader.discardEncodedDeltaOrdinals(dis);
static void discardFromStream(DataInputStream dis, int numShards, boolean isDelta) throws IOException {
if(numShards > 1)
VarInt.readVInt(dis); /// max ordinal

for(int i=0;i<numShards;i++) {
VarInt.readVInt(dis); /// max ordinal

if(isDelta) {
/// addition/removal ordinals
GapEncodedVariableLengthIntegerReader.discardEncodedDeltaOrdinals(dis);
GapEncodedVariableLengthIntegerReader.discardEncodedDeltaOrdinals(dis);
}

/// statistics
VarInt.readVInt(dis);
VarInt.readVInt(dis);
VarInt.readVLong(dis);

/// fixed-length data
FixedLengthElementArray.discardFrom(dis);
FixedLengthElementArray.discardFrom(dis);
}

/// statistics
VarInt.readVInt(dis);
VarInt.readVInt(dis);
VarInt.readVLong(dis);

/// fixed-length data
FixedLengthElementArray.discardFrom(dis);
FixedLengthElementArray.discardFrom(dis);
}

public void applyDelta(HollowListTypeDataElements fromData, HollowListTypeDataElements deltaData) {
Expand Down
Loading

0 comments on commit 14adb52

Please sign in to comment.