Skip to content

Commit

Permalink
fix: resharding header tag snowballing
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Jan 5, 2024
1 parent 631b90d commit 704634a
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.netflix.hollow.api.producer;

import static com.netflix.hollow.api.producer.ProducerListenerSupport.ProducerListeners;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_TYPE_RESHARDING_INVOKED;
import static java.lang.System.currentTimeMillis;
import static java.util.stream.Collectors.toList;

Expand Down Expand Up @@ -393,15 +394,9 @@ long runCycle(

// 3. Produce a new state if there's work to do
if (writeEngine.hasChangedSinceLastCycle()) {
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_SCHEMA_HASH, new HollowSchemaHash(writeEngine.getSchemas()).getHash());
boolean schemaChangedFromPriorVersion = readStates.hasCurrent() &&
!writeEngine.hasIdenticalSchemas(readStates.current().getStateEngine());
if (schemaChangedFromPriorVersion) {
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE, Boolean.TRUE.toString());
} else {
writeEngine.getHeaderTags().remove(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE);
}
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(toVersion));
updateHeaderTags(writeEngine, toVersion, schemaChangedFromPriorVersion);

// 3a. Publish, run checks & validation, then announce new state consumers
publish(listeners, toVersion, artifacts);
Expand Down Expand Up @@ -520,6 +515,17 @@ public void removeListener(HollowProducerEventListener listener) {
listeners.removeListener(listener);
}

private void updateHeaderTags(HollowWriteStateEngine writeEngine, long toVersion, boolean schemaChangedFromPriorVersion) {
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_SCHEMA_HASH, new HollowSchemaHash(writeEngine.getSchemas()).getHash());
if (schemaChangedFromPriorVersion) {
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE, Boolean.TRUE.toString());
} else {
writeEngine.getHeaderTags().remove(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE);
}
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(toVersion));
writeEngine.getHeaderTags().remove(HEADER_TAG_TYPE_RESHARDING_INVOKED);
}

void populate(
ProducerListeners listeners,
HollowProducer.Incremental.IncrementalPopulator incrementalPopulator, HollowProducer.Populator populator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public void onAnnouncementComplete(com.netflix.hollow.api.producer.Status status

HollowReadStateEngine stateEngine = readState.getStateEngine();
dataSizeBytes = stateEngine.calcApproxDataSize();

Map<String, Integer> numShardsPerType = stateEngine.numShardsPerType();
Map<String, Long> shardSizePerType = stateEngine.calcApproxShardSizePerType();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ public void testHeaderTagsWhenResharding() {
ws.add(i);
}

});
assertEquals(4, producer.getWriteEngine().getTypeState("Integer").getNumShards());
long v3 = producer.runCycle(ws -> {
// remain at 4 shards for Integer
for (int i=0;i<99;i++) {
ws.add(i);
}

});
assertEquals(4, producer.getWriteEngine().getTypeState("Integer").getNumShards());
HollowConsumer consumer = HollowConsumer.withBlobRetriever(blobStore)
Expand All @@ -112,5 +120,10 @@ public int maxDeltasBeforeDoubleSnapshot() {
assertEquals(v2, consumer.getCurrentVersionId());
assertEquals(4, consumer.getStateEngine().getTypeState("Integer").numShards());
assertEquals("Integer:(2,4)", consumer.getStateEngine().getHeaderTag(HEADER_TAG_TYPE_RESHARDING_INVOKED));

consumer.triggerRefreshTo(v3);
assertEquals(v3, consumer.getCurrentVersionId());
assertEquals(4, consumer.getStateEngine().getTypeState("Integer").numShards());
assertEquals(false, consumer.getStateEngine().getHeaderTags().containsKey(HEADER_TAG_TYPE_RESHARDING_INVOKED));
}
}

0 comments on commit 704634a

Please sign in to comment.