diff --git a/hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/AbstractRefreshMetricsListener.java b/hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/AbstractRefreshMetricsListener.java index ec540044cd..a3302438ae 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/AbstractRefreshMetricsListener.java +++ b/hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/AbstractRefreshMetricsListener.java @@ -17,6 +17,7 @@ package com.netflix.hollow.api.consumer.metrics; import static com.netflix.hollow.core.HollowConstants.VERSION_NONE; +import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_ANNOUNCEMENT; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_CYCLE_START; @@ -59,12 +60,15 @@ public abstract class AbstractRefreshMetricsListener extends AbstractRefreshList private final Map announcementTimestamps; private volatile boolean namespacePinnedPreviously; + private final Map cycleVersionDeltaCounters; // delta chain version counter for each cycle version + public AbstractRefreshMetricsListener() { lastRefreshTimeNanoOptional = OptionalLong.empty(); consecutiveFailures = 0l; cycleVersionStartTimes = new HashMap<>(); announcementTimestamps = new HashMap<>(); namespacePinnedPreviously = false; + cycleVersionDeltaCounters = new HashMap<>(); } public void refreshStarted(long currentVersion, long requestedVersion) { @@ -73,7 +77,9 @@ public void refreshStarted(long currentVersion, long requestedVersion) { refreshMetricsBuilder = new ConsumerRefreshMetrics.Builder(); refreshMetricsBuilder.setIsInitialLoad(currentVersion == VERSION_NONE); refreshMetricsBuilder.setUpdatePlanDetails(updatePlanDetails); - cycleVersionStartTimes.clear(); // clear map to avoid accumulation over time + // clear maps to avoid accumulation over time + cycleVersionStartTimes.clear(); + cycleVersionDeltaCounters.clear(); } @Override @@ -91,7 +97,7 @@ public void versionDetected(HollowConsumer.VersionInfo requestedVersionInfo) { // or for the newVersion). Don't record this metric when a namespace was pinned previously and gets unpinned // in the next cycle because this metric will record the refresh duration from the latest announced version. if (!(namespacePinnedPreviously || isPinned)) { - trackTimestampsFromHeaders(requestedVersionInfo.getVersion(), + trackHeaderTagInVersion(requestedVersionInfo.getVersion(), requestedVersionInfo.getAnnouncementMetadata().get(), HEADER_TAG_METRIC_ANNOUNCEMENT, announcementTimestamps); } namespacePinnedPreviously = isPinned; @@ -160,6 +166,9 @@ public void refreshSuccessful(long beforeVersion, long afterVersion, long reques if (cycleVersionStartTimes.containsKey(afterVersion)) { refreshMetricsBuilder.setCycleStartTimestamp(cycleVersionStartTimes.get(afterVersion)); } + if (cycleVersionDeltaCounters.containsKey(afterVersion)) { + refreshMetricsBuilder.setDeltaChainVersionCounter(cycleVersionDeltaCounters.get(afterVersion)); + } if (afterVersion == requestedVersion && announcementTimestamps.containsKey(afterVersion)) { refreshMetricsBuilder.setAnnouncementTimestamp(announcementTimestamps.get(afterVersion)); @@ -186,32 +195,37 @@ public void refreshFailed(long beforeVersion, long afterVersion, long requestedV if (cycleVersionStartTimes.containsKey(afterVersion)) { refreshMetricsBuilder.setCycleStartTimestamp(cycleVersionStartTimes.get(afterVersion)); } + if (cycleVersionDeltaCounters.containsKey(afterVersion)) { + refreshMetricsBuilder.setDeltaChainVersionCounter(cycleVersionDeltaCounters.get(afterVersion)); + } noFailRefreshEndMetricsReporting(refreshMetricsBuilder.build()); } @Override public void snapshotUpdateOccurred(HollowAPI refreshAPI, HollowReadStateEngine stateEngine, long version) { - trackTimestampsFromHeaders(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes); + trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes); + trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, cycleVersionDeltaCounters); } @Override public void deltaUpdateOccurred(HollowAPI refreshAPI, HollowReadStateEngine stateEngine, long version) { - trackTimestampsFromHeaders(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes); + trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes); + trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, cycleVersionDeltaCounters); } /** - * If the blob header contains the timestamps like producer cycle start and announcement then save those values in - * the maps tracking version to cycle start time and version to announcement respectively. + * If the blob header contains a value for the given header tag (like producer cycle start time) then save that value in + * a maps tracking the value per version in this refresh. */ - private void trackTimestampsFromHeaders(long version, Map headers, String headerTag, Map timestampsMap) { + private void trackHeaderTagInVersion(long version, Map headers, String headerTag, Map tracker) { if (headers != null) { String headerTagValue = headers.get(headerTag); if (headerTagValue != null && !headerTagValue.isEmpty()) { try { - Long timestamp = Long.valueOf(headerTagValue); - if (timestamp != null) { - timestampsMap.put(version, timestamp); + Long val = Long.valueOf(headerTagValue); + if (val != null) { + tracker.put(version, val); } } catch (NumberFormatException e) { log.log(Level.WARNING, "Blob header contained " + headerTag + " but its value could" diff --git a/hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/ConsumerRefreshMetrics.java b/hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/ConsumerRefreshMetrics.java index bc845fb0e5..d13c620990 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/ConsumerRefreshMetrics.java +++ b/hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/ConsumerRefreshMetrics.java @@ -32,6 +32,7 @@ public class ConsumerRefreshMetrics { private long refreshEndTimeNano; // monotonic system time when refresh ended private OptionalLong cycleStartTimestamp; // timestamp in millis of when cycle started for the loaded data version private OptionalLong announcementTimestamp; // timestamp in milliseconds to mark announcement for the loaded data version + private OptionalLong deltaChainVersionCounter; // the sequence number of a version in a delta chain /** * A class that contains details of the consumer refresh update plan that may be useful to report as metrics or logs. @@ -84,8 +85,12 @@ public long getRefreshEndTimeNano() { public OptionalLong getCycleStartTimestamp() { return cycleStartTimestamp; } - - public OptionalLong getAnnouncementTimestamp() { return announcementTimestamp; } + public OptionalLong getAnnouncementTimestamp() { + return announcementTimestamp; + } + public OptionalLong getDeltaChainVersionCounter() { + return deltaChainVersionCounter; + } private ConsumerRefreshMetrics(Builder builder) { this.durationMillis = builder.durationMillis; @@ -98,6 +103,7 @@ private ConsumerRefreshMetrics(Builder builder) { this.refreshEndTimeNano = builder.refreshEndTimeNano; this.cycleStartTimestamp = builder.cycleStartTimestamp; this.announcementTimestamp = builder.announcementTimestamp; + this.deltaChainVersionCounter = builder.deltaChainVersionCounter; } public static final class Builder { @@ -111,11 +117,13 @@ public static final class Builder { private long refreshEndTimeNano; private OptionalLong cycleStartTimestamp; private OptionalLong announcementTimestamp; + private OptionalLong deltaChainVersionCounter; public Builder() { refreshSuccessAgeMillisOptional = OptionalLong.empty(); cycleStartTimestamp = OptionalLong.empty(); announcementTimestamp = OptionalLong.empty(); + deltaChainVersionCounter = OptionalLong.empty(); } public Builder setDurationMillis(long durationMillis) { @@ -160,6 +168,10 @@ public Builder setAnnouncementTimestamp(long announcementTimestamp) { this.announcementTimestamp = OptionalLong.of(announcementTimestamp); return this; } + public Builder setDeltaChainVersionCounter(long deltaChainVersionCounter) { + this.deltaChainVersionCounter = OptionalLong.of(deltaChainVersionCounter); + return this; + } public ConsumerRefreshMetrics build() { return new ConsumerRefreshMetrics(this); diff --git a/hollow/src/test/java/com/netflix/hollow/api/consumer/metrics/AbstractRefreshMetricsListenerTest.java b/hollow/src/test/java/com/netflix/hollow/api/consumer/metrics/AbstractRefreshMetricsListenerTest.java index 1048e63b53..f43d9f8253 100644 --- a/hollow/src/test/java/com/netflix/hollow/api/consumer/metrics/AbstractRefreshMetricsListenerTest.java +++ b/hollow/src/test/java/com/netflix/hollow/api/consumer/metrics/AbstractRefreshMetricsListenerTest.java @@ -1,6 +1,7 @@ package com.netflix.hollow.api.consumer.metrics; import static com.netflix.hollow.core.HollowConstants.VERSION_NONE; +import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_ANNOUNCEMENT; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_CYCLE_START; import static org.junit.Assert.assertEquals; @@ -11,13 +12,12 @@ import com.netflix.hollow.api.producer.HollowProducer; import com.netflix.hollow.api.producer.fs.HollowInMemoryBlobStager; import com.netflix.hollow.core.read.engine.HollowReadStateEngine; +import com.netflix.hollow.test.InMemoryBlobStore; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; - -import com.netflix.hollow.test.InMemoryBlobStore; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -129,6 +129,7 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) { Assert.assertNotEquals(0l, refreshMetrics.getRefreshEndTimeNano()); assertEquals(TEST_CYCLE_START_TIMESTAMP, refreshMetrics.getCycleStartTimestamp().getAsLong()); assertEquals(TEST_ANNOUNCEMENT_TIMESTAMP, refreshMetrics.getAnnouncementTimestamp().getAsLong()); + assertEquals(1l, refreshMetrics.getDeltaChainVersionCounter().getAsLong()); } } SuccessTestRefreshMetricsListener successTestRefreshMetricsListener = new SuccessTestRefreshMetricsListener(); @@ -136,6 +137,7 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) { testHeaderTags.put(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(TEST_CYCLE_START_TIMESTAMP)); testHeaderTags.put(HEADER_TAG_METRIC_ANNOUNCEMENT, String.valueOf(TEST_ANNOUNCEMENT_TIMESTAMP)); + testHeaderTags.put(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(1l)); successTestRefreshMetricsListener.snapshotUpdateOccurred(null, mockStateEngine, TEST_VERSION_HIGH); successTestRefreshMetricsListener.refreshSuccessful(TEST_VERSION_LOW, TEST_VERSION_HIGH, TEST_VERSION_HIGH); @@ -152,6 +154,7 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) { Assert.assertNotEquals(0l, refreshMetrics.getRefreshEndTimeNano()); Assert.assertFalse(refreshMetrics.getCycleStartTimestamp().isPresent()); Assert.assertFalse(refreshMetrics.getAnnouncementTimestamp().isPresent()); + Assert.assertFalse(refreshMetrics.getDeltaChainVersionCounter().isPresent()); } } FailureTestRefreshMetricsListener failTestRefreshMetricsListener = new FailureTestRefreshMetricsListener(); @@ -168,6 +171,7 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) { assertEquals(3, refreshMetrics.getUpdatePlanDetails().getNumSuccessfulTransitions()); assertEquals(TEST_CYCLE_START_TIMESTAMP, refreshMetrics.getCycleStartTimestamp().getAsLong()); assertEquals(TEST_ANNOUNCEMENT_TIMESTAMP, refreshMetrics.getAnnouncementTimestamp().getAsLong()); + assertEquals(3l, refreshMetrics.getDeltaChainVersionCounter().getAsLong()); } } List testTransitionSequence = new ArrayList() {{ @@ -183,16 +187,19 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) { successTestRefreshMetricsListener.blobLoaded(null); testHeaderTags.put(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(TEST_CYCLE_START_TIMESTAMP-2)); testHeaderTags.put(HEADER_TAG_METRIC_ANNOUNCEMENT, String.valueOf(TEST_ANNOUNCEMENT_TIMESTAMP-2)); + testHeaderTags.put(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(1l)); successTestRefreshMetricsListener.deltaUpdateOccurred(null, mockStateEngine, TEST_VERSION_HIGH-2); successTestRefreshMetricsListener.blobLoaded(null); testHeaderTags.put(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(TEST_CYCLE_START_TIMESTAMP-1)); testHeaderTags.put(HEADER_TAG_METRIC_ANNOUNCEMENT, String.valueOf(TEST_ANNOUNCEMENT_TIMESTAMP-1)); + testHeaderTags.put(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(2l)); successTestRefreshMetricsListener.deltaUpdateOccurred(null, mockStateEngine, TEST_VERSION_HIGH-1); successTestRefreshMetricsListener.blobLoaded(null); testHeaderTags.put(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(TEST_CYCLE_START_TIMESTAMP)); testHeaderTags.put(HEADER_TAG_METRIC_ANNOUNCEMENT, String.valueOf(TEST_ANNOUNCEMENT_TIMESTAMP)); + testHeaderTags.put(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(3l)); successTestRefreshMetricsListener.deltaUpdateOccurred(null, mockStateEngine, TEST_VERSION_HIGH); successTestRefreshMetricsListener.refreshSuccessful(TEST_VERSION_LOW, TEST_VERSION_HIGH, TEST_VERSION_HIGH); @@ -205,6 +212,7 @@ class FailureTestRefreshMetricsListener extends AbstractRefreshMetricsListener { public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) { assertEquals(1, refreshMetrics.getUpdatePlanDetails().getNumSuccessfulTransitions()); assertEquals(TEST_CYCLE_START_TIMESTAMP, refreshMetrics.getCycleStartTimestamp().getAsLong()); + assertEquals(1l, refreshMetrics.getDeltaChainVersionCounter().getAsLong()); } } List testTransitionSequence = new ArrayList() {{ @@ -219,6 +227,7 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) { failureTestRefreshMetricsListener.blobLoaded(null); testHeaderTags.put(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(TEST_CYCLE_START_TIMESTAMP)); + testHeaderTags.put(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(1l)); failureTestRefreshMetricsListener.snapshotUpdateOccurred(null, mockStateEngine, TEST_VERSION_LOW); failureTestRefreshMetricsListener.refreshFailed(TEST_VERSION_LOW-1, TEST_VERSION_LOW, TEST_VERSION_HIGH, null); @@ -226,7 +235,7 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) { } @Test - public void testCycleStart() { // also exercises reverse delta transition + public void testCycles() { // also exercises reverse delta transition InMemoryBlobStore blobStore = new InMemoryBlobStore(); HollowInMemoryBlobStager blobStager = new HollowInMemoryBlobStager(); HollowProducer p = HollowProducer @@ -252,16 +261,20 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) { switch (run) { case 1: assertEquals(1L, refreshMetrics.getCycleStartTimestamp().getAsLong()); + assertEquals(1L, refreshMetrics.getDeltaChainVersionCounter().getAsLong()); break; case 2: case 5: assertEquals(2L, refreshMetrics.getCycleStartTimestamp().getAsLong()); + assertEquals(2L, refreshMetrics.getDeltaChainVersionCounter().getAsLong()); break; case 3: assertEquals(1L, refreshMetrics.getCycleStartTimestamp().getAsLong()); + assertEquals(1L, refreshMetrics.getDeltaChainVersionCounter().getAsLong()); break; case 4: assertEquals(3L, refreshMetrics.getCycleStartTimestamp().getAsLong()); + assertEquals(3L, refreshMetrics.getDeltaChainVersionCounter().getAsLong()); break; } }