Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only retrieve one batch of messages at a time when responding to a large resend request #643

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 66 additions & 50 deletions quickfixj-core/src/main/java/quickfix/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,8 @@ public class Session implements Closeable {

protected static final Logger LOG = LoggerFactory.getLogger(Session.class);

private final int MAX_RESEND_BATCH_RETRIEVAL_SIZE = 1000;

Session(Application application, MessageStoreFactory messageStoreFactory, SessionID sessionID,
DataDictionaryProvider dataDictionaryProvider, SessionSchedule sessionSchedule, LogFactory logFactory,
MessageFactory messageFactory, int heartbeatInterval) {
Expand Down Expand Up @@ -2321,71 +2323,85 @@ private void nextLogon(Message logon) throws FieldNotFound, RejectLogon, Incorre
private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqNo)
throws IOException, InvalidMessage, FieldNotFound {

final ArrayList<String> messages = new ArrayList<>();
try {
state.get(beginSeqNo, endSeqNo, messages);
} catch (final IOException e) {
if (forceResendWhenCorruptedStore) {
LOG.error("Cannot read messages from stores, resend HeartBeats", e);
for (int i = beginSeqNo; i < endSeqNo; i++) {
final Message heartbeat = messageFactory.create(sessionID.getBeginString(),
MsgType.HEARTBEAT);
initializeHeader(heartbeat.getHeader());
heartbeat.getHeader().setInt(MsgSeqNum.FIELD, i);
messages.add(heartbeat.toString());
}
} else {
throw e;
}
}

int msgSeqNum = 0;
int begin = 0;
int current = beginSeqNo;
boolean appMessageJustSent = false;

for (final String message : messages) {
appMessageJustSent = false;
final Message msg;
int curBatchStartSeqNo = beginSeqNo;
while (curBatchStartSeqNo <= endSeqNo) {
int endCurBatchSeqNo = endSeqNo;
if (curBatchStartSeqNo + MAX_RESEND_BATCH_RETRIEVAL_SIZE < endSeqNo) {
endCurBatchSeqNo = curBatchStartSeqNo + MAX_RESEND_BATCH_RETRIEVAL_SIZE;
}
final ArrayList<String> messages = new ArrayList<>();
try {
// QFJ-626
msg = parseMessage(message);
msgSeqNum = msg.getHeader().getInt(MsgSeqNum.FIELD);
} catch (final Exception e) {
getLog().onErrorEvent(
"Error handling ResendRequest: failed to parse message (" + e.getMessage()
+ "): " + message);
// Note: a SequenceReset message will be generated to fill the gap
continue;
state.get(curBatchStartSeqNo, endCurBatchSeqNo, messages);
} catch (final IOException e) {
if (forceResendWhenCorruptedStore) {
LOG.error("Cannot read messages from stores, resend HeartBeats", e);
for (int i = beginSeqNo; i < endSeqNo; i++) {
final Message heartbeat = messageFactory.create(sessionID.getBeginString(),
MsgType.HEARTBEAT);
initializeHeader(heartbeat.getHeader());
heartbeat.getHeader().setInt(MsgSeqNum.FIELD, i);
messages.add(heartbeat.toString());
}
} else {
throw e;
}
}
for (final String message : messages) {
appMessageJustSent = false;
final Message msg;
try {
// QFJ-626
msg = parseMessage(message);
if (msg.getException() != null) {
getLog().onErrorEvent(
"Error handling ResendRequest: failed to parse message (" + msg.getException().getMessage()
+ "): " + message);
// Note: a SequenceReset message will be generated to fill the gap
continue;
}
msgSeqNum = msg.getHeader().getInt(MsgSeqNum.FIELD);
} catch (final Exception e) {
getLog().onErrorEvent(
"Error handling ResendRequest: failed to parse message (" + e.getMessage()
+ "): " + message);
// Note: a SequenceReset message will be generated to fill the gap
continue;
}

if ((current != msgSeqNum) && begin == 0) {
begin = current;
}
if ((current != msgSeqNum) && begin == 0) {
begin = current;
}

final String msgType = msg.getHeader().getString(MsgType.FIELD);
final String msgType = msg.getHeader().getString(MsgType.FIELD);

if (MessageUtils.isAdminMessage(msgType) && !forceResendWhenCorruptedStore) {
if (begin == 0) {
begin = msgSeqNum;
}
} else {
initializeResendFields(msg);
if (resendApproved(msg)) {
if (begin != 0) {
generateSequenceReset(receivedMessage, begin, msgSeqNum);
}
getLog().onEvent("Resending message: " + msgSeqNum);
send(msg.toString());
begin = 0;
appMessageJustSent = true;
} else {
if (MessageUtils.isAdminMessage(msgType) && !forceResendWhenCorruptedStore) {
if (begin == 0) {
begin = msgSeqNum;
}
} else {
initializeResendFields(msg);
if (resendApproved(msg)) {
if (begin != 0) {
generateSequenceReset(receivedMessage, begin, msgSeqNum);
}
getLog().onEvent("Resending message: " + msgSeqNum);
send(msg.toString());
begin = 0;
appMessageJustSent = true;
} else {
if (begin == 0) {
begin = msgSeqNum;
}
}
}
current = msgSeqNum + 1;
}
current = msgSeqNum + 1;
curBatchStartSeqNo = endCurBatchSeqNo+1;
}

int newBegin = beginSeqNo;
Expand Down
Loading