Skip to content

Commit

Permalink
[FLINK-36344][runtime/metrics] Introduce lastCheckpointCompletedTimestamp metric
Browse files Browse the repository at this point in the history
  • Loading branch information
xingsuo-zbz authored and Myasuka committed Sep 30, 2024
1 parent f4cc8e8 commit e5ce2c3
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 3 deletions.
5 changes: 5 additions & 0 deletions docs/content.zh/docs/ops/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,11 @@ Note that for failed checkpoints, metrics are updated on a best efforts basis an
<td>The identifier of the last completed checkpoint.</td>
<td>Gauge</td>
</tr>
<tr>
<td>lastCheckpointCompletedTimestamp</td>
<td>The timestamp of the last completed checkpoint (in milliseconds).</td>
<td>Gauge</td>
</tr>
<tr>
<td>lastCheckpointFullSize</td>
<td>The full size of the last checkpoint (in bytes).</td>
Expand Down
5 changes: 5 additions & 0 deletions docs/content/docs/ops/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -1302,6 +1302,11 @@ Note that for failed checkpoints, metrics are updated on a best efforts basis an
<td>The identifier of the last completed checkpoint.</td>
<td>Gauge</td>
</tr>
<tr>
<td>lastCheckpointCompletedTimestamp</td>
<td>The timestamp of the last completed checkpoint (in milliseconds).</td>
<td>Gauge</td>
</tr>
<tr>
<td>lastCheckpointFullSize</td>
<td>The full size of the last checkpoint (in bytes).</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,9 @@ private void setDurationSpanAttribute(
/** Name under which the gauge for the ID of the latest completed checkpoint is registered. */
@VisibleForTesting
static final String LATEST_COMPLETED_CHECKPOINT_ID_METRIC = "lastCompletedCheckpointId";

/**
 * Name under which the gauge for the timestamp of the latest completed checkpoint is
 * registered.
 */
@VisibleForTesting
static final String LATEST_CHECKPOINT_COMPLETED_TIMESTAMP = "lastCheckpointCompletedTimestamp";

/**
* Register the exposed metrics.
*
Expand Down Expand Up @@ -468,6 +471,9 @@ private void registerMetrics(MetricGroup metricGroup) {
new LatestCompletedCheckpointExternalPathGauge());
metricGroup.gauge(
LATEST_COMPLETED_CHECKPOINT_ID_METRIC, new LatestCompletedCheckpointIdGauge());
metricGroup.gauge(
LATEST_CHECKPOINT_COMPLETED_TIMESTAMP,
new LatestCheckpointCompletedTimestampGauge());
}

private class CheckpointsCounter implements Gauge<Long> {
Expand Down Expand Up @@ -590,4 +596,16 @@ public Long getValue() {
}
}
}

/**
 * Gauge exposing the latest acknowledgement timestamp of the most recently completed
 * checkpoint, or {@code -1} while no completed checkpoint has been recorded.
 */
private class LatestCheckpointCompletedTimestampGauge implements Gauge<Long> {
    @Override
    public Long getValue() {
        // Copy the field into a local so the null check and the read use the same reference.
        final CompletedCheckpointStats latest = latestCompletedCheckpoint;
        if (latest == null) {
            return -1L;
        }
        return latest.getLatestAckTimestamp();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -534,9 +534,10 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
.LATEST_COMPLETED_CHECKPOINT_PERSISTED_DATA_METRIC,
DefaultCheckpointStatsTracker
.LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC,
DefaultCheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_ID_METRIC,
DefaultCheckpointStatsTracker
.LATEST_COMPLETED_CHECKPOINT_ID_METRIC));
assertThat(registeredGaugeNames).hasSize(12);
.LATEST_CHECKPOINT_COMPLETED_TIMESTAMP));
assertThat(registeredGaugeNames).hasSize(13);
}

/**
Expand Down Expand Up @@ -565,7 +566,7 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
CheckpointStatsTracker stats = new DefaultCheckpointStatsTracker(0, metricGroup);

// Make sure to adjust this test if metrics are added/removed
assertThat(registeredGauges).hasSize(12);
assertThat(registeredGauges).hasSize(13);

// Check initial values
Gauge<Long> numCheckpoints =
Expand Down Expand Up @@ -626,6 +627,11 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
registeredGauges.get(
DefaultCheckpointStatsTracker
.LATEST_COMPLETED_CHECKPOINT_ID_METRIC);
Gauge<Long> latestCompletedTimestamp =
(Gauge<Long>)
registeredGauges.get(
DefaultCheckpointStatsTracker
.LATEST_CHECKPOINT_COMPLETED_TIMESTAMP);

assertThat(numCheckpoints.getValue()).isZero();
assertThat(numInProgressCheckpoints.getValue()).isZero();
Expand All @@ -639,6 +645,7 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
assertThat(latestPersistedData.getValue()).isEqualTo(-1);
assertThat(latestCompletedExternalPath.getValue()).isEqualTo("n/a");
assertThat(latestCompletedId.getValue()).isEqualTo(-1);
assertThat(latestCompletedTimestamp.getValue()).isEqualTo(-1);

PendingCheckpointStats pending =
stats.reportPendingCheckpoint(
Expand Down Expand Up @@ -694,6 +701,7 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
assertThat(latestCompletedDuration.getValue()).isEqualTo(ackTimestamp);
assertThat(latestCompletedExternalPath.getValue()).isEqualTo(externalPath);
assertThat(latestCompletedId.getValue()).isZero();
assertThat(latestCompletedTimestamp.getValue()).isEqualTo(ackTimestamp);

// Check failed
PendingCheckpointStats nextPending =
Expand Down

0 comments on commit e5ce2c3

Please sign in to comment.