Skip to content

Commit

Permalink
Merge pull request #618 from Netflix/cycle-count-primary-status
Browse files Browse the repository at this point in the history
Add a field to track cycle count of a producer with primary status
  • Loading branch information
nayanika-u authored Jun 8, 2023
2 parents 937a71f + e1f3c5b commit 4011b2b
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ abstract class AbstractHollowProducer {
long lastSuccessfulCycle = 0;
final HollowObjectHashCodeFinder hashCodeFinder;
final boolean doIntegrityCheck;
// Count to track number of cycles run by a primary producer. In the future, this can be useful in determining stickiness of a
// producer instance.
int cycleCountSincePrimaryStatus = 0;

boolean isInitialized;

Expand Down Expand Up @@ -317,6 +320,9 @@ public HollowObjectMapper getObjectMapper() {
* @return true if the intended action was successful
*/
public boolean enablePrimaryProducer(boolean doEnable) {
if (!singleProducerEnforcer.isPrimary()) {
cycleCountSincePrimaryStatus = 0;
}
if (doEnable) {
singleProducerEnforcer.enable();
} else {
Expand All @@ -332,6 +338,7 @@ long runCycle(HollowProducer.Incremental.IncrementalPopulator incrementalPopulat
// TODO: minimum time spacing between cycles
log.log(Level.INFO, "cycle not executed -- not primary (aka leader)");
localListeners.fireCycleSkipped(CycleListener.CycleSkipReason.NOT_PRIMARY_PRODUCER);
cycleCountSincePrimaryStatus = 0;
return lastSuccessfulCycle;
}

Expand Down Expand Up @@ -434,6 +441,7 @@ long runCycle(
throw new RuntimeException(th);
} finally {
artifacts.cleanup();
cycleCountSincePrimaryStatus ++;
}
return lastSuccessfulCycle;
}
Expand Down Expand Up @@ -952,4 +960,15 @@ public void cleanDeltas() {
}
}

/**
* This determines the latest number of cycles completed by a producer with a primary status. This value is 0 by default and
* increments by 1 only for a successful cycle and gets reset to 0 whenever a producer loses its primary status at
* any time during the cycle is being run.
*
* @return cycle count of a producer with primary status.
* */
public int getCycleCountWithPrimaryStatus() {
return this.cycleCountSincePrimaryStatus;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,16 @@ public void testPopulateNoChangesVersion() {
long v1 = producer.runCycle(ws -> {
ws.add(1);
});

Assert.assertEquals(producer.getCycleCountWithPrimaryStatus(), 1);
// Run cycle with no changes
long v2 = producer.runCycle(ws -> {
ws.add(1);
});

Assert.assertEquals(producer.getCycleCountWithPrimaryStatus(), 2);
long v3 = producer.runCycle(ws -> {
ws.add(2);
});
Assert.assertEquals(producer.getCycleCountWithPrimaryStatus(), 3);

Assert.assertEquals(v1, v2);
Assert.assertTrue(v3 > v2);
Expand All @@ -135,7 +136,7 @@ public void testNotPrimaryProducerVersion() {
long v2 = producer.runCycle(ws -> {
ws.add(1);
});

Assert.assertEquals(producer.getCycleCountWithPrimaryStatus(), 0);
// Run cycle as the primary producer
enforcer.enable();
long v3 = producer.runCycle(ws -> {
Expand All @@ -144,6 +145,7 @@ public void testNotPrimaryProducerVersion() {

Assert.assertEquals(v1, v2);
Assert.assertTrue(v3 > v2);
Assert.assertEquals(producer.getCycleCountWithPrimaryStatus(), 1);
}

@Test
Expand All @@ -167,6 +169,7 @@ public void testNonPrimaryCantPublish() {
} catch (IllegalStateException e) {
Assert.assertTrue(e instanceof HollowProducer.NotPrimaryMidCycleException);
Assert.assertEquals("Publish failed primary (aka leader) check", e.getMessage());
Assert.assertEquals(producer.getCycleCountWithPrimaryStatus(), 2); // counted as cycle ran for the producer with primary status but lost status mid cycle. Doesn't matter as the next cycle result in a no-op.
return;
}
Assert.fail();
Expand All @@ -187,6 +190,7 @@ public void testNonPrimaryProducerCantAnnounce() {
});
} catch (HollowProducer.NotPrimaryMidCycleException e) {
Assert.assertEquals("Announcement failed primary (aka leader) check", e.getMessage());
Assert.assertEquals(producer.getCycleCountWithPrimaryStatus(), 1); // counted as cycle ran for producer with primary status
return;
}
Assert.fail();
Expand All @@ -204,6 +208,7 @@ public void testRestoreFailure() {

Assert.assertNotNull(lastRestoreStatus);
Assert.assertEquals(Status.FAIL, lastRestoreStatus.getStatus());
Assert.assertEquals(producer.getCycleCountWithPrimaryStatus(), 0);
}

@Test
Expand All @@ -215,6 +220,7 @@ public void testPublishAndRestore() {
Assert.assertNotNull(lastRestoreStatus);
Assert.assertEquals(Status.SUCCESS, lastRestoreStatus.getStatus());
Assert.assertEquals("Version should be the same", version, lastRestoreStatus.getDesiredVersion());
Assert.assertEquals(producer.getCycleCountWithPrimaryStatus(), 1);
}

@Test
Expand Down Expand Up @@ -242,6 +248,7 @@ public void testMultipleRestores() throws Exception {
long version = testPublishV1(producer, size, valueMultiplier);
versions.add(version);
}
Assert.assertEquals(producer.getCycleCountWithPrimaryStatus(), 5);

System.out.println("\n\n------------ Restore and validate ------------\n");
for (int i = 0; i < versions.size(); i++) {
Expand Down Expand Up @@ -286,6 +293,7 @@ public void testPublishAndRestoreWithSchemaChanges() throws Exception {
{ // Publish V1
HollowProducer producer = createProducer(tmpFolder, schema);
v1 = testPublishV1(producer, sizeV1, valueMultiplierV1);
Assert.assertEquals(producer.getCycleCountWithPrimaryStatus(), 1);
}

// Publish V2;
Expand Down Expand Up @@ -317,6 +325,7 @@ public void testPublishAndRestoreWithSchemaChanges() throws Exception {
int valueFieldCount = 2;
restoreAndAssert(producerV2, v3, sizeV3, valueMultiplierV3, valueFieldCount);
}
Assert.assertEquals(producerV2.getCycleCountWithPrimaryStatus(), 2);
}

@Test
Expand All @@ -332,6 +341,7 @@ public void testRestoreToNonExact() {
lastRestoreStatus.getVersionReached());
Assert.assertEquals("Should have correct desired version", version + 1,
lastRestoreStatus.getDesiredVersion());
Assert.assertEquals(producer.getCycleCountWithPrimaryStatus(), 0); // no cycle run
}

@Test
Expand All @@ -347,6 +357,7 @@ public void testRollsBackStateEngineOnPublishFailure() throws Exception {
}
Assert.assertEquals("Should still have no populated ordinals", 0,
producer.getWriteEngine().getTypeState("TestPojo").getPopulatedBitSet().cardinality());
Assert.assertEquals(producer.getCycleCountWithPrimaryStatus(), 1); // counted as cycle ran for producer with primary status
}

private long testPublishV1(HollowProducer producer, final int size, final int valueMultiplier) {
Expand Down

0 comments on commit 4011b2b

Please sign in to comment.