Skip to content

Commit

Permalink
fix FixReplayerSession state scope
Browse files Browse the repository at this point in the history
Static state won't work if there are multiple Artio instances in a
single JVM.
  • Loading branch information
wojciech-adaptive committed May 31, 2024
1 parent 231fd88 commit 761c059
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,7 @@ private enum State
CLOSING
}

// Safe to share between multiple instances due to single threaded nature of the replayer
private static final FixMessageEncoder FIX_MESSAGE_ENCODER = new FixMessageEncoder();
private static final FixMessageDecoder FIX_MESSAGE = new FixMessageDecoder();
private static final ThrottleRejectDecoder THROTTLE_REJECT = new ThrottleRejectDecoder();
private static final AsciiBuffer ASCII_BUFFER = new MutableAsciiBuffer();

private final GapFillEncoder gapFillEncoder;

private final PossDupEnabler possDupEnabler;
private final EpochNanoClock clock;
private final String message;
Expand Down Expand Up @@ -147,7 +140,7 @@ MessageTracker messageTracker()
private void onPreCommit(final MutableDirectBuffer buffer, final int offset)
{
final int frameOffset = offset + MessageHeaderEncoder.ENCODED_LENGTH;
FIX_MESSAGE_ENCODER
replayer.fixMessageEncoder
.wrap(buffer, frameOffset)
.connection(connectionId)
.sequenceNumber(headerSeqNum);
Expand Down Expand Up @@ -199,31 +192,33 @@ private Action onFixMessage(
final int offset,
final int version)
{
FIX_MESSAGE.wrap(
final FixMessageDecoder fixMessageDecoder = replayer.fixMessageDecoder;
fixMessageDecoder.wrap(
srcBuffer,
offset,
actingBlockLength,
version);

if (FIX_MESSAGE.status() == MessageStatus.OK)
if (fixMessageDecoder.status() == MessageStatus.OK)
{
final int metaDataAdjustment = version >= metaDataSinceVersion() ?
metaDataHeaderLength() + FIX_MESSAGE.metaDataLength() : 0;
metaDataHeaderLength() + fixMessageDecoder.metaDataLength() : 0;
final int messageFrameBlockLength = MESSAGE_FRAME_BLOCK_LENGTH + metaDataAdjustment;
final int messageOffset = srcOffset + messageFrameBlockLength;
final int messageLength = srcLength - messageFrameBlockLength;

final int msgSeqNum = sequenceNumberExtractor.extract(srcBuffer, messageOffset, messageLength);
final long messageType = MessageTypeExtractor.getMessageType(FIX_MESSAGE);
final long messageType = MessageTypeExtractor.getMessageType(fixMessageDecoder);

ASCII_BUFFER.wrap(srcBuffer);
final AsciiBuffer asciiBuffer = replayer.sessionAsciiBuffer;
asciiBuffer.wrap(srcBuffer);
replayHandler.onReplayedMessage(
ASCII_BUFFER,
asciiBuffer,
messageOffset,
messageLength,
FIX_MESSAGE.libraryId(),
FIX_MESSAGE.session(),
FIX_MESSAGE.sequenceIndex(),
fixMessageDecoder.libraryId(),
fixMessageDecoder.session(),
fixMessageDecoder.sequenceIndex(),
messageType);

if (gapFillMessageTypes.contains(messageType))
Expand Down Expand Up @@ -271,12 +266,13 @@ else if (msgSeqNum > lastSeqNo + 1)
private Action onThrottleReject(
final DirectBuffer srcBuffer, final int actingBlockLength, final int offset, final int version)
{
THROTTLE_REJECT.wrap(
final ThrottleRejectDecoder throttleRejectDecoder = replayer.throttleRejectDecoder;
throttleRejectDecoder.wrap(
srcBuffer,
offset,
actingBlockLength,
version);
final int msgSeqNum = THROTTLE_REJECT.sequenceNumber();
final int msgSeqNum = throttleRejectDecoder.sequenceNumber();

if (gapFillMessageTypes.contains(BUSINESS_MESSAGE_REJECT_MESSAGE_TYPE))
{
Expand All @@ -299,15 +295,15 @@ else if (msgSeqNum > lastSeqNo + 1)
sendGapFill(lastSeqNo, msgSeqNum, false);
}

final int businessRejectRefIDOffset = THROTTLE_REJECT.limit() +
final int businessRejectRefIDOffset = throttleRejectDecoder.limit() +
ThrottleNotificationDecoder.businessRejectRefIDHeaderLength();
throttleRejectBuilder.build(
THROTTLE_REJECT.refMsgType(),
THROTTLE_REJECT.refSeqNum(),
THROTTLE_REJECT.sequenceNumber(),
throttleRejectDecoder.refMsgType(),
throttleRejectDecoder.refSeqNum(),
throttleRejectDecoder.sequenceNumber(),
srcBuffer,
businessRejectRefIDOffset,
THROTTLE_REJECT.businessRejectRefIDLength(),
throttleRejectDecoder.businessRejectRefIDLength(),
true);

final Action action = sendFixMessage(
Expand Down Expand Up @@ -351,7 +347,7 @@ private Action sendFixMessage(
final int destOffset = bufferClaim.offset();
final MutableDirectBuffer destBuffer = bufferClaim.buffer();

FIX_MESSAGE_ENCODER
replayer.fixMessageEncoder
.wrapAndApplyHeader(destBuffer, destOffset, replayer.messageHeaderEncoder)
.session(this.sessionId)
.connection(this.connectionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ public class Replayer extends AbstractReplayer
final CharFormatter completeNotRecentFormatter = new CharFormatter(
"ReplayerSession: completeReplay-!upToMostRecent replayedMessages=%s " +
"endSeqNo=%s beginSeqNo=%s expectedCount=%s connId=%s");
final FixMessageEncoder fixMessageEncoder = new FixMessageEncoder();
final FixMessageDecoder fixMessageDecoder = new FixMessageDecoder();
final ThrottleRejectDecoder throttleRejectDecoder = new ThrottleRejectDecoder();
final AsciiBuffer sessionAsciiBuffer = new MutableAsciiBuffer();

// Binary FIXP specific state
private final IntHashSet gapfillOnRetransmitILinkTemplateIds;
Expand Down

0 comments on commit 761c059

Please sign in to comment.