From 90829f3f633edf4ade8690b89fa757d391b2c208 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Fri, 22 Nov 2024 18:04:16 -0800 Subject: [PATCH] WIP --- .../executor/ExecutionTaskPlanner.java | 8 ++-- .../cruisecontrol/executor/Executor.java | 6 ++- .../async/runnable/RemoveBrokersRunnable.java | 7 ++- .../executor/ExecutionTaskPlannerTest.java | 45 +++++++++++++++++++ 4 files changed, 60 insertions(+), 6 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java index 32db67f0d..8b6f7137a 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java @@ -388,7 +388,7 @@ public List getInterBrokerReplicaMovementTasks(Map proposalsForBroker = _interPartMoveTasksByBrokerId.get(brokerId); + SortedSet proposalsForBroker = new TreeSet<>(_interPartMoveTasksByBrokerId.get(brokerId)); LOG.trace("Execution task for broker {} are {}", brokerId, proposalsForBroker); for (ExecutionTask task : proposalsForBroker) { // Break if max cap reached @@ -434,8 +434,10 @@ public List getInterBrokerReplicaMovementTasks(Map proposals = new ArrayList<>(); + proposals.add(_rf4PartitionMovement0); + proposals.add(_rf4PartitionMovement1); + proposals.add(_rf4PartitionMovement2); + proposals.add(_rf4PartitionMovement3); + // Test PrioritizeOneAboveMinIsrWithOfflineReplicasStrategy execution strategies. + // Create prioritizeOneAboveMinIsrMovementPlanner, chain after prioritizeMinIsr strategy + Properties prioritizeOneAboveMinIsrMovementProps = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties(); + prioritizeOneAboveMinIsrMovementProps.setProperty(ExecutorConfig.DEFAULT_REPLICA_MOVEMENT_STRATEGIES_CONFIG, + String.format("%s,%s", PrioritizeMinIsrWithOfflineReplicasStrategy.class.getName(), + PrioritizeOneAboveMinIsrWithOfflineReplicasStrategy.class.getName())); + prioritizeOneAboveMinIsrMovementProps.setProperty(ExecutorConfig.PREFER_BROKER_ROUND_ROBIN_CONFIG, "false"); + ExecutionTaskPlanner prioritizeOneAboveMinIsrMovementPlanner + = new ExecutionTaskPlanner(null, new KafkaCruiseControlConfig(prioritizeOneAboveMinIsrMovementProps)); + + Set partitions = new HashSet<>(); + partitions.add(generatePartitionInfo(_rf4PartitionMovement0, false)); + partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement1, 2)); + partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement2, 3)); + partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement3, 1)); + + Cluster expectedCluster = new Cluster(null, _rf4ExpectedNodes, partitions, Collections.emptySet(), Collections.emptySet()); + // Setting topic min ISR to 2 + Map minIsrWithTimeByTopic + = Collections.singletonMap(TOPIC3, new MinIsrWithTime((short) 2, 0)); + StrategyOptions strategyOptions = new StrategyOptions.Builder(expectedCluster).minIsrWithTimeByTopic(minIsrWithTimeByTopic).build(); + + Map readyBrokers = new HashMap<>(); + readyBrokers.put(0, 5); + readyBrokers.put(1, 6); + readyBrokers.put(2, 6); + readyBrokers.put(3, 6); + readyBrokers.put(4, 5); + readyBrokers.put(5, 6); + prioritizeOneAboveMinIsrMovementPlanner.addExecutionProposals(proposals, strategyOptions, null); + List partitionMovementTasks + = prioritizeOneAboveMinIsrMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap); + assertEquals("First task", _rf4PartitionMovement2, partitionMovementTasks.get(0).proposal()); + assertEquals("Second task", _rf4PartitionMovement1, partitionMovementTasks.get(1).proposal()); + assertEquals("Third task", _rf4PartitionMovement3, partitionMovementTasks.get(2).proposal()); + assertEquals("Fourth task", _rf4PartitionMovement0, partitionMovementTasks.get(3).proposal()); + } + @Test public void testDynamicConfigReplicaMovementStrategy() { List proposals = new ArrayList<>();