Skip to content

Commit

Permalink
Add detection of invalid recovery tasks on recovery nodes (#764)
Browse files Browse the repository at this point in the history
* Add detection of invalid recovery tasks on recovery nodes

* PR feedback

---------

Co-authored-by: Bryan Burkholder <[email protected]>
  • Loading branch information
bryanlb and bryanlb authored Feb 7, 2024
1 parent f58714f commit 42aa91c
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 7 deletions.
37 changes: 30 additions & 7 deletions kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,21 +211,30 @@ private void recoveryNodeListener(RecoveryNodeMetadata recoveryNodeMetadata) {
* fails. To break this cycle add a enqueue_count value to recovery task so we can stop recovering
* it if the task fails a certain number of times.
*/
private void handleRecoveryTaskAssignment(RecoveryNodeMetadata recoveryNodeMetadata) {
protected void handleRecoveryTaskAssignment(RecoveryNodeMetadata recoveryNodeMetadata) {
try {
setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.RECOVERING);
RecoveryTaskMetadata recoveryTaskMetadata =
recoveryTaskMetadataStore.getSync(recoveryNodeMetadata.recoveryTaskName);

boolean success = handleRecoveryTask(recoveryTaskMetadata);
if (success) {
// delete the completed recovery task on success
if (!isValidRecoveryTask(recoveryTaskMetadata)) {
LOG.error(
"Invalid recovery task detected, skipping and deleting invalid task {}",
recoveryTaskMetadata);
recoveryTaskMetadataStore.deleteSync(recoveryTaskMetadata.name);
setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
recoveryNodeAssignmentSuccess.increment();
} else {
setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
recoveryNodeAssignmentFailed.increment();
} else {
boolean success = handleRecoveryTask(recoveryTaskMetadata);
if (success) {
// delete the completed recovery task on success
recoveryTaskMetadataStore.deleteSync(recoveryTaskMetadata.name);
setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
recoveryNodeAssignmentSuccess.increment();
} else {
setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
recoveryNodeAssignmentFailed.increment();
}
}
} catch (Exception e) {
setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
Expand All @@ -234,6 +243,20 @@ private void handleRecoveryTaskAssignment(RecoveryNodeMetadata recoveryNodeMetad
}
}

/**
* Attempts a final sanity-check on the recovery task to prevent a bad task from halting the
* recovery pipeline. Bad state should be ideally prevented at the creation, as well as prior to
* assignment, but this can be considered a final fail-safe if invalid recovery tasks somehow made
* it this far.
*/
private boolean isValidRecoveryTask(RecoveryTaskMetadata recoveryTaskMetadata) {
// todo - consider adding further invalid recovery task detections
if (recoveryTaskMetadata.endOffset <= recoveryTaskMetadata.startOffset) {
return false;
}
return true;
}

/**
* This method does the recovery work from a recovery task. A recovery task indicates the start
* and end offset of a kafka partition to index. To do the recovery work, we create a recovery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ public long determineStartingOffset(
} else if (indexerConfig.getCreateRecoveryTasksOnStart()
&& indexerConfig.getReadFromLocationOnStart()
== KaldbConfigs.KafkaOffsetLocation.LATEST) {
// Todo - this appears to be able to create recovery tasks that have a start and end
// position of 0, which is invalid. This seems to occur when new clusters are initialized,
// and is especially problematic when indexers are created but never get assigned (ie,
// deploy 5, only assign 3).
LOG.info(
"CreateRecoveryTasksOnStart is set and ReadLocationOnStart is set to current. Reading from current and"
+ " spinning up recovery tasks");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,61 @@ public void testValidateOffsetsWhenRecoveryTaskOverlapsWithEndOfKafkaRange() {
assertThat(offsets.endOffset).isEqualTo(kafkaEndOffset);
}

@Test
public void shouldHandleInvalidRecoveryTasks() throws Exception {
KaldbConfigs.KaldbConfig kaldbCfg = makeKaldbConfig(TEST_S3_BUCKET);
curatorFramework =
CuratorBuilder.build(meterRegistry, kaldbCfg.getMetadataStoreConfig().getZookeeperConfig());

// Start recovery service
recoveryService = new RecoveryService(kaldbCfg, curatorFramework, meterRegistry, blobFs);
recoveryService.startAsync();
recoveryService.awaitRunning(DEFAULT_START_STOP_DURATION);

// Create a recovery task
RecoveryTaskMetadataStore recoveryTaskMetadataStore =
new RecoveryTaskMetadataStore(curatorFramework, false);
assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).size()).isZero();
RecoveryTaskMetadata recoveryTask =
new RecoveryTaskMetadata("testRecoveryTask", "0", 0, 0, Instant.now().toEpochMilli());
recoveryTaskMetadataStore.createSync(recoveryTask);
assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).size())
.isEqualTo(1);
assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).get(0))
.isEqualTo(recoveryTask);

// Assign the recovery task to node.
RecoveryNodeMetadataStore recoveryNodeMetadataStore =
new RecoveryNodeMetadataStore(curatorFramework, false);
List<RecoveryNodeMetadata> recoveryNodes =
KaldbMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore);
assertThat(recoveryNodes.size()).isEqualTo(1);
RecoveryNodeMetadata recoveryNodeMetadata = recoveryNodes.get(0);
assertThat(recoveryNodeMetadata.recoveryNodeState)
.isEqualTo(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
recoveryNodeMetadataStore.updateSync(
new RecoveryNodeMetadata(
recoveryNodeMetadata.getName(),
Metadata.RecoveryNodeMetadata.RecoveryNodeState.ASSIGNED,
recoveryTask.getName(),
Instant.now().toEpochMilli()));
assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).size())
.isEqualTo(1);

await().until(() -> getCount(RECOVERY_NODE_ASSIGNMENT_FAILED, meterRegistry) == 1);
assertThat(getCount(RECOVERY_NODE_ASSIGNMENT_SUCCESS, meterRegistry)).isZero();
assertThat(getCount(RECOVERY_NODE_ASSIGNMENT_RECEIVED, meterRegistry)).isEqualTo(1);

// Post recovery checks
assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore).size())
.isEqualTo(1);
assertThat(
KaldbMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore)
.get(0)
.recoveryNodeState)
.isEqualTo(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
}

// returns startOffset or endOffset based on the supplied OffsetSpec
private static AdminClient getAdminClient(long startOffset, long endOffset) {
AdminClient adminClient = mock(AdminClient.class);
Expand Down

0 comments on commit 42aa91c

Please sign in to comment.