From 5c41b8b0581382c436ff38d82e04180166fa1838 Mon Sep 17 00:00:00 2001 From: Tucker Date: Sat, 20 Jan 2024 02:37:05 -0500 Subject: [PATCH 01/17] Stress test traci connections. --- .github/workflows/ci-base-tests-linux.yml | 1 + .github/workflows/ci-base-tests-mac.yml | 3 +- smarts/core/sumo_traffic_simulation.py | 11 +- smarts/core/utils/sumo.py | 186 ++++++++++++++---- .../tests/test_traci_port_acquisition.py | 115 +++++++++++ 5 files changed, 269 insertions(+), 47 deletions(-) create mode 100644 smarts/core/utils/tests/test_traci_port_acquisition.py diff --git a/.github/workflows/ci-base-tests-linux.yml b/.github/workflows/ci-base-tests-linux.yml index 051735dfc5..f6b1e243fd 100644 --- a/.github/workflows/ci-base-tests-linux.yml +++ b/.github/workflows/ci-base-tests-linux.yml @@ -63,6 +63,7 @@ jobs: --ignore=./smarts/core/tests/test_smarts_memory_growth.py \ --ignore=./smarts/core/tests/test_env_frame_rate.py \ --ignore=./smarts/env/tests/test_benchmark.py \ + --ignore=./smarts/core/utils/tests/test_traci_port_acquisition.py \ -k 'not test_long_determinism' examples-rl: diff --git a/.github/workflows/ci-base-tests-mac.yml b/.github/workflows/ci-base-tests-mac.yml index 5290ec769a..f207fca9eb 100644 --- a/.github/workflows/ci-base-tests-mac.yml +++ b/.github/workflows/ci-base-tests-mac.yml @@ -69,4 +69,5 @@ jobs: --ignore=./smarts/core/tests/test_renderers.py \ --ignore=./smarts/core/tests/test_smarts.py \ --ignore=./smarts/core/tests/test_env_frame_rate.py \ - --ignore=./smarts/core/tests/test_observations.py + --ignore=./smarts/core/tests/test_observations.py \ + --ignore=./smarts/core/utils/tests/test_traci_port_acquisition.py diff --git a/smarts/core/sumo_traffic_simulation.py b/smarts/core/sumo_traffic_simulation.py index 43d8faf87b..76cf6921a6 100644 --- a/smarts/core/sumo_traffic_simulation.py +++ b/smarts/core/sumo_traffic_simulation.py @@ -209,7 +209,7 @@ def _initialize_traci_conn(self, num_retries=5): ) try: - while self._traci_conn.viable and not 
self._traci_conn.connected: + while not self._traci_conn.connected: try: self._traci_conn.connect( timeout=5, @@ -224,15 +224,10 @@ def _initialize_traci_conn(self, num_retries=5): except traci.exceptions.TraCIException: # SUMO process died... unsure why this is not a fatal traci error current_retries += 1 - - self._traci_conn.close_traci_and_pipes() continue except ConnectionRefusedError: # Some other process somehow owns the port... sumo needs to be restarted. continue - except OSError: - # TraCI or SUMO version are not at the minimum required version. - raise except KeyboardInterrupt: self._log.debug("Keyboard interrupted TraCI connection.") self._traci_conn.close_traci_and_pipes() @@ -378,7 +373,7 @@ def _handle_traci_exception( self._handling_error = True if isinstance(error, traci.exceptions.TraCIException): # XXX: Needs further investigation whenever this happens. - self._log.warning("TraCI has provided a warning %s", error) + self._log.debug("TraCI has provided a warning %s", error) return if isinstance(error, traci.exceptions.FatalTraCIError): self._log.error( @@ -435,6 +430,8 @@ def teardown(self): self._remove_vehicles() except traci.exceptions.FatalTraCIError: pass + if self._traci_conn is not None: + self._traci_conn.close_traci_and_pipes() self._cumulative_sim_seconds = 0 self._non_sumo_vehicle_ids = set() diff --git a/smarts/core/utils/sumo.py b/smarts/core/utils/sumo.py index 169e660caa..263dda72ec 100644 --- a/smarts/core/utils/sumo.py +++ b/smarts/core/utils/sumo.py @@ -29,7 +29,7 @@ import os import subprocess import sys -from typing import Any, List, Optional, Tuple +from typing import Any, List, Literal, Optional, Tuple from smarts.core.utils import networking from smarts.core.utils.core_logging import suppress_output @@ -60,16 +60,20 @@ class DomainWrapper: """Wraps `traci.Domain` type for the `TraciConn` utility""" - def __init__(self, sumo_proc, domain: traci.domain.Domain) -> None: + def __init__(self, traci_conn, domain: 
traci.domain.Domain, attribute_name) -> None: self._domain = domain - self._sumo_proc = sumo_proc + self._traci_conn = traci_conn + self._attribute_name = attribute_name def __getattr__(self, name: str) -> Any: attribute = getattr(self._domain, name) if inspect.isbuiltin(attribute) or inspect.ismethod(attribute): attribute = functools.partial( - _wrap_traci_method, method=attribute, sumo_process=self._sumo_proc + _wrap_traci_method, + method=attribute, + traci_conn=self._traci_conn, + attribute_name=self._attribute_name, ) return attribute @@ -82,15 +86,22 @@ def __init__( self, sumo_port: Optional[int], base_params: List[str], - sumo_binary: str = "sumo", # Literal["sumo", "sumo-gui"] + sumo_binary: Literal[ + "sumo", "sumo-gui" + ] = "sumo", # Literal["sumo", "sumo-gui"] host: str = "localhost", + name: str = "", ): self._sumo_proc = None self._traci_conn = None self._sumo_port = None self._sumo_version: Tuple[int, ...] = tuple() self._host = host + self._name = name + # self._log = logging self._log = logging.Logger(self.__class__.__name__) + # self._log.setLevel(logging.ERROR) + self._connected = False if sumo_port is None: sumo_port = networking.find_free_port() @@ -127,6 +138,12 @@ def connect( """Attempt a connection with the SUMO process.""" traci_conn = None try: + # See if the process is still alive before attempting a connection. + if self._sumo_proc.poll() is not None: + raise traci.exceptions.TraCIException( + "TraCI server already finished before connection!!!" 
+ ) + with suppress_output(stderr=not debug, stdout=True): traci_conn = traci.connect( self._sumo_port, @@ -136,47 +153,78 @@ def connect( waitBetweenRetries=0.05, ) # SUMO must be ready within timeout seconds # We will retry since this is our first sumo command - except traci.exceptions.FatalTraCIError: - self._log.debug("TraCI could not connect in time.") + except traci.exceptions.FatalTraCIError as err: + self._log.error( + "[%s] TraCI could not connect in time to '%s:%s' [%s]", + self._name, + self._host, + self._sumo_port, + err, + ) + # XXX: Actually not fatal... raise - except traci.exceptions.TraCIException: - self._log.warning("SUMO process died.") + except traci.exceptions.TraCIException as err: + self._log.error( + "[%s] SUMO process died while trying to connect to '%s:%s' [%s]", + self._name, + self._host, + self._sumo_port, + err, + ) + self.close_traci_and_pipes(kill=True) raise except ConnectionRefusedError: - self._log.warning( - "Connection refused. Tried to connect to an unpaired TraCI client." + self._log.error( + "[%s] Intended TraCI server '%s:%s' refused connection.", + self._name, + self._host, + self._sumo_port, ) + self.close_traci_and_pipes(kill=True) raise - + self._connected = True + self._traci_conn = traci_conn try: vers, vers_str = traci_conn.getVersion() if vers < minimum_traci_version: - raise OSError( + raise ValueError( f"TraCI API version must be >= {minimum_traci_version}. Got version ({vers})" ) self._sumo_version = tuple( int(v) for v in vers_str.partition(" ")[2].split(".") ) # e.g. 
"SUMO 1.11.0" -> (1, 11, 0) if self._sumo_version < minimum_sumo_version: - raise OSError(f"SUMO version must be >= SUMO {minimum_sumo_version}") - except (traci.exceptions.FatalTraCIError, TypeError) as err: - logging.error( - "TraCI disconnected from '%s:%s', process may have died.", + raise ValueError(f"SUMO version must be >= SUMO {minimum_sumo_version}") + except (traci.exceptions.FatalTraCIError) as err: + self._log.error( + "[%s] TraCI disconnected for connection attempt '%s:%s': [%s]", + self._name, self._host, self._sumo_port, + err, ) # XXX: the error type is changed to TraCIException to make it consistent with the - # process died case of `traci.connect`. + # process died case of `traci.connect`. Since TraCIException is fatal just in this case... + self.close_traci_and_pipes(kill=True) + raise traci.exceptions.TraCIException(err) + except OSError as err: + self._log.error( + "[%s] OS error occurred for TraCI connection attempt '%s:%s': [%s]", + self._name, + self._host, + self._sumo_port, + err, + ) + self.close_traci_and_pipes(kill=True) raise traci.exceptions.TraCIException(err) - except OSError: + except ValueError: self.close_traci_and_pipes() raise - self._traci_conn = traci_conn @property def connected(self) -> bool: """Check if the connection is still valid.""" - return self._sumo_proc is not None and self._traci_conn is not None + return self._sumo_proc is not None and self._connected @property def viable(self) -> bool: @@ -188,19 +236,35 @@ def sumo_version(self) -> Tuple[int, ...]: """Get the current SUMO version as a tuple.""" return self._sumo_version + @property + def port(self) -> Optional[int]: + """Get the used TraCI port.""" + return self._sumo_port + + @property + def hostname(self) -> Optional[int]: + """Get the used TraCI port.""" + return self._host + def __getattr__(self, name: str) -> Any: if not self.connected: - return None + raise traci.exceptions.FatalTraCIError("TraCI died.") attribute = getattr(self._traci_conn, name) if 
inspect.isbuiltin(attribute) or inspect.ismethod(attribute): attribute = functools.partial( - _wrap_traci_method, method=attribute, sumo_process=self + _wrap_traci_method, + method=attribute, + attribute_name=name, + traci_conn=self, ) - - if isinstance(attribute, traci.domain.Domain): - attribute = DomainWrapper(sumo_proc=self, domain=attribute) + elif isinstance(attribute, traci.domain.Domain): + attribute = DomainWrapper( + traci_conn=self, domain=attribute, attribute_name=name + ) + else: + raise NotImplementedError() return attribute @@ -208,12 +272,15 @@ def must_reset(self): """If the version of sumo will have errors if just reloading such that it must be reset.""" return self._sumo_version > (1, 12, 0) - def close_traci_and_pipes(self): + def close_traci_and_pipes(self, wait: Optional[float] = 0, kill: bool = False): """Safely closes all connections. We should expect this method to always work without throwing""" + assert wait is None or isinstance(wait, (int, float)) + if isinstance(wait, (int, float)): + wait = max(0.0, wait) - def __safe_close(conn): + def __safe_close(conn, **kwargs): try: - conn.close() + conn.close(**kwargs) except (subprocess.SubprocessError, multiprocessing.ProcessError): # Subprocess or process failed pass @@ -223,33 +290,74 @@ def __safe_close(conn): except AttributeError: # Socket was destroyed internally, likely due to an error. 
pass + except Exception as err: + self._log.error("Different error occurred: [%s]", err) - if self._traci_conn: + if self._connected: self._log.debug("Closing TraCI connection to %s", self._sumo_port) - __safe_close(self._traci_conn) + __safe_close(self._traci_conn, wait=bool(wait)) if self._sumo_proc: __safe_close(self._sumo_proc.stdin) __safe_close(self._sumo_proc.stdout) __safe_close(self._sumo_proc.stderr) - self._sumo_proc.kill() + if wait: + try: + self._sumo_proc.wait(timeout=wait) + except subprocess.TimeoutExpired as err: + kill = True + self._log.error( + "TraCI server process shutdown timed out '%s:%s' [%s]", + self._host, + self._sumo_port, + err, + ) + if kill: + self._sumo_proc.kill() + self._sumo_proc = None + self._log.error( + "Killed TraCI server process '%s:%s", self._host, self._sumo_port + ) - self._sumo_proc = None - self._traci_conn = None + self._connected = False def teardown(self): """Clean up all resources.""" self.close_traci_and_pipes() -def _wrap_traci_method(*args, method, sumo_process: TraciConn, **kwargs): +def _wrap_traci_method( + *args, method, traci_conn: TraciConn, attribute_name: str, **kwargs +): # Argument order must be `*args` first so `method` and `sumo_process` are keyword only arguments. 
try: return method(*args, **kwargs) - except traci.exceptions.FatalTraCIError: + except traci.exceptions.FatalTraCIError as err: + logging.error( + "[%s] TraCI '%s:%s' disconnected for call '%s', process may have died: [%s]", + traci_conn._name, + traci_conn.hostname, + traci_conn.port, + attribute_name, + err, + ) # TraCI cannot continue - sumo_process.close_traci_and_pipes() - raise - except traci.exceptions.TraCIException: + traci_conn.close_traci_and_pipes(kill=True) + raise traci.exceptions.FatalTraCIError("TraCI died.") from err + except OSError as err: + logging.error( + "[%s] OS error occurred for TraCI '%s:%s' call '%s': [%s]", + traci_conn._name, + traci_conn.hostname, + traci_conn.port, + attribute_name, + err, + ) + traci_conn.close_traci_and_pipes(kill=True) + raise OSError("Connection dropped.") from err + except traci.exceptions.TraCIException as err: # Case where TraCI/SUMO can theoretically continue + raise traci.exceptions.TraCIException("TraCI can continue.") from err + except KeyboardInterrupt: + traci_conn.close_traci_and_pipes(kill=True) raise diff --git a/smarts/core/utils/tests/test_traci_port_acquisition.py b/smarts/core/utils/tests/test_traci_port_acquisition.py new file mode 100644 index 0000000000..0d581468c3 --- /dev/null +++ b/smarts/core/utils/tests/test_traci_port_acquisition.py @@ -0,0 +1,115 @@ +# MIT License +# +# Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved. 
+# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +from __future__ import annotations + +import logging +import multiprocessing +import random +import time +from multiprocessing.pool import AsyncResult +from typing import List, Tuple + +from smarts.core.utils.sumo import TraciConn + +load_params = [ + "--net-file=%s" % "./scenarios/sumo/loop/map.net.xml", + "--quit-on-end", + "--no-step-log", + "--no-warnings=1", + "--seed=%s" % random.randint(0, 2147483648), + "--time-to-teleport=%s" % -1, + "--collision.check-junctions=true", + "--collision.action=none", + "--lanechange.duration=3.0", + # TODO: 1--lanechange.duration1 or 1--lateral-resolution`, in combination with `route_id`, + # causes lane change crashes as of SUMO 1.6.0. + # Controlling vehicles that have been added to the simulation with a route causes + # lane change related crashes. 
+ # "--lateral-resolution=100", # smooth lane changes + "--step-length=%f" % 0.1, + "--default.action-step-length=%f" % 0.1, + "--begin=0", # start simulation at time=0 + "--end=31536000", # keep the simulation running for a year + "--start", +] + +MAX_PARALLEL = 32 +ITERATIONS = 800000 # 64512 ports available by Ubuntu standard +LOGGING_STEP = 1000 + + +def run_client(t): + conn = TraciConn(None, load_params, sumo_binary="sumo", name=f"Client@{t}") + try: + conn.connect( + timeout=5, + minimum_traci_version=20, + minimum_sumo_version=(1, 10, 0), + ) + time.sleep(0.1) + conn.getVersion() + except KeyboardInterrupt: + conn.close_traci_and_pipes() + raise + except Exception as err: + logging.error("Primary occurred. [%s]", err) + raise + finally: + # try: + # conn.close_traci_and_pipes() + # except Exception as err: + # logging.error("Secondary occurred. [%s]", err) + diff = time.time() - t + if diff > 9: + logging.error("Client took %ss to close", diff) + + +def test_traffic_sim_with_multi_client(): + pool = multiprocessing.Pool(processes=MAX_PARALLEL) + clients: List[Tuple[AsyncResult, float]] = [] + start = time.time() + # Attempt to run out of ports. 
+ for i in range(ITERATIONS): + while len(clients) > MAX_PARALLEL: + for j, (c, t) in reversed( + [(j, (c, t)) for j, (c, t) in enumerate(clients) if c.ready()] + ): + clients.pop(j) + current = time.time() + if i % LOGGING_STEP == 0: + logging.error("Working on %s at %ss", i, current - start) + clients.append((pool.apply_async(run_client, args=(current,)), current)) + + for j, (c, t) in reversed( + [(j, (c, t)) for j, (c, t) in enumerate(clients) if c.ready()] + ): + clients.pop(j) + logging.error("Popping remaining ready clients %s", t) + + for (c, t) in clients: + logging.error("Stuck clients %s", t) + + logging.error("Finished") + pool.close() + logging.error("Closed") + pool.join() + logging.error("Joined") From f2e292a3ee8a11bf3fffe12ef752cf1d1bab8f3a Mon Sep 17 00:00:00 2001 From: Tucker Date: Sat, 20 Jan 2024 02:48:59 -0500 Subject: [PATCH 02/17] Fix type error. --- smarts/core/utils/sumo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smarts/core/utils/sumo.py b/smarts/core/utils/sumo.py index 263dda72ec..2f162c295a 100644 --- a/smarts/core/utils/sumo.py +++ b/smarts/core/utils/sumo.py @@ -242,7 +242,7 @@ def port(self) -> Optional[int]: return self._sumo_port @property - def hostname(self) -> Optional[int]: + def hostname(self) -> str: """Get the used TraCI port.""" return self._host From 0930736ce8904438947603ec148009d8218b84f9 Mon Sep 17 00:00:00 2001 From: Tucker Date: Mon, 22 Jan 2024 14:01:41 -0500 Subject: [PATCH 03/17] Attempt to patch #2139. 
--- smarts/core/sumo_traffic_simulation.py | 85 ++++++++++++++++---------- smarts/core/utils/sumo.py | 38 ++++-------- 2 files changed, 64 insertions(+), 59 deletions(-) diff --git a/smarts/core/sumo_traffic_simulation.py b/smarts/core/sumo_traffic_simulation.py index 76cf6921a6..7ff882267b 100644 --- a/smarts/core/sumo_traffic_simulation.py +++ b/smarts/core/sumo_traffic_simulation.py @@ -125,6 +125,8 @@ def __init__( self._sim = None self._handling_error = False self._traci_retries = traci_retries + # XXX: This is used to try to avoid interrupting other instances in race condition (see GH #2139) + self._foreign_traci_servers: List[TraciConn] = [] # start with the default recovery flags... self._recovery_flags = super().recovery_flags @@ -209,18 +211,25 @@ def _initialize_traci_conn(self, num_retries=5): ) try: - while not self._traci_conn.connected: - try: - self._traci_conn.connect( - timeout=5, - minimum_traci_version=20, - minimum_sumo_version=(1, 10, 0), - debug=self._debug, - ) - except traci.exceptions.FatalTraCIError: - # Could not connect in time just retry connection - pass + self._traci_conn.connect( + timeout=5, + minimum_traci_version=20, + minimum_sumo_version=(1, 10, 0), + debug=self._debug, + ) + + if not self._traci_conn.connected: + # Save the connection to try to avoid closing it for the other client. + self._foreign_traci_servers.append(self._traci_conn) + self._traci_conn = None + raise traci.exceptions.TraCIException( + "TraCI server was likely taken by other client." + ) + except traci.exceptions.FatalTraCIError: + # Could not connect in time just retry connection + current_retries += 1 + continue except traci.exceptions.TraCIException: # SUMO process died... 
unsure why this is not a fatal traci error current_retries += 1 @@ -230,7 +239,7 @@ def _initialize_traci_conn(self, num_retries=5): continue except KeyboardInterrupt: self._log.debug("Keyboard interrupted TraCI connection.") - self._traci_conn.close_traci_and_pipes() + self._traci_conn.close_traci_and_pipes(wait=False) raise break else: @@ -257,7 +266,7 @@ def _base_sumo_load_params(self): "--net-file=%s" % self._scenario.road_map.source, "--quit-on-end", "--log=%s" % self._log_file, - "--error-log=%s" % self._log_file, + "--error-log=%s.err" % self._log_file, "--no-step-log", "--no-warnings=1", "--seed=%s" % random.randint(0, 2147483648), @@ -293,6 +302,13 @@ def _base_sumo_load_params(self): return load_params + def _restart_sumo(self): + engine_config = config() + traci_retries = self._traci_retries or engine_config( + "sumo", "traci_retries", default=5, cast=int + ) + self._initialize_traci_conn(num_retries=traci_retries) + def setup(self, scenario) -> ProviderState: """Initialize the simulation with a new scenario.""" self._log.debug("Setting up SumoTrafficSim %s", self) @@ -316,24 +332,19 @@ def setup(self, scenario) -> ProviderState: ), "SumoTrafficSimulation requires a SumoRoadNetwork" self._log_file = scenario.unique_sumo_log_file() - if restart_sumo: - try: - engine_config = config() - traci_retries = self._traci_retries or engine_config( - "sumo", "traci_retries", default=5, cast=int - ) - self._initialize_traci_conn(num_retries=traci_retries) - except traci.exceptions.FatalTraCIError: - return ProviderState() - elif self._allow_reload: - assert ( - self._traci_conn is not None - ), "TraCI should be connected at this point." 
- try: - self._traci_conn.load(self._base_sumo_load_params()) - except traci.exceptions.FatalTraCIError as err: - self._handle_traci_exception(err, actors_relinquishable=False) - return ProviderState() + try: + if restart_sumo: + self._restart_sumo() + elif self._allow_reload: + assert ( + self._traci_conn is not None + ), "TraCI should be connected at this point." + try: + self._traci_conn.load(self._base_sumo_load_params()) + except traci.exceptions.FatalTraCIError: + self._restart_sumo() + except traci.exceptions.FatalTraCIError: + return ProviderState() assert self._traci_conn is not None, "No active TraCI connection" @@ -430,9 +441,19 @@ def teardown(self): self._remove_vehicles() except traci.exceptions.FatalTraCIError: pass - if self._traci_conn is not None: + if not self._allow_reload and self._traci_conn is not None: self._traci_conn.close_traci_and_pipes() + for i, trc in reversed( + [ + (j, trc) + for j, trc in enumerate(self._foreign_traci_servers) + if not trc.viable + ] + ): + self._foreign_traci_servers.pop(i) + trc.close_traci_and_pipes(wait=False) + self._cumulative_sim_seconds = 0 self._non_sumo_vehicle_ids = set() self._sumo_vehicle_ids = set() diff --git a/smarts/core/utils/sumo.py b/smarts/core/utils/sumo.py index 2f162c295a..9c27fe992d 100644 --- a/smarts/core/utils/sumo.py +++ b/smarts/core/utils/sumo.py @@ -98,9 +98,7 @@ def __init__( self._sumo_version: Tuple[int, ...] = tuple() self._host = host self._name = name - # self._log = logging self._log = logging.Logger(self.__class__.__name__) - # self._log.setLevel(logging.ERROR) self._connected = False if sumo_port is None: @@ -124,7 +122,7 @@ def __init__( def __del__(self) -> None: # We should not raise in delete. 
try: - self.close_traci_and_pipes() + self.close_traci_and_pipes(wait=False) except Exception: pass @@ -171,7 +169,7 @@ def connect( self._sumo_port, err, ) - self.close_traci_and_pipes(kill=True) + self.close_traci_and_pipes() raise except ConnectionRefusedError: self._log.error( @@ -180,7 +178,7 @@ def connect( self._host, self._sumo_port, ) - self.close_traci_and_pipes(kill=True) + self.close_traci_and_pipes() raise self._connected = True self._traci_conn = traci_conn @@ -205,7 +203,7 @@ def connect( ) # XXX: the error type is changed to TraCIException to make it consistent with the # process died case of `traci.connect`. Since TraCIException is fatal just in this case... - self.close_traci_and_pipes(kill=True) + self.close_traci_and_pipes() raise traci.exceptions.TraCIException(err) except OSError as err: self._log.error( @@ -215,7 +213,7 @@ def connect( self._sumo_port, err, ) - self.close_traci_and_pipes(kill=True) + self.close_traci_and_pipes() raise traci.exceptions.TraCIException(err) except ValueError: self.close_traci_and_pipes() @@ -272,11 +270,8 @@ def must_reset(self): """If the version of sumo will have errors if just reloading such that it must be reset.""" return self._sumo_version > (1, 12, 0) - def close_traci_and_pipes(self, wait: Optional[float] = 0, kill: bool = False): + def close_traci_and_pipes(self, wait: bool = True, kill: bool = True): """Safely closes all connections. 
We should expect this method to always work without throwing""" - assert wait is None or isinstance(wait, (int, float)) - if isinstance(wait, (int, float)): - wait = max(0.0, wait) def __safe_close(conn, **kwargs): try: @@ -295,27 +290,16 @@ def __safe_close(conn, **kwargs): if self._connected: self._log.debug("Closing TraCI connection to %s", self._sumo_port) - __safe_close(self._traci_conn, wait=bool(wait)) + __safe_close(self._traci_conn, wait=wait) if self._sumo_proc: __safe_close(self._sumo_proc.stdin) __safe_close(self._sumo_proc.stdout) __safe_close(self._sumo_proc.stderr) - if wait: - try: - self._sumo_proc.wait(timeout=wait) - except subprocess.TimeoutExpired as err: - kill = True - self._log.error( - "TraCI server process shutdown timed out '%s:%s' [%s]", - self._host, - self._sumo_port, - err, - ) if kill: self._sumo_proc.kill() self._sumo_proc = None - self._log.error( + self._log.info( "Killed TraCI server process '%s:%s", self._host, self._sumo_port ) @@ -342,7 +326,7 @@ def _wrap_traci_method( err, ) # TraCI cannot continue - traci_conn.close_traci_and_pipes(kill=True) + traci_conn.close_traci_and_pipes() raise traci.exceptions.FatalTraCIError("TraCI died.") from err except OSError as err: logging.error( @@ -353,11 +337,11 @@ def _wrap_traci_method( attribute_name, err, ) - traci_conn.close_traci_and_pipes(kill=True) + traci_conn.close_traci_and_pipes() raise OSError("Connection dropped.") from err except traci.exceptions.TraCIException as err: # Case where TraCI/SUMO can theoretically continue raise traci.exceptions.TraCIException("TraCI can continue.") from err except KeyboardInterrupt: - traci_conn.close_traci_and_pipes(kill=True) + traci_conn.close_traci_and_pipes(wait=False) raise From 3a4aee728ba2d9860f814bd3bb607a0ab7ef822f Mon Sep 17 00:00:00 2001 From: Tucker Date: Tue, 23 Jan 2024 00:01:34 -0500 Subject: [PATCH 04/17] Add sumo instance generating server. 
--- examples/e4_environment_config.py | 2 +- examples/tools/sumo_multi_clients.py | 2 +- smarts/core/bubble_manager.py | 2 +- smarts/core/configuration.py | 3 + smarts/core/lanepoints.py | 2 +- smarts/core/sumo_road_network.py | 2 +- smarts/core/sumo_traffic_simulation.py | 25 +- smarts/core/tests/test_sumo_version.py | 4 +- smarts/core/tests/test_traffic_simulation.py | 2 +- smarts/core/utils/{string.py => strings.py} | 0 smarts/core/utils/sumo_server.py | 128 ++++++++++ smarts/core/utils/{sumo.py => sumo_utils.py} | 239 +++++++++++++----- .../tests/test_traci_port_acquisition.py | 82 +++--- smarts/core/vehicle_index.py | 2 +- smarts/sstudio/generators.py | 2 +- 15 files changed, 391 insertions(+), 106 deletions(-) rename smarts/core/utils/{string.py => strings.py} (100%) create mode 100644 smarts/core/utils/sumo_server.py rename smarts/core/utils/{sumo.py => sumo_utils.py} (67%) diff --git a/examples/e4_environment_config.py b/examples/e4_environment_config.py index f9d6476afb..870e73a422 100644 --- a/examples/e4_environment_config.py +++ b/examples/e4_environment_config.py @@ -8,7 +8,7 @@ from tools.argument_parser import empty_parser from smarts.core.agent_interface import AgentInterface, AgentType -from smarts.core.utils.string import truncate +from smarts.core.utils.strings import truncate from smarts.env.configs.hiway_env_configs import EnvReturnMode from smarts.env.gymnasium.hiway_env_v1 import HiWayEnvV1 from smarts.env.utils.action_conversion import ActionOptions diff --git a/examples/tools/sumo_multi_clients.py b/examples/tools/sumo_multi_clients.py index b79e83eeed..4b49e27f05 100644 --- a/examples/tools/sumo_multi_clients.py +++ b/examples/tools/sumo_multi_clients.py @@ -3,7 +3,7 @@ import threading import time -from smarts.core.utils.sumo import SUMO_PATH, sumolib, traci +from smarts.core.utils.sumo_utils import SUMO_PATH, sumolib, traci PORT = 8001 diff --git a/smarts/core/bubble_manager.py b/smarts/core/bubble_manager.py index 42f50a05d6..bdf228171c 
100644 --- a/smarts/core/bubble_manager.py +++ b/smarts/core/bubble_manager.py @@ -44,7 +44,7 @@ from smarts.core.road_map import RoadMap from smarts.core.utils.cache import cache, clear_cache from smarts.core.utils.id import SocialAgentId -from smarts.core.utils.string import truncate +from smarts.core.utils.strings import truncate from smarts.core.vehicle import Vehicle from smarts.core.vehicle_index import VehicleIndex from smarts.sstudio.sstypes import BoidAgentActor diff --git a/smarts/core/configuration.py b/smarts/core/configuration.py index 1d1bb83bce..4938f26e94 100644 --- a/smarts/core/configuration.py +++ b/smarts/core/configuration.py @@ -72,6 +72,9 @@ def _convert_truthy(t: str) -> bool: ("ray", "num_gpus"): 0, ("ray", "num_cpus"): None, ("ray", "log_to_driver"): False, + ("sumo", "server_port"): 62232, + ("sumo", "server_host"): "localhost", + ("sumo", "serve_mode"): "local", # local|remote } diff --git a/smarts/core/lanepoints.py b/smarts/core/lanepoints.py index 53a02842e0..67f05e20d6 100644 --- a/smarts/core/lanepoints.py +++ b/smarts/core/lanepoints.py @@ -102,7 +102,7 @@ def from_sumo( the network, the result of this function can be used to interpolate lane-points along lanes to the desired granularity. 
""" - from smarts.core.utils.sumo import sumolib # isort:skip + from smarts.core.utils.sumo_utils import sumolib # isort:skip from sumolib.net.edge import Edge # isort:skip from sumolib.net.lane import Lane # isort:skip from .sumo_road_network import SumoRoadNetwork diff --git a/smarts/core/sumo_road_network.py b/smarts/core/sumo_road_network.py index d79df24cc2..cf5397ff7f 100644 --- a/smarts/core/sumo_road_network.py +++ b/smarts/core/sumo_road_network.py @@ -40,7 +40,7 @@ from .utils.geometry import buffered_shape from .utils.glb import make_map_glb, make_road_line_glb -from smarts.core.utils.sumo import sumolib # isort:skip +from smarts.core.utils.sumo_utils import sumolib # isort:skip from sumolib.net.edge import Edge # isort:skip diff --git a/smarts/core/sumo_traffic_simulation.py b/smarts/core/sumo_traffic_simulation.py index 7ff882267b..c3c594b43d 100644 --- a/smarts/core/sumo_traffic_simulation.py +++ b/smarts/core/sumo_traffic_simulation.py @@ -22,6 +22,7 @@ import random import time import weakref +from functools import partial from pathlib import Path from typing import Iterable, List, Optional, Sequence, Tuple @@ -48,7 +49,12 @@ from smarts.core.utils.core_logging import suppress_output from smarts.core.vehicle import VEHICLE_CONFIGS, VehicleState -from smarts.core.utils.sumo import traci, TraciConn # isort:skip +from smarts.core.utils.sumo_utils import ( + LocalSumoProcess, + RemoteSumoProcess, + traci, + TraciConn, +) # isort:skip import traci.constants as tc # isort:skip @@ -128,6 +134,15 @@ def __init__( # XXX: This is used to try to avoid interrupting other instances in race condition (see GH #2139) self._foreign_traci_servers: List[TraciConn] = [] + if (sumo_serve_mode := config()("sumo", "serve_mode")) == "local": + self._process_factory = partial(LocalSumoProcess, self._sumo_port) + elif sumo_serve_mode == "remote": + self._process_factory = partial( + RemoteSumoProcess, + remote_host=config()("sumo", "server_host"), + 
remote_port=config()("sumo", "server_port", cast=int), + ) + # start with the default recovery flags... self._recovery_flags = super().recovery_flags @@ -204,10 +219,12 @@ def _initialize_traci_conn(self, num_retries=5): sumo_port = self._sumo_port sumo_binary = "sumo" if self._headless else "sumo-gui" + sumo_process = self._process_factory() + sumo_process.generate( + base_params=self._base_sumo_load_params(), sumo_binary=sumo_binary + ) self._traci_conn = TraciConn( - sumo_port=sumo_port, - base_params=self._base_sumo_load_params(), - sumo_binary=sumo_binary, + sumo_process=sumo_process, ) try: diff --git a/smarts/core/tests/test_sumo_version.py b/smarts/core/tests/test_sumo_version.py index 222c779719..4dddc346af 100644 --- a/smarts/core/tests/test_sumo_version.py +++ b/smarts/core/tests/test_sumo_version.py @@ -25,12 +25,12 @@ def test_sumo_lib(): # import does runtime check by necessity - from smarts.core.utils.sumo import sumolib + from smarts.core.utils.sumo_utils import sumolib def test_sumo_version(): from smarts.core.utils import networking - from smarts.core.utils.sumo import SUMO_PATH, traci + from smarts.core.utils.sumo_utils import SUMO_PATH, traci load_params = [ "--start", diff --git a/smarts/core/tests/test_traffic_simulation.py b/smarts/core/tests/test_traffic_simulation.py index 34973bce72..2f5dc9b54d 100644 --- a/smarts/core/tests/test_traffic_simulation.py +++ b/smarts/core/tests/test_traffic_simulation.py @@ -33,7 +33,7 @@ from smarts.core.scenario import Scenario from smarts.core.smarts import SMARTS from smarts.core.sumo_traffic_simulation import SumoTrafficSimulation -from smarts.core.utils.sumo import traci +from smarts.core.utils.sumo_utils import traci SUMO_PORT = 8082 diff --git a/smarts/core/utils/string.py b/smarts/core/utils/strings.py similarity index 100% rename from smarts/core/utils/string.py rename to smarts/core/utils/strings.py diff --git a/smarts/core/utils/sumo_server.py b/smarts/core/utils/sumo_server.py new file mode 
100644 index 0000000000..a6e2024a27 --- /dev/null +++ b/smarts/core/utils/sumo_server.py @@ -0,0 +1,128 @@ +# MIT License +# +# Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. 
+ +from __future__ import annotations + +import asyncio +import json +import os +import subprocess +from typing import Dict, Set + +from smarts.core import config +from smarts.core.utils.networking import find_free_port +from smarts.core.utils.sumo_utils import SUMO_PATH + + +class SumoServer: + """A centralized server for handling SUMO instances to prevent race conditions.""" + + def __init__(self, host, port) -> None: + self._host = host + self._port = port + + self._used_ports: Set[int] = set() + self._sumo_servers: Dict[int, subprocess.Popen[bytes]] = {} + + async def start(self): + """Start the server.""" + # Create a socket object + server = await asyncio.start_server(self.handle_client, self._host, self._port) + addr = server.sockets[0].getsockname() + print(f"Server listening on {addr}") + + async with server: + try: + await server.serve_forever() + except KeyboardInterrupt: + for port in self._sumo_servers: + self._handle_disconnect(port) + + def _handle_disconnect(self, port): + self._used_ports.discard(port) + _sumo_proc = self._sumo_servers.get(port) + if _sumo_proc is not None: + _sumo_proc.kill() + _sumo_proc = None + del self._sumo_servers[port] + + async def handle_client( + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ): + """Read data from the client.""" + addr = writer.get_extra_info("peername") + print(f"Received connection from {addr}") + try: + port = None + _sumo_proc = None + while True: + data = await reader.readline() + message = data.decode("utf-8") + # print(f"Received {message!r} from {addr}") + if message.startswith("sumo"): + while {port := find_free_port()} in self._used_ports: + pass + self._used_ports.add(port) + # Send a response back to the client + sumo_binary, _, sumo_cmd = message.partition(":") + response_list = json.loads(sumo_cmd) + _sumo_proc = subprocess.Popen( + [ + os.path.join(SUMO_PATH, "bin", sumo_binary), + f"--remote-port={port}", + *response_list, + ], + stdin=subprocess.DEVNULL, + 
stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + close_fds=True, + ) + self._sumo_servers[port] = _sumo_proc + + response = f"{self._host}:{port}" + print(f"Send: {response!r}") + writer.write(response.encode("utf-8")) + if message.startswith("e:"): + self._handle_disconnect(port) + break + if len(message) == 0: + break + + # Close the connection + await writer.drain() + writer.close() + except asyncio.CancelledError: + # Handle client disconnect + self._handle_disconnect(port) + print(f"Client {addr} disconnected unexpectedly.") + + +if __name__ == "__main__": + + # Define the host and port on which the server will listen + _host = config()( + "sumo", "server_host" + ) # Use '0.0.0.0' to listen on all available interfaces + _port = config()("sumo", "server_port") + + ss = SumoServer(_host, _port) + asyncio.run(ss.start()) diff --git a/smarts/core/utils/sumo.py b/smarts/core/utils/sumo_utils.py similarity index 67% rename from smarts/core/utils/sumo.py rename to smarts/core/utils/sumo_utils.py index 9c27fe992d..b8ac5ff08a 100644 --- a/smarts/core/utils/sumo.py +++ b/smarts/core/utils/sumo_utils.py @@ -22,11 +22,14 @@ """ from __future__ import annotations +import abc import functools import inspect +import json import logging import multiprocessing import os +import socket import subprocess import sys from typing import Any, List, Literal, Optional, Tuple @@ -57,6 +60,22 @@ ) from e +def _safe_close(conn, **kwargs): + try: + conn.close(**kwargs) + except (subprocess.SubprocessError, multiprocessing.ProcessError): + # Subprocess or process failed + pass + except traci.exceptions.FatalTraCIError: + # TraCI connection is already dead. + pass + except AttributeError: + # Socket was destroyed internally, likely due to an error. 
+ pass + except Exception as err: + pass + + class DomainWrapper: """Wraps `traci.Domain` type for the `TraciConn` utility""" @@ -79,50 +98,166 @@ def __getattr__(self, name: str) -> Any: return attribute +class SumoProcess(metaclass=abc.ABCMeta): + """A simplified utility representing a SUMO process.""" + + @abc.abstractmethod + def generate( + self, base_params: List[str], sumo_binary: Literal["sumo", "sumo-gui"] = "sumo" + ): + """Generate the process.""" + raise NotImplementedError + + @abc.abstractmethod + def terminate(self, kill: bool): + """Terminate this process.""" + raise NotImplementedError + + @abc.abstractmethod + def poll(self) -> int: + """Poll the underlying process.""" + raise NotImplementedError + + @abc.abstractmethod + def wait(self, timeout: Optional[float] = None) -> int: + """Wait on the underlying process.""" + raise NotImplementedError + + @property + @abc.abstractmethod + def port(self) -> int: + """The port this process is associated with.""" + raise NotImplementedError + + @property + @abc.abstractmethod + def host(self) -> str: + """The port this process is associated with.""" + raise NotImplementedError + + +class RemoteSumoProcess(SumoProcess): + """Connects to a sumo server.""" + + def __init__(self, remote_host, remote_port) -> None: + self._remote_host = remote_host + self._remote_port = remote_port + self._port = None + self._host = None + self._client_socket = None + + def generate( + self, base_params: List[str], sumo_binary: Literal["sumo", "sumo-gui"] = "sumo" + ): + client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client_socket.connect((self._remote_host, self._remote_port)) + + client_socket.send(f"{sumo_binary}:{json.dumps(base_params)}\n".encode("utf-8")) + + self._client_socket = client_socket + + response = client_socket.recv(1024) + self._host, _, port = response.decode("utf-8").partition(":") + self._port = int(port) + + def terminate(self, kill: bool): + 
self._client_socket.send("e:".encode("utf-8")) + self._client_socket.close() + + @property + def port(self) -> int: + return self._port or 0 + + @property + def host(self) -> str: + return self._host or "-1" + + def poll(self) -> Optional[int]: + return None + + def wait(self, timeout: float | None = None) -> int: + return 0 + + +class LocalSumoProcess(SumoProcess): + """Connects to a local sumo process.""" + + def __init__(self, sumo_port) -> None: + self._sumo_proc = None + self._sumo_port = sumo_port + + def generate( + self, base_params: List[str], sumo_binary: Literal["sumo", "sumo-gui"] = "sumo" + ): + """Generate the process.""" + if self._sumo_port is None: + self._sumo_port = networking.find_free_port() + sumo_cmd = [ + os.path.join(SUMO_PATH, "bin", sumo_binary), + f"--remote-port={self._sumo_port}", + *base_params, + ] + self._sumo_proc = subprocess.Popen( + sumo_cmd, + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + close_fds=True, + ) + + @property + def port(self) -> int: + assert self._sumo_port is not None + return self._sumo_port + + @property + def host(self) -> str: + return "localhost" + + def terminate(self, kill): + """Terminate this process.""" + + if self._sumo_proc: + _safe_close(self._sumo_proc.stdin) + _safe_close(self._sumo_proc.stdout) + _safe_close(self._sumo_proc.stderr) + if kill: + self._sumo_proc.kill() + self._sumo_proc = None + + def poll(self): + """Poll the underlying process.""" + return self._sumo_proc.poll() + + def wait(self, timeout=None): + """Wait on the underlying process.""" + return self._sumo_proc.wait(timeout=timeout) + + class TraciConn: """A simplified utility for connecting to a SUMO process.""" def __init__( self, - sumo_port: Optional[int], - base_params: List[str], - sumo_binary: Literal[ - "sumo", "sumo-gui" - ] = "sumo", # Literal["sumo", "sumo-gui"] host: str = "localhost", name: str = "", + sumo_process: SumoProcess = False, ): - self._sumo_proc = None self._traci_conn 
= None self._sumo_port = None self._sumo_version: Tuple[int, ...] = tuple() self._host = host self._name = name self._log = logging.Logger(self.__class__.__name__) + self._log = logging self._connected = False - if sumo_port is None: - sumo_port = networking.find_free_port() - self._sumo_port = sumo_port - sumo_cmd = [ - os.path.join(SUMO_PATH, "bin", sumo_binary), - f"--remote-port={sumo_port}", - *base_params, - ] - - self._log.debug("Starting sumo process:\n\t %s", sumo_cmd) - self._sumo_proc = subprocess.Popen( - sumo_cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=True, - ) + self._sumo_process = sumo_process def __del__(self) -> None: # We should not raise in delete. try: - self.close_traci_and_pipes(wait=False) + self.close_traci_and_pipes() except Exception: pass @@ -135,19 +270,16 @@ def connect( ): """Attempt a connection with the SUMO process.""" traci_conn = None + self._host = self._sumo_process.host + self._sumo_port = self._sumo_process.port try: # See if the process is still alive before attempting a connection. - if self._sumo_proc.poll() is not None: - raise traci.exceptions.TraCIException( - "TraCI server already finished before connection!!!" 
- ) - with suppress_output(stderr=not debug, stdout=True): traci_conn = traci.connect( - self._sumo_port, - host=self._host, + self._sumo_process.port, + host=self._sumo_process.host, numRetries=max(0, int(20 * timeout)), - proc=self._sumo_proc, + proc=self._sumo_process, waitBetweenRetries=0.05, ) # SUMO must be ready within timeout seconds # We will retry since this is our first sumo command @@ -180,9 +312,12 @@ def connect( ) self.close_traci_and_pipes() raise + self._connected = True self._traci_conn = traci_conn try: + if not self.viable: + raise traci.exceptions.TraCIException("TraCI server already finished!?") vers, vers_str = traci_conn.getVersion() if vers < minimum_traci_version: raise ValueError( @@ -222,12 +357,12 @@ def connect( @property def connected(self) -> bool: """Check if the connection is still valid.""" - return self._sumo_proc is not None and self._connected + return self._sumo_process is not None and self._connected @property def viable(self) -> bool: """If making a connection to the sumo process is still viable.""" - return self._sumo_proc is not None and self._sumo_proc.poll() is None + return self._sumo_process is not None and self._sumo_process.poll() is None @property def sumo_version(self) -> Tuple[int, ...]: @@ -273,41 +408,23 @@ def must_reset(self): def close_traci_and_pipes(self, wait: bool = True, kill: bool = True): """Safely closes all connections. We should expect this method to always work without throwing""" - def __safe_close(conn, **kwargs): - try: - conn.close(**kwargs) - except (subprocess.SubprocessError, multiprocessing.ProcessError): - # Subprocess or process failed - pass - except traci.exceptions.FatalTraCIError: - # TraCI connection is already dead. - pass - except AttributeError: - # Socket was destroyed internally, likely due to an error. 
- pass - except Exception as err: - self._log.error("Different error occurred: [%s]", err) - if self._connected: self._log.debug("Closing TraCI connection to %s", self._sumo_port) - __safe_close(self._traci_conn, wait=wait) + _safe_close(self._traci_conn, wait=wait) - if self._sumo_proc: - __safe_close(self._sumo_proc.stdin) - __safe_close(self._sumo_proc.stdout) - __safe_close(self._sumo_proc.stderr) - if kill: - self._sumo_proc.kill() - self._sumo_proc = None - self._log.info( - "Killed TraCI server process '%s:%s", self._host, self._sumo_port - ) + if self._sumo_process: + self._sumo_process.terminate(kill=kill) + self._log.info( + "Killed TraCI server process '%s:%s'", self._host, self._sumo_port + ) + self._sumo_process = None self._connected = False def teardown(self): """Clean up all resources.""" - self.close_traci_and_pipes() + self.close_traci_and_pipes(True) + self._traci_conn = None def _wrap_traci_method( diff --git a/smarts/core/utils/tests/test_traci_port_acquisition.py b/smarts/core/utils/tests/test_traci_port_acquisition.py index 0d581468c3..5ee0bf3234 100644 --- a/smarts/core/utils/tests/test_traci_port_acquisition.py +++ b/smarts/core/utils/tests/test_traci_port_acquisition.py @@ -23,12 +23,15 @@ import logging import multiprocessing +import os import random import time from multiprocessing.pool import AsyncResult from typing import List, Tuple -from smarts.core.utils.sumo import TraciConn +from smarts.core import config +from smarts.core.utils.file import make_dir_in_smarts_log_dir +from smarts.core.utils.sumo_utils import RemoteSumoProcess, TraciConn load_params = [ "--net-file=%s" % "./scenarios/sumo/loop/map.net.xml", @@ -53,13 +56,27 @@ ] MAX_PARALLEL = 32 -ITERATIONS = 800000 # 64512 ports available by Ubuntu standard +ITERATIONS = 60000 # 64512 ports available by Ubuntu standard LOGGING_STEP = 1000 def run_client(t): - conn = TraciConn(None, load_params, sumo_binary="sumo", name=f"Client@{t}") try: + f = 
os.path.abspath(make_dir_in_smarts_log_dir("_sumo_run_logs")) + f"/{t}" + lsp = RemoteSumoProcess( + remote_host=config()("sumo", "server_host"), + remote_port=config()("sumo", "server_port", cast=int), + ) + lsp.generate( + base_params=load_params + + [ + "--log=%s.log" % f, + "--message-log=%s" % f, + "--error-log=%s.err" % f, + ], + sumo_binary="sumo", + ) + conn = TraciConn(sumo_binary="sumo", name=f"Client@{t}", sumo_process=lsp) conn.connect( timeout=5, minimum_traci_version=20, @@ -68,10 +85,11 @@ def run_client(t): time.sleep(0.1) conn.getVersion() except KeyboardInterrupt: - conn.close_traci_and_pipes() + conn.close_traci_and_pipes(False) raise except Exception as err: - logging.error("Primary occurred. [%s]", err) + # logging.error("Primary occurred. [%s]", err) + # logging.exception(err) raise finally: # try: @@ -81,35 +99,37 @@ def run_client(t): diff = time.time() - t if diff > 9: logging.error("Client took %ss to close", diff) + conn.teardown() def test_traffic_sim_with_multi_client(): - pool = multiprocessing.Pool(processes=MAX_PARALLEL) - clients: List[Tuple[AsyncResult, float]] = [] - start = time.time() - # Attempt to run out of ports. - for i in range(ITERATIONS): - while len(clients) > MAX_PARALLEL: - for j, (c, t) in reversed( - [(j, (c, t)) for j, (c, t) in enumerate(clients) if c.ready()] - ): - clients.pop(j) - current = time.time() - if i % LOGGING_STEP == 0: - logging.error("Working on %s at %ss", i, current - start) - clients.append((pool.apply_async(run_client, args=(current,)), current)) + with multiprocessing.Pool(processes=MAX_PARALLEL) as pool: + clients: List[Tuple[AsyncResult, float]] = [] + start = time.time() + # Attempt to run out of ports. 
+ for i in range(ITERATIONS): + while len(clients) > MAX_PARALLEL: + for j, (c, t) in reversed( + [(j, (c, t)) for j, (c, t) in enumerate(clients) if c.ready()] + ): + clients.pop(j) + current = time.time() + if i % LOGGING_STEP == 0: + logging.error("Working on %s at %ss", i, current - start) + clients.append((pool.apply_async(run_client, args=(current,)), current)) - for j, (c, t) in reversed( - [(j, (c, t)) for j, (c, t) in enumerate(clients) if c.ready()] - ): - clients.pop(j) - logging.error("Popping remaining ready clients %s", t) + for j, (c, t) in reversed( + [(j, (c, t)) for j, (c, t) in enumerate(clients) if c.ready()] + ): + clients.pop(j) + logging.error("Popping remaining ready clients %s", t) - for (c, t) in clients: - logging.error("Stuck clients %s", t) + for (c, t) in clients: + if time.time() - t > 0.2: + logging.error("Stuck clients %s", t) - logging.error("Finished") - pool.close() - logging.error("Closed") - pool.join() - logging.error("Joined") + logging.error("Finished") + pool.close() + logging.error("Closed") + pool.join() + logging.error("Joined") diff --git a/smarts/core/vehicle_index.py b/smarts/core/vehicle_index.py index bf08c406da..8c4f954d9f 100644 --- a/smarts/core/vehicle_index.py +++ b/smarts/core/vehicle_index.py @@ -52,7 +52,7 @@ from smarts.core.coordinates import Dimensions, Pose from smarts.core.utils import resources from smarts.core.utils.cache import cache, clear_cache -from smarts.core.utils.string import truncate +from smarts.core.utils.strings import truncate from smarts.core.vehicle_state import VEHICLE_CONFIGS from .actor import ActorRole diff --git a/smarts/sstudio/generators.py b/smarts/sstudio/generators.py index 8adbe21269..f022ed4008 100644 --- a/smarts/sstudio/generators.py +++ b/smarts/sstudio/generators.py @@ -184,7 +184,7 @@ def plan_and_save( log_path = f"{self._log_dir}/{scenario_name}" os.makedirs(log_path, exist_ok=True) - from smarts.core.utils.sumo import sumolib + from smarts.core.utils.sumo_utils 
import sumolib int32_limits = np.iinfo(np.int32) duarouter_path = sumolib.checkBinary("duarouter") From 94da02f862245d96ef21222717df6707fcfc5c1f Mon Sep 17 00:00:00 2001 From: Tucker Date: Tue, 23 Jan 2024 00:11:12 -0500 Subject: [PATCH 05/17] Fix bad syntax. --- smarts/core/utils/sumo_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smarts/core/utils/sumo_server.py b/smarts/core/utils/sumo_server.py index a6e2024a27..2ef3ae084e 100644 --- a/smarts/core/utils/sumo_server.py +++ b/smarts/core/utils/sumo_server.py @@ -79,7 +79,7 @@ async def handle_client( message = data.decode("utf-8") # print(f"Received {message!r} from {addr}") if message.startswith("sumo"): - while {port := find_free_port()} in self._used_ports: + while (port := find_free_port()) in self._used_ports: pass self._used_ports.add(port) # Send a response back to the client From e4f3dcb0d51d5683daa87271210a41b5060405f3 Mon Sep 17 00:00:00 2001 From: Montgomery Alban Date: Tue, 23 Jan 2024 19:01:12 +0000 Subject: [PATCH 06/17] Fix docs test. 
--- smarts/core/sumo_traffic_simulation.py | 5 +++-- smarts/core/utils/sumo_server.py | 5 ++--- smarts/core/utils/sumo_utils.py | 5 ----- 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/smarts/core/sumo_traffic_simulation.py b/smarts/core/sumo_traffic_simulation.py index c3c594b43d..9c66e4861b 100644 --- a/smarts/core/sumo_traffic_simulation.py +++ b/smarts/core/sumo_traffic_simulation.py @@ -49,12 +49,13 @@ from smarts.core.utils.core_logging import suppress_output from smarts.core.vehicle import VEHICLE_CONFIGS, VehicleState -from smarts.core.utils.sumo_utils import ( +from smarts.core.utils.sumo_utils import ( # isort:skip LocalSumoProcess, RemoteSumoProcess, - traci, TraciConn, + traci, ) # isort:skip + import traci.constants as tc # isort:skip diff --git a/smarts/core/utils/sumo_server.py b/smarts/core/utils/sumo_server.py index 2ef3ae084e..37502433da 100644 --- a/smarts/core/utils/sumo_server.py +++ b/smarts/core/utils/sumo_server.py @@ -26,6 +26,7 @@ import json import os import subprocess +from asyncio.streams import StreamReader, StreamWriter from typing import Dict, Set from smarts.core import config @@ -65,9 +66,7 @@ def _handle_disconnect(self, port): _sumo_proc = None del self._sumo_servers[port] - async def handle_client( - self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter - ): + async def handle_client(self, reader: StreamReader, writer: StreamWriter): """Read data from the client.""" addr = writer.get_extra_info("peername") print(f"Received connection from {addr}") diff --git a/smarts/core/utils/sumo_utils.py b/smarts/core/utils/sumo_utils.py index b8ac5ff08a..e272ddc741 100644 --- a/smarts/core/utils/sumo_utils.py +++ b/smarts/core/utils/sumo_utils.py @@ -189,7 +189,6 @@ def __init__(self, sumo_port) -> None: def generate( self, base_params: List[str], sumo_binary: Literal["sumo", "sumo-gui"] = "sumo" ): - """Generate the process.""" if self._sumo_port is None: self._sumo_port = networking.find_free_port() sumo_cmd = [ 
@@ -215,8 +214,6 @@ def host(self) -> str: return "localhost" def terminate(self, kill): - """Terminate this process.""" - if self._sumo_proc: _safe_close(self._sumo_proc.stdin) _safe_close(self._sumo_proc.stdout) @@ -226,11 +223,9 @@ def terminate(self, kill): self._sumo_proc = None def poll(self): - """Poll the underlying process.""" return self._sumo_proc.poll() def wait(self, timeout=None): - """Wait on the underlying process.""" return self._sumo_proc.wait(timeout=timeout) From 103e46b4cb718b3fd966e24a6177915ad07daf7a Mon Sep 17 00:00:00 2001 From: Montgomery Alban Date: Thu, 25 Jan 2024 14:56:45 +0000 Subject: [PATCH 07/17] Add resource pooling --- smarts/core/configuration.py | 1 + smarts/core/utils/sumo_server.py | 70 ++++++++++++++++++++++++++++---- 2 files changed, 63 insertions(+), 8 deletions(-) diff --git a/smarts/core/configuration.py b/smarts/core/configuration.py index 4938f26e94..c8116b62fe 100644 --- a/smarts/core/configuration.py +++ b/smarts/core/configuration.py @@ -72,6 +72,7 @@ def _convert_truthy(t: str) -> bool: ("ray", "num_gpus"): 0, ("ray", "num_cpus"): None, ("ray", "log_to_driver"): False, + ("sumo", "server_pool_size"): 12, ("sumo", "server_port"): 62232, ("sumo", "server_host"): "localhost", ("sumo", "serve_mode"): "local", # local|remote diff --git a/smarts/core/utils/sumo_server.py b/smarts/core/utils/sumo_server.py index 37502433da..4f33644030 100644 --- a/smarts/core/utils/sumo_server.py +++ b/smarts/core/utils/sumo_server.py @@ -23,11 +23,13 @@ from __future__ import annotations import asyncio +import asyncio.streams import json +import multiprocessing import os import subprocess from asyncio.streams import StreamReader, StreamWriter -from typing import Dict, Set +from typing import Any, Dict, Set from smarts.core import config from smarts.core.utils.networking import find_free_port @@ -37,34 +39,85 @@ class SumoServer: """A centralized server for handling SUMO instances to prevent race conditions.""" - def __init__(self, host, 
port) -> None: + def __init__(self, host, port, reserved_count) -> None: self._host = host self._port = port self._used_ports: Set[int] = set() self._sumo_servers: Dict[int, subprocess.Popen[bytes]] = {} + self._reserve_count: float = reserved_count + self._server_pool: multiprocessing.Queue() + async def start(self): """Start the server.""" # Create a socket object server = await asyncio.start_server(self.handle_client, self._host, self._port) - addr = server.sockets[0].getsockname() - print(f"Server listening on {addr}") + + address = server.sockets[0].getsockname() + print(f"Server listening on `{address = }`") + + resource_pooling_task = asyncio.create_task(self._resource_pooling_loop()) async with server: try: - await server.serve_forever() + await asyncio.gather(server.serve_forever(), resource_pooling_task) except KeyboardInterrupt: for port in self._sumo_servers: self._handle_disconnect(port) - def _handle_disconnect(self, port): - self._used_ports.discard(port) + async def _resource_pooling_loop( + self, + ): + # Get the current event loop. + loop = asyncio.get_running_loop() + futures: Dict[Any, asyncio.Future[Any]] = {} + while True: + for _ in range(self._reserve_count - len(self._sumo_servers)): + # Create a new Future object. 
+ fut = loop.create_future() + while (port := find_free_port()) in self._used_ports: + pass + futures[port] = fut + loop.create_task(self.open_sumo(fut, port)) + + for p, f in futures.copy().items(): + if f.done(): + if f.cancelled(): + del futures[p] + continue + poll, port, _sumo_process = f.result() + if poll is not None: + continue + print(f"sumo pooled {port = }, {poll = }") + self._server_pool.put((port, _sumo_process)) + + async def open_sumo(self, future: asyncio.futures.Future, port: int): + _sumo_proc = subprocess.Popen( + [ + os.path.join( + SUMO_PATH, + "bin", + ), + f"--remote-port={port}", + ], + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + close_fds=True, + ) + await asyncio.sleep(0.1) + if (poll := _sumo_proc.poll()) is None: + self._server_pool[port] = _sumo_proc + future.set_result((poll, port, _sumo_proc)) + + def _handle_disconnect(self, port: int): _sumo_proc = self._sumo_servers.get(port) if _sumo_proc is not None: _sumo_proc.kill() _sumo_proc = None del self._sumo_servers[port] + self._used_ports.discard(port) async def handle_client(self, reader: StreamReader, writer: StreamWriter): """Read data from the client.""" @@ -122,6 +175,7 @@ async def handle_client(self, reader: StreamReader, writer: StreamWriter): "sumo", "server_host" ) # Use '0.0.0.0' to listen on all available interfaces _port = config()("sumo", "server_port") + _server_pool_size = config()("sumo", "server_pool_size") - ss = SumoServer(_host, _port) + ss = SumoServer(_host, _port, _server_pool_size) asyncio.run(ss.start()) From 25789e9ef75c93a8546046ff76dcb8d699f98088 Mon Sep 17 00:00:00 2001 From: Tucker Date: Thu, 25 Jan 2024 15:01:40 -0500 Subject: [PATCH 08/17] Stabilize the server. No more detected errors. 
--- docs/conf.py | 4 +- smarts/core/sumo_traffic_simulation.py | 20 ++- smarts/core/utils/sumo_server.py | 150 ++++++++---------- smarts/core/utils/sumo_utils.py | 8 +- .../tests/test_traci_port_acquisition.py | 9 +- 5 files changed, 87 insertions(+), 104 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 1fb8684439..70bcad2e94 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -42,8 +42,8 @@ "sphinx.ext.viewcode", # link to sourcecode from docs "sphinx_rtd_theme", # Read The Docs theme "sphinx_click", # extract documentation from a `click` application - "sphinxcontrib.apidoc", - "sphinxcontrib.spelling", + "sphinxcontrib.apidoc", # automatically document the API + "sphinxcontrib.spelling", # check documentation for spelling ] extlinks = { diff --git a/smarts/core/sumo_traffic_simulation.py b/smarts/core/sumo_traffic_simulation.py index 9c66e4861b..8d94b1e31f 100644 --- a/smarts/core/sumo_traffic_simulation.py +++ b/smarts/core/sumo_traffic_simulation.py @@ -20,11 +20,10 @@ import logging import random -import time import weakref from functools import partial from pathlib import Path -from typing import Iterable, List, Optional, Sequence, Tuple +from typing import Final, Iterable, List, Optional, Sequence, Tuple import numpy as np from shapely.affinity import rotate as shapely_rotate @@ -49,12 +48,15 @@ from smarts.core.utils.core_logging import suppress_output from smarts.core.vehicle import VEHICLE_CONFIGS, VehicleState -from smarts.core.utils.sumo_utils import ( # isort:skip +NO_CHECKS: Final = 0b00000 + +# isort:skip +from smarts.core.utils.sumo_utils import ( LocalSumoProcess, RemoteSumoProcess, TraciConn, traci, -) # isort:skip +) import traci.constants as tc # isort:skip @@ -135,7 +137,10 @@ def __init__( # XXX: This is used to try to avoid interrupting other instances in race condition (see GH #2139) self._foreign_traci_servers: List[TraciConn] = [] - if (sumo_serve_mode := config()("sumo", "serve_mode")) == "local": + if ( + self._sumo_port is not 
None + or (sumo_serve_mode := config()("sumo", "serve_mode")) == "local" + ): self._process_factory = partial(LocalSumoProcess, self._sumo_port) elif sumo_serve_mode == "remote": self._process_factory = partial( @@ -217,7 +222,6 @@ def _initialize_traci_conn(self, num_retries=5): self._traci_conn.close_traci_and_pipes() self._traci_conn = None - sumo_port = self._sumo_port sumo_binary = "sumo" if self._headless else "sumo-gui" sumo_process = self._process_factory() @@ -602,7 +606,7 @@ def _sync(self, provider_state: ProviderState): VEHICLE_CONFIGS[vehicle_state.vehicle_config_type].dimensions, ) self._create_vehicle(vehicle_id, dimensions, vehicle_state.role) - no_checks = 0b00000 + no_checks = NO_CHECKS self._traci_conn.vehicle.setSpeedMode(vehicle_id, no_checks) # update the state of all current managed vehicles @@ -647,7 +651,7 @@ def _sync(self, provider_state: ProviderState): ) for vehicle_id in vehicles_that_have_become_external: - no_checks = 0b00000 + no_checks = NO_CHECKS self._traci_conn.vehicle.setSpeedMode(vehicle_id, no_checks) self._traci_conn.vehicle.setColor( vehicle_id, SumoTrafficSimulation._color_for_role(ActorRole.SocialAgent) diff --git a/smarts/core/utils/sumo_server.py b/smarts/core/utils/sumo_server.py index 4f33644030..126cd5c467 100644 --- a/smarts/core/utils/sumo_server.py +++ b/smarts/core/utils/sumo_server.py @@ -28,10 +28,12 @@ import multiprocessing import os import subprocess +import time from asyncio.streams import StreamReader, StreamWriter -from typing import Any, Dict, Set +from typing import Any, Dict, List, Set from smarts.core import config +from smarts.core.utils.file import make_dir_in_smarts_log_dir from smarts.core.utils.networking import find_free_port from smarts.core.utils.sumo_utils import SUMO_PATH @@ -44,10 +46,7 @@ def __init__(self, host, port, reserved_count) -> None: self._port = port self._used_ports: Set[int] = set() - self._sumo_servers: Dict[int, subprocess.Popen[bytes]] = {} - self._reserve_count: float = 
reserved_count - self._server_pool: multiprocessing.Queue() async def start(self): """Start the server.""" @@ -57,104 +56,81 @@ async def start(self): address = server.sockets[0].getsockname() print(f"Server listening on `{address = }`") - resource_pooling_task = asyncio.create_task(self._resource_pooling_loop()) - async with server: - try: - await asyncio.gather(server.serve_forever(), resource_pooling_task) - except KeyboardInterrupt: - for port in self._sumo_servers: - self._handle_disconnect(port) - - async def _resource_pooling_loop( - self, - ): - # Get the current event loop. - loop = asyncio.get_running_loop() - futures: Dict[Any, asyncio.Future[Any]] = {} - while True: - for _ in range(self._reserve_count - len(self._sumo_servers)): - # Create a new Future object. - fut = loop.create_future() - while (port := find_free_port()) in self._used_ports: - pass - futures[port] = fut - loop.create_task(self.open_sumo(fut, port)) - - for p, f in futures.copy().items(): - if f.done(): - if f.cancelled(): - del futures[p] - continue - poll, port, _sumo_process = f.result() - if poll is not None: - continue - print(f"sumo pooled {port = }, {poll = }") - self._server_pool.put((port, _sumo_process)) - - async def open_sumo(self, future: asyncio.futures.Future, port: int): - _sumo_proc = subprocess.Popen( - [ - os.path.join( - SUMO_PATH, - "bin", - ), - f"--remote-port={port}", - ], - stdin=subprocess.DEVNULL, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - close_fds=True, - ) - await asyncio.sleep(0.1) - if (poll := _sumo_proc.poll()) is None: - self._server_pool[port] = _sumo_proc - future.set_result((poll, port, _sumo_proc)) - - def _handle_disconnect(self, port: int): - _sumo_proc = self._sumo_servers.get(port) - if _sumo_proc is not None: - _sumo_proc.kill() - _sumo_proc = None - del self._sumo_servers[port] - self._used_ports.discard(port) + await server.serve_forever() + + async def _process_manager(self, binary, args, f: asyncio.Future): + """Manages 
the lifecycle of the TraCI server.""" + _sumo_proc = None + try: + # Create a new Future object. + while (port := find_free_port()) in self._used_ports: + pass + self._used_ports.add(port) + _sumo_proc = await asyncio.create_subprocess_exec( + *[ + os.path.join(SUMO_PATH, "bin", binary), + f"--remote-port={port}", + ], + *args, + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + close_fds=True, + ) + loop = asyncio.get_event_loop() + future = loop.create_future() + f.set_result( + { + "port": port, + "future": future, + } + ) + + result = await asyncio.wait_for(future, None) + finally: + if port is not None: + self._used_ports.discard(port) + if _sumo_proc is not None and _sumo_proc.returncode is None: + _sumo_proc.kill() async def handle_client(self, reader: StreamReader, writer: StreamWriter): """Read data from the client.""" - addr = writer.get_extra_info("peername") - print(f"Received connection from {addr}") + address = writer.get_extra_info("peername") + print(f"Received connection from {address}") + loop = asyncio.get_event_loop() try: port = None - _sumo_proc = None + _traci_server_info = None while True: data = await reader.readline() message = data.decode("utf-8") # print(f"Received {message!r} from {addr}") if message.startswith("sumo"): - while (port := find_free_port()) in self._used_ports: - pass - self._used_ports.add(port) - # Send a response back to the client + kill_f = loop.create_future() sumo_binary, _, sumo_cmd = message.partition(":") response_list = json.loads(sumo_cmd) - _sumo_proc = subprocess.Popen( - [ - os.path.join(SUMO_PATH, "bin", sumo_binary), - f"--remote-port={port}", - *response_list, - ], - stdin=subprocess.DEVNULL, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - close_fds=True, + command_args = [ + *response_list, + ] + asyncio.create_task( + self._process_manager(sumo_binary, command_args, kill_f) ) - self._sumo_servers[port] = _sumo_proc + if _traci_server_info is not None: + 
print("Duplicate start request received.") + continue + _traci_server_info = await asyncio.wait_for(kill_f, None) + port = _traci_server_info["port"] response = f"{self._host}:{port}" - print(f"Send: {response!r}") + print(f"Send TraCI address: {response!r}") writer.write(response.encode("utf-8")) + if message.startswith("e:"): - self._handle_disconnect(port) + if _traci_server_info is None: + print("Kill received for uninitialized process.") + else: + _traci_server_info["future"].set_result("kill") break if len(message) == 0: break @@ -164,8 +140,7 @@ async def handle_client(self, reader: StreamReader, writer: StreamWriter): writer.close() except asyncio.CancelledError: # Handle client disconnect - self._handle_disconnect(port) - print(f"Client {addr} disconnected unexpectedly.") + pass if __name__ == "__main__": @@ -175,7 +150,8 @@ async def handle_client(self, reader: StreamReader, writer: StreamWriter): "sumo", "server_host" ) # Use '0.0.0.0' to listen on all available interfaces _port = config()("sumo", "server_port") - _server_pool_size = config()("sumo", "server_pool_size") + _server_pool_size = 2 + config()("sumo", "server_pool_size") ss = SumoServer(_host, _port, _server_pool_size) asyncio.run(ss.start()) diff --git a/smarts/core/utils/sumo_utils.py b/smarts/core/utils/sumo_utils.py index e272ddc741..c142f30740 100644 --- a/smarts/core/utils/sumo_utils.py +++ b/smarts/core/utils/sumo_utils.py @@ -114,7 +114,7 @@ def terminate(self, kill: bool): raise NotImplementedError @abc.abstractmethod - def poll(self) -> int: + def poll(self) -> Optional[int]: """Poll the underlying process.""" raise NotImplementedError @@ -175,7 +175,7 @@ def host(self) -> str: def poll(self) -> Optional[int]: return None - def wait(self, timeout: float | None = None) -> int: + def wait(self, timeout: Optional[float] = None) -> int: return 0 @@ -222,7 +222,7 @@ def terminate(self, kill): self._sumo_proc.kill() self._sumo_proc = None - def poll(self): + def poll(self) -> 
Optional[int]: return self._sumo_proc.poll() def wait(self, timeout=None): @@ -234,9 +234,9 @@ class TraciConn: def __init__( self, + sumo_process: SumoProcess, host: str = "localhost", name: str = "", - sumo_process: SumoProcess = False, ): self._traci_conn = None self._sumo_port = None diff --git a/smarts/core/utils/tests/test_traci_port_acquisition.py b/smarts/core/utils/tests/test_traci_port_acquisition.py index 5ee0bf3234..e17e94bc80 100644 --- a/smarts/core/utils/tests/test_traci_port_acquisition.py +++ b/smarts/core/utils/tests/test_traci_port_acquisition.py @@ -76,7 +76,10 @@ def run_client(t): ], sumo_binary="sumo", ) - conn = TraciConn(sumo_binary="sumo", name=f"Client@{t}", sumo_process=lsp) + conn = TraciConn( + sumo_process=lsp, + name=f"Client@{t}", + ) conn.connect( timeout=5, minimum_traci_version=20, @@ -88,8 +91,8 @@ def run_client(t): conn.close_traci_and_pipes(False) raise except Exception as err: - # logging.error("Primary occurred. [%s]", err) - # logging.exception(err) + logging.error("Primary occurred. [%s]", err) + logging.exception(err) raise finally: # try: From 2ed5b2b8abd7bd40d398f1d73bc25dd0ff6ac03a Mon Sep 17 00:00:00 2001 From: Tucker Date: Thu, 25 Jan 2024 15:16:19 -0500 Subject: [PATCH 09/17] Fix docs test. 
--- smarts/core/utils/sumo_server.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/smarts/core/utils/sumo_server.py b/smarts/core/utils/sumo_server.py index 126cd5c467..3f70d3a180 100644 --- a/smarts/core/utils/sumo_server.py +++ b/smarts/core/utils/sumo_server.py @@ -25,18 +25,17 @@ import asyncio import asyncio.streams import json -import multiprocessing import os import subprocess -import time -from asyncio.streams import StreamReader, StreamWriter -from typing import Any, Dict, List, Set +from typing import TYPE_CHECKING, Set from smarts.core import config -from smarts.core.utils.file import make_dir_in_smarts_log_dir from smarts.core.utils.networking import find_free_port from smarts.core.utils.sumo_utils import SUMO_PATH +if TYPE_CHECKING: + from asyncio.streams import StreamReader, StreamWriter + class SumoServer: """A centralized server for handling SUMO instances to prevent race conditions.""" @@ -94,7 +93,7 @@ async def _process_manager(self, binary, args, f: asyncio.Future): if _sumo_proc is not None and _sumo_proc.returncode is None: _sumo_proc.kill() - async def handle_client(self, reader: StreamReader, writer: StreamWriter): + async def handle_client(self, reader, writer): """Read data from the client.""" address = writer.get_extra_info("peername") print(f"Received connection from {address}") @@ -150,8 +149,7 @@ async def handle_client(self, reader: StreamReader, writer: StreamWriter): "sumo", "server_host" ) # Use '0.0.0.0' to listen on all available interfaces _port = config()("sumo", "server_port") - _server_pool_size = 2 - config()("sumo", "server_pool_size") + _server_pool_size = config()("sumo", "server_pool_size") ss = SumoServer(_host, _port, _server_pool_size) asyncio.run(ss.start()) From ab31e24e468cf3051033126750e34e5132fba108 Mon Sep 17 00:00:00 2001 From: Tucker Date: Thu, 25 Jan 2024 16:39:22 -0500 Subject: [PATCH 10/17] Server now closes after a given time. 
--- smarts/core/configuration.py | 1 - smarts/core/sumo_traffic_simulation.py | 8 +- smarts/core/utils/sumo_server.py | 89 ++++++++++++++----- .../tests/test_traci_port_acquisition.py | 7 +- 4 files changed, 79 insertions(+), 26 deletions(-) diff --git a/smarts/core/configuration.py b/smarts/core/configuration.py index c8116b62fe..4938f26e94 100644 --- a/smarts/core/configuration.py +++ b/smarts/core/configuration.py @@ -72,7 +72,6 @@ def _convert_truthy(t: str) -> bool: ("ray", "num_gpus"): 0, ("ray", "num_cpus"): None, ("ray", "log_to_driver"): False, - ("sumo", "server_pool_size"): 12, ("sumo", "server_port"): 62232, ("sumo", "server_host"): "localhost", ("sumo", "serve_mode"): "local", # local|remote diff --git a/smarts/core/sumo_traffic_simulation.py b/smarts/core/sumo_traffic_simulation.py index 8d94b1e31f..5a2f9689ae 100644 --- a/smarts/core/sumo_traffic_simulation.py +++ b/smarts/core/sumo_traffic_simulation.py @@ -46,6 +46,7 @@ from smarts.core.sumo_road_network import SumoRoadNetwork from smarts.core.traffic_provider import TrafficProvider from smarts.core.utils.core_logging import suppress_output +from smarts.core.utils.sumo_server import spawn_if_not from smarts.core.vehicle import VEHICLE_CONFIGS, VehicleState NO_CHECKS: Final = 0b00000 @@ -143,10 +144,13 @@ def __init__( ): self._process_factory = partial(LocalSumoProcess, self._sumo_port) elif sumo_serve_mode == "remote": + remote_host = config()("sumo", "server_host") + remote_port = config()("sumo", "server_port", cast=int) + spawn_if_not(remote_host, remote_port) self._process_factory = partial( RemoteSumoProcess, - remote_host=config()("sumo", "server_host"), - remote_port=config()("sumo", "server_port", cast=int), + remote_host=remote_host, + remote_port=remote_port, ) # start with the default recovery flags... 
diff --git a/smarts/core/utils/sumo_server.py b/smarts/core/utils/sumo_server.py index 3f70d3a180..898b1cab54 100644 --- a/smarts/core/utils/sumo_server.py +++ b/smarts/core/utils/sumo_server.py @@ -26,50 +26,63 @@ import asyncio.streams import json import os +import socket import subprocess -from typing import TYPE_CHECKING, Set +import time +from typing import Optional, Set from smarts.core import config from smarts.core.utils.networking import find_free_port from smarts.core.utils.sumo_utils import SUMO_PATH -if TYPE_CHECKING: - from asyncio.streams import StreamReader, StreamWriter - class SumoServer: """A centralized server for handling SUMO instances to prevent race conditions.""" - def __init__(self, host, port, reserved_count) -> None: + def __init__(self, host, port) -> None: self._host = host self._port = port self._used_ports: Set[int] = set() - self._reserve_count: float = reserved_count + self._last_client: float = time.time() - async def start(self): + async def start(self, timeout: Optional[float] = 60.0 * 60.0): """Start the server.""" # Create a socket object server = await asyncio.start_server(self.handle_client, self._host, self._port) address = server.sockets[0].getsockname() print(f"Server listening on `{address = }`") - async with server: - await server.serve_forever() + waitable = server.serve_forever() + if timeout is not None: + _timeout_watcher = asyncio.create_task(self._timeout_watcher(timeout)) + waitable = asyncio.gather(waitable, _timeout_watcher) + await waitable + + async def _timeout_watcher(self, timeout: float): + """Closes the server if it is not in use for `timeout` length of time.""" + + while True: + await asyncio.sleep(60) + if time.time() - self._last_client > timeout and len(self._used_ports) == 0: + print(f"Closing because `{timeout=}` was reached.") + loop = asyncio.get_event_loop() + loop.stop() async def _process_manager(self, binary, args, f: asyncio.Future): """Manages the lifecycle of the TraCI server.""" _sumo_proc 
= None + _port = None try: # Create a new Future object. - while (port := find_free_port()) in self._used_ports: + while (_port := find_free_port()) in self._used_ports: pass - self._used_ports.add(port) + self._used_ports.add(_port) _sumo_proc = await asyncio.create_subprocess_exec( *[ os.path.join(SUMO_PATH, "bin", binary), - f"--remote-port={port}", + f"--remote-port={_port}", ], *args, stdin=subprocess.DEVNULL, @@ -81,17 +94,18 @@ async def _process_manager(self, binary, args, f: asyncio.Future): future = loop.create_future() f.set_result( { - "port": port, + "port": _port, "future": future, } ) result = await asyncio.wait_for(future, None) finally: - if port is not None: - self._used_ports.discard(port) + if _port is not None: + self._used_ports.discard(_port) if _sumo_proc is not None and _sumo_proc.returncode is None: _sumo_proc.kill() + self._last_client = time.time() async def handle_client(self, reader, writer): """Read data from the client.""" @@ -142,14 +156,47 @@ async def handle_client(self, reader, writer): pass -if __name__ == "__main__": +def spawn_if_not(remote_host: str, remote_port: int): + """Create a new server if it does not already exist. + + Args: + remote_host (str): The host name. + remote_port (int): The host port. 
+ """ + client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + client_socket.connect((remote_host, remote_port)) + except socket.error: + if remote_host in ("localhost", "127.0.0.1"): + command = ["python", "-m", __name__] + # Use subprocess.Popen to start the process in the background + _ = subprocess.Popen( + command, + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + shell=False, + ) + # Wait for process to start + client_socket.settimeout(5.0) + client_socket.connect((remote_host, remote_port)) + client_socket.close() + else: + client_socket.close() + + +def main(*_, _host=None, _port=None, timeout: Optional[float] = 10 * 60): + """The program entrypoint.""" # Define the host and port on which the server will listen - _host = config()( + _host = _host or config()( "sumo", "server_host" ) # Use '0.0.0.0' to listen on all available interfaces - _port = config()("sumo", "server_port") - _server_pool_size = config()("sumo", "server_pool_size") + _port = _port or config()("sumo", "server_port") - ss = SumoServer(_host, _port, _server_pool_size) - asyncio.run(ss.start()) + ss = SumoServer(_host, _port) + asyncio.run(ss.start(timeout=timeout)) + + +if __name__ == "__main__": + main(timeout=None) diff --git a/smarts/core/utils/tests/test_traci_port_acquisition.py b/smarts/core/utils/tests/test_traci_port_acquisition.py index e17e94bc80..3d6d71c144 100644 --- a/smarts/core/utils/tests/test_traci_port_acquisition.py +++ b/smarts/core/utils/tests/test_traci_port_acquisition.py @@ -61,6 +61,7 @@ def run_client(t): + conn = None try: f = os.path.abspath(make_dir_in_smarts_log_dir("_sumo_run_logs")) + f"/{t}" lsp = RemoteSumoProcess( @@ -88,7 +89,8 @@ def run_client(t): time.sleep(0.1) conn.getVersion() except KeyboardInterrupt: - conn.close_traci_and_pipes(False) + if conn is not None: + conn.close_traci_and_pipes(False) raise except Exception as err: logging.error("Primary occurred. 
[%s]", err) @@ -102,7 +104,8 @@ def run_client(t): diff = time.time() - t if diff > 9: logging.error("Client took %ss to close", diff) - conn.teardown() + if conn is not None: + conn.teardown() def test_traffic_sim_with_multi_client(): From 76276725bd79a44dee63ded73df1fb893739ee83 Mon Sep 17 00:00:00 2001 From: Tucker Date: Thu, 25 Jan 2024 17:17:04 -0500 Subject: [PATCH 11/17] Set "remote" as default. --- smarts/core/configuration.py | 2 +- smarts/core/utils/sumo_server.py | 22 +++++++++++++++------- smarts/core/utils/sumo_utils.py | 17 ++++++++++++++++- 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/smarts/core/configuration.py b/smarts/core/configuration.py index 4938f26e94..ac83536d3b 100644 --- a/smarts/core/configuration.py +++ b/smarts/core/configuration.py @@ -74,7 +74,7 @@ def _convert_truthy(t: str) -> bool: ("ray", "log_to_driver"): False, ("sumo", "server_port"): 62232, ("sumo", "server_host"): "localhost", - ("sumo", "serve_mode"): "local", # local|remote + ("sumo", "serve_mode"): "remote", # local|remote } diff --git a/smarts/core/utils/sumo_server.py b/smarts/core/utils/sumo_server.py index 898b1cab54..244269cb33 100644 --- a/smarts/core/utils/sumo_server.py +++ b/smarts/core/utils/sumo_server.py @@ -22,6 +22,7 @@ from __future__ import annotations +import argparse import asyncio import asyncio.streams import json @@ -166,9 +167,9 @@ def spawn_if_not(remote_host: str, remote_port: int): client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: client_socket.connect((remote_host, remote_port)) - except socket.error: + except (OSError): if remote_host in ("localhost", "127.0.0.1"): - command = ["python", "-m", __name__] + command = ["python", "-m", __name__, "--timeout", "600"] # Use subprocess.Popen to start the process in the background _ = subprocess.Popen( @@ -177,11 +178,9 @@ def spawn_if_not(remote_host: str, remote_port: int): stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=False, + close_fds=True, ) - # 
Wait for process to start - client_socket.settimeout(5.0) - client_socket.connect((remote_host, remote_port)) - client_socket.close() + else: client_socket.close() @@ -199,4 +198,13 @@ def main(*_, _host=None, _port=None, timeout: Optional[float] = 10 * 60): if __name__ == "__main__": - main(timeout=None) + parser = argparse.ArgumentParser(__name__) + parser.add_argument( + "--timeout", + help="Duration of time until server shuts down if not in use.", + type=float, + default=None, + ) + args = parser.parse_args() + + main(timeout=args.timeout) diff --git a/smarts/core/utils/sumo_utils.py b/smarts/core/utils/sumo_utils.py index c142f30740..c6b58a6f76 100644 --- a/smarts/core/utils/sumo_utils.py +++ b/smarts/core/utils/sumo_utils.py @@ -32,6 +32,7 @@ import socket import subprocess import sys +import time from typing import Any, List, Literal, Optional, Tuple from smarts.core.utils import networking @@ -150,7 +151,21 @@ def generate( self, base_params: List[str], sumo_binary: Literal["sumo", "sumo-gui"] = "sumo" ): client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - client_socket.connect((self._remote_host, self._remote_port)) + + # Wait on server to start if it needs to. 
+ error = None + for _ in range(5): + try: + client_socket.connect((self._remote_host, self._remote_port)) + except OSError as err: + time.sleep(0.1) + error = err + continue + break + else: + raise error or OSError( + f"Unable to connect to server {self._remote_host}:{self._remote_port}" + ) client_socket.send(f"{sumo_binary}:{json.dumps(base_params)}\n".encode("utf-8")) From 0e04b4f3eb491a6f3acb3e449aa31f17e6acd790 Mon Sep 17 00:00:00 2001 From: Montgomery Alban Date: Thu, 25 Jan 2024 22:56:13 +0000 Subject: [PATCH 12/17] attempt to fix tests --- smarts/core/utils/sumo_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smarts/core/utils/sumo_utils.py b/smarts/core/utils/sumo_utils.py index c6b58a6f76..b3fa055a99 100644 --- a/smarts/core/utils/sumo_utils.py +++ b/smarts/core/utils/sumo_utils.py @@ -158,7 +158,7 @@ def generate( try: client_socket.connect((self._remote_host, self._remote_port)) except OSError as err: - time.sleep(0.1) + time.sleep(1) error = err continue break From 636a06fb088746ab2152a28b4f397c8a158bbb7a Mon Sep 17 00:00:00 2001 From: Montgomery Alban Date: Fri, 26 Jan 2024 16:22:52 +0000 Subject: [PATCH 13/17] Update the disconnect error message. --- smarts/core/utils/sumo_utils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/smarts/core/utils/sumo_utils.py b/smarts/core/utils/sumo_utils.py index b3fa055a99..1f806193ea 100644 --- a/smarts/core/utils/sumo_utils.py +++ b/smarts/core/utils/sumo_utils.py @@ -163,9 +163,9 @@ def generate( continue break else: - raise error or OSError( - f"Unable to connect to server {self._remote_host}:{self._remote_port}" - ) + raise OSError( + f"Unable to connect to server {self._remote_host}:{self._remote_port}. Try running again or running the server using `python -m smarts.core.utils.sumo_server`." + ) from error client_socket.send(f"{sumo_binary}:{json.dumps(base_params)}\n".encode("utf-8")) @@ -338,7 +338,7 @@ def connect( ) # e.g. 
"SUMO 1.11.0" -> (1, 11, 0) if self._sumo_version < minimum_sumo_version: raise ValueError(f"SUMO version must be >= SUMO {minimum_sumo_version}") - except (traci.exceptions.FatalTraCIError) as err: + except traci.exceptions.FatalTraCIError as err: self._log.error( "[%s] TraCI disconnected for connection attempt '%s:%s': [%s]", self._name, From f2e93449aeec0287917461cdcd6dc790a40ac5b3 Mon Sep 17 00:00:00 2001 From: Montgomery Alban Date: Fri, 26 Jan 2024 16:30:16 +0000 Subject: [PATCH 14/17] Use a lower port. --- smarts/core/configuration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smarts/core/configuration.py b/smarts/core/configuration.py index ac83536d3b..29b8e2f067 100644 --- a/smarts/core/configuration.py +++ b/smarts/core/configuration.py @@ -72,7 +72,7 @@ def _convert_truthy(t: str) -> bool: ("ray", "num_gpus"): 0, ("ray", "num_cpus"): None, ("ray", "log_to_driver"): False, - ("sumo", "server_port"): 62232, + ("sumo", "server_port"): 8619, ("sumo", "server_host"): "localhost", ("sumo", "serve_mode"): "remote", # local|remote } From 727ad1a17244161fd2107167b614c95872f8633b Mon Sep 17 00:00:00 2001 From: Montgomery Alban Date: Fri, 26 Jan 2024 22:13:27 +0000 Subject: [PATCH 15/17] Set default back to local. --- smarts/core/configuration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smarts/core/configuration.py b/smarts/core/configuration.py index 29b8e2f067..e031dd1d45 100644 --- a/smarts/core/configuration.py +++ b/smarts/core/configuration.py @@ -74,7 +74,7 @@ def _convert_truthy(t: str) -> bool: ("ray", "log_to_driver"): False, ("sumo", "server_port"): 8619, ("sumo", "server_host"): "localhost", - ("sumo", "serve_mode"): "remote", # local|remote + ("sumo", "serve_mode"): "local", # local|remote } From 05a908860c6f04e465a5a83f912ac4a9e412c0d0 Mon Sep 17 00:00:00 2001 From: Tucker Date: Wed, 31 Jan 2024 13:49:38 -0500 Subject: [PATCH 16/17] Finalize changes. 
--- CHANGELOG.md | 1 + docs/ecosystem/sumo.rst | 29 +++++++++++++++++++ docs/resources/faq.rst | 3 ++ docs/spelling_wordlist.txt | 1 + smarts/core/configuration.py | 6 ++-- smarts/core/sumo_traffic_simulation.py | 10 +++---- ..._server.py => centralized_traci_server.py} | 26 +++++++++++++---- .../tests/test_traci_port_acquisition.py | 4 +-- 8 files changed, 64 insertions(+), 16 deletions(-) rename smarts/core/utils/{sumo_server.py => centralized_traci_server.py} (93%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 65335b5459..23f720b557 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ Copy and pasting the git commit messages is __NOT__ enough. - The following methods now exist explicitly `Vehicle.{add_sensor|detach_sensor|subscribed_to|sensor_property|}`. - Resources loaded with `load_yaml_config_with_substitution()` now substitute in SMARTS configuration with single squiggle bracket `${}` syntax. This is currently restricted to environment variable names prefixed with `"SMARTS_"`. This extends to benchmark configuration and vehicle configuration. - Default vehicle definitions can be now modified using `assets:default_vehicle_definitions_list`/`SMARTS_ASSSETS_DEFAULT_VEHICLE_DEFINITIONS_LIST` or by providing a `vehicle_definitions_list.yaml` in the scenario. These vehicle types are related to the `AgentInterface.vehicle_type` attribute. +- There is now a centralized `TraCI` server manager that can be used to prevent port collisions. It can be run using `python -m smarts.core.utils.centralized_traci_server` and the use of the server can be enabled with `SMARTS_SUMO_TRACI_SERVE_MODE="central"`. ### Changed - `VehicleIndex.build_agent_vehicle()` no longer has `filename` and `surface_patches` parameters. - The following modules have been renamed: `envision.types` -> `envision.etypes`, `smarts.core.utils.logging` -> `smarts.core.utils.core_logging`, `smarts.core.utils.math` -> `smarts.core.utils.core_math`, `smarts.sstudio.types` -> `smarts.sstudio.sstypes`.
For compatibility reasons they can still be imported by their original module name. diff --git a/docs/ecosystem/sumo.rst b/docs/ecosystem/sumo.rst index 020eb68245..34df393846 100644 --- a/docs/ecosystem/sumo.rst +++ b/docs/ecosystem/sumo.rst @@ -13,6 +13,35 @@ SMARTS currently directly installs SUMO version >=1.15.0 via `pip`. Alternative installation methods, albeit more difficult, are described below. +Centralized TraCI management +---------------------------- +.. _centralized_traci_management: + +With the default behaviour, each SMARTS instance will ask the operating system + for a free port on which to start a ``TraCI`` server, which can result in cross-connection of SMARTS and ``TraCI`` server instances. + +.. code-block:: bash + + ## console 1 (or in background OR on remote machine) + # Run the centralized sumo port management server. + # Use `export SMARTS_SUMO_CENTRAL_PORT=62232` or `--port=62232` + $ python -m smarts.core.utils.centralized_traci_server + +By setting ``SMARTS_SUMO_TRACI_SERVE_MODE`` to ``"central"`` SMARTS will use the ``TraCI`` management server. + +.. code-block:: bash + + ## console 2 + ## Set environment variable to switch to the server. + # This can also be set in the engine configuration. + $ export SMARTS_SUMO_TRACI_SERVE_MODE=central + ## Optional configuration + # export SMARTS_SUMO_CENTRAL_HOST=localhost + # export SMARTS_SUMO_CENTRAL_PORT=62232 + ## do run + $ python experiment.py + + Package managers ---------------- diff --git a/docs/resources/faq.rst b/docs/resources/faq.rst index 61c20489fd..6453402ddf 100644 --- a/docs/resources/faq.rst +++ b/docs/resources/faq.rst @@ -24,3 +24,6 @@ This is a list of frequently asked questions. Feel free to suggest new entries! # Set xorg server $ sudo wget -O /etc/X11/xorg.conf http://xpra.org/xorg.conf $ sudo /usr/bin/Xorg -noreset +extension GLX +extension RANDR +extension RENDER -logfile ./xdummy.log -config /etc/X11/xorg.conf $DISPLAY & 0 + +4.
The simulation keeps crashing on connection in ``SumoTrafficSimulation``. How do I fix this? + This is likely due to using large scale parallelization. You will want to use the centralized management server. See :ref:`centralized_traci_management`. diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 7a91088731..ced97e6deb 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -73,6 +73,7 @@ superclass terminateds timestep Todo +TraCI travelled truncateds unassociated diff --git a/smarts/core/configuration.py b/smarts/core/configuration.py index e031dd1d45..8e381513d4 100644 --- a/smarts/core/configuration.py +++ b/smarts/core/configuration.py @@ -72,9 +72,9 @@ def _convert_truthy(t: str) -> bool: ("ray", "num_gpus"): 0, ("ray", "num_cpus"): None, ("ray", "log_to_driver"): False, - ("sumo", "server_port"): 8619, - ("sumo", "server_host"): "localhost", - ("sumo", "serve_mode"): "local", # local|remote + ("sumo", "central_port"): 8619, + ("sumo", "central_host"): "localhost", + ("sumo", "traci_serve_mode"): "local", # local|central } diff --git a/smarts/core/sumo_traffic_simulation.py b/smarts/core/sumo_traffic_simulation.py index 5a2f9689ae..0389bcabfc 100644 --- a/smarts/core/sumo_traffic_simulation.py +++ b/smarts/core/sumo_traffic_simulation.py @@ -46,7 +46,7 @@ from smarts.core.sumo_road_network import SumoRoadNetwork from smarts.core.traffic_provider import TrafficProvider from smarts.core.utils.core_logging import suppress_output -from smarts.core.utils.sumo_server import spawn_if_not +from smarts.core.utils.centralized_traci_server import spawn_if_not from smarts.core.vehicle import VEHICLE_CONFIGS, VehicleState NO_CHECKS: Final = 0b00000 @@ -140,12 +140,12 @@ def __init__( if ( self._sumo_port is not None - or (sumo_serve_mode := config()("sumo", "serve_mode")) == "local" + or (traci_serve_mode := config()("sumo", "traci_serve_mode")) == "local" ): self._process_factory = partial(LocalSumoProcess, self._sumo_port) - 
elif sumo_serve_mode == "remote": - remote_host = config()("sumo", "server_host") - remote_port = config()("sumo", "server_port", cast=int) + elif traci_serve_mode == "central": + remote_host = config()("sumo", "central_host") + remote_port = config()("sumo", "central_port", cast=int) spawn_if_not(remote_host, remote_port) self._process_factory = partial( RemoteSumoProcess, diff --git a/smarts/core/utils/sumo_server.py b/smarts/core/utils/centralized_traci_server.py similarity index 93% rename from smarts/core/utils/sumo_server.py rename to smarts/core/utils/centralized_traci_server.py index 244269cb33..814694b902 100644 --- a/smarts/core/utils/sumo_server.py +++ b/smarts/core/utils/centralized_traci_server.py @@ -37,7 +37,7 @@ from smarts.core.utils.sumo_utils import SUMO_PATH -class SumoServer: +class CentralizedTraCIServer: """A centralized server for handling SUMO instances to prevent race conditions.""" def __init__(self, host, port) -> None: @@ -169,7 +169,15 @@ def spawn_if_not(remote_host: str, remote_port: int): client_socket.connect((remote_host, remote_port)) except (OSError): if remote_host in ("localhost", "127.0.0.1"): - command = ["python", "-m", __name__, "--timeout", "600"] + command = [ + "python", + "-m", + __name__, + "--timeout", + "600", + "--port", + remote_port, + ] # Use subprocess.Popen to start the process in the background _ = subprocess.Popen( @@ -189,11 +197,11 @@ def main(*_, _host=None, _port=None, timeout: Optional[float] = 10 * 60): """The program entrypoint.""" # Define the host and port on which the server will listen _host = _host or config()( - "sumo", "server_host" + "sumo", "central_host" ) # Use '0.0.0.0' to listen on all available interfaces - _port = _port or config()("sumo", "server_port") + _port = _port or config()("sumo", "central_port") - ss = SumoServer(_host, _port) + ss = CentralizedTraCIServer(_host, _port) asyncio.run(ss.start(timeout=timeout)) @@ -205,6 +213,12 @@ def main(*_, _host=None, _port=None, timeout: 
Optional[float] = 10 * 60): type=float, default=None, ) + parser.add_argument( + "--port", + help="The port to host on.", + type=int, + default=None, + ) args = parser.parse_args() - main(timeout=args.timeout) + main(_port=args.port, timeout=args.timeout) diff --git a/smarts/core/utils/tests/test_traci_port_acquisition.py b/smarts/core/utils/tests/test_traci_port_acquisition.py index 3d6d71c144..a541a9c5c3 100644 --- a/smarts/core/utils/tests/test_traci_port_acquisition.py +++ b/smarts/core/utils/tests/test_traci_port_acquisition.py @@ -65,8 +65,8 @@ def run_client(t): try: f = os.path.abspath(make_dir_in_smarts_log_dir("_sumo_run_logs")) + f"/{t}" lsp = RemoteSumoProcess( - remote_host=config()("sumo", "server_host"), - remote_port=config()("sumo", "server_port", cast=int), + remote_host=config()("sumo", "central_host"), + remote_port=config()("sumo", "central_port", cast=int), ) lsp.generate( base_params=load_params From adbc2eddda38e40582fb6caaad77fddc6cac21d9 Mon Sep 17 00:00:00 2001 From: Tucker Date: Wed, 31 Jan 2024 13:50:00 -0500 Subject: [PATCH 17/17] Make format --- smarts/core/sumo_traffic_simulation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smarts/core/sumo_traffic_simulation.py b/smarts/core/sumo_traffic_simulation.py index 0389bcabfc..85596d26c3 100644 --- a/smarts/core/sumo_traffic_simulation.py +++ b/smarts/core/sumo_traffic_simulation.py @@ -45,8 +45,8 @@ from smarts.core.signals import SignalLightState, SignalState from smarts.core.sumo_road_network import SumoRoadNetwork from smarts.core.traffic_provider import TrafficProvider -from smarts.core.utils.core_logging import suppress_output from smarts.core.utils.centralized_traci_server import spawn_if_not +from smarts.core.utils.core_logging import suppress_output from smarts.core.vehicle import VEHICLE_CONFIGS, VehicleState NO_CHECKS: Final = 0b00000