Skip to content

Commit

Permalink
Merge multi-protocol PR
Browse files Browse the repository at this point in the history
  • Loading branch information
camdencheek committed Nov 14, 2019
2 parents 234c4d0 + 44b7c7f commit 6bfd29a
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 150 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## 2.5.0 - 2019-11-14
### Added
- Support for attempting multiple connection protocols to broker

## 2.4.1 - 2019-11-12
### Added
- Rollup metrics for consumer offsets
Expand Down
77 changes: 43 additions & 34 deletions src/brokercollect/broker_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,66 +86,75 @@ func brokerWorker(brokerChan <-chan int, collectedTopics []string, wg *sync.Wait
}

// Create Broker
b, err := createBroker(brokerID, zkConn, i)
brokers, err := createBrokerConnectionVariants(brokerID, zkConn, i)
if err != nil {
continue
}

// Populate inventory for broker
if args.GlobalArgs.All() || args.GlobalArgs.Inventory {
log.Debug("Collecting inventory for broker %q", b.Entity.Metadata.Name)
if err := populateBrokerInventory(b); err != nil {
continue
for _, broker := range brokers {
// Populate inventory for broker
if args.GlobalArgs.All() || args.GlobalArgs.Inventory {
log.Debug("Collecting inventory for broker %s", broker.Entity.Metadata.Name)
if err := populateBrokerInventory(broker); err != nil {
continue
}
log.Debug("Done Collecting inventory for broker %s", broker.Entity.Metadata.Name)
}
log.Debug("Done collecting inventory for broker %q", b.Entity.Metadata.Name)
}

// Populate metrics for broker
if args.GlobalArgs.All() || args.GlobalArgs.Metrics {
log.Debug("Collecting metrics for broker %q", b.Entity.Metadata.Name)
if err := collectBrokerMetrics(b, collectedTopics); err != nil {
continue
// Populate metrics for broker
if args.GlobalArgs.All() || args.GlobalArgs.Metrics {
log.Debug("Collecting metrics for broker %s", broker.Entity.Metadata.Name)
if err := collectBrokerMetrics(broker, collectedTopics); err != nil {
continue
}
log.Debug("Done Collecting metrics for broker %s", broker.Entity.Metadata.Name)
}
log.Debug("Done Collecting metrics for broker %q", b.Entity.Metadata.Name)
break
}
}
}

// Creates and populates a broker struct with all the information needed to
// populate inventory and metrics.
func createBroker(brokerID int, zkConn zookeeper.Connection, i *integration.Integration) (*broker, error) {
// Creates and populates an array of different ways to connect to one broker.
func createBrokerConnectionVariants(brokerID int, zkConn zookeeper.Connection, i *integration.Integration) ([]*broker, error) {

// Collect broker connection information from ZooKeeper
_, host, jmxPort, kafkaPort, err := zookeeper.GetBrokerConnectionInfo(brokerID, zkConn)
brokerConnections, err := zookeeper.GetBrokerConnections(brokerID, zkConn)
if err != nil {
log.Error("Unable to get broker JMX information for broker id %d: %s", brokerID, err)
return nil, err
}

// Create broker entity
clusterIDAttr := integration.NewIDAttribute("clusterName", args.GlobalArgs.ClusterName)
brokerEntity, err := i.Entity(fmt.Sprintf("%s:%d", host, kafkaPort), "ka-broker", clusterIDAttr)
if err != nil {
log.Error("Unable to create entity for broker ID %d: %s", brokerID, err)
return nil, err
}

// Gather broker configuration from ZooKeeper
brokerConfig, err := getBrokerConfig(brokerID, zkConn)
if err != nil {
log.Error("Unable to get broker configuration information for broker id %d: %s", brokerID, err)
}

newBroker := &broker{
Host: host,
JMXPort: jmxPort,
KafkaPort: kafkaPort,
Entity: brokerEntity,
ID: brokerID,
Config: brokerConfig,
var brokers []*broker
for _, brokerConnection := range brokerConnections {
// Create broker entity
clusterIDAttr := integration.NewIDAttribute("clusterName", args.GlobalArgs.ClusterName)
brokerEntity, err := i.Entity(
fmt.Sprintf("%s:%d", brokerConnection.BrokerHost, brokerConnection.BrokerPort),
"ka-broker",
clusterIDAttr)

if err != nil {
log.Error("Unable to create entity for broker ID %d: %s", brokerID, err)
return nil, err
}

brokers = append(brokers, &broker{
Host: brokerConnection.BrokerHost,
JMXPort: brokerConnection.JmxPort,
KafkaPort: brokerConnection.BrokerPort,
Entity: brokerEntity,
ID: brokerID,
Config: brokerConfig,
})
}

return newBroker, nil
return brokers, nil
}

// For a given broker struct, populate the inventory of its entity with the information gathered
Expand Down
95 changes: 61 additions & 34 deletions src/brokercollect/broker_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

var (
brokerConnectionBytes = []byte(`{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafkabroker:9092"],"jmx_port":9999,"host":"kafkabroker","timestamp":"1530886155628","port":9092,"version":4}`)
brokerConnectionBytes = []byte(`{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT", "SSL":"SSL"},"endpoints":["PLAINTEXT://kafkabroker:9092", "SSL://kafkabroker:9093"],"jmx_port":9999,"host":"kafkabroker","timestamp":"1530886155628","port":9092,"version":4}`)
brokerConfigBytes = []byte(`{"version":1,"config":{"flush.messages":"12345"}}`)
brokerConfigBytes2 = []byte(`{"version":1,"config":{"leader.replication.throttled.replicas":"10000"}}`)
)
Expand Down Expand Up @@ -77,7 +77,7 @@ func TestCreateBroker_ZKError(t *testing.T) {
zkConn.On("Get", "/brokers/ids/0").Return([]byte{}, new(zk.Stat), errors.New("this is a test error"))
i, _ := integration.New("kafka", "1.0.0")

_, err := createBroker(brokerID, zkConn, i)
_, err := createBrokerConnectionVariants(brokerID, zkConn, i)
if err == nil {
t.Error("Expected error")
}
Expand All @@ -89,32 +89,43 @@ func TestCreateBroker_Normal(t *testing.T) {
zkConn.On("Get", "/config/brokers/0").Return(brokerConfigBytes, new(zk.Stat), nil)
i, _ := integration.New("kafka", "1.0.0")

b, err := createBroker(brokerID, zkConn, i)
brokers, err := createBrokerConnectionVariants(brokerID, zkConn, i)
if err != nil {
t.Errorf("Unexpected error: %s", err.Error())
}

expectedBroker := &broker{
Host: "kafkabroker",
KafkaPort: 9092,
JMXPort: 9999,
ID: 0,
expectedBrokers := []broker{
{
Host: "kafkabroker",
KafkaPort: 9092,
JMXPort: 9999,
ID: 0,
},
{
Host: "kafkabroker",
KafkaPort: 9093,
JMXPort: 9999,
ID: 0,
},
}

if expectedBroker.Host != b.Host {
t.Errorf("Expected JMX host '%s' got '%s'", expectedBroker.Host, b.Host)
}
if expectedBroker.JMXPort != b.JMXPort {
t.Errorf("Expected JMX Port '%d' got '%d'", expectedBroker.JMXPort, b.JMXPort)
}
if expectedBroker.KafkaPort != b.KafkaPort {
t.Errorf("Expected Kafka Port '%d' got '%d'", expectedBroker.KafkaPort, b.KafkaPort)
}
if b.Entity.Metadata.Name != "kafkabroker:9092" {
t.Errorf("Expected entity name '%s' got '%s'", expectedBroker.Host, b.Entity.Metadata.Name)
}
if b.Entity.Metadata.Namespace != "ka-broker" {
t.Errorf("Expected entity name '%s' got '%s'", "ka-broker", b.Entity.Metadata.Namespace)
for i, broker := range brokers {
if expectedBrokers[i].Host != broker.Host {
t.Errorf("Expected JMX host '%s' got '%s'", expectedBrokers[i].Host, broker.Host)
}
if expectedBrokers[i].JMXPort != broker.JMXPort {
t.Errorf("Expected JMX Port '%d' got '%d'", expectedBrokers[i].JMXPort, broker.JMXPort)
}
if expectedBrokers[i].KafkaPort != broker.KafkaPort {
t.Errorf("Expected Kafka Port '%d' got '%d'", expectedBrokers[i].KafkaPort, broker.KafkaPort)
}
metadataName := fmt.Sprintf("%s:%d", expectedBrokers[i].Host, expectedBrokers[i].KafkaPort)
if broker.Entity.Metadata.Name != metadataName {
t.Errorf("Expected entity name '%s' got '%s'", metadataName, broker.Entity.Metadata.Name)
}
if broker.Entity.Metadata.Namespace != "ka-broker" {
t.Errorf("Expected entity name '%s' got '%s'", "ka-broker", broker.Entity.Metadata.Namespace)
}
}
}

Expand Down Expand Up @@ -225,23 +236,39 @@ func TestGetBrokerJMX(t *testing.T) {
brokerID := 0
zkConn := zookeeper.MockConnection{}
zkConn.On("Get", "/brokers/ids/0").Return(brokerConnectionBytes, new(zk.Stat), nil)
expectedBrokers := []zookeeper.BrokerConnection{
{
Scheme: "http",
BrokerHost: "kafkabroker",
BrokerPort: 9092,
JmxPort: 9999,
},
{
Scheme: "https",
BrokerHost: "kafkabroker",
BrokerPort: 9093,
JmxPort: 9999,
},
}

scheme, host, jmxPort, kafkaPort, err := zookeeper.GetBrokerConnectionInfo(brokerID, &zkConn)
brokerConnections, err := zookeeper.GetBrokerConnections(brokerID, &zkConn)
if err != nil {
t.Error(err)
}

if scheme != "http" {
t.Errorf("Expected scheme http, got %s", scheme)
}
if host != "kafkabroker" {
t.Errorf("Expected host kafkabroker, got %s", host)
}
if kafkaPort != 9092 {
t.Errorf("Expected kafka port 9092, got %d", kafkaPort)
}
if jmxPort != 9999 {
t.Errorf("Expected jmx port 9999, got %d", jmxPort)
for i, brokerConnection := range brokerConnections {
if brokerConnection.Scheme != expectedBrokers[i].Scheme {
t.Errorf("Expected scheme %s, got %s", expectedBrokers[i].Scheme, brokerConnection.Scheme)
}
if brokerConnection.BrokerHost != expectedBrokers[i].BrokerHost {
t.Errorf("Expected host %s, got %s", expectedBrokers[i].BrokerHost, brokerConnection.BrokerHost)
}
if brokerConnection.BrokerPort != expectedBrokers[i].BrokerPort {
t.Errorf("Expected kafka port %d, got %d", expectedBrokers[i].BrokerPort, brokerConnection.BrokerPort)
}
if brokerConnection.JmxPort != expectedBrokers[i].JmxPort {
t.Errorf("Expected jmx port %d, got %d", expectedBrokers[i].JmxPort, brokerConnection.JmxPort)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

const (
integrationName = "com.newrelic.kafka"
integrationVersion = "2.4.1"
integrationVersion = "2.5.0"
)

func main() {
Expand Down
16 changes: 10 additions & 6 deletions src/topiccollect/topic_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,17 +218,21 @@ func calculateUnderReplicatedCount(partitions []*partition, sample *metric.Set)
func topicRespondsToMetadata(t *Topic, zkConn zookeeper.Connection) int {

// Get connection information for a broker
_, host, _, port, err := zookeeper.GetBrokerConnectionInfo(0, zkConn)
connections, err := zookeeper.GetBrokerConnections(0, zkConn)
if err != nil {
return 0
}

var broker *sarama.Broker

// Create a broker connection object and open the connection
broker := sarama.NewBroker(fmt.Sprintf("%s:%d", host, port))
config := sarama.NewConfig()
err = broker.Open(config)
if err != nil {
return 0
for _, connection := range connections {
broker = sarama.NewBroker(fmt.Sprintf("%s:%d", connection.BrokerHost, connection.BrokerPort))
config := sarama.NewConfig()
err = broker.Open(config)
if err != nil {
return 0
}
}

defer func() {
Expand Down
Loading

0 comments on commit 6bfd29a

Please sign in to comment.