Skip to content

Commit

Permalink
Update sarama library to support last kafka versions (#151)
Browse files Browse the repository at this point in the history
- Bump version of sarama client to support kafka up to v3.0
- Add dummy consumer and producer for integration test
- Add consumer offset integration test
  • Loading branch information
alvarocabanas authored Jan 12, 2022
1 parent cca5a80 commit 9c58ed0
Show file tree
Hide file tree
Showing 18 changed files with 1,352 additions and 66 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## 2.18.0 (2022-01-12)
### Changed
- Upgraded Sarama library to 1.30.1 in order to support Kafka latest versions (up to 3.0)
- Added consumer-offset integration test

## 2.17.0 (2021-06-27)
### Changed
- Moved default config.sample to [V4](https://docs.newrelic.com/docs/create-integrations/infrastructure-integrations-sdk/specifications/host-integrations-newer-configuration-format/), added a dependency for infra-agent version 1.20.0
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/newrelic/nri-kafka
go 1.16

require (
github.com/Shopify/sarama v1.27.0
github.com/Shopify/sarama v1.30.1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/eapache/go-resiliency v1.2.1-0.20190716143035-b98ce2825f72 // indirect
github.com/eapache/queue v1.1.1-0.20180227141424-093482f3f8ce // indirect
Expand Down
71 changes: 38 additions & 33 deletions go.sum

Large diffs are not rendered by default.

22 changes: 3 additions & 19 deletions src/connection/connection.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//go:generate mockery --name=Client --name=SaramaBroker
//go:generate mockery --name=Client
//go:generate mockery --name=SaramaBroker

// Package connection implements connection code
package connection
Expand All @@ -23,24 +24,7 @@ import (
// Client is a wrapper around sarama.Client so that we can generate mocks
// See sarama.Client for documentation
type Client interface {
Config() *sarama.Config
Controller() (*sarama.Broker, error)
Brokers() []*sarama.Broker
Topics() ([]string, error)
Partitions(topic string) ([]int32, error)
WritablePartitions(topic string) ([]int32, error)
Leader(topic string, partitionID int32) (*sarama.Broker, error)
Replicas(topic string, partitionID int32) ([]int32, error)
InSyncReplicas(topic string, partitionID int32) ([]int32, error)
OfflineReplicas(topic string, partitionID int32) ([]int32, error)
RefreshMetadata(topics ...string) error
GetOffset(topic string, partitionID int32, time int64) (int64, error)
Coordinator(consumerGroup string) (*sarama.Broker, error)
RefreshCoordinator(consumerGroup string) error
RefreshController() (*sarama.Broker, error)
InitProducerID() (*sarama.InitProducerIDResponse, error)
Close() error
Closed() bool
sarama.Client
}

// SaramaBroker is an interface over sarama.Broker for mocking
Expand Down
57 changes: 48 additions & 9 deletions src/connection/mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/connection/mocks/SaramaBroker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions tests/integration/consumer-producer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Build-and-run image for the dummy Kafka consumer/producer used by integration tests.
FROM maven
# Copy the Java sources and Maven descriptor into the build context root.
COPY src src
COPY pom.xml pom.xml

# Build the fat jar (jar-with-dependencies, see pom.xml assembly plugin).
RUN mvn clean compile package

WORKDIR /
# Run the dummy app with a local JMX endpoint on port 1088 (no auth/SSL) so the
# integration tests can scrape JMX metrics. First container arg selects consumer/producer.
ENTRYPOINT ["java","-Dcom.sun.management.jmxremote","-Djava.rmi.server.hostname=localhost","-Dcom.sun.management.jmxremote.port=1088","-Dcom.sun.management.jmxremote.rmi.port=1088","-Dcom.sun.management.jmxremote.local.only=false","-Dcom.sun.management.jmxremote.authenticate=false","-Dcom.sun.management.jmxremote.ssl=false","-jar","./target/kafka-dummy-1.0-jar-with-dependencies.jar"]
67 changes: 67 additions & 0 deletions tests/integration/consumer-producer/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the dummy Kafka consumer/producer used by nri-kafka integration tests. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>1</groupId>
<artifactId>kafka-dummy</artifactId>
<version>1.0</version>
<!-- NOTE(review): only kafka-clients is referenced by the Java sources (SimpleConsumer/
     SimpleProducer); the kafka-streams and flink dependencies appear unused - confirm. -->
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.4.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Assemble a single runnable fat jar (kafka-dummy-1.0-jar-with-dependencies.jar)
     whose entry point is kafkaDummy.Factory; the Dockerfile ENTRYPOINT runs it. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<archive>
<manifest>
<mainClass>
kafkaDummy.Factory
</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</execution>
</executions>
</plugin>
<!-- Compile for Java 8. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
13 changes: 13 additions & 0 deletions tests/integration/consumer-producer/src/main/java/Factory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package kafkaDummy;

/**
 * Entry point of the dummy Kafka app: dispatches to {@link SimpleConsumer} or
 * {@link SimpleProducer} based on the first command-line argument.
 */
public class Factory {
    public static void main(String[] args) {
        // Guard: the original read args[0] unconditionally, which throws
        // ArrayIndexOutOfBoundsException when the jar is run with no arguments.
        if (args.length == 0) {
            System.out.println("First argument should be consumer/producer");
            return;
        }
        if (args[0].equals("producer")) {
            SimpleProducer.main(args);
        } else if (args[0].equals("consumer")) {
            SimpleConsumer.main(args);
        } else {
            // Include a separator before the offending argument (the original
            // concatenated it directly onto the message, producing e.g.
            // "...consumer/producerfoo").
            System.out.println("First argument should be consumer/producer, got: " + args[0]);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package kafkaDummy;

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * Dummy Kafka consumer: subscribes to one topic and prints every record it
 * receives, looping forever. Used only by the integration-test docker-compose.
 */
public class SimpleConsumer {
    public static void main(String[] args) {
        // args[0] is the "consumer" dispatch word consumed by Factory, so four
        // arguments are required in total.
        if (args.length < 4) {
            System.out.println("Usage: consumer <bootstrapBroker> <topic> <groupname>");
            return;
        }

        final String bootstrapBroker = args[1];
        final String topic = args[2];
        final String group = args[3];

        // Auto-commit offsets every second so the consumer-offset integration
        // test can observe committed offsets for this group.
        Properties config = new Properties();
        config.put("bootstrap.servers", bootstrapBroker);
        config.put("group.id", group);
        config.put("enable.auto.commit", "true");
        config.put("auto.commit.interval.ms", "1000");
        config.put("session.timeout.ms", "30000");
        config.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
        consumer.subscribe(Arrays.asList(topic));
        System.out.println("Subscribed to topic " + topic);

        // Poll and print forever; the container is simply killed when the test ends.
        for (;;) {
            ConsumerRecords<String, String> batch = consumer.poll(10);
            for (ConsumerRecord<String, String> rec : batch) {
                System.out.printf("offset = %d, key = %s, value = %s\n", rec.offset(), rec.key(), rec.value());
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package kafkaDummy;

import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Dummy Kafka producer: sends an incrementing counter (as both key and value)
 * to one topic every two seconds, forever. Used only by the integration tests.
 */
public class SimpleProducer {
    public static void main(String[] args) {
        // args[0] is the "producer" dispatch word consumed by Factory, so three
        // arguments are required in total.
        if (args.length < 3) {
            System.out.println("Usage: producer <bootstrapBroker> <topic>");
            return;
        }

        final String bootstrapBroker = args[1];
        final String topicName = args[2];

        Properties config = new Properties();
        config.put("bootstrap.servers", bootstrapBroker);
        config.put("acks", "all");
        config.put("retries", 0);
        config.put("batch.size", 16384);
        config.put("linger.ms", 1);
        config.put("buffer.memory", 33554432);
        config.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(config);

        // Send one message every 2s; the loop never exits, so producer.close()
        // is never reached (the container is killed when the test ends).
        for (int msg = 0; ; msg++) {
            String payload = Integer.toString(msg);
            producer.send(new ProducerRecord<String, String>(topicName, payload, payload));
            System.out.println("Message sent successfully");
            wait(2000);
        }
    }

    /**
     * Sleeps for the given number of milliseconds, restoring the thread's
     * interrupt flag if the sleep is interrupted.
     */
    public static void wait(int ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}
31 changes: 31 additions & 0 deletions tests/integration/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,37 @@ services:
networks:
- kfk

# Gate service: blocks until kafka1:9092 accepts connections so the dummy
# consumer/producer don't start before the broker is reachable.
# NOTE(review): indentation was lost in this diff view - the services below
# belong under the top-level `services:` key; confirm against the real file.
start_3party_dependencies:
image: dadarek/wait-for-dependencies
environment:
- SLEEP_LENGTH=5
- TIMEOUT_LENGTH=120
networks:
- kfk
depends_on:
- kafka1
command: kafka1:9092

# Dummy consumer (built from tests/integration/consumer-producer) reading
# topicA as group groupA, so consumer-offset metrics exist to assert on.
kafka_dummy_consumer:
container_name: kafka_dummy_consumer
build:
context: consumer-producer
command: ["consumer","kafka1:9092","topicA","groupA"]
networks:
- kfk
depends_on:
- start_3party_dependencies

# Dummy producer feeding topicA so the consumer above always has traffic.
kafka_dummy_producer:
container_name: kafka_dummy_producer
build:
context: consumer-producer
command: ["producer","kafka1:9092","topicA"]
networks:
- kfk
depends_on:
- start_3party_dependencies

nri-kafka:
container_name: integration_nri_kafka_1
build:
Expand Down
Loading

0 comments on commit 9c58ed0

Please sign in to comment.