Skip to content

Commit

Permalink
lifecycle staging files and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Jun 19, 2023
1 parent 4a91f9b commit 37e9084
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ private void applyDeltaOnlyPlan(HollowUpdatePlan updatePlan, HollowConsumer.Refr
}

private void applyDeltaTransition(HollowConsumer.Blob blob, boolean isSnapshotPlan, HollowConsumer.RefreshListener[] refreshListeners) throws Throwable {
LOG.warning("SNAP: Attempting delta transition ...");
LOG.info("Attempting delta transition ...");
if (!memoryMode.equals(MemoryMode.ON_HEAP)) {
LOG.warning("SNAP: Attempting delta transition in shared-memory mode ...");
LOG.info("SNAP: Attempting delta transition in shared-memory mode ...");
}

try (HollowBlobInput in = HollowBlobInput.modeBasedSelector(memoryMode, blob);
Expand Down Expand Up @@ -252,7 +252,7 @@ private void applyDeltaTransition(HollowConsumer.Blob blob, boolean isSnapshotPl
throw t;
} finally {
// if (!memoryMode.equals(MemoryMode.ON_HEAP)) {
LOG.warning("SNAP: Delta transition applied to version " + blob.getToVersion());
LOG.info("SNAP: Delta transition applied to version " + blob.getToVersion());
// }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ public class EncodedByteBuffer implements VariableLengthData {
private BlobByteBuffer bufferView;
private long size;

private final File stagedFile;
private final File managedFile;
private boolean destroyActionHasBeenTakenBeforeDiag = false;

public EncodedByteBuffer(File stagedFile) {
this.stagedFile = stagedFile;
public EncodedByteBuffer(File managedFile) {
this.managedFile = managedFile;
this.size = 0;
}

Expand All @@ -47,13 +48,16 @@ public BlobByteBuffer getBufferView() {
public void destroy() throws IOException {
if (bufferView != null) {
bufferView.unmapBlob();
destroyActionHasBeenTakenBeforeDiag = true;
} else {
LOG.warning("SNAP: destroy() called on EncodedByteBuffer thats already been destroyed previously");
if (destroyActionHasBeenTakenBeforeDiag) {
LOG.warning("SNAP: destroy() called on EncodedByteBuffer thats already been destroyed previously");
}
}
bufferView = null;
if (stagedFile != null) {
LOG.info("SNAP: EncodedByteBuffer destroy() is also deleting staged file " + stagedFile.getAbsolutePath());
Files.delete(stagedFile.toPath());
if (managedFile != null) {
LOG.info("SNAP: EncodedByteBuffer destroy() is also deleting staged file " + managedFile.getAbsolutePath());
Files.delete(managedFile.toPath());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class EncodedLongBuffer implements FixedLengthData {
private long maxByteIndex = -1;

private final File managedFile;
private boolean destroyActionHasBeenTakenBeforeDiag = false;

public EncodedLongBuffer(File managedFile) {
this.managedFile = managedFile;
Expand All @@ -64,13 +65,15 @@ public EncodedLongBuffer(File managedFile) {
public void destroy() throws IOException {
if (bufferView != null) {
bufferView.unmapBlob();
destroyActionHasBeenTakenBeforeDiag = true;
} else {
LOG.warning("SNAP: destroy() called on EncodedLongBuffer thats been destroyed previously");
if (destroyActionHasBeenTakenBeforeDiag) {
LOG.warning("SNAP: destroy() called on EncodedLongBuffer thats been destroyed previously");
}
}
bufferView = null;

if (managedFile != null) {
LOG.warning("SNAP: destroy() called on EncodedLongBuffer invoking delete on backing file " + managedFile.getAbsolutePath());
Files.delete(managedFile.toPath());
}
// System.out.println("SNAP: WARNING - shouldn't be getting invoked");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.UUID;
import org.junit.Test;

public class BlobByteBufferTest {
Expand All @@ -15,7 +16,7 @@ public void writeThenRead() throws IOException {
int padBytes = 8;
int singleBufferCapacity = 1024;

File targetFile = new File("test-BlobByteBuffer-" + System.currentTimeMillis());
File targetFile = new File("test-BlobByteBuffer-" + System.currentTimeMillis() + "-" + UUID.randomUUID());
targetFile.deleteOnExit();
RandomAccessFile raf = new RandomAccessFile(targetFile, "rw");
raf.setLength((14 * Long.BYTES) + padBytes);
Expand All @@ -32,6 +33,8 @@ public void writeThenRead() throws IOException {
Long.MAX_VALUE, Long.MAX_VALUE,
};

assertEquals(1, buf.getReferenceCount().get());

for (int offset = 0; offset < padBytes; offset ++) {
for (int i = 0; i < values.length; i ++) {
buf.putLong(offset + i * Long.BYTES, values[i]);
Expand All @@ -44,5 +47,31 @@ public void writeThenRead() throws IOException {
}
raf.close();

buf.unmapBlob();
assertEquals(0, buf.getReferenceCount().get());
}

@Test
public void testReferenceCounting() throws IOException {
File targetFile = new File("test-BlobByteBuffer-" + System.currentTimeMillis() + "-" + UUID.randomUUID());
targetFile.deleteOnExit();
int singleBufferCapacity = 64;
RandomAccessFile raf = new RandomAccessFile(targetFile, "rw");
raf.setLength(14 * Long.BYTES);
FileChannel channel = raf.getChannel();
BlobByteBuffer buf = BlobByteBuffer.mmapBlob(channel, singleBufferCapacity);
raf.close();

assertEquals(1, buf.getReferenceCount().get());

BlobByteBuffer dupBuf = buf.duplicate();
assertEquals(2, buf.getReferenceCount().get());

// can unmap in same order as init
buf.unmapBlob();
assertEquals(1, buf.getReferenceCount().get());

dupBuf.unmapBlob();
assertEquals(0, buf.getReferenceCount().get());
}
}

0 comments on commit 37e9084

Please sign in to comment.