Skip to content

Commit

Permalink
Producer- report a monotonically increasing delta chain version count…
Browse files Browse the repository at this point in the history
… (starting from 1) in blob header and metrics (#708)

* Producer- write monotonically increasing delta chain version counter

* Producer- report metric for delta chain version counter
  • Loading branch information
Sunjeet authored Nov 5, 2024
1 parent 85f7038 commit 45197ba
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.netflix.hollow.api.producer;

import static com.netflix.hollow.api.producer.ProducerListenerSupport.ProducerListeners;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_TYPE_RESHARDING_INVOKED;
import static java.lang.System.currentTimeMillis;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -524,6 +525,20 @@ private void updateHeaderTags(HollowWriteStateEngine writeEngine, long toVersion
}
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(toVersion));
writeEngine.getHeaderTags().remove(HEADER_TAG_TYPE_RESHARDING_INVOKED);

long prevDeltaChainVersionCounter = 0l;
if (readStates.hasCurrent()) {
String str = readStates.current().getStateEngine().getHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER);
if (str != null) {
try {
prevDeltaChainVersionCounter = Long.valueOf(str);
} catch (NumberFormatException e) {
// ignore, prevDeltaChainVersionCounter remains 0
}
}
}
long deltaChainVersionCounter = prevDeltaChainVersionCounter + 1;
writeEngine.addHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(deltaChainVersionCounter));
}

void populate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.netflix.hollow.api.producer.AbstractHollowProducerListener;
import com.netflix.hollow.api.producer.HollowProducer;
import com.netflix.hollow.core.HollowStateEngine;
import com.netflix.hollow.core.read.engine.HollowReadStateEngine;
import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -99,6 +100,15 @@ public void onAnnouncementComplete(com.netflix.hollow.api.producer.Status status
.setAnnouncementDurationMillis(elapsed.toMillis());
lastAnnouncementSuccessTimeNanoOptional.ifPresent(announcementMetricsBuilder::setLastAnnouncementSuccessTimeNano);

if (stateEngine.getHeaderTag(HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER) != null) {
try {
long deltaChainVersionCounter = Long.parseLong(stateEngine.getHeaderTag(HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER));
announcementMetricsBuilder.setDeltaChainVersionCounter(deltaChainVersionCounter);
} catch (NumberFormatException e) {
// ignore
}
}

announcementMetricsReporting(announcementMetricsBuilder.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class AnnouncementMetrics {
private long announcementDurationMillis; // Announcement duration in ms, only applicable to completed cycles (skipped cycles dont announce)
private boolean isAnnouncementSuccess; // true if announcement was successful, false if announcement failed
private OptionalLong lastAnnouncementSuccessTimeNano; // monotonic time of last successful announcement (no relation to wall clock), N/A until first successful announcement

private OptionalLong deltaChainVersionCounter;

public long getDataSizeBytes() {
return dataSizeBytes;
Expand All @@ -47,6 +47,10 @@ public boolean getIsAnnouncementSuccess() {
public OptionalLong getLastAnnouncementSuccessTimeNano() {
return lastAnnouncementSuccessTimeNano;
}
public OptionalLong getDeltaChainVersionCounter() {
return deltaChainVersionCounter;
}


private AnnouncementMetrics(Builder builder) {
this.dataSizeBytes = builder.dataSizeBytes;
Expand All @@ -55,18 +59,21 @@ private AnnouncementMetrics(Builder builder) {
this.announcementDurationMillis = builder.announcementDurationMillis;
this.isAnnouncementSuccess = builder.isAnnouncementSuccess;
this.lastAnnouncementSuccessTimeNano = builder.lastAnnouncementSuccessTimeNano;
this.deltaChainVersionCounter = builder.deltaChainVersionCounter;
}

public static final class Builder {
private long dataSizeBytes;
private long announcementDurationMillis;
private boolean isAnnouncementSuccess;
private OptionalLong lastAnnouncementSuccessTimeNano;
private OptionalLong deltaChainVersionCounter;
private Map<String, Integer> numShardsPerType;
private Map<String, Long> shardSizePerType;

public Builder() {
lastAnnouncementSuccessTimeNano = OptionalLong.empty();
deltaChainVersionCounter = OptionalLong.empty();
}

public Builder setDataSizeBytes(long dataSizeBytes) {
Expand All @@ -93,6 +100,10 @@ public Builder setLastAnnouncementSuccessTimeNano(long lastAnnouncementSuccessTi
this.lastAnnouncementSuccessTimeNano = OptionalLong.of(lastAnnouncementSuccessTimeNano);
return this;
}
public Builder setDeltaChainVersionCounter(long deltaChainVersionCounter) {
this.deltaChainVersionCounter = OptionalLong.of(deltaChainVersionCounter);
return this;
}

public AnnouncementMetrics build() {
return new AnnouncementMetrics(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public interface HollowStateEngine extends HollowDataset {
*/
String HEADER_TAG_PRODUCER_TO_VERSION = "hollow.blob.to.version";

/**
* A header tag indicating monotonically increasing version in the same delta chain
*/
String HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER = "hollow.delta.chain.version.counter";

@Override
List<HollowSchema> getSchemas();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.netflix.hollow.api.producer.metrics;

import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

import com.netflix.hollow.api.producer.HollowProducer;
Expand Down Expand Up @@ -114,6 +116,7 @@ public void cycleMetricsReporting(CycleMetrics cycleMetrics) {

@Test
public void testAnnouncementCompleteWithSuccess() {
when(mockStateEngine.getHeaderTag(eq(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER))).thenReturn("1");
final class TestProducerMetricsListener extends AbstractProducerMetricsListener {
@Override
public void announcementMetricsReporting(AnnouncementMetrics announcementMetrics) {
Expand All @@ -124,6 +127,8 @@ public void announcementMetricsReporting(AnnouncementMetrics announcementMetrics
announcementMetrics.getAnnouncementDurationMillis());
Assert.assertNotEquals(OptionalLong.of(TEST_LAST_ANNOUNCEMENT_NANOS),
announcementMetrics.getLastAnnouncementSuccessTimeNano());
Assert.assertEquals(OptionalLong.of(1l),
announcementMetrics.getDeltaChainVersionCounter());
}
}

Expand All @@ -146,6 +151,8 @@ public void announcementMetricsReporting(AnnouncementMetrics announcementMetrics
announcementMetrics.getAnnouncementDurationMillis());
Assert.assertEquals(OptionalLong.of(TEST_LAST_ANNOUNCEMENT_NANOS),
announcementMetrics.getLastAnnouncementSuccessTimeNano());
Assert.assertEquals(OptionalLong.empty(),
announcementMetrics.getDeltaChainVersionCounter());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.netflix.hollow.core.util;

import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_CYCLE_START;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -46,6 +47,7 @@ public void recreatesUsingReadEngine() throws IOException {
}
writeEngine.addHeaderTag("CopyTag", "copied");
writeEngine.addHeaderTag(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(System.currentTimeMillis()));
writeEngine.addHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, "1");
String toVersion = String.valueOf(System.currentTimeMillis());
writeEngine.addHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION, toVersion);

Expand All @@ -55,6 +57,7 @@ public void recreatesUsingReadEngine() throws IOException {
HollowWriteStateEngine recreatedWriteEngine = HollowWriteStateCreator.recreateAndPopulateUsingReadEngine(readEngine);
assertEquals(cycleStartTime, recreatedWriteEngine.getPreviousHeaderTags().get(HEADER_TAG_METRIC_CYCLE_START));
assertEquals(readEngineToVersion, recreatedWriteEngine.getPreviousHeaderTags().get(HEADER_TAG_PRODUCER_TO_VERSION));
assertEquals("1", recreatedWriteEngine.getPreviousHeaderTags().get(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER));
assertEquals(8, recreatedWriteEngine.getTypeState("Integer").getNumShards());

HollowReadStateEngine recreatedReadEngine = StateEngineRoundTripper.roundTripSnapshot(recreatedWriteEngine);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.netflix.hollow.core.write;

import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_TYPE_RESHARDING_INVOKED;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -61,10 +62,12 @@ public void testHeaderTagsOnDeltaAndReverseDelta() {
consumer.triggerRefreshTo(version3); // delta transition
assertEquals("3", consumer.getStateEngine().getHeaderTag(TEST_TAG));
assertEquals(String.valueOf(version3), consumer.getStateEngine().getHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION));
assertEquals("3", consumer.getStateEngine().getHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER));

consumer.triggerRefreshTo(version2); // reverse delta transition
assertEquals("2", consumer.getStateEngine().getHeaderTag(TEST_TAG));
assertEquals(String.valueOf(version2), consumer.getStateEngine().getHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION));
assertEquals("2", consumer.getStateEngine().getHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER));

}

Expand Down

0 comments on commit 45197ba

Please sign in to comment.