Skip to content

Commit

Permalink
Disconnect before library logon (#521)
Browse files Browse the repository at this point in the history
Fix race condition where session disconnect pending a library logon reply can bump sequence index
  • Loading branch information
marc-adaptive authored Sep 5, 2024
1 parent c25f4f0 commit b40b0c7
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ public final class EngineStreamInfo
{
private final long inboundIndexSubscriptionRegistrationId;
private final long outboundIndexSubscriptionRegistrationId;
private final long librarySubscriptionRegistrationId;
private final int inboundPublicationSessionId;
private final long inboundPublicationPosition;
private final int outboundPublicationSessionId;
Expand All @@ -12,13 +13,15 @@ public final class EngineStreamInfo
EngineStreamInfo(
final long inboundIndexSubscriptionRegistrationId,
final long outboundIndexSubscriptionRegistrationId,
final long librarySubscriptionRegistrationId,
final int inboundPublicationSessionId,
final long inboundPublicationPosition,
final int outboundPublicationSessionId,
final long outboundPublicationPosition)
{
this.inboundIndexSubscriptionRegistrationId = inboundIndexSubscriptionRegistrationId;
this.outboundIndexSubscriptionRegistrationId = outboundIndexSubscriptionRegistrationId;
this.librarySubscriptionRegistrationId = librarySubscriptionRegistrationId;
this.inboundPublicationSessionId = inboundPublicationSessionId;
this.inboundPublicationPosition = inboundPublicationPosition;
this.outboundPublicationSessionId = outboundPublicationSessionId;
Expand All @@ -35,6 +38,11 @@ public long outboundIndexSubscriptionRegistrationId()
return outboundIndexSubscriptionRegistrationId;
}

public long librarySubscriptionRegistrationId()
{
return librarySubscriptionRegistrationId;
}

public int inboundPublicationSessionId()
{
return inboundPublicationSessionId;
Expand All @@ -60,6 +68,7 @@ public String toString()
return "EngineStreamInfo{" +
"inboundIndexSubscriptionRegistrationId=" + inboundIndexSubscriptionRegistrationId +
", outboundIndexSubscriptionRegistrationId=" + outboundIndexSubscriptionRegistrationId +
", librarySubscriptionRegistrationId=" + librarySubscriptionRegistrationId +
", inboundPublicationSessionId=" + inboundPublicationSessionId +
", inboundPublicationPosition=" + inboundPublicationPosition +
", outboundPublicationSessionId=" + outboundPublicationSessionId +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1715,24 +1715,17 @@ public Action onValidResendRequest(

private void checkOfflineSequenceReset(final long sessionId, final long messageType, final int sequenceIndex)
{
if (messageType == LOGON_MESSAGE_TYPE)
if (messageType == LOGON_MESSAGE_TYPE || messageType == SEQUENCE_RESET_MESSAGE_TYPE)
{
// Always a sequence reset
final Map.Entry<CompositeKey, SessionContext> entry = fixContexts.lookupById(sessionId);
if (entry != null)
{
final SessionContext context = entry.getValue();
context.onSequenceReset(clock.nanoTime());
}
}
else if (messageType == SEQUENCE_RESET_MESSAGE_TYPE)
{
// If it's not a gap-fill it's a sequence reset
final Map.Entry<CompositeKey, SessionContext> entry = fixContexts.lookupById(sessionId);
if (entry != null)
{
final SessionContext context = entry.getValue();
context.onSequenceIndex(clock.nanoTime(), sequenceIndex);
final int currentSequenceIndex = context.sequenceIndex();
if (sequenceIndex > currentSequenceIndex)
{
context.onSequenceIndex(clock.nanoTime(), sequenceIndex);
}
}
}
}
Expand Down Expand Up @@ -3502,6 +3495,7 @@ public void onEngineStreamInfoRequest(final EngineStreamInfoRequestCommand comma
command.complete(new EngineStreamInfo(
inboundIndexRegistrationId,
outboundIndexRegistrationId,
librarySubscription.registrationId(),
inboundPublication.sessionId(),
inboundPublication.position(),
outboundPublication.sessionId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@
import uk.co.real_logic.artio.builder.*;
import uk.co.real_logic.artio.decoder.*;
import uk.co.real_logic.artio.engine.SessionInfo;
import uk.co.real_logic.artio.engine.framer.LibraryInfo;
import uk.co.real_logic.artio.library.FixLibrary;
import uk.co.real_logic.artio.messages.DisconnectReason;
import uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner;
import uk.co.real_logic.artio.messages.SessionReplyStatus;
import uk.co.real_logic.artio.messages.ThrottleConfigurationStatus;
import uk.co.real_logic.artio.messages.*;
import uk.co.real_logic.artio.session.Session;
import uk.co.real_logic.artio.session.SessionWriter;
import uk.co.real_logic.artio.util.MutableAsciiBuffer;

import java.io.IOException;
Expand All @@ -55,8 +54,7 @@
import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.ENGINE;
import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY;
import static uk.co.real_logic.artio.messages.ThrottleConfigurationStatus.OK;
import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.LONG_TEST_TIMEOUT_IN_MS;
import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.TEST_TIMEOUT_IN_MS;
import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.*;
import static uk.co.real_logic.artio.system_tests.FixConnection.BUFFER_SIZE;
import static uk.co.real_logic.artio.system_tests.MessageBasedInitiatorSystemTest.assertConnectionDisconnects;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.*;
Expand Down Expand Up @@ -924,6 +922,72 @@ public void shouldDisconnectConnectionTryingToSendOversizedMessage() throws IOEx
}
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBeforeLibraryLogonResponse()
throws IOException
{
setup(false, true);
setupLibrary();

final List<SessionInfo> noSessionContext = engine.allSessions();
assertEquals(0, noSessionContext.size());

final SessionWriter sessionWriter = createFollowerSession(
TEST_TIMEOUT_IN_MS, testSystem, library, INITIATOR_ID, ACCEPTOR_ID);
final SessionReplyStatus requestSessionReply = requestSession(library, sessionWriter.id(), testSystem);
assertEquals(SessionReplyStatus.OK, requestSessionReply);

try (FixConnection connection = FixConnection.initiate(port))
{
connection.logon(false);
Timing.assertEventuallyTrue("Library did not transition session to connected",
() ->
{
library.poll(1);
final List<Session> sessions = library.sessions();
return sessions.size() == 1 && sessions.get(0).state() == SessionState.CONNECTED;
}
);
}

Timing.assertEventuallyTrue("Fix connection was not disconnected",
() ->
{
final Reply<List<LibraryInfo>> libraryReply = engine.libraries();
while (!libraryReply.hasCompleted())
{
sleep(100);
}

final List<LibraryInfo> allLibraryInfo = libraryReply.resultIfPresent();
for (final LibraryInfo libraryInfo : allLibraryInfo)
{
if (libraryInfo.libraryId() == library.libraryId())
{
return libraryInfo.sessions().isEmpty();
}
}
return false;
}
);

Timing.assertEventuallyTrue("Library did not transition session to active",
() ->
{
library.poll(1);
final List<Session> sessions = library.sessions();
return sessions.size() == 1 && sessions.get(0).state() == SessionState.ACTIVE;
}
);

assertEngineSubscriptionCaughtUpToLibraryPublication(
testSystem, mediaDriver.mediaDriver().aeronDirectoryName(), engine, library);

final List<SessionInfo> sessionContextAfterLogonNoSenderEndpoint = engine.allSessions();
assertEquals(1, sessionContextAfterLogonNoSenderEndpoint.size());
assertEquals(0, sessionContextAfterLogonNoSenderEndpoint.get(0).sequenceIndex());
}

private void assertSell(final ExecutionReportDecoder executionReport)
{
assertEquals(executionReport.toString(), Side.SELL, executionReport.sideAsEnum());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,4 +707,55 @@ static void awaitIndexerCaughtUp(
() -> {});
}
}

static void assertEngineSubscriptionCaughtUpToLibraryPublication(
final TestSystem testSystem,
final String aeronDirectoryName,
final FixEngine engine,
final FixLibrary library)
{
final EngineStreamInfo engineStreamInfo =
testSystem.awaitCompletedReply(FixEngineInternals.engineStreamInfo(engine)).resultIfPresent();

final LibraryStreamInfo libraryStreamInfo = FixLibraryInternals.libraryStreamInfo(library);

final Aeron.Context aeronCtx = new Aeron.Context().aeronDirectoryName(aeronDirectoryName);
try (Aeron aeron = Aeron.connect(aeronCtx))
{
final CountersReader countersReader = aeron.countersReader();

final SubPosMatcher subPosMatcher = new SubPosMatcher(
countersReader,
engineStreamInfo.librarySubscriptionRegistrationId(),
libraryStreamInfo.outboundPublicationSessionId(),
libraryStreamInfo.outboundPublicationPosition());

countersReader.forEach((counterId, typeId, keyBuffer, label) ->
subPosMatcher.tryMatch(counterId, typeId, keyBuffer));

if (!subPosMatcher.hasCounterId())
{
throw new IllegalStateException("did not match counter: " + subPosMatcher);
}

assertEventuallyTrue(
() ->
{
final StringBuilder builder = new StringBuilder();
builder.append("expected sub-pos counters:\n");
builder.append(subPosMatcher).append('\n');
builder.append("\nbut counters were:\n");
countersReader.forEach((value, counterId, label) ->
builder.append(String.format("%d: %d - %s%n", counterId, value, label)));
return builder.toString();
},
() ->
{
testSystem.poll();
return subPosMatcher.isCaughtUp();
},
DEFAULT_TIMEOUT_IN_MS,
() -> {});
}
}
}

0 comments on commit b40b0c7

Please sign in to comment.