Skip to content

Commit

Permalink
Add timeout to ResetSequenceNumberCommand
Browse files Browse the repository at this point in the history
  • Loading branch information
marc-adaptive committed Jan 15, 2025
1 parent 40a075e commit 40f6132
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1215,7 +1215,8 @@ public void onAdminResetSequenceNumbersRequest(final long correlationId, final l
sentSequenceNumberIndex,
inboundPublication,
outboundPublication,
clock.nanoTime());
clock,
replyTimeoutInNs);

resetSequenceNumberCommand.setupAdminReset(correlationId, adminReplyPublication);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ public Reply<?> resetSequenceNumber(final long sessionId)
sentSequenceNumberIndex,
inboundPublication,
outboundPublication,
configuration.epochNanoClock().nanoTime());
configuration.epochNanoClock(),
configuration.replyTimeoutInMs());

if (adminCommands.offer(reply))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
*/
package uk.co.real_logic.artio.engine.framer;

import org.agrona.concurrent.EpochNanoClock;
import uk.co.real_logic.artio.Pressure;
import uk.co.real_logic.artio.Reply;
import uk.co.real_logic.artio.engine.logger.SequenceNumberIndexReader;
import uk.co.real_logic.artio.messages.GatewayError;
import uk.co.real_logic.artio.protocol.GatewayPublication;
import uk.co.real_logic.artio.session.Session;

import java.util.concurrent.TimeUnit;
import java.util.function.LongToIntFunction;

import static uk.co.real_logic.artio.Reply.State.COMPLETED;
import static uk.co.real_logic.artio.Reply.State.ERRORED;
import static uk.co.real_logic.artio.Reply.State.TIMED_OUT;

class ResetSequenceNumberCommand implements Reply<Void>, AdminCommand
{
Expand All @@ -42,7 +45,9 @@ class ResetSequenceNumberCommand implements Reply<Void>, AdminCommand
private final SequenceNumberIndexReader sentSequenceNumberIndex;
private final GatewayPublication inboundPublication;
private final GatewayPublication outboundPublication;
private final EpochNanoClock clock;
private final long resetTimeInNs;
private final long timeoutInNs;
private Session session;
private LongToIntFunction libraryLookup;
private long awaitSequenceNumber = 1;
Expand Down Expand Up @@ -104,7 +109,8 @@ private enum Step
final SequenceNumberIndexReader sentSequenceNumberIndex,
final GatewayPublication inboundPublication,
final GatewayPublication outboundPublication,
final long resetTimeInNs)
final EpochNanoClock clock,
final long timeoutInMs)
{
this.sessionId = sessionId;
this.gatewaySessions = gatewaySessions;
Expand All @@ -113,7 +119,9 @@ private enum Step
this.sentSequenceNumberIndex = sentSequenceNumberIndex;
this.inboundPublication = inboundPublication;
this.outboundPublication = outboundPublication;
this.resetTimeInNs = resetTimeInNs;
this.clock = clock;
this.resetTimeInNs = clock.nanoTime();
this.timeoutInNs = TimeUnit.MILLISECONDS.toNanos(timeoutInMs);
}

public Exception error()
Expand Down Expand Up @@ -145,6 +153,12 @@ public void execute(final Framer framer)
// Only to be called on the Framer thread.
boolean poll()
{

if (clock.nanoTime() - resetTimeInNs >= timeoutInNs)
{
return onTimeout();
}

switch (step)
{
case START:
Expand Down Expand Up @@ -281,6 +295,27 @@ private boolean sessionIsUnknown()
return !sessionContexts.isKnownSessionId(sessionId);
}

private boolean onTimeout()
{
if (isAdminReset)
{
if (adminReplyPublication.saveGenericAdminReply(adminCorrelationId, GatewayError.EXCEPTION,
sessionId + " sequence numbers not reset in " + timeoutInNs + "ns") > 0)
{
state = TIMED_OUT;
return true;
}

return false;
}

else
{
state = TIMED_OUT;
return true;
}
}

public String toString()
{
return "ResetSequenceNumberReply{" +
Expand Down

0 comments on commit 40f6132

Please sign in to comment.