-
Notifications
You must be signed in to change notification settings - Fork 0
/
ProducerExample.java
58 lines (45 loc) · 2.5 KB
/
ProducerExample.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package io.confluent.examples.clients.basicavro;
import io.confluent.kafka.schemaregistry.avro.AvroCompatibilityLevel;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.errors.SerializationException;
import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
public class ProducerExample {
private static final String TOPIC = "transactions";
public static final int PRODUCE_MESSAGE_COUNT = 3;
@SuppressWarnings("InfiniteLoopStatement")
public static void main(final String[] args) {
final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
// See https://docs.confluent.io/current/schema-registry/avro.html#using-compatibility-types
props.put("avro.compatibility.level", AvroCompatibilityLevel.FULL_TRANSITIVE);
try (KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props)) {
for (long i = 0; i < PRODUCE_MESSAGE_COUNT; i++) {
final String orderId = "id" + UUID.randomUUID();
final Payment payment = new Payment(orderId, new Random().nextInt(10000) / 100.00);
final ProducerRecord<String, Payment> record = new ProducerRecord<String, Payment>(TOPIC, payment.getId().toString(), payment);
producer.send(record);
Thread.sleep(100L);
}
producer.flush();
System.out.printf("Successfully produced %d messages to a topic called %s%n", PRODUCE_MESSAGE_COUNT, TOPIC);
} catch (final SerializationException e) {
e.printStackTrace();
} catch (final InterruptedException e) {
e.printStackTrace();
}
}
}