From 3a55613bd569eaf659e87bdffbf0da0d12f07e44 Mon Sep 17 00:00:00 2001 From: Luciano Viana Date: Mon, 23 Sep 2024 17:09:49 +0200 Subject: [PATCH] race condition on AbstractGatewayToGatewaySystemTest for Session obj shared accross different threads --- .../AbstractGatewayToGatewaySystemTest.java | 157 ++++++++++++------ 1 file changed, 103 insertions(+), 54 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/AbstractGatewayToGatewaySystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/AbstractGatewayToGatewaySystemTest.java index d170db0575..5f667a322a 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/AbstractGatewayToGatewaySystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/AbstractGatewayToGatewaySystemTest.java @@ -197,13 +197,17 @@ void assertOriginalLibraryDoesNotReceiveMessages(final int initiatorMessageCount void assertSequenceFromInitToAcceptAt(final int expectedInitToAccSeqNum, final int expectedAccToInitSeqNum) { - assertEquals(expectedInitToAccSeqNum, initiatingSession.lastSentMsgSeqNum()); - assertEquals(expectedInitToAccSeqNum, acceptingSession.lastReceivedMsgSeqNum()); + synchronized (LOCK) + { + + assertEquals(expectedInitToAccSeqNum, initiatingSession.lastSentMsgSeqNum()); + assertEquals(expectedInitToAccSeqNum, acceptingSession.lastReceivedMsgSeqNum()); - awaitMessage(expectedAccToInitSeqNum, initiatingSession); + awaitMessage(expectedAccToInitSeqNum, initiatingSession); - assertEquals(expectedAccToInitSeqNum, initiatingSession.lastReceivedMsgSeqNum()); - assertEquals(expectedAccToInitSeqNum, acceptingSession.lastSentMsgSeqNum()); + assertEquals(expectedAccToInitSeqNum, initiatingSession.lastReceivedMsgSeqNum()); + assertEquals(expectedAccToInitSeqNum, acceptingSession.lastSentMsgSeqNum()); + } } void awaitMessage(final int sequenceNumber, final Session session) @@ -222,7 +226,11 @@ void disconnectSessions() long logoutAcceptingSession() { - return logoutSession(testSystem, acceptingSession); + synchronized (LOCK) + { + + return logoutSession(testSystem, acceptingSession); + } } void logoutInitiatingSession() @@ -232,16 +240,20 @@ void logoutInitiatingSession() void assertSessionsDisconnected() { - assertSessionDisconnected(initiatingSession); - assertSessionDisconnected(acceptingSession); + synchronized (LOCK) + { - assertEventuallyTrue("libraries receive disconnect messages", - () -> - { - testSystem.poll(); - assertNotSession(acceptingHandler, acceptingSession); - assertNotSession(initiatingHandler, initiatingSession); - }); + assertSessionDisconnected(initiatingSession); + assertSessionDisconnected(acceptingSession); + + assertEventuallyTrue("libraries receive disconnect messages", + () -> + { + testSystem.poll(); + assertNotSession(acceptingHandler, acceptingSession); + assertNotSession(initiatingHandler, initiatingSession); + }); + } } void sessionNoLongerManaged(final FakeHandler handler, final Session session) @@ -277,12 +289,17 @@ void acquireAcceptingSession() void acquireAcceptingSession(final String initiatorId) { - final long sessionId = acceptingHandler.awaitSessionId(testSystem::poll); + synchronized (LOCK) + { + + final long sessionId = acceptingHandler.awaitSessionId(testSystem::poll); + + acceptingSession = acquireSession(acceptingHandler, acceptingLibrary, sessionId, testSystem); + assertEquals(initiatorId, acceptingHandler.lastInitiatorCompId()); + assertEquals(ACCEPTOR_ID, acceptingHandler.lastAcceptorCompId()); + assertNotNull("unable to acquire accepting session", acceptingSession); + } - acceptingSession = acquireSession(acceptingHandler, acceptingLibrary, sessionId, testSystem); - assertEquals(initiatorId, acceptingHandler.lastInitiatorCompId()); - assertEquals(ACCEPTOR_ID, acceptingHandler.lastAcceptorCompId()); - assertNotNull("unable to acquire accepting session", acceptingSession); } void connectSessions() @@ -347,8 +364,12 @@ FixMessage assertMessageResent(final int sequenceNumber, final String msgType, f int acceptorSendsResendRequest() { - final int seqNum = acceptingSession.lastReceivedMsgSeqNum(); - return acceptorSendsResendRequest(seqNum); + synchronized (LOCK) + { + + final int seqNum = acceptingSession.lastReceivedMsgSeqNum(); + return acceptorSendsResendRequest(seqNum); + } } int acceptorSendsResendRequest(final int seqNum) @@ -366,16 +387,20 @@ void acceptorSendsResendRequest(final int beginSeqNo, final int endSeqNo) void sendResendRequest( final int beginSeqNo, final int endSeqNo, final FakeOtfAcceptor otfAcceptor, final Session session) { - final ResendRequestEncoder resendRequest = new ResendRequestEncoder() - .beginSeqNo(beginSeqNo) - .endSeqNo(endSeqNo); + synchronized (LOCK) + { - otfAcceptor.messages().clear(); + final ResendRequestEncoder resendRequest = new ResendRequestEncoder() + .beginSeqNo(beginSeqNo) + .endSeqNo(endSeqNo); - while (session.trySend(resendRequest) < 0) - { - Thread.yield(); - testSystem.poll(); + otfAcceptor.messages().clear(); + + while (session.trySend(resendRequest) < 0) + { + Thread.yield(); + testSystem.poll(); + } } } @@ -391,7 +416,11 @@ void messagesCanBeExchanged(final Session session) void acceptingMessagesCanBeExchanged() { - messagesCanBeExchanged(acceptingSession, acceptingOtfAcceptor); + synchronized (LOCK) + { + + messagesCanBeExchanged(acceptingSession, acceptingOtfAcceptor); + } } long messagesCanBeExchanged(final Session sendingSession, final FakeOtfAcceptor receivingAcceptor) @@ -431,12 +460,16 @@ void assertSequenceIndicesAre(final int sequenceIndex) void assertAcceptingSessionHasSequenceIndex(final int sequenceIndex) { - if (acceptingSession != null) + synchronized (LOCK) { - assertThat(acceptingSession, hasSequenceIndex(sequenceIndex)); - } - assertEngineSequenceIndexIs(acceptingEngine, sequenceIndex); + if (acceptingSession != null) + { + assertThat(acceptingSession, hasSequenceIndex(sequenceIndex)); + } + + assertEngineSequenceIndexIs(acceptingEngine, sequenceIndex); + } } void assertInitiatingSequenceIndexIs(final int sequenceIndex) @@ -462,19 +495,23 @@ void assertAllMessagesHaveSequenceIndex(final int sequenceIndex) void sessionsCanReconnect() { - acquireAcceptingSession(); + synchronized (LOCK) + { - logoutSession(acceptingSession); - assertSessionsDisconnected(); + acquireAcceptingSession(); + + logoutSession(acceptingSession); + assertSessionsDisconnected(); - assertAllMessagesHaveSequenceIndex(0); - clearMessages(); + assertAllMessagesHaveSequenceIndex(0); + clearMessages(); - wireSessions(); + wireSessions(); - messagesCanBeExchanged(); + messagesCanBeExchanged(); - assertSequenceIndicesAre(1); + assertSequenceIndicesAre(1); + } } void releaseSessionToEngine(final Session session, final FixLibrary library, final FixEngine engine) @@ -744,7 +781,11 @@ void assertInitSeqNum( void assertAccSeqNum( final int lastReceivedMsgSeqNum, final int lastSentMsgSeqNum, final int sequenceIndex) { - assertSeqNum(lastReceivedMsgSeqNum, lastSentMsgSeqNum, sequenceIndex, acceptingSession); + synchronized (LOCK) + { + + assertSeqNum(lastReceivedMsgSeqNum, lastSentMsgSeqNum, sequenceIndex, acceptingSession); + } } void assertSeqNum( @@ -757,8 +798,12 @@ void assertSeqNum( void acceptingEngineHasSessionAndLibraryIsNotified() { - final LibraryDriver driver = LibraryDriver.accepting(testSystem, nanoClock); - engineHasSessionAndLibraryIsNotified(driver, acceptingEngine, acceptingSession); + synchronized (LOCK) + { + + final LibraryDriver driver = LibraryDriver.accepting(testSystem, nanoClock); + engineHasSessionAndLibraryIsNotified(driver, acceptingEngine, acceptingSession); + } } void initiatingEngineHasSessionAndLibraryIsNotified() @@ -823,15 +868,19 @@ void assertAllSessionsOnlyContains(final FixEngine engine, final Session session void assertReplayReceivedMessages() { - final Reply reply = acceptingSession.replayReceivedMessages( - 1, 0, 2, 0, 5_000L); - testSystem.awaitCompletedReplies(reply); + synchronized (LOCK) + { + + final Reply reply = acceptingSession.replayReceivedMessages( + 1, 0, 2, 0, 5_000L); + testSystem.awaitCompletedReplies(reply); - final FixMessage testRequest = acceptingOtfAcceptor - .receivedMessage(TEST_REQUEST_MESSAGE_AS_STR) - .findFirst() - .get(); - assertEquals(CATCHUP_REPLAY, testRequest.status()); + final FixMessage testRequest = acceptingOtfAcceptor + .receivedMessage(TEST_REQUEST_MESSAGE_AS_STR) + .findFirst() + .get(); + assertEquals(CATCHUP_REPLAY, testRequest.status()); + } } void sleep(final int timeInMs)