Skip to content

Commit

Permalink
Consumer- report delta chain version counter
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Nov 18, 2024
1 parent 028b616 commit ff6fa9e
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.netflix.hollow.api.consumer.metrics;

import static com.netflix.hollow.core.HollowConstants.VERSION_NONE;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_ANNOUNCEMENT;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_CYCLE_START;

Expand Down Expand Up @@ -59,12 +60,15 @@ public abstract class AbstractRefreshMetricsListener extends AbstractRefreshList
private final Map<Long, Long> announcementTimestamps;
private volatile boolean namespacePinnedPreviously;

private final Map<Long, Long> cycleVersionDeltaCounters; // delta chain version counter for each cycle version

public AbstractRefreshMetricsListener() {
lastRefreshTimeNanoOptional = OptionalLong.empty();
consecutiveFailures = 0l;
cycleVersionStartTimes = new HashMap<>();
announcementTimestamps = new HashMap<>();
namespacePinnedPreviously = false;
cycleVersionDeltaCounters = new HashMap<>();
}

public void refreshStarted(long currentVersion, long requestedVersion) {
Expand All @@ -73,7 +77,9 @@ public void refreshStarted(long currentVersion, long requestedVersion) {
refreshMetricsBuilder = new ConsumerRefreshMetrics.Builder();
refreshMetricsBuilder.setIsInitialLoad(currentVersion == VERSION_NONE);
refreshMetricsBuilder.setUpdatePlanDetails(updatePlanDetails);
cycleVersionStartTimes.clear(); // clear map to avoid accumulation over time
// clear maps to avoid accumulation over time
cycleVersionStartTimes.clear();
cycleVersionDeltaCounters.clear();
}

@Override
Expand All @@ -91,7 +97,7 @@ public void versionDetected(HollowConsumer.VersionInfo requestedVersionInfo) {
// or for the newVersion). Don't record this metric when a namespace was pinned previously and gets unpinned
// in the next cycle because this metric will record the refresh duration from the latest announced version.
if (!(namespacePinnedPreviously || isPinned)) {
trackTimestampsFromHeaders(requestedVersionInfo.getVersion(),
trackHeaderTagInVersion(requestedVersionInfo.getVersion(),
requestedVersionInfo.getAnnouncementMetadata().get(), HEADER_TAG_METRIC_ANNOUNCEMENT, announcementTimestamps);
}
namespacePinnedPreviously = isPinned;
Expand Down Expand Up @@ -160,6 +166,9 @@ public void refreshSuccessful(long beforeVersion, long afterVersion, long reques
if (cycleVersionStartTimes.containsKey(afterVersion)) {
refreshMetricsBuilder.setCycleStartTimestamp(cycleVersionStartTimes.get(afterVersion));
}
if (cycleVersionDeltaCounters.containsKey(afterVersion)) {
refreshMetricsBuilder.setDeltaChainVersionCounter(cycleVersionDeltaCounters.get(afterVersion));
}

if (afterVersion == requestedVersion && announcementTimestamps.containsKey(afterVersion)) {
refreshMetricsBuilder.setAnnouncementTimestamp(announcementTimestamps.get(afterVersion));
Expand All @@ -186,32 +195,37 @@ public void refreshFailed(long beforeVersion, long afterVersion, long requestedV
if (cycleVersionStartTimes.containsKey(afterVersion)) {
refreshMetricsBuilder.setCycleStartTimestamp(cycleVersionStartTimes.get(afterVersion));
}
if (cycleVersionDeltaCounters.containsKey(afterVersion)) {
refreshMetricsBuilder.setDeltaChainVersionCounter(cycleVersionDeltaCounters.get(afterVersion));
}

noFailRefreshEndMetricsReporting(refreshMetricsBuilder.build());
}

@Override
public void snapshotUpdateOccurred(HollowAPI refreshAPI, HollowReadStateEngine stateEngine, long version) {
trackTimestampsFromHeaders(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes);
trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes);
trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, cycleVersionDeltaCounters);
}

@Override
public void deltaUpdateOccurred(HollowAPI refreshAPI, HollowReadStateEngine stateEngine, long version) {
trackTimestampsFromHeaders(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes);
trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes);
trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, cycleVersionDeltaCounters);
}

/**
* If the blob header contains the timestamps like producer cycle start and announcement then save those values in
* the maps tracking version to cycle start time and version to announcement respectively.
* If the blob header contains a value for the given header tag (like producer cycle start time) then save that value in
* a maps tracking the value per version in this refresh.
*/
private void trackTimestampsFromHeaders(long version, Map<String, String> headers, String headerTag, Map<Long, Long> timestampsMap) {
private void trackHeaderTagInVersion(long version, Map<String, String> headers, String headerTag, Map<Long, Long> tracker) {
if (headers != null) {
String headerTagValue = headers.get(headerTag);
if (headerTagValue != null && !headerTagValue.isEmpty()) {
try {
Long timestamp = Long.valueOf(headerTagValue);
if (timestamp != null) {
timestampsMap.put(version, timestamp);
Long val = Long.valueOf(headerTagValue);
if (val != null) {
tracker.put(version, val);
}
} catch (NumberFormatException e) {
log.log(Level.WARNING, "Blob header contained " + headerTag + " but its value could"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class ConsumerRefreshMetrics {
private long refreshEndTimeNano; // monotonic system time when refresh ended
private OptionalLong cycleStartTimestamp; // timestamp in millis of when cycle started for the loaded data version
private OptionalLong announcementTimestamp; // timestamp in milliseconds to mark announcement for the loaded data version
private OptionalLong deltaChainVersionCounter; // the sequence number of a version in a delta chain

/**
* A class that contains details of the consumer refresh update plan that may be useful to report as metrics or logs.
Expand Down Expand Up @@ -84,8 +85,12 @@ public long getRefreshEndTimeNano() {
public OptionalLong getCycleStartTimestamp() {
return cycleStartTimestamp;
}

public OptionalLong getAnnouncementTimestamp() { return announcementTimestamp; }
public OptionalLong getAnnouncementTimestamp() {
return announcementTimestamp;
}
public OptionalLong getDeltaChainVersionCounter() {
return deltaChainVersionCounter;
}

private ConsumerRefreshMetrics(Builder builder) {
this.durationMillis = builder.durationMillis;
Expand All @@ -98,6 +103,7 @@ private ConsumerRefreshMetrics(Builder builder) {
this.refreshEndTimeNano = builder.refreshEndTimeNano;
this.cycleStartTimestamp = builder.cycleStartTimestamp;
this.announcementTimestamp = builder.announcementTimestamp;
this.deltaChainVersionCounter = builder.deltaChainVersionCounter;
}

public static final class Builder {
Expand All @@ -111,11 +117,13 @@ public static final class Builder {
private long refreshEndTimeNano;
private OptionalLong cycleStartTimestamp;
private OptionalLong announcementTimestamp;
private OptionalLong deltaChainVersionCounter;

public Builder() {
refreshSuccessAgeMillisOptional = OptionalLong.empty();
cycleStartTimestamp = OptionalLong.empty();
announcementTimestamp = OptionalLong.empty();
deltaChainVersionCounter = OptionalLong.empty();
}

public Builder setDurationMillis(long durationMillis) {
Expand Down Expand Up @@ -160,6 +168,10 @@ public Builder setAnnouncementTimestamp(long announcementTimestamp) {
this.announcementTimestamp = OptionalLong.of(announcementTimestamp);
return this;
}
public Builder setDeltaChainVersionCounter(long deltaChainVersionCounter) {
this.deltaChainVersionCounter = OptionalLong.of(deltaChainVersionCounter);
return this;
}

public ConsumerRefreshMetrics build() {
return new ConsumerRefreshMetrics(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.netflix.hollow.api.consumer.metrics;

import static com.netflix.hollow.core.HollowConstants.VERSION_NONE;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_ANNOUNCEMENT;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_CYCLE_START;
import static org.junit.Assert.assertEquals;
Expand All @@ -11,13 +12,12 @@
import com.netflix.hollow.api.producer.HollowProducer;
import com.netflix.hollow.api.producer.fs.HollowInMemoryBlobStager;
import com.netflix.hollow.core.read.engine.HollowReadStateEngine;
import com.netflix.hollow.test.InMemoryBlobStore;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import com.netflix.hollow.test.InMemoryBlobStore;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -129,13 +129,15 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) {
Assert.assertNotEquals(0l, refreshMetrics.getRefreshEndTimeNano());
assertEquals(TEST_CYCLE_START_TIMESTAMP, refreshMetrics.getCycleStartTimestamp().getAsLong());
assertEquals(TEST_ANNOUNCEMENT_TIMESTAMP, refreshMetrics.getAnnouncementTimestamp().getAsLong());
assertEquals(1l, refreshMetrics.getDeltaChainVersionCounter().getAsLong());
}
}
SuccessTestRefreshMetricsListener successTestRefreshMetricsListener = new SuccessTestRefreshMetricsListener();
successTestRefreshMetricsListener.refreshStarted(TEST_VERSION_LOW, TEST_VERSION_HIGH);

testHeaderTags.put(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(TEST_CYCLE_START_TIMESTAMP));
testHeaderTags.put(HEADER_TAG_METRIC_ANNOUNCEMENT, String.valueOf(TEST_ANNOUNCEMENT_TIMESTAMP));
testHeaderTags.put(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(1l));
successTestRefreshMetricsListener.snapshotUpdateOccurred(null, mockStateEngine, TEST_VERSION_HIGH);

successTestRefreshMetricsListener.refreshSuccessful(TEST_VERSION_LOW, TEST_VERSION_HIGH, TEST_VERSION_HIGH);
Expand All @@ -152,6 +154,7 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) {
Assert.assertNotEquals(0l, refreshMetrics.getRefreshEndTimeNano());
Assert.assertFalse(refreshMetrics.getCycleStartTimestamp().isPresent());
Assert.assertFalse(refreshMetrics.getAnnouncementTimestamp().isPresent());
Assert.assertFalse(refreshMetrics.getDeltaChainVersionCounter().isPresent());
}
}
FailureTestRefreshMetricsListener failTestRefreshMetricsListener = new FailureTestRefreshMetricsListener();
Expand All @@ -168,6 +171,7 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) {
assertEquals(3, refreshMetrics.getUpdatePlanDetails().getNumSuccessfulTransitions());
assertEquals(TEST_CYCLE_START_TIMESTAMP, refreshMetrics.getCycleStartTimestamp().getAsLong());
assertEquals(TEST_ANNOUNCEMENT_TIMESTAMP, refreshMetrics.getAnnouncementTimestamp().getAsLong());
assertEquals(3l, refreshMetrics.getDeltaChainVersionCounter().getAsLong());
}
}
List<HollowConsumer.Blob.BlobType> testTransitionSequence = new ArrayList<HollowConsumer.Blob.BlobType>() {{
Expand All @@ -183,16 +187,19 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) {
successTestRefreshMetricsListener.blobLoaded(null);
testHeaderTags.put(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(TEST_CYCLE_START_TIMESTAMP-2));
testHeaderTags.put(HEADER_TAG_METRIC_ANNOUNCEMENT, String.valueOf(TEST_ANNOUNCEMENT_TIMESTAMP-2));
testHeaderTags.put(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(1l));
successTestRefreshMetricsListener.deltaUpdateOccurred(null, mockStateEngine, TEST_VERSION_HIGH-2);

successTestRefreshMetricsListener.blobLoaded(null);
testHeaderTags.put(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(TEST_CYCLE_START_TIMESTAMP-1));
testHeaderTags.put(HEADER_TAG_METRIC_ANNOUNCEMENT, String.valueOf(TEST_ANNOUNCEMENT_TIMESTAMP-1));
testHeaderTags.put(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(2l));
successTestRefreshMetricsListener.deltaUpdateOccurred(null, mockStateEngine, TEST_VERSION_HIGH-1);

successTestRefreshMetricsListener.blobLoaded(null);
testHeaderTags.put(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(TEST_CYCLE_START_TIMESTAMP));
testHeaderTags.put(HEADER_TAG_METRIC_ANNOUNCEMENT, String.valueOf(TEST_ANNOUNCEMENT_TIMESTAMP));
testHeaderTags.put(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(3l));
successTestRefreshMetricsListener.deltaUpdateOccurred(null, mockStateEngine, TEST_VERSION_HIGH);

successTestRefreshMetricsListener.refreshSuccessful(TEST_VERSION_LOW, TEST_VERSION_HIGH, TEST_VERSION_HIGH);
Expand All @@ -205,6 +212,7 @@ class FailureTestRefreshMetricsListener extends AbstractRefreshMetricsListener {
public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) {
assertEquals(1, refreshMetrics.getUpdatePlanDetails().getNumSuccessfulTransitions());
assertEquals(TEST_CYCLE_START_TIMESTAMP, refreshMetrics.getCycleStartTimestamp().getAsLong());
assertEquals(1l, refreshMetrics.getDeltaChainVersionCounter().getAsLong());
}
}
List<HollowConsumer.Blob.BlobType> testTransitionSequence = new ArrayList<HollowConsumer.Blob.BlobType>() {{
Expand All @@ -219,14 +227,15 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) {

failureTestRefreshMetricsListener.blobLoaded(null);
testHeaderTags.put(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(TEST_CYCLE_START_TIMESTAMP));
testHeaderTags.put(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(1l));
failureTestRefreshMetricsListener.snapshotUpdateOccurred(null, mockStateEngine, TEST_VERSION_LOW);

failureTestRefreshMetricsListener.refreshFailed(TEST_VERSION_LOW-1, TEST_VERSION_LOW, TEST_VERSION_HIGH, null);

}

@Test
public void testCycleStart() { // also exercises reverse delta transition
public void testCycles() { // also exercises reverse delta transition
InMemoryBlobStore blobStore = new InMemoryBlobStore();
HollowInMemoryBlobStager blobStager = new HollowInMemoryBlobStager();
HollowProducer p = HollowProducer
Expand All @@ -252,16 +261,20 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) {
switch (run) {
case 1:
assertEquals(1L, refreshMetrics.getCycleStartTimestamp().getAsLong());
assertEquals(1L, refreshMetrics.getDeltaChainVersionCounter().getAsLong());
break;
case 2:
case 5:
assertEquals(2L, refreshMetrics.getCycleStartTimestamp().getAsLong());
assertEquals(2L, refreshMetrics.getDeltaChainVersionCounter().getAsLong());
break;
case 3:
assertEquals(1L, refreshMetrics.getCycleStartTimestamp().getAsLong());
assertEquals(1L, refreshMetrics.getDeltaChainVersionCounter().getAsLong());
break;
case 4:
assertEquals(3L, refreshMetrics.getCycleStartTimestamp().getAsLong());
assertEquals(3L, refreshMetrics.getDeltaChainVersionCounter().getAsLong());
break;
}
}
Expand Down

0 comments on commit ff6fa9e

Please sign in to comment.