diff --git a/hollow/src/main/java/com/netflix/hollow/tools/history/keyindex/HollowHistoryKeyIndex.java b/hollow/src/main/java/com/netflix/hollow/tools/history/keyindex/HollowHistoryKeyIndex.java index 24cdf68365..1f163308ab 100644 --- a/hollow/src/main/java/com/netflix/hollow/tools/history/keyindex/HollowHistoryKeyIndex.java +++ b/hollow/src/main/java/com/netflix/hollow/tools/history/keyindex/HollowHistoryKeyIndex.java @@ -18,20 +18,10 @@ import com.netflix.hollow.core.HollowDataset; import com.netflix.hollow.core.index.key.PrimaryKey; -import com.netflix.hollow.core.read.HollowBlobInput; -import com.netflix.hollow.core.read.engine.HollowBlobReader; import com.netflix.hollow.core.read.engine.HollowReadStateEngine; import com.netflix.hollow.core.read.engine.object.HollowObjectTypeReadState; import com.netflix.hollow.core.util.SimultaneousExecutor; -import com.netflix.hollow.core.write.HollowBlobWriter; -import com.netflix.hollow.core.write.HollowWriteStateEngine; import com.netflix.hollow.tools.history.HollowHistory; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.Closeable; -import java.io.IOException; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -46,18 +36,15 @@ public class HollowHistoryKeyIndex { private final HollowHistory history; private final Map typeKeyIndexes; - private final HollowWriteStateEngine indexWriteStateEngine; - private HollowReadStateEngine indexReadStateEngine; + private boolean isInitialized; public HollowHistoryKeyIndex(HollowHistory history) { this.history = history; - this.typeKeyIndexes = new HashMap(); - this.indexWriteStateEngine = new HollowWriteStateEngine(); - this.indexReadStateEngine = new HollowReadStateEngine(); + this.typeKeyIndexes = new HashMap<>(); } public int numUniqueKeys(String type) { - return indexReadStateEngine.getTypeState(type).maxOrdinal() + 1; + return typeKeyIndexes.get(type).getMaxIndexedOrdinal(); } public String getKeyDisplayString(String type, int keyOrdinal) { @@ -77,7 +64,7 @@ public void addTypeIndex(PrimaryKey primaryKey) { } public HollowHistoryTypeKeyIndex addTypeIndex(PrimaryKey primaryKey, HollowDataset dataModel) { - HollowHistoryTypeKeyIndex keyIdx = new HollowHistoryTypeKeyIndex(primaryKey, dataModel, indexWriteStateEngine, indexReadStateEngine); + HollowHistoryTypeKeyIndex keyIdx = new HollowHistoryTypeKeyIndex(primaryKey, dataModel); typeKeyIndexes.put(primaryKey.getType(), keyIdx); return keyIdx; } @@ -117,24 +104,11 @@ public void update(HollowReadStateEngine latestStateEngine, boolean isDelta) { // This call updates the type key indexes of all types in this history key index. updateTypeIndexes(latestStateEngine, isDelta && !isInitialUpdate); - - HollowReadStateEngine newIndexReadState = roundTripStateEngine(isInitialUpdate, !isDelta); - - // if snapshot update then a new read state was generated, update the types in the history index to point to this - // new read state - if (newIndexReadState != indexReadStateEngine) { - // New ReadState was created so let's update references to old one - indexReadStateEngine = newIndexReadState; - for(final Map.Entry entry : typeKeyIndexes.entrySet()) { - entry.getValue().updateReadStateEngine(indexReadStateEngine); - } - } - - rehashKeys(); + isInitialized = true; } public boolean isInitialized() { - return !indexReadStateEngine.getTypeStates().isEmpty(); + return isInitialized; } private void initializeTypeIndexes(HollowReadStateEngine latestStateEngine) { @@ -146,7 +120,7 @@ private void initializeTypeIndexes(HollowReadStateEngine latestStateEngine) { HollowObjectTypeReadState typeState = (HollowObjectTypeReadState) latestStateEngine.getTypeState(type); if (typeState == null) continue; - index.initialize(typeState); + index.initializeKeySchema(typeState); } } @@ -166,81 +140,4 @@ private void updateTypeIndexes(final HollowReadStateEngine latestStateEngine, fi throw new RuntimeException(e); } } - - /** - * Updates the tracked readStateEngine based on populated writeStateEngine. Depending on whether this is an initial - * load or double snapshot etc. it invokes a snapshot or delta transition. - */ - private HollowReadStateEngine roundTripStateEngine(boolean isInitialUpdate, boolean isSnapshot) { - HollowBlobWriter writer = new HollowBlobWriter(indexWriteStateEngine); - // Use existing readStateEngine on initial update or delta; - // otherwise, create new one to properly handle double snapshot - HollowReadStateEngine newReadStateEngine = (isInitialUpdate || !isSnapshot) - ? indexReadStateEngine : new HollowReadStateEngine(); - HollowBlobReader reader = new HollowBlobReader(newReadStateEngine); - - // Use a pipe to write and read concurrently to avoid writing - // to temporary files or allocating memory - // @@@ for small states it's more efficient to sequentially write to - // and read from a byte array but it is tricky to estimate the size - SimultaneousExecutor executor = new SimultaneousExecutor(1, HollowHistoryKeyIndex.class, "round-trip"); - Exception pipeException = null; - // Ensure read-side is closed after completion of read - try (PipedInputStream is = new PipedInputStream(1 << 15)) { - BufferedOutputStream out = new BufferedOutputStream(new PipedOutputStream(is)); - executor.execute(() -> { - // Ensure write-side is closed after completion of write - try (Closeable ac = out) { - if (isInitialUpdate || isSnapshot) { - writer.writeSnapshot(out); - } else { - writer.writeDelta(out); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - - HollowBlobInput in = HollowBlobInput.serial(new BufferedInputStream(is)); - if (isInitialUpdate || isSnapshot) { - reader.readSnapshot(in); - } else { - reader.applyDelta(in); - } - - } catch (Exception e) { - pipeException = e; - } - - // Ensure no underlying writer exception is lost due to broken pipe - try { - executor.awaitSuccessfulCompletion(); - } catch (InterruptedException | ExecutionException e) { - if (pipeException == null) { - throw new RuntimeException(e); - } - - pipeException.addSuppressed(e); - } - if (pipeException != null) - throw new RuntimeException(pipeException); - - indexWriteStateEngine.prepareForNextCycle(); - return newReadStateEngine; - } - - private void rehashKeys() { - SimultaneousExecutor executor = new SimultaneousExecutor(getClass(), "rehash-keys"); - - for(final Map.Entry entry : typeKeyIndexes.entrySet()) { - executor.execute(() -> entry.getValue().hashRecordKeys()); - } - - try { - executor.awaitSuccessfulCompletion(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - } - } diff --git a/hollow/src/main/java/com/netflix/hollow/tools/history/keyindex/HollowHistoryTypeKeyIndex.java b/hollow/src/main/java/com/netflix/hollow/tools/history/keyindex/HollowHistoryTypeKeyIndex.java index 5d260439ae..3a1ba595a4 100644 --- a/hollow/src/main/java/com/netflix/hollow/tools/history/keyindex/HollowHistoryTypeKeyIndex.java +++ b/hollow/src/main/java/com/netflix/hollow/tools/history/keyindex/HollowHistoryTypeKeyIndex.java @@ -18,485 +18,245 @@ import static com.netflix.hollow.core.HollowConstants.ORDINAL_NONE; import static com.netflix.hollow.tools.util.SearchUtils.MULTI_FIELD_KEY_DELIMITER; -import static com.netflix.hollow.tools.util.SearchUtils.getFieldPathIndexes; -import static com.netflix.hollow.tools.util.SearchUtils.getOrdinalToDisplay; -import static com.netflix.hollow.tools.util.SearchUtils.parseKey; +import com.netflix.hollow.Hollow; import com.netflix.hollow.core.HollowDataset; import com.netflix.hollow.core.index.key.PrimaryKey; import com.netflix.hollow.core.memory.encoding.HashCodes; import com.netflix.hollow.core.read.HollowReadFieldUtils; -import com.netflix.hollow.core.read.engine.HollowReadStateEngine; import com.netflix.hollow.core.read.engine.PopulatedOrdinalListener; import com.netflix.hollow.core.read.engine.object.HollowObjectTypeReadState; import com.netflix.hollow.core.schema.HollowObjectSchema; +import com.netflix.hollow.core.schema.HollowObjectSchema.FieldType; import com.netflix.hollow.core.util.IntList; -import com.netflix.hollow.core.util.LongList; import com.netflix.hollow.core.util.RemovedOrdinalIterator; -import com.netflix.hollow.core.write.HollowObjectTypeWriteState; -import com.netflix.hollow.core.write.HollowObjectWriteRecord; -import com.netflix.hollow.core.write.HollowTypeWriteState; -import com.netflix.hollow.core.write.HollowWriteStateEngine; import java.util.Arrays; import java.util.BitSet; +import java.util.HashMap; public class HollowHistoryTypeKeyIndex { - private final PrimaryKey primaryKey; - private final String[][] keyFieldParts; + private final FieldType[] fieldTypes; + + private final String[][] keyFieldNames; + private final int[][] keyFieldIndices; private final boolean[] keyFieldIsIndexed; - private HollowObjectSchema keySchema; - private int[] hashedRecordKeys; - private int[][] hashedFieldKeys; - private LongList hashedFieldKeyChains; + private boolean isInitialized = false; + private int maxIndexedOrdinal = 0; - private int maxIndexedKeyOrdinal = 0; + private final HollowOrdinalMapper ordinalMapping; + private final HashMap ordinalFieldHashMapping; - private final HollowWriteStateEngine writeStateEngine; - private HollowReadStateEngine readStateEngine; - private boolean isInitialized = false; - public HollowHistoryTypeKeyIndex(PrimaryKey primaryKey, HollowDataset dataModel, HollowWriteStateEngine writeEngine, HollowReadStateEngine readEngine) { + public HollowHistoryTypeKeyIndex(PrimaryKey primaryKey, HollowDataset dataModel) { this.primaryKey = primaryKey; - this.writeStateEngine = writeEngine; - this.readStateEngine = readEngine; - this.keyFieldParts = getKeyFieldParts(dataModel); + this.fieldTypes = new FieldType[primaryKey.numFields()]; + + this.keyFieldNames = new String[primaryKey.numFields()][]; + this.keyFieldIndices = new int[primaryKey.numFields()][]; this.keyFieldIsIndexed = new boolean[primaryKey.numFields()]; + initializeKeyParts(dataModel); + + this.ordinalMapping = new HollowOrdinalMapper(primaryKey, keyFieldIsIndexed, keyFieldIndices); + this.ordinalFieldHashMapping = new HashMap<>(); + } + + public boolean isInitialized() { + return isInitialized; + } + + public int findKeyIndexOrdinal(HollowObjectTypeReadState typeState, int ordinal) { + return ordinalMapping.findAssignedOrdinal(typeState, ordinal); + } + + public int getMaxIndexedOrdinal() { + return maxIndexedOrdinal; + } + + public String[] getKeyFields() { + return primaryKey.getFieldPaths(); } public void addFieldIndex(String fieldName, HollowDataset dataModel) { String[] fieldPathParts = PrimaryKey.getCompleteFieldPathParts(dataModel, primaryKey.getType(), fieldName); - - for(int i=0;iv2 or v2->v1 to the history type key index populateNewCurrentRecordKeysIntoIndex(latestTypeState); } else { - // usually on the first fwd delta transition, or on double snapshot + maxIndexedOrdinal = 0; populateAllCurrentRecordKeysIntoIndex(latestTypeState); } } - public void hashRecordKeys() { - HollowObjectTypeReadState keyTypeState = (HollowObjectTypeReadState) readStateEngine.getTypeState(primaryKey.getType()); - if (keyTypeState == null) return; - - int hashTableSize = HashCodes.hashTableSize(keyTypeState.maxOrdinal() + 1); + private void populateNewCurrentRecordKeysIntoIndex(HollowObjectTypeReadState typeState) { + PopulatedOrdinalListener listener = typeState.getListener(PopulatedOrdinalListener.class); + BitSet populatedOrdinals = listener.getPopulatedOrdinals(); + BitSet previousOrdinals = listener.getPreviousOrdinals(); - if(hashedRecordKeys == null || hashedRecordKeys.length < hashTableSize) { - rehashAllRecordKeys(keyTypeState, hashTableSize); - } else { - hashNewRecordKeys(keyTypeState); + RemovedOrdinalIterator iter = new RemovedOrdinalIterator(populatedOrdinals, previousOrdinals); + int ordinal = iter.next(); + while (ordinal != ORDINAL_NONE) { + writeKeyObject(typeState, ordinal, true); + ordinal = iter.next(); } } - private void hashNewRecordKeys(HollowObjectTypeReadState keyTypeState) { - for(int i=maxIndexedKeyOrdinal+1;i<=keyTypeState.maxOrdinal();i++) - indexOrdinal(keyTypeState, i, hashedRecordKeys, hashedFieldKeys, hashedFieldKeyChains); - maxIndexedKeyOrdinal = keyTypeState.maxOrdinal(); - } - - private void rehashAllRecordKeys(HollowObjectTypeReadState keyTypeState, int hashTableSize) { - int[] hashedRecordKeys = initializeHashedKeyArray(hashTableSize); - int[][] hashedFieldKeys = new int[primaryKey.numFields()][]; - LongList hashedFieldKeyChains = new LongList(); - - for(int i=0;i> 32); - representativeOrdinal = (chainIndex == Integer.MAX_VALUE) ? ORDINAL_NONE : (int)hashedFieldKeyChains.get(chainIndex); - } - return; - } - - bucket++; - bucket &= hashedFieldKeys[fieldIndex].length - 1; - } - } - - private interface Matcher { - boolean foundMatch(int ordinal); - } - - private int findKeyFieldHashCode(HollowObjectTypeReadState typeState, int ordinal, String[] keyFieldParts, int keyFieldPartPosition) { - int schemaPosition = typeState.getSchema().getPosition(keyFieldParts[keyFieldPartPosition]); - if(keyFieldPartPosition < keyFieldParts.length - 1) { - HollowObjectTypeReadState nextPartTypeState = (HollowObjectTypeReadState) typeState.getSchema().getReferencedTypeState(schemaPosition); - int nextOrdinal = typeState.readOrdinal(ordinal, schemaPosition); - return findKeyFieldHashCode(nextPartTypeState, nextOrdinal, keyFieldParts, keyFieldPartPosition + 1); - } else { - return HollowReadFieldUtils.fieldHashCode(typeState, ordinal, schemaPosition); - } - } - - private boolean recordMatchesKey(HollowObjectTypeReadState typeState, int ordinal, HollowObjectTypeReadState keyTypeState, int keyOrdinal) { - for(int i=0;i indexFieldObjectMapping; + private final HashMap assignedOrdinalToIndex; + + private final PrimaryKey primaryKey; + private final int[][] keyFieldIndices; + private final boolean[] keyFieldIsIndexed; + + private final ObjectInternPool memoizedPool; + + public HollowOrdinalMapper(PrimaryKey primaryKey, boolean[] keyFieldIsIndexed, int[][] keyFieldIndices) { + // Start with prime number to assist OA + this.ordinalMappings = new Integer[2069]; + this.originalHash = new Integer[2069]; + Arrays.fill(this.ordinalMappings, ORDINAL_NONE); + + this.primaryKey = primaryKey; + this.keyFieldIndices = keyFieldIndices; + this.keyFieldIsIndexed = keyFieldIsIndexed; + + this.indexFieldObjectMapping = new HashMap<>(); + this.assignedOrdinalToIndex = new HashMap<>(); + + this.memoizedPool = new ObjectInternPool(); + } + + public int findAssignedOrdinal(HollowObjectTypeReadState typeState, int keyOrdinal) { + int hashedRecord = hashKeyRecord(typeState, keyOrdinal); + int index = indexFromHash(hashedRecord, ordinalMappings.length); + + while (ordinalMappings[index]!=ORDINAL_NONE) { + if(recordsAreEqual(typeState, keyOrdinal, index)) + return ordinalMappings[index]; + index = (index + 1) % ordinalMappings.length; + } + + return ORDINAL_NONE; + } + + private boolean recordsAreEqual(HollowObjectTypeReadState typeState, int keyOrdinal, int index) { + for(int fieldIdx=0;fieldIdx LOAD_FACTOR) { + expandAndRehashTable(); + } + + int index = indexFromHash(hashedRecord, ordinalMappings.length); + + // Linear probing + while (ordinalMappings[index] != ORDINAL_NONE) { + if(recordsAreEqual(typeState, ordinal, index)) { + this.assignedOrdinalToIndex.put(assignedOrdinal, index); + return ORDINAL_NONE; + } + index = (index + 1) % ordinalMappings.length; + } + + ordinalMappings[index] = assignedOrdinal; + originalHash[index] = hashedRecord; + size++; + + storeFields(typeState, ordinal, index); + + this.assignedOrdinalToIndex.put(assignedOrdinal, index); + return index; + } + + private void storeFields(HollowObjectTypeReadState typeState, int ordinal, int index) { + if(!indexFieldObjectMapping.containsKey(index)) + indexFieldObjectMapping.put(index, new Object[primaryKey.numFields()]); + + for(int i=0;i newIndexFieldObjectMapping = new HashMap<>(); + + for(int i=0;i integerInternPool = new GenericInternPool<>(); + final private GenericInternPool floatInternPool = new GenericInternPool<>(); + final private GenericInternPool doubleInternPool = new GenericInternPool<>(); + final private GenericInternPool longInternPool = new GenericInternPool<>(); + + public Object intern(Object objectToIntern) { + if(objectToIntern==null) { + throw new IllegalArgumentException("Cannot intern null objects"); + } + + // Automatically handles booleans and integers within cached range + if(objectAutomaticallyCached(objectToIntern)) { + return objectToIntern; + } + + if(objectToIntern instanceof Float) { + return floatInternPool.intern((Float) objectToIntern); + } else if(objectToIntern instanceof Double) { + return doubleInternPool.intern((Double) objectToIntern); + } else if(objectToIntern instanceof Integer) { + return integerInternPool.intern((Integer) objectToIntern); + } else if(objectToIntern instanceof Long) { + return longInternPool.intern((Long) objectToIntern); + } else if(objectToIntern instanceof String) { + // Use Java's builtin intern function + return ((String) objectToIntern).intern(); + } else { + String className = objectToIntern.getClass().getName(); + throw new IllegalArgumentException("Cannot intern object of type " + className); + } + } + + private boolean objectAutomaticallyCached(Object objectToIntern) { + if(objectToIntern instanceof Boolean) { + return true; + } else if(objectToIntern instanceof Integer) { + return -128 <= (Integer) objectToIntern && (Integer) objectToIntern <= 127; + } + return false; + } +} + +class GenericInternPool { + private final Map pool = new HashMap<>(); + + public T intern(T object) { + if (object == null) { + throw new IllegalArgumentException("Cannot intern null objects"); + } + + synchronized (pool) { + T interned = pool.get(object); + if (interned == null) { + interned = object; + pool.put(object, object); + } + return interned; + } + } +} \ No newline at end of file