Skip to content

Commit

Permalink
[LI-HOTFIX] Add metric for total connection count (#430)
Browse files Browse the repository at this point in the history
This metric is added for showing the total client connection count to a broker. The metric will be useful when we monitor the connection count and measure the max connections of a broker.


TICKET = N/A
LI_DESCRIPTION = LIKAFKA-49259
EXIT_CRITERIA = When upstream implement similar sensors
  • Loading branch information
CCisGG authored Jan 25, 2023
1 parent 8c5b1ec commit 21cece1
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
8 changes: 7 additions & 1 deletion core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package kafka.network

import com.yammer.metrics.core.Gauge

import java.io.IOException
import java.net._
import java.nio.ByteBuffer
Expand Down Expand Up @@ -1299,7 +1301,7 @@ object ConnectionQuotas {
}
}

class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extends Logging with AutoCloseable {
class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extends Logging with KafkaMetricsGroup with AutoCloseable {

@volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
@volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
Expand All @@ -1310,6 +1312,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
// Listener counts and configs are synchronized on `counts`
private val listenerCounts = mutable.Map[ListenerName, Int]()
private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
private val TotalConnectionCountMetricName = "TotalConnectionCount"
@volatile private var totalCount = 0
// updates to defaultConnectionRatePerIp or connectionRatePerIp must be synchronized on `counts`
@volatile private var defaultConnectionRatePerIp = QuotaConfigs.IP_CONNECTION_RATE_DEFAULT.intValue()
Expand All @@ -1318,6 +1321,9 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
private val brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, BrokerQuotaEntity)
private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)

// sensor that emits the total broker connection count including external, admin, and replication connections
val totalConnectionCountGauge = newGauge(TotalConnectionCountMetricName, () => totalCount)

def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
counts.synchronized {
waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
Expand Down
32 changes: 30 additions & 2 deletions core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import java.net.InetAddress
import java.util
import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit}
import java.util.{Collections, Properties}
import com.yammer.metrics.core.Meter
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.{Gauge, Meter, MetricName}
import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import kafka.network.Processor.ListenerMetricTag
import kafka.server.KafkaConfig
import kafka.utils.Implicits.MapExtensionMethods
Expand Down Expand Up @@ -748,6 +748,22 @@ class ConnectionQuotasTest {
s"Number of connections on EXTERNAL listener:")
}

@Test
def testTotalConnectionCountMetrics(): Unit = {
val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits)
connectionQuotas = new ConnectionQuotas(config, time, metrics)
addListenersAndVerify(config, connectionQuotas)

val numConnections = 100
val totalConnectionCountGauge = getGauge[Int]("TotalConnectionCount")
val futures = listeners.values.map { listener =>
executor.submit((() => acceptConnections(connectionQuotas, listener, numConnections, 10)): Runnable)
}

futures.foreach(_.get(10, TimeUnit.SECONDS))
assertEquals(numConnections * listeners.size, totalConnectionCountGauge.value())
}

private def addListenersAndVerify(config: KafkaConfig, connectionQuotas: ConnectionQuotas) : Unit = {
addListenersAndVerify(config, Map.empty.asJava, connectionQuotas)
}
Expand Down Expand Up @@ -951,4 +967,16 @@ class ConnectionQuotasTest {
)
}
}

private def getGauge[T](filter: MetricName => Boolean): Gauge[T] = {
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala.find { case (k, _) => filter(k) }
.getOrElse { fail(s"Unable to find metric") }
.asInstanceOf[(Any, Gauge[Any])]
._2
.asInstanceOf[Gauge[T]]
}

private def getGauge[T](metricName: String): Gauge[T] = {
getGauge(mName => mName.getName.endsWith(metricName))
}
}

0 comments on commit 21cece1

Please sign in to comment.