Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Duty cycle tracker #536

Merged
merged 5 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ private static LogTag lookupLogTag(final String name)
public static final long DEFAULT_MIN_FIXP_KEEPALIVE_TIMEOUT_IN_MS = 1;
public static final long DEFAULT_ACCEPTOR_FIXP_KEEPALIVE_TIMEOUT_IN_MS = SECONDS.toMillis(30);

public static final long DEFAULT_CYCLE_THRESHOLD_NS = SECONDS.toNanos(1);

public static final boolean RUNNING_ON_WINDOWS = System.getProperty("os.name").startsWith("Windows");

private long reasonableTransmissionTimeInMs = DEFAULT_REASONABLE_TRANSMISSION_TIME_IN_MS;
Expand Down
40 changes: 39 additions & 1 deletion artio-core/src/main/java/uk/co/real_logic/artio/FixCounters.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import io.aeron.Aeron;
import io.aeron.Counter;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.driver.status.DutyCycleStallTracker;
import org.agrona.collections.IntHashSet;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersReader;
Expand Down Expand Up @@ -47,7 +49,13 @@ public enum FixCountersId
CURRENT_REPLAY_COUNT_TYPE_ID(10_008),
NEGATIVE_TIMESTAMP_TYPE_ID(10_009),
FAILED_ADMIN_TYPE_ID(10_010),
FAILED_ADMIN_REPLY_TYPE_ID(10_011);
FAILED_ADMIN_REPLY_TYPE_ID(10_011),
FRAMER_MAX_CYCLE_TIME_TYPE_ID(10_012),
FRAMER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID(10_013),
INDEXER_MAX_CYCLE_TIME_TYPE_ID(10_014),
INDEXER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID(10_015),
LIBRARY_MAX_CYCLE_TIME_TYPE_ID(10_016),
LIBRARY_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID(10_017);

final int id;

Expand Down Expand Up @@ -149,6 +157,36 @@ public AtomicCounter negativeTimestamps()
return negativeTimestamps;
}

public DutyCycleTracker getFramerDutyCycleTracker(final long threshold)
{
return new DutyCycleStallTracker(
marc-adaptive marked this conversation as resolved.
Show resolved Hide resolved
newCounter(FRAMER_MAX_CYCLE_TIME_TYPE_ID.id(), "framer max cycle time in ns"),
newCounter(FRAMER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID.id(),
"framer work cycle time exceeded count: threshold=" + threshold),
threshold
);
}

public DutyCycleTracker getIndexerDutyCycleTracker(final long threshold)
{
return new DutyCycleStallTracker(
newCounter(INDEXER_MAX_CYCLE_TIME_TYPE_ID.id(), "indexer max cycle time in ns"),
newCounter(INDEXER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID.id(),
"indexer work cycle time exceeded count: threshold=" + threshold),
threshold
);
}

public DutyCycleTracker getLibraryDutyCycleTracker(final int libraryId, final long threshold)
{
return new DutyCycleStallTracker(
newCounter(LIBRARY_MAX_CYCLE_TIME_TYPE_ID.id(), "library " + libraryId + " max cycle time in ns"),
newCounter(LIBRARY_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID.id(),
"library " + libraryId + " work cycle time exceeded count: threshold=" + threshold),
threshold
);
}

public AtomicCounter messagesRead(final long connectionId, final String address)
{
return newCounter(FixCountersId.MESSAGES_READ_TYPE_ID.id(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ public final class EngineConfiguration extends CommonConfiguration implements Au
private long timeIndexReplayFlushIntervalInNs = DEFAULT_TIME_INDEX_FLUSH_INTERVAL_IN_NS;
private CancelOnDisconnectOption cancelOnDisconnectOption = DO_NOT_CANCEL_ON_DISCONNECT_OR_LOGOUT;
private int cancelOnDisconnectTimeoutWindowInMs = DEFAULT_CANCEL_ON_DISCONNECT_TIMEOUT_WINDOW_IN_MS;
private long framerCycleThresholdNs = DEFAULT_CYCLE_THRESHOLD_NS;
private long indexerCycleThresholdNs = DEFAULT_CYCLE_THRESHOLD_NS;

private EngineReproductionConfiguration reproductionConfiguration;
private ReproductionMessageHandler reproductionMessageHandler = (connectionId, bytes) ->
Expand Down Expand Up @@ -1294,6 +1296,32 @@ public EngineConfiguration cancelOnDisconnectTimeoutWindowInMs(final int cancelO
return this;
}

/**
* Set a threshold for the framer work cycle time which when exceeded it will increment the
* framer cycle time exceeded count.
*
* @param framerCycleThresholdNs value in nanoseconds
* @return this for fluent API.
*/
public EngineConfiguration framerCycleThresholdNs(final long framerCycleThresholdNs)
{
this.framerCycleThresholdNs = framerCycleThresholdNs;
return this;
}

/**
* Set a threshold for the indexer work cycle time which when exceeded it will increment the
* indexer cycle time exceeded count.
*
* @param indexerCycleThresholdNs value in nanoseconds
* @return this for fluent API.
*/
public EngineConfiguration indexerCycleThresholdNs(final long indexerCycleThresholdNs)
{
this.indexerCycleThresholdNs = indexerCycleThresholdNs;
return this;
}

// ---------------------
// END SETTERS
// ---------------------
Expand Down Expand Up @@ -2025,6 +2053,16 @@ public int cancelOnDisconnectTimeoutWindowInMs()
return cancelOnDisconnectTimeoutWindowInMs;
}

public long framerCycleThresholdNs()
{
return framerCycleThresholdNs;
}

public long indexerCycleThresholdNs()
{
return indexerCycleThresholdNs;
}

public boolean indexChecksumEnabled()
{
return indexChecksumEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,16 @@ private void newArchivingAgent()
clock);
}

final Agent dutyCycleTrackingAgent = new IndexerDutyCycleTracker(
configuration.agentNamePrefix(),
clock,
fixCounters.getIndexerDutyCycleTracker(configuration.indexerCycleThresholdNs()));

final List<Agent> agents = new ArrayList<>();
agents.add(inboundIndexer);
agents.add(outboundIndexer);
agents.add(replayer);
agents.add(dutyCycleTrackingAgent);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While your solution is cleaner, I'd like to avoid changing thread names, can we squeeze that tracking into one of the existing agents?


indexingAgent = new CompositeAgent(agents);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package uk.co.real_logic.artio.engine.framer;

import io.aeron.*;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.ControlledFragmentHandler.Action;
import io.aeron.logbuffer.Header;
Expand Down Expand Up @@ -138,6 +139,7 @@ class Framer implements Agent, EngineEndPointHandler, ProtocolHandler
private final EpochNanoClock clock;
private final Timer outboundTimer;
private final Timer sendTimer;
private final DutyCycleTracker dutyCycleTracker;

private final ControlledFragmentHandler librarySubscriber;
private final ControlledFragmentHandler replaySubscriber;
Expand Down Expand Up @@ -278,6 +280,7 @@ class Framer implements Agent, EngineEndPointHandler, ProtocolHandler
this.acceptsFixP = configuration.acceptsFixP();
this.fixPContexts = fixPContexts;
this.fixCounters = fixCounters;
this.dutyCycleTracker = fixCounters.getFramerDutyCycleTracker(configuration.framerCycleThresholdNs());

replyTimeoutInNs = TimeUnit.MILLISECONDS.toNanos(configuration.replyTimeoutInMs());
timerEventHandler = new TimerEventHandler(errorHandler);
Expand Down Expand Up @@ -376,11 +379,17 @@ public Action onFixPMessage(final long connectionId, final DirectBuffer buffer,
MILLISECONDS, epochClock.time(), 128, 512);
}

public void onStart()
{
dutyCycleTracker.update(clock.nanoTime());
}

public int doWork() throws Exception
{
final long timeInNs = clock.nanoTime();
final long timeInMs = epochClock.time();

dutyCycleTracker.measureAndUpdate(timeInNs);
fixSenderEndPoints.timeInMs(timeInMs);

checkOutboundTimestampSender(timeInNs);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package uk.co.real_logic.artio.engine.logger;

import io.aeron.driver.DutyCycleTracker;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.EpochNanoClock;

public class IndexerDutyCycleTracker implements Agent
{
final String agentNamePrefix;
final EpochNanoClock clock;
final DutyCycleTracker dutyCycleTracker;

public IndexerDutyCycleTracker(
final String agentNamePrefix,
final EpochNanoClock clock,
final DutyCycleTracker dutyCycleTracker)
{
this.agentNamePrefix = agentNamePrefix;
this.clock = clock;
this.dutyCycleTracker = dutyCycleTracker;
}

@Override
public void onStart()
{
dutyCycleTracker.update(clock.nanoTime());
}

@Override
public int doWork() throws Exception
{
dutyCycleTracker.measureAndUpdate(clock.nanoTime());
return 0;
}

@Override
public String roleName()
{
return agentNamePrefix + "IndexerDutyCycleTracker";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ private FixLibrary connect()
{
poller.startConnecting();
scheduler.launch(configuration, errorHandler, monitoringCompositeAgent, conductorAgent());
poller.onStart();
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public void onDisconnect(final FixLibrary library)
private FixPConnectionExistsHandler fixPConnectionExistsHandler;
private FixPConnectionAcquiredHandler fixPConnectionAcquiredHandler;
private LibraryReproductionConfiguration reproductionConfiguration;
private long libraryCycleThresholdNs = DEFAULT_CYCLE_THRESHOLD_NS;

/**
* When a new FIX session connects to the gateway you register a callback handler to find
Expand Down Expand Up @@ -287,6 +288,19 @@ public LibraryConfiguration reproduceInbound(
return this;
}

/**
* Set a threshold for the library work cycle time which when exceeded it will increment the
* library cycle time exceeded count.
*
* @param libraryCycleThresholdNs value in nanoseconds
* @return this for fluent API.
*/
public LibraryConfiguration libraryCycleThresholdNs(final long libraryCycleThresholdNs)
{
this.libraryCycleThresholdNs = libraryCycleThresholdNs;
return this;
}

// ------------------------
// BEGIN INHERITED SETTERS
// ------------------------
Expand Down Expand Up @@ -430,6 +444,11 @@ boolean isReproductionEnabled()
return reproductionConfiguration != null;
}

public long libraryCycleThresholdNs()
{
return libraryCycleThresholdNs;
}

public LibraryConfiguration libraryName(final String libraryName)
{
this.libraryName = libraryName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.aeron.CommonContext;
import io.aeron.ControlledFragmentAssembler;
import io.aeron.Subscription;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.exceptions.RegistrationException;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.ControlledFragmentHandler.Action;
Expand Down Expand Up @@ -143,6 +144,7 @@ final class LibraryPoller implements LibraryEndPointHandler, ProtocolHandler, Au
private final boolean enginesAreClustered;
private final ErrorHandler errorHandler;
private final FixCounters fixCounters;
private final DutyCycleTracker dutyCycleTracker;

private final boolean isReproductionEnabled;
private final ReproductionClock reproductionClock;
Expand Down Expand Up @@ -258,6 +260,9 @@ public Reply<ThrottleConfigurationStatus> messageThrottle(
epochClock, configuration.epochNanoClock(), configuration.sessionEpochFractionFormat());
this.isReproductionEnabled = configuration.isReproductionEnabled();
this.reproductionClock = isReproductionEnabled ? configuration.reproductionConfiguration().clock() : null;

this.dutyCycleTracker = fixCounters.getLibraryDutyCycleTracker(
configuration.libraryId(), configuration.libraryCycleThresholdNs());
}

boolean isConnected()
Expand Down Expand Up @@ -589,39 +594,41 @@ long saveFollowerSessionRequest(

int poll(final int fragmentLimit)
{
final long timeInNs = epochNanoClock.nanoTime();
final long timeInMs = timeInMs();

dutyCycleTracker.measureAndUpdate(timeInNs);

switch (state)
{
case CONNECTED:
return pollWithoutReconnect(timeInMs, fragmentLimit);
return pollWithoutReconnect(timeInNs, timeInMs, fragmentLimit);

case ATTEMPT_CONNECT:
startConnecting();
return pollWithoutReconnect(timeInMs, fragmentLimit);
startConnecting(timeInMs);
return pollWithoutReconnect(timeInNs, timeInMs, fragmentLimit);

case CONNECTING:
nextConnectingStep(timeInMs);
return pollWithoutReconnect(timeInMs, fragmentLimit);
return pollWithoutReconnect(timeInNs, timeInMs, fragmentLimit);

case ATTEMPT_CURRENT_NODE:
connectToNewEngine(timeInMs);
state = CONNECTING;
return pollWithoutReconnect(timeInMs, fragmentLimit);
return pollWithoutReconnect(timeInNs, timeInMs, fragmentLimit);

case ENGINE_DISCONNECT:
attemptEngineCloseBasedLogout();
return pollWithoutReconnect(timeInMs, fragmentLimit);
return pollWithoutReconnect(timeInNs, timeInMs, fragmentLimit);

case CLOSED:
default:
return 0;
}
}

private int pollWithoutReconnect(final long timeInMs, final int fragmentLimit)
private int pollWithoutReconnect(final long timeInNs, final long timeInMs, final int fragmentLimit)
{
final long timeInNs = epochNanoClock.nanoTime();
int operations = 0;
operations += inboundSubscription.controlledPoll(outboundSubscription, fragmentLimit);
operations += livenessDetector.poll(timeInMs);
Expand All @@ -635,6 +642,11 @@ private int pollWithoutReconnect(final long timeInMs, final int fragmentLimit)
// BEGIN CONNECTION LOGIC
// -----------------------------------------------------------------------

void onStart()
{
dutyCycleTracker.update(epochNanoClock.nanoTime());
}

void startConnecting()
{
startConnecting(timeInMs());
Expand Down
Loading
Loading