diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java index 8b7b0e3e18..35f33e8889 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java @@ -17,6 +17,7 @@ package com.netflix.hollow.api.producer; import static com.netflix.hollow.api.producer.ProducerListenerSupport.ProducerListeners; +import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_TYPE_RESHARDING_INVOKED; import static java.lang.System.currentTimeMillis; import static java.util.stream.Collectors.toList; @@ -524,6 +525,20 @@ private void updateHeaderTags(HollowWriteStateEngine writeEngine, long toVersion } writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(toVersion)); writeEngine.getHeaderTags().remove(HEADER_TAG_TYPE_RESHARDING_INVOKED); + + long prevDeltaChainVersionCounter = 0l; + if (readStates.hasCurrent()) { + String str = readStates.current().getStateEngine().getHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER); + if (str != null) { + try { + prevDeltaChainVersionCounter = Long.valueOf(str); + } catch (NumberFormatException e) { + // ignore, prevDeltaChainVersionCounter remains 0 + } + } + } + long deltaChainVersionCounter = prevDeltaChainVersionCounter + 1; + writeEngine.addHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(deltaChainVersionCounter)); } void populate( diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/metrics/AbstractProducerMetricsListener.java b/hollow/src/main/java/com/netflix/hollow/api/producer/metrics/AbstractProducerMetricsListener.java index 33c6430a8b..8dce80ad63 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/metrics/AbstractProducerMetricsListener.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/metrics/AbstractProducerMetricsListener.java @@ -18,6 +18,7 @@ import com.netflix.hollow.api.producer.AbstractHollowProducerListener; import com.netflix.hollow.api.producer.HollowProducer; +import com.netflix.hollow.core.HollowStateEngine; import com.netflix.hollow.core.read.engine.HollowReadStateEngine; import java.time.Duration; import java.util.Map; @@ -99,6 +100,15 @@ public void onAnnouncementComplete(com.netflix.hollow.api.producer.Status status .setAnnouncementDurationMillis(elapsed.toMillis()); lastAnnouncementSuccessTimeNanoOptional.ifPresent(announcementMetricsBuilder::setLastAnnouncementSuccessTimeNano); + if (stateEngine.getHeaderTag(HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER) != null) { + try { + long deltaChainVersionCounter = Long.parseLong(stateEngine.getHeaderTag(HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER)); + announcementMetricsBuilder.setDeltaChainVersionCounter(deltaChainVersionCounter); + } catch (NumberFormatException e) { + // ignore + } + } + announcementMetricsReporting(announcementMetricsBuilder.build()); } diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/metrics/AnnouncementMetrics.java b/hollow/src/main/java/com/netflix/hollow/api/producer/metrics/AnnouncementMetrics.java index 4441509b97..644384e891 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/metrics/AnnouncementMetrics.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/metrics/AnnouncementMetrics.java @@ -27,7 +27,7 @@ public class AnnouncementMetrics { private long announcementDurationMillis; // Announcement duration in ms, only applicable to completed cycles (skipped cycles dont announce) private boolean isAnnouncementSuccess; // true if announcement was successful, false if announcement failed private OptionalLong lastAnnouncementSuccessTimeNano; // monotonic time of last successful announcement (no relation to wall clock), N/A until first successful announcement - + private OptionalLong deltaChainVersionCounter; public long getDataSizeBytes() { return dataSizeBytes; @@ -47,6 +47,10 @@ public boolean getIsAnnouncementSuccess() { public OptionalLong getLastAnnouncementSuccessTimeNano() { return lastAnnouncementSuccessTimeNano; } + public OptionalLong getDeltaChainVersionCounter() { + return deltaChainVersionCounter; + } + private AnnouncementMetrics(Builder builder) { this.dataSizeBytes = builder.dataSizeBytes; @@ -55,6 +59,7 @@ private AnnouncementMetrics(Builder builder) { this.announcementDurationMillis = builder.announcementDurationMillis; this.isAnnouncementSuccess = builder.isAnnouncementSuccess; this.lastAnnouncementSuccessTimeNano = builder.lastAnnouncementSuccessTimeNano; + this.deltaChainVersionCounter = builder.deltaChainVersionCounter; } public static final class Builder { @@ -62,11 +67,13 @@ public static final class Builder { private long announcementDurationMillis; private boolean isAnnouncementSuccess; private OptionalLong lastAnnouncementSuccessTimeNano; + private OptionalLong deltaChainVersionCounter; private Map numShardsPerType; private Map shardSizePerType; public Builder() { lastAnnouncementSuccessTimeNano = OptionalLong.empty(); + deltaChainVersionCounter = OptionalLong.empty(); } public Builder setDataSizeBytes(long dataSizeBytes) { @@ -93,6 +100,10 @@ public Builder setLastAnnouncementSuccessTimeNano(long lastAnnouncementSuccessTi this.lastAnnouncementSuccessTimeNano = OptionalLong.of(lastAnnouncementSuccessTimeNano); return this; } + public Builder setDeltaChainVersionCounter(long deltaChainVersionCounter) { + this.deltaChainVersionCounter = OptionalLong.of(deltaChainVersionCounter); + return this; + } public AnnouncementMetrics build() { return new AnnouncementMetrics(this); diff --git a/hollow/src/main/java/com/netflix/hollow/core/HollowStateEngine.java b/hollow/src/main/java/com/netflix/hollow/core/HollowStateEngine.java index 2b6bf016d3..f8c3b9af3a 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/HollowStateEngine.java +++ b/hollow/src/main/java/com/netflix/hollow/core/HollowStateEngine.java @@ -75,6 +75,11 @@ public interface HollowStateEngine extends HollowDataset { */ String HEADER_TAG_PRODUCER_TO_VERSION = "hollow.blob.to.version"; + /** + * A header tag indicating monotonically increasing version in the same delta chain + */ + String HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER = "hollow.delta.chain.version.counter"; + @Override List getSchemas(); diff --git a/hollow/src/test/java/com/netflix/hollow/api/producer/metrics/AbstractProducerMetricsListenerTest.java b/hollow/src/test/java/com/netflix/hollow/api/producer/metrics/AbstractProducerMetricsListenerTest.java index 87d8060036..0d9cef4b9a 100644 --- a/hollow/src/test/java/com/netflix/hollow/api/producer/metrics/AbstractProducerMetricsListenerTest.java +++ b/hollow/src/test/java/com/netflix/hollow/api/producer/metrics/AbstractProducerMetricsListenerTest.java @@ -1,5 +1,7 @@ package com.netflix.hollow.api.producer.metrics; +import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; import com.netflix.hollow.api.producer.HollowProducer; @@ -114,6 +116,7 @@ public void cycleMetricsReporting(CycleMetrics cycleMetrics) { @Test public void testAnnouncementCompleteWithSuccess() { + when(mockStateEngine.getHeaderTag(eq(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER))).thenReturn("1"); final class TestProducerMetricsListener extends AbstractProducerMetricsListener { @Override public void announcementMetricsReporting(AnnouncementMetrics announcementMetrics) { @@ -124,6 +127,8 @@ public void announcementMetricsReporting(AnnouncementMetrics announcementMetrics announcementMetrics.getAnnouncementDurationMillis()); Assert.assertNotEquals(OptionalLong.of(TEST_LAST_ANNOUNCEMENT_NANOS), announcementMetrics.getLastAnnouncementSuccessTimeNano()); + Assert.assertEquals(OptionalLong.of(1l), + announcementMetrics.getDeltaChainVersionCounter()); } } @@ -146,6 +151,8 @@ public void announcementMetricsReporting(AnnouncementMetrics announcementMetrics announcementMetrics.getAnnouncementDurationMillis()); Assert.assertEquals(OptionalLong.of(TEST_LAST_ANNOUNCEMENT_NANOS), announcementMetrics.getLastAnnouncementSuccessTimeNano()); + Assert.assertEquals(OptionalLong.empty(), + announcementMetrics.getDeltaChainVersionCounter()); } } diff --git a/hollow/src/test/java/com/netflix/hollow/core/util/HollowWriteStateCreatorTest.java b/hollow/src/test/java/com/netflix/hollow/core/util/HollowWriteStateCreatorTest.java index 9fe82e99db..7b17a59336 100644 --- a/hollow/src/test/java/com/netflix/hollow/core/util/HollowWriteStateCreatorTest.java +++ b/hollow/src/test/java/com/netflix/hollow/core/util/HollowWriteStateCreatorTest.java @@ -16,6 +16,7 @@ */ package com.netflix.hollow.core.util; +import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_CYCLE_START; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION; import static org.junit.Assert.assertEquals; @@ -46,6 +47,7 @@ public void recreatesUsingReadEngine() throws IOException { } writeEngine.addHeaderTag("CopyTag", "copied"); writeEngine.addHeaderTag(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(System.currentTimeMillis())); + writeEngine.addHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, "1"); String toVersion = String.valueOf(System.currentTimeMillis()); writeEngine.addHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION, toVersion); @@ -55,6 +57,7 @@ public void recreatesUsingReadEngine() throws IOException { HollowWriteStateEngine recreatedWriteEngine = HollowWriteStateCreator.recreateAndPopulateUsingReadEngine(readEngine); assertEquals(cycleStartTime, recreatedWriteEngine.getPreviousHeaderTags().get(HEADER_TAG_METRIC_CYCLE_START)); assertEquals(readEngineToVersion, recreatedWriteEngine.getPreviousHeaderTags().get(HEADER_TAG_PRODUCER_TO_VERSION)); + assertEquals("1", recreatedWriteEngine.getPreviousHeaderTags().get(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER)); assertEquals(8, recreatedWriteEngine.getTypeState("Integer").getNumShards()); HollowReadStateEngine recreatedReadEngine = StateEngineRoundTripper.roundTripSnapshot(recreatedWriteEngine); diff --git a/hollow/src/test/java/com/netflix/hollow/core/write/HollowWriteStateEngineTest.java b/hollow/src/test/java/com/netflix/hollow/core/write/HollowWriteStateEngineTest.java index 0f39affe80..8d4829a24a 100644 --- a/hollow/src/test/java/com/netflix/hollow/core/write/HollowWriteStateEngineTest.java +++ b/hollow/src/test/java/com/netflix/hollow/core/write/HollowWriteStateEngineTest.java @@ -1,5 +1,6 @@ package com.netflix.hollow.core.write; +import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_TYPE_RESHARDING_INVOKED; import static org.junit.Assert.assertEquals; @@ -61,10 +62,12 @@ public void testHeaderTagsOnDeltaAndReverseDelta() { consumer.triggerRefreshTo(version3); // delta transition assertEquals("3", consumer.getStateEngine().getHeaderTag(TEST_TAG)); assertEquals(String.valueOf(version3), consumer.getStateEngine().getHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION)); + assertEquals("3", consumer.getStateEngine().getHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER)); consumer.triggerRefreshTo(version2); // reverse delta transition assertEquals("2", consumer.getStateEngine().getHeaderTag(TEST_TAG)); assertEquals(String.valueOf(version2), consumer.getStateEngine().getHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION)); + assertEquals("2", consumer.getStateEngine().getHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER)); }