diff --git a/.github/workflows/ci-base-tests-linux.yml b/.github/workflows/ci-base-tests-linux.yml index 051735dfc5..f6b1e243fd 100644 --- a/.github/workflows/ci-base-tests-linux.yml +++ b/.github/workflows/ci-base-tests-linux.yml @@ -63,6 +63,7 @@ jobs: --ignore=./smarts/core/tests/test_smarts_memory_growth.py \ --ignore=./smarts/core/tests/test_env_frame_rate.py \ --ignore=./smarts/env/tests/test_benchmark.py \ + --ignore=./smarts/core/utils/tests/test_traci_port_acquisition.py \ -k 'not test_long_determinism' examples-rl: diff --git a/.github/workflows/ci-base-tests-mac.yml b/.github/workflows/ci-base-tests-mac.yml index 5290ec769a..f207fca9eb 100644 --- a/.github/workflows/ci-base-tests-mac.yml +++ b/.github/workflows/ci-base-tests-mac.yml @@ -69,4 +69,5 @@ jobs: --ignore=./smarts/core/tests/test_renderers.py \ --ignore=./smarts/core/tests/test_smarts.py \ --ignore=./smarts/core/tests/test_env_frame_rate.py \ - --ignore=./smarts/core/tests/test_observations.py + --ignore=./smarts/core/tests/test_observations.py \ + --ignore=./smarts/core/utils/tests/test_traci_port_acquisition.py diff --git a/CHANGELOG.md b/CHANGELOG.md index c0b7d7c8c7..357a09d609 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ Copy and pasting the git commit messages is __NOT__ enough. - The following methods now exist explicitly `Vehicle.{add_sensor|detach_sensor|subscribed_to|sensor_property|}`. - Resources loaded with `load_yaml_config_with_substitution()` now substitute in SMARTS configuration with single squiggle bracket `${}` syntax. This is currently restricted to environment variable names prefixed with `"SMARTS_"`. This extends to benchmark configuration and vehicle configuration. - Default vehicle definitions can be now modified using `assets:default_vehicle_definitions_list`/`SMARTS_ASSSETS_DEFAULT_VEHICLE_DEFINITIONS_LIST` or by providing a `vehicle_definitions_list.yaml` in the scenario. These vehicle types are related to the `AgentInterface.vehicle_type` attribute. +- There is now a centralized `TraCI` server mananger that can be used to prevent port collisions. It can be run using `python smarts.core.utils.sumo_server` and the use of the server can be enabled with `SMARTS_SUMO_TRACI_SERVE_MODE="central"`. ### Changed - `VehicleIndex.build_agent_vehicle()` no longer has `filename` and `surface_patches` parameters. - The following modules have been renamed: `envision.types` -> `envision.etypes`, `smarts.core.utils.logging` -> `smarts.core.utils.core_logging`, `smarts.core.utils.math` -> `smarts.core.utils.core_math`, `smarts.sstudio.types` -> `smarts.sstudio.sstypes`. For compatibility reasons they can still be imported by their original module name. diff --git a/docs/conf.py b/docs/conf.py index 1fb8684439..70bcad2e94 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -42,8 +42,8 @@ "sphinx.ext.viewcode", # link to sourcecode from docs "sphinx_rtd_theme", # Read The Docs theme "sphinx_click", # extract documentation from a `click` application - "sphinxcontrib.apidoc", - "sphinxcontrib.spelling", + "sphinxcontrib.apidoc", # automatically document the API + "sphinxcontrib.spelling", # check documentation for spelling ] extlinks = { diff --git a/docs/ecosystem/sumo.rst b/docs/ecosystem/sumo.rst index 020eb68245..34df393846 100644 --- a/docs/ecosystem/sumo.rst +++ b/docs/ecosystem/sumo.rst @@ -13,6 +13,35 @@ SMARTS currently directly installs SUMO version >=1.15.0 via `pip`. Alternative installation methods, albeit more difficult, are described below. +Centralized TraCI management +---------------------------- +.. _centralized_traci_management: + +With the default behaviour each SMARTS instance will attempt to ask the operating system + for a port to generate a ``TraCI`` server on which can result in cross-connection of SMARTS and ``TraCI`` server instances. + +.. code-block:: bash + + ## console 1 (or in background OR on remote machine) + # Run the centralized sumo port management server. + # Use `export SMARTS_SUMO_CENTRAL_PORT=62232` or `--port=62232` + $ python -m smarts.core.utils.centralized_traci_server + +By setting ``SMARTS_SUMO_TRACI_SERVE_MODE`` to ``"central"`` SMARTS will use the ``TraCI`` management server. + +.. code-block:: bash + + ## console 2 + ## Set environment variable to switch to the server. + # This can also be set in the engine configuration. + $ export SMARTS_SUMO_TRACI_SERVE_MODE=central + ## Optional configuration + # export SMARTS_SUMO_CENTRAL_HOST=localhost + # export SMARTS_SUMO_CENTRAL_PORT=62232 + ## do run + $ python experiment.py + + Package managers ---------------- diff --git a/docs/resources/faq.rst b/docs/resources/faq.rst index 61c20489fd..6453402ddf 100644 --- a/docs/resources/faq.rst +++ b/docs/resources/faq.rst @@ -24,3 +24,6 @@ This is a list of frequently asked questions. Feel free to suggest new entries! # Set xorg server $ sudo wget -O /etc/X11/xorg.conf http://xpra.org/xorg.conf $ sudo /usr/bin/Xorg -noreset +extension GLX +extension RANDR +extension RENDER -logfile ./xdummy.log -config /etc/X11/xorg.conf $DISPLAY & 0 + +4. The simulation keeps crashing on connection in ``SumoTrafficSimulation``. How do I fix this? + This is likely due to using large scale parallelization. You will want to use the centralized management server. See :ref:`centralized_traci_management`. diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 7a91088731..ced97e6deb 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -73,6 +73,7 @@ superclass terminateds timestep Todo +TraCI travelled truncateds unassociated diff --git a/examples/e4_environment_config.py b/examples/e4_environment_config.py index f9d6476afb..870e73a422 100644 --- a/examples/e4_environment_config.py +++ b/examples/e4_environment_config.py @@ -8,7 +8,7 @@ from tools.argument_parser import empty_parser from smarts.core.agent_interface import AgentInterface, AgentType -from smarts.core.utils.string import truncate +from smarts.core.utils.strings import truncate from smarts.env.configs.hiway_env_configs import EnvReturnMode from smarts.env.gymnasium.hiway_env_v1 import HiWayEnvV1 from smarts.env.utils.action_conversion import ActionOptions diff --git a/examples/tools/sumo_multi_clients.py b/examples/tools/sumo_multi_clients.py index b79e83eeed..4b49e27f05 100644 --- a/examples/tools/sumo_multi_clients.py +++ b/examples/tools/sumo_multi_clients.py @@ -3,7 +3,7 @@ import threading import time -from smarts.core.utils.sumo import SUMO_PATH, sumolib, traci +from smarts.core.utils.sumo_utils import SUMO_PATH, sumolib, traci PORT = 8001 diff --git a/smarts/core/bubble_manager.py b/smarts/core/bubble_manager.py index 42f50a05d6..bdf228171c 100644 --- a/smarts/core/bubble_manager.py +++ b/smarts/core/bubble_manager.py @@ -44,7 +44,7 @@ from smarts.core.road_map import RoadMap from smarts.core.utils.cache import cache, clear_cache from smarts.core.utils.id import SocialAgentId -from smarts.core.utils.string import truncate +from smarts.core.utils.strings import truncate from smarts.core.vehicle import Vehicle from smarts.core.vehicle_index import VehicleIndex from smarts.sstudio.sstypes import BoidAgentActor diff --git a/smarts/core/configuration.py b/smarts/core/configuration.py index 1d1bb83bce..8e381513d4 100644 --- a/smarts/core/configuration.py +++ b/smarts/core/configuration.py @@ -72,6 +72,9 @@ def _convert_truthy(t: str) -> bool: ("ray", "num_gpus"): 0, ("ray", "num_cpus"): None, ("ray", "log_to_driver"): False, + ("sumo", "central_port"): 8619, + ("sumo", "central_host"): "localhost", + ("sumo", "traci_serve_mode"): "local", # local|central } diff --git a/smarts/core/lanepoints.py b/smarts/core/lanepoints.py index 53a02842e0..67f05e20d6 100644 --- a/smarts/core/lanepoints.py +++ b/smarts/core/lanepoints.py @@ -102,7 +102,7 @@ def from_sumo( the network, the result of this function can be used to interpolate lane-points along lanes to the desired granularity. """ - from smarts.core.utils.sumo import sumolib # isort:skip + from smarts.core.utils.sumo_utils import sumolib # isort:skip from sumolib.net.edge import Edge # isort:skip from sumolib.net.lane import Lane # isort:skip from .sumo_road_network import SumoRoadNetwork diff --git a/smarts/core/sumo_road_network.py b/smarts/core/sumo_road_network.py index b01871dd22..a0700acfdc 100644 --- a/smarts/core/sumo_road_network.py +++ b/smarts/core/sumo_road_network.py @@ -45,7 +45,7 @@ from .utils.geometry import buffered_shape from .utils.glb import make_map_glb, make_road_line_glb -from smarts.core.utils.sumo import sumolib # isort:skip +from smarts.core.utils.sumo_utils import sumolib # isort:skip def pairwise(iterable): diff --git a/smarts/core/sumo_traffic_simulation.py b/smarts/core/sumo_traffic_simulation.py index 43d8faf87b..85596d26c3 100644 --- a/smarts/core/sumo_traffic_simulation.py +++ b/smarts/core/sumo_traffic_simulation.py @@ -20,10 +20,10 @@ import logging import random -import time import weakref +from functools import partial from pathlib import Path -from typing import Iterable, List, Optional, Sequence, Tuple +from typing import Final, Iterable, List, Optional, Sequence, Tuple import numpy as np from shapely.affinity import rotate as shapely_rotate @@ -45,10 +45,20 @@ from smarts.core.signals import SignalLightState, SignalState from smarts.core.sumo_road_network import SumoRoadNetwork from smarts.core.traffic_provider import TrafficProvider +from smarts.core.utils.centralized_traci_server import spawn_if_not from smarts.core.utils.core_logging import suppress_output from smarts.core.vehicle import VEHICLE_CONFIGS, VehicleState -from smarts.core.utils.sumo import traci, TraciConn # isort:skip +NO_CHECKS: Final = 0b00000 + +# isort:skip +from smarts.core.utils.sumo_utils import ( + LocalSumoProcess, + RemoteSumoProcess, + TraciConn, + traci, +) + import traci.constants as tc # isort:skip @@ -125,6 +135,23 @@ def __init__( self._sim = None self._handling_error = False self._traci_retries = traci_retries + # XXX: This is used to try to avoid interrupting other instances in race condition (see GH #2139) + self._foreign_traci_servers: List[TraciConn] = [] + + if ( + self._sumo_port is not None + or (traci_serve_mode := config()("sumo", "traci_serve_mode")) == "local" + ): + self._process_factory = partial(LocalSumoProcess, self._sumo_port) + elif traci_serve_mode == "central": + remote_host = config()("sumo", "central_host") + remote_port = config()("sumo", "central_port", cast=int) + spawn_if_not(remote_host, remote_port) + self._process_factory = partial( + RemoteSumoProcess, + remote_host=remote_host, + remote_port=remote_port, + ) # start with the default recovery flags... self._recovery_flags = super().recovery_flags @@ -199,43 +226,46 @@ def _initialize_traci_conn(self, num_retries=5): self._traci_conn.close_traci_and_pipes() self._traci_conn = None - sumo_port = self._sumo_port sumo_binary = "sumo" if self._headless else "sumo-gui" + sumo_process = self._process_factory() + sumo_process.generate( + base_params=self._base_sumo_load_params(), sumo_binary=sumo_binary + ) self._traci_conn = TraciConn( - sumo_port=sumo_port, - base_params=self._base_sumo_load_params(), - sumo_binary=sumo_binary, + sumo_process=sumo_process, ) try: - while self._traci_conn.viable and not self._traci_conn.connected: - try: - self._traci_conn.connect( - timeout=5, - minimum_traci_version=20, - minimum_sumo_version=(1, 10, 0), - debug=self._debug, - ) - except traci.exceptions.FatalTraCIError: - # Could not connect in time just retry connection - pass + self._traci_conn.connect( + timeout=5, + minimum_traci_version=20, + minimum_sumo_version=(1, 10, 0), + debug=self._debug, + ) + + if not self._traci_conn.connected: + # Save the connection to try to avoid closing it for the other client. + self._foreign_traci_servers.append(self._traci_conn) + self._traci_conn = None + raise traci.exceptions.TraCIException( + "TraCI server was likely taken by other client." + ) + except traci.exceptions.FatalTraCIError: + # Could not connect in time just retry connection + current_retries += 1 + continue except traci.exceptions.TraCIException: # SUMO process died... unsure why this is not a fatal traci error current_retries += 1 - - self._traci_conn.close_traci_and_pipes() continue except ConnectionRefusedError: # Some other process somehow owns the port... sumo needs to be restarted. continue - except OSError: - # TraCI or SUMO version are not at the minimum required version. - raise except KeyboardInterrupt: self._log.debug("Keyboard interrupted TraCI connection.") - self._traci_conn.close_traci_and_pipes() + self._traci_conn.close_traci_and_pipes(wait=False) raise break else: @@ -262,7 +292,7 @@ def _base_sumo_load_params(self): "--net-file=%s" % self._scenario.road_map.source, "--quit-on-end", "--log=%s" % self._log_file, - "--error-log=%s" % self._log_file, + "--error-log=%s.err" % self._log_file, "--no-step-log", "--no-warnings=1", "--seed=%s" % random.randint(0, 2147483648), @@ -298,6 +328,13 @@ def _base_sumo_load_params(self): return load_params + def _restart_sumo(self): + engine_config = config() + traci_retries = self._traci_retries or engine_config( + "sumo", "traci_retries", default=5, cast=int + ) + self._initialize_traci_conn(num_retries=traci_retries) + def setup(self, scenario) -> ProviderState: """Initialize the simulation with a new scenario.""" self._log.debug("Setting up SumoTrafficSim %s", self) @@ -321,24 +358,19 @@ def setup(self, scenario) -> ProviderState: ), "SumoTrafficSimulation requires a SumoRoadNetwork" self._log_file = scenario.unique_sumo_log_file() - if restart_sumo: - try: - engine_config = config() - traci_retries = self._traci_retries or engine_config( - "sumo", "traci_retries", default=5, cast=int - ) - self._initialize_traci_conn(num_retries=traci_retries) - except traci.exceptions.FatalTraCIError: - return ProviderState() - elif self._allow_reload: - assert ( - self._traci_conn is not None - ), "TraCI should be connected at this point." - try: - self._traci_conn.load(self._base_sumo_load_params()) - except traci.exceptions.FatalTraCIError as err: - self._handle_traci_exception(err, actors_relinquishable=False) - return ProviderState() + try: + if restart_sumo: + self._restart_sumo() + elif self._allow_reload: + assert ( + self._traci_conn is not None + ), "TraCI should be connected at this point." + try: + self._traci_conn.load(self._base_sumo_load_params()) + except traci.exceptions.FatalTraCIError: + self._restart_sumo() + except traci.exceptions.FatalTraCIError: + return ProviderState() assert self._traci_conn is not None, "No active TraCI connection" @@ -378,7 +410,7 @@ def _handle_traci_exception( self._handling_error = True if isinstance(error, traci.exceptions.TraCIException): # XXX: Needs further investigation whenever this happens. - self._log.warning("TraCI has provided a warning %s", error) + self._log.debug("TraCI has provided a warning %s", error) return if isinstance(error, traci.exceptions.FatalTraCIError): self._log.error( @@ -435,6 +467,18 @@ def teardown(self): self._remove_vehicles() except traci.exceptions.FatalTraCIError: pass + if not self._allow_reload and self._traci_conn is not None: + self._traci_conn.close_traci_and_pipes() + + for i, trc in reversed( + [ + (j, trc) + for j, trc in enumerate(self._foreign_traci_servers) + if not trc.viable + ] + ): + self._foreign_traci_servers.pop(i) + trc.close_traci_and_pipes(wait=False) self._cumulative_sim_seconds = 0 self._non_sumo_vehicle_ids = set() @@ -566,7 +610,7 @@ def _sync(self, provider_state: ProviderState): VEHICLE_CONFIGS[vehicle_state.vehicle_config_type].dimensions, ) self._create_vehicle(vehicle_id, dimensions, vehicle_state.role) - no_checks = 0b00000 + no_checks = NO_CHECKS self._traci_conn.vehicle.setSpeedMode(vehicle_id, no_checks) # update the state of all current managed vehicles @@ -611,7 +655,7 @@ def _sync(self, provider_state: ProviderState): ) for vehicle_id in vehicles_that_have_become_external: - no_checks = 0b00000 + no_checks = NO_CHECKS self._traci_conn.vehicle.setSpeedMode(vehicle_id, no_checks) self._traci_conn.vehicle.setColor( vehicle_id, SumoTrafficSimulation._color_for_role(ActorRole.SocialAgent) diff --git a/smarts/core/tests/test_sumo_version.py b/smarts/core/tests/test_sumo_version.py index 222c779719..4dddc346af 100644 --- a/smarts/core/tests/test_sumo_version.py +++ b/smarts/core/tests/test_sumo_version.py @@ -25,12 +25,12 @@ def test_sumo_lib(): # import does runtime check by necessity - from smarts.core.utils.sumo import sumolib + from smarts.core.utils.sumo_utils import sumolib def test_sumo_version(): from smarts.core.utils import networking - from smarts.core.utils.sumo import SUMO_PATH, traci + from smarts.core.utils.sumo_utils import SUMO_PATH, traci load_params = [ "--start", diff --git a/smarts/core/tests/test_traffic_simulation.py b/smarts/core/tests/test_traffic_simulation.py index 34973bce72..2f5dc9b54d 100644 --- a/smarts/core/tests/test_traffic_simulation.py +++ b/smarts/core/tests/test_traffic_simulation.py @@ -33,7 +33,7 @@ from smarts.core.scenario import Scenario from smarts.core.smarts import SMARTS from smarts.core.sumo_traffic_simulation import SumoTrafficSimulation -from smarts.core.utils.sumo import traci +from smarts.core.utils.sumo_utils import traci SUMO_PORT = 8082 diff --git a/smarts/core/utils/centralized_traci_server.py b/smarts/core/utils/centralized_traci_server.py new file mode 100644 index 0000000000..814694b902 --- /dev/null +++ b/smarts/core/utils/centralized_traci_server.py @@ -0,0 +1,224 @@ +# MIT License +# +# Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from __future__ import annotations + +import argparse +import asyncio +import asyncio.streams +import json +import os +import socket +import subprocess +import time +from typing import Optional, Set + +from smarts.core import config +from smarts.core.utils.networking import find_free_port +from smarts.core.utils.sumo_utils import SUMO_PATH + + +class CentralizedTraCIServer: + """A centralized server for handling SUMO instances to prevent race conditions.""" + + def __init__(self, host, port) -> None: + self._host = host + self._port = port + + self._used_ports: Set[int] = set() + self._last_client: float = time.time() + + async def start(self, timeout: Optional[float] = 60.0 * 60.0): + """Start the server.""" + # Create a socket object + server = await asyncio.start_server(self.handle_client, self._host, self._port) + + address = server.sockets[0].getsockname() + print(f"Server listening on `{address = }`") + async with server: + waitable = server.serve_forever() + if timeout is not None: + _timeout_watcher = asyncio.create_task(self._timeout_watcher(timeout)) + waitable = asyncio.gather(waitable, _timeout_watcher) + await waitable + + async def _timeout_watcher(self, timeout: float): + """Closes the server if it is not in use for `timeout` length of time.""" + + while True: + await asyncio.sleep(60) + if time.time() - self._last_client > timeout and len(self._used_ports) == 0: + print(f"Closing because `{timeout=}` was reached.") + loop = asyncio.get_event_loop() + loop.stop() + + async def _process_manager(self, binary, args, f: asyncio.Future): + """Manages the lifecycle of the TraCI server.""" + _sumo_proc = None + _port = None + try: + # Create a new Future object. + while (_port := find_free_port()) in self._used_ports: + pass + self._used_ports.add(_port) + _sumo_proc = await asyncio.create_subprocess_exec( + *[ + os.path.join(SUMO_PATH, "bin", binary), + f"--remote-port={_port}", + ], + *args, + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + close_fds=True, + ) + loop = asyncio.get_event_loop() + future = loop.create_future() + f.set_result( + { + "port": _port, + "future": future, + } + ) + + result = await asyncio.wait_for(future, None) + finally: + if _port is not None: + self._used_ports.discard(_port) + if _sumo_proc is not None and _sumo_proc.returncode is None: + _sumo_proc.kill() + self._last_client = time.time() + + async def handle_client(self, reader, writer): + """Read data from the client.""" + address = writer.get_extra_info("peername") + print(f"Received connection from {address}") + loop = asyncio.get_event_loop() + try: + port = None + _traci_server_info = None + while True: + data = await reader.readline() + message = data.decode("utf-8") + # print(f"Received {message!r} from {addr}") + if message.startswith("sumo"): + kill_f = loop.create_future() + sumo_binary, _, sumo_cmd = message.partition(":") + response_list = json.loads(sumo_cmd) + command_args = [ + *response_list, + ] + asyncio.create_task( + self._process_manager(sumo_binary, command_args, kill_f) + ) + if _traci_server_info is not None: + print("Duplicate start request received.") + continue + _traci_server_info = await asyncio.wait_for(kill_f, None) + port = _traci_server_info["port"] + + response = f"{self._host}:{port}" + print(f"Send TraCI address: {response!r}") + writer.write(response.encode("utf-8")) + + if message.startswith("e:"): + if _traci_server_info is None: + print("Kill received for uninitialized process.") + else: + _traci_server_info["future"].set_result("kill") + break + if len(message) == 0: + break + + # Close the connection + await writer.drain() + writer.close() + except asyncio.CancelledError: + # Handle client disconnect + pass + + +def spawn_if_not(remote_host: str, remote_port: int): + """Create a new server if it does not already exist. + + Args: + remote_host (str): The host name. + remote_port (int): The host port. + """ + client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + client_socket.connect((remote_host, remote_port)) + except (OSError): + if remote_host in ("localhost", "127.0.0.1"): + command = [ + "python", + "-m", + __name__, + "--timeout", + "600", + "--port", + remote_port, + ] + + # Use subprocess.Popen to start the process in the background + _ = subprocess.Popen( + command, + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + shell=False, + close_fds=True, + ) + + else: + client_socket.close() + + +def main(*_, _host=None, _port=None, timeout: Optional[float] = 10 * 60): + """The program entrypoint.""" + # Define the host and port on which the server will listen + _host = _host or config()( + "sumo", "central_host" + ) # Use '0.0.0.0' to listen on all available interfaces + _port = _port or config()("sumo", "central_port") + + ss = CentralizedTraCIServer(_host, _port) + asyncio.run(ss.start(timeout=timeout)) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(__name__) + parser.add_argument( + "--timeout", + help="Duration of time until server shuts down if not in use.", + type=float, + default=None, + ) + parser.add_argument( + "--port", + help="The port to host on.", + type=int, + default=None, + ) + args = parser.parse_args() + + main(_port=args.port, timeout=args.timeout) diff --git a/smarts/core/utils/string.py b/smarts/core/utils/strings.py similarity index 100% rename from smarts/core/utils/string.py rename to smarts/core/utils/strings.py diff --git a/smarts/core/utils/sumo.py b/smarts/core/utils/sumo.py deleted file mode 100644 index 169e660caa..0000000000 --- a/smarts/core/utils/sumo.py +++ /dev/null @@ -1,255 +0,0 @@ -# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -"""Importing this module "redirects" the import to the "real" sumolib. This is available -for convenience and to reduce code duplication as sumolib lives under SUMO_HOME. -""" -from __future__ import annotations - -import functools -import inspect -import logging -import multiprocessing -import os -import subprocess -import sys -from typing import Any, List, Optional, Tuple - -from smarts.core.utils import networking -from smarts.core.utils.core_logging import suppress_output - -try: - import sumo - - SUMO_PATH = sumo.SUMO_HOME - os.environ["SUMO_HOME"] = sumo.SUMO_HOME -except ImportError: - if "SUMO_HOME" not in os.environ: - raise ImportError("SUMO_HOME not set, can't import sumolib") - SUMO_PATH = os.environ["SUMO_HOME"] - -tools_path = os.path.join(SUMO_PATH, "tools") -if tools_path not in sys.path: - sys.path.append(tools_path) - -try: - import sumo.tools.sumolib as sumolib - import sumo.tools.traci as traci -except ModuleNotFoundError as e: - raise ImportError( - "Missing dependencies for SUMO. Install them using the command `pip install -e .[sumo]` at the source directory." - ) from e - - -class DomainWrapper: - """Wraps `traci.Domain` type for the `TraciConn` utility""" - - def __init__(self, sumo_proc, domain: traci.domain.Domain) -> None: - self._domain = domain - self._sumo_proc = sumo_proc - - def __getattr__(self, name: str) -> Any: - attribute = getattr(self._domain, name) - - if inspect.isbuiltin(attribute) or inspect.ismethod(attribute): - attribute = functools.partial( - _wrap_traci_method, method=attribute, sumo_process=self._sumo_proc - ) - - return attribute - - -class TraciConn: - """A simplified utility for connecting to a SUMO process.""" - - def __init__( - self, - sumo_port: Optional[int], - base_params: List[str], - sumo_binary: str = "sumo", # Literal["sumo", "sumo-gui"] - host: str = "localhost", - ): - self._sumo_proc = None - self._traci_conn = None - self._sumo_port = None - self._sumo_version: Tuple[int, ...] = tuple() - self._host = host - self._log = logging.Logger(self.__class__.__name__) - - if sumo_port is None: - sumo_port = networking.find_free_port() - self._sumo_port = sumo_port - sumo_cmd = [ - os.path.join(SUMO_PATH, "bin", sumo_binary), - f"--remote-port={sumo_port}", - *base_params, - ] - - self._log.debug("Starting sumo process:\n\t %s", sumo_cmd) - self._sumo_proc = subprocess.Popen( - sumo_cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=True, - ) - - def __del__(self) -> None: - # We should not raise in delete. - try: - self.close_traci_and_pipes() - except Exception: - pass - - def connect( - self, - timeout: float, - minimum_traci_version: int, - minimum_sumo_version: Tuple[int, ...], - debug: bool = False, - ): - """Attempt a connection with the SUMO process.""" - traci_conn = None - try: - with suppress_output(stderr=not debug, stdout=True): - traci_conn = traci.connect( - self._sumo_port, - host=self._host, - numRetries=max(0, int(20 * timeout)), - proc=self._sumo_proc, - waitBetweenRetries=0.05, - ) # SUMO must be ready within timeout seconds - # We will retry since this is our first sumo command - except traci.exceptions.FatalTraCIError: - self._log.debug("TraCI could not connect in time.") - raise - except traci.exceptions.TraCIException: - self._log.warning("SUMO process died.") - raise - except ConnectionRefusedError: - self._log.warning( - "Connection refused. Tried to connect to an unpaired TraCI client." - ) - raise - - try: - vers, vers_str = traci_conn.getVersion() - if vers < minimum_traci_version: - raise OSError( - f"TraCI API version must be >= {minimum_traci_version}. Got version ({vers})" - ) - self._sumo_version = tuple( - int(v) for v in vers_str.partition(" ")[2].split(".") - ) # e.g. "SUMO 1.11.0" -> (1, 11, 0) - if self._sumo_version < minimum_sumo_version: - raise OSError(f"SUMO version must be >= SUMO {minimum_sumo_version}") - except (traci.exceptions.FatalTraCIError, TypeError) as err: - logging.error( - "TraCI disconnected from '%s:%s', process may have died.", - self._host, - self._sumo_port, - ) - # XXX: the error type is changed to TraCIException to make it consistent with the - # process died case of `traci.connect`. - raise traci.exceptions.TraCIException(err) - except OSError: - self.close_traci_and_pipes() - raise - self._traci_conn = traci_conn - - @property - def connected(self) -> bool: - """Check if the connection is still valid.""" - return self._sumo_proc is not None and self._traci_conn is not None - - @property - def viable(self) -> bool: - """If making a connection to the sumo process is still viable.""" - return self._sumo_proc is not None and self._sumo_proc.poll() is None - - @property - def sumo_version(self) -> Tuple[int, ...]: - """Get the current SUMO version as a tuple.""" - return self._sumo_version - - def __getattr__(self, name: str) -> Any: - if not self.connected: - return None - - attribute = getattr(self._traci_conn, name) - - if inspect.isbuiltin(attribute) or inspect.ismethod(attribute): - attribute = functools.partial( - _wrap_traci_method, method=attribute, sumo_process=self - ) - - if isinstance(attribute, traci.domain.Domain): - attribute = DomainWrapper(sumo_proc=self, domain=attribute) - - return attribute - - def must_reset(self): - """If the version of sumo will have errors if just reloading such that it must be reset.""" - return self._sumo_version > (1, 12, 0) - - def close_traci_and_pipes(self): - """Safely closes all connections. We should expect this method to always work without throwing""" - - def __safe_close(conn): - try: - conn.close() - except (subprocess.SubprocessError, multiprocessing.ProcessError): - # Subprocess or process failed - pass - except traci.exceptions.FatalTraCIError: - # TraCI connection is already dead. - pass - except AttributeError: - # Socket was destroyed internally, likely due to an error. - pass - - if self._traci_conn: - self._log.debug("Closing TraCI connection to %s", self._sumo_port) - __safe_close(self._traci_conn) - - if self._sumo_proc: - __safe_close(self._sumo_proc.stdin) - __safe_close(self._sumo_proc.stdout) - __safe_close(self._sumo_proc.stderr) - self._sumo_proc.kill() - - self._sumo_proc = None - self._traci_conn = None - - def teardown(self): - """Clean up all resources.""" - self.close_traci_and_pipes() - - -def _wrap_traci_method(*args, method, sumo_process: TraciConn, **kwargs): - # Argument order must be `*args` first so `method` and `sumo_process` are keyword only arguments. - try: - return method(*args, **kwargs) - except traci.exceptions.FatalTraCIError: - # TraCI cannot continue - sumo_process.close_traci_and_pipes() - raise - except traci.exceptions.TraCIException: - # Case where TraCI/SUMO can theoretically continue - raise diff --git a/smarts/core/utils/sumo_utils.py b/smarts/core/utils/sumo_utils.py new file mode 100644 index 0000000000..1f806193ea --- /dev/null +++ b/smarts/core/utils/sumo_utils.py @@ -0,0 +1,474 @@ +# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +"""Importing this module "redirects" the import to the "real" sumolib. This is available +for convenience and to reduce code duplication as sumolib lives under SUMO_HOME. +""" +from __future__ import annotations + +import abc +import functools +import inspect +import json +import logging +import multiprocessing +import os +import socket +import subprocess +import sys +import time +from typing import Any, List, Literal, Optional, Tuple + +from smarts.core.utils import networking +from smarts.core.utils.core_logging import suppress_output + +try: + import sumo + + SUMO_PATH = sumo.SUMO_HOME + os.environ["SUMO_HOME"] = sumo.SUMO_HOME +except ImportError: + if "SUMO_HOME" not in os.environ: + raise ImportError("SUMO_HOME not set, can't import sumolib") + SUMO_PATH = os.environ["SUMO_HOME"] + +tools_path = os.path.join(SUMO_PATH, "tools") +if tools_path not in sys.path: + sys.path.append(tools_path) + +try: + import sumo.tools.sumolib as sumolib + import sumo.tools.traci as traci +except ModuleNotFoundError as e: + raise ImportError( + "Missing dependencies for SUMO. Install them using the command `pip install -e .[sumo]` at the source directory." + ) from e + + +def _safe_close(conn, **kwargs): + try: + conn.close(**kwargs) + except (subprocess.SubprocessError, multiprocessing.ProcessError): + # Subprocess or process failed + pass + except traci.exceptions.FatalTraCIError: + # TraCI connection is already dead. + pass + except AttributeError: + # Socket was destroyed internally, likely due to an error. + pass + except Exception as err: + pass + + +class DomainWrapper: + """Wraps `traci.Domain` type for the `TraciConn` utility""" + + def __init__(self, traci_conn, domain: traci.domain.Domain, attribute_name) -> None: + self._domain = domain + self._traci_conn = traci_conn + self._attribute_name = attribute_name + + def __getattr__(self, name: str) -> Any: + attribute = getattr(self._domain, name) + + if inspect.isbuiltin(attribute) or inspect.ismethod(attribute): + attribute = functools.partial( + _wrap_traci_method, + method=attribute, + traci_conn=self._traci_conn, + attribute_name=self._attribute_name, + ) + + return attribute + + +class SumoProcess(metaclass=abc.ABCMeta): + """A simplified utility representing a SUMO process.""" + + @abc.abstractmethod + def generate( + self, base_params: List[str], sumo_binary: Literal["sumo", "sumo-gui"] = "sumo" + ): + """Generate the process.""" + raise NotImplementedError + + @abc.abstractmethod + def terminate(self, kill: bool): + """Terminate this process.""" + raise NotImplementedError + + @abc.abstractmethod + def poll(self) -> Optional[int]: + """Poll the underlying process.""" + raise NotImplementedError + + @abc.abstractmethod + def wait(self, timeout: Optional[float] = None) -> int: + """Wait on the underlying process.""" + raise NotImplementedError + + @property + @abc.abstractmethod + def port(self) -> int: + """The port this process is associated with.""" + raise NotImplementedError + + @property + @abc.abstractmethod + def host(self) -> str: + """The port this process is associated with.""" + raise NotImplementedError + + +class RemoteSumoProcess(SumoProcess): + """Connects to a sumo server.""" + + def __init__(self, remote_host, remote_port) -> None: + self._remote_host = remote_host + self._remote_port = remote_port + self._port = None + self._host = None + self._client_socket = None + + def generate( + self, base_params: List[str], sumo_binary: Literal["sumo", "sumo-gui"] = "sumo" + ): + client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + # Wait on server to start if it needs to. + error = None + for _ in range(5): + try: + client_socket.connect((self._remote_host, self._remote_port)) + except OSError as err: + time.sleep(1) + error = err + continue + break + else: + raise OSError( + f"Unable to connect to server {self._remote_host}:{self._remote_port}. Try running again or running the server using `python -m smarts.core.utils.sumo_server`." + ) from error + + client_socket.send(f"{sumo_binary}:{json.dumps(base_params)}\n".encode("utf-8")) + + self._client_socket = client_socket + + response = client_socket.recv(1024) + self._host, _, port = response.decode("utf-8").partition(":") + self._port = int(port) + + def terminate(self, kill: bool): + self._client_socket.send("e:".encode("utf-8")) + self._client_socket.close() + + @property + def port(self) -> int: + return self._port or 0 + + @property + def host(self) -> str: + return self._host or "-1" + + def poll(self) -> Optional[int]: + return None + + def wait(self, timeout: Optional[float] = None) -> int: + return 0 + + +class LocalSumoProcess(SumoProcess): + """Connects to a local sumo process.""" + + def __init__(self, sumo_port) -> None: + self._sumo_proc = None + self._sumo_port = sumo_port + + def generate( + self, base_params: List[str], sumo_binary: Literal["sumo", "sumo-gui"] = "sumo" + ): + if self._sumo_port is None: + self._sumo_port = networking.find_free_port() + sumo_cmd = [ + os.path.join(SUMO_PATH, "bin", sumo_binary), + f"--remote-port={self._sumo_port}", + *base_params, + ] + self._sumo_proc = subprocess.Popen( + sumo_cmd, + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + close_fds=True, + ) + + @property + def port(self) -> int: + assert self._sumo_port is not None + return self._sumo_port + + @property + def host(self) -> str: + return "localhost" + + def terminate(self, kill): + if self._sumo_proc: + _safe_close(self._sumo_proc.stdin) + _safe_close(self._sumo_proc.stdout) + _safe_close(self._sumo_proc.stderr) + if kill: + self._sumo_proc.kill() + self._sumo_proc = None + + def poll(self) -> Optional[int]: + return self._sumo_proc.poll() + + def wait(self, timeout=None): + return self._sumo_proc.wait(timeout=timeout) + + +class TraciConn: + """A simplified utility for connecting to a SUMO process.""" + + def __init__( + self, + sumo_process: SumoProcess, + host: str = "localhost", + name: str = "", + ): + self._traci_conn = None + self._sumo_port = None + self._sumo_version: Tuple[int, ...] = tuple() + self._host = host + self._name = name + self._log = logging.Logger(self.__class__.__name__) + self._log = logging + self._connected = False + + self._sumo_process = sumo_process + + def __del__(self) -> None: + # We should not raise in delete. + try: + self.close_traci_and_pipes() + except Exception: + pass + + def connect( + self, + timeout: float, + minimum_traci_version: int, + minimum_sumo_version: Tuple[int, ...], + debug: bool = False, + ): + """Attempt a connection with the SUMO process.""" + traci_conn = None + self._host = self._sumo_process.host + self._sumo_port = self._sumo_process.port + try: + # See if the process is still alive before attempting a connection. + with suppress_output(stderr=not debug, stdout=True): + traci_conn = traci.connect( + self._sumo_process.port, + host=self._sumo_process.host, + numRetries=max(0, int(20 * timeout)), + proc=self._sumo_process, + waitBetweenRetries=0.05, + ) # SUMO must be ready within timeout seconds + # We will retry since this is our first sumo command + except traci.exceptions.FatalTraCIError as err: + self._log.error( + "[%s] TraCI could not connect in time to '%s:%s' [%s]", + self._name, + self._host, + self._sumo_port, + err, + ) + # XXX: Actually not fatal... + raise + except traci.exceptions.TraCIException as err: + self._log.error( + "[%s] SUMO process died while trying to connect to '%s:%s' [%s]", + self._name, + self._host, + self._sumo_port, + err, + ) + self.close_traci_and_pipes() + raise + except ConnectionRefusedError: + self._log.error( + "[%s] Intended TraCI server '%s:%s' refused connection.", + self._name, + self._host, + self._sumo_port, + ) + self.close_traci_and_pipes() + raise + + self._connected = True + self._traci_conn = traci_conn + try: + if not self.viable: + raise traci.exceptions.TraCIException("TraCI server already finished!?") + vers, vers_str = traci_conn.getVersion() + if vers < minimum_traci_version: + raise ValueError( + f"TraCI API version must be >= {minimum_traci_version}. Got version ({vers})" + ) + self._sumo_version = tuple( + int(v) for v in vers_str.partition(" ")[2].split(".") + ) # e.g. "SUMO 1.11.0" -> (1, 11, 0) + if self._sumo_version < minimum_sumo_version: + raise ValueError(f"SUMO version must be >= SUMO {minimum_sumo_version}") + except traci.exceptions.FatalTraCIError as err: + self._log.error( + "[%s] TraCI disconnected for connection attempt '%s:%s': [%s]", + self._name, + self._host, + self._sumo_port, + err, + ) + # XXX: the error type is changed to TraCIException to make it consistent with the + # process died case of `traci.connect`. Since TraCIException is fatal just in this case... + self.close_traci_and_pipes() + raise traci.exceptions.TraCIException(err) + except OSError as err: + self._log.error( + "[%s] OS error occurred for TraCI connection attempt '%s:%s': [%s]", + self._name, + self._host, + self._sumo_port, + err, + ) + self.close_traci_and_pipes() + raise traci.exceptions.TraCIException(err) + except ValueError: + self.close_traci_and_pipes() + raise + + @property + def connected(self) -> bool: + """Check if the connection is still valid.""" + return self._sumo_process is not None and self._connected + + @property + def viable(self) -> bool: + """If making a connection to the sumo process is still viable.""" + return self._sumo_process is not None and self._sumo_process.poll() is None + + @property + def sumo_version(self) -> Tuple[int, ...]: + """Get the current SUMO version as a tuple.""" + return self._sumo_version + + @property + def port(self) -> Optional[int]: + """Get the used TraCI port.""" + return self._sumo_port + + @property + def hostname(self) -> str: + """Get the used TraCI port.""" + return self._host + + def __getattr__(self, name: str) -> Any: + if not self.connected: + raise traci.exceptions.FatalTraCIError("TraCI died.") + + attribute = getattr(self._traci_conn, name) + + if inspect.isbuiltin(attribute) or inspect.ismethod(attribute): + attribute = functools.partial( + _wrap_traci_method, + method=attribute, + attribute_name=name, + traci_conn=self, + ) + elif isinstance(attribute, traci.domain.Domain): + attribute = DomainWrapper( + traci_conn=self, domain=attribute, attribute_name=name + ) + else: + raise NotImplementedError() + + return attribute + + def must_reset(self): + """If the version of sumo will have errors if just reloading such that it must be reset.""" + return self._sumo_version > (1, 12, 0) + + def close_traci_and_pipes(self, wait: bool = True, kill: bool = True): + """Safely closes all connections. We should expect this method to always work without throwing""" + + if self._connected: + self._log.debug("Closing TraCI connection to %s", self._sumo_port) + _safe_close(self._traci_conn, wait=wait) + + if self._sumo_process: + self._sumo_process.terminate(kill=kill) + self._log.info( + "Killed TraCI server process '%s:%s'", self._host, self._sumo_port + ) + self._sumo_process = None + + self._connected = False + + def teardown(self): + """Clean up all resources.""" + self.close_traci_and_pipes(True) + self._traci_conn = None + + +def _wrap_traci_method( + *args, method, traci_conn: TraciConn, attribute_name: str, **kwargs +): + # Argument order must be `*args` first so `method` and `sumo_process` are keyword only arguments. + try: + return method(*args, **kwargs) + except traci.exceptions.FatalTraCIError as err: + logging.error( + "[%s] TraCI '%s:%s' disconnected for call '%s', process may have died: [%s]", + traci_conn._name, + traci_conn.hostname, + traci_conn.port, + attribute_name, + err, + ) + # TraCI cannot continue + traci_conn.close_traci_and_pipes() + raise traci.exceptions.FatalTraCIError("TraCI died.") from err + except OSError as err: + logging.error( + "[%s] OS error occurred for TraCI '%s:%s' call '%s': [%s]", + traci_conn._name, + traci_conn.hostname, + traci_conn.port, + attribute_name, + err, + ) + traci_conn.close_traci_and_pipes() + raise OSError("Connection dropped.") from err + except traci.exceptions.TraCIException as err: + # Case where TraCI/SUMO can theoretically continue + raise traci.exceptions.TraCIException("TraCI can continue.") from err + except KeyboardInterrupt: + traci_conn.close_traci_and_pipes(wait=False) + raise diff --git a/smarts/core/utils/tests/test_traci_port_acquisition.py b/smarts/core/utils/tests/test_traci_port_acquisition.py new file mode 100644 index 0000000000..a541a9c5c3 --- /dev/null +++ b/smarts/core/utils/tests/test_traci_port_acquisition.py @@ -0,0 +1,141 @@ +# MIT License +# +# Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +from __future__ import annotations + +import logging +import multiprocessing +import os +import random +import time +from multiprocessing.pool import AsyncResult +from typing import List, Tuple + +from smarts.core import config +from smarts.core.utils.file import make_dir_in_smarts_log_dir +from smarts.core.utils.sumo_utils import RemoteSumoProcess, TraciConn + +load_params = [ + "--net-file=%s" % "./scenarios/sumo/loop/map.net.xml", + "--quit-on-end", + "--no-step-log", + "--no-warnings=1", + "--seed=%s" % random.randint(0, 2147483648), + "--time-to-teleport=%s" % -1, + "--collision.check-junctions=true", + "--collision.action=none", + "--lanechange.duration=3.0", + # TODO: 1--lanechange.duration1 or 1--lateral-resolution`, in combination with `route_id`, + # causes lane change crashes as of SUMO 1.6.0. + # Controlling vehicles that have been added to the simulation with a route causes + # lane change related crashes. + # "--lateral-resolution=100", # smooth lane changes + "--step-length=%f" % 0.1, + "--default.action-step-length=%f" % 0.1, + "--begin=0", # start simulation at time=0 + "--end=31536000", # keep the simulation running for a year + "--start", +] + +MAX_PARALLEL = 32 +ITERATIONS = 60000 # 64512 ports available by Ubuntu standard +LOGGING_STEP = 1000 + + +def run_client(t): + conn = None + try: + f = os.path.abspath(make_dir_in_smarts_log_dir("_sumo_run_logs")) + f"/{t}" + lsp = RemoteSumoProcess( + remote_host=config()("sumo", "central_host"), + remote_port=config()("sumo", "central_port", cast=int), + ) + lsp.generate( + base_params=load_params + + [ + "--log=%s.log" % f, + "--message-log=%s" % f, + "--error-log=%s.err" % f, + ], + sumo_binary="sumo", + ) + conn = TraciConn( + sumo_process=lsp, + name=f"Client@{t}", + ) + conn.connect( + timeout=5, + minimum_traci_version=20, + minimum_sumo_version=(1, 10, 0), + ) + time.sleep(0.1) + conn.getVersion() + except KeyboardInterrupt: + if conn is not None: + conn.close_traci_and_pipes(False) + raise + except Exception as err: + logging.error("Primary occurred. [%s]", err) + logging.exception(err) + raise + finally: + # try: + # conn.close_traci_and_pipes() + # except Exception as err: + # logging.error("Secondary occurred. [%s]", err) + diff = time.time() - t + if diff > 9: + logging.error("Client took %ss to close", diff) + if conn is not None: + conn.teardown() + + +def test_traffic_sim_with_multi_client(): + with multiprocessing.Pool(processes=MAX_PARALLEL) as pool: + clients: List[Tuple[AsyncResult, float]] = [] + start = time.time() + # Attempt to run out of ports. + for i in range(ITERATIONS): + while len(clients) > MAX_PARALLEL: + for j, (c, t) in reversed( + [(j, (c, t)) for j, (c, t) in enumerate(clients) if c.ready()] + ): + clients.pop(j) + current = time.time() + if i % LOGGING_STEP == 0: + logging.error("Working on %s at %ss", i, current - start) + clients.append((pool.apply_async(run_client, args=(current,)), current)) + + for j, (c, t) in reversed( + [(j, (c, t)) for j, (c, t) in enumerate(clients) if c.ready()] + ): + clients.pop(j) + logging.error("Popping remaining ready clients %s", t) + + for (c, t) in clients: + if time.time() - t > 0.2: + logging.error("Stuck clients %s", t) + + logging.error("Finished") + pool.close() + logging.error("Closed") + pool.join() + logging.error("Joined") diff --git a/smarts/core/vehicle_index.py b/smarts/core/vehicle_index.py index bf08c406da..8c4f954d9f 100644 --- a/smarts/core/vehicle_index.py +++ b/smarts/core/vehicle_index.py @@ -52,7 +52,7 @@ from smarts.core.coordinates import Dimensions, Pose from smarts.core.utils import resources from smarts.core.utils.cache import cache, clear_cache -from smarts.core.utils.string import truncate +from smarts.core.utils.strings import truncate from smarts.core.vehicle_state import VEHICLE_CONFIGS from .actor import ActorRole diff --git a/smarts/sstudio/generators.py b/smarts/sstudio/generators.py index 8adbe21269..f022ed4008 100644 --- a/smarts/sstudio/generators.py +++ b/smarts/sstudio/generators.py @@ -184,7 +184,7 @@ def plan_and_save( log_path = f"{self._log_dir}/{scenario_name}" os.makedirs(log_path, exist_ok=True) - from smarts.core.utils.sumo import sumolib + from smarts.core.utils.sumo_utils import sumolib int32_limits = np.iinfo(np.int32) duarouter_path = sumolib.checkBinary("duarouter")