From 44e6c88e403e4801151165b9179f9272a4475897 Mon Sep 17 00:00:00 2001 From: Max Schmeller <6088931+mojomex@users.noreply.github.com> Date: Tue, 4 Feb 2025 18:06:03 +0900 Subject: [PATCH] test(nebula_ros): fix what is probably a timing issue by explicitly waiting for known-good console output (#260) Signed-off-by: Max SCHMELLER --- nebula_ros/test/smoke_test.py | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/nebula_ros/test/smoke_test.py b/nebula_ros/test/smoke_test.py index a345c72af..b509ffe59 100644 --- a/nebula_ros/test/smoke_test.py +++ b/nebula_ros/test/smoke_test.py @@ -1,4 +1,3 @@ -import time import unittest from launch import LaunchContext @@ -11,7 +10,6 @@ import launch_testing.actions import launch_testing.asserts import pytest -import rclpy def resolve_launch_file(context: LaunchContext, *args, **kwargs): @@ -29,25 +27,17 @@ def resolve_launch_file(context: LaunchContext, *args, **kwargs): @pytest.mark.launch_test def generate_test_description(): return LaunchDescription( - [OpaqueFunction(function=resolve_launch_file), launch_testing.actions.ReadyToTest()] + [ + OpaqueFunction(function=resolve_launch_file), + launch_testing.actions.ReadyToTest(), + ] ) -class DummyTest(unittest.TestCase): - def test_wait_for_node_ready(self): - """Waiting for the node coming online.""" - rclpy.init() - test_node = rclpy.create_node("test_node") - # Wait until both dummy node "test_node" and real tested node are registered and then kill - # both of them, if tested node does not register within `timeout` seconds test will fail - start_time = time.time() - timeout = 2 # seconds - timeout_msg = "Smoke test timeout has been exceeded ({}s)".format(timeout) - print("waiting for nodes to be ready") - while len(test_node.get_node_names()) < 2: - assert time.time() - start_time < timeout, timeout_msg - time.sleep(0.1) - rclpy.shutdown() +class TestCorrectStartup(unittest.TestCase): + def test_wait_for_startup_then_shutdown(self): + self.proc_output: launch_testing.ActiveIoHandler + self.proc_output.assertWaitFor("Wrapper=OK", timeout=2) @launch_testing.post_shutdown_test()