From 9053e9b717f3f0cc5e64ca7a2e311a0bef202098 Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Tue, 5 Nov 2024 14:06:35 +0800 Subject: [PATCH] minor fixes (#80) --- app/src/main/scala/hstream/server/KafkaBroker.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/app/src/main/scala/hstream/server/KafkaBroker.scala b/app/src/main/scala/hstream/server/KafkaBroker.scala index 6dbf699..fe7610c 100644 --- a/app/src/main/scala/hstream/server/KafkaBroker.scala +++ b/app/src/main/scala/hstream/server/KafkaBroker.scala @@ -101,7 +101,7 @@ object KafkaBroker extends Logging { // status may be different between different nodes'views. This can cause infinite // block in some edge cases (lookup resources). for (config <- configs) { - awaitNode(image, num, config.port, timeout) + awaitNode(image, "hstream-kafka", num, config.port, timeout) } } else if (spec == 2) { // TODO: Theoretically, it is adequate to ask any node to check the cluster status. @@ -109,7 +109,7 @@ object KafkaBroker extends Logging { // status may be different between different nodes'views. This can cause infinite // block in some edge cases (lookup resources). for (config <- configs) { - awaitNode(image, num, config.port, timeout) + awaitNode(image, "hornbill ctl", num, config.port, timeout) } } else { throw new NotImplementedError("awaitCluster: spec is invalid!") @@ -122,7 +122,7 @@ object KafkaBroker extends Logging { ) } - private def awaitNode(cliImage: String, num: Int, port: Int, timeout: Int = 30): Unit = { + private def awaitNode(cliImage: String, cliExe: String, num: Int, port: Int, timeout: Int = 30): Unit = { if (timeout <= 0) { throw new RuntimeException("Failed to start hstream cluster!") } @@ -130,7 +130,7 @@ object KafkaBroker extends Logging { try { // FIXME: better way to check cluster is ready val (_, nodeStatusOutOpt, _) = Utils.runCommand( - s"docker run --rm --network host $cliImage hstream-kafka --port $port node status", + s"docker run --rm --network host $cliImage $cliExe --port $port node status", captureOut = true, check = false ) @@ -151,7 +151,7 @@ object KafkaBroker extends Logging { case e: Exception => { info("=> Waiting broker ready...") Thread.sleep(2000) - awaitNode(cliImage, num, port, timeout - 2) + awaitNode(cliImage, cliExe, num, port, timeout - 2) } } } @@ -235,7 +235,7 @@ class KafkaBroker( if (!Utils.containerExist(metaServerContainerName)) { val metaServerCmd = s"""docker run -d --network host --name $metaServerContainerName -v $storeConfig:$storeConfig:ro - $image hstream-meta-server --host 127.0.0.1 --port $metaServerPort + $image hornbill meta --host 127.0.0.1 --port $metaServerPort --backend $storeConfig """.stripMargin.linesIterator.mkString(" ").trim info(s"=> Start meta server by: $metaServerCmd")