From f72303eb8b6e950c9ed6b96610ea1b3403aded6d Mon Sep 17 00:00:00 2001 From: Sunjeet Date: Wed, 11 Oct 2023 04:59:02 -0700 Subject: [PATCH] HollowObjectTypeDataElements utilities for splitting and joining (#642) * HollowObjectTypeDataElements utilities for splitting and joining * Assume non-uniform field width across shards, add tests for filtered fields * Restore private modifier, cleanup, more test * Work in tandem with isSkipTypeShardUpdateWithNoAdditions --------- Co-authored-by: Sunjeet Singh --- .../core/memory/FixedLengthDataFactory.java | 8 + ...GapEncodedVariableLengthIntegerReader.java | 105 +++++++++++ ...llowObjectDeltaHistoricalStateCreator.java | 69 ++----- .../object/HollowObjectTypeDataElements.java | 61 +++++++ .../HollowObjectTypeDataElementsJoiner.java | 114 ++++++++++++ .../HollowObjectTypeDataElementsSplitter.java | 96 ++++++++++ ...ncodedVariableLengthIntegerReaderTest.java | 75 +++++++- ...owObjectTypeDataElementsSplitJoinTest.java | 106 +++++++++++ ...ollowObjectTypeDataElementsJoinerTest.java | 171 ++++++++++++++++++ ...owObjectTypeDataElementsSplitJoinTest.java | 120 ++++++++++++ ...lowObjectTypeDataElementsSplitterTest.java | 41 +++++ 11 files changed, 905 insertions(+), 61 deletions(-) create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsJoiner.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsSplitter.java create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/object/AbstractHollowObjectTypeDataElementsSplitJoinTest.java create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsJoinerTest.java create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsSplitJoinTest.java create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsSplitterTest.java diff --git a/hollow/src/main/java/com/netflix/hollow/core/memory/FixedLengthDataFactory.java b/hollow/src/main/java/com/netflix/hollow/core/memory/FixedLengthDataFactory.java index c835aaa81c..6923740d6f 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/memory/FixedLengthDataFactory.java +++ b/hollow/src/main/java/com/netflix/hollow/core/memory/FixedLengthDataFactory.java @@ -22,6 +22,14 @@ public static FixedLengthData get(HollowBlobInput in, MemoryMode memoryMode, Arr } } + public static FixedLengthData get(long numBits, MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) { + if (memoryMode.equals(MemoryMode.ON_HEAP)) { + return new FixedLengthElementArray(memoryRecycler, numBits); + } else { + throw new UnsupportedOperationException("Memory mode " + memoryMode.name() + " not supported"); + } + } + public static void destroy(FixedLengthData fld, ArraySegmentRecycler memoryRecycler) { if (fld instanceof FixedLengthElementArray) { ((FixedLengthElementArray) fld).destroy(memoryRecycler); diff --git a/hollow/src/main/java/com/netflix/hollow/core/memory/encoding/GapEncodedVariableLengthIntegerReader.java b/hollow/src/main/java/com/netflix/hollow/core/memory/encoding/GapEncodedVariableLengthIntegerReader.java index 9ee3bc85b8..12aaaae718 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/memory/encoding/GapEncodedVariableLengthIntegerReader.java +++ b/hollow/src/main/java/com/netflix/hollow/core/memory/encoding/GapEncodedVariableLengthIntegerReader.java @@ -19,11 +19,15 @@ import com.netflix.hollow.core.memory.ByteDataArray; import com.netflix.hollow.core.memory.SegmentedByteArray; import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler; +import com.netflix.hollow.core.memory.pool.WastefulRecycler; import com.netflix.hollow.core.read.HollowBlobInput; import com.netflix.hollow.core.util.IOUtils; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; public class GapEncodedVariableLengthIntegerReader { @@ -140,4 +144,105 @@ public static GapEncodedVariableLengthIntegerReader combine(GapEncodedVariableLe return new GapEncodedVariableLengthIntegerReader(arr.getUnderlyingArray(), (int)arr.length()); } + + /** + * Splits this {@code GapEncodedVariableLengthIntegerReader} into {@code numSplits} new instances. + * The original data is not cleaned up. + * + * @param numSplits the number of instances to split into, should be a power of 2. + * @return an array of {@code GapEncodedVariableLengthIntegerReader} instances populated with the results of the split. + */ + public GapEncodedVariableLengthIntegerReader[] split(int numSplits) { + if (numSplits<=0 || !((numSplits&(numSplits-1))==0)) { + throw new IllegalStateException("Split should only be called with powers of 2, it was called with " + numSplits); + } + final int toMask = numSplits - 1; + final int toOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSplits); + GapEncodedVariableLengthIntegerReader[] to = new GapEncodedVariableLengthIntegerReader[numSplits]; + + List ordinals = new ArrayList<>(); + reset(); + while(nextElement() != Integer.MAX_VALUE) { + ordinals.add(nextElement()); + advance(); + } + + ByteDataArray[] splitOrdinals = new ByteDataArray[numSplits]; + int previousSplitOrdinal[] = new int[numSplits]; + for (int ordinal : ordinals) { + int toIndex = ordinal & toMask; + int toOrdinal = ordinal >> toOrdinalShift; + if (splitOrdinals[toIndex] == null) { + splitOrdinals[toIndex] = new ByteDataArray(WastefulRecycler.DEFAULT_INSTANCE); + } + VarInt.writeVInt(splitOrdinals[toIndex], toOrdinal - previousSplitOrdinal[toIndex]); + previousSplitOrdinal[toIndex] = toOrdinal; + } + for(int i=0;i[] fromOrdinals = new HashSet[from.length]; + for (int i=0;i(); + if (from[i] == null) { + continue; + } + from[i].reset(); + + while(from[i].nextElement() != Integer.MAX_VALUE) { + int splitOrdinal = from[i].nextElement(); + fromOrdinals[i].add(splitOrdinal); + joinedMaxOrdinal = Math.max(joinedMaxOrdinal, splitOrdinal*numSplits + i); + from[i].advance(); + } + } + + ByteDataArray toRemovals = null; + int previousOrdinal = 0; + for (int ordinal=0;ordinal<=joinedMaxOrdinal;ordinal++) { + int fromIndex = ordinal & fromMask; + int fromOrdinal = ordinal >> fromOrdinalShift; + if (fromOrdinals[fromIndex].contains(fromOrdinal)) { + if (toRemovals == null) { + toRemovals = new ByteDataArray(WastefulRecycler.DEFAULT_INSTANCE); + } + VarInt.writeVInt(toRemovals, ordinal - previousOrdinal); + previousOrdinal = ordinal; + } + } + + if (toRemovals == null) { + return EMPTY_READER; + } else { + return new GapEncodedVariableLengthIntegerReader(toRemovals.getUnderlyingArray(), (int) toRemovals.length()); + } + } } diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectDeltaHistoricalStateCreator.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectDeltaHistoricalStateCreator.java index bd7cd844ff..3d369a9ddb 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectDeltaHistoricalStateCreator.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectDeltaHistoricalStateCreator.java @@ -17,6 +17,8 @@ package com.netflix.hollow.core.read.engine.object; import static com.netflix.hollow.core.HollowConstants.ORDINAL_NONE; +import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.copyRecord; +import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.varLengthSize; import com.netflix.hollow.core.memory.SegmentedByteArray; import com.netflix.hollow.core.memory.encoding.FixedLengthElementArray; @@ -71,7 +73,11 @@ public void populateHistory() { int ordinal = iter.next(); while(ordinal != ORDINAL_NONE) { ordinalMapping.put(ordinal, nextOrdinal); - copyRecord(ordinal); + + int shard = ordinal & shardNumberMask; + int shardOrdinal = ordinal >> shardOrdinalShift; + copyRecord(historicalDataElements, nextOrdinal, stateEngineDataElements[shard], shardOrdinal, currentWriteVarLengthDataPointers); + nextOrdinal++; ordinal = iter.next(); } @@ -109,7 +115,9 @@ private void populateStats() { for(int i=0;i> shardOrdinalShift; + totalVarLengthSizes[i] += varLengthSize(stateEngineDataElements[shard], shardOrdinal, i); } } @@ -125,66 +133,11 @@ private void populateStats() { historicalDataElements.bitsPerField[i] = (64 - Long.numberOfLeadingZeros(totalVarLengthSizes[i] + 1)) + 1; } - historicalDataElements.nullValueForField[i] = (1L << historicalDataElements.bitsPerField[i]) - 1; + historicalDataElements.nullValueForField[i] = historicalDataElements.bitsPerField[i] == 64 ? -1L : (1L << historicalDataElements.bitsPerField[i]) - 1; historicalDataElements.bitOffsetPerField[i] = historicalDataElements.bitsPerRecord; historicalDataElements.bitsPerRecord += historicalDataElements.bitsPerField[i]; } ordinalMapping = new IntMap(removedEntryCount); } - - private long varLengthSize(int ordinal, int fieldIdx) { - int shard = ordinal & shardNumberMask; - int shardOrdinal = ordinal >> shardOrdinalShift; - - int numBitsForField = stateEngineDataElements[shard].bitsPerField[fieldIdx]; - long currentBitOffset = ((long)stateEngineDataElements[shard].bitsPerRecord * shardOrdinal) + stateEngineDataElements[shard].bitOffsetPerField[fieldIdx]; - long endByte = stateEngineDataElements[shard].fixedLengthData.getElementValue(currentBitOffset, numBitsForField) & (1L << (numBitsForField - 1)) - 1; - long startByte = shardOrdinal != 0 ? stateEngineDataElements[shard].fixedLengthData.getElementValue(currentBitOffset - stateEngineDataElements[shard].bitsPerRecord, numBitsForField) & (1L << (numBitsForField - 1)) - 1 : 0; - - return endByte - startByte; - } - - private void copyRecord(int ordinal) { - int shard = ordinal & shardNumberMask; - int shardOrdinal = ordinal >> shardOrdinalShift; - - for(int i=0;i> fromOrdinalShift; + + if (fromOrdinal <= from[fromIndex].maxOrdinal) { + copyRecord(to, ordinal, from[fromIndex], fromOrdinal, currentWriteVarLengthDataPointers); + } else { + // lopsided shards could result for consumers that skip type shards with no additions + writeNullRecord(to, ordinal, currentWriteVarLengthDataPointers); + } + } + + return to; + } + + private void writeNullRecord(HollowObjectTypeDataElements to, int toOrdinal, long[] currentWriteVarLengthDataPointers) { + for(int fieldIndex=0;fieldIndex> toOrdinalShift; + copyRecord(to[toIndex], toOrdinal, from, i, currentWriteVarLengthDataPointers[toIndex]); + } + return to; + } + + private void populateStats(HollowObjectTypeDataElements[] to, HollowObjectTypeDataElements from, int toMask, int toOrdinalShift) { + long[][] varLengthSizes = new long[to.length][from.schema.numFields()]; + + for(int ordinal=0;ordinal<=from.maxOrdinal;ordinal++) { + int toIndex = ordinal & toMask; + int toOrdinal = ordinal >> toOrdinalShift; + to[toIndex].maxOrdinal = toOrdinal; + for(int fieldIdx=0;fieldIdx 3*4+2 + + GapEncodedVariableLengthIntegerReader[] empties = new GapEncodedVariableLengthIntegerReader[] {EMPTY_READER, EMPTY_READER}; + GapEncodedVariableLengthIntegerReader joinedEmpties = GapEncodedVariableLengthIntegerReader.join(empties); + assertEquals(EMPTY_READER, joinedEmpties); + + GapEncodedVariableLengthIntegerReader[] nulls = new GapEncodedVariableLengthIntegerReader[] {null, null}; + GapEncodedVariableLengthIntegerReader joinedNulls = GapEncodedVariableLengthIntegerReader.join(nulls); + assertEquals(EMPTY_READER, joinedNulls); + + GapEncodedVariableLengthIntegerReader[] from1 = new GapEncodedVariableLengthIntegerReader[1]; + from1[0] = reader(1, 10, 100, 105, 107, 200); + GapEncodedVariableLengthIntegerReader joined1 = GapEncodedVariableLengthIntegerReader.join(from1); + assertValues(joined1, 1, 10, 100, 105, 107, 200); + + assertIllegalStateException(() -> GapEncodedVariableLengthIntegerReader.join(null)); + } + + @Test + public void testSplit() { + GapEncodedVariableLengthIntegerReader reader = reader(1, 10, 100, 105, 107, 200); + + GapEncodedVariableLengthIntegerReader[] splitBy2 = reader.split(2); + assertEquals(2, splitBy2.length); + assertValues(splitBy2[0], 5, 50, 100); // (split[i]*numSplits + i) is the original ordinal + assertValues(splitBy2[1], 0, 52, 53); + + GapEncodedVariableLengthIntegerReader[] splitBy256 = reader.split(256); + assertEquals(256, splitBy256.length); + assertValues(splitBy256[1], 0); + assertValues(splitBy256[200], 0); + assertEquals(EMPTY_READER, splitBy256[0]); + assertEquals(EMPTY_READER, splitBy256[255]); + + GapEncodedVariableLengthIntegerReader[] splitBy2Empty = EMPTY_READER.split(2); + assertEquals(2, splitBy2Empty.length); + assertEquals(EMPTY_READER, splitBy2Empty[0]); + assertEquals(EMPTY_READER, splitBy2Empty[1]); + + assertIllegalStateException(() ->reader.split(0)); + assertIllegalStateException(() -> reader.split(3)); + } + private GapEncodedVariableLengthIntegerReader reader(int... values) { ByteDataArray arr = new ByteDataArray(WastefulRecycler.SMALL_ARRAY_RECYCLER); @@ -67,11 +128,19 @@ private GapEncodedVariableLengthIntegerReader reader(int... values) { private void assertValues(GapEncodedVariableLengthIntegerReader reader, int... expectedValues) { for(int i=0;i invocation) { + try { + invocation.get(); + Assert.fail(); + } catch (IllegalStateException e) { + // expected + } + } } diff --git a/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/AbstractHollowObjectTypeDataElementsSplitJoinTest.java b/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/AbstractHollowObjectTypeDataElementsSplitJoinTest.java new file mode 100644 index 0000000000..d72d08cae9 --- /dev/null +++ b/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/AbstractHollowObjectTypeDataElementsSplitJoinTest.java @@ -0,0 +1,106 @@ +package com.netflix.hollow.core.read.engine.object; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + +import com.netflix.hollow.api.objects.generic.GenericHollowObject; +import com.netflix.hollow.core.AbstractStateEngineTest; +import com.netflix.hollow.core.read.engine.HollowReadStateEngine; +import com.netflix.hollow.core.read.filter.HollowFilterConfig; +import com.netflix.hollow.core.schema.HollowObjectSchema; +import com.netflix.hollow.core.util.StateEngineRoundTripper; +import com.netflix.hollow.core.write.HollowObjectTypeWriteState; +import com.netflix.hollow.core.write.HollowObjectWriteRecord; +import java.io.IOException; +import org.junit.Before; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class AbstractHollowObjectTypeDataElementsSplitJoinTest extends AbstractStateEngineTest { + protected HollowObjectSchema schema; + + @Mock + protected HollowObjectTypeReadState mockObjectTypeState; + + @Before + public void setUp() { + schema = new HollowObjectSchema("TestObject", 4); + schema.addField("longField", HollowObjectSchema.FieldType.LONG); + schema.addField("stringField", HollowObjectSchema.FieldType.STRING); + schema.addField("intField", HollowObjectSchema.FieldType.INT); + schema.addField("doubleField", HollowObjectSchema.FieldType.DOUBLE); + + MockitoAnnotations.initMocks(this); + HollowObjectTypeDataElements[] fakeDataElements = new HollowObjectTypeDataElements[5]; + when(mockObjectTypeState.currentDataElements()).thenReturn(fakeDataElements); + super.setUp(); + } + + @Override + protected void initializeTypeStates() { + writeStateEngine.setTargetMaxTypeShardSize(4096); + writeStateEngine.addTypeState(new HollowObjectTypeWriteState(schema)); + } + + private void populateWriteStateEngine(int numRecords) { + initWriteStateEngine(); + HollowObjectWriteRecord rec = new HollowObjectWriteRecord(schema); + for(int i=0;i widthSmall); + + HollowObjectTypeDataElements dataElementsJoined = joiner.join(new HollowObjectTypeDataElements[] + {dataElementsSmall, dataElementsBig}); + int intFieldPosJoined = dataElementsJoined.schema.getPosition("intField"); + int widthJoined = dataElementsJoined.bitsPerField[intFieldPosJoined]; + + long val0 = dataElementsJoined.fixedLengthData.getElementValue(dataElementsJoined.bitOffsetPerField[intFieldPosJoined], widthJoined); + long val1 = dataElementsJoined.fixedLengthData.getElementValue(dataElementsJoined.bitsPerRecord + dataElementsJoined.bitOffsetPerField[intFieldPosJoined], widthJoined); + + assertEquals(widthBig, widthJoined); + assertEquals(valSmall, val0); + assertEquals(valBig, val1); + } + +// TODO: manually invoked, depends on producer side changes for supporting changing numShards in a delta chain +// @Test +// public void testLopsidedShards() { +// InMemoryBlobStore blobStore = new InMemoryBlobStore(); +// HollowProducer p = HollowProducer.withPublisher(blobStore) +// .withBlobStager(new HollowInMemoryBlobStager()) +// .build(); +// +// p.initializeDataModel(schema); +// int targetSize = 64; +// p.getWriteEngine().setTargetMaxTypeShardSize(targetSize); +// long v1 = oneRunCycle(p, new int[] {0, 1, 2, 3, 4, 5, 6, 7}); +// +// HollowConsumer c = HollowConsumer +// .withBlobRetriever(blobStore) +// .withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() { +// @Override +// public boolean allowDoubleSnapshot() { +// return false; +// } +// +// @Override +// public int maxDeltasBeforeDoubleSnapshot() { +// return Integer.MAX_VALUE; +// } +// }) +// .withSkipTypeShardUpdateWithNoAdditions() +// .build(); +// c.triggerRefreshTo(v1); +// +// assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards()); +// assertEquals(true, c.getStateEngine().isSkipTypeShardUpdateWithNoAdditions()); +// +// long v2 = oneRunCycle(p, new int[] {0, 1, 2, 3, 5, 7}); +// c.triggerRefreshTo(v2); +// assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards()); +// +// long v3 = oneRunCycle(p, new int[] { 0, 1, 3, 5}); // drop to 1 ordinal per shard, skipTypeShardWithNoAdds will make it so that maxOrdinal is adjusted +// c.triggerRefreshTo(v3); +// assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards()); +// +// long v4 = oneRunCycle(p, new int[] { 0, 1, 2, 3}); // now add another ordinal to one shard, maxOrdinals will be lopsided +// c.triggerRefreshTo(v4); +// assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards()); +// +// readStateEngine = c.getStateEngine(); +// assertDataUnchanged(3); +// +// long v5 = oneRunCycle(p, new int[] {0, 1}); +// +// // assert lopsided shards before join +// assertEquals(2, ((HollowObjectTypeReadState) c.getStateEngine().getTypeState("TestObject")).shardsVolatile.shards[0].currentDataElements().maxOrdinal); +// assertEquals(3, ((HollowObjectTypeReadState) c.getStateEngine().getTypeState("TestObject")).shardsVolatile.shards[1].currentDataElements().maxOrdinal); +// c.triggerRefreshTo(v5); +// assertEquals(1, c.getStateEngine().getTypeState("TestObject").numShards()); // joined to 1 shard +// readStateEngine = c.getStateEngine(); +// assertDataUnchanged(2); +// +// long v6 = oneRunCycle(p, new int[] {0, 1, 2, 3, 4, 5 }); +// c.triggerRefreshTo(v6); +// assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards()); // split to 2 shards +// +// long v7 = oneRunCycle(p, new int[] {8, 9}); +// c.triggerRefreshTo(v7); +// assertEquals(4, c.getStateEngine().getTypeState("TestObject").numShards()); // still 2 shards +// +// long v8 = oneRunCycle(p, new int[] {8}); +// c.triggerRefreshTo(v8); +// assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards()); // down to 1 shard +// +// c.triggerRefreshTo(v1); +// assertEquals(v1, c.getCurrentVersionId()); +// +// c.triggerRefreshTo(v8); +// assertEquals(v8, c.getCurrentVersionId()); +// } + + private long oneRunCycle(HollowProducer p, int recordIds[]) { + return p.runCycle(state -> { + HollowObjectWriteRecord rec = new HollowObjectWriteRecord(schema); + for(int recordId : recordIds) { + rec.reset(); + rec.setLong("longField", recordId); + rec.setString("stringField", "Value" + recordId); + rec.setInt("intField", recordId); + rec.setDouble("doubleField", recordId); + + state.getStateEngine().add("TestObject", rec); + } + }); + } +} diff --git a/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsSplitJoinTest.java b/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsSplitJoinTest.java new file mode 100644 index 0000000000..3e2cf99f87 --- /dev/null +++ b/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsSplitJoinTest.java @@ -0,0 +1,120 @@ +package com.netflix.hollow.core.read.engine.object; + +import static org.junit.Assert.assertEquals; + +import com.netflix.hollow.api.consumer.HollowConsumer; +import com.netflix.hollow.api.consumer.fs.HollowFilesystemBlobRetriever; +import com.netflix.hollow.core.read.engine.HollowReadStateEngine; +import com.netflix.hollow.core.schema.HollowSchema; +import com.netflix.hollow.core.write.HollowObjectTypeWriteState; +import com.netflix.hollow.tools.checksum.HollowChecksum; +import java.io.IOException; +import java.nio.file.Paths; +import org.junit.Assert; +import org.junit.Test; + +public class HollowObjectTypeDataElementsSplitJoinTest extends AbstractHollowObjectTypeDataElementsSplitJoinTest { + + @Override + protected void initializeTypeStates() { + writeStateEngine.setTargetMaxTypeShardSize(4 * 1000 * 1024); + writeStateEngine.addTypeState(new HollowObjectTypeWriteState(schema)); + } + + @Test + public void testSplitThenJoin() throws IOException { + HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter(); + HollowObjectTypeDataElementsJoiner joiner = new HollowObjectTypeDataElementsJoiner(); + + for (int numRecords=0;numRecords<1*1000;numRecords++) { + HollowObjectTypeReadState typeReadState = populateTypeStateWith(numRecords); + assertEquals(1, typeReadState.numShards()); + assertDataUnchanged(numRecords); + HollowChecksum origChecksum = typeReadState.getChecksum(typeReadState.getSchema()); + + for (int numSplits : new int[]{1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024}) { + HollowObjectTypeDataElements[] splitElements = splitter.split(typeReadState.currentDataElements()[0], numSplits); + HollowObjectTypeDataElements joinedElements = joiner.join(splitElements); + typeReadState.setCurrentData(joinedElements); + + assertDataUnchanged(numRecords); + HollowChecksum resultChecksum = typeReadState.getChecksum(typeReadState.getSchema()); + assertEquals(origChecksum, resultChecksum); + } + } + } + + @Test + public void testSplitThenJoinWithFilter() throws IOException { + HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter(); + HollowObjectTypeDataElementsJoiner joiner = new HollowObjectTypeDataElementsJoiner(); + + int numSplits = 2; + for (int numRecords=0;numRecords<1*1000;numRecords++) { + HollowObjectTypeReadState typeReadState = populateTypeStateWithFilter(numRecords); + assertEquals(1, typeReadState.numShards()); + assertDataUnchanged(numRecords); + HollowChecksum origChecksum = typeReadState.getChecksum(typeReadState.getSchema()); + + HollowObjectTypeDataElements[] splitElements = splitter.split(typeReadState.currentDataElements()[0], numSplits); + HollowObjectTypeDataElements joinedElements = joiner.join(splitElements); + typeReadState.setCurrentData(joinedElements); + + assertDataUnchanged(numRecords); + HollowChecksum resultChecksum = typeReadState.getChecksum(typeReadState.getSchema()); + assertEquals(origChecksum, resultChecksum); + } + } + + @Test + public void testSplitThenJoinWithEmptyJoin() throws IOException { + HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter(); + + HollowObjectTypeReadState typeReadState = populateTypeStateWith(1); + assertEquals(1, typeReadState.numShards()); + + HollowObjectTypeDataElements[] splitBy4 = splitter.split(typeReadState.currentDataElements()[0], 4); + assertEquals(-1, splitBy4[1].maxOrdinal); + assertEquals(-1, splitBy4[3].maxOrdinal); + + HollowObjectTypeDataElementsJoiner joiner = new HollowObjectTypeDataElementsJoiner(); + HollowObjectTypeDataElements joined = joiner.join(new HollowObjectTypeDataElements[]{splitBy4[1], splitBy4[3]}); + + assertEquals(-1, joined.maxOrdinal); + } + + // manually invoked + // @Test + public void testSplittingAndJoiningWithSnapshotBlob() throws Exception { + + String blobPath = null; // dir where snapshot blob exists for e.g. "/tmp/"; + long v = 0l; // snapshot version for e.g. 20230915162636001l; + String objectTypeWithOneShard = null; // type name corresponding to an Object type with single shard for e.g. "Movie"; + int numSplits = 2; + + if (blobPath==null || v==0l || objectTypeWithOneShard==null) { + throw new IllegalArgumentException("These arguments need to be specified"); + } + HollowFilesystemBlobRetriever hollowBlobRetriever = new HollowFilesystemBlobRetriever(Paths.get(blobPath)); + HollowConsumer c = HollowConsumer.withBlobRetriever(hollowBlobRetriever).build(); + c.triggerRefreshTo(v); + HollowReadStateEngine readStateEngine = c.getStateEngine(); + + HollowObjectTypeReadState typeState = (HollowObjectTypeReadState) readStateEngine.getTypeState(objectTypeWithOneShard); + HollowSchema origSchema = typeState.getSchema(); + HollowChecksum originalChecksum = typeState.getChecksum(origSchema); + + assertEquals(1, typeState.numShards()); + + HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter(); + HollowObjectTypeDataElements[] splitElements = splitter.split(typeState.currentDataElements()[0], numSplits); + + HollowObjectTypeDataElementsJoiner joiner = new HollowObjectTypeDataElementsJoiner(); + HollowObjectTypeDataElements joinedElements = joiner.join(splitElements); + + typeState.setCurrentData(joinedElements); + HollowChecksum newChecksum = typeState.getChecksum(origSchema); + + Assert.assertEquals(originalChecksum, newChecksum); + } +} diff --git a/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsSplitterTest.java b/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsSplitterTest.java new file mode 100644 index 0000000000..c0eb106d12 --- /dev/null +++ b/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsSplitterTest.java @@ -0,0 +1,41 @@ +package com.netflix.hollow.core.read.engine.object; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import org.junit.Assert; +import org.junit.Test; + +public class HollowObjectTypeDataElementsSplitterTest extends AbstractHollowObjectTypeDataElementsSplitJoinTest { + + @Test + public void testSplit() throws IOException { + HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter(); + + HollowObjectTypeReadState typeReadState = populateTypeStateWith(5); + assertEquals(1, typeReadState.numShards()); + assertDataUnchanged(5); + + HollowObjectTypeDataElements[] result1 = splitter.split(typeReadState.currentDataElements()[0], 1); + typeReadState.setCurrentData(result1[0]); + assertDataUnchanged(5); + + HollowObjectTypeDataElements[] result8 = splitter.split(typeReadState.currentDataElements()[0], 8); + assertEquals(0, result8[0].maxOrdinal); // for index that landed one record after split + assertEquals(-1, result8[7].maxOrdinal); // for index that landed no records after split + + try { + splitter.split(typeReadState.currentDataElements()[0], 3); // numSplits=3 + Assert.fail(); + } catch (IllegalStateException e) { + // expected, numSplits should be a power of 2 + } + + try { + splitter.split(typeReadState.currentDataElements()[0], 0); // numSplits=0 + Assert.fail(); + } catch (IllegalStateException e) { + // expected, numSplits should be a power of 2 + } + } +}