Support testing spec 2 (#75)
* improvements

* Support testing spec 2
4eUeP authored Jul 26, 2024
1 parent 4d69ef7 commit 13156a3
Showing 5 changed files with 466 additions and 237 deletions.
1 change: 1 addition & 0 deletions app/build.gradle
@@ -83,6 +83,7 @@ dependencies {
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$versions.jackson"
implementation "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson"
implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:$versions.jackson"
implementation "org.yaml:snakeyaml:2.2"

implementation group: 'net.sf.jopt-simple', name: 'jopt-simple', version: '5.0.4'

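The only dependency added here is SnakeYAML 2.2. The commit does not show its call site, but it is presumably used to parse a YAML test-spec file. A minimal, hypothetical loader sketch follows; the object name, file path argument, and return type are assumptions, not part of this commit:

import java.io.FileReader
import org.yaml.snakeyaml.Yaml

object TestSpecLoader {
  // Parse a YAML document into a java.util.Map; nested mappings come back as
  // java.util.Map values and sequences as java.util.List values.
  def load(path: String): java.util.Map[String, AnyRef] = {
    val yaml = new Yaml()
    yaml.load[java.util.Map[String, AnyRef]](new FileReader(path))
  }
}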
201 changes: 153 additions & 48 deletions app/src/main/scala/hstream/server/KafkaBroker.scala
@@ -11,11 +11,12 @@ import kafka.network.SocketServer

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util.{Success, Try}
import scala.concurrent.duration._
import scala.sys.process._
import scala.util.Random

object Utils {
object Utils extends Logging {
def runCommand(
cmd: String,
check: Boolean = true,
@@ -42,16 +43,73 @@ object Utils {
}
result
}

def containerExist(name: String): Boolean = {
val (ret, _, _) = Utils.runCommand(
s"bash -c 'docker container inspect $name >/dev/null 2>&1'",
captureOut = false,
check = false
)
ret == 0
}

def waitPort(port: Int, retries: Int = 20): Boolean = {
if (retries <= 0) {
false
} else {
val f = Future {
try {
val socket = new java.net.Socket("127.0.0.1", port)
socket.close()
info("=> The port is open at " + port)
true
} catch {
case e: java.net.ConnectException =>
info("=> Retry to connect to the port " + port)
Thread.sleep(1000)
waitPort(port, retries - 1)
}
}
Await.result(f, 60.second)
}
}
}

object KafkaBroker extends Logging {
def initCluster(port: Int): Unit = {

def awaitCluster(num: Int, configs: scala.collection.Seq[KafkaConfig], timeout: Int = 30): Unit = {
if (configs.isEmpty) {
throw new RuntimeException("No broker configs found!")
}
val initPort = configs.head.port
val spec = configs.head.testingConfig
.getOrElse("spec", throw new IllegalArgumentException("spec is required"))
.asInstanceOf[Int]

if (spec == 1) {
initCluster(initPort)
// TODO: In theory, it is enough to ask any single node for the cluster status.
// However, due to a limitation of the current implementation, the cluster status
// may differ between different nodes' views. This can cause infinite blocking in
// some edge cases (looking up resources).
for (config <- configs) {
awaitNode(num, config.port, timeout)
}
} else if (spec == 2) {
// TODO
Thread.sleep(5000)
} else {
throw new NotImplementedError("awaitCluster: spec is invalid!")
}
}

private def initCluster(port: Int): Unit = {
Utils.runCommand(
s"docker run --rm --network host hstreamdb/hstream hstream-kafka --port $port node init"
)
}

def awaitCluster(num: Int, port: Int, timeout: Int = 30): Unit = {
private def awaitNode(num: Int, port: Int, timeout: Int = 30): Unit = {
if (timeout <= 0) {
throw new RuntimeException("Failed to start hstream cluster!")
}
@@ -80,7 +138,7 @@ object KafkaBroker extends Logging {
case e: Exception => {
info("=> Waiting cluster ready...")
Thread.sleep(2000)
awaitCluster(num, port, timeout - 2)
awaitNode(num, port, timeout - 2)
}
}
}
@@ -123,32 +181,69 @@ class KafkaBroker(
config.testingConfig
.getOrElse("spec", throw new IllegalArgumentException("spec is required"))
.asInstanceOf[Int]
// === common
val command = config.testingConfig
.getOrElse("command", throw new IllegalArgumentException("command is required"))
.asInstanceOf[String]
val image = config.testingConfig
.getOrElse("image", throw new IllegalArgumentException("image is required"))
.asInstanceOf[String]
val extraProps = config.hstreamKafkaBrokerProperties
.map { case (k, v) => s"--prop $k=$v" }
.mkString(" ")
// === spec 1: hstream
if (spec == 1) {
val command = config.testingConfig
.getOrElse("command", throw new IllegalArgumentException("command is required"))
.asInstanceOf[String]
val image = config.testingConfig
.getOrElse("image", throw new IllegalArgumentException("image is required"))
.asInstanceOf[String]
val storeDir = config.testingConfig
.getOrElse("store_dir", throw new IllegalArgumentException("store_dir is required"))
.asInstanceOf[String]
val extraProps = config.hstreamKafkaBrokerProperties
.map { case (k, v) => s"--prop $k=$v" }
.mkString(" ")
val dockerCmd =
s"docker run -d --network host --name $containerName -v $storeDir:/data/store $image $command $extraProps"
info(s"=> Start hserver by: $dockerCmd")
val code = dockerCmd.!
if (code != 0) {
throw new RuntimeException(s"Failed to start broker, exit code: $code")
}
}
// === spec 2: hornbill
else if (spec == 2) {
val storeConfig = config.testingConfig
.getOrElse("store_config", throw new IllegalArgumentException("store_config is required"))
.asInstanceOf[String]
val metaServerPort = config.testingConfig
.getOrElse("metaserver_port", throw new IllegalArgumentException("metaserver_port is required"))
.asInstanceOf[Int]
val metaServerContainerName = config.testingConfig
.getOrElse(
"metaserver_container_name",
throw new IllegalArgumentException("metaserver_container_name is required")
)
.asInstanceOf[String]
// We only start one meta server for all brokers
if (!Utils.containerExist(metaServerContainerName)) {
val metaServerCmd =
s"""docker run -d --network host --name $metaServerContainerName -v $storeConfig:$storeConfig:ro
$image hstream-meta-server --host 127.0.0.1 --port $metaServerPort
--backend $storeConfig
""".stripMargin.linesIterator.mkString(" ").trim
info(s"=> Start meta server by: $metaServerCmd")
Utils.runCommand(metaServerCmd)
}
info("=> Wait for meta server ready...")
val waitMetaRet = Utils.waitPort(metaServerPort)
if (!waitMetaRet) {
throw new RuntimeException("Failed to start meta server!")
}
val hserverCmd =
s"docker run -d --network host --name $containerName -v $storeConfig:$storeConfig:ro $image $command $extraProps"
info(s"=> Start hserver by: $hserverCmd")
Utils.runCommand(hserverCmd)
} else {
throw new NotImplementedError("startup: spec is invalid!")
}
}
}

info("=> Wait for server ready...")
awaitStartup()
}
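For reference, the spec-2 startup path above reads everything it needs from config.testingConfig. A sketch of such a map with made-up values (each key is looked up by the spec-2 code in this file; the last three are only consumed by the shutdown path further down):

// Illustrative only: the keys match what the spec-2 branches look up,
// but the concrete values are invented for this example.
val spec2TestingConfig: Map[String, Any] = Map(
  "spec" -> 2,
  "command" -> "hstream-server kafka",              // hypothetical broker command
  "image" -> "hstreamdb/hornbill",                  // hypothetical image name
  "store_config" -> "/data/store/config.json",      // mounted read-only into the containers
  "metaserver_port" -> 6571,
  "metaserver_container_name" -> "test-metaserver",
  "container_logs" -> true,                         // shutdown: dump broker container logs
  "container_remove" -> true,                       // shutdown: remove broker container
  "store_rm_command" -> "rm -rf ${store_config}"    // shutdown: storage cleanup template, see below
)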

@@ -165,27 +260,27 @@ class KafkaBroker(
config.testingConfig
.getOrElse("spec", throw new IllegalArgumentException("spec is required"))
.asInstanceOf[Int]
// === common
// Dump broker container logs
if (
config.testingConfig
.getOrElse("container_logs", throw new IllegalArgumentException("container_logs is required"))
.asInstanceOf[Boolean]
) {
info("=> dump container logs...")
dumpContainerLogs()
}
// Remove broker container
if (
config.testingConfig
.getOrElse("container_remove", throw new IllegalArgumentException("container_remove is required"))
.asInstanceOf[Boolean]
) {
info(s"=> Remove container $containerName...")
s"docker rm -f $containerName".!
}
// === spec 1: hstream
if (spec == 1) {
// Dump broker container logs
if (
config.testingConfig
.getOrElse("container_logs", throw new IllegalArgumentException("container_logs is required"))
.asInstanceOf[Boolean]
) {
info("=> dump container logs...")
dumpContainerLogs()
}

// Remove broker container
if (
config.testingConfig
.getOrElse("container_remove", throw new IllegalArgumentException("container_remove is required"))
.asInstanceOf[Boolean]
) {
info(s"=> Remove container $containerName...")
s"docker rm -f $containerName".!
}

// Delete all logs
val storeAdminPort = config.testingConfig
.getOrElse("store_admin_port", throw new IllegalArgumentException("store_admin_port is required"))
@@ -207,6 +302,28 @@ class KafkaBroker(
info("=> Delete all zk nodes...")
// Use the same zk version as script/dev-tools to avoid pulling a new image
s"docker run --rm --network host zookeeper:3.6 zkCli.sh -server 127.0.0.1:$metastorePort deleteall /hstream".!
// === spec 2: hornbill
} else if (spec == 2) {
// Remove meta server container
val metaServerContainerName = config.testingConfig
.getOrElse(
"metaserver_container_name",
throw new IllegalArgumentException("metaserver_container_name is required")
)
.asInstanceOf[String]
info("=> Remove meta server container...")
// FIXME: check should be false, because the meta server may be removed by other brokers
Utils.runCommand(s"docker rm -f $metaServerContainerName", check = false)
// Remove storage
val storeConfig = config.testingConfig
.getOrElse("store_config", throw new IllegalArgumentException("store_config is required"))
.asInstanceOf[String]
val storeRmCmd = config.testingConfig
.getOrElse("store_rm_command", throw new IllegalArgumentException("store_rm_command is required"))
.asInstanceOf[String]
.replace("${store_config}", storeConfig)
info("=> Delete all data in storage...")
Utils.runCommand(storeRmCmd)
} else {
throw new NotImplementedError("shutdown: spec is invalid!")
}
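Note that store_rm_command is treated as a simple template: the only substitution applied is a literal replacement of the ${store_config} placeholder, for example (values invented for illustration):

// Only the literal "${store_config}" token is replaced; no other expansion happens.
val template = "rm -rf ${store_config}"
val storeRmCmd = template.replace("${store_config}", "/data/store/config.json")
// storeRmCmd == "rm -rf /data/store/config.json"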
@@ -228,24 +345,12 @@ class KafkaBroker(
// For hstream

private def awaitStartup(retries: Int = 20): Unit = {
if (retries <= 0) {
val port = config.port
val ret = Utils.waitPort(port, retries)
if (!ret) {
s"docker logs $containerName".!
throw new RuntimeException("Failed to start hstream!")
}
val port = config.port
val f = Future {
try {
val socket = new java.net.Socket("127.0.0.1", port)
socket.close()
info("=> The server port is open at " + port)
} catch {
case e: java.net.ConnectException =>
info("=> Retry to connect to the server port " + port)
Thread.sleep(1000)
awaitStartup(retries - 1)
}
}
Await.result(f, 60.second)
}

private def dumpContainerLogs() = {