Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add feature to request consumer group information #435

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
2b6c745
Add feature to request consumer group information(start, end offsets,…
Ledostuff Jun 4, 2018
1295fc0
Remove unnecessary suppress
Ledostuff Jun 4, 2018
cf1ab6e
Remove unnecessary properties
Ledostuff Sep 14, 2018
aad391e
Fix typo
Ledostuff Sep 14, 2018
9d70d97
Move method with zkUtils from AdminClientWrapper
Ledostuff Sep 14, 2018
9bacad6
Minor changes in getting controller information
Ledostuff Sep 14, 2018
f300ed0
Fix conflict
Ledostuff Nov 19, 2018
09ce8ac
Fix conflict
Ledostuff Sep 23, 2019
dd65221
Fix compile
Ledostuff Sep 25, 2019
ad72c3b
Fix test-compile
Ledostuff Sep 25, 2019
5b42dd1
Close resources after using
Ledostuff Sep 25, 2019
9c26497
Reset some changes
Ledostuff Sep 25, 2019
9bd278a
Fix test
Ledostuff Sep 25, 2019
1cdb7d6
Fix test
Ledostuff Sep 25, 2019
427a4cb
Fix PR notes
Ledostuff Oct 3, 2019
ae6c7ef
Drop useless suppress
Ledostuff Oct 3, 2019
62fa884
Drop useless suppress
Ledostuff Oct 3, 2019
d74c405
Fix PR notes
Ledostuff Oct 9, 2019
71b074a
Fix build
Ledostuff Oct 9, 2019
5a6875b
Fix tests(after rename request parameter)
Ledostuff Oct 10, 2019
fa51d14
Fix style-check
Ledostuff Oct 10, 2019
e70888c
Fix test after restricting topic-partition subscription information b…
Ledostuff Oct 10, 2019
da41a47
Fix test after restricting topic-partition subscription information b…
Ledostuff Oct 10, 2019
eb24cac
Fix deprecated methods
Ledostuff Oct 10, 2019
67717ee
Revert import moves
Ledostuff Oct 25, 2019
acb1308
Fix notes(init)
Ledostuff Oct 25, 2019
badecdc
Fix notes part 1
Ledostuff Oct 28, 2019
5f3344a
Remove final class modifier
Ledostuff Oct 29, 2019
8fb0b57
Rename paging parameters
Ledostuff Nov 22, 2019
c1fdc82
Fix paging test
Ledostuff Nov 22, 2019
980df74
Fix addition notes
Ledostuff Dec 2, 2019
8aeab9d
Fix checkstyle
Ledostuff Dec 9, 2019
1b0fd6f
Fix test
Ledostuff Dec 9, 2019
016d0fe
Fix pr note
Ledostuff Jan 9, 2020
027a272
Fix style
Ledostuff Jan 9, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

<suppress
checks="(ClassDataAbstractionCoupling)"
files="(AvroConverter|ConsumerManager|KafkaConsumerManager|KafkaRestApplication|SimpleConsumerManager|KafkaRestContextProvider|KafkaConsumerState).java"
files="(AdminClientWrapper|AvroConverter|ConsumerManager|KafkaConsumerManager|KafkaRestApplication|SimpleConsumerManager|KafkaRestContextProvider|KafkaConsumerState).java"
/>

<suppress
Expand Down
1 change: 0 additions & 1 deletion config/kafka-rest.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

#id=kafka-rest-test-server
#schema.registry.url=http://localhost:8081
#zookeeper.connect=localhost:2181
bootstrap.servers=PLAINTEXT://localhost:9092
#
# Configure interceptor classes for sending consumer and producer metrics to Confluent Control Center
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ public class KafkaRestConfig extends RestConfig {
* <code>client.init.timeout.ms</code>
*/
public static final String KAFKACLIENT_INIT_TIMEOUT_CONFIG = "client.init.timeout.ms";
/**
* <code>client.request.timeout.ms</code>
*/
public static final String KAFKACLIENT_REQUEST_TIMEOUT_CONFIG = "client.request.timeout.ms";

public static final String ZOOKEEPER_SET_ACL_CONFIG = "zookeeper.set.acl";
public static final String KAFKACLIENT_SECURITY_PROTOCOL_CONFIG =
Expand Down Expand Up @@ -247,6 +251,9 @@ public class KafkaRestConfig extends RestConfig {
protected static final String KAFKACLIENT_INIT_TIMEOUT_DOC =
"The timeout for initialization of the Kafka store, including creation of the Kafka topic "
+ "that stores schema data.";
protected static final String KAFKACLIENT_REQUEST_TIMEOUT_DOC =
"The timeout for sending any admin-client request to Kafka cluster including waiting for"
+ " the response on client side.";
protected static final String KAFKACLIENT_TIMEOUT_DOC =
"The timeout for an operation on the Kafka store";
protected static final String
Expand Down Expand Up @@ -450,6 +457,14 @@ protected static ConfigDef baseKafkaRestConfigDef() {
Importance.MEDIUM,
KAFKACLIENT_INIT_TIMEOUT_DOC
)
.define(
KAFKACLIENT_REQUEST_TIMEOUT_CONFIG,
Type.INT,
60000,
Range.atLeast(0),
Importance.MEDIUM,
KAFKACLIENT_REQUEST_TIMEOUT_DOC
)
.define(
KAFKACLIENT_TIMEOUT_CONFIG,
Type.INT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.kafkarest.entities;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

import org.hibernate.validator.constraints.NotEmpty;
Expand All @@ -23,17 +24,14 @@
import java.util.Objects;
import java.util.Properties;

import javax.validation.constraints.NotNull;

@JsonInclude(JsonInclude.Include.NON_NULL)
public class Topic {

@NotEmpty
private String name;

@NotNull
private Properties configs;

@NotEmpty
private List<Partition> partitions;

public Topic(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@
package io.confluent.kafkarest;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;
import java.util.Vector;
Expand All @@ -39,15 +43,18 @@

public class AdminClientWrapper {

private AdminClient adminClient;
private int initTimeOut;
private final AdminClient adminClient;
private final int initTimeOut;
private final int requestTimeOut;

// Wraps an externally created AdminClient; this class does not build its own client here.
// initTimeOut comes from client.init.timeout.ms and requestTimeOut from
// client.request.timeout.ms (see KAFKACLIENT_*_TIMEOUT_CONFIG in KafkaRestConfig).
public AdminClientWrapper(KafkaRestConfig kafkaRestConfig, AdminClient adminClient) {
this.adminClient = adminClient;
this.initTimeOut = kafkaRestConfig.getInt(KafkaRestConfig.KAFKACLIENT_INIT_TIMEOUT_CONFIG);
this.requestTimeOut = kafkaRestConfig.getInt(
KafkaRestConfig.KAFKACLIENT_REQUEST_TIMEOUT_CONFIG);
}

public static Properties adminProperties(KafkaRestConfig kafkaRestConfig) {
static Properties adminProperties(KafkaRestConfig kafkaRestConfig) {
Properties properties = new Properties();
properties.putAll(kafkaRestConfig.getAdminProperties());
properties.put(KafkaRestConfig.BOOTSTRAP_SERVERS_CONFIG,
Expand All @@ -67,36 +74,32 @@ public List<Integer> getBrokerIds() throws Exception {
}

public Collection<String> getTopicNames() throws Exception {
Collection<String> allTopics = null;
allTopics = new TreeSet<>(
return new TreeSet<>(
adminClient.listTopics().names().get(initTimeOut, TimeUnit.MILLISECONDS));
return allTopics;
}

public boolean topicExists(String topic) throws Exception {
Collection<String> allTopics = getTopicNames();
return allTopics.contains(topic);
return getTopicNames().contains(topic);
}

public Topic getTopic(String topicName) throws Exception {
Topic topic = null;
if (topicExists(topicName)) {
TopicDescription topicDescription = getTopicDescription(topicName);

topic = buildTopic(topicName, topicDescription);
return buildTopic(topicName, topicDescription);
} else {
return null;
}
return topic;
}

public List<Partition> getTopicPartitions(String topicName) throws Exception {
TopicDescription topicDescription = getTopicDescription(topicName);
List<Partition> partitions = buildPartitonsData(topicDescription.partitions(), null);
return partitions;
return buildPartitionsData(topicDescription.partitions(), null);
}

public Partition getTopicPartition(String topicName, int partition) throws Exception {
TopicDescription topicDescription = getTopicDescription(topicName);
List<Partition> partitions = buildPartitonsData(topicDescription.partitions(), partition);
List<Partition> partitions = buildPartitionsData(topicDescription.partitions(), partition);
if (partitions.isEmpty()) {
return null;
}
Expand All @@ -108,22 +111,33 @@ public boolean partitionExists(String topicName, int partition) throws Exception
return (partition >= 0 && partition < topic.getPartitions().size());
}

/**
 * Lists all consumer groups known to the cluster via the AdminClient.
 *
 * <p>requestTimeOut is applied twice: once as the broker-side request option
 * (ListConsumerGroupsOptions.timeoutMs) and once as the client-side bound on the
 * returned future's get().
 * NOTE(review): with identical values the client-side get() can expire at the same
 * instant as the broker request — confirm this double use is intended.
 *
 * @return listings for every consumer group in the cluster
 * @throws Exception on timeout, interruption, or any AdminClient failure
 */
public Collection<ConsumerGroupListing> listConsumerGroups() throws Exception {
return adminClient.listConsumerGroups(new ListConsumerGroupsOptions()
.timeoutMs(requestTimeOut)).all().get(requestTimeOut, TimeUnit.MILLISECONDS);
}

/**
 * Describes the given consumer groups via the AdminClient.
 *
 * <p>As in listConsumerGroups, requestTimeOut bounds both the broker request
 * (DescribeConsumerGroupsOptions.timeoutMs) and the client-side future get().
 *
 * @param groupIds consumer group ids to describe; behavior for an empty collection is
 *                 delegated to AdminClient.describeConsumerGroups — not verified here
 * @return map from group id to its description
 * @throws Exception on timeout, interruption, or any AdminClient failure
 */
public Map<String, ConsumerGroupDescription> describeConsumerGroups(
Collection<String> groupIds) throws Exception {
return adminClient.describeConsumerGroups(groupIds,
new DescribeConsumerGroupsOptions().timeoutMs(requestTimeOut))
.all().get(requestTimeOut, TimeUnit.MILLISECONDS);
}

private Topic buildTopic(String topicName, TopicDescription topicDescription) throws Exception {
List<Partition> partitions = buildPartitonsData(topicDescription.partitions(), null);
List<Partition> partitions = buildPartitionsData(topicDescription.partitions(), null);

ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
Config config = adminClient.describeConfigs(
Collections.unmodifiableList(Arrays.asList(topicResource))
Collections.unmodifiableList(Collections.singletonList(topicResource))
).values().get(topicResource).get();
Properties topicProps = new Properties();
for (ConfigEntry configEntry : config.entries()) {
topicProps.put(configEntry.name(), configEntry.value());
}
Topic topic = new Topic(topicName, topicProps, partitions);
return topic;
return new Topic(topicName, topicProps, partitions);
}

private List<Partition> buildPartitonsData(
private List<Partition> buildPartitionsData(
List<TopicPartitionInfo> partitions,
Integer partitionsFilter
) {
Expand Down Expand Up @@ -153,7 +167,7 @@ private List<Partition> buildPartitonsData(
}

private TopicDescription getTopicDescription(String topicName) throws Exception {
return adminClient.describeTopics(Collections.unmodifiableList(Arrays.asList(topicName)))
return adminClient.describeTopics(Collections.singletonList(topicName))
.values().get(topicName).get(initTimeOut, TimeUnit.MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,23 @@ public class DefaultKafkaRestContext implements KafkaRestContext {
private ProducerPool producerPool;
private KafkaConsumerManager kafkaConsumerManager;
private AdminClientWrapper adminClientWrapper;
private GroupMetadataObserver groupMetadataObserver;


// All-args constructor. Collaborators other than config may be passed as null;
// the corresponding getters lazily construct them on first use (see the
// synchronized getXxx() methods below). presumably scalaConsumersContext has no
// lazy fallback — TODO confirm against its getter.
public DefaultKafkaRestContext(
KafkaRestConfig config,
ProducerPool producerPool,
KafkaConsumerManager kafkaConsumerManager,
AdminClientWrapper adminClientWrapper,
GroupMetadataObserver groupMetadataObserver,
ScalaConsumersContext scalaConsumersContext
) {

this.config = config;
this.producerPool = producerPool;
this.kafkaConsumerManager = kafkaConsumerManager;
this.adminClientWrapper = adminClientWrapper;
this.groupMetadataObserver = groupMetadataObserver;
this.scalaConsumersContext = scalaConsumersContext;
}

Expand All @@ -54,7 +57,7 @@ public KafkaRestConfig getConfig() {
}

@Override
public ProducerPool getProducerPool() {
public synchronized ProducerPool getProducerPool() {
if (producerPool == null) {
producerPool = new ProducerPool(config);
}
Expand All @@ -77,22 +80,30 @@ public SimpleConsumerManager getSimpleConsumerManager() {
}

@Override
public KafkaConsumerManager getKafkaConsumerManager() {
// Lazily creates the KafkaConsumerManager on first call; synchronized so concurrent
// callers cannot construct two instances.
public synchronized KafkaConsumerManager getKafkaConsumerManager() {
if (kafkaConsumerManager == null) {
kafkaConsumerManager = new KafkaConsumerManager(config);
}
return kafkaConsumerManager;
}

@Override
public AdminClientWrapper getAdminClientWrapper() {
// Lazily builds the AdminClientWrapper (and the underlying AdminClient from
// AdminClientWrapper.adminProperties(config)) on first call; synchronized to
// guarantee a single instance under concurrent access.
public synchronized AdminClientWrapper getAdminClientWrapper() {
if (adminClientWrapper == null) {
adminClientWrapper = new AdminClientWrapper(config,
AdminClient.create(AdminClientWrapper.adminProperties(config)));
}
return adminClientWrapper;
}

@Override
// Lazily creates the GroupMetadataObserver on first call, reusing (and if needed
// first creating) the shared AdminClientWrapper; synchronized for single-instance
// lazy init under concurrent access.
public synchronized GroupMetadataObserver getGroupMetadataObserver() {
if (groupMetadataObserver == null) {
groupMetadataObserver = new GroupMetadataObserver(config, getAdminClientWrapper());
}
return groupMetadataObserver;
}

@Override
public void shutdown() {
if (kafkaConsumerManager != null) {
Expand Down
Loading