Skip to content

Commit

Permalink
minor fixes (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP authored Nov 5, 2024
1 parent 9f5b0bf commit 9053e9b
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions app/src/main/scala/hstream/server/KafkaBroker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ object KafkaBroker extends Logging {
// status may be different between different nodes'views. This can cause infinite
// block in some edge cases (lookup resources).
for (config <- configs) {
awaitNode(image, num, config.port, timeout)
awaitNode(image, "hstream-kafka", num, config.port, timeout)
}
} else if (spec == 2) {
// TODO: Theoretically, it is adequate to ask any node to check the cluster status.
// However, due to the limitation of the current implementation, the cluster
// status may be different between different nodes'views. This can cause infinite
// block in some edge cases (lookup resources).
for (config <- configs) {
awaitNode(image, num, config.port, timeout)
awaitNode(image, "hornbill ctl", num, config.port, timeout)
}
} else {
throw new NotImplementedError("awaitCluster: spec is invalid!")
Expand All @@ -122,15 +122,15 @@ object KafkaBroker extends Logging {
)
}

private def awaitNode(cliImage: String, num: Int, port: Int, timeout: Int = 30): Unit = {
private def awaitNode(cliImage: String, cliExe: String, num: Int, port: Int, timeout: Int = 30): Unit = {
if (timeout <= 0) {
throw new RuntimeException("Failed to start hstream cluster!")
}
val f = Future {
try {
// FIXME: better way to check cluster is ready
val (_, nodeStatusOutOpt, _) = Utils.runCommand(
s"docker run --rm --network host $cliImage hstream-kafka --port $port node status",
s"docker run --rm --network host $cliImage $cliExe --port $port node status",
captureOut = true,
check = false
)
Expand All @@ -151,7 +151,7 @@ object KafkaBroker extends Logging {
case e: Exception => {
info("=> Waiting broker ready...")
Thread.sleep(2000)
awaitNode(cliImage, num, port, timeout - 2)
awaitNode(cliImage, cliExe, num, port, timeout - 2)
}
}
}
Expand Down Expand Up @@ -235,7 +235,7 @@ class KafkaBroker(
if (!Utils.containerExist(metaServerContainerName)) {
val metaServerCmd =
s"""docker run -d --network host --name $metaServerContainerName -v $storeConfig:$storeConfig:ro
$image hstream-meta-server --host 127.0.0.1 --port $metaServerPort
$image hornbill meta --host 127.0.0.1 --port $metaServerPort
--backend $storeConfig
""".stripMargin.linesIterator.mkString(" ").trim
info(s"=> Start meta server by: $metaServerCmd")
Expand Down

0 comments on commit 9053e9b

Please sign in to comment.