Skip to content

Commit

Permalink
Close recording stream on shutdown (#9705)
Browse files Browse the repository at this point in the history
  • Loading branch information
tjquinno authored Jan 28, 2025
1 parent 7d1d452 commit 1897dec
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.helidon.metrics.systemmeters;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -23,6 +24,7 @@
import java.util.Optional;
import java.util.function.Consumer;

import io.helidon.Main;
import io.helidon.common.LazyValue;
import io.helidon.metrics.api.Gauge;
import io.helidon.metrics.api.Meter;
Expand All @@ -32,6 +34,7 @@
import io.helidon.metrics.api.SystemTagsManager;
import io.helidon.metrics.api.Timer;
import io.helidon.metrics.spi.MetersProvider;
import io.helidon.spi.HelidonShutdownHandler;

import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.consumer.RecordingStream;
Expand All @@ -49,7 +52,7 @@
* JFR delivers events in batches. For performance the values we track are stored as longs without
* concern for concurrent updates which should not happen anyway.
*/
public class VThreadSystemMetersProvider implements MetersProvider {
public class VThreadSystemMetersProvider implements MetersProvider, HelidonShutdownHandler {

// Parts of the meter names.
static final String METER_NAME_PREFIX = "vthreads.";
Expand All @@ -68,6 +71,8 @@ public class VThreadSystemMetersProvider implements MetersProvider {
private long virtualThreads;
private long virtualThreadStarts;
private long pinnedVirtualThreadsThresholdMillis;
private RecordingStream recordingStream;
private MetricsConfig metricsConfig;

/**
* For service loading.
Expand All @@ -78,17 +83,15 @@ public VThreadSystemMetersProvider() {
@Override
public Collection<Meter.Builder<?, ?>> meterBuilders(MetricsFactory metricsFactory) {

MetricsConfig metricsConfig = metricsFactory.metricsConfig();
metricsConfig = metricsFactory.metricsConfig();
if (!metricsConfig.virtualThreadsEnabled()) {
return List.of();
}

var recordingStream = new RecordingStream();
Main.addShutdownHandler(this);
pinnedVirtualThreadsThresholdMillis = metricsConfig.virtualThreadsPinnedThreshold().toMillis();
recordingStream.setSettings(Map.of("jdk.VirtualThreadPinned#threshold",
pinnedVirtualThreadsThresholdMillis + " ms"));

List<Meter.Builder<?, ?>> meterBuilders = new ArrayList<>(List.of(
var meterBuilders = new ArrayList<>(List.of(
Gauge.builder(METER_NAME_PREFIX + SUBMIT_FAILURES, () -> virtualThreadSubmitFails)
.description("Virtual thread submit failures")
.scope(METER_SCOPE),
Expand All @@ -97,30 +100,56 @@ public VThreadSystemMetersProvider() {
.scope(METER_SCOPE),
Timer.builder(METER_NAME_PREFIX + RECENT_PINNED)
.description("Pinned virtual thread durations")
.scope(METER_SCOPE)));

listenFor(recordingStream, Map.of("jdk.VirtualThreadSubmitFailed", this::recordSubmitFail,
"jdk.VirtualThreadPinned", this::recordThreadPin));

meterBuilders.add(Gauge.builder(METER_NAME_PREFIX + COUNT, () -> virtualThreads)
.description("Active virtual threads")
.scope(METER_SCOPE));
meterBuilders.add(Gauge.builder(METER_NAME_PREFIX + STARTS, () -> virtualThreadStarts)
.description("Number of virtual thread starts")
.scope(METER_SCOPE));

listenFor(recordingStream, Map.of("jdk.VirtualThreadStart", this::recordThreadStart,
"jdk.VirtualThreadEnd", this::recordThreadEnd));
.scope(METER_SCOPE),
Gauge.builder(METER_NAME_PREFIX + COUNT, () -> virtualThreads)
.description("Active virtual threads")
.scope(METER_SCOPE),
Gauge.builder(METER_NAME_PREFIX + STARTS, () -> virtualThreadStarts)
.description("Number of virtual thread starts")
.scope(METER_SCOPE)
));

recordingStream.startAsync();
startRecordingStream();
return meterBuilders;
}

@Override
public void shutdown() {
if (recordingStream != null) {
stopRecordingStream();
}
}

// For testing
long pinnedVirtualThreadsThresholdMillis() {
return pinnedVirtualThreadsThresholdMillis;
}

private void startRecordingStream() {

this.recordingStream = new RecordingStream();
recordingStream.setSettings(Map.of("jdk.VirtualThreadPinned#threshold",
pinnedVirtualThreadsThresholdMillis + " ms"));

listenFor(recordingStream, Map.of("jdk.VirtualThreadSubmitFailed", this::recordSubmitFail,
"jdk.VirtualThreadPinned", this::recordThreadPin,
"jdk.VirtualThreadStart", this::recordThreadStart,
"jdk.VirtualThreadEnd", this::recordThreadEnd));

recordingStream.startAsync();
}

private void stopRecordingStream() {
try {
LOGGER.log(System.Logger.Level.INFO, "Stopping recording stream");
recordingStream.close();
recordingStream.awaitTermination(Duration.ofSeconds(10));
recordingStream = null;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private static void listenFor(RecordingStream rs, Map<String, Consumer<RecordedEvent>> events) {
// Enable events of interest explicitly (as well as registering the callback) to be sure we receive the events we need.

Expand Down
2 changes: 1 addition & 1 deletion metrics/system-meters/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
path = {"Metrics", "System Meters"}
)
module io.helidon.metrics.systemmeters {
requires io.helidon;
requires io.helidon.common.features.api;
requires io.helidon.common;
requires io.helidon.metrics.api;
requires java.management;
requires jdk.jfr;
Expand Down

0 comments on commit 1897dec

Please sign in to comment.