From ec5dcb4fe45385ddac8c906a4ee0a2debe9ed6f9 Mon Sep 17 00:00:00 2001 From: Sunjeet Date: Thu, 2 Nov 2023 11:30:54 -0700 Subject: [PATCH] Consumer delta application supports re-sharding for Object type (#644) * Object type read state supports resharding * Refactor object read state to apply consistency check on shards holder instead of data elements * Save the extra object allocation in read path * Reuse shards when possible for optimizing the worst case read time when resharding * Microbenchmarking per-read during delta update * Cleanup --------- Co-authored-by: Sunjeet Singh --- ...TypeReadStateDeltaTransitionBenchmark.java | 180 ++++++ .../core/read/engine/HollowBlobReader.java | 14 +- .../core/read/engine/HollowTypeReadState.java | 9 +- .../engine/list/HollowListTypeReadState.java | 11 +- .../engine/map/HollowMapTypeReadState.java | 11 +- .../object/HollowObjectDeltaApplicator.java | 31 +- ...llowObjectDeltaHistoricalStateCreator.java | 50 +- .../object/HollowObjectTypeReadState.java | 566 +++++++++++++++--- .../HollowObjectTypeReadStateShard.java | 365 +++-------- .../engine/set/HollowSetTypeReadState.java | 11 +- ...owObjectTypeDataElementsSplitJoinTest.java | 7 +- ...ollowObjectTypeDataElementsJoinerTest.java | 10 +- ...owObjectTypeDataElementsSplitJoinTest.java | 43 +- ...lowObjectTypeDataElementsSplitterTest.java | 6 +- .../object/HollowObjectTypeReadStateTest.java | 221 +++++++ 15 files changed, 1105 insertions(+), 430 deletions(-) create mode 100644 hollow-perf/src/jmh/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadStateDeltaTransitionBenchmark.java create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadStateTest.java diff --git a/hollow-perf/src/jmh/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadStateDeltaTransitionBenchmark.java b/hollow-perf/src/jmh/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadStateDeltaTransitionBenchmark.java new file mode 100644 index 0000000000..67a9e844f7 --- /dev/null +++ b/hollow-perf/src/jmh/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadStateDeltaTransitionBenchmark.java @@ -0,0 +1,180 @@ +package com.netflix.hollow.core.read.engine.object; + + +import com.netflix.hollow.core.read.dataaccess.HollowObjectTypeDataAccess; +import com.netflix.hollow.core.read.engine.HollowReadStateEngine; +import com.netflix.hollow.core.util.StateEngineRoundTripper; +import com.netflix.hollow.core.write.HollowWriteStateEngine; +import com.netflix.hollow.core.write.objectmapper.HollowObjectMapper; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; 
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * Runs delta transitions in the background while benchmarking reads. Re-sharding in delta transitions can be toggled with a param.
+ */
+@State(Scope.Thread)
+@BenchmarkMode({Mode.All})
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@Warmup(iterations = 1, time = 1)
+@Measurement(iterations = 15, time = 1)
+@Fork(1)
+public class HollowObjectTypeReadStateDeltaTransitionBenchmark {
+    HollowWriteStateEngine writeStateEngine;
+    HollowReadStateEngine readStateEngine;
+    HollowObjectTypeDataAccess dataAccess;
+    HollowObjectMapper objectMapper;
+
+    int countStringsToRead = 500;
+
+    @Param({ "true" })
+    boolean isReshardingEnabled;
+
+    @Param({ "500", "1000" })
+    int shardSizeKBs;
+
+    @Param({ "5", "100" })
+    int maxStringLength;
+
+    int countStringsDb = 100000;
+
+    int deltaChanges = 2000;
+
+    ArrayList<Integer> readOrder;
+
+    ExecutorService refreshExecutor;
+    Future<?> reshardingFuture;
+    CountDownLatch doneBenchmark;
+
+    final Random r = new Random();
+
+    @Setup(Level.Iteration)
+    public void setUp() throws ExecutionException, InterruptedException {
+        final List<String> readStrings = new ArrayList<>();
+        final Set<Integer> readKeys = new HashSet<>();
+        refreshExecutor = Executors.newSingleThreadExecutor();
+
+        refreshExecutor.submit(() -> {
+            writeStateEngine = new HollowWriteStateEngine();
+            writeStateEngine.setTargetMaxTypeShardSize((long) shardSizeKBs * 1000L);
+            objectMapper = new HollowObjectMapper(writeStateEngine);
+            objectMapper.initializeTypeState(String.class);
+
+            readOrder = new ArrayList<>(countStringsToRead);
+            for (int i = 0; i < countStringsToRead; i++) {
+                readOrder.add(r.nextInt(countStringsDb));
+            }
+            readKeys.addAll(readOrder);
+
+            for (int i = 0; i < countStringsDb; i++) {
+                StringBuilder sb = new StringBuilder();
+                sb.append("string_");
+                sb.append(i);
+                sb.append("_");
+                int thisStringLength = r.nextInt(maxStringLength) - sb.length() + 1;
+                for (int j = 0; j < thisStringLength; j++) {
+                    sb.append((char) (r.nextInt(26) + 'a'));
+                }
+                String s = sb.toString();
+                objectMapper.add(s);
+                if (readKeys.contains(i)) {
+                    readStrings.add(s);
+                }
+            }
+
+            readStateEngine = new HollowReadStateEngine();
+            try {
+                StateEngineRoundTripper.roundTripSnapshot(writeStateEngine, readStateEngine, null);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            dataAccess = (HollowObjectTypeDataAccess) readStateEngine.getTypeDataAccess("String", 0);
+        }).get();
+
+        doneBenchmark = new CountDownLatch(1);
+        reshardingFuture = refreshExecutor.submit(() -> {
+            Random r = new Random();
+            long origShardSize = shardSizeKBs * 1000L;
+            long newShardSize = origShardSize;
+            do {
+                // NOTE: this loop body was garbled in transit; the following is a best-effort
+                // reconstruction. Each cycle mutates a batch of records and, when re-sharding is
+                // enabled, toggles the target shard size so that the delta transition re-shards
+                // the type while reads are being benchmarked.
+                for (int i = 0; i < deltaChanges; i++) {
+                    objectMapper.add("string_" + r.nextInt(countStringsDb) + "_modified");
+                }
+                if (isReshardingEnabled) {
+                    newShardSize = (newShardSize == origShardSize) ? origShardSize / 10 : origShardSize;
+                    writeStateEngine.setTargetMaxTypeShardSize(newShardSize);
+                }
+                try {
+                    StateEngineRoundTripper.roundTripDelta(writeStateEngine, readStateEngine);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            } while (doneBenchmark.getCount() > 0);
+        });
+    }
+
+    @TearDown(Level.Iteration)
+    public void tearDown() {
+        doneBenchmark.countDown();
+        reshardingFuture.cancel(true);
+        refreshExecutor.shutdown();
+        try {
+            if (!refreshExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
+                refreshExecutor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            refreshExecutor.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Benchmark
+    public void testReadString(Blackhole bh) {
+        int j = r.nextInt(readOrder.size());
+        String result = dataAccess.readString(j, 0);
+        bh.consume(result);
+    }
+}
diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java
index 36ad0acef1..8c6bf60893 100644
--- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java
+++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java @@ -331,7 +331,7 @@ private String readTypeStateSnapshot(HollowBlobInput in, TypeFilter filter) thro } else { HollowObjectSchema unfilteredSchema = (HollowObjectSchema)schema; HollowObjectSchema filteredSchema = unfilteredSchema.filterSchema(filter); - populateTypeStateSnapshot(in, new HollowObjectTypeReadState(stateEngine, memoryMode, filteredSchema, unfilteredSchema, numShards)); + populateTypeStateSnapshotWithNumShards(in, new HollowObjectTypeReadState(stateEngine, memoryMode, filteredSchema, unfilteredSchema), numShards); } } else if (schema instanceof HollowListSchema) { if(!filter.includes(typeName)) { @@ -361,14 +361,22 @@ private void populateTypeStateSnapshot(HollowBlobInput in, HollowTypeReadState t typeState.readSnapshot(in, stateEngine.getMemoryRecycler()); } + private void populateTypeStateSnapshotWithNumShards(HollowBlobInput in, HollowTypeReadState typeState, int numShards) throws IOException { + if (numShards<=0 || ((numShards&(numShards-1))!=0)) { + throw new IllegalArgumentException("Number of shards must be a power of 2!"); + } + + stateEngine.addTypeState(typeState); + typeState.readSnapshot(in, stateEngine.getMemoryRecycler(), numShards); + } + private String readTypeStateDelta(HollowBlobInput in) throws IOException { HollowSchema schema = HollowSchema.readFrom(in); int numShards = readNumShards(in); - HollowTypeReadState typeState = stateEngine.getTypeState(schema.getName()); if(typeState != null) { - typeState.applyDelta(in, schema, stateEngine.getMemoryRecycler()); + typeState.applyDelta(in, schema, stateEngine.getMemoryRecycler(), numShards); } else { discardDelta(in, schema, numShards); } diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReadState.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReadState.java index b1dddb264d..3150ac174e 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReadState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReadState.java @@ -122,7 +122,14 @@ public BitSet getPreviousOrdinals() { public abstract int maxOrdinal(); public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler) throws IOException; - public abstract void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler) throws IOException; + + public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler, int numShards) throws IOException; + + public abstract void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException; + + protected boolean shouldReshard(int currNumShards, int deltaNumShards) { + return currNumShards!=0 && deltaNumShards!=0 && currNumShards!=deltaNumShards; + } public HollowSchema getSchema() { return schema; diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReadState.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReadState.java index 72b262eede..7ff29b01c3 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReadState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReadState.java @@ -73,6 +73,11 @@ public HollowListTypeReadState(HollowReadStateEngine stateEngine, MemoryMode mem this.shards = shards; } + @Override + public void readSnapshot(HollowBlobInput in, 
ArraySegmentRecycler memoryRecycler, int numShards) throws IOException { + throw new UnsupportedOperationException("This type does not yet support numShards specification when reading snapshot"); + } + @Override public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException { if(shards.length > 1) @@ -91,7 +96,11 @@ public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler } @Override - public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler) throws IOException { + public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException { + if (shouldReshard(shards.length, deltaNumShards)) { + throw new UnsupportedOperationException("Dynamic type sharding not supported for " + schema.getName() + + ". Current numShards=" + shards.length + ", delta numShards=" + deltaNumShards); + } if(shards.length > 1) maxOrdinal = VarInt.readVInt(in); diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReadState.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReadState.java index 87abc7e1a6..f0f5ca75c8 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReadState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReadState.java @@ -80,6 +80,11 @@ public HollowMapTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memo } + @Override + public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException { + throw new UnsupportedOperationException("This type does not yet support numShards specification when reading snapshot"); + } + @Override public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException { if(shards.length > 1) @@ -98,7 +103,11 @@ public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler } @Override - public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler) throws IOException { + public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException { + if (shouldReshard(shards.length, deltaNumShards)) { + throw new UnsupportedOperationException("Dynamic type sharding not supported for " + schema.getName() + + ". 
Current numShards=" + shards.length + ", delta numShards=" + deltaNumShards); + } if(shards.length > 1) maxOrdinal = VarInt.readVInt(in); diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectDeltaApplicator.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectDeltaApplicator.java index ddea125b34..3e3b310b94 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectDeltaApplicator.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectDeltaApplicator.java @@ -16,6 +16,10 @@ */ package com.netflix.hollow.core.read.engine.object; +import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.writeNullField; +import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.writeNullFixedLengthField; +import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.writeNullVarLengthField; + import com.netflix.hollow.core.memory.SegmentedByteArray; import com.netflix.hollow.core.memory.encoding.FixedLengthElementArray; import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader; @@ -175,7 +179,7 @@ private void mergeOrdinal(int i) { long readStartBit = currentFromStateReadFixedLengthStartBit + from.bitOffsetPerField[fieldIndex]; copyRecordField(fieldIndex, fieldIndex, from, readStartBit, currentWriteFixedLengthStartBit, currentFromStateReadVarLengthDataPointers, currentWriteVarLengthDataPointers, removeData); } else if(target.varLengthData[fieldIndex] != null) { - writeNullVarLengthField(fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers); + writeNullVarLengthField(target, fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers); } } currentWriteFixedLengthStartBit += target.bitsPerField[fieldIndex]; @@ -193,7 +197,7 @@ private void mergeOrdinal(int i) { private void addFromDelta(boolean removeData, int fieldIndex, int deltaFieldIndex) { if(deltaFieldIndex == -1) { - writeNullField(fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers); + writeNullField(target, fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers); } else { long readStartBit = currentDeltaStateReadFixedLengthStartBit + delta.bitOffsetPerField[deltaFieldIndex]; copyRecordField(fieldIndex, deltaFieldIndex, delta, readStartBit, currentWriteFixedLengthStartBit, currentDeltaReadVarLengthDataPointers, currentWriteVarLengthDataPointers, false); @@ -214,7 +218,7 @@ private void copyRecordField(int fieldIndex, int fromFieldIndex, HollowObjectTyp if(target.varLengthData[fieldIndex] != null) { if((readValue & (1L << (copyFromData.bitsPerField[fromFieldIndex] - 1))) != 0) { - writeNullVarLengthField(fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers); + writeNullVarLengthField(target, fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers); } else { long readStart = currentReadVarLengthDataPointers[fieldIndex]; long length = readValue - readStart; @@ -228,28 +232,9 @@ private void copyRecordField(int fieldIndex, int fromFieldIndex, HollowObjectTyp } } else if(!removeData) { if(readValue == copyFromData.nullValueForField[fromFieldIndex]) - writeNullFixedLengthField(fieldIndex, currentWriteFixedLengthStartBit); + writeNullFixedLengthField(target, fieldIndex, currentWriteFixedLengthStartBit); else target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, 
target.bitsPerField[fieldIndex], readValue); } } - - private void writeNullField(int fieldIndex, long currentWriteFixedLengthStartBit, long[] currentWriteVarLengthDataPointers) { - if(target.varLengthData[fieldIndex] != null) { - writeNullVarLengthField(fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers); - } else { - writeNullFixedLengthField(fieldIndex, currentWriteFixedLengthStartBit); - } - } - - private void writeNullVarLengthField(int fieldIndex, long currentWriteFixedLengthStartBit, long[] currentWriteVarLengthDataPointers) { - long writeValue = (1L << (target.bitsPerField[fieldIndex] - 1)) | currentWriteVarLengthDataPointers[fieldIndex]; - target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, target.bitsPerField[fieldIndex], writeValue); - } - - private void writeNullFixedLengthField(int fieldIndex, long currentWriteFixedLengthStartBit) { - target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, target.bitsPerField[fieldIndex], target.nullValueForField[fieldIndex]); - } - - } diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectDeltaHistoricalStateCreator.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectDeltaHistoricalStateCreator.java index 3d369a9ddb..7cca46d0ef 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectDeltaHistoricalStateCreator.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectDeltaHistoricalStateCreator.java @@ -24,8 +24,10 @@ import com.netflix.hollow.core.memory.encoding.FixedLengthElementArray; import com.netflix.hollow.core.memory.pool.WastefulRecycler; import com.netflix.hollow.core.read.engine.PopulatedOrdinalListener; +import com.netflix.hollow.core.schema.HollowObjectSchema; import com.netflix.hollow.core.util.IntMap; import com.netflix.hollow.core.util.RemovedOrdinalIterator; +import java.util.Arrays; /** * This class contains the logic for extracting the removed records from an OBJECT type state @@ -37,11 +39,8 @@ public class HollowObjectDeltaHistoricalStateCreator { private final HollowObjectTypeDataElements historicalDataElements; - private final int shardNumberMask; - private final int shardOrdinalShift; - private HollowObjectTypeReadState typeState; - private HollowObjectTypeDataElements stateEngineDataElements[]; + private HollowObjectTypeReadState.ShardsHolder shardsHolder; private RemovedOrdinalIterator iter; private IntMap ordinalMapping; private int nextOrdinal; @@ -49,12 +48,10 @@ public class HollowObjectDeltaHistoricalStateCreator { public HollowObjectDeltaHistoricalStateCreator(HollowObjectTypeReadState typeState, boolean reverse) { this.typeState = typeState; - this.stateEngineDataElements = typeState.currentDataElements(); this.historicalDataElements = new HollowObjectTypeDataElements(typeState.getSchema(), WastefulRecycler.DEFAULT_INSTANCE); this.iter = new RemovedOrdinalIterator(typeState.getListener(PopulatedOrdinalListener.class), reverse); this.currentWriteVarLengthDataPointers = new long[typeState.getSchema().numFields()]; - this.shardNumberMask = stateEngineDataElements.length - 1; - this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(stateEngineDataElements.length); + this.shardsHolder = typeState.shardsVolatile; } public void populateHistory() { @@ -63,7 +60,7 @@ public void populateHistory() { historicalDataElements.fixedLengthData = new FixedLengthElementArray(historicalDataElements.memoryRecycler, 
(long)historicalDataElements.bitsPerRecord * (historicalDataElements.maxOrdinal + 1));
 
-        for(int i=0;i<historicalDataElements.varLengthData.length;i++) {
-            if(stateEngineDataElements[0].varLengthData[i] != null) {
-                historicalDataElements.varLengthData[i] = new SegmentedByteArray(historicalDataElements.memoryRecycler);
-            }
-        }
+        for(int i=0;i<historicalDataElements.varLengthData.length;i++) {
+            if(isVarLengthField(typeState.getSchema().getFieldType(i))) {
+                historicalDataElements.varLengthData[i] = new SegmentedByteArray(historicalDataElements.memoryRecycler);
+            }
+        }
 
         int ordinal = iter.next();
         while(ordinal != ORDINAL_NONE) {
             ordinalMapping.put(ordinal, nextOrdinal);
-            int shard = ordinal & shardNumberMask;
-            int shardOrdinal = ordinal >> shardOrdinalShift;
-            copyRecord(historicalDataElements, nextOrdinal, stateEngineDataElements[shard], shardOrdinal, currentWriteVarLengthDataPointers);
+            int whichShard = ordinal & shardsHolder.shardNumberMask;
+            int shardOrdinal = ordinal >> shardsHolder.shards[whichShard].shardOrdinalShift;
+            copyRecord(historicalDataElements, nextOrdinal, shardsHolder.shards[whichShard].dataElements, shardOrdinal, currentWriteVarLengthDataPointers);
 
             nextOrdinal++;
             ordinal = iter.next();
@@ -89,7 +86,7 @@ public void populateHistory() {
      */
     public void dereferenceTypeState() {
         this.typeState = null;
-        this.stateEngineDataElements = null;
+        this.shardsHolder = null;
         this.iter = null;
     }
 
@@ -98,26 +95,26 @@ public IntMap getOrdinalMapping() {
     }
 
     public HollowObjectTypeReadState createHistoricalTypeReadState() {
-        HollowObjectTypeReadState historicalTypeState = new HollowObjectTypeReadState(null, typeState.getSchema());
-        historicalTypeState.setCurrentData(historicalDataElements);
+        HollowObjectTypeReadState historicalTypeState = new HollowObjectTypeReadState(typeState.getSchema(), historicalDataElements);
+
         return historicalTypeState;
     }
 
     private void populateStats() {
         iter.reset();
         int removedEntryCount = 0;
-        long totalVarLengthSizes[] = new long[stateEngineDataElements[0].varLengthData.length];
+        long totalVarLengthSizes[] = new long[typeState.getSchema().numFields()];
 
         int ordinal = iter.next();
         while(ordinal != ORDINAL_NONE) {
             removedEntryCount++;
 
-            for(int i=0;i<stateEngineDataElements[0].varLengthData.length;i++) {
-                if(stateEngineDataElements[0].varLengthData[i] != null) {
-                    int shard = ordinal & shardNumberMask;
-                    int shardOrdinal = ordinal >> shardOrdinalShift;
-                    totalVarLengthSizes[i] += varLengthSize(stateEngineDataElements[shard], shardOrdinal, i);
+            for(int i=0;i<totalVarLengthSizes.length;i++) {
+                if(isVarLengthField(typeState.getSchema().getFieldType(i))) {
+                    int whichShard = ordinal & shardsHolder.shardNumberMask;
+                    int shardOrdinal = ordinal >> shardsHolder.shards[whichShard].shardOrdinalShift;
+                    totalVarLengthSizes[i] += varLengthSize(shardsHolder.shards[whichShard].dataElements, shardOrdinal, i);
                 }
             }
 
@@ -126,9 +123,12 @@ private void populateStats() {
 
         historicalDataElements.maxOrdinal = removedEntryCount - 1;
 
-        for(int i=0;i<stateEngineDataElements[0].bitsPerField.length;i++) {
-            if(stateEngineDataElements[0].varLengthData[i] == null) {
-                historicalDataElements.bitsPerField[i] = stateEngineDataElements[0].bitsPerField[i];
+        for(int i=0;i<historicalDataElements.bitsPerField.length;i++) {
+            final int fieldIdx = i;
+            if(!isVarLengthField(typeState.getSchema().getFieldType(i))) {
+                historicalDataElements.bitsPerField[i] = Arrays.stream(shardsHolder.shards)
+                        .map(shard -> shard.dataElements.bitsPerField[fieldIdx])
+                        .max(Integer::compare).get();
             } else {
                 historicalDataElements.bitsPerField[i] = (64 - Long.numberOfLeadingZeros(totalVarLengthSizes[i] + 1)) + 1;
             }
@@ -140,4 +140,8 @@ private void populateStats() {
 
         ordinalMapping = new IntMap(removedEntryCount);
     }
+
+    private boolean isVarLengthField(HollowObjectSchema.FieldType fieldType) {
+        return fieldType == HollowObjectSchema.FieldType.STRING || fieldType == HollowObjectSchema.FieldType.BYTES;
+    }
 }
diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadState.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadState.java
index ed8379e0e6..42e1274993 100644
--- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadState.java
+++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadState.java
@@ -16,13 +16,17 @@
  */
 package com.netflix.hollow.core.read.engine.object;
 
+import static com.netflix.hollow.core.HollowConstants.ORDINAL_NONE;
+
 import com.netflix.hollow.api.sampling.DisabledSamplingDirector;
 import com.netflix.hollow.api.sampling.HollowObjectSampler;
 import com.netflix.hollow.api.sampling.HollowSampler;
 import com.netflix.hollow.api.sampling.HollowSamplingDirector;
+import com.netflix.hollow.core.memory.HollowUnsafeHandle;
 import com.netflix.hollow.core.memory.MemoryMode;
 import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
 import 
com.netflix.hollow.core.memory.encoding.VarInt; +import com.netflix.hollow.core.memory.encoding.ZigZag; import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler; import com.netflix.hollow.core.read.HollowBlobInput; import com.netflix.hollow.core.read.dataaccess.HollowObjectTypeDataAccess; @@ -32,8 +36,10 @@ import com.netflix.hollow.core.read.filter.HollowFilterConfig; import com.netflix.hollow.core.schema.HollowObjectSchema; import com.netflix.hollow.core.schema.HollowSchema; +import com.netflix.hollow.core.write.HollowObjectWriteRecord; import com.netflix.hollow.tools.checksum.HollowChecksum; import java.io.IOException; +import java.util.Arrays; import java.util.BitSet; /** @@ -43,32 +49,48 @@ public class HollowObjectTypeReadState extends HollowTypeReadState implements Ho private final HollowObjectSchema unfilteredSchema; private final HollowObjectSampler sampler; + private int maxOrdinal; + volatile ShardsHolder shardsVolatile; - private final int shardNumberMask; - private final int shardOrdinalShift; - private final HollowObjectTypeReadStateShard shards[]; + static class ShardsHolder { + final HollowObjectTypeReadStateShard shards[]; + final int shardNumberMask; - private int maxOrdinal; + private ShardsHolder(HollowObjectTypeReadStateShard[] fromShards) { + this.shards = fromShards; + this.shardNumberMask = fromShards.length - 1; + } - public HollowObjectTypeReadState(HollowReadStateEngine fileEngine, HollowObjectSchema schema) { - this(fileEngine, MemoryMode.ON_HEAP, schema, schema, 1); + private ShardsHolder(HollowObjectTypeReadStateShard[] oldShards, HollowObjectTypeReadStateShard newShard, int newShardIndex) { + int numShards = oldShards.length; + HollowObjectTypeReadStateShard[] shards = new HollowObjectTypeReadStateShard[numShards]; + for (int i=0; i 1) + throw new IllegalStateException("Object type read state requires numShards when reading snapshot"); + } + + @Override + public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException { + if(numShards > 1) maxOrdinal = VarInt.readVInt(in); - for(int i=0;i 1) + public void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException { + if (shouldReshard(shardsVolatile.shards.length, deltaNumShards)) { + reshard(deltaNumShards); + } + if(shardsVolatile.shards.length > 1) maxOrdinal = VarInt.readVInt(in); - for(int i=0;i> shardOrdinalShift, fieldIndex); + + HollowObjectTypeReadState.ShardsHolder shardsHolder; + HollowObjectTypeReadStateShard shard; + long fixedLengthValue; + + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + fixedLengthValue = shard.readValue(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + switch(((HollowObjectSchema) schema).getFieldType(fieldIndex)) { + case BYTES: + case STRING: + int numBits = shard.dataElements.bitsPerField[fieldIndex]; + return (fixedLengthValue & (1L << (numBits - 1))) != 0; + case FLOAT: + return (int)fixedLengthValue == HollowObjectWriteRecord.NULL_FLOAT_BITS; + case DOUBLE: + return fixedLengthValue == HollowObjectWriteRecord.NULL_DOUBLE_BITS; + default: + return fixedLengthValue == shard.dataElements.nullValueForField[fieldIndex]; + } } @Override public int readOrdinal(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); - return shards[ordinal & shardNumberMask].readOrdinal(ordinal >> shardOrdinalShift, 
fieldIndex); + + HollowObjectTypeReadState.ShardsHolder shardsHolder; + HollowObjectTypeReadStateShard shard; + long refOrdinal; + + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + refOrdinal = shard.readOrdinal(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + if(refOrdinal == shard.dataElements.nullValueForField[fieldIndex]) + return ORDINAL_NONE; + return (int)refOrdinal; } @Override public int readInt(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); - return shards[ordinal & shardNumberMask].readInt(ordinal >> shardOrdinalShift, fieldIndex); + + HollowObjectTypeReadState.ShardsHolder shardsHolder; + HollowObjectTypeReadStateShard shard; + long value; + + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + value = shard.readInt(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + if(value == shard.dataElements.nullValueForField[fieldIndex]) + return Integer.MIN_VALUE; + return ZigZag.decodeInt((int)value); } @Override public float readFloat(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); - return shards[ordinal & shardNumberMask].readFloat(ordinal >> shardOrdinalShift, fieldIndex); + + HollowObjectTypeReadState.ShardsHolder shardsHolder; + HollowObjectTypeReadStateShard shard; + int value; + + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + value = shard.readFloat(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + if(value == HollowObjectWriteRecord.NULL_FLOAT_BITS) + return Float.NaN; + return Float.intBitsToFloat(value); } @Override public double readDouble(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); - return shards[ordinal & shardNumberMask].readDouble(ordinal >> shardOrdinalShift, fieldIndex); + + HollowObjectTypeReadState.ShardsHolder shardsHolder; + HollowObjectTypeReadStateShard shard; + long value; + + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + value = shard.readDouble(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + if(value == HollowObjectWriteRecord.NULL_DOUBLE_BITS) + return Double.NaN; + return Double.longBitsToDouble(value); } @Override public long readLong(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); - return shards[ordinal & shardNumberMask].readLong(ordinal >> shardOrdinalShift, fieldIndex); + + HollowObjectTypeReadState.ShardsHolder shardsHolder; + HollowObjectTypeReadStateShard shard; + long value; + + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + value = shard.readLong(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + if(value == shard.dataElements.nullValueForField[fieldIndex]) + return Long.MIN_VALUE; + return ZigZag.decodeLong(value); } @Override public Boolean readBoolean(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); - return shards[ordinal & shardNumberMask].readBoolean(ordinal >> shardOrdinalShift, fieldIndex); + + HollowObjectTypeReadState.ShardsHolder shardsHolder; + HollowObjectTypeReadStateShard shard; + long value; + 
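+        // Optimistic read: pin the current ShardsHolder, select the owning shard with the power-of-two
+        // mask, read the value, then retry if readWasUnsafe() observes that a concurrent delta or
+        // re-shard replaced this ordinal's shard in the meantime.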
+ do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + value = shard.readBoolean(ordinal >> shard.shardOrdinalShift, fieldIndex); + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + if(value == shard.dataElements.nullValueForField[fieldIndex]) + return null; + return value == 1 ? Boolean.TRUE : Boolean.FALSE; } @Override public byte[] readBytes(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); - return shards[ordinal & shardNumberMask].readBytes(ordinal >> shardOrdinalShift, fieldIndex); + + HollowObjectTypeReadState.ShardsHolder shardsHolder; + HollowObjectTypeReadStateShard shard; + byte[] result; + int numBitsForField; + long currentBitOffset; + long endByte; + long startByte; + int shardOrdinal; + + do { + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + shardOrdinal = ordinal >> shard.shardOrdinalShift; + + numBitsForField = shard.dataElements.bitsPerField[fieldIndex]; + currentBitOffset = shard.fieldOffset(shardOrdinal, fieldIndex); + endByte = shard.dataElements.fixedLengthData.getElementValue(currentBitOffset, numBitsForField); + startByte = shardOrdinal != 0 ? shard.dataElements.fixedLengthData.getElementValue(currentBitOffset - shard.dataElements.bitsPerRecord, numBitsForField) : 0; + } while (readWasUnsafe(shardsHolder, ordinal, shard)); + + result = shard.readBytes(startByte, endByte, numBitsForField, fieldIndex); + } while (readWasUnsafe(shardsHolder, ordinal, shard)); + + return result; } @Override public String readString(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); - return shards[ordinal & shardNumberMask].readString(ordinal >> shardOrdinalShift, fieldIndex); + + HollowObjectTypeReadState.ShardsHolder shardsHolder; + HollowObjectTypeReadStateShard shard; + String result; + int numBitsForField; + long currentBitOffset; + long endByte; + long startByte; + int shardOrdinal; + + do { + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + shardOrdinal = ordinal >> shard.shardOrdinalShift; + + numBitsForField = shard.dataElements.bitsPerField[fieldIndex]; + currentBitOffset = shard.fieldOffset(shardOrdinal, fieldIndex); + endByte = shard.dataElements.fixedLengthData.getElementValue(currentBitOffset, numBitsForField); + startByte = shardOrdinal != 0 ? 
shard.dataElements.fixedLengthData.getElementValue(currentBitOffset - shard.dataElements.bitsPerRecord, numBitsForField) : 0; + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + result = shard.readString(startByte, endByte, numBitsForField, fieldIndex); + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + return result; } @Override public boolean isStringFieldEqual(int ordinal, int fieldIndex, String testValue) { sampler.recordFieldAccess(fieldIndex); - return shards[ordinal & shardNumberMask].isStringFieldEqual(ordinal >> shardOrdinalShift, fieldIndex, testValue); + + HollowObjectTypeReadState.ShardsHolder shardsHolder; + HollowObjectTypeReadStateShard shard; + boolean result; + int numBitsForField; + long currentBitOffset; + long endByte; + long startByte; + int shardOrdinal; + + do { + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + shardOrdinal = ordinal >> shard.shardOrdinalShift; + + numBitsForField = shard.dataElements.bitsPerField[fieldIndex]; + currentBitOffset = shard.fieldOffset(shardOrdinal, fieldIndex); + endByte = shard.dataElements.fixedLengthData.getElementValue(currentBitOffset, numBitsForField); + startByte = shardOrdinal != 0 ? shard.dataElements.fixedLengthData.getElementValue(currentBitOffset - shard.dataElements.bitsPerRecord, numBitsForField) : 0; + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + result = shard.isStringFieldEqual(startByte, endByte, numBitsForField, fieldIndex, testValue); + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + return result; } @Override public int findVarLengthFieldHashCode(int ordinal, int fieldIndex) { sampler.recordFieldAccess(fieldIndex); - return shards[ordinal & shardNumberMask].findVarLengthFieldHashCode(ordinal >> shardOrdinalShift, fieldIndex); + + HollowObjectTypeReadState.ShardsHolder shardsHolder; + HollowObjectTypeReadStateShard shard; + int hashCode; + int numBitsForField; + long currentBitOffset; + long endByte; + long startByte; + int shardOrdinal; + + do { + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + shardOrdinal = ordinal >> shard.shardOrdinalShift; + + numBitsForField = shard.dataElements.bitsPerField[fieldIndex]; + currentBitOffset = shard.fieldOffset(shardOrdinal, fieldIndex); + endByte = shard.dataElements.fixedLengthData.getElementValue(currentBitOffset, numBitsForField); + startByte = shardOrdinal != 0 ? shard.dataElements.fixedLengthData.getElementValue(currentBitOffset - shard.dataElements.bitsPerRecord, numBitsForField) : 0; + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + hashCode = shard.findVarLengthFieldHashCode(startByte, endByte, numBitsForField, fieldIndex); + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + return hashCode; + } + + private boolean readWasUnsafe(ShardsHolder shardsHolder, int ordinal, HollowObjectTypeReadStateShard shard) { + // Use a load (acquire) fence to constrain the compiler reordering prior plain loads so + // that they cannot "float down" below the volatile load of shardsVolatile. + // This ensures data is checked against current shard holder *after* optimistic calculations + // have been performed on data. 
+        //
+        // Note: the Java Memory Model allows for the reordering of plain loads and stores
+        // before a volatile load (those plain loads and stores can "float down" below the
+        // volatile load), but forbids the reordering of plain loads after a volatile load
+        // (those plain loads are not allowed to "float above" the volatile load).
+        // Similar reordering also applies to plain loads and stores and volatile stores.
+        // In effect the ordering of volatile loads and stores is retained and plain loads
+        // and stores can be shuffled around and grouped together, which increases
+        // optimization opportunities.
+        // This is why locks can be coarsened; plain loads and stores may enter the lock region
+        // from above (float down the acquire) or below (float above the release) but existing
+        // loads and stores may not exit (a "lock roach motel" and why there is almost universal
+        // misunderstanding of, and many misguided attempts to optimize, the infamous double
+        // checked locking idiom).
+        //
+        // Note: the fence provides stronger ordering guarantees than a corresponding non-plain
+        // load or store since the former affects all prior or subsequent loads and stores,
+        // whereas the latter is scoped to the particular load or store.
+        //
+        // For more details see http://gee.cs.oswego.edu/dl/html/j9mm.html
+        // [Comment credit: Paul Sandoz]
+        //
+        HollowUnsafeHandle.getUnsafe().loadFence();
+        ShardsHolder currShardsHolder = shardsVolatile;
+        // Validate against the underlying shard so that, during a delta application that involves re-sharding, the
+        // worst-case number of times a read will be invalidated is 3: when shards are expanded or truncated, when a
+        // shard is affected by a split or join, and finally when the delta is applied to a shard. If only shardsHolder
+        // were checked here, the worst case could lead to read invalidation (numShards+2) times: once for shard
+        // expansion/truncation, once for a split/join on any shard, and then once when the delta is applied.
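+        // As an illustrative example (one possible interleaving, not an exhaustive proof): if a type is
+        // re-sharded from 4 to 8 shards while a read is in flight, the read may be retried once when the
+        // shard array is first expanded with the original data elements, once when the shard owning its
+        // ordinal is split, and once when the delta lands on that shard. Holder swaps that replace only
+        // unrelated shards fail the conjunction below and therefore cost no retry.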
+ return shardsHolder != currShardsHolder + && (shard != currShardsHolder.shards[ordinal & currShardsHolder.shardNumberMask]); } /** @@ -228,6 +653,7 @@ public int findVarLengthFieldHashCode(int ordinal, int fieldIndex) { * @return the number of bits required for the field */ public int bitsRequiredForField(String fieldName) { + final HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards; int maxBitsRequiredForField = shards[0].bitsRequiredForField(fieldName); for(int i=1;i 1) - throw new UnsupportedOperationException("Cannot directly set data on sharded type state"); - shards[0].setCurrentData(data); - maxOrdinal = data.maxOrdinal; - } @Override public int numShards() { - return shards.length; + return this.shardsVolatile.shards.length; } } diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadStateShard.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadStateShard.java index 5cfd1aa1db..f631b139d9 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadStateShard.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadStateShard.java @@ -19,13 +19,10 @@ import static com.netflix.hollow.core.HollowConstants.ORDINAL_NONE; import com.netflix.hollow.core.memory.ByteData; -import com.netflix.hollow.core.memory.HollowUnsafeHandle; import com.netflix.hollow.core.memory.encoding.HashCodes; import com.netflix.hollow.core.memory.encoding.VarInt; -import com.netflix.hollow.core.memory.encoding.ZigZag; import com.netflix.hollow.core.schema.HollowObjectSchema; import com.netflix.hollow.core.schema.HollowSchema; -import com.netflix.hollow.core.write.HollowObjectWriteRecord; import com.netflix.hollow.tools.checksum.HollowChecksum; import java.util.ArrayList; import java.util.Arrays; @@ -34,266 +31,111 @@ import java.util.List; class HollowObjectTypeReadStateShard { - - private volatile HollowObjectTypeDataElements currentDataVolatile; + final HollowObjectTypeDataElements dataElements; + final int shardOrdinalShift; private final HollowObjectSchema schema; - - HollowObjectTypeReadStateShard(HollowObjectSchema schema) { + + HollowObjectTypeReadStateShard(HollowObjectSchema schema, HollowObjectTypeDataElements dataElements, int shardOrdinalShift) { this.schema = schema; + this.shardOrdinalShift = shardOrdinalShift; + this.dataElements = dataElements; } - public boolean isNull(int ordinal, int fieldIndex) { - HollowObjectTypeDataElements currentData; - long fixedLengthValue; - - do { - currentData = this.currentDataVolatile; - - long bitOffset = fieldOffset(currentData, ordinal, fieldIndex); - int numBitsForField = currentData.bitsPerField[fieldIndex]; - - fixedLengthValue = numBitsForField <= 56 ? 
- currentData.fixedLengthData.getElementValue(bitOffset, numBitsForField) - : currentData.fixedLengthData.getLargeElementValue(bitOffset, numBitsForField); - } while(readWasUnsafe(currentData)); - - switch(schema.getFieldType(fieldIndex)) { - case BYTES: - case STRING: - int numBits = currentData.bitsPerField[fieldIndex]; - return (fixedLengthValue & (1L << (numBits - 1))) != 0; - case FLOAT: - return (int)fixedLengthValue == HollowObjectWriteRecord.NULL_FLOAT_BITS; - case DOUBLE: - return fixedLengthValue == HollowObjectWriteRecord.NULL_DOUBLE_BITS; - default: - return fixedLengthValue == currentData.nullValueForField[fieldIndex]; - } + public long readValue(int ordinal, int fieldIndex) { + long bitOffset = fieldOffset(ordinal, fieldIndex); + int numBitsForField = dataElements.bitsPerField[fieldIndex]; + return numBitsForField <= 56 ? + dataElements.fixedLengthData.getElementValue(bitOffset, numBitsForField) + : dataElements.fixedLengthData.getLargeElementValue(bitOffset, numBitsForField); } - public int readOrdinal(int ordinal, int fieldIndex) { - HollowObjectTypeDataElements currentData; - long refOrdinal; - - do { - currentData = this.currentDataVolatile; - refOrdinal = readFixedLengthFieldValue(currentData, ordinal, fieldIndex); - } while(readWasUnsafe(currentData)); - - if(refOrdinal == currentData.nullValueForField[fieldIndex]) - return ORDINAL_NONE; - return (int)refOrdinal; + public long readOrdinal(int ordinal, int fieldIndex) { + return readFixedLengthFieldValue(ordinal, fieldIndex); } - public int readInt(int ordinal, int fieldIndex) { - HollowObjectTypeDataElements currentData; - long value; - - do { - currentData = this.currentDataVolatile; - value = readFixedLengthFieldValue(currentData, ordinal, fieldIndex); - } while(readWasUnsafe(currentData)); - - if(value == currentData.nullValueForField[fieldIndex]) - return Integer.MIN_VALUE; - return ZigZag.decodeInt((int)value); + public long readInt(int ordinal, int fieldIndex) { + return readFixedLengthFieldValue(ordinal, fieldIndex); } - public float readFloat(int ordinal, int fieldIndex) { - HollowObjectTypeDataElements currentData; - int value; - - do { - currentData = this.currentDataVolatile; - value = (int)readFixedLengthFieldValue(currentData, ordinal, fieldIndex); - } while(readWasUnsafe(currentData)); - - if(value == HollowObjectWriteRecord.NULL_FLOAT_BITS) - return Float.NaN; - return Float.intBitsToFloat(value); + public int readFloat(int ordinal, int fieldIndex) { + return (int)readFixedLengthFieldValue(ordinal, fieldIndex); } - public double readDouble(int ordinal, int fieldIndex) { - HollowObjectTypeDataElements currentData; - long value; - - do { - currentData = this.currentDataVolatile; - long bitOffset = fieldOffset(currentData, ordinal, fieldIndex); - value = currentData.fixedLengthData.getLargeElementValue(bitOffset, 64, -1L); - } while(readWasUnsafe(currentData)); - - if(value == HollowObjectWriteRecord.NULL_DOUBLE_BITS) - return Double.NaN; - return Double.longBitsToDouble(value); + public long readDouble(int ordinal, int fieldIndex) { + long bitOffset = fieldOffset(ordinal, fieldIndex); + return dataElements.fixedLengthData.getLargeElementValue(bitOffset, 64, -1L); } public long readLong(int ordinal, int fieldIndex) { - HollowObjectTypeDataElements currentData; - long value; - - do { - currentData = this.currentDataVolatile; - long bitOffset = fieldOffset(currentData, ordinal, fieldIndex); - int numBitsForField = currentData.bitsPerField[fieldIndex]; - value = 
currentData.fixedLengthData.getLargeElementValue(bitOffset, numBitsForField); - } while(readWasUnsafe(currentData)); - - if(value == currentData.nullValueForField[fieldIndex]) - return Long.MIN_VALUE; - return ZigZag.decodeLong(value); + long bitOffset = fieldOffset(ordinal, fieldIndex); + int numBitsForField = dataElements.bitsPerField[fieldIndex]; + return dataElements.fixedLengthData.getLargeElementValue(bitOffset, numBitsForField); } - public Boolean readBoolean(int ordinal, int fieldIndex) { - HollowObjectTypeDataElements currentData; - long value; - - do { - currentData = this.currentDataVolatile; - value = readFixedLengthFieldValue(currentData, ordinal, fieldIndex); - } while(readWasUnsafe(currentData)); - - if(value == currentData.nullValueForField[fieldIndex]) - return null; - return value == 1 ? Boolean.TRUE : Boolean.FALSE; + public long readBoolean(int ordinal, int fieldIndex) { + return readFixedLengthFieldValue(ordinal, fieldIndex); } - private long readFixedLengthFieldValue(HollowObjectTypeDataElements currentData, int ordinal, int fieldIndex) { - long bitOffset = fieldOffset(currentData, ordinal, fieldIndex); - int numBitsForField = currentData.bitsPerField[fieldIndex]; + private long readFixedLengthFieldValue(int ordinal, int fieldIndex) { + long bitOffset = fieldOffset(ordinal, fieldIndex); + int numBitsForField = dataElements.bitsPerField[fieldIndex]; - long value = currentData.fixedLengthData.getElementValue(bitOffset, numBitsForField); + long value = dataElements.fixedLengthData.getElementValue(bitOffset, numBitsForField); return value; } - public byte[] readBytes(int ordinal, int fieldIndex) { - HollowObjectTypeDataElements currentData; + public byte[] readBytes(long startByte, long endByte, int numBitsForField, int fieldIndex) { byte[] result; - do { - int numBitsForField; - long endByte; - long startByte; - - do { - currentData = this.currentDataVolatile; - - numBitsForField = currentData.bitsPerField[fieldIndex]; - long currentBitOffset = fieldOffset(currentData, ordinal, fieldIndex); - endByte = currentData.fixedLengthData.getElementValue(currentBitOffset, numBitsForField); - startByte = ordinal != 0 ? currentData.fixedLengthData.getElementValue(currentBitOffset - currentData.bitsPerRecord, numBitsForField) : 0; - } while(readWasUnsafe(currentData)); - - if((endByte & (1L << numBitsForField - 1)) != 0) - return null; + if((endByte & (1L << numBitsForField - 1)) != 0) + return null; - startByte &= (1L << numBitsForField - 1) - 1; + startByte &= (1L << numBitsForField - 1) - 1; - int length = (int)(endByte - startByte); - result = new byte[length]; - for(int i=0;i - * - * @param str - * @param out - * @return */ private static final ThreadLocal chararr = ThreadLocal.withInitial(() -> new char[100]); @@ -350,48 +188,12 @@ private boolean testStringEquality(ByteData data, long position, int length, Str return position == endPosition && count == testValue.length(); } - void invalidate() { - setCurrentData(null); - } - - HollowObjectTypeDataElements currentDataElements() { - return currentDataVolatile; - } - - private boolean readWasUnsafe(HollowObjectTypeDataElements data) { - // Use a load (acquire) fence to constrain the compiler reordering prior plain loads so - // that they cannot "float down" below the volatile load of currentDataVolatile. - // This ensures data is checked against currentData *after* optimistic calculations - // have been performed on data. 
- // - // Note: the Java Memory Model allows for the reordering of plain loads and stores - // before a volatile load (those plain loads and stores can "float down" below the - // volatile load), but forbids the reordering of plain loads after a volatile load - // (those plain loads are not allowed to "float above" the volatile load). - // Similar reordering also applies to plain loads and stores and volatile stores. - // In effect the ordering of volatile loads and stores is retained and plain loads - // and stores can be shuffled around and grouped together, which increases - // optimization opportunities. - // This is why locks can be coarsened; plain loads and stores may enter the lock region - // from above (float down the acquire) or below (float above the release) but existing - // loads and stores may not exit (a "lock roach motel" and why there is almost universal - // misunderstanding of, and many misguided attempts to optimize, the infamous double - // checked locking idiom). - // - // Note: the fence provides stronger ordering guarantees than a corresponding non-plain - // load or store since the former affects all prior or subsequent loads and stores, - // whereas the latter is scoped to the particular load or store. - // - // For more details see http://gee.cs.oswego.edu/dl/html/j9mm.html - HollowUnsafeHandle.getUnsafe().loadFence(); - return data != currentDataVolatile; - } - - void setCurrentData(HollowObjectTypeDataElements data) { - this.currentDataVolatile = data; - } + protected void applyShardToChecksum(HollowChecksum checksum, HollowSchema withSchema, BitSet populatedOrdinals, int shardNumber, int shardNumberMask) { + int numBitsForField; + long bitOffset; + long endByte; + long startByte; - protected void applyToChecksum(HollowChecksum checksum, HollowSchema withSchema, BitSet populatedOrdinals, int shardNumber, int numShards) { if(!(withSchema instanceof HollowObjectSchema)) throw new IllegalArgumentException("HollowObjectTypeReadState can only calculate checksum with a HollowObjectSchema: " + schema.getName()); @@ -407,27 +209,28 @@ protected void applyToChecksum(HollowChecksum checksum, HollowSchema withSchema, fieldIndexes[i] = schema.getPosition(commonFieldNames.get(i)); } - HollowObjectTypeDataElements currentData = currentDataVolatile; int ordinal = populatedOrdinals.nextSetBit(0); while(ordinal != ORDINAL_NONE) { - if((ordinal & (numShards - 1)) == shardNumber) { - int shardOrdinal = ordinal / numShards; + if((ordinal & shardNumberMask) == shardNumber) { + int shardOrdinal = ordinal >> shardOrdinalShift; checksum.applyInt(ordinal); for(int i=0;i 1) @@ -99,7 +104,11 @@ public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler } @Override - public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler) throws IOException { + public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException { + if (shouldReshard(shards.length, deltaNumShards)) { + throw new UnsupportedOperationException("Dynamic type sharding not supported for " + schema.getName() + + ". 
Current numShards=" + shards.length + ", delta numShards=" + deltaNumShards); + } if(shards.length > 1) maxOrdinal = VarInt.readVInt(in); diff --git a/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/AbstractHollowObjectTypeDataElementsSplitJoinTest.java b/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/AbstractHollowObjectTypeDataElementsSplitJoinTest.java index d72d08cae9..1f1e5657d9 100644 --- a/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/AbstractHollowObjectTypeDataElementsSplitJoinTest.java +++ b/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/AbstractHollowObjectTypeDataElementsSplitJoinTest.java @@ -92,11 +92,14 @@ protected HollowObjectTypeReadState populateTypeStateWithFilter(int numRecords) } protected void assertDataUnchanged(int numRecords) { + assertDataUnchanged((HollowObjectTypeReadState) readStateEngine.getTypeState("TestObject"), numRecords); + } + + protected void assertDataUnchanged(HollowObjectTypeReadState typeState, int numRecords) { for(int i=0;i> shardOrdinalShift; + shardOrdinals[shardIndex][shardOrdinal] = ordinal; + } + + for (int shardIndex=0; shardIndex shardingFactor(0, 1)); + assertIllegalStateException(() -> shardingFactor(2, 0)); + assertIllegalStateException(() -> shardingFactor(1, 1)); + assertIllegalStateException(() -> shardingFactor(1, -1)); + assertIllegalStateException(() -> shardingFactor(2, 3)); + } + + @Test + public void testResharding() throws Exception { + + for (int shardingFactor : new int[]{2, 4, 8, 16}) // 32, 64, 128, 256, 512, 1024... + { + for(int numRecords=1;numRecords<=100000;numRecords+=new Random().nextInt(1000)) + { + HollowObjectTypeReadState objectTypeReadState = populateTypeStateWith(numRecords); + assertDataUnchanged(objectTypeReadState, numRecords); + + // Splitting shards + { + int prevShardCount = objectTypeReadState.numShards(); + int newShardCount = shardingFactor * prevShardCount; + objectTypeReadState.reshard(newShardCount); + + assertEquals(newShardCount, objectTypeReadState.numShards()); + assertEquals(newShardCount, shardingFactor * prevShardCount); + } + assertDataUnchanged(objectTypeReadState, numRecords); + + // Joining shards + { + int prevShardCount = objectTypeReadState.numShards(); + int newShardCount = prevShardCount / shardingFactor; + objectTypeReadState.reshard(newShardCount); + + assertEquals(newShardCount, objectTypeReadState.numShards()); + assertEquals(shardingFactor * newShardCount, prevShardCount); + } + assertDataUnchanged(objectTypeReadState, numRecords); + } + } + } + + @Test + public void testReshardingWithFilter() throws Exception { + + for (int shardingFactor : new int[]{2, 64}) + { + for(int numRecords=1;numRecords<=100000;numRecords+=new Random().nextInt(10000)) + { + HollowObjectTypeReadState objectTypeReadState = populateTypeStateWithFilter(numRecords); + assertDataUnchanged(objectTypeReadState, numRecords); + + // Splitting shards + { + int prevShardCount = objectTypeReadState.numShards(); + int newShardCount = shardingFactor * prevShardCount; + objectTypeReadState.reshard(newShardCount); + + assertEquals(newShardCount, objectTypeReadState.numShards()); + assertEquals(newShardCount, shardingFactor * prevShardCount); + } + assertDataUnchanged(objectTypeReadState, numRecords); + + // Joining shards + { + int prevShardCount = objectTypeReadState.numShards(); + int newShardCount = prevShardCount / shardingFactor; + objectTypeReadState.reshard(newShardCount); + + assertEquals(newShardCount, objectTypeReadState.numShards()); + 
assertEquals(shardingFactor * newShardCount, prevShardCount); + } + assertDataUnchanged(objectTypeReadState, numRecords); + } + } + } + + @Test + public void testReshardingIntermediateStages_expandWithOriginalDataElements() throws Exception { + for (int shardingFactor : new int[]{2, 4}) { + for(int numRecords=1;numRecords<=100000;numRecords+=new Random().nextInt(5000)) + { + HollowObjectTypeReadState expectedTypeState = populateTypeStateWith(numRecords); + + HollowObjectTypeReadState.ShardsHolder original = expectedTypeState.shardsVolatile; + HollowObjectTypeReadState.ShardsHolder expanded = expectedTypeState.expandWithOriginalDataElements(original, shardingFactor); + + HollowObjectTypeReadState actualTypeState = new HollowObjectTypeReadState(readStateEngine, MemoryMode.ON_HEAP, schema, schema); + actualTypeState.shardsVolatile = expanded; + + assertEquals(shardingFactor * expectedTypeState.numShards(), actualTypeState.numShards()); + assertDataUnchanged(actualTypeState, numRecords); + } + } + } + + @Test + public void testReshardingIntermediateStages_splitDataElementsForOneShard() throws Exception { + for (int shardingFactor : new int[]{2, 4}) { + for(int numRecords=1;numRecords<=100000;numRecords+=new Random().nextInt(5000)) + { + HollowObjectTypeReadState typeState = populateTypeStateWith(numRecords); + + HollowObjectTypeReadState.ShardsHolder originalShardsHolder = typeState.shardsVolatile; + int originalNumShards = typeState.numShards(); + + // expand shards + typeState.shardsVolatile = typeState.expandWithOriginalDataElements(originalShardsHolder, shardingFactor); + + for(int i=0; i invocation) { + try { + invocation.get(); + Assert.fail(); + } catch (IllegalStateException e) { + // expected + } + } +}
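A note on the read-path concurrency pattern the patch introduces: an immutable shards holder is swapped atomically through a volatile field, and readers retry only when the shard backing their ordinal actually changed. The following is a minimal, self-contained sketch of that pattern under simplifying assumptions: Holder and Shard are hypothetical stand-ins for HollowObjectTypeReadState.ShardsHolder and HollowObjectTypeReadStateShard, and AtomicReference stands in for the volatile field plus Unsafe.loadFence() used in the actual code.

    import java.util.concurrent.atomic.AtomicReference;

    // Minimal sketch of the optimistic sharded-read pattern; not Hollow API.
    class ShardedReads {
        static final class Shard {
            final int ordinalShift;   // how far to shift an ordinal to index into this shard
            final long[] values;
            Shard(int ordinalShift, long[] values) {
                this.ordinalShift = ordinalShift;
                this.values = values;
            }
        }

        static final class Holder {
            final Shard[] shards;
            final int mask;           // shards.length is a power of two, so mask = length - 1
            Holder(Shard[] shards) {
                this.shards = shards;
                this.mask = shards.length - 1;
            }
        }

        private final AtomicReference<Holder> holderRef;

        ShardedReads(Holder initial) {
            this.holderRef = new AtomicReference<>(initial);
        }

        // Writer side: publish a fully-built new holder in one atomic swap.
        void reshard(Holder newHolder) {
            holderRef.set(newHolder);
        }

        // Reader side: low ordinal bits select the shard, high bits select the record.
        long read(int ordinal) {
            Holder holder;
            Shard shard;
            long value;
            do {
                holder = holderRef.get();                      // pin a consistent snapshot
                shard = holder.shards[ordinal & holder.mask];
                value = shard.values[ordinal >> shard.ordinalShift];
            } while (readWasUnsafe(holder, shard, ordinal));   // retry if our shard was swapped
            return value;
        }

        private boolean readWasUnsafe(Holder holder, Shard shard, int ordinal) {
            Holder curr = holderRef.get(); // acquire; orders the optimistic reads before this check
            // Retry only when the holder changed AND the shard owning this ordinal changed, so
            // swaps of unrelated shards during re-sharding do not invalidate this read.
            return holder != curr && shard != curr.shards[ordinal & curr.mask];
        }
    }

The design keeps readers lock-free: re-sharding builds each new shard off to the side, publishes it with a single volatile store of a fresh holder, and bounds reader retries during a re-sharding delta to a small constant, which is exactly the worst-case argument made in the readWasUnsafe comment above.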