Patch allocated target file length
Sunjeet committed Jun 23, 2023
1 parent 5f91441 commit 6001a79
Showing 5 changed files with 43 additions and 16 deletions.
@@ -27,7 +27,7 @@ public static FixedLengthData get(HollowBlobInput in, MemoryMode memoryMode, Arr
}
}

- // allocate (for write)
+ // allocate (for write) // unused
public static FixedLengthData allocate(HollowBlobInput in,
MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler,
String fileName) throws IOException {
@@ -39,13 +39,13 @@ public static FixedLengthData allocate(HollowBlobInput in,

public static FixedLengthData allocate(long numBits, MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler,
String fileName) throws IOException {
- long numLongs = ((numBits - 1) >>> 6) + 1;
- numLongs++; // accommodate reading a long that starts at a bit index within the last long
- long numBytes = numLongs << 3;
if (memoryMode.equals(MemoryMode.ON_HEAP)) {
return new FixedLengthElementArray(memoryRecycler, numBits);
} else {
- File targetFile = provisionTargetFile(numBytes, fileName);
+ long numLongs = ((numBits - 1) >>> 6) + 1;
+ long numBytes = numLongs << 3;
+ // add Long.BYTES to the provisioned file size to accommodate an unaligned read that starts in the last long
+ File targetFile = provisionTargetFile(numBytes + Long.BYTES, fileName);
try (HollowBlobInput targetBlob = HollowBlobInput.randomAccess(targetFile, MAX_SINGLE_BUFFER_CAPACITY)) {
return EncodedLongBuffer.newFrom(targetBlob, numLongs, targetFile); // TODO: test with different single buffer capacities
}
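The arithmetic behind this fix, restated as a standalone sketch: a fixed-length read that starts at a bit index inside the final long fetches a full 8-byte long, which can cross the aligned end of the file, so the provisioned length is padded by Long.BYTES. The class and values below are illustrative, not part of the commit.

```java
// Illustrative sketch of the provisioning arithmetic above; numbers are hypothetical.
public class ProvisioningMath {
    public static void main(String[] args) {
        long numBits = 1000;                        // bit-packed data to store
        long numLongs = ((numBits - 1) >>> 6) + 1;  // ceil(1000 / 64) = 16 longs
        long numBytes = numLongs << 3;              // 16 * 8 = 128 bytes, exactly aligned

        // A read of an element that starts inside the last long pulls in a full
        // 8-byte long, which can extend past the 128-byte boundary. Padding the
        // file by Long.BYTES keeps that read within the provisioned region.
        long provisionedBytes = numBytes + Long.BYTES;  // 136 bytes

        System.out.printf("numLongs=%d numBytes=%d provisionedBytes=%d%n",
                numLongs, numBytes, provisionedBytes);
    }
}
```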
@@ -65,7 +65,7 @@ public static void destroy(FixedLengthData fld, ArraySegmentRecycler memoryRecyc
if (fld instanceof FixedLengthElementArray) {
((FixedLengthElementArray) fld).destroy(memoryRecycler);
} else if (fld instanceof EncodedLongBuffer) {
LOG.info("SNAP: Destroy operation invoked on EncodedLongBuffer (FixedLengthData)");
// LOG.info("SNAP: Destroy operation invoked on EncodedLongBuffer (FixedLengthData)");
((EncodedLongBuffer) fld).destroy();
} else {
throw new UnsupportedOperationException("Unknown type");
@@ -36,7 +36,7 @@ public static void destroy(VariableLengthData vld) throws IOException {
if (vld instanceof SegmentedByteArray) {
((SegmentedByteArray) vld).destroy();
} else if (vld instanceof EncodedByteBuffer) {
LOG.info("SNAP: Destroy operation invoked on EncodedByteBuffer (VariableLengthData)");
// LOG.info("SNAP: Destroy operation invoked on EncodedByteBuffer (VariableLengthData)");
((EncodedByteBuffer) vld).destroy();
} else {
throw new UnsupportedOperationException("Unknown type");
@@ -87,6 +87,10 @@ public void orderedCopy(VariableLengthData src, long srcPos, long destPos, long
while (length > 0) {
int toReadBytes = (int) Math.min(length, (long) chunk.length);
int readBytes = encodedByteBuffer.getBytes(srcPos, toReadBytes, chunk);
+ if (readBytes == 0) {
+     throw new IllegalStateException(String.format("SNAP: 0 bytes read from encoded byte buffer, " +
+             "srcPos=%s, toReadBytes=%s, chunk.length=%s", srcPos, toReadBytes, chunk.length));
+ }
length = length - readBytes;
srcPos = srcPos + readBytes;

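The readBytes == 0 check added above is a liveness guard: if getBytes ever returned zero (for example when srcPos has run past the end of the mapped data), length would never decrease and the copy loop would spin forever. A minimal sketch of that hazard, with a hypothetical Reader interface standing in for EncodedByteBuffer:

```java
// Hypothetical stand-in for EncodedByteBuffer#getBytes, for illustration only.
interface Reader {
    int getBytes(long srcPos, int maxBytes, byte[] dest);
}

final class ChunkedCopy {
    // Copies `length` bytes from src in chunk-sized reads, failing fast on a zero read.
    static void copy(Reader src, long srcPos, long length, byte[] chunk) {
        while (length > 0) {
            int toRead = (int) Math.min(length, (long) chunk.length);
            int read = src.getBytes(srcPos, toRead, chunk);
            if (read == 0) {
                // Without this guard the loop would never terminate: length stays
                // unchanged and the same read is retried indefinitely.
                throw new IllegalStateException(
                        "0 bytes read, srcPos=" + srcPos + ", toRead=" + toRead);
            }
            length -= read;
            srcPos += read;
            // ... write `read` bytes from chunk to the destination here ...
        }
    }
}
```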
@@ -120,7 +124,7 @@ public VariableLengthData commit() throws IOException {
this.raf.seek(0);
try (HollowBlobInput hbi = HollowBlobInput.mmap(this.file, this.raf, MAX_SINGLE_BUFFER_CAPACITY, false)) {
byteBuffer.loadFrom(hbi, this.raf.length());
LOG.info("SNAP: Closing randomaccessfile because HollowBlobInput does not manage the lifecycle (will not close) for " + file);
// LOG.info("SNAP: Closing randomaccessfile because HollowBlobInput does not manage the lifecycle (will not close) for " + file);
this.raf.close();
return byteBuffer;
}
@@ -155,7 +155,7 @@ public byte getByte(long index) throws BufferUnderflowException {
}
else {
assert(index < capacity + Long.BYTES);
LOG.warning("SNAP: This is happening, not necessarily bad but test using unit test readUsingVariableLengthDataModes");
// LOG.warning("SNAP: This is happening, not necessarily bad but test using unit test readUsingVariableLengthDataModes");
// this situation occurs when read for bits near the end of the buffer requires reading a long value that
// extends past the buffer capacity by upto Long.BYTES bytes. To handle this case,
// return 0 for (index >= capacity - Long.BYTES && index < capacity )
@@ -171,7 +171,7 @@ public int getBytes(long index, long len, byte[] bytes, boolean restorePos) {
// extends past the buffer capacity by up to Long.BYTES bytes. To handle this case,
// return 0 for (index >= capacity - Long.BYTES && index < capacity)
// these zero bytes will be discarded anyway when the returned long value is shifted to get the queried bits
- LOG.warning(String.format("Unexpected read past the end, index=%s, capacity=%s", index, capacity));
+ LOG.warning(String.format("Unexpected read past the end, index=%s, capacity=%s, len=%s", index, capacity, len));
}
int spineIndex = (int)(index >>> (shift));
ByteBuffer buf = spine[spineIndex];
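To see why a read near the end of the buffer can touch bytes past capacity, consider how a bit-packed element is extracted: a full long is fetched at the element's starting byte and the unwanted bits are shifted and masked away. The sketch below uses hypothetical helpers and assumes (bitIndex & 7) + bitsPerElement <= 64 for brevity; it is not Hollow's exact implementation.

```java
// Sketch of a bit-level read that explains the zero-fill behavior documented above.
final class BitReadSketch {
    interface ByteReader {
        long readLongAt(long byteIndex);  // bytes past the logical end read as 0
    }

    // Reads bitsPerElement bits starting at absolute bit index bitIndex.
    static long readBits(ByteReader buf, long bitIndex, int bitsPerElement) {
        long byteIndex = bitIndex >>> 3;   // first byte the element touches
        int shift = (int) (bitIndex & 7);  // bit offset within that byte
        // A full 8-byte long is fetched here; for elements near the end of the
        // data it can extend up to Long.BYTES past capacity.
        long raw = buf.readLongAt(byteIndex);
        long mask = (bitsPerElement == 64) ? -1L : (1L << bitsPerElement) - 1;
        // Any zero-filled trailing bytes are discarded by the shift and mask.
        return (raw >>> shift) & mask;
    }
}
```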
@@ -293,7 +293,7 @@ public void unmapBlob() {
// count will keep it from getting cleaned up, but cleanup will be promptly invoked on delta blob files after
// consumption and on per-shard per-type delta target files when it is superseded by another file in a future delta.
if (this.referenceCount.decrementAndGet() == 0) {
LOG.info("SNAP: Unmapping BlobByteBuffer because ref count has reached 0");
// LOG.info("SNAP: Unmapping BlobByteBuffer because ref count has reached 0");
for (int i = 0; i < spine.length; i++) {
ByteBuffer buf = spine[i];
if (buf != null) {
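The unmap above is driven by a plain atomic reference count: whichever caller drops the count to zero performs the cleanup, so a buffer still borrowed elsewhere is never unmapped underneath its user. A generic sketch of that last-one-out pattern (hypothetical names, not Hollow's API):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Generic last-one-out cleanup; hypothetical class, for illustration only.
final class RefCounted {
    private final AtomicInteger refCount = new AtomicInteger(1);  // creator holds one ref

    void retain() {
        refCount.incrementAndGet();
    }

    void release() {
        // Exactly one caller observes the transition to zero and cleans up.
        if (refCount.decrementAndGet() == 0) {
            cleanup();
        }
    }

    private void cleanup() {
        // unmap buffers, delete backing files, etc.
    }
}
```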
@@ -306,12 +306,12 @@ public long skipBytes(long n) throws IOException {
@Override
public void close() throws IOException {
if (input instanceof RandomAccessFile) {
LOG.info("SNAP: close called on BlobByteBuffer composing instance of RandomAccessFile");
// LOG.info("SNAP: close called on BlobByteBuffer composing instance of RandomAccessFile");
if (manageRafLifecycle) {
LOG.info("SNAP: HollowBlobInput manages the lifecycle of randomaccessfile " + file + ". Calling close.");
// LOG.info("SNAP: HollowBlobInput manages the lifecycle of randomaccessfile " + file + ". Calling close.");
((RandomAccessFile) input).close();
} else {
LOG.info("SNAP: HollowBlobInput does not manage the lifecycle (will not close) of randomaccessfile " + file + ". Won't close file.");
// LOG.info("SNAP: HollowBlobInput does not manage the lifecycle (will not close) of randomaccessfile " + file + ". Won't close file.");
}
if (buffer != null) {
buffer.unmapBlob();
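The close() hunk above encodes an explicit ownership convention: the blob input closes the RandomAccessFile only when it was constructed to manage that lifecycle; otherwise the code that opened the file closes it, as the commit() hunk earlier does. A compact sketch of the convention (hypothetical wrapper, not Hollow's API):

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.RandomAccessFile;

// Hypothetical wrapper illustrating the manageRafLifecycle convention.
final class OwnedInput implements Closeable {
    private final RandomAccessFile raf;
    private final boolean managesLifecycle;  // true: this wrapper owns raf

    OwnedInput(RandomAccessFile raf, boolean managesLifecycle) {
        this.raf = raf;
        this.managesLifecycle = managesLifecycle;
    }

    @Override
    public void close() throws IOException {
        if (managesLifecycle) {
            raf.close();  // we own it, so we close it
        }
        // else: whoever opened raf remains responsible for closing it
    }
}
```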
@@ -24,6 +24,8 @@
import com.netflix.hollow.core.read.engine.PopulatedOrdinalListener;
import com.netflix.hollow.core.util.IntMap;
import com.netflix.hollow.core.util.RemovedOrdinalIterator;
+ import java.util.logging.Level;
+ import java.util.logging.Logger;

/**
* This class contains the logic for extracting the removed records from an OBJECT type state
@@ -32,6 +34,7 @@
* Not intended for external consumption.
*/
public class HollowObjectDeltaHistoricalStateCreator {
+ private static final Logger LOG = Logger.getLogger(HollowObjectDeltaHistoricalStateCreator.class.getName());

private final HollowObjectTypeDataElements historicalDataElements;

@@ -159,8 +162,28 @@ private void copyRecord(int ordinal) {
long size = fromEndByte - fromStartByte;

historicalDataElements.fixedLengthData.setElementValue(((long)nextOrdinal * historicalDataElements.bitsPerRecord) + historicalDataElements.bitOffsetPerField[i], historicalDataElements.bitsPerField[i], currentWriteVarLengthDataPointers[i] + size);
- historicalDataElements.varLengthData[i].copy(stateEngineDataElements[shard].varLengthData[i], fromStartByte, currentWriteVarLengthDataPointers[i], size);

+ try {
+     historicalDataElements.varLengthData[i].copy(stateEngineDataElements[shard].varLengthData[i], fromStartByte, currentWriteVarLengthDataPointers[i], size);
+ } catch (ArrayIndexOutOfBoundsException e) {
+     LOG.log(Level.SEVERE,
+             String.format("ArrayIndexOutOfBoundsException when building historical state: " +
+                             "fieldName=%s, " +
+                             "fieldType=%s, " +
+                             "shard=%s, " +
+                             "stateEngineDataElements[shard].varLengthData[i].length()=%s, " +
+                             "fromStartByte=%s, " +
+                             "size=%s, " +
+                             "currentWriteVarLengthDataPointers[i]=%s",
+                     historicalDataElements.schema.getFieldName(i),
+                     historicalDataElements.schema.getFieldType(i),
+                     shard,
+                     stateEngineDataElements[shard].varLengthData[i].length(),
+                     fromStartByte,
+                     size,
+                     currentWriteVarLengthDataPointers[i]),
+             e);
+     throw e;
+ }
currentWriteVarLengthDataPointers[i] += size;
}
}