diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/FixConnection.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/FixConnection.java index f3f88e239e..a8cd2366b2 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/FixConnection.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/FixConnection.java @@ -162,38 +162,50 @@ public FixConnection(final SocketChannel socket, // Can read data public boolean isConnected() { - try + synchronized (LOCK) { - final int read = socket.read(readBuffer); - final boolean isConnected = read != -1; - if (isConnected) + try { - final String ascii = asciiReadBuffer.getAscii(readBuffer.position() - read, read); - DebugLogger.log(FIX_TEST, "< [" + ascii + "] for isConnected()"); - } + final int read = socket.read(readBuffer); + final boolean isConnected = read != -1; - return isConnected; - } - catch (final IOException e) - { - return false; + if (isConnected) + { + final String ascii = asciiReadBuffer.getAscii(readBuffer.position() - read, read); + DebugLogger.log(FIX_TEST, "< [" + ascii + "] for isConnected()"); + } + + return isConnected; + } + catch (final IOException e) + { + return false; + } } } void sendProxyV1Line() { - final int length = writeAsciiBuffer.putAscii( - 0, "PROXY TCP4 " + PROXY_SOURCE_IP + " 192.168.0.11 " + PROXY_SOURCE_PORT + " 443\r\n"); - send(0, length); + synchronized (LOCK) + { + + final int length = writeAsciiBuffer.putAscii( + 0, "PROXY TCP4 " + PROXY_SOURCE_IP + " 192.168.0.11 " + PROXY_SOURCE_PORT + " 443\r\n"); + send(0, length); + } } void sendProxyV1LargestLine() { - final int length = writeAsciiBuffer.putAscii( - 0, "PROXY UNKNOWN " + LARGEST_PROXY_SOURCE_IP + - " ffff:f...f:ffff " + LARGEST_PROXY_SOURCE_PORT + " 65535\r\n"); - send(0, length); + synchronized (LOCK) + { + + final int length = writeAsciiBuffer.putAscii( + 0, "PROXY UNKNOWN " + LARGEST_PROXY_SOURCE_IP + + " ffff:f...f:ffff " + LARGEST_PROXY_SOURCE_PORT + " 65535\r\n"); + send(0, length); + } } void sendProxyV2LineTcpV4() @@ -278,23 +290,31 @@ void sendProxyV2LineTcpV6Localhost() public void sendBytes(final byte[] bytes) { - final int length = bytes.length; - writeAsciiBuffer.putBytes(0, bytes); + synchronized (LOCK) + { - send(0, length); + final int length = bytes.length; + writeAsciiBuffer.putBytes(0, bytes); + + send(0, length); + } } public void sendBytesLarge(final byte[] bytes) { - int offset = 0; - int remaining = bytes.length; - while (remaining > 0) + synchronized (LOCK) { - final int length = Math.min(remaining, BUFFER_SIZE); - writeAsciiBuffer.putBytes(0, bytes, offset, length); - send(0, length); - offset += length; - remaining -= length; + + int offset = 0; + int remaining = bytes.length; + while (remaining > 0) + { + final int length = Math.min(remaining, BUFFER_SIZE); + writeAsciiBuffer.putBytes(0, bytes, offset, length); + send(0, length); + offset += length; + remaining -= length; + } } } @@ -310,53 +330,73 @@ public void logon(final boolean resetSeqNumFlag, final int heartBtIntInS) public void logon(final boolean resetSeqNumFlag, final int heartBtIntInS, final boolean possDupFlag) { - setupHeader(logon.header(), msgSeqNum++, possDupFlag); + synchronized (LOCK) + { + + setupHeader(logon.header(), msgSeqNum++, possDupFlag); - logon - .resetSeqNumFlag(resetSeqNumFlag) - .encryptMethod(0) - .heartBtInt(heartBtIntInS) - .maxMessageSize(9999); + logon + .resetSeqNumFlag(resetSeqNumFlag) + .encryptMethod(0) + .heartBtInt(heartBtIntInS) + .maxMessageSize(9999); - send(logon); + send(logon); + } } public FixConnection msgSeqNum(final int msgSeqNum) { - this.msgSeqNum = msgSeqNum; - return this; + synchronized (LOCK) + { + + this.msgSeqNum = msgSeqNum; + return this; + } } public int acquireMsgSeqNum() { - return this.msgSeqNum++; + synchronized (LOCK) + { + + return this.msgSeqNum++; + } } public void logout() { - setupHeader(logout.header(), msgSeqNum++, false); + synchronized (LOCK) + { + + setupHeader(logout.header(), msgSeqNum++, false); - send(logout); + send(logout); + } } public void setupHeader(final SessionHeaderEncoder header, final int msgSeqNum, final boolean possDupFlag) { - final long timestamp = System.currentTimeMillis(); - final int timestampLength = sendingTimeEncoder.encode(timestamp); - - header - .senderCompID(senderCompID) - .targetCompID(targetCompID) - .msgSeqNum(msgSeqNum) - .sendingTime(sendingTimeEncoder.buffer(), timestampLength); - - if (possDupFlag) + synchronized (LOCK) { - final int origSendingTimeLength = origSendingTimeEncoder.encode(timestamp - 1000); + + final long timestamp = System.currentTimeMillis(); + final int timestampLength = sendingTimeEncoder.encode(timestamp); header - .possDupFlag(true) - .origSendingTime(origSendingTimeEncoder.buffer(), origSendingTimeLength); + .senderCompID(senderCompID) + .targetCompID(targetCompID) + .msgSeqNum(msgSeqNum) + .sendingTime(sendingTimeEncoder.buffer(), timestampLength); + + if (possDupFlag) + { + final int origSendingTimeLength = origSendingTimeEncoder.encode(timestamp - 1000); + + header + .possDupFlag(true) + .origSendingTime(origSendingTimeEncoder.buffer(), origSendingTimeLength); + } } } @@ -389,68 +429,73 @@ public ExecutionReportDecoder readResentExecutionReport(final int msgSeqNum) public T readMessage(final T decoder) { - try + synchronized (LOCK) { - final int bytesToParse = bytesRemaining == 0 ? socket.read(readBuffer) : bytesRemaining; - if (bytesToParse == 0) - { - return null; - } - ascii = asciiReadBuffer.getAscii(OFFSET, bytesToParse); - DebugLogger.log(FIX_TEST, - "< [" + ascii + "] for attempted: " + decoder.getClass()); - - endOfMessage = ascii.indexOf("8=FIX.4.4", 9); - if (endOfMessage == -1) - { - endOfMessage = bytesToParse; - } - - decoder.decode(asciiReadBuffer, OFFSET, endOfMessage); - if (MsgTypeByDecoderMap.getMessageTypeOf((MessageDecoder)decoder) != decoder.header().messageType()) - { - return null; - } - - if (!decoder.validate()) - { - fail("Failed: " + RejectReason.decode(decoder.rejectReason()) + " for " + decoder.invalidTagId() + - " msg = [" + ascii + "]"); - } - - // check MsgType in case we read an unexpected message, but with a compatible structure try { - final Field messageTypeAsStringField = decoder.getClass().getDeclaredField("MESSAGE_TYPE_AS_STRING"); - final String expectedMsgType = (String)messageTypeAsStringField.get(null); - final SessionHeaderDecoder header = decoder.header(); - final String actualMsgType = new String(header.msgType(), 0, header.msgTypeLength()); - assertEquals("MsgType", expectedMsgType, actualMsgType); + final int bytesToParse = bytesRemaining == 0 ? socket.read(readBuffer) : bytesRemaining; + if (bytesToParse == 0) + { + return null; + } + ascii = asciiReadBuffer.getAscii(OFFSET, bytesToParse); + + DebugLogger.log(FIX_TEST, + "< [" + ascii + "] for attempted: " + decoder.getClass()); + + endOfMessage = ascii.indexOf("8=FIX.4.4", 9); + if (endOfMessage == -1) + { + endOfMessage = bytesToParse; + } + + decoder.decode(asciiReadBuffer, OFFSET, endOfMessage); + if (MsgTypeByDecoderMap.getMessageTypeOf((MessageDecoder)decoder) != decoder.header().messageType()) + { + return null; + } + + if (!decoder.validate()) + { + fail("Failed: " + RejectReason.decode(decoder.rejectReason()) + " for " + decoder.invalidTagId() + + " msg = [" + ascii + "]"); + } + + // check MsgType in case we read an unexpected message, but with a compatible structure + try + { + final Field messageTypeAsStringField = decoder.getClass() + .getDeclaredField("MESSAGE_TYPE_AS_STRING"); + final String expectedMsgType = (String)messageTypeAsStringField.get(null); + final SessionHeaderDecoder header = decoder.header(); + final String actualMsgType = new String(header.msgType(), 0, header.msgTypeLength()); + assertEquals("MsgType", expectedMsgType, actualMsgType); + } + catch (final NoSuchFieldException | IllegalAccessException e) + { + LangUtil.rethrowUnchecked(e); + } + + readBuffer.clear(); + if (endOfMessage != -1) + { + ascii = asciiReadBuffer.getAscii(OFFSET, endOfMessage); + bytesRemaining = bytesToParse - endOfMessage; + asciiReadBuffer.putBytes(0, asciiReadBuffer, endOfMessage, bytesRemaining); + } + else + { + bytesRemaining = 0; + } } - catch (final NoSuchFieldException | IllegalAccessException e) + catch (final IOException ex) { - LangUtil.rethrowUnchecked(e); + LangUtil.rethrowUnchecked(ex); } - readBuffer.clear(); - if (endOfMessage != -1) - { - ascii = asciiReadBuffer.getAscii(OFFSET, endOfMessage); - bytesRemaining = bytesToParse - endOfMessage; - asciiReadBuffer.putBytes(0, asciiReadBuffer, endOfMessage, bytesRemaining); - } - else - { - bytesRemaining = 0; - } + return decoder; } - catch (final IOException ex) - { - LangUtil.rethrowUnchecked(ex); - } - - return decoder; } int pollData() throws IOException @@ -470,30 +515,38 @@ int pollData() throws IOException public int send(final Encoder encoder) { - final long result = encoder.encode(writeAsciiBuffer, OFFSET); - final int offset = Encoder.offset(result); - final int length = Encoder.length(result); - encoder.reset(); + synchronized (LOCK) + { + + final long result = encoder.encode(writeAsciiBuffer, OFFSET); + final int offset = Encoder.offset(result); + final int length = Encoder.length(result); + encoder.reset(); - return send(offset, length); + return send(offset, length); + } } private int send(final int offset, final int length) { - try - { - writeBuffer.position(offset).limit(offset + length); - final int written = socket.write(writeBuffer); - assertEquals(length, written); - DebugLogger.log(FIX_TEST, "> [" + writeAsciiBuffer.getAscii(offset, length) + "]"); - writeBuffer.clear(); - return written; - } - catch (final IOException ex) + synchronized (LOCK) { - LangUtil.rethrowUnchecked(ex); + + try + { + writeBuffer.position(offset).limit(offset + length); + final int written = socket.write(writeBuffer); + assertEquals(length, written); + DebugLogger.log(FIX_TEST, "> [" + writeAsciiBuffer.getAscii(offset, length) + "]"); + writeBuffer.clear(); + return written; + } + catch (final IOException ex) + { + LangUtil.rethrowUnchecked(ex); + } + return -1; } - return -1; } public LogonDecoder readLogon() @@ -571,14 +624,18 @@ public HeartbeatDecoder exchangeTestRequestHeartbeat(final String testReqID) public int sendTestRequest(final String testReqID) { - setupHeader(testRequestEncoder.header(), msgSeqNum, false); - testRequestEncoder.testReqID(testReqID); - final int result = send(testRequestEncoder); - if (result > 0) + synchronized (LOCK) { - msgSeqNum++; + + setupHeader(testRequestEncoder.header(), msgSeqNum, false); + testRequestEncoder.testReqID(testReqID); + final int result = send(testRequestEncoder); + if (result > 0) + { + msgSeqNum++; + } + return result; } - return result; } public HeartbeatDecoder readHeartbeat(final String testReqID) @@ -667,24 +724,32 @@ public void sendExecutionReport(final int msgSeqNum, final boolean possDupFlag) public ResendRequestEncoder sendResendRequest(final int beginSeqNo, final int endSeqNo) { - final ResendRequestEncoder resendRequest = new ResendRequestEncoder(); - - resendRequest.beginSeqNo(beginSeqNo).endSeqNo(endSeqNo); - setupHeader(resendRequest.header(), msgSeqNum, false); - final int result = send(resendRequest); - if (result > 0) - { - msgSeqNum++; - return resendRequest; - } - else + synchronized (LOCK) { - return null; + + final ResendRequestEncoder resendRequest = new ResendRequestEncoder(); + + resendRequest.beginSeqNo(beginSeqNo).endSeqNo(endSeqNo); + setupHeader(resendRequest.header(), msgSeqNum, false); + final int result = send(resendRequest); + if (result > 0) + { + msgSeqNum++; + return resendRequest; + } + else + { + return null; + } } } public int msgSeqNum() { - return msgSeqNum; + synchronized (LOCK) + { + + return msgSeqNum; + } } }