From bad37db21681896d0f8276080d33c2e870c7c64b Mon Sep 17 00:00:00 2001 From: Anton Dalgren Date: Mon, 28 Oct 2019 13:21:59 +0100 Subject: [PATCH 1/3] Possible to utilize multiple connection types to kafka broker --- src/brokercollect/broker_collection.go | 75 +++++++------ src/brokercollect/broker_collection_test.go | 95 +++++++++++------ src/topiccollect/topic_collection.go | 16 +-- src/zookeeper/connection.go | 95 +++++++++++------ src/zookeeper/connection_test.go | 112 +++++++++++++------- 5 files changed, 246 insertions(+), 147 deletions(-) diff --git a/src/brokercollect/broker_collection.go b/src/brokercollect/broker_collection.go index 8ed13df7..242d811b 100644 --- a/src/brokercollect/broker_collection.go +++ b/src/brokercollect/broker_collection.go @@ -86,66 +86,75 @@ func brokerWorker(brokerChan <-chan int, collectedTopics []string, wg *sync.Wait } // Create Broker - b, err := createBroker(brokerID, zkConn, i) + brokers, err := createBrokers(brokerID, zkConn, i) if err != nil { continue } - // Populate inventory for broker - if args.GlobalArgs.All() || args.GlobalArgs.Inventory { - log.Debug("Collecting inventory for broker %q", b.Entity.Metadata.Name) - if err := populateBrokerInventory(b); err != nil { - continue + for _, broker := range brokers { + // Populate inventory for broker + if args.GlobalArgs.All() || args.GlobalArgs.Inventory { + log.Debug("Collecting inventory for broker %s", broker.Entity.Metadata.Name) + if err := populateBrokerInventory(broker); err != nil { + continue + } + log.Debug("Done Collecting inventory for broker %s", broker.Entity.Metadata.Name) } - log.Debug("Done collecting inventory for broker %q", b.Entity.Metadata.Name) - } - // Populate metrics for broker - if args.GlobalArgs.All() || args.GlobalArgs.Metrics { - log.Debug("Collecting metrics for broker %q", b.Entity.Metadata.Name) - if err := collectBrokerMetrics(b, collectedTopics); err != nil { - continue + // Populate metrics for broker + if args.GlobalArgs.All() || args.GlobalArgs.Metrics { + log.Debug("Collecting metrics for broker %s", broker.Entity.Metadata.Name) + if err := collectBrokerMetrics(broker, collectedTopics); err != nil { + continue + } + log.Debug("Done Collecting metrics for broker %s", broker.Entity.Metadata.Name) } - log.Debug("Done Collecting metrics for broker %q", b.Entity.Metadata.Name) } } } -// Creates and populates a broker struct with all the information needed to +// Creates and populates an array of broker structs with all the information needed to // populate inventory and metrics. -func createBroker(brokerID int, zkConn zookeeper.Connection, i *integration.Integration) (*broker, error) { +func createBrokers(brokerID int, zkConn zookeeper.Connection, i *integration.Integration) ([]*broker, error) { // Collect broker connection information from ZooKeeper - _, host, jmxPort, kafkaPort, err := zookeeper.GetBrokerConnectionInfo(brokerID, zkConn) + brokerConnections, err := zookeeper.GetBrokerConnectionInfo(brokerID, zkConn) if err != nil { log.Error("Unable to get broker JMX information for broker id %d: %s", brokerID, err) return nil, err } - // Create broker entity - clusterIDAttr := integration.NewIDAttribute("clusterName", args.GlobalArgs.ClusterName) - brokerEntity, err := i.Entity(fmt.Sprintf("%s:%d", host, kafkaPort), "ka-broker", clusterIDAttr) - if err != nil { - log.Error("Unable to create entity for broker ID %d: %s", brokerID, err) - return nil, err - } - // Gather broker configuration from ZooKeeper brokerConfig, err := getBrokerConfig(brokerID, zkConn) if err != nil { log.Error("Unable to get broker configuration information for broker id %d: %s", brokerID, err) } - newBroker := &broker{ - Host: host, - JMXPort: jmxPort, - KafkaPort: kafkaPort, - Entity: brokerEntity, - ID: brokerID, - Config: brokerConfig, + var brokers []*broker + for _, brokerConnection := range brokerConnections { + // Create broker entity + clusterIDAttr := integration.NewIDAttribute("clusterName", args.GlobalArgs.ClusterName) + brokerEntity, err := i.Entity( + fmt.Sprintf("%s:%d", brokerConnection.BrokerHost, brokerConnection.BrokerPort), + "ka-broker", + clusterIDAttr) + + if err != nil { + log.Error("Unable to create entity for broker ID %d: %s", brokerID, err) + return nil, err + } + + brokers = append(brokers, &broker{ + Host: brokerConnection.BrokerHost, + JMXPort: brokerConnection.JmxPort, + KafkaPort: brokerConnection.BrokerPort, + Entity: brokerEntity, + ID: brokerID, + Config: brokerConfig, + }) } - return newBroker, nil + return brokers, nil } // For a given broker struct, populate the inventory of its entity with the information gathered diff --git a/src/brokercollect/broker_collection_test.go b/src/brokercollect/broker_collection_test.go index df61fc7a..354b2b9b 100644 --- a/src/brokercollect/broker_collection_test.go +++ b/src/brokercollect/broker_collection_test.go @@ -21,7 +21,7 @@ import ( ) var ( - brokerConnectionBytes = []byte(`{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafkabroker:9092"],"jmx_port":9999,"host":"kafkabroker","timestamp":"1530886155628","port":9092,"version":4}`) + brokerConnectionBytes = []byte(`{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT", "SSL":"SLL"},"endpoints":["PLAINTEXT://kafkabroker:9092", "SSL://kafkabroker:9093"],"jmx_port":9999,"host":"kafkabroker","timestamp":"1530886155628","port":9092,"version":4}`) brokerConfigBytes = []byte(`{"version":1,"config":{"flush.messages":"12345"}}`) brokerConfigBytes2 = []byte(`{"version":1,"config":{"leader.replication.throttled.replicas":"10000"}}`) ) @@ -77,7 +77,7 @@ func TestCreateBroker_ZKError(t *testing.T) { zkConn.On("Get", "/brokers/ids/0").Return([]byte{}, new(zk.Stat), errors.New("this is a test error")) i, _ := integration.New("kafka", "1.0.0") - _, err := createBroker(brokerID, zkConn, i) + _, err := createBrokers(brokerID, zkConn, i) if err == nil { t.Error("Expected error") } @@ -89,32 +89,43 @@ func TestCreateBroker_Normal(t *testing.T) { zkConn.On("Get", "/config/brokers/0").Return(brokerConfigBytes, new(zk.Stat), nil) i, _ := integration.New("kafka", "1.0.0") - b, err := createBroker(brokerID, zkConn, i) + brokers, err := createBrokers(brokerID, zkConn, i) if err != nil { t.Errorf("Unexpected error: %s", err.Error()) } - expectedBroker := &broker{ - Host: "kafkabroker", - KafkaPort: 9092, - JMXPort: 9999, - ID: 0, + expectedBrokers := []broker{ + { + Host: "kafkabroker", + KafkaPort: 9092, + JMXPort: 9999, + ID: 0, + }, + { + Host: "kafkabroker", + KafkaPort: 9093, + JMXPort: 9999, + ID: 0, + }, } - if expectedBroker.Host != b.Host { - t.Errorf("Expected JMX host '%s' got '%s'", expectedBroker.Host, b.Host) - } - if expectedBroker.JMXPort != b.JMXPort { - t.Errorf("Expected JMX Port '%d' got '%d'", expectedBroker.JMXPort, b.JMXPort) - } - if expectedBroker.KafkaPort != b.KafkaPort { - t.Errorf("Expected Kafka Port '%d' got '%d'", expectedBroker.KafkaPort, b.KafkaPort) - } - if b.Entity.Metadata.Name != "kafkabroker:9092" { - t.Errorf("Expected entity name '%s' got '%s'", expectedBroker.Host, b.Entity.Metadata.Name) - } - if b.Entity.Metadata.Namespace != "ka-broker" { - t.Errorf("Expected entity name '%s' got '%s'", "ka-broker", b.Entity.Metadata.Namespace) + for i, broker := range brokers { + if expectedBrokers[i].Host != broker.Host { + t.Errorf("Expected JMX host '%s' got '%s'", expectedBrokers[i].Host, broker.Host) + } + if expectedBrokers[i].JMXPort != broker.JMXPort { + t.Errorf("Expected JMX Port '%d' got '%d'", expectedBrokers[i].JMXPort, broker.JMXPort) + } + if expectedBrokers[i].KafkaPort != broker.KafkaPort { + t.Errorf("Expected Kafka Port '%d' got '%d'", expectedBrokers[i].KafkaPort, broker.KafkaPort) + } + metadataName := fmt.Sprintf("%s:%d", expectedBrokers[i].Host, expectedBrokers[i].KafkaPort) + if broker.Entity.Metadata.Name != metadataName { + t.Errorf("Expected entity name '%s' got '%s'", metadataName, broker.Entity.Metadata.Name) + } + if broker.Entity.Metadata.Namespace != "ka-broker" { + t.Errorf("Expected entity name '%s' got '%s'", "ka-broker", broker.Entity.Metadata.Namespace) + } } } @@ -225,23 +236,39 @@ func TestGetBrokerJMX(t *testing.T) { brokerID := 0 zkConn := zookeeper.MockConnection{} zkConn.On("Get", "/brokers/ids/0").Return(brokerConnectionBytes, new(zk.Stat), nil) + expectedBrokers := []zookeeper.BrokerConnection{ + { + Scheme: "http", + BrokerHost: "kafkabroker", + BrokerPort: 9092, + JmxPort: 9999, + }, + { + Scheme: "https", + BrokerHost: "kafkabroker", + BrokerPort: 9093, + JmxPort: 9999, + }, + } - scheme, host, jmxPort, kafkaPort, err := zookeeper.GetBrokerConnectionInfo(brokerID, &zkConn) + brokerConnections, err := zookeeper.GetBrokerConnectionInfo(brokerID, &zkConn) if err != nil { t.Error(err) } - if scheme != "http" { - t.Errorf("Expected scheme http, got %s", scheme) - } - if host != "kafkabroker" { - t.Errorf("Expected host kafkabroker, got %s", host) - } - if kafkaPort != 9092 { - t.Errorf("Expected kafka port 9092, got %d", kafkaPort) - } - if jmxPort != 9999 { - t.Errorf("Expected jmx port 9999, got %d", jmxPort) + for i, brokerConnection := range brokerConnections { + if brokerConnection.Scheme != expectedBrokers[i].Scheme { + t.Errorf("Expected scheme %s, got %s", expectedBrokers[i].Scheme, brokerConnection.Scheme) + } + if brokerConnection.BrokerHost != expectedBrokers[i].BrokerHost { + t.Errorf("Expected host %s, got %s", expectedBrokers[i].BrokerHost, brokerConnection.BrokerHost) + } + if brokerConnection.BrokerPort != expectedBrokers[i].BrokerPort { + t.Errorf("Expected kafka port %d, got %d", expectedBrokers[i].BrokerPort, brokerConnection.BrokerPort) + } + if brokerConnection.JmxPort != expectedBrokers[i].JmxPort { + t.Errorf("Expected jmx port %d, got %d", expectedBrokers[i].JmxPort, brokerConnection.JmxPort) + } } } diff --git a/src/topiccollect/topic_collection.go b/src/topiccollect/topic_collection.go index 6737bf64..de8b686b 100644 --- a/src/topiccollect/topic_collection.go +++ b/src/topiccollect/topic_collection.go @@ -218,17 +218,21 @@ func calculateUnderReplicatedCount(partitions []*partition, sample *metric.Set) func topicRespondsToMetadata(t *Topic, zkConn zookeeper.Connection) int { // Get connection information for a broker - _, host, _, port, err := zookeeper.GetBrokerConnectionInfo(0, zkConn) + connections, err := zookeeper.GetBrokerConnectionInfo(0, zkConn) if err != nil { return 0 } + var broker *sarama.Broker + // Create a broker connection object and open the connection - broker := sarama.NewBroker(fmt.Sprintf("%s:%d", host, port)) - config := sarama.NewConfig() - err = broker.Open(config) - if err != nil { - return 0 + for _, connection := range connections { + broker = sarama.NewBroker(fmt.Sprintf("%s:%d", connection.BrokerHost, connection.BrokerPort)) + config := sarama.NewConfig() + err = broker.Open(config) + if err != nil { + return 0 + } } defer func() { diff --git a/src/zookeeper/connection.go b/src/zookeeper/connection.go index 944119f1..d906fd7d 100644 --- a/src/zookeeper/connection.go +++ b/src/zookeeper/connection.go @@ -26,6 +26,14 @@ type Connection interface { CreateClusterAdmin() (sarama.ClusterAdmin, error) } +// BrokerConnection struct to allow for multiple connection setups. +type BrokerConnection struct { + Scheme string + BrokerHost string + JmxPort int + BrokerPort int +} + type zookeeperConnection struct { inner *zk.Conn } @@ -44,8 +52,7 @@ func (z zookeeperConnection) CreateClient() (connection.Client, error) { return nil, err } - brokers := make([]string, 0, len(brokerIDs)) - isTLS := false + connections := make(map[string][]string, 0) for _, brokerID := range brokerIDs { // convert to int id intID, err := strconv.Atoi(brokerID) @@ -55,25 +62,31 @@ func (z zookeeperConnection) CreateClient() (connection.Client, error) { } // get broker connection info - scheme, host, _, port, err := GetBrokerConnectionInfo(intID, z) + brokerConnections, err := GetBrokerConnectionInfo(intID, z) if err != nil { log.Warn("Unable to get connection information for broker with ID '%d'. Will not collect offset data for consumer groups on this broker: %s", intID, err) continue } - if !isTLS && scheme == "https" { - isTLS = true + for _, brokerConnection := range brokerConnections { + connections[brokerConnection.Scheme] = append(connections[brokerConnection.Scheme], + fmt.Sprintf("%s:%d", brokerConnection.BrokerHost, brokerConnection.BrokerPort)) } - - brokers = append(brokers, fmt.Sprintf("%s:%d", host, port)) } - c, err := sarama.NewClient(brokers, createConfig(isTLS)) + var client sarama.Client + for scheme, connection := range connections { + client, err = sarama.NewClient(connection, createConfig(scheme == "https")) + if err != nil { + continue + } else { // make sure that we break when we have a working connection. + break + } + } if err != nil { return nil, err } - - return connection.SaramaClient{c}, nil + return connection.SaramaClient{client}, nil } func (z zookeeperConnection) CreateClusterAdmin() (sarama.ClusterAdmin, error) { @@ -82,8 +95,7 @@ func (z zookeeperConnection) CreateClusterAdmin() (sarama.ClusterAdmin, error) { return nil, err } - brokers := make([]string, 0, len(brokerIDs)) - isTLS := false + connections := make(map[string][]string, 0) for _, brokerID := range brokerIDs { // convert to int id intID, err := strconv.Atoi(brokerID) @@ -93,25 +105,32 @@ func (z zookeeperConnection) CreateClusterAdmin() (sarama.ClusterAdmin, error) { } // get broker connection info - scheme, host, _, port, err := GetBrokerConnectionInfo(intID, z) + brokerConnections, err := GetBrokerConnectionInfo(intID, z) if err != nil { log.Warn("Unable to get connection information for broker with ID '%d'. Will not collect offset data for consumer groups on this broker: %s", intID, err) continue } - if !isTLS && scheme == "https" { - isTLS = true + for _, brokerConnection := range brokerConnections { + connections[brokerConnection.Scheme] = append(connections[brokerConnection.Scheme], + fmt.Sprintf("%s:%d", brokerConnection.BrokerHost, brokerConnection.BrokerPort)) } - - brokers = append(brokers, fmt.Sprintf("%s:%d", host, port)) } - c, err := sarama.NewClusterAdmin(brokers, createConfig(isTLS)) + var client sarama.ClusterAdmin + for scheme, connection := range connections { + client, err = sarama.NewClusterAdmin(connection, createConfig(scheme == "https")) + if err != nil { + continue + } else { // make sure that we break when we have a working connection. + break + } + } if err != nil { return nil, err } - return c, nil + return client, nil } func createConfig(isTLS bool) *sarama.Config { @@ -174,16 +193,21 @@ func GetBrokerIDs(zkConn Connection) ([]string, error) { return brokerIDs, nil } -func getURLStringAndSchemeFromEndpoints(endpoints []string, protocolMap map[string]string) (scheme string, brokerHost *url.URL, err error) { +func getURLStringAndSchemeFromEndpoints(endpoints []string, protocolMap map[string]string) (schemes []string, brokerHosts []*url.URL, err error) { + schemes = make([]string, 0) + brokerHosts = make([]*url.URL, 0) for _, urlString := range endpoints { - scheme, brokerHost, err = getURLStringAndSchemeFromEndpoint(urlString, protocolMap) - if err == nil { - return + scheme, brokerHost, err := getURLStringAndSchemeFromEndpoint(urlString, protocolMap) + if err != nil { + continue } - - log.Debug("Error getting host and schema from url list: %s", err) + schemes = append(schemes, scheme) + brokerHosts = append(brokerHosts, brokerHost) + } + if len(schemes) == 0 && len(brokerHosts) == 0 { + return nil, nil, errors.New("host could not be found for broker") } - return "", nil, errors.New("host could not be found for broker") + return schemes, brokerHosts, nil } func getURLStringAndSchemeFromEndpoint(urlString string, protocolMap map[string]string) (scheme string, brokerHost *url.URL, err error) { @@ -204,7 +228,7 @@ func getURLStringAndSchemeFromEndpoint(urlString string, protocolMap map[string] } // GetBrokerConnectionInfo Collects Broker connection info from Zookeeper -func GetBrokerConnectionInfo(brokerID int, zkConn Connection) (scheme, brokerHost string, jmxPort int, brokerPort int, err error) { +func GetBrokerConnectionInfo(brokerID int, zkConn Connection) (brokerConnetions []BrokerConnection, err error) { // Query Zookeeper for broker information path := Path("/brokers/ids/" + strconv.Itoa(brokerID)) @@ -226,18 +250,21 @@ func GetBrokerConnectionInfo(brokerID int, zkConn Connection) (scheme, brokerHos } // We only want the URL if it's SSL or PLAINTEXT - scheme, brokerURL, err := getURLStringAndSchemeFromEndpoints(brokerDecoded.Endpoints, brokerDecoded.ProtocolMap) + schemes, brokerURLs, err := getURLStringAndSchemeFromEndpoints(brokerDecoded.Endpoints, brokerDecoded.ProtocolMap) if err != nil { return } - host, portString := brokerURL.Hostname(), brokerURL.Port() + connections := make([]BrokerConnection, 0) + for i, scheme := range schemes { + host, portString := brokerURLs[i].Hostname(), brokerURLs[i].Port() - port, err := strconv.Atoi(portString) - if err != nil { - return + port, err := strconv.Atoi(portString) + if err != nil { + return nil, nil + } + connections = append(connections, BrokerConnection{Scheme: scheme, BrokerHost: host, JmxPort: brokerDecoded.JmxPort, BrokerPort: port}) } - - return scheme, host, brokerDecoded.JmxPort, port, nil + return connections, nil } diff --git a/src/zookeeper/connection_test.go b/src/zookeeper/connection_test.go index 65f6a4ef..65e64353 100644 --- a/src/zookeeper/connection_test.go +++ b/src/zookeeper/connection_test.go @@ -13,26 +13,35 @@ func Test_GetBrokerConnectionInfo_WithHost(t *testing.T) { brokerID := 0 zkConn := MockConnection{} - zkConn.On("Get", "/brokers/ids/0").Return([]byte(`{"listener_security_protocol_map":{"SASL_SSL":"SASL_SSL","SSL":"SSL"},"endpoints":["SASL_SSL://my-broker.host:9193","SSL://my-broker.host:9093"],"rack":"us-east-1d","jmx_port":9999,"host":null,"timestamp":"1542127633364","port":-1,"version":4}`), new(zk.Stat), nil) + zkConn.On("Get", "/brokers/ids/0").Return([]byte(`{"listener_security_protocol_map":{"SASL_SSL":"SASL_SSL","SSL":"SSL", "PLAINTEXT":"PLAINTEXT"},"endpoints":["SASL_SSL://my-broker.host:9193","SSL://my-broker.host:9093","PLAINTEXT://my-broker.host:9092"],"rack":"us-east-1d","jmx_port":9999,"host":null,"timestamp":"1542127633364","port":-1,"version":4}`), new(zk.Stat), nil) - expectedScheme, expectedHost, expectedJMXPort, expectedKafkaPort := "https", "my-broker.host", 9999, 9093 + expectedValues := []BrokerConnection{ + {Scheme: "https", BrokerHost: "my-broker.host", JmxPort: 9999, BrokerPort: 9093}, + {Scheme: "http", BrokerHost: "my-broker.host", JmxPort: 9999, BrokerPort: 9092}, + } - scheme, host, jmxPort, kafkaPort, err := GetBrokerConnectionInfo(brokerID, &zkConn) + brokerConnections, err := GetBrokerConnectionInfo(brokerID, &zkConn) if err != nil { t.Fatalf("Unexpected error %s", err.Error()) } - if scheme != expectedScheme { - t.Errorf("Expected %s got %s", expectedScheme, scheme) - } - if host != expectedHost { - t.Errorf("Expected %s got %s", expectedHost, host) - } - if jmxPort != expectedJMXPort { - t.Errorf("Expected %d got %d", expectedJMXPort, jmxPort) + if len(expectedValues) != len(brokerConnections) { + t.Errorf("Expected %d entires got %d", len(expectedValues), len(brokerConnections)) } - if kafkaPort != expectedKafkaPort { - t.Errorf("Expected %d got %d", expectedKafkaPort, kafkaPort) + + for i, brokerConnection := range brokerConnections { + if brokerConnection.Scheme != expectedValues[i].Scheme { + t.Errorf("Expected %s got %s", expectedValues[i].Scheme, brokerConnection.Scheme) + } + if brokerConnection.BrokerHost != expectedValues[i].BrokerHost { + t.Errorf("Expected %s got %s", expectedValues[i].BrokerHost, brokerConnection.BrokerHost) + } + if brokerConnection.JmxPort != expectedValues[i].JmxPort { + t.Errorf("Expected %d got %d", expectedValues[i].JmxPort, brokerConnection.JmxPort) + } + if brokerConnection.BrokerPort != expectedValues[i].BrokerPort { + t.Errorf("Expected %d got %d", expectedValues[i].BrokerPort, brokerConnection.BrokerPort) + } } } @@ -43,49 +52,67 @@ func Test_GetBrokerConnectionInfo_WithProtocolMap(t *testing.T) { zkConn := MockConnection{} zkConn.On("Get", "/brokers/ids/0").Return([]byte(`{"listener_security_protocol_map":{"EXTERNAL":"SASL_SSL","INTERNAL":"PLAINTEXT"},"endpoints":["EXTERNAL://my-broker.host:9193", "INTERNAL://my-broker.host:9093"],"rack":"us-east-1d","jmx_port":9999,"host":null,"timestamp":"1542127633364","port":-1,"version":4}`), new(zk.Stat), nil) - expectedScheme, expectedHost, expectedJMXPort, expectedKafkaPort := "http", "my-broker.host", 9999, 9093 + expectedValues := []BrokerConnection{ + {Scheme: "http", BrokerHost: "my-broker.host", JmxPort: 9999, BrokerPort: 9093}, + } + + brokerConnections, err := GetBrokerConnectionInfo(brokerID, &zkConn) - scheme, host, jmxPort, kafkaPort, err := GetBrokerConnectionInfo(brokerID, &zkConn) if err != nil { t.Fatalf("Unexpected error %s", err.Error()) } - if scheme != expectedScheme { - t.Errorf("Expected %s got %s", expectedScheme, scheme) - } - if host != expectedHost { - t.Errorf("Expected %s got %s", expectedHost, host) - } - if jmxPort != expectedJMXPort { - t.Errorf("Expected %d got %d", expectedJMXPort, jmxPort) + if len(expectedValues) != len(brokerConnections) { + t.Errorf("Expected %d entires got %d", len(expectedValues), len(brokerConnections)) } - if kafkaPort != expectedKafkaPort { - t.Errorf("Expected %d got %d", expectedKafkaPort, kafkaPort) + + for i, brokerConnection := range brokerConnections { + if brokerConnection.Scheme != expectedValues[i].Scheme { + t.Errorf("Expected %s got %s", expectedValues[i].Scheme, brokerConnection.Scheme) + } + if brokerConnection.BrokerHost != expectedValues[i].BrokerHost { + t.Errorf("Expected %s got %s", expectedValues[i].BrokerHost, brokerConnection.BrokerHost) + } + if brokerConnection.JmxPort != expectedValues[i].JmxPort { + t.Errorf("Expected %d got %d", expectedValues[i].JmxPort, brokerConnection.JmxPort) + } + if brokerConnection.BrokerPort != expectedValues[i].BrokerPort { + t.Errorf("Expected %d got %d", expectedValues[i].BrokerPort, brokerConnection.BrokerPort) + } } } func Test_getUrlStringAndSchemeFromEndpoints_WithVanilla(t *testing.T) { testutils.SetupTestArgs() - endpoints := []string{"SASL_SSL://my-broker.host:9193", "SSL://my-broker.host:9093"} + endpoints := []string{"SASL_SSL://my-broker.host:9193", "SSL://my-broker.host:9093", "PLAINTEXT://my-broker.host:9092"} protocolMap := map[string]string{ - "SASL_SSL": "SASL_SSL", - "SSL": "SSL", + "SASL_SSL": "SASL_SSL", + "SSL": "SSL", + "PLAINTEXT": "PLAINTEXT", } - expectedScheme := "https" - expectedHost, err := url.Parse("SSL://my-broker.host:9093") + expectedSchemes := []string{"https", "http"} + expectedHosts := []*url.URL{} + host, err := url.Parse("SSL://my-broker.host:9093") + expectedHosts = append(expectedHosts, host) + host, err = url.Parse("PLAINTEXT://my-broker.host:9092") + expectedHosts = append(expectedHosts, host) - scheme, host, err := getURLStringAndSchemeFromEndpoints(endpoints, protocolMap) + schemes, hosts, err := getURLStringAndSchemeFromEndpoints(endpoints, protocolMap) if err != nil { t.Fatalf("Unexpected error %s", err.Error()) } - if scheme != expectedScheme { - t.Errorf("Expected '%s' got '%s'", expectedScheme, scheme) + for i, scheme := range schemes { + if scheme != expectedSchemes[i] { + t.Errorf("Expected '%s' got '%s'", expectedSchemes[i], scheme) + } } - if *host != *expectedHost { - t.Errorf("Expected %s got %s", expectedHost, host) + for i, host := range hosts { + if *host != *expectedHosts[i] { + t.Errorf("Expected %s got %s", expectedHosts[i], host) + } } } @@ -102,16 +129,21 @@ func Test_getUrlStringAndSchemeFromEndpoints_WithProtocolMap(t *testing.T) { expectedScheme := "http" expectedHost, err := url.Parse("INTERNAL://my-broker.host:9093") - scheme, host, err := getURLStringAndSchemeFromEndpoints(endpoints, protocolMap) + schemes, hosts, err := getURLStringAndSchemeFromEndpoints(endpoints, protocolMap) if err != nil { t.Fatalf("Unexpected error %s", err.Error()) } - if scheme != expectedScheme { - t.Errorf("Expected %s got %s", expectedScheme, scheme) + + for _, scheme := range schemes { + if scheme != expectedScheme { + t.Errorf("Expected '%s' got '%s'", expectedScheme, scheme) + } } - if *host != *expectedHost { - t.Errorf("Expected %q got %q", expectedHost, host) + for _, host := range hosts { + if *host != *expectedHost { + t.Errorf("Expected %s got %s", expectedHost, host) + } } } From 4987630eb93bdb3baf61e4df80c8334826285bc4 Mon Sep 17 00:00:00 2001 From: Anton Dalgren Date: Wed, 6 Nov 2019 09:15:07 +0100 Subject: [PATCH 2/3] fixing a loop that would do redundant metric collections, some typos, naming and styling --- src/brokercollect/broker_collection.go | 10 +++++----- src/brokercollect/broker_collection_test.go | 8 ++++---- src/topiccollect/topic_collection.go | 2 +- src/zookeeper/connection.go | 18 +++++++++--------- src/zookeeper/connection_test.go | 4 ++-- 5 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/brokercollect/broker_collection.go b/src/brokercollect/broker_collection.go index 242d811b..12c8c6bd 100644 --- a/src/brokercollect/broker_collection.go +++ b/src/brokercollect/broker_collection.go @@ -86,7 +86,7 @@ func brokerWorker(brokerChan <-chan int, collectedTopics []string, wg *sync.Wait } // Create Broker - brokers, err := createBrokers(brokerID, zkConn, i) + brokers, err := createBrokerConnectionVariants(brokerID, zkConn, i) if err != nil { continue } @@ -109,16 +109,16 @@ func brokerWorker(brokerChan <-chan int, collectedTopics []string, wg *sync.Wait } log.Debug("Done Collecting metrics for broker %s", broker.Entity.Metadata.Name) } + break } } } -// Creates and populates an array of broker structs with all the information needed to -// populate inventory and metrics. -func createBrokers(brokerID int, zkConn zookeeper.Connection, i *integration.Integration) ([]*broker, error) { +// Creates and populates an array of different ways to connect to one broker. +func createBrokerConnectionVariants(brokerID int, zkConn zookeeper.Connection, i *integration.Integration) ([]*broker, error) { // Collect broker connection information from ZooKeeper - brokerConnections, err := zookeeper.GetBrokerConnectionInfo(brokerID, zkConn) + brokerConnections, err := zookeeper.GetBrokerConnections(brokerID, zkConn) if err != nil { log.Error("Unable to get broker JMX information for broker id %d: %s", brokerID, err) return nil, err diff --git a/src/brokercollect/broker_collection_test.go b/src/brokercollect/broker_collection_test.go index 354b2b9b..7685e27b 100644 --- a/src/brokercollect/broker_collection_test.go +++ b/src/brokercollect/broker_collection_test.go @@ -21,7 +21,7 @@ import ( ) var ( - brokerConnectionBytes = []byte(`{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT", "SSL":"SLL"},"endpoints":["PLAINTEXT://kafkabroker:9092", "SSL://kafkabroker:9093"],"jmx_port":9999,"host":"kafkabroker","timestamp":"1530886155628","port":9092,"version":4}`) + brokerConnectionBytes = []byte(`{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT", "SSL":"SSL"},"endpoints":["PLAINTEXT://kafkabroker:9092", "SSL://kafkabroker:9093"],"jmx_port":9999,"host":"kafkabroker","timestamp":"1530886155628","port":9092,"version":4}`) brokerConfigBytes = []byte(`{"version":1,"config":{"flush.messages":"12345"}}`) brokerConfigBytes2 = []byte(`{"version":1,"config":{"leader.replication.throttled.replicas":"10000"}}`) ) @@ -77,7 +77,7 @@ func TestCreateBroker_ZKError(t *testing.T) { zkConn.On("Get", "/brokers/ids/0").Return([]byte{}, new(zk.Stat), errors.New("this is a test error")) i, _ := integration.New("kafka", "1.0.0") - _, err := createBrokers(brokerID, zkConn, i) + _, err := createBrokerConnectionVariants(brokerID, zkConn, i) if err == nil { t.Error("Expected error") } @@ -89,7 +89,7 @@ func TestCreateBroker_Normal(t *testing.T) { zkConn.On("Get", "/config/brokers/0").Return(brokerConfigBytes, new(zk.Stat), nil) i, _ := integration.New("kafka", "1.0.0") - brokers, err := createBrokers(brokerID, zkConn, i) + brokers, err := createBrokerConnectionVariants(brokerID, zkConn, i) if err != nil { t.Errorf("Unexpected error: %s", err.Error()) } @@ -251,7 +251,7 @@ func TestGetBrokerJMX(t *testing.T) { }, } - brokerConnections, err := zookeeper.GetBrokerConnectionInfo(brokerID, &zkConn) + brokerConnections, err := zookeeper.GetBrokerConnections(brokerID, &zkConn) if err != nil { t.Error(err) } diff --git a/src/topiccollect/topic_collection.go b/src/topiccollect/topic_collection.go index de8b686b..4aa90bb7 100644 --- a/src/topiccollect/topic_collection.go +++ b/src/topiccollect/topic_collection.go @@ -218,7 +218,7 @@ func calculateUnderReplicatedCount(partitions []*partition, sample *metric.Set) func topicRespondsToMetadata(t *Topic, zkConn zookeeper.Connection) int { // Get connection information for a broker - connections, err := zookeeper.GetBrokerConnectionInfo(0, zkConn) + connections, err := zookeeper.GetBrokerConnections(0, zkConn) if err != nil { return 0 } diff --git a/src/zookeeper/connection.go b/src/zookeeper/connection.go index d906fd7d..5b3d5f9d 100644 --- a/src/zookeeper/connection.go +++ b/src/zookeeper/connection.go @@ -62,7 +62,7 @@ func (z zookeeperConnection) CreateClient() (connection.Client, error) { } // get broker connection info - brokerConnections, err := GetBrokerConnectionInfo(intID, z) + brokerConnections, err := GetBrokerConnections(intID, z) if err != nil { log.Warn("Unable to get connection information for broker with ID '%d'. Will not collect offset data for consumer groups on this broker: %s", intID, err) continue @@ -105,7 +105,7 @@ func (z zookeeperConnection) CreateClusterAdmin() (sarama.ClusterAdmin, error) { } // get broker connection info - brokerConnections, err := GetBrokerConnectionInfo(intID, z) + brokerConnections, err := GetBrokerConnections(intID, z) if err != nil { log.Warn("Unable to get connection information for broker with ID '%d'. Will not collect offset data for consumer groups on this broker: %s", intID, err) continue @@ -207,7 +207,7 @@ func getURLStringAndSchemeFromEndpoints(endpoints []string, protocolMap map[stri if len(schemes) == 0 && len(brokerHosts) == 0 { return nil, nil, errors.New("host could not be found for broker") } - return schemes, brokerHosts, nil + return } func getURLStringAndSchemeFromEndpoint(urlString string, protocolMap map[string]string) (scheme string, brokerHost *url.URL, err error) { @@ -227,8 +227,8 @@ func getURLStringAndSchemeFromEndpoint(urlString string, protocolMap map[string] return "", nil, errors.New("Protocol not found") } -// GetBrokerConnectionInfo Collects Broker connection info from Zookeeper -func GetBrokerConnectionInfo(brokerID int, zkConn Connection) (brokerConnetions []BrokerConnection, err error) { +// GetBrokerConnections Collects Broker connection info from Zookeeper +func GetBrokerConnections(brokerID int, zkConn Connection) (brokerConnections []BrokerConnection, err error) { // Query Zookeeper for broker information path := Path("/brokers/ids/" + strconv.Itoa(brokerID)) @@ -256,15 +256,15 @@ func GetBrokerConnectionInfo(brokerID int, zkConn Connection) (brokerConnetions return } - connections := make([]BrokerConnection, 0) + brokerConnections = make([]BrokerConnection, len(brokerURLs)) for i, scheme := range schemes { host, portString := brokerURLs[i].Hostname(), brokerURLs[i].Port() port, err := strconv.Atoi(portString) if err != nil { - return nil, nil + return nil, err } - connections = append(connections, BrokerConnection{Scheme: scheme, BrokerHost: host, JmxPort: brokerDecoded.JmxPort, BrokerPort: port}) + brokerConnections[i] = BrokerConnection{Scheme: scheme, BrokerHost: host, JmxPort: brokerDecoded.JmxPort, BrokerPort: port} } - return connections, nil + return } diff --git a/src/zookeeper/connection_test.go b/src/zookeeper/connection_test.go index 65e64353..e91f7a36 100644 --- a/src/zookeeper/connection_test.go +++ b/src/zookeeper/connection_test.go @@ -20,7 +20,7 @@ func Test_GetBrokerConnectionInfo_WithHost(t *testing.T) { {Scheme: "http", BrokerHost: "my-broker.host", JmxPort: 9999, BrokerPort: 9092}, } - brokerConnections, err := GetBrokerConnectionInfo(brokerID, &zkConn) + brokerConnections, err := GetBrokerConnections(brokerID, &zkConn) if err != nil { t.Fatalf("Unexpected error %s", err.Error()) } @@ -56,7 +56,7 @@ func Test_GetBrokerConnectionInfo_WithProtocolMap(t *testing.T) { {Scheme: "http", BrokerHost: "my-broker.host", JmxPort: 9999, BrokerPort: 9093}, } - brokerConnections, err := GetBrokerConnectionInfo(brokerID, &zkConn) + brokerConnections, err := GetBrokerConnections(brokerID, &zkConn) if err != nil { t.Fatalf("Unexpected error %s", err.Error()) From 44b7c7fcdfa7963810acdae9c583f8b2c55aa99f Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Thu, 14 Nov 2019 11:33:04 -0500 Subject: [PATCH 3/3] Bump version --- CHANGELOG.md | 4 ++++ src/kafka.go | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e792e94..0e408b30 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,10 +5,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## 2.5.0 - 2019-11-14 +### Added +- Support for attempting multiple connection protocols to broker ## 2.4.0 - 2019-10-25 ### Added - `consumer_group_regex` argument +- ### Changed - Deprecated `consumer_groups` in favor of `consumer_group_regex` diff --git a/src/kafka.go b/src/kafka.go index 87710173..9b96cdb4 100644 --- a/src/kafka.go +++ b/src/kafka.go @@ -17,7 +17,7 @@ import ( const ( integrationName = "com.newrelic.kafka" - integrationVersion = "2.4.0" + integrationVersion = "2.5.0" ) func main() {