Skip to content

Commit

Permalink
Optimised page file opening (#837)
Browse files Browse the repository at this point in the history
Instead of mapping the entire page file every time a segment is re-used,
we map the page file once, and re-use the mapped buffer for each segment.
The cache uses WeakReferences to ensure that a page file that is no
longer used does not stay mapped.
  • Loading branch information
hylkevds authored May 27, 2024
1 parent 031e77b commit f222721
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[fix] Optimised page file opening for disk-based queues. (#837)
[feature] Manage payload format indicator property, when set verify payload format. (#826)
[refactoring] Refactory of PostOffice to pass publish message in hits entirety avoiding decomposition into single parameters. (#827)
[feature] Add Netty native transport support on MacOS. Bundle all the native transport module by default (#806)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package io.moquette.broker.unsafequeues;

import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Default implementation of SegmentAllocator. It uses a series of files (named pages) and split them in segments.
Expand All @@ -15,7 +20,10 @@
* */
class PagedFilesAllocator implements SegmentAllocator {

private static final Logger LOG = LoggerFactory.getLogger(PagedFilesAllocator.class);

interface AllocationListener {

void segmentedCreated(String name, Segment segment);
}

Expand All @@ -27,6 +35,8 @@ interface AllocationListener {
private MappedByteBuffer currentPage;
private FileChannel currentPageFile;

private final Map<Integer, WeakReference<MappedByteBuffer>> pageCache = new HashMap<>();

PagedFilesAllocator(Path pagesFolder, int pageSize, int segmentSize, int lastPage, int lastSegmentAllocated) throws QueueException {
if (pageSize % segmentSize != 0) {
throw new IllegalArgumentException("The pageSize must be an exact multiple of the segmentSize");
Expand All @@ -36,11 +46,30 @@ interface AllocationListener {
this.segmentSize = segmentSize;
this.lastPage = lastPage;
this.lastSegmentAllocated = lastSegmentAllocated;
this.currentPage = openRWPageFile(this.pagesFolder, this.lastPage);
this.currentPage = openOrRetrievePageFile(this.lastPage);
}

private MappedByteBuffer openOrRetrievePageFile(int pageId) throws QueueException {
MappedByteBuffer pageBuffer = null;
WeakReference<MappedByteBuffer> pageBufferRef = pageCache.get(pageId);
if (pageBufferRef != null) {
pageBuffer = pageBufferRef.get();
}
if (pageBuffer == null) {
pageBuffer = openRWPageFile(pageId);
pageBufferRef = new WeakReference<>(pageBuffer);
WeakReference<MappedByteBuffer> old = pageCache.put(pageId, pageBufferRef);
//Sanity check, should not happen...
if (old != null && old.get() != null) {
LOG.warn("Page file {} opened even though it already is open!", pageId);
}
}
return pageBuffer;
}

private MappedByteBuffer openRWPageFile(Path pagesFolder, int pageId) throws QueueException {
private MappedByteBuffer openRWPageFile(int pageId) throws QueueException {
final Path pageFile = pagesFolder.resolve(String.format("%d.page", pageId));
LOG.debug("Opening page {} from file {}", pageId, pageFile);
boolean createNew = false;
if (!Files.exists(pageFile)) {
try {
Expand Down Expand Up @@ -71,7 +100,7 @@ private MappedByteBuffer openRWPageFile(Path pagesFolder, int pageId) throws Que
public Segment nextFreeSegment() throws QueueException {
if (currentPageIsExhausted()) {
lastPage++;
currentPage = openRWPageFile(pagesFolder, lastPage);
currentPage = openOrRetrievePageFile(lastPage);
lastSegmentAllocated = 0;
}

Expand All @@ -84,7 +113,7 @@ public Segment nextFreeSegment() throws QueueException {

@Override
public Segment reopenSegment(int pageId, int beginOffset) throws QueueException {
final MappedByteBuffer page = openRWPageFile(pagesFolder, pageId);
final MappedByteBuffer page = openOrRetrievePageFile(pageId);
final SegmentPointer begin = new SegmentPointer(pageId, beginOffset);
final SegmentPointer end = new SegmentPointer(pageId, beginOffset + segmentSize - 1);
return new Segment(page, begin, end);
Expand Down

0 comments on commit f222721

Please sign in to comment.