Skip to content

Commit

Permalink
fix: Ensure generated random ports are unique (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP authored May 15, 2024
1 parent edf0766 commit ad74454
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 52 deletions.
3 changes: 2 additions & 1 deletion app/src/main/scala/hstream/server/KafkaBroker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ class KafkaBroker(
.getOrElse("metastore_port", throw new IllegalArgumentException("metastore_port is required"))
.asInstanceOf[Int]
info("=> Delete all zk nodes...")
s"docker run --rm --network host zookeeper:3.7 zkCli.sh -server 127.0.0.1:$metastorePort deleteall /hstream".!
// Use the same zk version as scripe/dev-tools to avoid pulling new image
s"docker run --rm --network host zookeeper:3.6 zkCli.sh -server 127.0.0.1:$metastorePort deleteall /hstream".!
} else {
throw new NotImplementedError("shutdown: spec is invalid!")
}
Expand Down
129 changes: 78 additions & 51 deletions app/src/test/scala/utils/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,28 +118,36 @@ object TestUtils extends Logging {
/* 0 gives a random port; you can then retrieve the assigned port from the Socket object. */
val RandomPort = 0

def getUnusedPort(): Int = {
// There is no need to be done in a try/finally
val serverSocket = new ServerSocket(0)
val port = serverSocket.getLocalPort()
serverSocket.close()

// wait until the port is released
waitUntilTrue(
() => {
try {
val sock = new Socket("localhost", port)
sock.close()
false
} catch {
case _: Exception => true
}
},
s"Port $port is still in use",
waitTimeMs = 60000
)

port
def getUnusedPort(): Int = getUnusedPorts(0).head

def getUnusedPorts(n: Int): Seq[Int] = {
val sockPorts = (0 to n).map(_ => {
// There is no need to be done in a try/finally
val serverSocket = new ServerSocket(0)
val port = serverSocket.getLocalPort()
(serverSocket, port)
})
sockPorts.map {
case (serverSocket, port) => {
// close server
serverSocket.close()
// wait until the port is released
waitUntilTrue(
() => {
try {
val sock = new Socket("localhost", port)
sock.close()
false
} catch {
case _: Exception => true
}
},
s"Port $port is still in use",
waitTimeMs = 60000
)
port
}
}
}

val currentTestTimeMillis = System.currentTimeMillis()
Expand Down Expand Up @@ -305,21 +313,24 @@ object TestUtils extends Logging {
startingIdNumber: Int = 0
): Seq[Properties] = {
val endingIdNumber = startingIdNumber + numConfigs - 1
// For each config, we need to find two unused ports for: "port" and "gossip.port"
val unusedPorts = getUnusedPorts(numConfigs * 2)

val configFile = sys.env.getOrElse("CONFIG_FILE", "").trim
// Read from yaml file
if (configFile.nonEmpty) {
parseConfigFile(configFile, startingIdNumber, endingIdNumber, testInfo.getDisplayName())
parseConfigFile(configFile, startingIdNumber, endingIdNumber, testInfo.getDisplayName(), unusedPorts)
}
// Generate
else {
(startingIdNumber to endingIdNumber).map { node =>
createBrokerConfig(
(startingIdNumber to endingIdNumber).zipWithIndex.map { case (node, i) =>
genBrokerConfig(
node,
metaStoreUri,
testInfo,
storeConfig = storeConfig,
port = getUnusedPort(),
port = unusedPorts(i * 2),
gossipPort = unusedPorts(i * 2 + 1),
numPartitions = numPartitions,
defaultReplicationFactor = defaultReplicationFactor
)
Expand Down Expand Up @@ -489,32 +500,45 @@ object TestUtils extends Logging {
metaStoreUri: String,
testInfo: TestInfo,
storeConfig: String = "/store/logdevice/logdevice.conf",
port: Int = getUnusedPort(),
numPartitions: Int = 1,
defaultReplicationFactor: Short = 1
): Properties = {
createBrokerConfigs(
1,
metaStoreUri,
testInfo,
storeConfig = storeConfig,
numPartitions = numPartitions,
defaultReplicationFactor = defaultReplicationFactor,
startingIdNumber = 0
)(0)
}

private def genBrokerConfig(
brokerId: Int,
metaStoreUri: String,
testInfo: TestInfo,
storeConfig: String = "/store/logdevice/logdevice.conf",
port: Int = RandomPort,
gossipPort: Int = RandomPort,
numPartitions: Int = 1,
defaultReplicationFactor: Short = 1
): Properties = {
val props = new Properties

val configFile = sys.env.getOrElse("CONFIG_FILE", "").trim
// Read from yaml file
if (configFile.nonEmpty) {
return parseConfigFile(configFile, brokerId, brokerId, testInfo.getDisplayName())(0)
}
// Generate
else {
props.put(KafkaConfig.BrokerIdProp, brokerId.toString)
props.put(KafkaConfig.PortProp, port.toString)
props.put(KafkaConfig.AdvertisedAddressProp, "127.0.0.1")
props.put(KafkaConfig.MetaStoreUriProp, metaStoreUri)
props.put(KafkaConfig.GossipPortProp, getUnusedPort().toString)
props.put(KafkaConfig.StoreConfigProp, storeConfig)
// props.put(KafkaConfig.AdvertisedListenersProp, )
// props.put(KafkaConfig.ListenerSecurityProtocolMapProp, )
props.put(KafkaConfig.NumPartitionsProp, numPartitions.toString)
props.put(KafkaConfig.DefaultReplicationFactorProp, defaultReplicationFactor.toString)

props
}
props.put(KafkaConfig.BrokerIdProp, brokerId.toString)
props.put(KafkaConfig.PortProp, port.toString)
props.put(KafkaConfig.AdvertisedAddressProp, "127.0.0.1")
props.put(KafkaConfig.MetaStoreUriProp, metaStoreUri)
props.put(KafkaConfig.GossipPortProp, gossipPort.toString)
props.put(KafkaConfig.StoreConfigProp, storeConfig)
// props.put(KafkaConfig.AdvertisedListenersProp, )
// props.put(KafkaConfig.ListenerSecurityProtocolMapProp, )
props.put(KafkaConfig.NumPartitionsProp, numPartitions.toString)
props.put(KafkaConfig.DefaultReplicationFactorProp, defaultReplicationFactor.toString)

props
}

// @nowarn("cat=deprecation")
Expand Down Expand Up @@ -2770,7 +2794,8 @@ object TestUtils extends Logging {
configFile: String,
startingIdNumber: Int,
endingIdNumber: Int,
testName: String
testName: String,
unusedPorts: Seq[Int]
): Seq[Properties] = {
val configs = readConfigFile(configFile)
val use = configs
Expand All @@ -2788,7 +2813,8 @@ object TestUtils extends Logging {
brokerContainer,
startingIdNumber,
endingIdNumber,
testName
testName,
unusedPorts
)
} else if (use == "broker_connections") {
// Directly connect to brokers
Expand Down Expand Up @@ -2828,7 +2854,8 @@ object TestUtils extends Logging {
brokerContainer: java.util.Map[String, Object],
startingIdNumber: Int,
endingIdNumber: Int,
testName: String
testName: String,
unusedPorts: Seq[Int]
): Seq[Properties] = {
val brokerConfig = brokerContainer.get("config").asInstanceOf[java.util.Map[String, Object]].asScala
val testingConfig: collection.mutable.Map[String, Object] =
Expand Down Expand Up @@ -2863,11 +2890,11 @@ object TestUtils extends Logging {
val props = (startingIdNumber to endingIdNumber).zipWithIndex.map { case (nodeId, idx) =>
val prop = new Properties
val port = basePort match {
case None => getUnusedPort()
case None => unusedPorts(idx * 2)
case Some(p) => p + idx * 2
}
val gossipPort = basePort match {
case None => getUnusedPort()
case None => unusedPorts(idx * 2 + 1)
case Some(p) => p + idx * 2 + 1
}
if (initMode == "join") {
Expand Down

0 comments on commit ad74454

Please sign in to comment.