Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
allenxwang committed Nov 23, 2024
1 parent 6f399b6 commit 90829f3
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ public List<ExecutionTask> getInterBrokerReplicaMovementTasks(Map<Integer, Integ
continue;
}
// Check the available balancing proposals of this broker to see if we can find one ready to execute.
SortedSet<ExecutionTask> proposalsForBroker = _interPartMoveTasksByBrokerId.get(brokerId);
SortedSet<ExecutionTask> proposalsForBroker = new TreeSet<>(_interPartMoveTasksByBrokerId.get(brokerId));
LOG.trace("Execution task for broker {} are {}", brokerId, proposalsForBroker);
for (ExecutionTask task : proposalsForBroker) {
// Break if max cap reached
Expand Down Expand Up @@ -434,8 +434,10 @@ public List<ExecutionTask> getInterBrokerReplicaMovementTasks(Map<Integer, Integ
newTaskAdded = true;
numInProgressPartitions++;
LOG.debug("Found ready task {} for broker {}. Broker concurrency state: {}", task, brokerId, readyBrokers);
// We can stop the check for proposals for this broker because we have found a proposal.
break;
if (_preferRoundRobin) {
// We can stop the check for proposals for this broker because we have found a proposal.
break;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static com.linkedin.kafka.cruisecontrol.executor.ExecutorAdminUtils.*;
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils.UNIT_INTERVAL_TO_PERCENTAGE;
import static com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler.SamplingMode.*;
import static com.linkedin.kafka.cruisecontrol.servlet.UserTaskManager.TaskState.COMPLETED;
import static org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull;

Expand Down Expand Up @@ -1362,8 +1363,8 @@ private class ProposalExecutionRunnable implements Runnable {
public void run() {
LOG.info("User task {}: Starting executing balancing proposals.", _uuid);
final long start = System.currentTimeMillis();
UserTaskManager.UserTaskInfo userTaskInfo = initExecution();
try {
UserTaskManager.UserTaskInfo userTaskInfo = initExecution();
execute(userTaskInfo);
} catch (Exception e) {
LOG.error("User task {}: ProposalExecutionRunnable got exception during run", _uuid, e);
Expand All @@ -1381,7 +1382,8 @@ public void run() {
if (_executionException != null) {
LOG.info("User task {}: Execution failed: {}. Exception: {}", _uuid, executionStatusString, _executionException.getMessage());
} else {
LOG.info("User task {}: Execution succeeded: {}. ", _uuid, executionStatusString);
String status = userTaskInfo.state() == COMPLETED ? "succeeded" : userTaskInfo.state().toString();
LOG.info("User task {}: Execution {}: {}. ", _uuid, status, executionStatusString);
}
// Clear completed execution.
clearCompletedExecution();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,12 @@ public RemoveBrokersRunnable(KafkaCruiseControl kafkaCruiseControl,

@Override
protected OptimizationResult getResult() throws Exception {
return new OptimizationResult(computeResult(), _kafkaCruiseControl.config());
try {
return new OptimizationResult(computeResult(), _kafkaCruiseControl.config());
} catch (Exception e) {
LOG.error("User task {}: failed to remove brokers due to {}", _uuid, e);
throw e;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,51 @@ public void testGetInterBrokerPartitionMovementWithMinIsrTasks() {
assertEquals("Fourth task", _rf4PartitionMovement0, partitionMovementTasks.get(3).proposal());
}

@Test
public void testGetInterBrokerPartitionMovementWithMinIsrNoRoundRobinTasks() {
List<ExecutionProposal> proposals = new ArrayList<>();
proposals.add(_rf4PartitionMovement0);
proposals.add(_rf4PartitionMovement1);
proposals.add(_rf4PartitionMovement2);
proposals.add(_rf4PartitionMovement3);
// Test PrioritizeOneAboveMinIsrWithOfflineReplicasStrategy execution strategies.
// Create prioritizeOneAboveMinIsrMovementPlanner, chain after prioritizeMinIsr strategy
Properties prioritizeOneAboveMinIsrMovementProps = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
prioritizeOneAboveMinIsrMovementProps.setProperty(ExecutorConfig.DEFAULT_REPLICA_MOVEMENT_STRATEGIES_CONFIG,
String.format("%s,%s", PrioritizeMinIsrWithOfflineReplicasStrategy.class.getName(),
PrioritizeOneAboveMinIsrWithOfflineReplicasStrategy.class.getName()));
prioritizeOneAboveMinIsrMovementProps.setProperty(ExecutorConfig.PREFER_BROKER_ROUND_ROBIN_CONFIG, "false");
ExecutionTaskPlanner prioritizeOneAboveMinIsrMovementPlanner
= new ExecutionTaskPlanner(null, new KafkaCruiseControlConfig(prioritizeOneAboveMinIsrMovementProps));

Set<PartitionInfo> partitions = new HashSet<>();
partitions.add(generatePartitionInfo(_rf4PartitionMovement0, false));
partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement1, 2));
partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement2, 3));
partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement3, 1));

Cluster expectedCluster = new Cluster(null, _rf4ExpectedNodes, partitions, Collections.emptySet(), Collections.emptySet());
// Setting topic min ISR to 2
Map<String, MinIsrWithTime> minIsrWithTimeByTopic
= Collections.singletonMap(TOPIC3, new MinIsrWithTime((short) 2, 0));
StrategyOptions strategyOptions = new StrategyOptions.Builder(expectedCluster).minIsrWithTimeByTopic(minIsrWithTimeByTopic).build();

Map<Integer, Integer> readyBrokers = new HashMap<>();
readyBrokers.put(0, 5);
readyBrokers.put(1, 6);
readyBrokers.put(2, 6);
readyBrokers.put(3, 6);
readyBrokers.put(4, 5);
readyBrokers.put(5, 6);
prioritizeOneAboveMinIsrMovementPlanner.addExecutionProposals(proposals, strategyOptions, null);
List<ExecutionTask> partitionMovementTasks
= prioritizeOneAboveMinIsrMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap);
assertEquals("First task", _rf4PartitionMovement2, partitionMovementTasks.get(0).proposal());
assertEquals("Second task", _rf4PartitionMovement1, partitionMovementTasks.get(1).proposal());
assertEquals("Third task", _rf4PartitionMovement3, partitionMovementTasks.get(2).proposal());
assertEquals("Fourth task", _rf4PartitionMovement0, partitionMovementTasks.get(3).proposal());
}

@Test
public void testDynamicConfigReplicaMovementStrategy() {
List<ExecutionProposal> proposals = new ArrayList<>();
Expand Down

0 comments on commit 90829f3

Please sign in to comment.