Skip to content

Commit

Permalink
Update usages of Metadata to conform to kafka 3.7 interface
Browse files Browse the repository at this point in the history
  • Loading branch information
david-simon authored and akatona84 committed Nov 28, 2024
1 parent 472b1b0 commit 5a9a0b1
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ public synchronized ClusterAndGeneration refreshMetadata(long timeoutMs) {
}

private void doRefreshMetadata(long timeoutMs) {
int updateVersion = _metadata.requestUpdate();
int updateVersion = _metadata.requestUpdate(true);
long remaining = timeoutMs;
Cluster beforeUpdate = cluster();
boolean isMetadataUpdated = _metadata.updateVersion() > updateVersion;
while (!isMetadataUpdated && remaining > 0) {
_metadata.requestUpdate();
_metadata.requestUpdate(false);
long start = _time.milliseconds();
_networkClient.poll(remaining, start);
remaining -= (_time.milliseconds() - start);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
Expand All @@ -53,6 +54,7 @@ public final class ExecutionUtils {
private static final Logger LOG = LoggerFactory.getLogger(ExecutionUtils.class);
public static final int DEFAULT_RETRY_BACKOFF_BASE = 2;
public static final long METADATA_REFRESH_BACKOFF = 100L;
public static final long METADATA_REFRESH_BACKOFF_MAX = CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MAX_MS;
public static final long METADATA_EXPIRY_MS = Long.MAX_VALUE;
public static final Duration MIN_ISR_CACHE_CLEANER_PERIOD = Duration.ofMinutes(10);
public static final Duration MIN_ISR_CACHE_CLEANER_INITIAL_DELAY = Duration.ofMinutes(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public Executor(KafkaCruiseControlConfig config,
_metadataClient = metadataClient != null ? metadataClient
: new MetadataClient(config,
new Metadata(ExecutionUtils.METADATA_REFRESH_BACKOFF,
ExecutionUtils.METADATA_REFRESH_BACKOFF_MAX,
ExecutionUtils.METADATA_EXPIRY_MS,
new LogContext(),
new ClusterResourceListeners()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class LoadMonitor {
// Metadata TTL is set based on experience -- i.e. a short TTL with large metadata may cause excessive load on brokers.
private static final long METADATA_TTL = TimeUnit.SECONDS.toMillis(10);
private static final long METADATA_REFRESH_BACKOFF = TimeUnit.SECONDS.toMillis(5);
private static final long METADATA_REFRESH_BACKOFF_MAX = TimeUnit.SECONDS.toMillis(60);
public static final String KAFKA_ADMIN_CLIENT_OBJECT_CONFIG = "kafka.admin.client.object";
// The maximum time allowed to make a state update. If the state value cannot be updated in time it will be invalidated.
// TODO: Make this configurable.
Expand Down Expand Up @@ -125,6 +126,7 @@ public LoadMonitor(KafkaCruiseControlConfig config, Time time, MetricRegistry dr
this(config,
new MetadataClient(config,
new Metadata(METADATA_REFRESH_BACKOFF,
METADATA_REFRESH_BACKOFF_MAX,
config.getLong(MonitorConfig.METADATA_MAX_AGE_MS_CONFIG),
new LogContext(),
new ClusterResourceListeners()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
Expand All @@ -30,6 +31,7 @@

public final class MonitorUnitTestUtils {
public static final long METADATA_REFRESH_BACKOFF = 10L;
public static final long METADATA_REFRESH_BACKOFF_MAX = CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MAX_MS;
public static final long METADATA_EXPIRY_MS = 10L;
public static final Node NODE_0 = new Node(0, "localhost", 100, "rack0");
public static final Node NODE_1 = new Node(1, "localhost", 100, "rack1");
Expand Down Expand Up @@ -69,6 +71,7 @@ public static Metadata getMetadata(Collection<TopicPartition> partitions) {
}

Metadata metadata = new Metadata(METADATA_REFRESH_BACKOFF,
METADATA_REFRESH_BACKOFF_MAX,
METADATA_EXPIRY_MS,
new LogContext(),
new ClusterResourceListeners());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;

import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUnitTestUtils.METADATA_REFRESH_BACKOFF;
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUnitTestUtils.METADATA_REFRESH_BACKOFF_MAX;
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUnitTestUtils.METADATA_EXPIRY_MS;
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUnitTestUtils.NODE_0;
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUnitTestUtils.nodes;
Expand Down Expand Up @@ -62,6 +63,7 @@ public void testAssignment() {
}
Cluster cluster = new Cluster("cluster", Arrays.asList(nodes()), partitions, Collections.emptySet(), Collections.emptySet());
Metadata metadata = new Metadata(METADATA_REFRESH_BACKOFF,
METADATA_REFRESH_BACKOFF_MAX,
METADATA_EXPIRY_MS,
new LogContext(),
new ClusterResourceListeners());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUnitTestUtils.METADATA_EXPIRY_MS;
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUnitTestUtils.METADATA_REFRESH_BACKOFF;
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUnitTestUtils.METADATA_REFRESH_BACKOFF_MAX;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -106,6 +107,7 @@ public int clusterSize() {
public void testSimpleFetch() throws InterruptedException {
KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getLoadMonitorProperties());
Metadata metadata = new Metadata(METADATA_REFRESH_BACKOFF,
METADATA_REFRESH_BACKOFF_MAX,
METADATA_EXPIRY_MS,
new LogContext(),
new ClusterResourceListeners());
Expand Down Expand Up @@ -156,6 +158,7 @@ public void testSimpleFetch() throws InterruptedException {
public void testSamplingError() {
KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getLoadMonitorProperties());
Metadata metadata = new Metadata(METADATA_REFRESH_BACKOFF,
METADATA_REFRESH_BACKOFF_MAX,
METADATA_EXPIRY_MS,
new LogContext(),
new ClusterResourceListeners());
Expand Down

0 comments on commit 5a9a0b1

Please sign in to comment.