From eb899fa55571d48ae8e2138dece33b051f1b7745 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Mon, 25 Nov 2024 13:41:47 -0800 Subject: [PATCH] Add execution task statistics and fix junit test --- .../executor/ExecutionProposal.java | 24 ++++++++++++++--- .../executor/ExecutionTaskManager.java | 4 +++ .../executor/ExecutionTaskPlanner.java | 24 +++++++++++++++++ .../cruisecontrol/executor/Executor.java | 27 ++++++++++++++++++- .../cruisecontrol/executor/ExecutorTest.java | 1 + 5 files changed, 75 insertions(+), 5 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionProposal.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionProposal.java index eeb13ba59d..966a3f414f 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionProposal.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionProposal.java @@ -44,6 +44,8 @@ public class ExecutionProposal { private final Set _replicasToRemove; // Replicas to move between disks are the replicas which are to be hosted by a different disk of the same broker. private final Map _replicasToMoveBetweenDisksByBroker; + private final Set _oldReplicasSet; + private final Set _newReplicasSet; /** * Construct an execution proposals. @@ -69,10 +71,10 @@ public ExecutionProposal(TopicPartition tp, validate(); // Populate replicas to add, to remove and to move across disk. 
- Set newBrokerList = _newReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet()); - Set oldBrokerList = _oldReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet()); - _replicasToAdd = _newReplicas.stream().filter(r -> !oldBrokerList.contains(r.brokerId())).collect(Collectors.toSet()); - _replicasToRemove = _oldReplicas.stream().filter(r -> !newBrokerList.contains(r.brokerId())).collect(Collectors.toSet()); + _newReplicasSet = _newReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet()); + _oldReplicasSet = _oldReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet()); + _replicasToAdd = _newReplicas.stream().filter(r -> !_oldReplicasSet.contains(r.brokerId())).collect(Collectors.toSet()); + _replicasToRemove = _oldReplicas.stream().filter(r -> !_newReplicasSet.contains(r.brokerId())).collect(Collectors.toSet()); _replicasToMoveBetweenDisksByBroker = new HashMap<>(); newReplicas.stream().filter(r -> !_replicasToAdd.contains(r) && !_oldReplicas.contains(r)) .forEach(r -> _replicasToMoveBetweenDisksByBroker.put(r.brokerId(), r)); @@ -177,6 +179,20 @@ public List oldReplicas() { return Collections.unmodifiableList(_oldReplicas); } + /** + * @return An unmodifiable set of the IDs of the brokers hosting the partition's replicas before executing the proposal. + */ + public Set oldReplicasBrokerIdSet() { + return Collections.unmodifiableSet(_oldReplicasSet); + } + + /** + * @return An unmodifiable set of the IDs of the brokers hosting the partition's replicas after executing the proposal. + */ + public Set newReplicasBrokerIdSet() { + return Collections.unmodifiableSet(_newReplicasSet); + } + /** * @return The new replica list fo the partition after executing the proposal. 
*/ diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java index dda14e9a41..66c8dbc120 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java @@ -156,6 +156,10 @@ public synchronized void addExecutionProposals(Collection pro } } + Map getSotedBrokerIdToInterBrokerMoveTaskCountMap() { + return _executionTaskPlanner.getSortedBrokerIdToInterBrokerMoveTaskCountMap(); + } + /** * Set the execution mode of the tasks to keep track of the ongoing execution mode via sensors. * diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java index 8b6f7137a0..d7c1202550 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java @@ -21,6 +21,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.LinkedHashMap; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -544,4 +545,27 @@ private Comparator brokerComparator(StrategyOptions strategyOptions, Re : broker1 - broker2; }; } + + Map getSortedBrokerIdToInterBrokerMoveTaskCountMap() { + if (_interPartMoveTasksByBrokerId == null || _interPartMoveTasksByBrokerId.isEmpty()) { + return Collections.emptyMap(); + } + Map resultMap = _interPartMoveTasksByBrokerId.entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().size() + )) + .entrySet() + .stream() + .sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue())) + 
.collect(Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (e1, e2) -> e1, + // maintain the order of the sorted map. + LinkedHashMap::new + )); + return resultMap; + } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index 56cbc85bd6..bdf8c82ffa 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -831,6 +831,22 @@ public synchronized void executeProposals(Collection proposal requestedIntraBrokerPartitionMovementConcurrency, requestedClusterLeadershipMovementConcurrency, requestedBrokerLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy, isTriggeredByUserRequest, loadMonitor); + if (removedBrokers != null && !removedBrokers.isEmpty()) { + int count = 0; + int totalCount = 0; + for (ExecutionProposal proposal: proposals) { + Set oldBrokers = proposal.oldReplicasBrokerIdSet(); + Set newBrokers = proposal.newReplicasBrokerIdSet(); + if (!oldBrokers.equals(newBrokers)) { + // Only count the proposals that involve partition movement. + totalCount++; + if (oldBrokers.stream().anyMatch(removedBrokers::contains) || newBrokers.stream().anyMatch(removedBrokers::contains)) { + count++; + } + } + } + LOG.info("User task {}: {} of {} partition move proposals are related to removed brokers.", uuid, count, totalCount); + } startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest); } catch (Exception e) { if (e instanceof OngoingExecutionException) { @@ -1382,7 +1398,10 @@ public void run() { if (_executionException != null) { LOG.info("User task {}: Execution failed: {}. 
Exception: {}", _uuid, executionStatusString, _executionException.getMessage()); } else { - String status = userTaskInfo.state() == COMPLETED ? "succeeded" : userTaskInfo.state().toString(); + String status = "succeeded"; + if (userTaskInfo != null && userTaskInfo.state() != COMPLETED) { + status = userTaskInfo.state().toString(); + } LOG.info("User task {}: Execution {}: {}. ", _uuid, status, executionStatusString); } // Clear completed execution. @@ -1613,6 +1632,12 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB(); long startTime = System.currentTimeMillis(); LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements); + Map map = _executionTaskManager.getSotedBrokerIdToInterBrokerMoveTaskCountMap(); + LOG.info("User task {}: Broker Id to Execution Task Count Map: {}", _uuid, map); + if (!map.isEmpty()) { + LOG.info("User task {}: Degree of task count skew towards the largest single broker: {}", _uuid, + map.entrySet().iterator().next().getValue() / (float) numTotalPartitionMovements); + } int partitionsToMove = numTotalPartitionMovements; // Exhaust all the pending partition movements. diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java index 5f5f3e3a40..85385d2523 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java @@ -746,6 +746,7 @@ private static UserTaskManager.UserTaskInfo getMockUserTaskInfo() { UserTaskManager.UserTaskInfo mockUserTaskInfo = EasyMock.mock(UserTaskManager.UserTaskInfo.class); // Run it any times to enable consecutive executions in tests. 
EasyMock.expect(mockUserTaskInfo.requestUrl()).andReturn("mock-request").anyTimes(); + EasyMock.expect(mockUserTaskInfo.state()).andReturn(UserTaskManager.TaskState.COMPLETED).anyTimes(); return mockUserTaskInfo; }