Consumer delta application supports re-sharding for Object type (#644)
* Object type read state supports re-sharding

* Refactor object read state to apply the consistency check on the shards holder instead of on individual data elements

* Save an extra object allocation in the read path

* Reuse shards where possible to optimize worst-case read time during re-sharding

* Add a microbenchmark measuring per-read latency during delta updates

* Cleanup

---------

Co-authored-by: Sunjeet Singh <[email protected]>
Sunjeet authored Nov 2, 2023
1 parent 1cdd463 commit ec5dcb4
Showing 15 changed files with 1,105 additions and 430 deletions.
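In practice, the change lets a consumer apply a delta in which the Object type's shard count differs from the count in its current state. Below is a minimal sketch of that scenario, using the write/read state engines and the StateEngineRoundTripper test utility that the new benchmark also uses; whether the producer actually emits a re-sharded delta here depends on producer-side behavior, so the snippet only illustrates the consumer path this commit exercises.

HollowWriteStateEngine writeEngine = new HollowWriteStateEngine();
writeEngine.setTargetMaxTypeShardSize(500 * 1000L);  // initial target shard size
HollowObjectMapper mapper = new HollowObjectMapper(writeEngine);
mapper.initializeTypeState(String.class);
for (int i = 0; i < 100000; i++) mapper.add("record_" + i);

HollowReadStateEngine readEngine = new HollowReadStateEngine();
StateEngineRoundTripper.roundTripSnapshot(writeEngine, readEngine, null);

// Shrink the target shard size and round-trip a delta; with this commit the
// consumer can re-shard the Object type state while applying the delta.
writeEngine.setTargetMaxTypeShardSize(50 * 1000L);
for (int i = 0; i < 100000; i++) mapper.add("record_" + i);  // re-add records so they survive the cycle
StateEngineRoundTripper.roundTripDelta(writeEngine, readEngine);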
@@ -0,0 +1,180 @@
package com.netflix.hollow.core.read.engine.object;


import com.netflix.hollow.core.read.dataaccess.HollowObjectTypeDataAccess;
import com.netflix.hollow.core.read.engine.HollowReadStateEngine;
import com.netflix.hollow.core.util.StateEngineRoundTripper;
import com.netflix.hollow.core.write.HollowWriteStateEngine;
import com.netflix.hollow.core.write.objectmapper.HollowObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

/**
 * Runs delta transitions in the background while benchmarking reads. Re-sharding in delta
 * transitions can be toggled with a param.
 */
@State(Scope.Thread)
@BenchmarkMode({Mode.All})
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 15, time = 1)
@Fork(1)
public class HollowObjectTypeReadStateDeltaTransitionBenchmark {
HollowWriteStateEngine writeStateEngine;
HollowReadStateEngine readStateEngine;
HollowObjectTypeDataAccess dataAccess;
HollowObjectMapper objectMapper;

int countStringsToRead = 500;

@Param({ "true" })
boolean isReshardingEnabled;

@Param({ "500", "1000" })
int shardSizeKBs;

@Param({ "5", "100" })
int maxStringLength;

int countStringsDb = 100000;

int deltaChanges = 2000;

ArrayList<Integer> readOrder;

ExecutorService refreshExecutor;
Future<?> reshardingFuture;
CountDownLatch doneBenchmark;

final Random r = new Random();

@Setup(Level.Iteration)
public void setUp() throws ExecutionException, InterruptedException {
final List<String> readStrings = new ArrayList<>();
final Set<Integer> readKeys = new HashSet<>();
refreshExecutor = Executors.newSingleThreadExecutor();

refreshExecutor.submit(() -> {
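// Build the initial data set and the snapshot round-trip on the single refresh
// thread; the same thread later applies the background delta transitions.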
writeStateEngine = new HollowWriteStateEngine();
writeStateEngine.setTargetMaxTypeShardSize(shardSizeKBs * 1000L);
objectMapper = new HollowObjectMapper(writeStateEngine);
objectMapper.initializeTypeState(String.class);

readOrder = new ArrayList<>(countStringsToRead);
for (int i = 0; i < countStringsToRead; i++) {
readOrder.add(r.nextInt(countStringsDb));
}
readKeys.addAll(readOrder);

for (int i = 0; i < countStringsDb; i++) {
StringBuilder sb = new StringBuilder();
sb.append("string_");
sb.append(i);
sb.append("_");
int thisStringLength = r.nextInt(maxStringLength) - sb.length() + 1;
for (int j = 0; j < thisStringLength; j++) {
sb.append((char) (r.nextInt(26) + 'a'));
}
String s = sb.toString();
objectMapper.add(s);
if (readKeys.contains(i)) {
readStrings.add(s);
}
}

readStateEngine = new HollowReadStateEngine();
try {
StateEngineRoundTripper.roundTripSnapshot(writeStateEngine, readStateEngine, null);
} catch (IOException e) {
throw new RuntimeException(e);
}
dataAccess = (HollowObjectTypeDataAccess) readStateEngine.getTypeDataAccess("String", 0);
}).get();

doneBenchmark = new CountDownLatch(1);
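// Background task: keep applying delta transitions for the whole benchmark
// iteration, alternating the target shard size each cycle when re-sharding is enabled.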
reshardingFuture = refreshExecutor.submit(() -> {
Random r = new Random();
long origShardSize = shardSizeKBs * 1000L;
long newShardSize = origShardSize;
do {
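// Re-add the strings the benchmark reads so they survive each delta cycle;
// records that are not re-added drop out of the next state.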
for (int i=0; i<readStrings.size(); i++) {
objectMapper.add(readStrings.get(i));
}
for (int i = 0; i < deltaChanges; i++) {
int changeKey = r.nextInt(countStringsDb);
if (readKeys.contains(changeKey)) {
continue;
}
StringBuilder sb = new StringBuilder();
sb.append("string_");
sb.append(changeKey);
sb.append("_");
int thisStringLength = r.nextInt(maxStringLength) - sb.length() + 1;
for (int j = 0; j < thisStringLength; j++) {
sb.append((char) (r.nextInt(26) + 'a'));
}
objectMapper.add(sb.toString());
}

try {
if (isReshardingEnabled) {
if (newShardSize == origShardSize) {
newShardSize = origShardSize / 10;
} else {
newShardSize = origShardSize;
}
writeStateEngine.setTargetMaxTypeShardSize(newShardSize);
}
StateEngineRoundTripper.roundTripDelta(writeStateEngine, readStateEngine);
} catch (IOException e) {
throw new RuntimeException(e);
}
} while (doneBenchmark.getCount() > 0);
});
}

@TearDown(Level.Iteration)
public void tearDown() {
doneBenchmark.countDown();
reshardingFuture.cancel(true);
refreshExecutor.shutdown();
try {
if (!refreshExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
refreshExecutor.shutdownNow();
}
} catch (InterruptedException e) {
refreshExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}

@Benchmark
public void testReadString(Blackhole bh) {
int j = r.nextInt(readOrder.size());
// read one of the pre-selected ordinals, whose records the delta loop never mutates
String result = dataAccess.readString(readOrder.get(j), 0);
bh.consume(result);
}
}
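For reference, the benchmark can be launched through JMH's programmatic runner. A minimal entry point (hypothetical, not part of this commit) might look like:

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class DeltaTransitionBenchmarkRunner {
    public static void main(String[] args) throws RunnerException {
        Options opts = new OptionsBuilder()
                // runs every @Benchmark method above, across the full @Param matrix
                .include(HollowObjectTypeReadStateDeltaTransitionBenchmark.class.getSimpleName())
                .build();
        new Runner(opts).run();
    }
}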
@@ -331,7 +331,7 @@ private String readTypeStateSnapshot(HollowBlobInput in, TypeFilter filter) thro
} else {
HollowObjectSchema unfilteredSchema = (HollowObjectSchema)schema;
HollowObjectSchema filteredSchema = unfilteredSchema.filterSchema(filter);
- populateTypeStateSnapshot(in, new HollowObjectTypeReadState(stateEngine, memoryMode, filteredSchema, unfilteredSchema, numShards));
+ populateTypeStateSnapshotWithNumShards(in, new HollowObjectTypeReadState(stateEngine, memoryMode, filteredSchema, unfilteredSchema), numShards);
}
} else if (schema instanceof HollowListSchema) {
if(!filter.includes(typeName)) {
@@ -361,14 +361,22 @@ private void populateTypeStateSnapshot(HollowBlobInput in, HollowTypeReadState t
typeState.readSnapshot(in, stateEngine.getMemoryRecycler());
}

+ private void populateTypeStateSnapshotWithNumShards(HollowBlobInput in, HollowTypeReadState typeState, int numShards) throws IOException {
+     if (numShards <= 0 || ((numShards & (numShards - 1)) != 0)) {
+         throw new IllegalArgumentException("Number of shards must be a power of 2!");
+     }
+
+     stateEngine.addTypeState(typeState);
+     typeState.readSnapshot(in, stateEngine.getMemoryRecycler(), numShards);
+ }

private String readTypeStateDelta(HollowBlobInput in) throws IOException {
HollowSchema schema = HollowSchema.readFrom(in);

int numShards = readNumShards(in);

HollowTypeReadState typeState = stateEngine.getTypeState(schema.getName());
if(typeState != null) {
- typeState.applyDelta(in, schema, stateEngine.getMemoryRecycler());
+ typeState.applyDelta(in, schema, stateEngine.getMemoryRecycler(), numShards);
} else {
discardDelta(in, schema, numShards);
}
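The shard-count validation in populateTypeStateSnapshotWithNumShards above relies on a standard bit trick: a positive integer is a power of two exactly when clearing its lowest set bit leaves zero. A standalone illustration (not part of the commit):

// n & (n - 1) clears the lowest set bit of n, so the expression is 0
// only when n has a single bit set, i.e. when n is a power of two.
static boolean isPowerOfTwo(int n) {
    return n > 0 && (n & (n - 1)) == 0;
}
// isPowerOfTwo(8) -> true   (8 & 7 == 0b1000 & 0b0111 == 0)
// isPowerOfTwo(6) -> false  (6 & 5 == 0b0110 & 0b0101 == 0b0100)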
@@ -122,7 +122,14 @@ public BitSet getPreviousOrdinals() {
public abstract int maxOrdinal();

public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler) throws IOException;
- public abstract void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler) throws IOException;

+ public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler, int numShards) throws IOException;

+ public abstract void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException;

+ protected boolean shouldReshard(int currNumShards, int deltaNumShards) {
+     return currNumShards != 0 && deltaNumShards != 0 && currNumShards != deltaNumShards;
+ }
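// Note: the guard above fires only when both shard counts are known (non-zero)
// and differ; a count of 0 (presumably: not specified in the blob) never
// triggers a re-shard. For example:
//   shouldReshard(4, 8) -> true   (shard count changes across the delta)
//   shouldReshard(4, 4) -> false  (same layout, nothing to do)
//   shouldReshard(0, 8) -> false  (current count unknown)
//   shouldReshard(4, 0) -> false  (delta carries no shard count)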

public HollowSchema getSchema() {
return schema;
@@ -73,6 +73,11 @@ public HollowListTypeReadState(HollowReadStateEngine stateEngine, MemoryMode mem
this.shards = shards;
}

+ @Override
+ public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException {
+     throw new UnsupportedOperationException("This type does not yet support numShards specification when reading snapshot");
+ }

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException {
if(shards.length > 1)
@@ -91,7 +96,11 @@ public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler
}

@Override
- public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler) throws IOException {
+ public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException {
+     if (shouldReshard(shards.length, deltaNumShards)) {
+         throw new UnsupportedOperationException("Dynamic type sharding not supported for " + schema.getName()
+                 + ". Current numShards=" + shards.length + ", delta numShards=" + deltaNumShards);
+     }
if(shards.length > 1)
maxOrdinal = VarInt.readVInt(in);

@@ -80,6 +80,11 @@ public HollowMapTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memo

}

+ @Override
+ public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException {
+     throw new UnsupportedOperationException("This type does not yet support numShards specification when reading snapshot");
+ }

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException {
if(shards.length > 1)
@@ -98,7 +103,11 @@ public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler
}

@Override
- public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler) throws IOException {
+ public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException {
+     if (shouldReshard(shards.length, deltaNumShards)) {
+         throw new UnsupportedOperationException("Dynamic type sharding not supported for " + schema.getName()
+                 + ". Current numShards=" + shards.length + ", delta numShards=" + deltaNumShards);
+     }
if(shards.length > 1)
maxOrdinal = VarInt.readVInt(in);

@@ -16,6 +16,10 @@
*/
package com.netflix.hollow.core.read.engine.object;

+ import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.writeNullField;
+ import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.writeNullFixedLengthField;
+ import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.writeNullVarLengthField;

import com.netflix.hollow.core.memory.SegmentedByteArray;
import com.netflix.hollow.core.memory.encoding.FixedLengthElementArray;
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
@@ -175,7 +179,7 @@ private void mergeOrdinal(int i) {
long readStartBit = currentFromStateReadFixedLengthStartBit + from.bitOffsetPerField[fieldIndex];
copyRecordField(fieldIndex, fieldIndex, from, readStartBit, currentWriteFixedLengthStartBit, currentFromStateReadVarLengthDataPointers, currentWriteVarLengthDataPointers, removeData);
} else if(target.varLengthData[fieldIndex] != null) {
- writeNullVarLengthField(fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
+ writeNullVarLengthField(target, fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
}
}
currentWriteFixedLengthStartBit += target.bitsPerField[fieldIndex];
@@ -193,7 +197,7 @@ private void mergeOrdinal(int i) {

private void addFromDelta(boolean removeData, int fieldIndex, int deltaFieldIndex) {
if(deltaFieldIndex == -1) {
- writeNullField(fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
+ writeNullField(target, fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
} else {
long readStartBit = currentDeltaStateReadFixedLengthStartBit + delta.bitOffsetPerField[deltaFieldIndex];
copyRecordField(fieldIndex, deltaFieldIndex, delta, readStartBit, currentWriteFixedLengthStartBit, currentDeltaReadVarLengthDataPointers, currentWriteVarLengthDataPointers, false);
@@ -214,7 +218,7 @@ private void copyRecordField(int fieldIndex, int fromFieldIndex, HollowObjectTyp

if(target.varLengthData[fieldIndex] != null) {
if((readValue & (1L << (copyFromData.bitsPerField[fromFieldIndex] - 1))) != 0) {
- writeNullVarLengthField(fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
+ writeNullVarLengthField(target, fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
} else {
long readStart = currentReadVarLengthDataPointers[fieldIndex];
long length = readValue - readStart;
@@ -228,28 +232,9 @@ }
}
} else if(!removeData) {
if(readValue == copyFromData.nullValueForField[fromFieldIndex])
- writeNullFixedLengthField(fieldIndex, currentWriteFixedLengthStartBit);
+ writeNullFixedLengthField(target, fieldIndex, currentWriteFixedLengthStartBit);
else
target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, target.bitsPerField[fieldIndex], readValue);
}
}

- private void writeNullField(int fieldIndex, long currentWriteFixedLengthStartBit, long[] currentWriteVarLengthDataPointers) {
-     if(target.varLengthData[fieldIndex] != null) {
-         writeNullVarLengthField(fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
-     } else {
-         writeNullFixedLengthField(fieldIndex, currentWriteFixedLengthStartBit);
-     }
- }
-
- private void writeNullVarLengthField(int fieldIndex, long currentWriteFixedLengthStartBit, long[] currentWriteVarLengthDataPointers) {
-     long writeValue = (1L << (target.bitsPerField[fieldIndex] - 1)) | currentWriteVarLengthDataPointers[fieldIndex];
-     target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, target.bitsPerField[fieldIndex], writeValue);
- }
-
- private void writeNullFixedLengthField(int fieldIndex, long currentWriteFixedLengthStartBit) {
-     target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, target.bitsPerField[fieldIndex], target.nullValueForField[fieldIndex]);
- }


}
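The three writeNull* helpers removed above were relocated to HollowObjectTypeDataElements as statics that take the target data elements explicitly (see the static imports at the top of this file's diff). A sketch of the relocated methods, reconstructed from the call sites and the removed bodies; the actual code in HollowObjectTypeDataElements may differ in modifiers and layout:

static void writeNullField(HollowObjectTypeDataElements target, int fieldIndex,
        long currentWriteFixedLengthStartBit, long[] currentWriteVarLengthDataPointers) {
    if (target.varLengthData[fieldIndex] != null) {
        writeNullVarLengthField(target, fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
    } else {
        writeNullFixedLengthField(target, fieldIndex, currentWriteFixedLengthStartBit);
    }
}

static void writeNullVarLengthField(HollowObjectTypeDataElements target, int fieldIndex,
        long currentWriteFixedLengthStartBit, long[] currentWriteVarLengthDataPointers) {
    // Setting the top bit of the field's fixed-length slot marks the var-length
    // value as null while preserving the current var-length write pointer.
    long writeValue = (1L << (target.bitsPerField[fieldIndex] - 1)) | currentWriteVarLengthDataPointers[fieldIndex];
    target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, target.bitsPerField[fieldIndex], writeValue);
}

static void writeNullFixedLengthField(HollowObjectTypeDataElements target, int fieldIndex,
        long currentWriteFixedLengthStartBit) {
    target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, target.bitsPerField[fieldIndex], target.nullValueForField[fieldIndex]);
}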