From fee0eaac0c54067ae76b2233aa86ea4a092717ef Mon Sep 17 00:00:00 2001 From: Sunjeet Singh Date: Wed, 18 Oct 2023 07:41:19 -0700 Subject: [PATCH] Refactor object read state to apply consistency check on shards holder instead of data elements --- .../core/read/engine/HollowBlobReader.java | 11 +- .../core/read/engine/HollowTypeReadState.java | 2 + .../engine/list/HollowListTypeReadState.java | 5 + .../engine/map/HollowMapTypeReadState.java | 5 + ...llowObjectDeltaHistoricalStateCreator.java | 50 +-- .../object/HollowObjectTypeReadState.java | 329 ++++++++++----- .../HollowObjectTypeReadStateShard.java | 376 +++++------------- .../engine/set/HollowSetTypeReadState.java | 5 + ...owObjectTypeDataElementsSplitJoinTest.java | 7 +- ...ollowObjectTypeDataElementsJoinerTest.java | 10 +- ...owObjectTypeDataElementsSplitJoinTest.java | 43 +- ...lowObjectTypeDataElementsSplitterTest.java | 6 +- .../object/HollowObjectTypeReadStateTest.java | 30 +- 13 files changed, 430 insertions(+), 449 deletions(-) diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java index 3cf97b5c84..8c6bf60893 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java @@ -331,7 +331,7 @@ private String readTypeStateSnapshot(HollowBlobInput in, TypeFilter filter) thro } else { HollowObjectSchema unfilteredSchema = (HollowObjectSchema)schema; HollowObjectSchema filteredSchema = unfilteredSchema.filterSchema(filter); - populateTypeStateSnapshot(in, new HollowObjectTypeReadState(stateEngine, memoryMode, filteredSchema, unfilteredSchema, numShards)); + populateTypeStateSnapshotWithNumShards(in, new HollowObjectTypeReadState(stateEngine, memoryMode, filteredSchema, unfilteredSchema), numShards); } } else if (schema instanceof HollowListSchema) { if(!filter.includes(typeName)) { @@ -361,6 +361,15 @@ private void populateTypeStateSnapshot(HollowBlobInput in, HollowTypeReadState t typeState.readSnapshot(in, stateEngine.getMemoryRecycler()); } + private void populateTypeStateSnapshotWithNumShards(HollowBlobInput in, HollowTypeReadState typeState, int numShards) throws IOException { + if (numShards<=0 || ((numShards&(numShards-1))!=0)) { + throw new IllegalArgumentException("Number of shards must be a power of 2!"); + } + + stateEngine.addTypeState(typeState); + typeState.readSnapshot(in, stateEngine.getMemoryRecycler(), numShards); + } + private String readTypeStateDelta(HollowBlobInput in) throws IOException { HollowSchema schema = HollowSchema.readFrom(in); diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReadState.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReadState.java index 882bf6caad..3150ac174e 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReadState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReadState.java @@ -123,6 +123,8 @@ public BitSet getPreviousOrdinals() { public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler) throws IOException; + public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler, int numShards) throws IOException; + public abstract void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException; protected boolean shouldReshard(int currNumShards, int deltaNumShards) { diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReadState.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReadState.java index df5f08ee4f..7ff29b01c3 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReadState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReadState.java @@ -73,6 +73,11 @@ public HollowListTypeReadState(HollowReadStateEngine stateEngine, MemoryMode mem this.shards = shards; } + @Override + public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException { + throw new UnsupportedOperationException("This type does not yet support numShards specification when reading snapshot"); + } + @Override public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException { if(shards.length > 1) diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReadState.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReadState.java index 4946682809..f0f5ca75c8 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReadState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReadState.java @@ -80,6 +80,11 @@ public HollowMapTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memo } + @Override + public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException { + throw new UnsupportedOperationException("This type does not yet support numShards specification when reading snapshot"); + } + @Override public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException { if(shards.length > 1) diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectDeltaHistoricalStateCreator.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectDeltaHistoricalStateCreator.java index 3d369a9ddb..7cca46d0ef 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectDeltaHistoricalStateCreator.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectDeltaHistoricalStateCreator.java @@ -24,8 +24,10 @@ import com.netflix.hollow.core.memory.encoding.FixedLengthElementArray; import com.netflix.hollow.core.memory.pool.WastefulRecycler; import com.netflix.hollow.core.read.engine.PopulatedOrdinalListener; +import com.netflix.hollow.core.schema.HollowObjectSchema; import com.netflix.hollow.core.util.IntMap; import com.netflix.hollow.core.util.RemovedOrdinalIterator; +import java.util.Arrays; /** * This class contains the logic for extracting the removed records from an OBJECT type state @@ -37,11 +39,8 @@ public class HollowObjectDeltaHistoricalStateCreator { private final HollowObjectTypeDataElements historicalDataElements; - private final int shardNumberMask; - private final int shardOrdinalShift; - private HollowObjectTypeReadState typeState; - private HollowObjectTypeDataElements stateEngineDataElements[]; + private HollowObjectTypeReadState.ShardsHolder shardsHolder; private RemovedOrdinalIterator iter; private IntMap ordinalMapping; private int nextOrdinal; @@ -49,12 +48,10 @@ public class HollowObjectDeltaHistoricalStateCreator { public HollowObjectDeltaHistoricalStateCreator(HollowObjectTypeReadState typeState, boolean reverse) { this.typeState = typeState; - this.stateEngineDataElements = typeState.currentDataElements(); this.historicalDataElements = new HollowObjectTypeDataElements(typeState.getSchema(), WastefulRecycler.DEFAULT_INSTANCE); this.iter = new RemovedOrdinalIterator(typeState.getListener(PopulatedOrdinalListener.class), reverse); this.currentWriteVarLengthDataPointers = new long[typeState.getSchema().numFields()]; - this.shardNumberMask = stateEngineDataElements.length - 1; - this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(stateEngineDataElements.length); + this.shardsHolder = typeState.shardsVolatile; } public void populateHistory() { @@ -63,7 +60,7 @@ public void populateHistory() { historicalDataElements.fixedLengthData = new FixedLengthElementArray(historicalDataElements.memoryRecycler, (long)historicalDataElements.bitsPerRecord * (historicalDataElements.maxOrdinal + 1)); for(int i=0;i> shardOrdinalShift; - copyRecord(historicalDataElements, nextOrdinal, stateEngineDataElements[shard], shardOrdinal, currentWriteVarLengthDataPointers); + int whichShard = ordinal & shardsHolder.shardNumberMask; + int shardOrdinal = ordinal >> shardsHolder.shards[whichShard].shardOrdinalShift; + copyRecord(historicalDataElements, nextOrdinal, shardsHolder.shards[whichShard].dataElements, shardOrdinal, currentWriteVarLengthDataPointers); nextOrdinal++; ordinal = iter.next(); @@ -89,7 +86,7 @@ public void populateHistory() { */ public void dereferenceTypeState() { this.typeState = null; - this.stateEngineDataElements = null; + this.shardsHolder = null; this.iter = null; } @@ -98,26 +95,26 @@ public IntMap getOrdinalMapping() { } public HollowObjectTypeReadState createHistoricalTypeReadState() { - HollowObjectTypeReadState historicalTypeState = new HollowObjectTypeReadState(null, typeState.getSchema()); - historicalTypeState.setCurrentData(historicalDataElements); + HollowObjectTypeReadState historicalTypeState = new HollowObjectTypeReadState(typeState.getSchema(), historicalDataElements); + return historicalTypeState; } private void populateStats() { iter.reset(); int removedEntryCount = 0; - long totalVarLengthSizes[] = new long[stateEngineDataElements[0].varLengthData.length]; + long totalVarLengthSizes[] = new long[typeState.getSchema().numFields()]; int ordinal = iter.next(); while(ordinal != ORDINAL_NONE) { removedEntryCount++; - for(int i=0;i> shardOrdinalShift; - totalVarLengthSizes[i] += varLengthSize(stateEngineDataElements[shard], shardOrdinal, i); + for(int i=0;i> shardsHolder.shards[whichShard].shardOrdinalShift; + totalVarLengthSizes[i] += varLengthSize(shardsHolder.shards[whichShard].dataElements, shardOrdinal, i); } } @@ -126,9 +123,12 @@ private void populateStats() { historicalDataElements.maxOrdinal = removedEntryCount - 1; - for(int i=0;i shard.dataElements.bitsPerField[fieldIdx]) + .max(Integer::compare).get(); } else { historicalDataElements.bitsPerField[i] = (64 - Long.numberOfLeadingZeros(totalVarLengthSizes[i] + 1)) + 1; } @@ -140,4 +140,8 @@ private void populateStats() { ordinalMapping = new IntMap(removedEntryCount); } + + private boolean isVarLengthField(HollowObjectSchema.FieldType fieldType) { + return fieldType == HollowObjectSchema.FieldType.STRING || fieldType == HollowObjectSchema.FieldType.BYTES; + } } diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadState.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadState.java index 0df712aeb3..707ef3a77d 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadState.java @@ -16,13 +16,17 @@ */ package com.netflix.hollow.core.read.engine.object; +import static com.netflix.hollow.core.HollowConstants.ORDINAL_NONE; + import com.netflix.hollow.api.sampling.DisabledSamplingDirector; import com.netflix.hollow.api.sampling.HollowObjectSampler; import com.netflix.hollow.api.sampling.HollowSampler; import com.netflix.hollow.api.sampling.HollowSamplingDirector; +import com.netflix.hollow.core.memory.HollowUnsafeHandle; import com.netflix.hollow.core.memory.MemoryMode; import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader; import com.netflix.hollow.core.memory.encoding.VarInt; +import com.netflix.hollow.core.memory.encoding.ZigZag; import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler; import com.netflix.hollow.core.read.HollowBlobInput; import com.netflix.hollow.core.read.dataaccess.HollowObjectTypeDataAccess; @@ -32,6 +36,7 @@ import com.netflix.hollow.core.read.filter.HollowFilterConfig; import com.netflix.hollow.core.schema.HollowObjectSchema; import com.netflix.hollow.core.schema.HollowSchema; +import com.netflix.hollow.core.write.HollowObjectWriteRecord; import com.netflix.hollow.tools.checksum.HollowChecksum; import java.io.IOException; import java.util.Arrays; @@ -44,51 +49,53 @@ public class HollowObjectTypeReadState extends HollowTypeReadState implements Ho private final HollowObjectSchema unfilteredSchema; private final HollowObjectSampler sampler; + private int maxOrdinal; + volatile ShardsHolder shardsVolatile; static class ShardsHolder { final HollowObjectTypeReadStateShard shards[]; final int shardNumberMask; - private ShardsHolder(HollowSchema schema, int numShards) { + private ShardsHolder(HollowSchema schema, HollowObjectTypeDataElements[] dataElements, int[] shardOrdinalShifts) { + int numShards = dataElements.length; HollowObjectTypeReadStateShard[] shards = new HollowObjectTypeReadStateShard[numShards]; - int shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards); - for(int i=0; i 1) + throw new IllegalStateException("Object type read state requires numShards when reading snapshot"); + } + + @Override + public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException { + if(numShards > 1) maxOrdinal = VarInt.readVInt(in); - for(int i=0; i> shard.shardOrdinalShift, fieldIndex); - } while(shardsHolder != this.shardsVolatile); - return result; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + fixedLengthValue = shard.isNull(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while(readWasUnsafe(shardsHolder)); + + switch(((HollowObjectSchema) schema).getFieldType(fieldIndex)) { + case BYTES: + case STRING: + int numBits = shard.dataElements.bitsPerField[fieldIndex]; + return (fixedLengthValue & (1L << (numBits - 1))) != 0; + case FLOAT: + return (int)fixedLengthValue == HollowObjectWriteRecord.NULL_FLOAT_BITS; + case DOUBLE: + return fixedLengthValue == HollowObjectWriteRecord.NULL_DOUBLE_BITS; + default: + return fixedLengthValue == shard.dataElements.nullValueForField[fieldIndex]; + } } @Override public int readOrdinal(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); + HollowObjectTypeReadState.ShardsHolder shardsHolder; - int result; + HollowObjectTypeReadStateShard shard; + long refOrdinal; do { shardsHolder = this.shardsVolatile; - HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; - result = shard.readOrdinal(ordinal >> shard.shardOrdinalShift, fieldIndex); - } while(shardsHolder != this.shardsVolatile); - return result; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + refOrdinal = shard.readOrdinal(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while(readWasUnsafe(shardsHolder)); + + if(refOrdinal == shard.dataElements.nullValueForField[fieldIndex]) + return ORDINAL_NONE; + return (int)refOrdinal; } @Override public int readInt(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); + HollowObjectTypeReadState.ShardsHolder shardsHolder; - int result; + HollowObjectTypeReadStateShard shard; + long value; do { shardsHolder = this.shardsVolatile; - HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; - result = shard.readInt(ordinal >> shard.shardOrdinalShift, fieldIndex); - } while(shardsHolder != this.shardsVolatile); - return result; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + value = shard.readInt(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while(readWasUnsafe(shardsHolder)); + + if(value == shard.dataElements.nullValueForField[fieldIndex]) + return Integer.MIN_VALUE; + return ZigZag.decodeInt((int)value); } @Override public float readFloat(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); + HollowObjectTypeReadState.ShardsHolder shardsHolder; - float result; + HollowObjectTypeReadStateShard shard; + int value; do { shardsHolder = this.shardsVolatile; - HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; - result = shard.readFloat(ordinal >> shard.shardOrdinalShift, fieldIndex); - } while(shardsHolder != this.shardsVolatile); - return result; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + value = shard.readFloat(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while(readWasUnsafe(shardsHolder)); + + if(value == HollowObjectWriteRecord.NULL_FLOAT_BITS) + return Float.NaN; + return Float.intBitsToFloat(value); } @Override public double readDouble(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); + HollowObjectTypeReadState.ShardsHolder shardsHolder; - double result; + HollowObjectTypeReadStateShard shard; + long value; do { shardsHolder = this.shardsVolatile; - HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; - result = shard.readDouble(ordinal >> shard.shardOrdinalShift, fieldIndex); - } while(shardsHolder != this.shardsVolatile); - return result; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + value = shard.readDouble(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while(readWasUnsafe(shardsHolder)); + + if(value == HollowObjectWriteRecord.NULL_DOUBLE_BITS) + return Double.NaN; + return Double.longBitsToDouble(value); } @Override public long readLong(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); + HollowObjectTypeReadState.ShardsHolder shardsHolder; - long result; + HollowObjectTypeReadStateShard shard; + long value; do { shardsHolder = this.shardsVolatile; - HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; - result = shard.readLong(ordinal >> shard.shardOrdinalShift, fieldIndex); - } while(shardsHolder != this.shardsVolatile); - return result; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + value = shard.readLong(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while(readWasUnsafe(shardsHolder)); + + if(value == shard.dataElements.nullValueForField[fieldIndex]) + return Long.MIN_VALUE; + return ZigZag.decodeLong(value); } @Override public Boolean readBoolean(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); + HollowObjectTypeReadState.ShardsHolder shardsHolder; - Boolean result; + HollowObjectTypeReadStateShard shard; + long value; do { shardsHolder = this.shardsVolatile; - HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; - result = shard.readBoolean(ordinal >> shard.shardOrdinalShift, fieldIndex); - } while(shardsHolder != this.shardsVolatile); - return result; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + value = shard.readBoolean(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while(readWasUnsafe(shardsHolder)); + + if(value == shard.dataElements.nullValueForField[fieldIndex]) + return null; + return value == 1 ? Boolean.TRUE : Boolean.FALSE; } @Override public byte[] readBytes(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); + HollowObjectTypeReadState.ShardsHolder shardsHolder; + HollowObjectTypeReadStateShard shard; + HollowObjectTypeReadStateShard.VarLenStats stats; byte[] result; do { - shardsHolder = this.shardsVolatile; - HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; - result = shard.readBytes(ordinal >> shard.shardOrdinalShift, fieldIndex); - } while(shardsHolder != this.shardsVolatile); + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + stats = shard.readVarLenStats(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while (readWasUnsafe(shardsHolder)); + + result = shard.readBytes(stats, fieldIndex); + } while (readWasUnsafe(shardsHolder)); + return result; } @Override public String readString(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); + HollowObjectTypeReadState.ShardsHolder shardsHolder; + HollowObjectTypeReadStateShard shard; + HollowObjectTypeReadStateShard.VarLenStats stats; String result; do { - shardsHolder = this.shardsVolatile; - HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; - result = shard.readString(ordinal >> shard.shardOrdinalShift, fieldIndex); - } while(shardsHolder != this.shardsVolatile); + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + stats = shard.readVarLenStats(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while(readWasUnsafe(shardsHolder)); + + result = shard.readString(stats, fieldIndex); + } while(readWasUnsafe(shardsHolder)); + return result; } @Override public boolean isStringFieldEqual(int ordinal, int fieldIndex, String testValue) { sampler.recordFieldAccess(fieldIndex); + HollowObjectTypeReadState.ShardsHolder shardsHolder; + HollowObjectTypeReadStateShard shard; + HollowObjectTypeReadStateShard.VarLenStats stats; boolean result; do { - shardsHolder = this.shardsVolatile; - HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; - result = shard.isStringFieldEqual(ordinal >> shard.shardOrdinalShift, fieldIndex, testValue); - } while(shardsHolder != this.shardsVolatile); + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + stats = shard.readVarLenStats(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while(readWasUnsafe(shardsHolder)); + + result = shard.isStringFieldEqual(stats, fieldIndex, testValue); + } while(readWasUnsafe(shardsHolder)); + return result; } @Override public int findVarLengthFieldHashCode(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); + HollowObjectTypeReadState.ShardsHolder shardsHolder; + HollowObjectTypeReadStateShard shard; + HollowObjectTypeReadStateShard.VarLenStats stats; int hashCode; do { - shardsHolder = this.shardsVolatile; - HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; - hashCode = shard.findVarLengthFieldHashCode(ordinal >> shard.shardOrdinalShift, fieldIndex); - } while(shardsHolder != this.shardsVolatile); + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + stats = shard.readVarLenStats(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while(readWasUnsafe(shardsHolder)); + + hashCode = shard.findVarLengthFieldHashCode(stats, fieldIndex); + } while(readWasUnsafe(shardsHolder)); + return hashCode; } + private boolean readWasUnsafe(ShardsHolder shardsHolder) { + // Use a load (acquire) fence to constrain the compiler reordering prior plain loads so + // that they cannot "float down" below the volatile load of shardsVolatile. + // This ensures data is checked against current shard holder *after* optimistic calculations + // have been performed on data. + // + // Note: the Java Memory Model allows for the reordering of plain loads and stores + // before a volatile load (those plain loads and stores can "float down" below the + // volatile load), but forbids the reordering of plain loads after a volatile load + // (those plain loads are not allowed to "float above" the volatile load). + // Similar reordering also applies to plain loads and stores and volatile stores. + // In effect the ordering of volatile loads and stores is retained and plain loads + // and stores can be shuffled around and grouped together, which increases + // optimization opportunities. + // This is why locks can be coarsened; plain loads and stores may enter the lock region + // from above (float down the acquire) or below (float above the release) but existing + // loads and stores may not exit (a "lock roach motel" and why there is almost universal + // misunderstanding of, and many misguided attempts to optimize, the infamous double + // checked locking idiom). + // + // Note: the fence provides stronger ordering guarantees than a corresponding non-plain + // load or store since the former affects all prior or subsequent loads and stores, + // whereas the latter is scoped to the particular load or store. + // + // For more details see http://gee.cs.oswego.edu/dl/html/j9mm.html + // [Comment credit: Paul Sandoz] + // + HollowUnsafeHandle.getUnsafe().loadFence(); + return shardsHolder != shardsVolatile; + } + /** * Warning: Not thread-safe. Should only be called within the update thread. * @param fieldName the field name @@ -523,10 +649,14 @@ public HollowSampler getSampler() { @Override protected void invalidate() { - HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards; stateListeners = EMPTY_LISTENERS; - for(int i=0;i shard.dataElements) + .toArray(HollowObjectTypeDataElements[]::new); } @Override @@ -565,7 +692,7 @@ protected void applyToChecksum(HollowChecksum checksum, HollowSchema withSchema) BitSet populatedOrdinals = getPopulatedOrdinals(); for(int i=0;i 1) - throw new UnsupportedOperationException("Cannot directly set data on sharded type state"); - shards[0].setCurrentData(data); - maxOrdinal = data.maxOrdinal; - } @Override public int numShards() { diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadStateShard.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadStateShard.java index aecc9f3d65..52d9152e8b 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadStateShard.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadStateShard.java @@ -19,13 +19,10 @@ import static com.netflix.hollow.core.HollowConstants.ORDINAL_NONE; import com.netflix.hollow.core.memory.ByteData; -import com.netflix.hollow.core.memory.HollowUnsafeHandle; import com.netflix.hollow.core.memory.encoding.HashCodes; import com.netflix.hollow.core.memory.encoding.VarInt; -import com.netflix.hollow.core.memory.encoding.ZigZag; import com.netflix.hollow.core.schema.HollowObjectSchema; import com.netflix.hollow.core.schema.HollowSchema; -import com.netflix.hollow.core.write.HollowObjectWriteRecord; import com.netflix.hollow.tools.checksum.HollowChecksum; import java.util.ArrayList; import java.util.Arrays; @@ -34,268 +31,149 @@ import java.util.List; class HollowObjectTypeReadStateShard { - - private volatile HollowObjectTypeDataElements currentDataVolatile; + final HollowObjectTypeDataElements dataElements; final int shardOrdinalShift; private final HollowObjectSchema schema; - - HollowObjectTypeReadStateShard(HollowObjectSchema schema, int shardOrdinalShift) { + + HollowObjectTypeReadStateShard(HollowObjectSchema schema, HollowObjectTypeDataElements dataElements, int shardOrdinalShift) { this.schema = schema; this.shardOrdinalShift = shardOrdinalShift; + this.dataElements = dataElements; } - public boolean isNull(int ordinal, int fieldIndex) { - HollowObjectTypeDataElements currentData; - long fixedLengthValue; - - do { - currentData = this.currentDataVolatile; - - long bitOffset = fieldOffset(currentData, ordinal, fieldIndex); - int numBitsForField = currentData.bitsPerField[fieldIndex]; - - fixedLengthValue = numBitsForField <= 56 ? - currentData.fixedLengthData.getElementValue(bitOffset, numBitsForField) - : currentData.fixedLengthData.getLargeElementValue(bitOffset, numBitsForField); - } while(readWasUnsafe(currentData)); - - switch(schema.getFieldType(fieldIndex)) { - case BYTES: - case STRING: - int numBits = currentData.bitsPerField[fieldIndex]; - return (fixedLengthValue & (1L << (numBits - 1))) != 0; - case FLOAT: - return (int)fixedLengthValue == HollowObjectWriteRecord.NULL_FLOAT_BITS; - case DOUBLE: - return fixedLengthValue == HollowObjectWriteRecord.NULL_DOUBLE_BITS; - default: - return fixedLengthValue == currentData.nullValueForField[fieldIndex]; - } + public long isNull(int ordinal, int fieldIndex) { + long bitOffset = fieldOffset(ordinal, fieldIndex); + int numBitsForField = dataElements.bitsPerField[fieldIndex]; + return numBitsForField <= 56 ? + dataElements.fixedLengthData.getElementValue(bitOffset, numBitsForField) + : dataElements.fixedLengthData.getLargeElementValue(bitOffset, numBitsForField); } - public int readOrdinal(int ordinal, int fieldIndex) { - HollowObjectTypeDataElements currentData; - long refOrdinal; - - do { - currentData = this.currentDataVolatile; - refOrdinal = readFixedLengthFieldValue(currentData, ordinal, fieldIndex); - } while(readWasUnsafe(currentData)); - - if(refOrdinal == currentData.nullValueForField[fieldIndex]) - return ORDINAL_NONE; - return (int)refOrdinal; + public long readOrdinal(int ordinal, int fieldIndex) { + return readFixedLengthFieldValue(ordinal, fieldIndex); } - public int readInt(int ordinal, int fieldIndex) { - HollowObjectTypeDataElements currentData; - long value; - - do { - currentData = this.currentDataVolatile; - value = readFixedLengthFieldValue(currentData, ordinal, fieldIndex); - } while(readWasUnsafe(currentData)); - - if(value == currentData.nullValueForField[fieldIndex]) - return Integer.MIN_VALUE; - return ZigZag.decodeInt((int)value); + public long readInt(int ordinal, int fieldIndex) { + return readFixedLengthFieldValue(ordinal, fieldIndex); } - public float readFloat(int ordinal, int fieldIndex) { - HollowObjectTypeDataElements currentData; - int value; - - do { - currentData = this.currentDataVolatile; - value = (int)readFixedLengthFieldValue(currentData, ordinal, fieldIndex); - } while(readWasUnsafe(currentData)); - - if(value == HollowObjectWriteRecord.NULL_FLOAT_BITS) - return Float.NaN; - return Float.intBitsToFloat(value); + public int readFloat(int ordinal, int fieldIndex) { + return (int)readFixedLengthFieldValue(ordinal, fieldIndex); } - public double readDouble(int ordinal, int fieldIndex) { - HollowObjectTypeDataElements currentData; - long value; - - do { - currentData = this.currentDataVolatile; - long bitOffset = fieldOffset(currentData, ordinal, fieldIndex); - value = currentData.fixedLengthData.getLargeElementValue(bitOffset, 64, -1L); - } while(readWasUnsafe(currentData)); - - if(value == HollowObjectWriteRecord.NULL_DOUBLE_BITS) - return Double.NaN; - return Double.longBitsToDouble(value); + public long readDouble(int ordinal, int fieldIndex) { + long bitOffset = fieldOffset(ordinal, fieldIndex); + return dataElements.fixedLengthData.getLargeElementValue(bitOffset, 64, -1L); } public long readLong(int ordinal, int fieldIndex) { - HollowObjectTypeDataElements currentData; - long value; - - do { - currentData = this.currentDataVolatile; - long bitOffset = fieldOffset(currentData, ordinal, fieldIndex); - int numBitsForField = currentData.bitsPerField[fieldIndex]; - value = currentData.fixedLengthData.getLargeElementValue(bitOffset, numBitsForField); - } while(readWasUnsafe(currentData)); - - if(value == currentData.nullValueForField[fieldIndex]) - return Long.MIN_VALUE; - return ZigZag.decodeLong(value); + long bitOffset = fieldOffset(ordinal, fieldIndex); + int numBitsForField = dataElements.bitsPerField[fieldIndex]; + return dataElements.fixedLengthData.getLargeElementValue(bitOffset, numBitsForField); } - public Boolean readBoolean(int ordinal, int fieldIndex) { - HollowObjectTypeDataElements currentData; - long value; - - do { - currentData = this.currentDataVolatile; - value = readFixedLengthFieldValue(currentData, ordinal, fieldIndex); - } while(readWasUnsafe(currentData)); - - if(value == currentData.nullValueForField[fieldIndex]) - return null; - return value == 1 ? Boolean.TRUE : Boolean.FALSE; + public long readBoolean(int ordinal, int fieldIndex) { + return readFixedLengthFieldValue(ordinal, fieldIndex); } - private long readFixedLengthFieldValue(HollowObjectTypeDataElements currentData, int ordinal, int fieldIndex) { - long bitOffset = fieldOffset(currentData, ordinal, fieldIndex); - int numBitsForField = currentData.bitsPerField[fieldIndex]; + private long readFixedLengthFieldValue(int ordinal, int fieldIndex) { + long bitOffset = fieldOffset(ordinal, fieldIndex); + int numBitsForField = dataElements.bitsPerField[fieldIndex]; - long value = currentData.fixedLengthData.getElementValue(bitOffset, numBitsForField); + long value = dataElements.fixedLengthData.getElementValue(bitOffset, numBitsForField); return value; } - public byte[] readBytes(int ordinal, int fieldIndex) { - HollowObjectTypeDataElements currentData; - byte[] result; - - do { - int numBitsForField; - long endByte; - long startByte; + static class VarLenStats { + final int numBitsForField; + final long startByte; + final long endByte; - do { - currentData = this.currentDataVolatile; - - numBitsForField = currentData.bitsPerField[fieldIndex]; - long currentBitOffset = fieldOffset(currentData, ordinal, fieldIndex); - endByte = currentData.fixedLengthData.getElementValue(currentBitOffset, numBitsForField); - startByte = ordinal != 0 ? currentData.fixedLengthData.getElementValue(currentBitOffset - currentData.bitsPerRecord, numBitsForField) : 0; - } while(readWasUnsafe(currentData)); - - if((endByte & (1L << numBitsForField - 1)) != 0) - return null; - - startByte &= (1L << numBitsForField - 1) - 1; + public VarLenStats(int numBitsForField, long startByte, long endByte) { + this.numBitsForField = numBitsForField; + this.startByte = startByte; + this.endByte = endByte; + } + } - int length = (int)(endByte - startByte); - result = new byte[length]; - for(int i=0;i - * - * @param str - * @param out - * @return */ private static final ThreadLocal chararr = ThreadLocal.withInitial(() -> new char[100]); @@ -352,52 +226,12 @@ private boolean testStringEquality(ByteData data, long position, int length, Str return position == endPosition && count == testValue.length(); } - void invalidate() { - setCurrentData(null); - } - - HollowObjectTypeDataElements currentDataElements() { - return currentDataVolatile; - } - - private boolean readWasUnsafe(HollowObjectTypeDataElements data) { - // Use a load (acquire) fence to constrain the compiler reordering prior plain loads so - // that they cannot "float down" below the volatile load of currentDataVolatile. - // This ensures data is checked against currentData *after* optimistic calculations - // have been performed on data. - // - // Note: the Java Memory Model allows for the reordering of plain loads and stores - // before a volatile load (those plain loads and stores can "float down" below the - // volatile load), but forbids the reordering of plain loads after a volatile load - // (those plain loads are not allowed to "float above" the volatile load). - // Similar reordering also applies to plain loads and stores and volatile stores. - // In effect the ordering of volatile loads and stores is retained and plain loads - // and stores can be shuffled around and grouped together, which increases - // optimization opportunities. - // This is why locks can be coarsened; plain loads and stores may enter the lock region - // from above (float down the acquire) or below (float above the release) but existing - // loads and stores may not exit (a "lock roach motel" and why there is almost universal - // misunderstanding of, and many misguided attempts to optimize, the infamous double - // checked locking idiom). - // - // Note: the fence provides stronger ordering guarantees than a corresponding non-plain - // load or store since the former affects all prior or subsequent loads and stores, - // whereas the latter is scoped to the particular load or store. - // - // For more details see http://gee.cs.oswego.edu/dl/html/j9mm.html - HollowUnsafeHandle.getUnsafe().loadFence(); - return data != currentDataVolatile; - } - - void setCurrentData(HollowObjectTypeDataElements data) { - this.currentDataVolatile = data; - } - - protected void applyToChecksum(HollowChecksum checksum, HollowSchema withSchema, BitSet populatedOrdinals, int shardNumber, int shardNumberMask) { + protected void applyShardToChecksum(HollowChecksum checksum, HollowSchema withSchema, BitSet populatedOrdinals, int shardNumber, int shardNumberMask) { if(!(withSchema instanceof HollowObjectSchema)) throw new IllegalArgumentException("HollowObjectTypeReadState can only calculate checksum with a HollowObjectSchema: " + schema.getName()); HollowObjectSchema commonSchema = schema.findCommonSchema((HollowObjectSchema)withSchema); + VarLenStats stats; List commonFieldNames = new ArrayList(); for(int i=0;i 1) diff --git a/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/AbstractHollowObjectTypeDataElementsSplitJoinTest.java b/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/AbstractHollowObjectTypeDataElementsSplitJoinTest.java index d72d08cae9..1f1e5657d9 100644 --- a/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/AbstractHollowObjectTypeDataElementsSplitJoinTest.java +++ b/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/AbstractHollowObjectTypeDataElementsSplitJoinTest.java @@ -92,11 +92,14 @@ protected HollowObjectTypeReadState populateTypeStateWithFilter(int numRecords) } protected void assertDataUnchanged(int numRecords) { + assertDataUnchanged((HollowObjectTypeReadState) readStateEngine.getTypeState("TestObject"), numRecords); + } + + protected void assertDataUnchanged(HollowObjectTypeReadState typeState, int numRecords) { for(int i=0;i invocation) { try { invocation.get();