Skip to content

Commit

Permalink
[LI-HOTFIX] Improve AlterIsr-related logging (#458)
Browse files Browse the repository at this point in the history
TICKET = LIKAFKA-52213

LI_DESCRIPTION =
Convert ISR-related unhappy-path DEBUG logs in Partition into WARN-level logs, and add summary-style INFO logs to BrokerToControllerRequestManager's DEBUG logs in the AlterIsr/ElectLeaders sendRequest path.

Tested by uploading home-dir builds to two brokers in a certification cluster, bouncing both to pick up the new code, and observing kafka-server.log on the first while the second is bounced (=> ISR for multiple topic-partitions is in flux).

Sample summary-logs:
```
2023/05/23 21:09:35.182 INFO [DefaultAlterIsrManager] [data-plane-kafka-request-handler-7] [kafka-server] [] Sending to controller org.apache.kafka.common.requests.AlterIsrRequest$Builder of ~274 bytes (1 items)
2023/05/23 21:09:35.324 INFO [DefaultAlterIsrManager] [BrokerToControllerChannelManager broker=78001 name=alterIsr] [kafka-server] [] Received AlterIsr response of ~479 bytes
2023/05/23 21:09:35.329 INFO [DefaultAlterIsrManager] [BrokerToControllerChannelManager broker=78001 name=alterIsr] [kafka-server] [] Sending to controller org.apache.kafka.common.requests.AlterIsrRequest$Builder of ~24723 bytes (191 items)
2023/05/23 21:09:35.663 INFO [DefaultAlterIsrManager] [BrokerToControllerChannelManager broker=78001 name=alterIsr] [kafka-server] [] Received AlterIsr response of ~29868 bytes
2023/05/23 21:09:35.670 INFO [DefaultAlterIsrManager] [BrokerToControllerChannelManager broker=78001 name=alterIsr] [kafka-server] [] Sending to controller org.apache.kafka.common.requests.AlterIsrRequest$Builder of ~66024 bytes (586 items)
2023/05/23 21:09:36.665 INFO [DefaultAlterIsrManager] [BrokerToControllerChannelManager broker=78001 name=alterIsr] [kafka-server] [] Received AlterIsr response of ~81441 bytes
```

The complete request and response is still printed at DEBUG level.

EXIT_CRITERIA = If/when this change is accepted upstream and pulled into this repo.
  • Loading branch information
groelofs authored May 24, 2023
1 parent a012f6a commit c006041
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 10 deletions.
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1453,7 +1453,7 @@ class Partition(val topicPartition: TopicPartition,
// This means isrState was updated through leader election or some other mechanism before we got the AlterIsr
// response. We don't know what happened on the controller exactly, but we do know this response is out of date
// so we ignore it.
debug(s"Ignoring failed ISR update to $proposedIsrState since we have already updated state to $isrState")
warn(s"Ignoring failed ISR update to $proposedIsrState since we have already updated state to $isrState")
return
}

Expand All @@ -1462,22 +1462,22 @@ class Partition(val topicPartition: TopicPartition,
isrChangeListener.markFailed()
error match {
case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
debug(s"Failed to update ISR to $proposedIsrState since it doesn't know about this topic or partition. Giving up.")
warn(s"Failed to update ISR to $proposedIsrState since it doesn't know about this topic or partition. Giving up.")
case Errors.FENCED_LEADER_EPOCH =>
debug(s"Failed to update ISR to $proposedIsrState since we sent an old leader epoch. Giving up.")
warn(s"Failed to update ISR to $proposedIsrState since we sent an old leader epoch. Giving up.")
case Errors.INVALID_UPDATE_VERSION =>
debug(s"Failed to update ISR to $proposedIsrState due to invalid version. Giving up.")
warn(s"Failed to update ISR to $proposedIsrState due to invalid version. Giving up.")
case _ =>
warn(s"Failed to update ISR to $proposedIsrState due to unexpected $error. Retrying.")
sendAlterIsrRequest(proposedIsrState)
}
case Right(leaderAndIsr: LeaderAndIsr) =>
// Success from controller, still need to check a few things
if (leaderAndIsr.leaderEpoch != leaderEpoch) {
debug(s"Ignoring new ISR ${leaderAndIsr} since we have a stale leader epoch $leaderEpoch.")
warn(s"Ignoring new ISR ${leaderAndIsr} since we have a stale leader epoch $leaderEpoch.")
isrChangeListener.markFailed()
} else if (leaderAndIsr.zkVersion < zkVersion) {
debug(s"Ignoring new ISR ${leaderAndIsr} since we have a newer version $zkVersion.")
warn(s"Ignoring new ISR ${leaderAndIsr} since we have a newer version $zkVersion.")
isrChangeListener.markFailed()
} else {
// This is one of two states:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ class BrokerToControllerRequestThread(
// We don't care if disconnect has an error, just log it and get a new network client
networkClient.disconnect(controllerAddress.idString)
} catch {
case t: Throwable => error("Had an error while disconnecting from NetworkClient.", t)
case t: Throwable => error(s"Had an error while disconnecting NetworkClient from stale controller ${controllerAddress.idString}.", t)
}
}}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,18 @@ abstract class AbstractBrokerToControllerRequestManager[Item <: BrokerToControll
private def sendRequest(itemsToSend: Seq[Item]): Unit = {
val brokerEpoch = brokerEpochSupplier.apply()
val request = buildRequest(itemsToSend, brokerEpoch)
debug(s"Sending to controller $request")
info(s"Sending to controller ${request.getClass.getName} of ~${request.toString.length} bytes (${itemsToSend.length} items)")
debug(s"Full request: $request")

// We will not timeout the request, instead letting it retry indefinitely
// until a response is received.
controllerChannelManager.sendRequest(request,
new ControllerRequestCompletionHandler {
override def onComplete(response: ClientResponse): Unit = {
debug(s"Received response $response")
// several tests create ClientResponses with null requestHeaders
val requestType = if (response.requestHeader != null) response.requestHeader.apiKey.name else "unknown-BrokerToControllerRequest"
info(s"Received ${requestType} response of ~${response.toString.length} bytes")
debug(s"Full response: $response")
val error = try {
if (response.authenticationException != null) {
// For now we treat authentication errors as retriable. We use the
Expand All @@ -141,6 +145,7 @@ abstract class AbstractBrokerToControllerRequestManager[Item <: BrokerToControll
// authentication errors so that users have a chance to fix the problem.
Errors.NETWORK_EXCEPTION
} else if (response.versionMismatch != null) {
warn(s"Unsupported version: controller response ${response}")
Errors.UNSUPPORTED_VERSION
} else {
handleResponse(response, brokerEpoch, itemsToSend)
Expand All @@ -157,7 +162,8 @@ abstract class AbstractBrokerToControllerRequestManager[Item <: BrokerToControll
maybeSendRequest()
case _ =>
// If we received a top-level error from the controller, retry the request in the near future
scheduler.schedule("send-alter-isr", () => maybeSendRequest(), 50, -1, TimeUnit.MILLISECONDS)
// (could be either AlterIsrRequest or ElectLeadersRequest)
scheduler.schedule(s"send-${requestType}", () => maybeSendRequest(), 50, -1, TimeUnit.MILLISECONDS)
}
}

Expand Down

0 comments on commit c006041

Please sign in to comment.