Skip to content

Commit

Permalink
wait for port to open during startup (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP authored Apr 16, 2024
1 parent ef7d119 commit 1f47787
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 19 deletions.
38 changes: 30 additions & 8 deletions app/src/main/scala/hstream/server/KafkaBroker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import java.nio.file.{Files, Paths}
import kafka.utils.Logging
import kafka.network.SocketServer

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.sys.process._
import scala.util.Random

Expand Down Expand Up @@ -52,25 +55,20 @@ class KafkaBroker(
val image = config.testingConfig
.getOrElse("image", throw new IllegalArgumentException("image is required"))
.asInstanceOf[String]
val rmArg =
if (
config.testingConfig
.getOrElse("container_remove", throw new IllegalArgumentException("container_remove is required"))
.asInstanceOf[Boolean]
) "--rm"
else ""
val storeDir = config.testingConfig
.getOrElse("store_dir", throw new IllegalArgumentException("store_dir is required"))
.asInstanceOf[String]
val dockerCmd =
s"docker run $rmArg -d --network host --name $containerName -v $storeDir:/data/store $image $command"
s"docker run -d --network host --name $containerName -v $storeDir:/data/store $image $command"
info(s"=> Start hserver by: $dockerCmd")
dockerCmd.run()
} else {
throw new NotImplementedError("startup: spec is invalid!")
}
}
}

awaitStartup()
}

// TODO: TMP_FOR_HSTREAM
Expand Down Expand Up @@ -145,4 +143,28 @@ class KafkaBroker(
}
}

// --------------------------------------------------------------------------
// For hstream

private def awaitStartup(retries: Int = 20): Unit = {
if (retries <= 0) {
s"docker logs $containerName".!
throw new RuntimeException("Failed to start hstream!")
}
val port = config.port
val f = Future {
try {
val socket = new java.net.Socket("127.0.0.1", port)
socket.close()
info("=> The server port is open at " + port)
} catch {
case e: java.net.ConnectException =>
info("=> Retry to connect to the server port " + port)
Thread.sleep(1000)
awaitStartup(retries - 1)
}
}
Await.result(f, 60.second)
}

}
33 changes: 25 additions & 8 deletions app/src/test/scala/utils/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2767,21 +2767,38 @@ object TestUtils extends Logging {
endingIdNumber: Int,
testName: String
): Seq[Properties] = {
val brokerConfig = brokerContainer.get("config").asInstanceOf[java.util.Map[String, Object]]
val brokerConfig = brokerContainer.get("config").asInstanceOf[java.util.Map[String, Object]].asScala
// NOTE: Since this has side effect, do NOT move it inside the following loop (startingIdNumber to endingIdNumber)
val basePort = brokerConfig.remove("base_port").asInstanceOf[Int]
val advertisedAddress = brokerConfig.get("advertised.address").asInstanceOf[String]
val basePort = brokerConfig.remove("base_port").asInstanceOf[Option[Int]]
val advertisedAddress = brokerConfig
.getOrElse(
"advertised.address",
throw new IllegalArgumentException("advertised.address is required in broker_container")
)
.asInstanceOf[String]

// FIXME: we use a fixed port (the first gossipPort) for seed node, this is a temporary solution
var fstGossipPort: Int = -1

// generate
val props = (startingIdNumber to endingIdNumber).zipWithIndex.map { case (nodeId, idx) =>
val prop = new Properties
val port = basePort + idx * 2
val gossipPort = basePort + idx * 2 + 1
val port = basePort match {
case None => getUnusedPort()
case Some(p) => p + idx * 2
}
val gossipPort = basePort match {
case None => getUnusedPort()
case Some(p) => p + idx * 2 + 1
}
if (idx == 0) {
fstGossipPort = gossipPort
}
// broker config
prop.put("broker.id", nodeId.toString)
prop.put("port", port.toString)
prop.put("gossip.port", gossipPort.toString)
brokerConfig.asScala.foreach { case (k, v) => prop.put(k, v.toString) }
brokerConfig.foreach { case (k, v) => prop.put(k, v.toString) }
// testing config
val testingConfig: collection.mutable.Map[String, Object] =
brokerContainer
Expand All @@ -2799,10 +2816,10 @@ object TestUtils extends Logging {
val metastorePort = testingConfig
.getOrElse("metastore_port", throw new IllegalArgumentException("metastore_port is required in testing_config"))
.asInstanceOf[Int]
// FIXME: we use a fixed port `basePort + 1` for seed node, this is a temporary solution
// FIXME: we use a fixed port (the first gossipPort) for seed node, this is a temporary solution
val args = s"""--server-id $nodeId
--port $port --gossip-port $gossipPort
--seed-nodes 127.0.0.1:${basePort + 1}
--seed-nodes 127.0.0.1:$fstGossipPort
--advertised-address $advertisedAddress
--metastore-uri zk://127.0.0.1:$metastorePort
--store-config /data/store/logdevice.conf
Expand Down
6 changes: 4 additions & 2 deletions script/config.yaml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use: broker_container

broker_container:
config:
base_port: ${base_port}
# Optional. If no base_port, all ports will be random.
#
#base_port: ${base_port}
advertised.address: 127.0.0.1

testing_config:
Expand All @@ -15,7 +17,7 @@ broker_container:
hstream-server kafka
--bind-address 0.0.0.0
--metrics-port 0
--log-level debug1
--log-level debug
--log-with-color
--log-flush-immediately
--store-log-level error
Expand Down
2 changes: 1 addition & 1 deletion script/run_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ generate_config() {
if [ -f $env_file ]; then
store_admin_port=$(cat $env_file | grep STORE_ADMIN_LOCAL_PORT | cut -d'=' -f2)
zookeeper_port=$(cat $env_file | grep ZOOKEEPER_LOCAL_PORT | cut -d'=' -f2)
base_port=$(find_freeport)
base_port=$(find_freeport) # Optional
sed -e "s/\${base_port}/$base_port/g" \
-e "s#\${image}#$hstream_image#g" \
-e "s/\${metastore_port}/$zookeeper_port/g" \
Expand Down

0 comments on commit 1f47787

Please sign in to comment.