Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed SegmentedQueues not being cleaned up on session purge #833

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[fix] Fixed SegmentedQueues not being cleaned up on session purge. (#833)
[feature] Implement response-information property for request-response flow. (#840)
[fix] Optimised page file opening for disk-based queues. (#837)
[feature] Manage payload format indicator property, when set verify payload format. (#826)
Expand Down
17 changes: 8 additions & 9 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,8 @@ public SessionCreationResult(Session session, CreationModeEnum mode, boolean alr
private void removeExpiredSession(ISessionsRepository.SessionData expiredSession) {
final String expiredAt = expiredSession.expireAt().map(Instant::toString).orElse("UNDEFINED");
LOG.debug("Removing session {}, expired on {}", expiredSession.clientId(), expiredAt);
remove(expiredSession.clientId());
remove(pool.get(expiredSession.clientId()));
sessionsRepository.delete(expiredSession);

subscriptionsDirectory.removeSharedSubscriptionsForClient(expiredSession.clientId());
}

Expand Down Expand Up @@ -489,19 +488,19 @@ private void purgeSessionState(Session session) {
throw new SessionCorruptedException("Session has already changed state: " + session);
}

unsubscribe(session);
remove(session.getClientID());

remove(session);
sessionsRepository.delete(session.getSessionData());
subscriptionsDirectory.removeSharedSubscriptionsForClient(session.getClientID());
}

void remove(String clientID) {
final Session old = pool.remove(clientID);
if (old != null) {
void remove(Session session) {
String clientID = session.getClientID();
if (pool.remove(clientID, session)) {
unsubscribe(session);
// remove from expired tracker if present
sessionExpirationService.untrack(clientID);
loopsGroup.routeCommand(clientID, "Clean up removed session", () -> {
old.cleanUp();
session.cleanUp();
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ public boolean isEmpty() {
}
}

/**
* Close the Queue and release all resources.
*/
public void close() {
queuePool.purgeQueue(name);
headSegment = null;
tailSegment = null;
}

/**
* Read next message or return null if the queue has no data.
* */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,23 @@ public Queue getOrCreate(String queueName) throws QueueException {
}
}

void purgeQueue(String name) {
final QueueName queueName = new QueueName(name);
final LinkedList<SegmentRef> segmentRefs = queueSegments.remove(queueName);
SegmentRef segmentRef = segmentRefs.pollLast();
segmentsAllocationLock.lock();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a debug line here, like

LOG.debug("Purging segments for queue {}", queueName);

try {
while (segmentRef != null) {
LOG.debug("Consumed tail segment {} from queue {}", segmentRef, queueName);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would rewrite this log as

Suggested change
LOG.debug("Consumed tail segment {} from queue {}", segmentRef, queueName);
LOG.debug("Purging segment {} from queue {}", segmentRef, queueName);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem, I added both logging lines.

recycledSegments.add(segmentRef);
segmentRef = segmentRefs.pollLast();
}
} finally {
segmentsAllocationLock.unlock();
}
queues.remove(queueName);
}

/**
* Free mapped files
* */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,6 @@ public boolean isEmpty() {
@Override
public void closeAndPurge() {
closed = true;
segmentedQueue.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,57 @@ public void reopenQueueWithFragmentation() throws QueueException, IOException {
assertEquals("(0, 0), (0, 4194304)", segmentRefs);
}

@Test
public void testPageFileReuse() throws QueueException, IOException {
// Use smaller segmants and pages for quicker testing.
final int kb = 1024;
// Pages with only 4 segments, for quicker testing
final int queuePageSize = 16 * kb;
final int queueSegmentSize = 4 * kb;
// write 2 segments, consume one segment, next segment allocated should be one just freed.0
QueuePool queuePool = QueuePool.loadQueues(tempQueueFolder, queuePageSize, queueSegmentSize);
Queue queue1 = queuePool.getOrCreate("test_external_fragmentation");

byte[] bytes = new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n'};

// fill seven segments (almost the full 8 we have)
// This should force two files to open.
for (int i = 0; i < 7; i++) {
queue1.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[i * 2])));
queue1.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[i * 2 + 1])));
}

// Create a new queue
final Queue queue2 = queuePool.getOrCreate("test_external_fragmentation2");

// Write one segment (filling the second page)
queue2.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[0])));
queue2.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[1])));

// Release the first queue
queue1.force();
queue1.close();

// Fill another six segments.
// This should not open more files.
for (int i = 1; i < 7; i++) {
queue2.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[i * 2])));
queue2.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[i * 2 + 1])));
}

queue2.force();
queuePool.close();

// Verify
// checkpoint contains che correct order, (0,0), (0, 4194304)
Properties checkpointProps3 = loadCheckpointFile(tempQueueFolder);

// We should now have segments 6, 5, 4, 3, 2, 1, 8
// or, 2.2, 2.1, 1.4, 1.3, 1.2, 1.1, 2.4
final String segmentRefs = checkpointProps3.getProperty("queues.0.segments");
assertEquals("(1, 4096), (1, 0), (0, 12288), (0, 8192), (0, 4096), (0, 0), (1, 12288)", segmentRefs);
}

private Properties loadCheckpointFile(Path dir) throws IOException {
final Path checkpointPath = dir.resolve("checkpoint.properties");
final FileReader fileReader = new FileReader(checkpointPath.toFile());
Expand Down
Loading