Skip to content

Commit

Permalink
Code from stevenewald with minor patch #624
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Jun 27, 2023
1 parent bbbff00 commit 2bb3a75
Show file tree
Hide file tree
Showing 4 changed files with 412 additions and 491 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,10 @@

import com.netflix.hollow.core.HollowDataset;
import com.netflix.hollow.core.index.key.PrimaryKey;
import com.netflix.hollow.core.read.HollowBlobInput;
import com.netflix.hollow.core.read.engine.HollowBlobReader;
import com.netflix.hollow.core.read.engine.HollowReadStateEngine;
import com.netflix.hollow.core.read.engine.object.HollowObjectTypeReadState;
import com.netflix.hollow.core.util.SimultaneousExecutor;
import com.netflix.hollow.core.write.HollowBlobWriter;
import com.netflix.hollow.core.write.HollowWriteStateEngine;
import com.netflix.hollow.tools.history.HollowHistory;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
Expand All @@ -46,18 +36,15 @@ public class HollowHistoryKeyIndex {

private final HollowHistory history;
private final Map<String, HollowHistoryTypeKeyIndex> typeKeyIndexes;
private final HollowWriteStateEngine indexWriteStateEngine;
private HollowReadStateEngine indexReadStateEngine;
private boolean isInitialized;

public HollowHistoryKeyIndex(HollowHistory history) {
this.history = history;
this.typeKeyIndexes = new HashMap<String, HollowHistoryTypeKeyIndex>();
this.indexWriteStateEngine = new HollowWriteStateEngine();
this.indexReadStateEngine = new HollowReadStateEngine();
this.typeKeyIndexes = new HashMap<>();
}

public int numUniqueKeys(String type) {
return indexReadStateEngine.getTypeState(type).maxOrdinal() + 1;
return typeKeyIndexes.get(type).getMaxIndexedOrdinal();
}

public String getKeyDisplayString(String type, int keyOrdinal) {
Expand All @@ -77,7 +64,7 @@ public void addTypeIndex(PrimaryKey primaryKey) {
}

public HollowHistoryTypeKeyIndex addTypeIndex(PrimaryKey primaryKey, HollowDataset dataModel) {
HollowHistoryTypeKeyIndex keyIdx = new HollowHistoryTypeKeyIndex(primaryKey, dataModel, indexWriteStateEngine, indexReadStateEngine);
HollowHistoryTypeKeyIndex keyIdx = new HollowHistoryTypeKeyIndex(primaryKey, dataModel);
typeKeyIndexes.put(primaryKey.getType(), keyIdx);
return keyIdx;
}
Expand Down Expand Up @@ -117,24 +104,11 @@ public void update(HollowReadStateEngine latestStateEngine, boolean isDelta) {

// This call updates the type key indexes of all types in this history key index.
updateTypeIndexes(latestStateEngine, isDelta && !isInitialUpdate);

HollowReadStateEngine newIndexReadState = roundTripStateEngine(isInitialUpdate, !isDelta);

// if snapshot update then a new read state was generated, update the types in the history index to point to this
// new read state
if (newIndexReadState != indexReadStateEngine) {
// New ReadState was created so let's update references to old one
indexReadStateEngine = newIndexReadState;
for(final Map.Entry<String, HollowHistoryTypeKeyIndex> entry : typeKeyIndexes.entrySet()) {
entry.getValue().updateReadStateEngine(indexReadStateEngine);
}
}

rehashKeys();
isInitialized = true;
}

public boolean isInitialized() {
return !indexReadStateEngine.getTypeStates().isEmpty();
return isInitialized;
}

private void initializeTypeIndexes(HollowReadStateEngine latestStateEngine) {
Expand All @@ -146,7 +120,7 @@ private void initializeTypeIndexes(HollowReadStateEngine latestStateEngine) {
HollowObjectTypeReadState typeState = (HollowObjectTypeReadState) latestStateEngine.getTypeState(type);
if (typeState == null) continue;

index.initialize(typeState);
index.initializeKeySchema(typeState);
}
}

Expand All @@ -166,81 +140,4 @@ private void updateTypeIndexes(final HollowReadStateEngine latestStateEngine, fi
throw new RuntimeException(e);
}
}

/**
* Updates the tracked readStateEngine based on populated writeStateEngine. Depending on whether this is an initial
* load or double snapshot etc. it invokes a snapshot or delta transition.
*/
private HollowReadStateEngine roundTripStateEngine(boolean isInitialUpdate, boolean isSnapshot) {
HollowBlobWriter writer = new HollowBlobWriter(indexWriteStateEngine);
// Use existing readStateEngine on initial update or delta;
// otherwise, create new one to properly handle double snapshot
HollowReadStateEngine newReadStateEngine = (isInitialUpdate || !isSnapshot)
? indexReadStateEngine : new HollowReadStateEngine();
HollowBlobReader reader = new HollowBlobReader(newReadStateEngine);

// Use a pipe to write and read concurrently to avoid writing
// to temporary files or allocating memory
// @@@ for small states it's more efficient to sequentially write to
// and read from a byte array but it is tricky to estimate the size
SimultaneousExecutor executor = new SimultaneousExecutor(1, HollowHistoryKeyIndex.class, "round-trip");
Exception pipeException = null;
// Ensure read-side is closed after completion of read
try (PipedInputStream is = new PipedInputStream(1 << 15)) {
BufferedOutputStream out = new BufferedOutputStream(new PipedOutputStream(is));
executor.execute(() -> {
// Ensure write-side is closed after completion of write
try (Closeable ac = out) {
if (isInitialUpdate || isSnapshot) {
writer.writeSnapshot(out);
} else {
writer.writeDelta(out);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
});

HollowBlobInput in = HollowBlobInput.serial(new BufferedInputStream(is));
if (isInitialUpdate || isSnapshot) {
reader.readSnapshot(in);
} else {
reader.applyDelta(in);
}

} catch (Exception e) {
pipeException = e;
}

// Ensure no underlying writer exception is lost due to broken pipe
try {
executor.awaitSuccessfulCompletion();
} catch (InterruptedException | ExecutionException e) {
if (pipeException == null) {
throw new RuntimeException(e);
}

pipeException.addSuppressed(e);
}
if (pipeException != null)
throw new RuntimeException(pipeException);

indexWriteStateEngine.prepareForNextCycle();
return newReadStateEngine;
}

private void rehashKeys() {
SimultaneousExecutor executor = new SimultaneousExecutor(getClass(), "rehash-keys");

for(final Map.Entry<String, HollowHistoryTypeKeyIndex> entry : typeKeyIndexes.entrySet()) {
executor.execute(() -> entry.getValue().hashRecordKeys());
}

try {
executor.awaitSuccessfulCompletion();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

}
Loading

0 comments on commit 2bb3a75

Please sign in to comment.