Skip to content

Commit

Permalink
[LI-FIXUP] Add an integration test for LiCombinedControl request (#433)
Browse files Browse the repository at this point in the history
TICKET = LIKAFKA-49954
LI_DESCRIPTION =
This should a direct fixup to commit 8c5b1ec. Added integration test to verify commit 8c5b1ec do fix a LiCombinedControlRequest bug. This test adds replicas to a broker that is in a cluster with replicas in other brokers and that previously has not hosted any replicas on it. The test then verifies after assigning partitions to the broker, the broker still sends LiCombinedControl requests.

Verification is done that without commit 8c5b1ec, this added test would fail; and with commit 8c5b1ec, the test would pass.

EXIT_CRITERIA = The same as the LiCombinedControl request.
  • Loading branch information
hshi2022 authored Feb 8, 2023
1 parent 2765f1c commit e15ba9d
Showing 1 changed file with 49 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig}
import org.apache.kafka.common.errors.StaleBrokerEpochException
import org.apache.kafka.common.{Node, Uuid}
import org.apache.kafka.common.message.LiCombinedControlRequestData
import org.apache.kafka.common.message.{ApiMessageType, LiCombinedControlRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.LiCombinedControlRequest
Expand Down Expand Up @@ -90,6 +90,45 @@ class LiCombinedControlRequestTest extends KafkaServerTestHarness with Logging
assertEquals(combinedRequestsSent2, combinedRequestsSent3)
}

@Test
def testLiCombinedControlRequestNoPartitionBroker(): Unit = {
// This test is to verify that commit 8c5b1ec033577cb6ca2b445a70e7347581d5b7c6 would fix the bug described below.
// https://github.com/linkedin/kafka/commit/8c5b1ec033577cb6ca2b445a70e7347581d5b7c6
// This commit fixes a bug that would happen when the following sequences happen:
// (1) A broker is added to a cluster without any assigned partitions, and the cluster has
// liCombinedControlRequestEnable enabled and has some partitions.
// (2) The controller sends a raw full UpdateMetadataWithPartitions request to this broker.
// (firstUpdateMetadataWithPartitionsSent becomes true).
// (3) Subsequent requests from controller to the broker would be as LiCombinedControllerRequest, including
// LeaderAndIsr requests.
// (4) Some partitions are added to the broker.
// Without the fix, the bug is that all subsequent controller requests to this broker would be raw requests,
// not LiCombinedControllerRequest.


// turn on the feature by setting the /li_combined_control_request_flag to true
val props = new Properties
props.put(KafkaConfig.LiCombinedControlRequestEnableProp, "true")
reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LiCombinedControlRequestEnableProp, "true"))

// Create 1 partition on broker 0. Broker 1 will receive a UpdateMetadata request containing partitions,
// have firstUpdateMetadataWithPartitionsSent be true, and have LiCombinedControl enabled.
// At this time, broker 1 should have no partition.
createTopic("topic2", Map(0 -> Seq(0)))
createTopic("topic3", Map(0 -> Seq(0)))

// Create 1 partition on broker 1.
createTopic("topic4", Map(0 -> Seq(1)))

// check the subsequent controller requests are not sent through raw requests
val rawUpdateMetadataRequestsSent1 = getUpdateMetadataRequestCount()
createTopic("topic5")
createTopic("topic6")
val rawUpdateMetadataRequestsSent2 = getUpdateMetadataRequestCount()
// check no raw UpdateMetadata requests are sent.
assertTrue(rawUpdateMetadataRequestsSent1 == rawUpdateMetadataRequestsSent2)
}

@Test
def testLiCombinedControlResponseV1(): Unit = {
val topic1Uuid = Uuid.randomUuid()
Expand Down Expand Up @@ -163,8 +202,16 @@ class LiCombinedControlRequestTest extends KafkaServerTestHarness with Logging
createTopic(topic, 1, 1)
}

getRequestCount(ApiMessageType.LI_COMBINED_CONTROL.name())
}

def getUpdateMetadataRequestCount() = {
getRequestCount(ApiMessageType.UPDATE_METADATA.name())
}

def getRequestCount(requestName: String): Long = {
val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala.filter { case (n, metric) =>
n.getMBeanName.contains("name=brokerRequestRemoteTimeMs,request=LI_COMBINED_CONTROL")
n.getMBeanName.contains("name=brokerRequestRemoteTimeMs,request=" + requestName)
}

if (metrics.nonEmpty) {
Expand Down

0 comments on commit e15ba9d

Please sign in to comment.