Skip to content

Commit

Permalink
improvements (#76)
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP authored Jul 26, 2024
1 parent 13156a3 commit 7381b37
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 20 deletions.
7 changes: 0 additions & 7 deletions .github/actions/run-tests/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,6 @@ runs:
java-version: ${{ inputs.java-version }}
cache: "gradle"

- name: Start required services
if: ${{ inputs.is_lagacy_tests != 'true' }}
shell: bash
run: |
cd ${{ inputs.path }}
./script/dev-tools start-services --services store zookeeper
- name: Run lagacy tests
if: ${{ inputs.is_lagacy_tests == 'true' }}
shell: bash
Expand Down
22 changes: 15 additions & 7 deletions app/src/main/scala/hstream/server/KafkaBroker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ object KafkaBroker extends Logging {
throw new RuntimeException("No broker configs found!")
}
val initPort = configs.head.port
val image = configs.head.testingConfig
.getOrElse("image", throw new IllegalArgumentException("image is required"))
.asInstanceOf[String]
val spec = configs.head.testingConfig
.getOrElse("spec", throw new IllegalArgumentException("spec is required"))
.asInstanceOf[Int]
Expand All @@ -93,11 +96,16 @@ object KafkaBroker extends Logging {
// status may be different between different nodes'views. This can cause infinite
// block in some edge cases (lookup resources).
for (config <- configs) {
awaitNode(num, config.port, timeout)
awaitNode(image, num, config.port, timeout)
}
} else if (spec == 2) {
// TODO
Thread.sleep(5000)
// TODO: Theoretically, it is adequate to ask any node to check the cluster status.
// However, due to the limitation of the current implementation, the cluster
// status may be different between different nodes'views. This can cause infinite
// block in some edge cases (lookup resources).
for (config <- configs) {
awaitNode(image, num, config.port, timeout)
}
} else {
throw new NotImplementedError("awaitCluster: spec is invalid!")
}
Expand All @@ -109,15 +117,15 @@ object KafkaBroker extends Logging {
)
}

private def awaitNode(num: Int, port: Int, timeout: Int = 30): Unit = {
private def awaitNode(cliImage: String, num: Int, port: Int, timeout: Int = 30): Unit = {
if (timeout <= 0) {
throw new RuntimeException("Failed to start hstream cluster!")
}
val f = Future {
try {
// FIXME: better way to check cluster is ready
val (_, nodeStatusOutOpt, _) = Utils.runCommand(
s"docker run --rm --network host hstreamdb/hstream hstream-kafka --port $port node status",
s"docker run --rm --network host $cliImage hstream-kafka --port $port node status",
captureOut = true,
check = false
)
Expand All @@ -136,9 +144,9 @@ object KafkaBroker extends Logging {
}
} catch {
case e: Exception => {
info("=> Waiting cluster ready...")
info("=> Waiting broker ready...")
Thread.sleep(2000)
awaitNode(num, port, timeout - 2)
awaitNode(cliImage, num, port, timeout - 2)
}
}
}
Expand Down
24 changes: 18 additions & 6 deletions script/run_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
set -e

hstream_image=${hstream_image:-hstreamdb/hstream:latest}
CONFIG_FILE=$PWD/local-data/config.yaml
CONFIG_FILE=${CONFIG_FILE:-$PWD/local-data/config.yaml}

find_freeport() {
echo $(python3 -c '
Expand All @@ -12,11 +12,22 @@ with socketserver.TCPServer(("127.0.0.1", 0), None) as s:
')
}

ENV_FILE="local-data/dev_tools.env"

start_required_services() {
if [ -f $ENV_FILE ]; then
echo "Ignore starting services..."
else
echo "Starting required services..."
./script/dev-tools start-services --services store zookeeper
fi
}

generate_config() {
env_file="local-data/dev_tools.env"
if [ -f $env_file ]; then
store_admin_port=$(cat $env_file | grep STORE_ADMIN_LOCAL_PORT | cut -d'=' -f2)
zookeeper_port=$(cat $env_file | grep ZOOKEEPER_LOCAL_PORT | cut -d'=' -f2)
if [ -f $ENV_FILE ]; then
echo "Generating config file..."
store_admin_port=$(cat $ENV_FILE | grep STORE_ADMIN_LOCAL_PORT | cut -d'=' -f2)
zookeeper_port=$(cat $ENV_FILE | grep ZOOKEEPER_LOCAL_PORT | cut -d'=' -f2)
base_port=$(find_freeport) # Optional
sed -e "s/\${base_port}/$base_port/g" \
-e "s#\${image}#$hstream_image#g" \
Expand All @@ -25,7 +36,7 @@ generate_config() {
-e "s/\${store_admin_port}/$store_admin_port/g" \
script/config.yaml.tmpl > $CONFIG_FILE
else
echo "Run './script/dev-tools start-services --services store zookeeper' first!"
echo "Error: $ENV_FILE not found. Please run 'start_required_services' first."
exit 1
fi
}
Expand All @@ -34,6 +45,7 @@ if [ -f $CONFIG_FILE ]; then
echo "Config file found"
else
echo "Config file not found, generating one..."
start_required_services
generate_config
fi

Expand Down

0 comments on commit 7381b37

Please sign in to comment.