Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing flaky tests #517

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public class AbstractBinaryEntryPointSystemTest
public static final long TEST_TIMEOUT_IN_MS = 20_000L;

static final int AWAIT_TIMEOUT_IN_MS = 10_000;
static final int TIMEOUT_EPSILON_IN_MS = 10;
static final int TEST_NO_LOGON_DISCONNECT_TIMEOUT_IN_MS = 200;
static final int TIMEOUT_EPSILON_IN_MS = 50;
static final int TEST_NO_LOGON_DISCONNECT_TIMEOUT_IN_MS = 2000;

final EpochNanoClock nanoClock = new OffsetEpochNanoClock();
final int port = unusedPort();
Expand Down Expand Up @@ -152,7 +152,7 @@ void setupJustArtio(
.epochNanoClock(nanoClock)
.senderMaxBytesInBuffer(senderMaxBytesInBuffer);

engineConfig.errorHandlerFactory(ffs -> Throwable::printStackTrace);
engineConfig.errorHandlerFactory(ffs -> Throwable::printStackTrace).printAeronStreamIdentifiers(true);
engineConfig.acceptorFixPKeepaliveTimeoutInMs(artioKeepAliveIntervalInMs);
configureAeronArchive(engineConfig.aeronArchiveContext());

Expand Down Expand Up @@ -192,7 +192,7 @@ FixLibrary launchLibrary(
.fixPAcceptedSessionMaxRetransmissionRange(fixPAcceptedSessionMaxRetransmissionRange)
.epochNanoClock(nanoClock);

libraryConfig.acceptorFixPKeepaliveTimeoutInMs(artioKeepAliveIntervalInMs);
libraryConfig.acceptorFixPKeepaliveTimeoutInMs(artioKeepAliveIntervalInMs).printAeronStreamIdentifiers(true);

if (!printErrors)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import uk.co.real_logic.artio.session.Session;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
Expand All @@ -65,6 +66,7 @@
import static uk.co.real_logic.artio.system_tests.BinaryEntryPointClient.*;
import static uk.co.real_logic.artio.system_tests.FakeBinaryEntrypointConnectionHandler.sendExecutionReportNew;
import static uk.co.real_logic.artio.system_tests.FakeFixPConnectionExistsHandler.requestSession;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.awaitIndexerCaughtUp;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiate;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.libraries;

Expand Down Expand Up @@ -402,13 +404,15 @@ public void shouldDisconnectIfNegotiateTimeout() throws IOException
{
setupArtio(TEST_NO_LOGON_DISCONNECT_TIMEOUT_IN_MS, 1);

final long timeInMs = System.currentTimeMillis();
// taking from the same clock used by the framer which gives more accuracy
final long timeInNs = nanoClock.nanoTime();
try (BinaryEntryPointClient client = newClient())
{
client.assertDisconnected();
final long durationInMs = System.currentTimeMillis() - timeInMs;
final long acceptableLowerBoundInMs = TEST_NO_LOGON_DISCONNECT_TIMEOUT_IN_MS - TIMEOUT_EPSILON_IN_MS;
assertThat(durationInMs, Matchers.greaterThanOrEqualTo(acceptableLowerBoundInMs));
final long acceptableLowerBoundInMs = Duration.ofMillis(TEST_NO_LOGON_DISCONNECT_TIMEOUT_IN_MS)
.minusMillis(TIMEOUT_EPSILON_IN_MS).toNanos();
final long durationInNs = nanoClock.nanoTime() - timeInNs;
assertThat(durationInNs, Matchers.greaterThanOrEqualTo(acceptableLowerBoundInMs));
}

// Test that we can still establish the connection after this
Expand Down Expand Up @@ -446,6 +450,7 @@ public void shouldAcceptReEstablishmentOfSession() throws IOException
reEstablishConnection(2, 2);

restartArtio();

reEstablishConnection(3, 3);
}

Expand Down Expand Up @@ -1333,6 +1338,7 @@ private void shouldPruneAwayOldArchivePositions(

exchangeOverASegmentOfMessages(finishSending);

testSystem.await("connection is still on", () -> !connection.isConnected());
resetOp.reset();

assertPruneWorks();
Expand Down Expand Up @@ -1391,6 +1397,7 @@ private void exchangeOverASegmentOfMessages(final boolean finishSending) throws
acceptorInitiatedFinishSending(client, overASegmentOfMessages);
}
}
assertConnectionDisconnected();
}

// ----------------------------------
Expand Down Expand Up @@ -2186,6 +2193,10 @@ private void withReEstablishedConnection(
final int nextSeqNo = alreadyRecvMsgCount + 1;
client.writeEstablish(nextSeqNo);

// if not this, then sometimes the 'alreadyRecvMsgCount' does not match 'nextSeqNo' below when calling
// readEstablishAck
awaitIndexerCaughtUp(testSystem, mediaDriver.mediaDriver().aeronDirectoryName(), engine, library);

libraryAcquiresConnection(client, connectionExistsHandler, connectionAcquiredHandler, offlineOwned);

client.readEstablishAck(nextSeqNo, alreadyRecvMsgCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,15 @@ private void indexRedactSequenceMessage(final long fixMessageToRedactPosition)
publication, mock(AtomicCounter.class), YieldingIdleStrategy.INSTANCE,
mock(EpochNanoClock.class), 1);

final long redactMessagePosition = gatewayPublication.saveRedactSequenceUpdate(
SESSION_ID, SEQUENCE_NUMBER, fixMessageToRedactPosition);
long position = 0L;
while (position < 1)
{
position = gatewayPublication.saveRedactSequenceUpdate(
SESSION_ID, SEQUENCE_NUMBER, fixMessageToRedactPosition);
Thread.yield();
}

indexToPosition(publication.sessionId(), redactMessagePosition);
indexToPosition(publication.sessionId(), position);
}
}

Expand Down Expand Up @@ -427,16 +432,18 @@ public void shouldResetSequenceNumberForSessionAfterRestart()
indexFixMessage();
assertLastKnownSequenceNumberIs(SESSION_ID, SEQUENCE_NUMBER);

bufferContainsExampleMessage(false, SESSION_ID + 1, SEQUENCE_NUMBER + 5,
bufferContainsExampleMessage(false, SESSION_ID_2, SEQUENCE_NUMBER + 5,
SEQUENCE_INDEX);
indexRecord();
assertLastKnownSequenceNumberIs(SESSION_ID + 1, SEQUENCE_NUMBER + 5);
assertLastKnownSequenceNumberIs(SESSION_ID_2, SEQUENCE_NUMBER + 5);
assertLastKnownSequenceNumberIs(SESSION_ID, SEQUENCE_NUMBER);

writer.close();
writer = newWriter(inMemoryBuffer);

resetSequenceNumber(SESSION_ID);
assertLastKnownSequenceNumberIs(SESSION_ID, 0);
assertLastKnownSequenceNumberIs(SESSION_ID_2, SEQUENCE_NUMBER + 5);

// this should write to old session place and not to same as previous call
resetSequenceNumber(SESSION_ID_2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,15 @@ void launchGatewayToGateway()

final EngineConfiguration initiatingConfig = initiatingConfig(libraryAeronPort, nanoClock);
initiatingConfig.deleteLogFileDirOnStart(true);
initiatingConfig.monitoringAgentFactory(MonitoringAgentFactory.none());
initiatingConfig.monitoringAgentFactory(MonitoringAgentFactory.none()).printAeronStreamIdentifiers(true);
initiatingEngine = FixEngine.launch(initiatingConfig);

final LibraryConfiguration acceptingLibraryConfig = acceptingLibraryConfig(acceptingHandler, nanoClock);
acceptingLibrary = connect(acceptingLibraryConfig);

final LibraryConfiguration initiatingLibraryConfig = initiatingLibraryConfig(
libraryAeronPort, initiatingHandler, nanoClock);
initiatingLibraryConfig.resendRequestController(fakeResendRequestController);
initiatingLibraryConfig.resendRequestController(fakeResendRequestController).printAeronStreamIdentifiers(true);
initiatingLibrary = connect(initiatingLibraryConfig);
testSystem = new TestSystem(acceptingLibrary, initiatingLibrary);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,14 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static uk.co.real_logic.artio.CancelOnDisconnectType.*;
import static uk.co.real_logic.artio.CommonConfiguration.RUNNING_ON_WINDOWS;
import static uk.co.real_logic.artio.Constants.LOGON_MESSAGE_AS_STR;
import static uk.co.real_logic.artio.TestFixtures.launchMediaDriver;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.*;

public class CancelOnDisconnectSystemTest extends AbstractGatewayToGatewaySystemTest
{
public static final int COD_TEST_TIMEOUT_IN_MS = 500;
public static final int LONG_COD_TEST_TIMEOUT_IN_MS = RUNNING_ON_WINDOWS ? 3_000 : COD_TEST_TIMEOUT_IN_MS;
public static final int LONG_COD_TEST_TIMEOUT_IN_MS = 3_000;
public static final Class<FixDictionaryImpl> FIX_DICTIONARY_WITHOUT_COD = FixDictionaryImpl.class;
private long now;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@

public class EngineAndLibraryIntegrationTest
{
private static final int SHORT_TIMEOUT_IN_MS = 100;
private static final int SHORT_TIMEOUT_IN_MS = 500;

private final EpochNanoClock nanoClock = new OffsetEpochNanoClock();
private ArchivingMediaDriver mediaDriver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public void messagesCanBeSentFromInitiatorToAcceptingLibrary()

assertSequenceIndicesAre(0);

testSystem.await("messageTimingHandler.count() is not 2", () -> messageTimingHandler.count() == 2);
messageTimingHandler.verifyConsecutiveSequenceNumbers(2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,10 @@ public void shouldSupportLogonBasedSequenceNumberResetWithMessagesSentBeforeLogo
testSystem.awaitMessageOf(otfAcceptor, EXECUTION_REPORT_MESSAGE_AS_STR,
msg -> msg.messageSequenceNumber() == seqNum2 && msg.sequenceIndex() == 1);

// to ensure the indexes are processed before resend request is received - which would mean a gapfill
// responded rather than a replay msg
awaitIndexerCaughtUp(testSystem, mediaDriver.mediaDriver().aeronDirectoryName(), engine, library);

connection.sendResendRequest(2, 2);
testSystem.awaitBlocking(
() -> assertEquals(Side.SELL, connection.readResentExecutionReport(2).sideAsEnum()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,15 @@ public void shouldReplayMessageBeforeARestart()
public void shouldNotBeAbleToReplayMessagesFromBeforeReset1()
{
// reset when ReplayIndex instances exist
shouldNotBeAbleToReplayMessagesFromBeforeReset0(() -> {});
shouldNotBeAbleToReplayMessagesFromBeforeReset0(() ->
{
// this is to ensure the logout msg notification is received by the indexer before the session disconnect
// as they both come from different publications
awaitIndexerCaughtUp(testSystem,
mediaDriver.mediaDriver().aeronDirectoryName(),
acceptingEngine,
acceptingLibrary);
});
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public final class SystemTestUtil

private static final String HI_ID = "hi";

public static final long TEST_REPLY_TIMEOUT_IN_MS = RUNNING_ON_WINDOWS ? 3_000 : 1_000;
public static final long TEST_REPLY_TIMEOUT_IN_MS = 3_000;

private static final int TEST_COMPACTION_SIZE = 1024 * 1024;

Expand Down Expand Up @@ -261,7 +261,8 @@ static EngineConfiguration initiatingConfig(final int libraryAeronPort, final Ep
.logFileDir(CLIENT_LOGS)
.scheduler(new LowResourceEngineScheduler())
.slowConsumerTimeoutInMs(TEST_REPLY_TIMEOUT_IN_MS)
.replyTimeoutInMs(TEST_REPLY_TIMEOUT_IN_MS);
.replyTimeoutInMs(TEST_REPLY_TIMEOUT_IN_MS)
.printAeronStreamIdentifiers(true);
configuration.epochNanoClock(nanoClock);
configuration.agentNamePrefix("init-");
configureAeronArchive(configuration.aeronArchiveContext());
Expand Down Expand Up @@ -309,7 +310,8 @@ static EngineConfiguration acceptingConfig(
.logFileDir(acceptorLogs)
.scheduler(new LowResourceEngineScheduler())
.slowConsumerTimeoutInMs(TEST_REPLY_TIMEOUT_IN_MS)
.replyTimeoutInMs(TEST_REPLY_TIMEOUT_IN_MS);
.replyTimeoutInMs(TEST_REPLY_TIMEOUT_IN_MS)
.printAeronStreamIdentifiers(true);
}

static String acceptorMonitoringFile(final String countersSuffix)
Expand All @@ -328,7 +330,8 @@ static LibraryConfiguration acceptingLibraryConfig(
.sessionAcquireHandler(sessionHandler)
.libraryAeronChannels(singletonList(IPC_CHANNEL))
.libraryName("accepting")
.replyTimeoutInMs(TEST_REPLY_TIMEOUT_IN_MS);
.replyTimeoutInMs(TEST_REPLY_TIMEOUT_IN_MS)
.printAeronStreamIdentifiers(true);

return libraryConfiguration;
}
Expand Down Expand Up @@ -411,7 +414,7 @@ static LibraryConfiguration initiatingLibraryConfig(
.libraryAeronChannels(singletonList("aeron:udp?endpoint=localhost:" + libraryAeronPort))
.libraryName("initiating")
.replyTimeoutInMs(TEST_REPLY_TIMEOUT_IN_MS);
config.epochNanoClock(nanoClock);
config.epochNanoClock(nanoClock).printAeronStreamIdentifiers(true);
return config;
}

Expand Down
17 changes: 17 additions & 0 deletions run-build-in-containers.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
docker rm -f artio1
docker rm -f artio2
docker rm -f artio3
docker rm -f artio4
docker build -t artio .
docker run -d --cpus 1 --name artio1 artio
docker run -d --cpus 1 --name artio2 artio
docker run -d --cpus 1 --name artio3 artio
docker run -d --cpus 1 --name artio4 artio

echo
echo 'Execute the following to monitor the logs for test failures :'

echo 'docker logs artio1 -f | grep FAILED'
echo 'docker logs artio2 -f | grep FAILED'
echo 'docker logs artio3 -f | grep FAILED'
echo 'docker logs artio4 -f | grep FAILED'