diff --git a/fdbserver/tester.actor.cpp b/fdbserver/tester.actor.cpp index 73675dec199..73a0cf1abcf 100644 --- a/fdbserver/tester.actor.cpp +++ b/fdbserver/tester.actor.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include "flow/ActorCollection.h" @@ -969,6 +970,7 @@ ACTOR Future testerServerCore(TesterInterface interf, .detail("ConsistencyCheckerId", work.sharedRandomNumber) .detail("ClientId", work.clientId) .detail("ClientCount", work.clientCount); + work.reply.sendError(consistency_check_urgent_duplicate_request()); } else if (consistencyCheckerUrgentTester.second.isValid() && !consistencyCheckerUrgentTester.second.isReady()) { TraceEvent(SevWarnAlways, "ConsistencyCheckUrgent_TesterWorkloadConflict", interf.id()) @@ -976,13 +978,15 @@ ACTOR Future testerServerCore(TesterInterface interf, .detail("ArrivingConsistencyCheckerId", work.sharedRandomNumber) .detail("ClientId", work.clientId) .detail("ClientCount", work.clientCount); + work.reply.sendError(consistency_check_urgent_conflicting_request()); + } else { + consistencyCheckerUrgentTester = std::make_pair( + work.sharedRandomNumber, testerServerConsistencyCheckerUrgentWorkload(work, ccr, dbInfo)); + TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterWorkloadInitialized", interf.id()) + .detail("ConsistencyCheckerId", consistencyCheckerUrgentTester.first) + .detail("ClientId", work.clientId) + .detail("ClientCount", work.clientCount); } - consistencyCheckerUrgentTester = std::make_pair( - work.sharedRandomNumber, testerServerConsistencyCheckerUrgentWorkload(work, ccr, dbInfo)); - TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterWorkloadInitialized", interf.id()) - .detail("ConsistencyCheckerId", consistencyCheckerUrgentTester.first) - .detail("ClientId", work.clientId) - .detail("ClientCount", work.clientCount); } else { addWorkload.send(testerServerWorkload(work, ccr, dbInfo, locality)); } @@ -1737,7 +1741,13 @@ std::unordered_map> makeTaskAssignment(Database cx, std::vector 
shardsToCheck, int testersCount, int round) { + ASSERT(testersCount >= 1); std::unordered_map> assignment; + + std::vector shuffledIndices(testersCount); + std::iota(shuffledIndices.begin(), shuffledIndices.end(), 0); // creates [0, 1, ..., testersCount - 1] + deterministicRandom()->randomShuffle(shuffledIndices); + int batchSize = CLIENT_KNOBS->CONSISTENCY_CHECK_URGENT_BATCH_SHARD_COUNT; int startingPoint = 0; if (shardsToCheck.size() > batchSize * testersCount) { @@ -1752,7 +1762,17 @@ std::unordered_map> makeTaskAssignment(Database cx, if (testerIdx > testersCount - 1) { break; // Have filled up all testers } - assignment[testerIdx].push_back(shardsToCheck[i]); + // When assigning a shard/batch to a tester idx, there are certain edge cases which can result in urgent + // consistency checker being infinitely stuck in a loop. Examples: + // 1. if there is 1 remaining shard, and tester 0 consistently fails, we will still always pick tester 0 + // 2. if there are 10 remaining shards, and batch size is 10, and tester 0 consistently fails, we will + // still always pick tester 0 + // 3. if there are 20 remaining shards, and batch size is 10, and testers {0, 1} consistently fail, we will + // keep picking testers {0, 1} + // To avoid repeatedly picking the same testers even though they could be failing, shuffledIndices provides an + // indirection to a random tester idx. That way, each invocation of makeTaskAssignment won't + // result in the same task assignment for the class of edge cases mentioned above. 
+ assignment[shuffledIndices[testerIdx]].push_back(shardsToCheck[i]); } std::unordered_map>::iterator assignIt; for (assignIt = assignment.begin(); assignIt != assignment.end(); assignIt++) { diff --git a/flow/include/flow/error_definitions.h b/flow/include/flow/error_definitions.h index 559f7f0ea6a..6b30fdb34a6 100755 --- a/flow/include/flow/error_definitions.h +++ b/flow/include/flow/error_definitions.h @@ -108,6 +108,8 @@ ERROR( duplicate_snapshot_request, 1083, "A duplicate snapshot request has been ERROR( dd_config_changed, 1084, "DataDistribution configuration changed." ) ERROR( consistency_check_urgent_task_failed, 1085, "Consistency check urgent task is failed") ERROR( data_move_conflict, 1086, "Data move conflict in SS") +ERROR( consistency_check_urgent_duplicate_request, 1087, "Consistency check urgent got a duplicate request") +ERROR( consistency_check_urgent_conflicting_request, 1088, "Consistency check urgent can process 1 workload at a time") ERROR( broken_promise, 1100, "Broken promise" ) ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )