diff --git a/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/committer/IcebergCommitterStage.java b/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/committer/IcebergCommitterStage.java index bf0566f49..6576a9378 100644 --- a/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/committer/IcebergCommitterStage.java +++ b/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/committer/IcebergCommitterStage.java @@ -166,7 +166,7 @@ public Observable> call(Observable source) { long start = scheduler.now(); Map summary = committer.commit(dataFiles); long now = scheduler.now(); - metrics.setGauge(CommitterMetrics.COMMIT_LATENCY_MSEC, now - start); + metrics.record(CommitterMetrics.COMMIT_LATENCY_MSEC, now - start, TimeUnit.MILLISECONDS); metrics.setGauge(CommitterMetrics.COMMIT_BATCH_SIZE, dataFiles.size()); return summary; } catch (RuntimeException e) { diff --git a/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/committer/metrics/CommitterMetrics.java b/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/committer/metrics/CommitterMetrics.java index 26e61e83a..7aab935da 100644 --- a/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/committer/metrics/CommitterMetrics.java +++ b/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/committer/metrics/CommitterMetrics.java @@ -16,10 +16,13 @@ package io.mantisrx.connector.iceberg.sink.committer.metrics; +import java.util.concurrent.TimeUnit; + import io.mantisrx.common.metrics.Counter; import io.mantisrx.common.metrics.Gauge; import io.mantisrx.common.metrics.Metrics; import io.mantisrx.common.metrics.MetricsRegistry; +import io.mantisrx.common.metrics.Timer; public class CommitterMetrics { @@ -33,7 +36,7 @@ public class CommitterMetrics { private final Counter commitFailureCount; public static final String COMMIT_LATENCY_MSEC = "commitLatencyMsec"; - private final Gauge commitLatencyMsec; + private final Timer commitLatencyMsec; public static final String COMMIT_BATCH_SIZE = "commitBatchSize"; private final Gauge commitBatchSize; @@ -44,7 +47,7 @@ public CommitterMetrics() { .addCounter(INVOCATION_COUNT) .addCounter(COMMIT_SUCCESS_COUNT) .addCounter(COMMIT_FAILURE_COUNT) - .addGauge(COMMIT_LATENCY_MSEC) + .addTimer(COMMIT_LATENCY_MSEC) .addGauge(COMMIT_BATCH_SIZE) .build(); @@ -53,15 +56,12 @@ public CommitterMetrics() { invocationCount = metrics.getCounter(INVOCATION_COUNT); commitSuccessCount = metrics.getCounter(COMMIT_SUCCESS_COUNT); commitFailureCount = metrics.getCounter(COMMIT_FAILURE_COUNT); - commitLatencyMsec = metrics.getGauge(COMMIT_LATENCY_MSEC); + commitLatencyMsec = metrics.getTimer(COMMIT_LATENCY_MSEC); commitBatchSize = metrics.getGauge(COMMIT_BATCH_SIZE); } public void setGauge(final String metric, final long value) { switch (metric) { - case COMMIT_LATENCY_MSEC: - commitLatencyMsec.set(value); - break; case COMMIT_BATCH_SIZE: commitBatchSize.set(value); break; @@ -70,6 +70,16 @@ public void setGauge(final String metric, final long value) { } } + public void record(final String metric, final long amount, TimeUnit unit) { + switch (metric) { + case COMMIT_LATENCY_MSEC: + commitLatencyMsec.record(amount, unit); + break; + default: + break; + } + } + public void increment(final String metric) { switch (metric) { case INVOCATION_COUNT: