From ac2f5d4e4f10b78bd47d774479401f18dd5d1e87 Mon Sep 17 00:00:00 2001 From: Zach Bray Date: Tue, 2 Apr 2024 10:14:05 +0100 Subject: [PATCH] Add failing test for deletion of index segments on sequence reset. Users have spotted exceptions like the following: ``` java.lang.IllegalStateException: Replay index header file did not exist, but the following segment files did: [/hydra/data/log/replay-index-3-1-1, /hydra/data/log/replay-index-3-1-2] at uk.co.real_logic.artio.engine.logger.ReplayIndex$SessionIndex.checkSegmentFilesDoNotExist(ReplayIndex.java:561) at uk.co.real_logic.artio.engine.logger.ReplayIndex$SessionIndex.(ReplayIndex.java:536) at uk.co.real_logic.artio.engine.logger.ReplayIndex.lambda$new$0(ReplayIndex.java:63) at org.agrona.collections.Long2ObjectHashMap.computeIfAbsent(Long2ObjectHashMap.java:382) at uk.co.real_logic.artio.engine.logger.ReplayIndex.sessionIndex(ReplayIndex.java:475) at uk.co.real_logic.artio.engine.logger.ReplayIndex.onFixMessage(ReplayIndex.java:417) at uk.co.real_logic.artio.engine.logger.ReplayIndex.onFragment(ReplayIndex.java:287) at uk.co.real_logic.artio.engine.logger.ReplayIndex.onFragment(ReplayIndex.java:253) at uk.co.real_logic.artio.engine.logger.Indexer.onFragment(Indexer.java:180) at io.aeron.Image.controlledPoll(Image.java:369) at io.aeron.Subscription.controlledPoll(Subscription.java:240) at uk.co.real_logic.artio.engine.logger.Indexer.doWork(Indexer.java:78) at org.agrona.concurrent.CompositeAgent.doWork(CompositeAgent.java:120) at org.agrona.concurrent.AgentRunner.doWork(AgentRunner.java:304) at org.agrona.concurrent.AgentRunner.workLoop(AgentRunner.java:296) at org.agrona.concurrent.AgentRunner.run(AgentRunner.java:162) ``` It appears `uk.co.real_logic.artio.engine.logger.ReplayIndex.SessionIndex#reset` loops over `segmentBufferFiles` and deletes them on a sequence reset; however, files are added to `segmentBufferFiles` lazily in `onRecord`. Therefore, it will only delete all segments files only when the indexer has written records to _all_ segments since it started. One of the tests I've added (the ignored one) demonstrates this behaviour. --- .../artio/engine/logger/AbstractLogTest.java | 2 +- .../artio/engine/logger/ReplayIndexTest.java | 118 ++++++++++++++++-- 2 files changed, 109 insertions(+), 11 deletions(-) diff --git a/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/AbstractLogTest.java b/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/AbstractLogTest.java index 14616681d4..b3fbbdd6c0 100644 --- a/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/AbstractLogTest.java +++ b/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/AbstractLogTest.java @@ -127,7 +127,7 @@ protected void bufferContainsTestRequest(final int sequenceNumber) SESSION_ID, sequenceNumber, SEQUENCE_INDEX, testRequestEncoder, header, TestRequestDecoder.MESSAGE_TYPE); } - private void bufferContainsMessage( + protected void bufferContainsMessage( final long sessionId, final int sequenceNumber, final int sequenceIndex, diff --git a/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayIndexTest.java b/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayIndexTest.java index fa685adada..87e7189dc6 100644 --- a/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayIndexTest.java +++ b/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayIndexTest.java @@ -24,6 +24,15 @@ import io.aeron.archive.codecs.SourceLocation; import io.aeron.logbuffer.ControlledFragmentHandler; import io.aeron.logbuffer.Header; +import uk.co.real_logic.artio.CommonConfiguration; +import uk.co.real_logic.artio.TestFixtures; +import uk.co.real_logic.artio.dictionary.generation.Exceptions; +import uk.co.real_logic.artio.engine.EngineConfiguration; +import uk.co.real_logic.artio.engine.SequenceNumberExtractor; +import uk.co.real_logic.artio.messages.FixPProtocolType; +import uk.co.real_logic.artio.messages.ResetSequenceNumberEncoder; +import uk.co.real_logic.artio.protocol.GatewayPublication; +import uk.co.real_logic.artio.session.Session; import org.agrona.DirectBuffer; import org.agrona.ErrorHandler; import org.agrona.IoUtil; @@ -33,16 +42,9 @@ import org.agrona.concurrent.status.AtomicCounter; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; -import uk.co.real_logic.artio.CommonConfiguration; -import uk.co.real_logic.artio.TestFixtures; -import uk.co.real_logic.artio.dictionary.generation.Exceptions; -import uk.co.real_logic.artio.engine.EngineConfiguration; -import uk.co.real_logic.artio.engine.SequenceNumberExtractor; -import uk.co.real_logic.artio.messages.FixPProtocolType; -import uk.co.real_logic.artio.protocol.GatewayPublication; -import uk.co.real_logic.artio.session.Session; import java.io.File; import java.nio.ByteBuffer; @@ -50,14 +52,18 @@ import static io.aeron.Aeron.NULL_VALUE; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasEntry; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.*; import static uk.co.real_logic.artio.CommonConfiguration.DEFAULT_INBOUND_MAX_CLAIM_ATTEMPTS; import static uk.co.real_logic.artio.LogTag.REPLAY; +import static uk.co.real_logic.artio.TestFixtures.aeronArchiveContext; import static uk.co.real_logic.artio.TestFixtures.cleanupMediaDriver; import static uk.co.real_logic.artio.TestFixtures.largeTestReqId; -import static uk.co.real_logic.artio.TestFixtures.aeronArchiveContext; import static uk.co.real_logic.artio.engine.EngineConfiguration.*; import static uk.co.real_logic.artio.engine.logger.Replayer.MOST_RECENT_MESSAGE; @@ -403,6 +409,93 @@ public void shouldQueryStartPositionsInPresenceOfDuplicateSequenceIndices() assertEquals(position3, startPositions.get(recordingId)); } + @Test(timeout = 20_000L) + public void shouldDeleteIndexSegmentsOnSequenceResetWhenNoSegmentsHaveBeenTouched() + { + int seqNo = 1; + for (int i = 0; i < DEFAULT_REPLAY_INDEX_RECORD_CAPACITY; i++) + { + indexExampleMessage(SESSION_ID, seqNo++, SEQUENCE_INDEX); + } + + assertTrue(logFile(SESSION_ID).exists()); + + assertTrue(segmentFile(SESSION_ID, 0).exists()); + assertTrue(segmentFile(SESSION_ID, 1).exists()); + assertTrue(segmentFile(SESSION_ID, 2).exists()); + assertTrue(segmentFile(SESSION_ID, 3).exists()); + + assertFalse(segmentFile(SESSION_ID, 4).exists()); + + replayIndex.close(); + + newReplayIndex(); + + resetSequenceNumber(); + + assertFalse(logFile(SESSION_ID).exists()); + + assertFalse(segmentFile(SESSION_ID, 0).exists()); + assertFalse(segmentFile(SESSION_ID, 1).exists()); + assertFalse(segmentFile(SESSION_ID, 2).exists()); + assertFalse(segmentFile(SESSION_ID, 3).exists()); + } + + @Test(timeout = 20_000L) + @Ignore + public void shouldDeleteIndexSegmentsOnSequenceResetWhenASegmentHasBeenTouched() + { + int seqNo = 1; + for (int i = 0; i < DEFAULT_REPLAY_INDEX_RECORD_CAPACITY; i++) + { + indexExampleMessage(SESSION_ID, seqNo++, SEQUENCE_INDEX); + } + + assertTrue(logFile(SESSION_ID).exists()); + + assertTrue(segmentFile(SESSION_ID, 0).exists()); + assertTrue(segmentFile(SESSION_ID, 1).exists()); + assertTrue(segmentFile(SESSION_ID, 2).exists()); + assertTrue(segmentFile(SESSION_ID, 3).exists()); + + assertFalse(segmentFile(SESSION_ID, 4).exists()); + + replayIndex.close(); + + newReplayIndex(); + + indexExampleMessage(SESSION_ID, seqNo++, SEQUENCE_INDEX); + + resetSequenceNumber(); + + assertFalse(logFile(SESSION_ID).exists()); + + assertFalse(segmentFile(SESSION_ID, 0).exists()); + assertFalse(segmentFile(SESSION_ID, 1).exists()); + assertFalse(segmentFile(SESSION_ID, 2).exists()); + assertFalse(segmentFile(SESSION_ID, 3).exists()); + } + + private void resetSequenceNumber() + { + final int offset = START; + + final ResetSequenceNumberEncoder resetEncoder = new ResetSequenceNumberEncoder(); + resetEncoder + .wrapAndApplyHeader(buffer, offset, header) + .session(SESSION_ID); + + final int limit = resetEncoder.limit(); + final int length = limit - offset; + + while (publication.offer(buffer, offset, length) <= 0) + { + Thread.yield(); + } + + indexRecord(); + } + private void captureRecordingIds() { final int recordingCount = captureRecordingId(); @@ -471,6 +564,11 @@ private File logFile(final long sessionId) return ReplayIndexDescriptor.replayIndexHeaderFile(DEFAULT_LOG_FILE_DIR, sessionId, STREAM_ID); } + private File segmentFile(final long sessionId, final int segmentIndex) + { + return ReplayIndexDescriptor.replayIndexSegmentFile(DEFAULT_LOG_FILE_DIR, sessionId, STREAM_ID, segmentIndex); + } + private void indexRecord() { indexRecord(1);