From c7b25743411fc591e0fb58cf740dda8b86e3e1b8 Mon Sep 17 00:00:00 2001 From: Philip Whitehouse Date: Wed, 31 May 2023 17:03:07 +0100 Subject: [PATCH] Only retrieve one batch of messages at a time when responding to a large resend request --- .../src/main/java/quickfix/Session.java | 116 ++++++++++-------- 1 file changed, 66 insertions(+), 50 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 7e52be07bf..9c70894c42 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -470,6 +470,8 @@ public class Session implements Closeable { protected static final Logger LOG = LoggerFactory.getLogger(Session.class); + private final int MAX_RESEND_BATCH_RETRIEVAL_SIZE = 1000; + Session(Application application, MessageStoreFactory messageStoreFactory, SessionID sessionID, DataDictionaryProvider dataDictionaryProvider, SessionSchedule sessionSchedule, LogFactory logFactory, MessageFactory messageFactory, int heartbeatInterval) { @@ -2320,71 +2322,85 @@ private void nextLogon(Message logon) throws FieldNotFound, RejectLogon, Incorre private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqNo) throws IOException, InvalidMessage, FieldNotFound { - final ArrayList messages = new ArrayList<>(); - try { - state.get(beginSeqNo, endSeqNo, messages); - } catch (final IOException e) { - if (forceResendWhenCorruptedStore) { - LOG.error("Cannot read messages from stores, resend HeartBeats", e); - for (int i = beginSeqNo; i < endSeqNo; i++) { - final Message heartbeat = messageFactory.create(sessionID.getBeginString(), - MsgType.HEARTBEAT); - initializeHeader(heartbeat.getHeader()); - heartbeat.getHeader().setInt(MsgSeqNum.FIELD, i); - messages.add(heartbeat.toString()); - } - } else { - throw e; - } - } - int msgSeqNum = 0; int begin = 0; int current = beginSeqNo; boolean appMessageJustSent = false; - for (final String message : messages) { - appMessageJustSent = false; - final Message msg; + int curBatchStartSeqNo = beginSeqNo; + while (curBatchStartSeqNo <= endSeqNo) { + int endCurBatchSeqNo = endSeqNo; + if (curBatchStartSeqNo + MAX_RESEND_BATCH_RETRIEVAL_SIZE < endSeqNo) { + endCurBatchSeqNo = curBatchStartSeqNo + MAX_RESEND_BATCH_RETRIEVAL_SIZE; + } + final ArrayList messages = new ArrayList<>(); try { - // QFJ-626 - msg = parseMessage(message); - msgSeqNum = msg.getHeader().getInt(MsgSeqNum.FIELD); - } catch (final Exception e) { - getLog().onErrorEvent( - "Error handling ResendRequest: failed to parse message (" + e.getMessage() - + "): " + message); - // Note: a SequenceReset message will be generated to fill the gap - continue; + state.get(curBatchStartSeqNo, endCurBatchSeqNo, messages); + } catch (final IOException e) { + if (forceResendWhenCorruptedStore) { + LOG.error("Cannot read messages from stores, resend HeartBeats", e); + for (int i = beginSeqNo; i < endSeqNo; i++) { + final Message heartbeat = messageFactory.create(sessionID.getBeginString(), + MsgType.HEARTBEAT); + initializeHeader(heartbeat.getHeader()); + heartbeat.getHeader().setInt(MsgSeqNum.FIELD, i); + messages.add(heartbeat.toString()); + } + } else { + throw e; + } } + for (final String message : messages) { + appMessageJustSent = false; + final Message msg; + try { + // QFJ-626 + msg = parseMessage(message); + if (msg.getException() != null) { + getLog().onErrorEvent( + "Error handling ResendRequest: failed to parse message (" + msg.getException().getMessage() + + "): " + message); + // Note: a SequenceReset message will be generated to fill the gap + continue; + } + msgSeqNum = msg.getHeader().getInt(MsgSeqNum.FIELD); + } catch (final Exception e) { + getLog().onErrorEvent( + "Error handling ResendRequest: failed to parse message (" + e.getMessage() + + "): " + message); + // Note: a SequenceReset message will be generated to fill the gap + continue; + } - if ((current != msgSeqNum) && begin == 0) { - begin = current; - } + if ((current != msgSeqNum) && begin == 0) { + begin = current; + } - final String msgType = msg.getHeader().getString(MsgType.FIELD); + final String msgType = msg.getHeader().getString(MsgType.FIELD); - if (MessageUtils.isAdminMessage(msgType) && !forceResendWhenCorruptedStore) { - if (begin == 0) { - begin = msgSeqNum; - } - } else { - initializeResendFields(msg); - if (resendApproved(msg)) { - if (begin != 0) { - generateSequenceReset(receivedMessage, begin, msgSeqNum); - } - getLog().onEvent("Resending message: " + msgSeqNum); - send(msg.toString()); - begin = 0; - appMessageJustSent = true; - } else { + if (MessageUtils.isAdminMessage(msgType) && !forceResendWhenCorruptedStore) { if (begin == 0) { begin = msgSeqNum; } + } else { + initializeResendFields(msg); + if (resendApproved(msg)) { + if (begin != 0) { + generateSequenceReset(receivedMessage, begin, msgSeqNum); + } + getLog().onEvent("Resending message: " + msgSeqNum); + send(msg.toString()); + begin = 0; + appMessageJustSent = true; + } else { + if (begin == 0) { + begin = msgSeqNum; + } + } } + current = msgSeqNum + 1; } - current = msgSeqNum + 1; + curBatchStartSeqNo = endCurBatchSeqNo+1; } int newBegin = beginSeqNo;