Skip to content

Commit

Permalink
Fix compile
Browse files Browse the repository at this point in the history
Remove unnecessary endpoints
  • Loading branch information
Ledostuff committed Sep 25, 2019
1 parent fe7913f commit 21627e9
Show file tree
Hide file tree
Showing 25 changed files with 95 additions and 546 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<!-- TODO after add offset monitoring -->
<suppress
checks="(ClassDataAbstractionCoupling)"
files="(AdminClientWrapper|MetadataObserver|GroupMetadataObserver).java"
files="(AdminClientWrapper|GroupMetadataObserver).java"
/>

<suppress
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,10 @@

package io.confluent.kafkarest;

import io.confluent.kafkarest.entities.EndPointEntity;
import io.confluent.kafkarest.entities.BrokerEntity;
import kafka.cluster.EndPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
Expand All @@ -40,7 +36,6 @@
import kafka.utils.ZkUtils;
import scala.Option;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import scala.collection.Map;
import scala.collection.Seq;
import scala.math.Ordering;
Expand Down Expand Up @@ -83,29 +78,6 @@ public boolean topicExists(String topicName) {
return false;
}

/**
 * <p>Get broker metadata</p>
 *
 * @param brokerId - broker ID
 * @return metadata (broker id plus its endpoints) for the broker with the given ID,
 *         or {@code BrokerEntity.empty()} when no such broker exists (never {@code null})
 */
public BrokerEntity getBroker(final int brokerId) {
try {
Broker broker = getBrokerById(brokerId);
List<EndPointEntity> endpoints = new ArrayList<>();
// Convert the Scala endpoint Seq to a Java list and flatten each endpoint
// into (security protocol name, host, port).
for (EndPoint endPoint :
JavaConverters.seqAsJavaListConverter(broker.endPoints()).asJava()) {
endpoints.add(new EndPointEntity(endPoint.securityProtocol().name,
endPoint.host(),
endPoint.port()));
}
return new BrokerEntity(broker.id(), endpoints);
} catch (RestNotFoundException e) {
// getBrokerById signals a missing broker via RestNotFoundException;
// map that to the empty sentinel instead of propagating the exception.
log.warn("Broker with id = {} not found. Return empty broker information.", brokerId);
return BrokerEntity.empty();
}
}

private Collection<String> getTopicNames() {
Seq<String> topicNames = zkUtils.getAllTopics().sorted(Ordering.String$.MODULE$);
return JavaConversions.asJavaCollection(topicNames);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,53 +33,28 @@
import java.util.Vector;
import java.util.concurrent.TimeUnit;

import io.confluent.kafkarest.entities.NodeState;
import io.confluent.kafkarest.entities.Partition;
import io.confluent.kafkarest.entities.PartitionReplica;
import io.confluent.kafkarest.entities.Topic;

public class AdminClientWrapper {

private AdminClient adminClient;
private int initTimeOut;
private final AdminClient adminClient;
private final int initTimeOut;

/**
 * Wraps an already-constructed AdminClient. The timeout (ms) applied to all
 * blocking admin calls is read from KAFKACLIENT_INIT_TIMEOUT_CONFIG.
 */
public AdminClientWrapper(KafkaRestConfig kafkaRestConfig, AdminClient adminClient) {
this.adminClient = adminClient;
this.initTimeOut = kafkaRestConfig.getInt(KafkaRestConfig.KAFKACLIENT_INIT_TIMEOUT_CONFIG);
}

public static Properties adminProperties(KafkaRestConfig kafkaRestConfig) {
static Properties adminProperties(KafkaRestConfig kafkaRestConfig) {
Properties properties = new Properties();
properties.putAll(kafkaRestConfig.getAdminProperties());
properties.put(KafkaRestConfig.BOOTSTRAP_SERVERS_CONFIG,
RestConfigUtils.bootstrapBrokers(kafkaRestConfig));
return properties;
}

/**
 * <p>Check whether the broker with the given ID is currently a member of the cluster.</p>
 *
 * @param brokerId broker ID to look up; may be {@code null}, in which case the
 *                 broker is reported as absent
 * @return a {@code NodeState(true)} when a cluster node with the given ID exists,
 *         otherwise {@code NodeState(false)} (the javadoc previously claimed a
 *         boolean return, which did not match the signature)
 * @throws RestServerErrorException if the cluster description cannot be fetched
 *         within the configured init timeout
 */
public NodeState getBrokerState(Integer brokerId) {
DescribeClusterResult clusterResults = adminClient.describeCluster();
try {
Collection<Node> nodeCollection =
clusterResults.nodes().get(initTimeOut, TimeUnit.MILLISECONDS);
// Hoisted out of the loop: a null ID can never match any node.
if (brokerId != null) {
for (Node eachNode : nodeCollection) {
if (brokerId.equals(eachNode.id())) {
return new NodeState(true);
}
}
}
return new NodeState(false);
} catch (InterruptedException e) {
// Restore the interrupt flag so callers/executors can observe the interruption.
Thread.currentThread().interrupt();
throw new RestServerErrorException(Errors.KAFKA_ERROR_MESSAGE,
Errors.KAFKA_ERROR_ERROR_CODE, e
);
} catch (ExecutionException | TimeoutException e) {
throw new RestServerErrorException(Errors.KAFKA_ERROR_MESSAGE,
Errors.KAFKA_ERROR_ERROR_CODE, e
);
}
}

public List<Integer> getBrokerIds() throws Exception {
List<Integer> brokerIds = new Vector<>();
DescribeClusterResult clusterResults = adminClient.describeCluster();
Expand All @@ -92,31 +67,27 @@ public List<Integer> getBrokerIds() throws Exception {
}

public Collection<String> getTopicNames() throws Exception {
Collection<String> allTopics = null;
allTopics = new TreeSet<>(
return new TreeSet<>(
adminClient.listTopics().names().get(initTimeOut, TimeUnit.MILLISECONDS));
return allTopics;
}

/**
 * <p>Check whether a topic with the given name exists in the cluster.</p>
 *
 * @param topic topic name to look for
 * @return true when the topic is present
 * @throws Exception if the topic listing fails or times out
 */
public boolean topicExists(String topic) throws Exception {
  return getTopicNames().contains(topic);
}

/**
 * <p>Fetch full metadata for a topic.</p>
 *
 * @param topicName name of the topic to describe
 * @return the topic's metadata, or {@code null} when the topic does not exist
 * @throws Exception if the existence check or topic description fails or times out
 */
public Topic getTopic(String topicName) throws Exception {
  if (topicExists(topicName)) {
    TopicDescription topicDescription = getTopicDescription(topicName);
    return buildTopic(topicName, topicDescription);
  } else {
    // Callers are expected to treat null as "topic not found".
    return null;
  }
}

/**
 * <p>List partition metadata for every partition of a topic.</p>
 *
 * @param topicName name of the topic
 * @return partition data for all partitions of the topic
 * @throws Exception if describing the topic fails or times out
 */
public List<Partition> getTopicPartitions(String topicName) throws Exception {
  TopicDescription topicDescription = getTopicDescription(topicName);
  // null partition filter means "all partitions" in buildPartitionsData.
  return buildPartitionsData(topicDescription.partitions(), null);
}

public Partition getTopicPartition(String topicName, int partition) throws Exception {
Expand All @@ -138,14 +109,13 @@ private Topic buildTopic(String topicName, TopicDescription topicDescription) th

ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
Config config = adminClient.describeConfigs(
Collections.unmodifiableList(Arrays.asList(topicResource))
Collections.unmodifiableList(Collections.singletonList(topicResource))
).values().get(topicResource).get();
Properties topicProps = new Properties();
for (ConfigEntry configEntry : config.entries()) {
topicProps.put(configEntry.name(), configEntry.value());
}
Topic topic = new Topic(topicName, topicProps, partitions);
return topic;
return new Topic(topicName, topicProps, partitions);
}

private List<Partition> buildPartitionsData(
Expand Down Expand Up @@ -178,7 +148,7 @@ private List<Partition> buildPartitionsData(
}

private TopicDescription getTopicDescription(String topicName) throws Exception {
return adminClient.describeTopics(Collections.unmodifiableList(Arrays.asList(topicName)))
return adminClient.describeTopics(Collections.unmodifiableList(Collections.singletonList(topicName)))
.values().get(topicName).get(initTimeOut, TimeUnit.MILLISECONDS);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public class DefaultKafkaRestContext implements KafkaRestContext {
private ProducerPool producerPool;
private KafkaConsumerManager kafkaConsumerManager;
private AdminClientWrapper adminClientWrapper;
private final ClusterInformationObserver clusterInformationObserver;
private final GroupMetadataObserver groupMetadataObserver;


Expand All @@ -39,7 +38,6 @@ public DefaultKafkaRestContext(
ProducerPool producerPool,
KafkaConsumerManager kafkaConsumerManager,
AdminClientWrapper adminClientWrapper,
ClusterInformationObserver clusterInformationObserver,
GroupMetadataObserver groupMetadataObserver,
ScalaConsumersContext scalaConsumersContext
) {
Expand All @@ -48,7 +46,6 @@ public DefaultKafkaRestContext(
this.producerPool = producerPool;
this.kafkaConsumerManager = kafkaConsumerManager;
this.adminClientWrapper = adminClientWrapper;
this.clusterInformationObserver = clusterInformationObserver;
this.groupMetadataObserver = groupMetadataObserver;
this.scalaConsumersContext = scalaConsumersContext;
}
Expand Down Expand Up @@ -99,11 +96,6 @@ public AdminClientWrapper getAdminClientWrapper() {
return adminClientWrapper;
}

@Override
public ClusterInformationObserver getClusterInformationObserver() {
// Simple accessor for the observer assigned in the constructor.
return clusterInformationObserver;
}

@Override
public GroupMetadataObserver getGroupMetadataObserver() {
return groupMetadataObserver;
Expand Down
Loading

0 comments on commit 21627e9

Please sign in to comment.