From 4d499462605ceeb4fc311deea6d3467124cbe578 Mon Sep 17 00:00:00 2001
From: Liquan Pei
Date: Thu, 23 Apr 2015 21:21:48 +0800
Subject: [PATCH] Add complete producer and consumer examples

---
 .gitignore                                 |   3 +
 README.md                                  |  58 +++++
 consumer/pom.xml                           |  65 +++++
 .../consumer/ConsumerGroupExample.java     | 121 ++++++++++
 .../examples/consumer/ConsumerLogic.java   |  53 +++++
 .../consumer/SimpleConsumerExample.java    | 223 ++++++++++++++++++
 producer/pom.xml                           |  65 +++++
 .../examples/producer/ProducerExample.java |  76 ++++++
 8 files changed, 664 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 consumer/pom.xml
 create mode 100644 consumer/src/main/java/io/confluent/examples/consumer/ConsumerGroupExample.java
 create mode 100644 consumer/src/main/java/io/confluent/examples/consumer/ConsumerLogic.java
 create mode 100644 consumer/src/main/java/io/confluent/examples/consumer/SimpleConsumerExample.java
 create mode 100644 producer/pom.xml
 create mode 100644 producer/src/main/java/io/confluent/examples/producer/ProducerExample.java

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..d9e32598
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+target
+.idea/
+*.iml
diff --git a/README.md b/README.md
index e69de29b..91dafda0 100644
--- a/README.md
+++ b/README.md
@@ -0,0 +1,58 @@
+Examples
+========
+
+This repository includes projects demonstrating how to use the Java Kafka producer
+and consumer. You can find a detailed explanation of the code in the
+[application development section](http://confluent.io/docs/current/app-development.html)
+of the Confluent Platform documentation.
+
+To build the producer project:
+
+    $ cd producer
+    $ mvn clean package
+
+To build the consumer project:
+
+    $ cd consumer
+    $ mvn clean package
+
+Quickstart
+----------
+
+Before running the examples, make sure that ZooKeeper, Kafka and Schema Registry are
+running. In what follows, we assume all three were started with their default settings.
+
+    # Start ZooKeeper
+    $ bin/zookeeper-server-start config/zookeeper.properties
+
+    # Start Kafka
+    $ bin/kafka-server-start config/server.properties
+
+    # Start Schema Registry
+    $ bin/schema-registry-start config/schema-registry.properties
+
+Then create a topic called page_visits:
+
+    # Create page_visits topic
+    $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \
+      --partitions 1 --topic page_visits
+
+Next, cd to the `examples` directory:
+
+    $ cd examples
+
+Then cd to the `producer` directory and run the example producer to publish 10 records:
+
+    # Run the producer
+    $ cd producer
+    $ mvn exec:java -Dexec.mainClass="io.confluent.examples.producer.ProducerExample" \
+      -Dexec.args="10 http://localhost:8081"
+
+Finally, cd to the `consumer` directory and run the consumer group example to consume
+the records we just published and display them in the console:
+
+    # Run the consumer
+    $ cd ../consumer
+    $ mvn exec:java -Dexec.mainClass="io.confluent.examples.consumer.ConsumerGroupExample" \
+      -Dexec.args="localhost:2181 group page_visits 1 http://localhost:8081"
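Note: the patch also adds a low-level SimpleConsumerExample (see the diff further below) that the README does not cover. A minimal sketch of running it the same way, assuming its arguments are maxReads topic partition broker port and that Kafka is listening on localhost:9092:

    # Run the simple consumer (sketch; arguments: maxReads topic partition broker port)
    $ cd consumer
    $ mvn exec:java -Dexec.mainClass="io.confluent.examples.consumer.SimpleConsumerExample" \
      -Dexec.args="10 page_visits 0 localhost 9092"
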
diff --git a/consumer/pom.xml b/consumer/pom.xml
new file mode 100644
index 00000000..ac0ef8ad
--- /dev/null
+++ b/consumer/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>io.confluent</groupId>
+    <artifactId>consumer-example</artifactId>
+    <packaging>jar</packaging>
+    <version>2.0-SNAPSHOT</version>
+
+    <repositories>
+        <repository>
+            <id>confluent</id>
+            <url>http://packages.confluent.io/maven/</url>
+        </repository>
+    </repositories>
+
+    <properties>
+        <kafka.version>0.8.2.1</kafka.version>
+        <kafka.scala.version>2.10</kafka.scala.version>
+        <confluent.version>1.0</confluent.version>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-avro-serializer</artifactId>
+            <version>${confluent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_${kafka.scala.version}</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>2.5.1</version>
+                <inherited>true</inherited>
+                <configuration>
+                    <source>1.6</source>
+                    <target>1.6</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>1.2.1</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>java</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/consumer/src/main/java/io/confluent/examples/consumer/ConsumerGroupExample.java b/consumer/src/main/java/io/confluent/examples/consumer/ConsumerGroupExample.java
new file mode 100644
index 00000000..b260c2ec
--- /dev/null
+++ b/consumer/src/main/java/io/confluent/examples/consumer/ConsumerGroupExample.java
@@ -0,0 +1,121 @@
+/**
+ * Copyright 2015 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.confluent.examples.consumer;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import io.confluent.kafka.serializers.KafkaAvroDecoder;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.utils.VerifiableProperties;
+
+public class ConsumerGroupExample {
+  private final ConsumerConnector consumer;
+  private final String topic;
+  private ExecutorService executor;
+  private String zookeeper;
+  private String groupId;
+  private String url;
+
+  public ConsumerGroupExample(String zookeeper, String groupId, String topic, String url) {
+    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
+        new ConsumerConfig(createConsumerConfig(zookeeper, groupId, url)));
+    this.topic = topic;
+    this.zookeeper = zookeeper;
+    this.groupId = groupId;
+    this.url = url;
+  }
+
+  private Properties createConsumerConfig(String zookeeper, String groupId, String url) {
+    Properties props = new Properties();
+    props.put("zookeeper.connect", zookeeper);
+    props.put("group.id", groupId);
+    props.put("auto.commit.enable", "false");
+    props.put("auto.offset.reset", "smallest");
+    props.put("schema.registry.url", url);
+
+    return props;
+  }
+
+  public void run(int numThreads) {
+    // Tell Kafka how many streams (threads) we want for the topic
+    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+    topicCountMap.put(topic, numThreads);
+
+    Properties props = createConsumerConfig(zookeeper, groupId, url);
+    VerifiableProperties vProps = new VerifiableProperties(props);
+
+    // Create decoders for key and value
+    KafkaAvroDecoder avroDecoder = new KafkaAvroDecoder(vProps);
+
+    Map<String, List<KafkaStream<Object, Object>>> consumerMap =
+        consumer.createMessageStreams(topicCountMap, avroDecoder, avroDecoder);
+    List<KafkaStream<Object, Object>> streams = consumerMap.get(topic);
+
+    // Launch all the threads
+    executor = Executors.newFixedThreadPool(numThreads);
+
+    // Create ConsumerLogic objects and bind them to threads
+    int threadNumber = 0;
+    for (final KafkaStream<Object, Object> stream : streams) {
+      executor.submit(new ConsumerLogic(stream, threadNumber));
+      threadNumber++;
+    }
+  }
+
+  public void shutdown() {
+    if (consumer != null) consumer.shutdown();
+    if (executor != null) executor.shutdown();
+    try {
+      if (executor != null
+          && !executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+        System.out.println(
+            "Timed out waiting for consumer threads to shut down, exiting uncleanly");
+      }
+    } catch (InterruptedException e) {
+      System.out.println("Interrupted during shutdown, exiting uncleanly");
+    }
+  }
+
+  public static void main(String[] args) {
+    if (args.length != 5) {
+      System.out.println("Please provide command line arguments: " +
+                         "zookeeper groupId topic threads schemaRegistryUrl");
+      System.exit(-1);
+    }
+
+    String zooKeeper = args[0];
+    String groupId = args[1];
+    String topic = args[2];
+    int threads = Integer.parseInt(args[3]);
+    String url = args[4];
+
+    ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic, url);
+    example.run(threads);
+
+    // Let the consumer threads run for a while, then shut down
+    try {
+      Thread.sleep(10000);
+    } catch (InterruptedException ie) {
+      // ignore and shut down
+    }
+    example.shutdown();
+  }
+}
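Note that createConsumerConfig sets auto.commit.enable to false and the example never commits offsets, so every run with this group starts again from the smallest available offset. If you want the group's progress saved, a minimal sketch (assuming the same high-level ConsumerConnector instance) is to commit in shutdown() before stopping the connector:

    // Sketch: commit consumed offsets to ZooKeeper before shutting down.
    // commitOffsets() is part of the high-level consumer's ConsumerConnector API.
    public void shutdown() {
      if (consumer != null) {
        consumer.commitOffsets();
        consumer.shutdown();
      }
      if (executor != null) executor.shutdown();
    }
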
diff --git a/consumer/src/main/java/io/confluent/examples/consumer/ConsumerLogic.java b/consumer/src/main/java/io/confluent/examples/consumer/ConsumerLogic.java
new file mode 100644
index 00000000..88a05d39
--- /dev/null
+++ b/consumer/src/main/java/io/confluent/examples/consumer/ConsumerLogic.java
@@ -0,0 +1,53 @@
+/**
+ * Copyright 2015 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.confluent.examples.consumer;
+
+import org.apache.avro.generic.GenericRecord;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.message.MessageAndMetadata;
+
+public class ConsumerLogic implements Runnable {
+  private KafkaStream<Object, Object> stream;
+  private int threadNumber;
+
+  public ConsumerLogic(KafkaStream<Object, Object> stream, int threadNumber) {
+    this.threadNumber = threadNumber;
+    this.stream = stream;
+  }
+
+  public void run() {
+    ConsumerIterator<Object, Object> it = stream.iterator();
+
+    while (it.hasNext()) {
+      MessageAndMetadata<Object, Object> record = it.next();
+
+      String topic = record.topic();
+      int partition = record.partition();
+      long offset = record.offset();
+      Object key = record.key();
+      GenericRecord message = (GenericRecord) record.message();
+      System.out.println("Thread " + threadNumber +
+                         " received: " + "Topic " + topic +
+                         " Partition " + partition +
+                         " Offset " + offset +
+                         " Key " + key +
+                         " Message " + message.toString());
+    }
+    System.out.println("Shutting down Thread: " + threadNumber);
+  }
+}
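ConsumerLogic prints each decoded record with toString(). If you want individual fields instead, here is a minimal sketch of what could replace the println inside run(), assuming the records follow the page_visit schema defined by ProducerExample below:

    // Sketch: pull individual fields out of the decoded page_visit record.
    // Avro strings come back as CharSequence (Utf8), so convert with toString().
    long time = (Long) message.get("time");
    String site = message.get("site").toString();
    String ip = message.get("ip").toString();
    System.out.println("Thread " + threadNumber +
                       " saw a visit from " + ip + " to " + site + " at " + time);
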
diff --git a/consumer/src/main/java/io/confluent/examples/consumer/SimpleConsumerExample.java b/consumer/src/main/java/io/confluent/examples/consumer/SimpleConsumerExample.java
new file mode 100644
index 00000000..7964289b
--- /dev/null
+++ b/consumer/src/main/java/io/confluent/examples/consumer/SimpleConsumerExample.java
@@ -0,0 +1,223 @@
+/**
+ * Copyright 2015 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.confluent.examples.consumer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.ErrorMapping;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+
+public class SimpleConsumerExample {
+
+  private List<String> replicaBrokers = new ArrayList<String>();
+
+  public SimpleConsumerExample() {
+    replicaBrokers = new ArrayList<String>();
+  }
+
+  // Find the lead broker for a topic partition
+  private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic,
+                                       int partition) {
+    for (String seed : seedBrokers) {
+      SimpleConsumer consumer = null;
+      try {
+        consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");
+        List<String> topics = Collections.singletonList(topic);
+        TopicMetadataRequest req = new TopicMetadataRequest(topics);
+        kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
+
+        List<TopicMetadata> metaData = resp.topicsMetadata();
+        for (TopicMetadata item : metaData) {
+          for (PartitionMetadata part : item.partitionsMetadata()) {
+            if (part.partitionId() == partition) {
+              replicaBrokers.clear();
+              for (kafka.cluster.Broker replica : part.replicas()) {
+                replicaBrokers.add(replica.host());
+              }
+              return part;
+            }
+          }
+        }
+      } catch (Exception e) {
+        System.out.println("Error communicating with Broker [" + seed + "] to find Leader for ["
+                           + topic + ", " + partition + "] Reason: " + e);
+      } finally {
+        if (consumer != null) consumer.close();
+      }
+    }
+    return null;
+  }
+
+  // Determine the starting offset
+  public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
+                                   long whichTime, String clientName) {
+    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
+        new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+        requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+    OffsetResponse response = consumer.getOffsetsBefore(request);
+
+    if (response.hasError()) {
+      System.out.println("Error fetching offset data from the Broker. Reason: "
+                         + response.errorCode(topic, partition));
+      return 0;
+    }
+    long[] offsets = response.offsets(topic, partition);
+    return offsets[0];
+  }
+
+  private String findNewLeader(String oldLeader, String topic, int partition, int port)
+      throws Exception {
+    for (int i = 0; i < 3; i++) {
+      PartitionMetadata metadata = findLeader(replicaBrokers, port, topic, partition);
+      // First pass: if the leader hasn't changed yet, give ZooKeeper a second to recover.
+      // Later passes: assume the broker recovered before failover, or it was a non-broker issue.
+      if (!(metadata == null || metadata.leader() == null ||
+            (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0))) {
+        return metadata.leader().host();
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        // ignore and retry
+      }
+    }
+    throw new Exception("Unable to find new leader after Broker failure. Exiting");
+  }
+
+  public void run(long maxReads, String topic, int partition, List<String> seedBrokers,
+                  int port) throws Exception {
+    // Find the metadata for the topic partition we are interested in
+    PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
+    if (metadata == null) {
+      System.out.println("Can't find metadata for Topic and Partition. Exiting");
+      return;
+    }
+    if (metadata.leader() == null) {
+      System.out.println("Can't find Leader for Topic and Partition. Exiting");
+      return;
+    }
+    String leadBroker = metadata.leader().host();
+    String clientName = "Client_" + topic + "_" + partition;
+
+    SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
+    long readOffset = getLastOffset(consumer, topic, partition,
+                                    kafka.api.OffsetRequest.EarliestTime(), clientName);
+
+    int numErrors = 0;
+    while (maxReads > 0) {
+      if (consumer == null) {
+        consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
+      }
+      // Note: this fetchSize of 100000 might need to be increased if large batches are
+      // written to Kafka
+      int fetchSize = 100000;
+      FetchRequest req = new FetchRequestBuilder()
+          .clientId(clientName)
+          .addFetch(topic, partition, readOffset, fetchSize)
+          .build();
+      FetchResponse fetchResponse = consumer.fetch(req);
+
+      // Identify and recover from leader changes
+      if (fetchResponse.hasError()) {
+        numErrors++;
+        // Something went wrong!
+        short code = fetchResponse.errorCode(topic, partition);
+        System.out.println("Error fetching data from the Broker:" + leadBroker +
+                           " Reason: " + code);
+        if (numErrors > 5) break;
+        if (code == ErrorMapping.OffsetOutOfRangeCode()) {
+          // We asked for an invalid offset. For the simple case, ask for the last element to reset
+          readOffset = getLastOffset(consumer, topic, partition,
+                                     kafka.api.OffsetRequest.LatestTime(), clientName);
+          continue;
+        }
+        consumer.close();
+        consumer = null;
+        leadBroker = findNewLeader(leadBroker, topic, partition, port);
+        continue;
+      }
+      numErrors = 0;
+
+      // Read the data
+      long numRead = 0;
+      for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
+        long currentOffset = messageAndOffset.offset();
+        if (currentOffset < readOffset) {
+          System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
+          continue;
+        }
+        readOffset = messageAndOffset.nextOffset();
+        ByteBuffer payload = messageAndOffset.message().payload();
+
+        byte[] bytes = new byte[payload.limit()];
+        payload.get(bytes);
+        System.out.println(String.valueOf(messageAndOffset.offset()) + ": "
+                           + new String(bytes, "UTF-8"));
+        numRead++;
+        maxReads--;
+      }
+
+      if (numRead == 0) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+          // ignore and poll again
+        }
+      }
+    }
+    if (consumer != null) consumer.close();
+  }
+
+  public static void main(String args[]) {
+    if (args.length != 5) {
+      System.out.println("Please provide command line arguments: " +
+                         "maxReads topic partitionId broker port");
+      System.exit(-1);
+    }
+
+    long maxReads = Long.parseLong(args[0]);
+    String topic = args[1];
+    int partition = Integer.parseInt(args[2]);
+    List<String> seeds = new ArrayList<String>();
+    seeds.add(args[3]);
+    int port = Integer.parseInt(args[4]);
+
+    SimpleConsumerExample example = new SimpleConsumerExample();
+
+    try {
+      example.run(maxReads, topic, partition, seeds, port);
+    } catch (Exception e) {
+      System.out.println("Oops:" + e);
+      e.printStackTrace();
+    }
+  }
+}
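SimpleConsumerExample prints the raw message bytes as a UTF-8 string, which will look garbled for records written by ProducerExample, since those are Avro-encoded by the Confluent serializer. A minimal sketch of decoding them inside the fetch loop instead, assuming the Schema Registry from the quickstart is reachable at http://localhost:8081 (in real code the decoder would be created once, outside the loop):

    // Additional imports for the sketch:
    // import java.util.Properties;
    // import org.apache.avro.generic.GenericRecord;
    // import io.confluent.kafka.serializers.KafkaAvroDecoder;
    // import kafka.utils.VerifiableProperties;

    // Sketch: decode Confluent-Avro-encoded bytes instead of printing them as UTF-8.
    Properties decoderProps = new Properties();
    decoderProps.put("schema.registry.url", "http://localhost:8081");
    KafkaAvroDecoder decoder = new KafkaAvroDecoder(new VerifiableProperties(decoderProps));
    GenericRecord value = (GenericRecord) decoder.fromBytes(bytes);
    System.out.println(messageAndOffset.offset() + ": " + value);
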
diff --git a/producer/pom.xml b/producer/pom.xml
new file mode 100644
index 00000000..404c03f3
--- /dev/null
+++ b/producer/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>io.confluent</groupId>
+    <artifactId>producer-example</artifactId>
+    <packaging>jar</packaging>
+    <version>2.0-SNAPSHOT</version>
+
+    <repositories>
+        <repository>
+            <id>confluent</id>
+            <url>http://packages.confluent.io/maven/</url>
+        </repository>
+    </repositories>
+
+    <properties>
+        <kafka.version>0.8.2.1</kafka.version>
+        <kafka.scala.version>2.10</kafka.scala.version>
+        <confluent.version>1.0</confluent.version>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-avro-serializer</artifactId>
+            <version>${confluent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>2.5.1</version>
+                <inherited>true</inherited>
+                <configuration>
+                    <source>1.6</source>
+                    <target>1.6</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>1.2.1</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>java</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/producer/src/main/java/io/confluent/examples/producer/ProducerExample.java b/producer/src/main/java/io/confluent/examples/producer/ProducerExample.java
new file mode 100644
index 00000000..38f89097
--- /dev/null
+++ b/producer/src/main/java/io/confluent/examples/producer/ProducerExample.java
@@ -0,0 +1,76 @@
+/**
+ * Copyright 2015 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.confluent.examples.producer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Date;
+import java.util.Properties;
+import java.util.Random;
+
+public class ProducerExample {
+  public static void main(String[] args) {
+    if (args.length != 2) {
+      System.out.println("Please provide command line arguments: numEvents schemaRegistryUrl");
+      System.exit(-1);
+    }
+    long events = Long.parseLong(args[0]);
+    String url = args[1];
+
+    Properties props = new Properties();
+    props.put("bootstrap.servers", "localhost:9092");
+    props.put("acks", "all");
+    props.put("retries", 0);
+    props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
+    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
+    props.put("schema.registry.url", url);
+
+    String schemaString = "{\"namespace\": \"example.avro\", \"type\": \"record\", " +
+                          "\"name\": \"page_visit\"," +
+                          "\"fields\": [" +
+                          "{\"name\": \"time\", \"type\": \"long\"}," +
+                          "{\"name\": \"site\", \"type\": \"string\"}," +
+                          "{\"name\": \"ip\", \"type\": \"string\"}" +
+                          "]}";
+    Producer<Object, Object> producer = new KafkaProducer<Object, Object>(props);
+
+    Schema.Parser parser = new Schema.Parser();
+    Schema schema = parser.parse(schemaString);
+
+    Random rnd = new Random();
+    for (long nEvents = 0; nEvents < events; nEvents++) {
+      long runtime = new Date().getTime();
+      String site = "www.example.com";
+      String ip = "192.168.2." + rnd.nextInt(255);
+
+      GenericRecord page_visit = new GenericData.Record(schema);
+      page_visit.put("time", runtime);
+      page_visit.put("site", site);
+      page_visit.put("ip", ip);
+
+      ProducerRecord<Object, Object> data = new ProducerRecord<Object, Object>(
+          "page_visits", ip, page_visit);
+      producer.send(data);
+    }
+
+    producer.close();
+  }
+}
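ProducerExample sends records fire-and-forget, so serialization or broker errors only surface indirectly when close() flushes the outstanding sends. If you want per-record acknowledgement, a minimal sketch using the Callback interface of the new producer API, replacing the plain producer.send(data) inside the loop:

    // Sketch: report the outcome of each send instead of ignoring it.
    producer.send(data, new org.apache.kafka.clients.producer.Callback() {
      public void onCompletion(org.apache.kafka.clients.producer.RecordMetadata metadata,
                               Exception e) {
        if (e != null) {
          e.printStackTrace();
        } else {
          System.out.println("Sent record to partition " + metadata.partition() +
                             " at offset " + metadata.offset());
        }
      }
    });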