Skip to content

Commit

Permalink
Fix scenario in framer where initiator disconnects before follower se…
Browse files Browse the repository at this point in the history
…ssion in library can respond to logon leading to sequence index bump
  • Loading branch information
marc-adaptive committed Aug 29, 2024
1 parent 0147841 commit 4513120
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -830,17 +830,14 @@ void assertReplayReceivedMessages()

void sleep(final int timeInMs)
{
testSystem.awaitBlocking(() ->
try
{
try
{
Thread.sleep(timeInMs);
}
catch (final InterruptedException e)
{
e.printStackTrace();
}
});
Thread.sleep(timeInMs);
}
catch (final InterruptedException e)
{
e.printStackTrace();
}
}

void assertResendsCompleted(final int count, final Matcher<Iterable<Integer>> items)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ public void shouldSupportLogonBasedSequenceNumberResetWithMessagesSentBeforeLogo
public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBeforeLibraryLogonResponse()
throws IOException
{
setup(true, true);
setup(false, true);
setupLibrary();

final List<SessionInfo> noSessionContext = engine.allSessions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import uk.co.real_logic.artio.builder.*;
import uk.co.real_logic.artio.engine.EngineConfiguration;
import uk.co.real_logic.artio.engine.FixEngine;
import uk.co.real_logic.artio.engine.SessionInfo;
import uk.co.real_logic.artio.engine.framer.LibraryInfo;
import uk.co.real_logic.artio.fields.UtcTimestampEncoder;
import uk.co.real_logic.artio.library.DynamicLibraryScheduler;
import uk.co.real_logic.artio.messages.DisconnectReason;
Expand All @@ -42,6 +44,7 @@
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
Expand Down Expand Up @@ -1015,6 +1018,69 @@ private void connectPersistingSessions()
connectPersistingSessions(AUTOMATIC_INITIAL_SEQUENCE_NUMBER, false);
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBeforeLibraryLogonResponse()
throws IOException
{
launch(this::nothing);

final List<SessionInfo> noSessionContext = acceptingEngine.allSessions();
assertEquals(0, noSessionContext.size());

final SessionWriter sessionWriter = createFollowerSession(
TEST_TIMEOUT_IN_MS, testSystem, acceptingLibrary, INITIATOR_ID, ACCEPTOR_ID);
final SessionReplyStatus requestSessionReply = requestSession(acceptingLibrary, sessionWriter.id(), testSystem);
assertEquals(SessionReplyStatus.OK, requestSessionReply);

try (FixConnection connection = FixConnection.initiate(port))
{
connection.logon(true);
Timing.assertEventuallyTrue("Library did not transition session to connected",
() ->
{
acceptingLibrary.poll(1);
final List<Session> sessions = acceptingLibrary.sessions();
return sessions.size() == 1 && sessions.get(0).state() == SessionState.CONNECTED;
}
);
}

Timing.assertEventuallyTrue("Fix connection was not disconnected",
() ->
{
final Reply<List<LibraryInfo>> libraryReply = acceptingEngine.libraries();
while (!libraryReply.hasCompleted())
{
sleep(500);
}

final List<LibraryInfo> allLibraryInfo = libraryReply.resultIfPresent();
for (final LibraryInfo libraryInfo : allLibraryInfo) {
if (libraryInfo.libraryId() == acceptingLibrary.libraryId()) {
return libraryInfo.sessions().isEmpty();
}
}
return false;
}
);

Timing.assertEventuallyTrue("Library did not transition session to active",
() ->
{
acceptingLibrary.poll(1);
final List<Session> sessions = acceptingLibrary.sessions();
return sessions.size() == 1 && sessions.get(0).state() == SessionState.ACTIVE;
}
);

assertEngineSubscriptionCaughtUpToLibraryPublication(
testSystem, mediaDriver.mediaDriver().aeronDirectoryName(), acceptingEngine, acceptingLibrary);

final List<SessionInfo> sessionContextAfterLogonNoSenderEndpoint = acceptingEngine.allSessions();
assertEquals(1, sessionContextAfterLogonNoSenderEndpoint.size());
assertEquals(0, sessionContextAfterLogonNoSenderEndpoint.get(0).sequenceIndex());
}

private void resetSequenceNumbers()
{
testSystem.resetSequenceNumber(initiatingEngine, initiatingSession.id());
Expand Down

0 comments on commit 4513120

Please sign in to comment.