Skip to content

Commit

Permalink
Add failing test for deletion of index segments on sequence reset.
Browse files Browse the repository at this point in the history
Users have spotted exceptions like the following:

```
java.lang.IllegalStateException: Replay index header file did not exist, but the following segment files did: [/hydra/data/log/replay-index-3-1-1, /hydra/data/log/replay-index-3-1-2]
at uk.co.real_logic.artio.engine.logger.ReplayIndex$SessionIndex.checkSegmentFilesDoNotExist(ReplayIndex.java:561)
at uk.co.real_logic.artio.engine.logger.ReplayIndex$SessionIndex.<init>(ReplayIndex.java:536)
at uk.co.real_logic.artio.engine.logger.ReplayIndex.lambda$new$0(ReplayIndex.java:63)
at org.agrona.collections.Long2ObjectHashMap.computeIfAbsent(Long2ObjectHashMap.java:382)
at uk.co.real_logic.artio.engine.logger.ReplayIndex.sessionIndex(ReplayIndex.java:475)
at uk.co.real_logic.artio.engine.logger.ReplayIndex.onFixMessage(ReplayIndex.java:417)
at uk.co.real_logic.artio.engine.logger.ReplayIndex.onFragment(ReplayIndex.java:287)
at uk.co.real_logic.artio.engine.logger.ReplayIndex.onFragment(ReplayIndex.java:253)
at uk.co.real_logic.artio.engine.logger.Indexer.onFragment(Indexer.java:180)
at io.aeron.Image.controlledPoll(Image.java:369)
at io.aeron.Subscription.controlledPoll(Subscription.java:240)
at uk.co.real_logic.artio.engine.logger.Indexer.doWork(Indexer.java:78)
at org.agrona.concurrent.CompositeAgent.doWork(CompositeAgent.java:120)
at org.agrona.concurrent.AgentRunner.doWork(AgentRunner.java:304)
at org.agrona.concurrent.AgentRunner.workLoop(AgentRunner.java:296)
at org.agrona.concurrent.AgentRunner.run(AgentRunner.java:162)
```

It appears
`uk.co.real_logic.artio.engine.logger.ReplayIndex.SessionIndex#reset`
loops over `segmentBufferFiles` and deletes them on a sequence reset;
however, files are added to `segmentBufferFiles` lazily in `onRecord`.
Therefore, it will only delete all segments files only when the indexer
has written records to _all_ segments since it started.

One of the tests I've added (the ignored one) demonstrates this
behaviour.
  • Loading branch information
ZachBray committed Apr 2, 2024
1 parent d179ab9 commit ac2f5d4
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ protected void bufferContainsTestRequest(final int sequenceNumber)
SESSION_ID, sequenceNumber, SEQUENCE_INDEX, testRequestEncoder, header, TestRequestDecoder.MESSAGE_TYPE);
}

private void bufferContainsMessage(
protected void bufferContainsMessage(
final long sessionId,
final int sequenceNumber,
final int sequenceIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.Header;
import uk.co.real_logic.artio.CommonConfiguration;
import uk.co.real_logic.artio.TestFixtures;
import uk.co.real_logic.artio.dictionary.generation.Exceptions;
import uk.co.real_logic.artio.engine.EngineConfiguration;
import uk.co.real_logic.artio.engine.SequenceNumberExtractor;
import uk.co.real_logic.artio.messages.FixPProtocolType;
import uk.co.real_logic.artio.messages.ResetSequenceNumberEncoder;
import uk.co.real_logic.artio.protocol.GatewayPublication;
import uk.co.real_logic.artio.session.Session;
import org.agrona.DirectBuffer;
import org.agrona.ErrorHandler;
import org.agrona.IoUtil;
Expand All @@ -33,31 +42,28 @@
import org.agrona.concurrent.status.AtomicCounter;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import uk.co.real_logic.artio.CommonConfiguration;
import uk.co.real_logic.artio.TestFixtures;
import uk.co.real_logic.artio.dictionary.generation.Exceptions;
import uk.co.real_logic.artio.engine.EngineConfiguration;
import uk.co.real_logic.artio.engine.SequenceNumberExtractor;
import uk.co.real_logic.artio.messages.FixPProtocolType;
import uk.co.real_logic.artio.protocol.GatewayPublication;
import uk.co.real_logic.artio.session.Session;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.stream.IntStream;

import static io.aeron.Aeron.NULL_VALUE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasEntry;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;
import static uk.co.real_logic.artio.CommonConfiguration.DEFAULT_INBOUND_MAX_CLAIM_ATTEMPTS;
import static uk.co.real_logic.artio.LogTag.REPLAY;
import static uk.co.real_logic.artio.TestFixtures.aeronArchiveContext;
import static uk.co.real_logic.artio.TestFixtures.cleanupMediaDriver;
import static uk.co.real_logic.artio.TestFixtures.largeTestReqId;
import static uk.co.real_logic.artio.TestFixtures.aeronArchiveContext;
import static uk.co.real_logic.artio.engine.EngineConfiguration.*;
import static uk.co.real_logic.artio.engine.logger.Replayer.MOST_RECENT_MESSAGE;

Expand Down Expand Up @@ -403,6 +409,93 @@ public void shouldQueryStartPositionsInPresenceOfDuplicateSequenceIndices()
assertEquals(position3, startPositions.get(recordingId));
}

@Test(timeout = 20_000L)
public void shouldDeleteIndexSegmentsOnSequenceResetWhenNoSegmentsHaveBeenTouched()
{
int seqNo = 1;
for (int i = 0; i < DEFAULT_REPLAY_INDEX_RECORD_CAPACITY; i++)
{
indexExampleMessage(SESSION_ID, seqNo++, SEQUENCE_INDEX);
}

assertTrue(logFile(SESSION_ID).exists());

assertTrue(segmentFile(SESSION_ID, 0).exists());
assertTrue(segmentFile(SESSION_ID, 1).exists());
assertTrue(segmentFile(SESSION_ID, 2).exists());
assertTrue(segmentFile(SESSION_ID, 3).exists());

assertFalse(segmentFile(SESSION_ID, 4).exists());

replayIndex.close();

newReplayIndex();

resetSequenceNumber();

assertFalse(logFile(SESSION_ID).exists());

assertFalse(segmentFile(SESSION_ID, 0).exists());
assertFalse(segmentFile(SESSION_ID, 1).exists());
assertFalse(segmentFile(SESSION_ID, 2).exists());
assertFalse(segmentFile(SESSION_ID, 3).exists());
}

@Test(timeout = 20_000L)
@Ignore
public void shouldDeleteIndexSegmentsOnSequenceResetWhenASegmentHasBeenTouched()
{
int seqNo = 1;
for (int i = 0; i < DEFAULT_REPLAY_INDEX_RECORD_CAPACITY; i++)
{
indexExampleMessage(SESSION_ID, seqNo++, SEQUENCE_INDEX);
}

assertTrue(logFile(SESSION_ID).exists());

assertTrue(segmentFile(SESSION_ID, 0).exists());
assertTrue(segmentFile(SESSION_ID, 1).exists());
assertTrue(segmentFile(SESSION_ID, 2).exists());
assertTrue(segmentFile(SESSION_ID, 3).exists());

assertFalse(segmentFile(SESSION_ID, 4).exists());

replayIndex.close();

newReplayIndex();

indexExampleMessage(SESSION_ID, seqNo++, SEQUENCE_INDEX);

resetSequenceNumber();

assertFalse(logFile(SESSION_ID).exists());

assertFalse(segmentFile(SESSION_ID, 0).exists());
assertFalse(segmentFile(SESSION_ID, 1).exists());
assertFalse(segmentFile(SESSION_ID, 2).exists());
assertFalse(segmentFile(SESSION_ID, 3).exists());
}

private void resetSequenceNumber()
{
final int offset = START;

final ResetSequenceNumberEncoder resetEncoder = new ResetSequenceNumberEncoder();
resetEncoder
.wrapAndApplyHeader(buffer, offset, header)
.session(SESSION_ID);

final int limit = resetEncoder.limit();
final int length = limit - offset;

while (publication.offer(buffer, offset, length) <= 0)
{
Thread.yield();
}

indexRecord();
}

private void captureRecordingIds()
{
final int recordingCount = captureRecordingId();
Expand Down Expand Up @@ -471,6 +564,11 @@ private File logFile(final long sessionId)
return ReplayIndexDescriptor.replayIndexHeaderFile(DEFAULT_LOG_FILE_DIR, sessionId, STREAM_ID);
}

private File segmentFile(final long sessionId, final int segmentIndex)
{
return ReplayIndexDescriptor.replayIndexSegmentFile(DEFAULT_LOG_FILE_DIR, sessionId, STREAM_ID, segmentIndex);
}

private void indexRecord()
{
indexRecord(1);
Expand Down

0 comments on commit ac2f5d4

Please sign in to comment.