diff --git a/app/src/main/scala/hstream/server/KafkaBroker.scala b/app/src/main/scala/hstream/server/KafkaBroker.scala index 9a3f625..27d1a5b 100644 --- a/app/src/main/scala/hstream/server/KafkaBroker.scala +++ b/app/src/main/scala/hstream/server/KafkaBroker.scala @@ -7,6 +7,9 @@ import java.nio.file.{Files, Paths} import kafka.utils.Logging import kafka.network.SocketServer +import scala.concurrent._ +import ExecutionContext.Implicits.global +import scala.concurrent.duration._ import scala.sys.process._ import scala.util.Random @@ -52,18 +55,11 @@ class KafkaBroker( val image = config.testingConfig .getOrElse("image", throw new IllegalArgumentException("image is required")) .asInstanceOf[String] - val rmArg = - if ( - config.testingConfig - .getOrElse("container_remove", throw new IllegalArgumentException("container_remove is required")) - .asInstanceOf[Boolean] - ) "--rm" - else "" val storeDir = config.testingConfig .getOrElse("store_dir", throw new IllegalArgumentException("store_dir is required")) .asInstanceOf[String] val dockerCmd = - s"docker run $rmArg -d --network host --name $containerName -v $storeDir:/data/store $image $command" + s"docker run -d --network host --name $containerName -v $storeDir:/data/store $image $command" info(s"=> Start hserver by: $dockerCmd") dockerCmd.run() } else { @@ -71,6 +67,8 @@ class KafkaBroker( } } } + + awaitStartup() } // TODO: TMP_FOR_HSTREAM @@ -145,4 +143,28 @@ class KafkaBroker( } } + // -------------------------------------------------------------------------- + // For hstream + + private def awaitStartup(retries: Int = 20): Unit = { + if (retries <= 0) { + s"docker logs $containerName".! + throw new RuntimeException("Failed to start hstream!") + } + val port = config.port + val f = Future { + try { + val socket = new java.net.Socket("127.0.0.1", port) + socket.close() + info("=> The server port is open at " + port) + } catch { + case e: java.net.ConnectException => + info("=> Retry to connect to the server port " + port) + Thread.sleep(1000) + awaitStartup(retries - 1) + } + } + Await.result(f, 60.second) + } + } diff --git a/app/src/test/scala/utils/kafka/utils/TestUtils.scala b/app/src/test/scala/utils/kafka/utils/TestUtils.scala index e897061..443c202 100644 --- a/app/src/test/scala/utils/kafka/utils/TestUtils.scala +++ b/app/src/test/scala/utils/kafka/utils/TestUtils.scala @@ -2767,21 +2767,38 @@ object TestUtils extends Logging { endingIdNumber: Int, testName: String ): Seq[Properties] = { - val brokerConfig = brokerContainer.get("config").asInstanceOf[java.util.Map[String, Object]] + val brokerConfig = brokerContainer.get("config").asInstanceOf[java.util.Map[String, Object]].asScala // NOTE: Since this has side effect, do NOT move it inside the following loop (startingIdNumber to endingIdNumber) - val basePort = brokerConfig.remove("base_port").asInstanceOf[Int] - val advertisedAddress = brokerConfig.get("advertised.address").asInstanceOf[String] + val basePort = brokerConfig.remove("base_port").asInstanceOf[Option[Int]] + val advertisedAddress = brokerConfig + .getOrElse( + "advertised.address", + throw new IllegalArgumentException("advertised.address is required in broker_container") + ) + .asInstanceOf[String] + + // FIXME: we use a fixed port (the first gossipPort) for seed node, this is a temporary solution + var fstGossipPort: Int = -1 // generate val props = (startingIdNumber to endingIdNumber).zipWithIndex.map { case (nodeId, idx) => val prop = new Properties - val port = basePort + idx * 2 - val gossipPort = basePort + idx * 2 + 1 + val port = basePort match { + case None => getUnusedPort() + case Some(p) => p + idx * 2 + } + val gossipPort = basePort match { + case None => getUnusedPort() + case Some(p) => p + idx * 2 + 1 + } + if (idx == 0) { + fstGossipPort = gossipPort + } // broker config prop.put("broker.id", nodeId.toString) prop.put("port", port.toString) prop.put("gossip.port", gossipPort.toString) - brokerConfig.asScala.foreach { case (k, v) => prop.put(k, v.toString) } + brokerConfig.foreach { case (k, v) => prop.put(k, v.toString) } // testing config val testingConfig: collection.mutable.Map[String, Object] = brokerContainer @@ -2799,10 +2816,10 @@ object TestUtils extends Logging { val metastorePort = testingConfig .getOrElse("metastore_port", throw new IllegalArgumentException("metastore_port is required in testing_config")) .asInstanceOf[Int] - // FIXME: we use a fixed port `basePort + 1` for seed node, this is a temporary solution + // FIXME: we use a fixed port (the first gossipPort) for seed node, this is a temporary solution val args = s"""--server-id $nodeId --port $port --gossip-port $gossipPort - --seed-nodes 127.0.0.1:${basePort + 1} + --seed-nodes 127.0.0.1:$fstGossipPort --advertised-address $advertisedAddress --metastore-uri zk://127.0.0.1:$metastorePort --store-config /data/store/logdevice.conf diff --git a/script/config.yaml.tmpl b/script/config.yaml.tmpl index df8bc28..59ef57e 100644 --- a/script/config.yaml.tmpl +++ b/script/config.yaml.tmpl @@ -2,7 +2,9 @@ use: broker_container broker_container: config: - base_port: ${base_port} + # Optional. If no base_port, all ports will be random. + # + #base_port: ${base_port} advertised.address: 127.0.0.1 testing_config: @@ -15,7 +17,7 @@ broker_container: hstream-server kafka --bind-address 0.0.0.0 --metrics-port 0 - --log-level debug1 + --log-level debug --log-with-color --log-flush-immediately --store-log-level error diff --git a/script/run_test.sh b/script/run_test.sh index e911852..5789b36 100755 --- a/script/run_test.sh +++ b/script/run_test.sh @@ -17,7 +17,7 @@ generate_config() { if [ -f $env_file ]; then store_admin_port=$(cat $env_file | grep STORE_ADMIN_LOCAL_PORT | cut -d'=' -f2) zookeeper_port=$(cat $env_file | grep ZOOKEEPER_LOCAL_PORT | cut -d'=' -f2) - base_port=$(find_freeport) + base_port=$(find_freeport) # Optional sed -e "s/\${base_port}/$base_port/g" \ -e "s#\${image}#$hstream_image#g" \ -e "s/\${metastore_port}/$zookeeper_port/g" \