From 704634a38c992d49e6b6a5142b02bc01fcf203e8 Mon Sep 17 00:00:00 2001 From: Eduardo Ramirez Date: Fri, 5 Jan 2024 07:28:55 -0800 Subject: [PATCH] fix: resharding header tag snowballing --- .../api/producer/AbstractHollowProducer.java | 20 ++++++++++++------- .../AbstractProducerMetricsListener.java | 1 - .../write/HollowWriteStateEngineTest.java | 13 ++++++++++++ 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java index 2a2c7a3e8f..8b7b0e3e18 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java @@ -17,6 +17,7 @@ package com.netflix.hollow.api.producer; import static com.netflix.hollow.api.producer.ProducerListenerSupport.ProducerListeners; +import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_TYPE_RESHARDING_INVOKED; import static java.lang.System.currentTimeMillis; import static java.util.stream.Collectors.toList; @@ -393,15 +394,9 @@ long runCycle( // 3. Produce a new state if there's work to do if (writeEngine.hasChangedSinceLastCycle()) { - writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_SCHEMA_HASH, new HollowSchemaHash(writeEngine.getSchemas()).getHash()); boolean schemaChangedFromPriorVersion = readStates.hasCurrent() && !writeEngine.hasIdenticalSchemas(readStates.current().getStateEngine()); - if (schemaChangedFromPriorVersion) { - writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE, Boolean.TRUE.toString()); - } else { - writeEngine.getHeaderTags().remove(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE); - } - writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(toVersion)); + updateHeaderTags(writeEngine, toVersion, schemaChangedFromPriorVersion); // 3a. Publish, run checks & validation, then announce new state consumers publish(listeners, toVersion, artifacts); @@ -520,6 +515,17 @@ public void removeListener(HollowProducerEventListener listener) { listeners.removeListener(listener); } + private void updateHeaderTags(HollowWriteStateEngine writeEngine, long toVersion, boolean schemaChangedFromPriorVersion) { + writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_SCHEMA_HASH, new HollowSchemaHash(writeEngine.getSchemas()).getHash()); + if (schemaChangedFromPriorVersion) { + writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE, Boolean.TRUE.toString()); + } else { + writeEngine.getHeaderTags().remove(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE); + } + writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(toVersion)); + writeEngine.getHeaderTags().remove(HEADER_TAG_TYPE_RESHARDING_INVOKED); + } + void populate( ProducerListeners listeners, HollowProducer.Incremental.IncrementalPopulator incrementalPopulator, HollowProducer.Populator populator, diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/metrics/AbstractProducerMetricsListener.java b/hollow/src/main/java/com/netflix/hollow/api/producer/metrics/AbstractProducerMetricsListener.java index 4b87fbb51f..33c6430a8b 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/metrics/AbstractProducerMetricsListener.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/metrics/AbstractProducerMetricsListener.java @@ -88,7 +88,6 @@ public void onAnnouncementComplete(com.netflix.hollow.api.producer.Status status HollowReadStateEngine stateEngine = readState.getStateEngine(); dataSizeBytes = stateEngine.calcApproxDataSize(); - Map numShardsPerType = stateEngine.numShardsPerType(); Map shardSizePerType = stateEngine.calcApproxShardSizePerType(); diff --git a/hollow/src/test/java/com/netflix/hollow/core/write/HollowWriteStateEngineTest.java b/hollow/src/test/java/com/netflix/hollow/core/write/HollowWriteStateEngineTest.java index eed906495a..0f39affe80 100644 --- a/hollow/src/test/java/com/netflix/hollow/core/write/HollowWriteStateEngineTest.java +++ b/hollow/src/test/java/com/netflix/hollow/core/write/HollowWriteStateEngineTest.java @@ -89,6 +89,14 @@ public void testHeaderTagsWhenResharding() { ws.add(i); } + }); + assertEquals(4, producer.getWriteEngine().getTypeState("Integer").getNumShards()); + long v3 = producer.runCycle(ws -> { + // remain at 4 shards for Integer + for (int i=0;i<99;i++) { + ws.add(i); + } + }); assertEquals(4, producer.getWriteEngine().getTypeState("Integer").getNumShards()); HollowConsumer consumer = HollowConsumer.withBlobRetriever(blobStore) @@ -112,5 +120,10 @@ public int maxDeltasBeforeDoubleSnapshot() { assertEquals(v2, consumer.getCurrentVersionId()); assertEquals(4, consumer.getStateEngine().getTypeState("Integer").numShards()); assertEquals("Integer:(2,4)", consumer.getStateEngine().getHeaderTag(HEADER_TAG_TYPE_RESHARDING_INVOKED)); + + consumer.triggerRefreshTo(v3); + assertEquals(v3, consumer.getCurrentVersionId()); + assertEquals(4, consumer.getStateEngine().getTypeState("Integer").numShards()); + assertEquals(false, consumer.getStateEngine().getHeaderTags().containsKey(HEADER_TAG_TYPE_RESHARDING_INVOKED)); } }