Skip to content

Commit

Permalink
race condition on AbstractGatewayToGatewaySystemTest for Session obj …
Browse files Browse the repository at this point in the history
…shared accross different threads
  • Loading branch information
lucianoviana committed Sep 25, 2024
1 parent 609bb44 commit 3a55613
Showing 1 changed file with 103 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,17 @@ void assertOriginalLibraryDoesNotReceiveMessages(final int initiatorMessageCount

void assertSequenceFromInitToAcceptAt(final int expectedInitToAccSeqNum, final int expectedAccToInitSeqNum)
{
assertEquals(expectedInitToAccSeqNum, initiatingSession.lastSentMsgSeqNum());
assertEquals(expectedInitToAccSeqNum, acceptingSession.lastReceivedMsgSeqNum());
synchronized (LOCK)
{

assertEquals(expectedInitToAccSeqNum, initiatingSession.lastSentMsgSeqNum());
assertEquals(expectedInitToAccSeqNum, acceptingSession.lastReceivedMsgSeqNum());

awaitMessage(expectedAccToInitSeqNum, initiatingSession);
awaitMessage(expectedAccToInitSeqNum, initiatingSession);

assertEquals(expectedAccToInitSeqNum, initiatingSession.lastReceivedMsgSeqNum());
assertEquals(expectedAccToInitSeqNum, acceptingSession.lastSentMsgSeqNum());
assertEquals(expectedAccToInitSeqNum, initiatingSession.lastReceivedMsgSeqNum());
assertEquals(expectedAccToInitSeqNum, acceptingSession.lastSentMsgSeqNum());
}
}

void awaitMessage(final int sequenceNumber, final Session session)
Expand All @@ -222,7 +226,11 @@ void disconnectSessions()

long logoutAcceptingSession()
{
return logoutSession(testSystem, acceptingSession);
synchronized (LOCK)
{

return logoutSession(testSystem, acceptingSession);
}
}

void logoutInitiatingSession()
Expand All @@ -232,16 +240,20 @@ void logoutInitiatingSession()

void assertSessionsDisconnected()
{
assertSessionDisconnected(initiatingSession);
assertSessionDisconnected(acceptingSession);
synchronized (LOCK)
{

assertEventuallyTrue("libraries receive disconnect messages",
() ->
{
testSystem.poll();
assertNotSession(acceptingHandler, acceptingSession);
assertNotSession(initiatingHandler, initiatingSession);
});
assertSessionDisconnected(initiatingSession);
assertSessionDisconnected(acceptingSession);

assertEventuallyTrue("libraries receive disconnect messages",
() ->
{
testSystem.poll();
assertNotSession(acceptingHandler, acceptingSession);
assertNotSession(initiatingHandler, initiatingSession);
});
}
}

void sessionNoLongerManaged(final FakeHandler handler, final Session session)
Expand Down Expand Up @@ -277,12 +289,17 @@ void acquireAcceptingSession()

void acquireAcceptingSession(final String initiatorId)
{
final long sessionId = acceptingHandler.awaitSessionId(testSystem::poll);
synchronized (LOCK)
{

final long sessionId = acceptingHandler.awaitSessionId(testSystem::poll);

acceptingSession = acquireSession(acceptingHandler, acceptingLibrary, sessionId, testSystem);
assertEquals(initiatorId, acceptingHandler.lastInitiatorCompId());
assertEquals(ACCEPTOR_ID, acceptingHandler.lastAcceptorCompId());
assertNotNull("unable to acquire accepting session", acceptingSession);
}

acceptingSession = acquireSession(acceptingHandler, acceptingLibrary, sessionId, testSystem);
assertEquals(initiatorId, acceptingHandler.lastInitiatorCompId());
assertEquals(ACCEPTOR_ID, acceptingHandler.lastAcceptorCompId());
assertNotNull("unable to acquire accepting session", acceptingSession);
}

void connectSessions()
Expand Down Expand Up @@ -347,8 +364,12 @@ FixMessage assertMessageResent(final int sequenceNumber, final String msgType, f

int acceptorSendsResendRequest()
{
final int seqNum = acceptingSession.lastReceivedMsgSeqNum();
return acceptorSendsResendRequest(seqNum);
synchronized (LOCK)
{

final int seqNum = acceptingSession.lastReceivedMsgSeqNum();
return acceptorSendsResendRequest(seqNum);
}
}

int acceptorSendsResendRequest(final int seqNum)
Expand All @@ -366,16 +387,20 @@ void acceptorSendsResendRequest(final int beginSeqNo, final int endSeqNo)
void sendResendRequest(
final int beginSeqNo, final int endSeqNo, final FakeOtfAcceptor otfAcceptor, final Session session)
{
final ResendRequestEncoder resendRequest = new ResendRequestEncoder()
.beginSeqNo(beginSeqNo)
.endSeqNo(endSeqNo);
synchronized (LOCK)
{

otfAcceptor.messages().clear();
final ResendRequestEncoder resendRequest = new ResendRequestEncoder()
.beginSeqNo(beginSeqNo)
.endSeqNo(endSeqNo);

while (session.trySend(resendRequest) < 0)
{
Thread.yield();
testSystem.poll();
otfAcceptor.messages().clear();

while (session.trySend(resendRequest) < 0)
{
Thread.yield();
testSystem.poll();
}
}
}

Expand All @@ -391,7 +416,11 @@ void messagesCanBeExchanged(final Session session)

void acceptingMessagesCanBeExchanged()
{
messagesCanBeExchanged(acceptingSession, acceptingOtfAcceptor);
synchronized (LOCK)
{

messagesCanBeExchanged(acceptingSession, acceptingOtfAcceptor);
}
}

long messagesCanBeExchanged(final Session sendingSession, final FakeOtfAcceptor receivingAcceptor)
Expand Down Expand Up @@ -431,12 +460,16 @@ void assertSequenceIndicesAre(final int sequenceIndex)

void assertAcceptingSessionHasSequenceIndex(final int sequenceIndex)
{
if (acceptingSession != null)
synchronized (LOCK)
{
assertThat(acceptingSession, hasSequenceIndex(sequenceIndex));
}

assertEngineSequenceIndexIs(acceptingEngine, sequenceIndex);
if (acceptingSession != null)
{
assertThat(acceptingSession, hasSequenceIndex(sequenceIndex));
}

assertEngineSequenceIndexIs(acceptingEngine, sequenceIndex);
}
}

void assertInitiatingSequenceIndexIs(final int sequenceIndex)
Expand All @@ -462,19 +495,23 @@ void assertAllMessagesHaveSequenceIndex(final int sequenceIndex)

void sessionsCanReconnect()
{
acquireAcceptingSession();
synchronized (LOCK)
{

logoutSession(acceptingSession);
assertSessionsDisconnected();
acquireAcceptingSession();

logoutSession(acceptingSession);
assertSessionsDisconnected();

assertAllMessagesHaveSequenceIndex(0);
clearMessages();
assertAllMessagesHaveSequenceIndex(0);
clearMessages();

wireSessions();
wireSessions();

messagesCanBeExchanged();
messagesCanBeExchanged();

assertSequenceIndicesAre(1);
assertSequenceIndicesAre(1);
}
}

void releaseSessionToEngine(final Session session, final FixLibrary library, final FixEngine engine)
Expand Down Expand Up @@ -744,7 +781,11 @@ void assertInitSeqNum(
void assertAccSeqNum(
final int lastReceivedMsgSeqNum, final int lastSentMsgSeqNum, final int sequenceIndex)
{
assertSeqNum(lastReceivedMsgSeqNum, lastSentMsgSeqNum, sequenceIndex, acceptingSession);
synchronized (LOCK)
{

assertSeqNum(lastReceivedMsgSeqNum, lastSentMsgSeqNum, sequenceIndex, acceptingSession);
}
}

void assertSeqNum(
Expand All @@ -757,8 +798,12 @@ void assertSeqNum(

void acceptingEngineHasSessionAndLibraryIsNotified()
{
final LibraryDriver driver = LibraryDriver.accepting(testSystem, nanoClock);
engineHasSessionAndLibraryIsNotified(driver, acceptingEngine, acceptingSession);
synchronized (LOCK)
{

final LibraryDriver driver = LibraryDriver.accepting(testSystem, nanoClock);
engineHasSessionAndLibraryIsNotified(driver, acceptingEngine, acceptingSession);
}
}

void initiatingEngineHasSessionAndLibraryIsNotified()
Expand Down Expand Up @@ -823,15 +868,19 @@ void assertAllSessionsOnlyContains(final FixEngine engine, final Session session

void assertReplayReceivedMessages()
{
final Reply<ReplayMessagesStatus> reply = acceptingSession.replayReceivedMessages(
1, 0, 2, 0, 5_000L);
testSystem.awaitCompletedReplies(reply);
synchronized (LOCK)
{

final Reply<ReplayMessagesStatus> reply = acceptingSession.replayReceivedMessages(
1, 0, 2, 0, 5_000L);
testSystem.awaitCompletedReplies(reply);

final FixMessage testRequest = acceptingOtfAcceptor
.receivedMessage(TEST_REQUEST_MESSAGE_AS_STR)
.findFirst()
.get();
assertEquals(CATCHUP_REPLAY, testRequest.status());
final FixMessage testRequest = acceptingOtfAcceptor
.receivedMessage(TEST_REQUEST_MESSAGE_AS_STR)
.findFirst()
.get();
assertEquals(CATCHUP_REPLAY, testRequest.status());
}
}

void sleep(final int timeInMs)
Expand Down

0 comments on commit 3a55613

Please sign in to comment.