Skip to content

Commit

Permalink
Merge pull request #73 from jeffchao/master
Browse files Browse the repository at this point in the history
Use timer for commit latency.
  • Loading branch information
jeffchao authored Oct 30, 2020
2 parents 86c776b + b397373 commit 0a1d193
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public Observable<Map<String, Object>> call(Observable<DataFile> source) {
long start = scheduler.now();
Map<String, Object> summary = committer.commit(dataFiles);
long now = scheduler.now();
metrics.setGauge(CommitterMetrics.COMMIT_LATENCY_MSEC, now - start);
metrics.record(CommitterMetrics.COMMIT_LATENCY_MSEC, now - start, TimeUnit.MILLISECONDS);
metrics.setGauge(CommitterMetrics.COMMIT_BATCH_SIZE, dataFiles.size());
return summary;
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

package io.mantisrx.connector.iceberg.sink.committer.metrics;

import java.util.concurrent.TimeUnit;

import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.Timer;

public class CommitterMetrics {

Expand All @@ -33,7 +36,7 @@ public class CommitterMetrics {
private final Counter commitFailureCount;

public static final String COMMIT_LATENCY_MSEC = "commitLatencyMsec";
private final Gauge commitLatencyMsec;
private final Timer commitLatencyMsec;

public static final String COMMIT_BATCH_SIZE = "commitBatchSize";
private final Gauge commitBatchSize;
Expand All @@ -44,7 +47,7 @@ public CommitterMetrics() {
.addCounter(INVOCATION_COUNT)
.addCounter(COMMIT_SUCCESS_COUNT)
.addCounter(COMMIT_FAILURE_COUNT)
.addGauge(COMMIT_LATENCY_MSEC)
.addTimer(COMMIT_LATENCY_MSEC)
.addGauge(COMMIT_BATCH_SIZE)
.build();

Expand All @@ -53,15 +56,12 @@ public CommitterMetrics() {
invocationCount = metrics.getCounter(INVOCATION_COUNT);
commitSuccessCount = metrics.getCounter(COMMIT_SUCCESS_COUNT);
commitFailureCount = metrics.getCounter(COMMIT_FAILURE_COUNT);
commitLatencyMsec = metrics.getGauge(COMMIT_LATENCY_MSEC);
commitLatencyMsec = metrics.getTimer(COMMIT_LATENCY_MSEC);
commitBatchSize = metrics.getGauge(COMMIT_BATCH_SIZE);
}

public void setGauge(final String metric, final long value) {
switch (metric) {
case COMMIT_LATENCY_MSEC:
commitLatencyMsec.set(value);
break;
case COMMIT_BATCH_SIZE:
commitBatchSize.set(value);
break;
Expand All @@ -70,6 +70,16 @@ public void setGauge(final String metric, final long value) {
}
}

public void record(final String metric, final long amount, TimeUnit unit) {
switch (metric) {
case COMMIT_LATENCY_MSEC:
commitLatencyMsec.record(amount, unit);
break;
default:
break;
}
}

public void increment(final String metric) {
switch (metric) {
case INVOCATION_COUNT:
Expand Down

0 comments on commit 0a1d193

Please sign in to comment.