diff --git a/.github/workflows/ci-auto-commit-linux.yml b/.github/workflows/ci-auto-commit-linux.yml index 41304ab5a7..45790d4450 100644 --- a/.github/workflows/ci-auto-commit-linux.yml +++ b/.github/workflows/ci-auto-commit-linux.yml @@ -24,7 +24,7 @@ jobs: - name: Add safe directory run: git config --global --add safe.directory $GITHUB_WORKSPACE - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 with: ref: master - name: Install SMARTS diff --git a/.github/workflows/ci-auto-commit-mac.yml b/.github/workflows/ci-auto-commit-mac.yml index 2a45d2bd13..ed20770121 100644 --- a/.github/workflows/ci-auto-commit-mac.yml +++ b/.github/workflows/ci-auto-commit-mac.yml @@ -11,20 +11,20 @@ env: jobs: auto-commit-mac: - runs-on: macos-11 + runs-on: macos-12 steps: - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 with: ref: master - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.9" - name: Update requirements run: | cd $GITHUB_WORKSPACE - python3.8 -m venv ${{env.venv_dir}} + python3.9 -m venv ${{env.venv_dir}} . ${{env.venv_dir}}/bin/activate pip install --upgrade pip pip install wheel==0.38.4 diff --git a/.github/workflows/ci-base-tests-linux.yml b/.github/workflows/ci-base-tests-linux.yml index 7d339cd519..b15940a1a0 100644 --- a/.github/workflows/ci-base-tests-linux.yml +++ b/.github/workflows/ci-base-tests-linux.yml @@ -23,7 +23,7 @@ jobs: - ./examples/tests/test_examples.py steps: - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Install dependencies run: | python3.8 -m venv ${{env.venv_dir}} @@ -65,6 +65,7 @@ jobs: --ignore=./smarts/core/tests/test_smarts_memory_growth.py \ --ignore=./smarts/core/tests/test_env_frame_rate.py \ --ignore=./smarts/env/tests/test_benchmark.py \ + --ignore=./smarts/core/utils/tests/test_traci_port_acquisition.py \ -k 'not test_long_determinism' examples-rl: @@ -78,7 +79,7 @@ jobs: - e11_platoon steps: - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Install dependencies run: | cd ${GITHUB_WORKSPACE}/examples/${{matrix.tests}} @@ -110,7 +111,7 @@ jobs: - e11_platoon steps: - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Install dependencies run: | cd ${GITHUB_WORKSPACE} diff --git a/.github/workflows/ci-base-tests-mac.yml b/.github/workflows/ci-base-tests-mac.yml index 3a541f299d..9e2bf542e2 100644 --- a/.github/workflows/ci-base-tests-mac.yml +++ b/.github/workflows/ci-base-tests-mac.yml @@ -12,7 +12,7 @@ env: jobs: base-tests-mac: - runs-on: macos-11 + runs-on: macos-12 strategy: matrix: tests: @@ -26,15 +26,13 @@ jobs: - ./smarts/ray steps: - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 with: ref: master - - name: Setup Python - run: | - brew update - brew install python@3.8 - brew unlink python@3.9 - brew link --force --overwrite python@3.8 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.9" - name: Setup SUMO run: | brew install xquartz @@ -43,7 +41,7 @@ jobs: brew install geos - name: Install dependencies run: | - python3.8 -m venv ${{env.venv_dir}} + python3.9 -m venv ${{env.venv_dir}} . ${{env.venv_dir}}/bin/activate pip install --upgrade pip pip install wheel==0.38.4 @@ -71,4 +69,5 @@ jobs: --ignore=./smarts/core/tests/test_renderers.py \ --ignore=./smarts/core/tests/test_smarts.py \ --ignore=./smarts/core/tests/test_env_frame_rate.py \ - --ignore=./smarts/core/tests/test_observations.py + --ignore=./smarts/core/tests/test_observations.py \ + --ignore=./smarts/core/utils/tests/test_traci_port_acquisition.py diff --git a/.github/workflows/ci-format.yml b/.github/workflows/ci-format.yml index cff1792ff6..7e0dcb1f28 100644 --- a/.github/workflows/ci-format.yml +++ b/.github/workflows/ci-format.yml @@ -12,7 +12,7 @@ jobs: container: ghcr.io/smarts-project/smarts:v0.6.1-minimal steps: - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Check header run: | cd $GITHUB_WORKSPACE @@ -24,7 +24,7 @@ jobs: container: ghcr.io/smarts-project/smarts:v0.6.1-minimal steps: - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Check docstring run: | cd $GITHUB_WORKSPACE @@ -51,7 +51,7 @@ jobs: - name: Add safe directory run: git config --global --add safe.directory $GITHUB_WORKSPACE - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 with: fetch-depth: 0 - name: Install SMARTS @@ -85,7 +85,7 @@ jobs: - name: Add safe directory run: git config --global --add safe.directory $GITHUB_WORKSPACE - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 with: fetch-depth: 0 - name: Install Enchant @@ -123,7 +123,7 @@ jobs: - name: Add safe directory run: git config --global --add safe.directory $GITHUB_WORKSPACE - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 with: fetch-depth: 0 - name: Install SMARTS with all extras and check for conflicts diff --git a/.github/workflows/ci-pull-request.yml b/.github/workflows/ci-pull-request.yml index 33705af060..fe3c8ca8c0 100644 --- a/.github/workflows/ci-pull-request.yml +++ b/.github/workflows/ci-pull-request.yml @@ -14,7 +14,7 @@ jobs: container: ghcr.io/smarts-project/smarts:v0.6.1-minimal steps: - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Setup package run: | cd $GITHUB_WORKSPACE diff --git a/.github/workflows/ci-python-version-test.yml b/.github/workflows/ci-python-version-test.yml index 58f884ac76..122918eb88 100644 --- a/.github/workflows/ci-python-version-test.yml +++ b/.github/workflows/ci-python-version-test.yml @@ -15,7 +15,7 @@ jobs: matrix: python-version: ['3.8', '3.9', '3.10', '3.11'] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v4 with: diff --git a/.github/workflows/ci-test-learning.yml b/.github/workflows/ci-test-learning.yml index d991a220a9..3bc301981c 100644 --- a/.github/workflows/ci-test-learning.yml +++ b/.github/workflows/ci-test-learning.yml @@ -17,7 +17,7 @@ jobs: container: ghcr.io/smarts-project/smarts:v0.6.1-minimal steps: - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 with: ref: master - name: Setup package diff --git a/.github/workflows/ci-test-long-determinism.yml b/.github/workflows/ci-test-long-determinism.yml index cea63fdc4b..b9ad6dc430 100644 --- a/.github/workflows/ci-test-long-determinism.yml +++ b/.github/workflows/ci-test-long-determinism.yml @@ -17,7 +17,7 @@ jobs: container: ghcr.io/smarts-project/smarts:v0.6.1-minimal steps: - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 with: ref: master - name: Setup package diff --git a/.github/workflows/ci-test-memory-growth.yml b/.github/workflows/ci-test-memory-growth.yml index 8cd57f47a0..e0066860b6 100644 --- a/.github/workflows/ci-test-memory-growth.yml +++ b/.github/workflows/ci-test-memory-growth.yml @@ -17,7 +17,7 @@ jobs: container: ghcr.io/smarts-project/smarts:v0.6.1-minimal steps: - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 with: ref: master - name: Setup package diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ec2fac88f..ebd4d045d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Copy and pasting the git commit messages is __NOT__ enough. - Default vehicle definitions can be now modified using `assets:default_vehicle_definitions_list`/`SMARTS_ASSSETS_DEFAULT_VEHICLE_DEFINITIONS_LIST` or by providing a `vehicle_definitions_list.yaml` in the scenario. These vehicle types are related to the `AgentInterface.vehicle_type` attribute. - New `CustomRender` agent interface option added. This allows using `glsl` fragment scripts to generate images from camera textures and simulation buffers. - New `ObfuscationMap` agent interface option added. This uses the `OccupancyGridMap` to help generate an image of ground viewable area from the ego vehicle's perspective. +- There is now a centralized `TraCI` server mananger that can be used to prevent port collisions. It can be run using `python smarts.core.utils.sumo_server` and the use of the server can be enabled with `SMARTS_SUMO_TRACI_SERVE_MODE="central"`. ### Changed - `VehicleIndex.build_agent_vehicle()` no longer has `filename` and `surface_patches` parameters. - The following modules have been renamed: `envision.types` -> `envision.etypes`, `smarts.core.utils.logging` -> `smarts.core.utils.core_logging`, `smarts.core.utils.math` -> `smarts.core.utils.core_math`, `smarts.sstudio.types` -> `smarts.sstudio.sstypes`. For compatibility reasons they can still be imported by their original module name. @@ -42,6 +43,7 @@ Copy and pasting the git commit messages is __NOT__ enough. - Deprecated a few things related to vehicles in the `Scenario` class, including the `vehicle_filepath`, `tire_parameters_filepath`, and `controller_parameters_filepath`. The functionality is now handled through the vehicle definitions. - `AgentInterface.vehicle_type` is now deprecated with potential to be restored. ### Fixed +- The performance of SUMO roadmap queries using `SumoRoadNetwork.{nearest_lanes|nearest_lane|offset_along_lane}()` have been greatly improved for long lanes. - `SumoTrafficSimulation` gives clearer reasons as to why it failed to connect to the TraCI server. - Suppressed an issue where `pybullet_utils.pybullet.BulletClient` would cause an error because it was catching a non `BaseException` type. - Fixed a bug where `smarts.core.vehicle_index.VehicleIndex.attach_sensors_to_vehicle()` would pass a method instead of a `PlanFrame` to the generated vehicle `SensorState`. @@ -65,6 +67,7 @@ Copy and pasting the git commit messages is __NOT__ enough. - Fixed a bug where vehicle sensor meta attributes would reference the wrong vehicle. - Resolved issue with road waypoints not showing waypoints if the horizon was larger than the start of the lane. - Fixed an issue where `SMARTS.reset()` would be unable to render cameras. +- Squashed TraCI "retrying" stdout messages. ### Removed ### Security diff --git a/docs/conf.py b/docs/conf.py index 6649acaf37..a68811692b 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -42,8 +42,8 @@ "sphinx.ext.viewcode", # link to sourcecode from docs "sphinx_rtd_theme", # Read The Docs theme "sphinx_click", # extract documentation from a `click` application - "sphinxcontrib.apidoc", - "sphinxcontrib.spelling", + "sphinxcontrib.apidoc", # automatically document the API + "sphinxcontrib.spelling", # check documentation for spelling ] extlinks = { diff --git a/docs/ecosystem/sumo.rst b/docs/ecosystem/sumo.rst index 020eb68245..34df393846 100644 --- a/docs/ecosystem/sumo.rst +++ b/docs/ecosystem/sumo.rst @@ -13,6 +13,35 @@ SMARTS currently directly installs SUMO version >=1.15.0 via `pip`. Alternative installation methods, albeit more difficult, are described below. +Centralized TraCI management +---------------------------- +.. _centralized_traci_management: + +With the default behaviour each SMARTS instance will attempt to ask the operating system + for a port to generate a ``TraCI`` server on which can result in cross-connection of SMARTS and ``TraCI`` server instances. + +.. code-block:: bash + + ## console 1 (or in background OR on remote machine) + # Run the centralized sumo port management server. + # Use `export SMARTS_SUMO_CENTRAL_PORT=62232` or `--port=62232` + $ python -m smarts.core.utils.centralized_traci_server + +By setting ``SMARTS_SUMO_TRACI_SERVE_MODE`` to ``"central"`` SMARTS will use the ``TraCI`` management server. + +.. code-block:: bash + + ## console 2 + ## Set environment variable to switch to the server. + # This can also be set in the engine configuration. + $ export SMARTS_SUMO_TRACI_SERVE_MODE=central + ## Optional configuration + # export SMARTS_SUMO_CENTRAL_HOST=localhost + # export SMARTS_SUMO_CENTRAL_PORT=62232 + ## do run + $ python experiment.py + + Package managers ---------------- diff --git a/docs/resources/faq.rst b/docs/resources/faq.rst index 3a0c72ed76..6fd5ee84d1 100644 --- a/docs/resources/faq.rst +++ b/docs/resources/faq.rst @@ -31,9 +31,13 @@ This is a list of frequently asked questions. Feel free to suggest new entries! # Do as needed: $ sudo /usr/bin/Xorg -noreset +extension GLX +extension RANDR +extension RENDER -logfile ./xdummy.log -config /etc/X11/xorg.conf $DISPLAY & + Note that ``mesa-utils`` installs ``llvm``, which is one option out of several that emulate ``OpenGL`` using software. ``llvm`` is not needed if a GPU is available. 4. Custom rendering and Obfuscation maps show completely blank. (Ubuntu) This is due to needing ``OpenGL`` to render using scripts. If you have a GPU make sure ``OpenGL`` is installed and the GPU has the necessary drivers for rendering. See the previous question if you need software rendering. + +5. The simulation keeps crashing on connection in ``SumoTrafficSimulation``. How do I fix this? + This is likely due to using large scale parallelization. You will want to use the centralized management server. See :ref:`centralized_traci_management`. diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 14211cbfc1..e838cc56ee 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -75,6 +75,7 @@ superclass terminateds timestep Todo +TraCI travelled truncateds unassociated diff --git a/examples/e4_environment_config.py b/examples/e4_environment_config.py index f9d6476afb..870e73a422 100644 --- a/examples/e4_environment_config.py +++ b/examples/e4_environment_config.py @@ -8,7 +8,7 @@ from tools.argument_parser import empty_parser from smarts.core.agent_interface import AgentInterface, AgentType -from smarts.core.utils.string import truncate +from smarts.core.utils.strings import truncate from smarts.env.configs.hiway_env_configs import EnvReturnMode from smarts.env.gymnasium.hiway_env_v1 import HiWayEnvV1 from smarts.env.utils.action_conversion import ActionOptions diff --git a/examples/tools/sumo_multi_clients.py b/examples/tools/sumo_multi_clients.py index b79e83eeed..4b49e27f05 100644 --- a/examples/tools/sumo_multi_clients.py +++ b/examples/tools/sumo_multi_clients.py @@ -3,7 +3,7 @@ import threading import time -from smarts.core.utils.sumo import SUMO_PATH, sumolib, traci +from smarts.core.utils.sumo_utils import SUMO_PATH, sumolib, traci PORT = 8001 diff --git a/requirements.txt b/requirements.txt index ea7ad77372..215592c001 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,15 +1,15 @@ -absl-py==2.0.0 +absl-py==2.1.0 aiosignal==1.3.1 astunparse==1.6.3 attrs==23.2.0 Automat==22.10.0 cachetools==5.3.2 -certifi==2023.11.17 +certifi==2024.2.2 charset-normalizer==3.3.2 click==8.1.7 cloudpickle==3.0.0 constantly==23.10.4 -coverage==7.4.0 +coverage==7.4.1 decorator==5.1.1 dm-tree==0.1.8 exceptiongroup==1.2.0 @@ -20,10 +20,10 @@ frozenlist==1.4.1 fsspec==2023.12.2 future==0.18.3 gast==0.4.0 -google-auth==2.26.1 +google-auth==2.27.0 google-auth-oauthlib==1.0.0 google-pasta==0.2.0 -grpcio==1.60.0 +grpcio==1.60.1 h5py==3.10.0 hyperlink==21.0.0 idna==3.6 @@ -31,13 +31,13 @@ importlib-metadata==7.0.1 importlib-resources==6.1.1 incremental==22.10.0 iniconfig==2.0.0 -Jinja2==3.1.2 -jsonschema==4.20.0 +Jinja2==3.1.3 +jsonschema==4.21.1 jsonschema-specifications==2023.12.1 keras==2.13.1 libclang==16.0.6 -Markdown==3.5.1 -MarkupSafe==2.1.3 +Markdown==3.5.2 +MarkupSafe==2.1.5 mpmath==1.3.0 msgpack==1.0.7 networkx==3.1 @@ -64,25 +64,25 @@ panda3d-gltf==0.13 panda3d-simplepbr==0.10 pillow==10.2.0 pkgutil_resolve_name==1.3.10 -pluggy==1.3.0 -protobuf==4.25.1 -psutil==5.9.7 +pluggy==1.4.0 +protobuf==4.25.2 +psutil==5.9.8 py==1.11.0 py-cpuinfo==9.0.0 pyasn1==0.5.1 pyasn1-modules==0.3.0 pybullet==3.2.6 -pytest==7.4.4 +pytest==8.0.0 pytest-benchmark==4.0.0 pytest-cov==4.1.0 pytest-forked==1.6.0 pytest-xdist==3.5.0 PyYAML==6.0.1 ray==2.9.0 -referencing==0.32.1 +referencing==0.33.0 requests==2.31.0 requests-oauthlib==1.3.1 -rpds-py==0.16.2 +rpds-py==0.17.1 rsa==4.9 scipy==1.10.1 shapely==2.0.2 @@ -99,11 +99,11 @@ termcolor==2.4.0 tomli==2.0.1 torch==2.1.2 torchvision==0.16.2 -trimesh==4.0.8 +trimesh==4.1.3 triton==2.1.0 Twisted==23.10.0 typing_extensions==4.5.0 -urllib3==2.1.0 +urllib3==2.2.0 wcwidth==0.2.13 Werkzeug==3.0.1 wrapt==1.16.0 diff --git a/smarts/core/bubble_manager.py b/smarts/core/bubble_manager.py index 62671f974b..c81acc234c 100644 --- a/smarts/core/bubble_manager.py +++ b/smarts/core/bubble_manager.py @@ -44,7 +44,7 @@ from smarts.core.road_map import RoadMap from smarts.core.utils.cache import cache, clear_cache from smarts.core.utils.id import SocialAgentId -from smarts.core.utils.string import truncate +from smarts.core.utils.strings import truncate from smarts.core.vehicle import Vehicle from smarts.core.vehicle_index import VehicleIndex from smarts.sstudio.sstypes import BoidAgentActor diff --git a/smarts/core/lanepoints.py b/smarts/core/lanepoints.py index 92bf303fd4..13e9baf976 100644 --- a/smarts/core/lanepoints.py +++ b/smarts/core/lanepoints.py @@ -104,7 +104,7 @@ def from_sumo( the network, the result of this function can be used to interpolate lane-points along lanes to the desired granularity. """ - from smarts.core.utils.sumo import sumolib # isort:skip + from smarts.core.utils.sumo_utils import sumolib # isort:skip from sumolib.net.edge import Edge # isort:skip from sumolib.net.lane import Lane # isort:skip from .sumo_road_network import SumoRoadNetwork diff --git a/smarts/core/local_traffic_provider.py b/smarts/core/local_traffic_provider.py index 46f0a6404c..f8d86308af 100644 --- a/smarts/core/local_traffic_provider.py +++ b/smarts/core/local_traffic_provider.py @@ -35,6 +35,8 @@ from shapely.affinity import rotate as shapely_rotate from shapely.geometry import box as shapely_box +from smarts.core.utils.core_logging import timeit + from .actor import ActorRole, ActorState from .coordinates import Dimensions, Heading, Point, Pose, RefLinePoint from .provider import Provider, ProviderManager, ProviderRecoveryFlags, ProviderState @@ -235,20 +237,22 @@ def _create_actor_caches(self): hhx, hhy = radians_to_vec(ovs.pose.heading) * (0.5 * length) back = Point(center.x - hhx, center.y - hhy) front = Point(center.x + hhx, center.y + hhy) - back_lane = self.road_map.nearest_lane(back, radius=length) - front_lane = self.road_map.nearest_lane(front, radius=length) + back_lane = self.road_map.nearest_lane(back, radius=length * 0.5) + front_lane = self.road_map.nearest_lane(front, radius=length * 0.5) if back_lane: back_offset = back_lane.offset_along_lane(back) lbc = self._lane_bumpers_cache.setdefault(back_lane, []) - insort(lbc, (back_offset, ovs, 1)) + lbc.append((back_offset, ovs, 1)) if front_lane: front_offset = front_lane.offset_along_lane(front) lbc = self._lane_bumpers_cache.setdefault(front_lane, []) - insort(lbc, (front_offset, ovs, 2)) + lbc.append((front_offset, ovs, 2)) if front_lane and back_lane != front_lane: # it's changing lanes, don't misjudge the target lane... fake_back_offset = front_lane.offset_along_lane(back) - insort(self._lane_bumpers_cache[front_lane], (fake_back_offset, ovs, 0)) + self._lane_bumpers_cache[front_lane].append((fake_back_offset, ovs, 0)) + for cache in self._lane_bumpers_cache.values(): + cache.sort() def _cached_lane_offset(self, vs: VehicleState, lane: RoadMap.Lane): lane_offsets = self._offsets_cache.setdefault(vs.actor_id, dict()) @@ -269,49 +273,53 @@ def _relinquish_actor(self, actor_state: ActorState): def step(self, actions, dt: float, elapsed_sim_time: float) -> ProviderState: sim = self._sim() assert sim - self._add_actors_for_time(elapsed_sim_time, dt) - for other in self._other_vehicle_states: - if other.actor_id in self._reserved_areas: - del self._reserved_areas[other.actor_id] + with timeit("Adding actors", self._logger.debug): + self._add_actors_for_time(elapsed_sim_time, dt) + for other in self._other_vehicle_states: + if other.actor_id in self._reserved_areas: + del self._reserved_areas[other.actor_id] # precompute nearest lanes and offsets for all vehicles and cache # (this prevents having to do it O(ovs^2) times) - self._create_actor_caches() + with timeit("Generating caches", self._logger.debug): + self._create_actor_caches() # Do state update in two passes so that we don't use next states in the # computations for actors encountered later in the iterator. - for actor in self._my_actors.values(): - actor.compute_next_state(dt) + with timeit("Computing states", self._logger.debug): + for actor in self._my_actors.values(): + actor.compute_next_state(dt) dones = set() losts = set() removed = set() remap_ids: Dict[str, str] = dict() - for actor_id, actor in self._my_actors.items(): - actor.step(dt) - if actor.finished_route: - dones.add(actor.actor_id) - elif actor.off_route: - losts.add(actor) - elif actor.teleporting: - # pybullet doesn't like it when a vehicle jumps from one side of the map to another, - # so we need to give teleporting vehicles a new id and thus a new chassis. - actor.bump_id() - remap_ids[actor_id] = actor.actor_id - for actor in losts - removed: - removed.add(actor.actor_id) - self._relinquish_actor(actor.state) - for actor_id in dones - removed: - actor = self._my_actors.get(actor_id) - if actor: - sim.provider_removing_actor(self, actor_id) - # The following is not really necessary due to the above calling teardown(), - # but it doesn't hurt... - if actor_id in self._my_actors: - del self._my_actors[actor_id] - for orig_id, new_id in remap_ids.items(): - self._my_actors[new_id] = self._my_actors[orig_id] - del self._my_actors[orig_id] + with timeit("Stepping actors", self._logger.debug): + for actor_id, actor in self._my_actors.items(): + actor.step(dt) + if actor.finished_route: + dones.add(actor.actor_id) + elif actor.off_route: + losts.add(actor) + elif actor.teleporting: + # pybullet doesn't like it when a vehicle jumps from one side of the map to another, + # so we need to give teleporting vehicles a new id and thus a new chassis. + actor.bump_id() + remap_ids[actor_id] = actor.actor_id + for actor in losts - removed: + removed.add(actor.actor_id) + self._relinquish_actor(actor.state) + for actor_id in dones - removed: + actor = self._my_actors.get(actor_id) + if actor: + sim.provider_removing_actor(self, actor_id) + # The following is not really necessary due to the above calling teardown(), + # but it doesn't hurt... + if actor_id in self._my_actors: + del self._my_actors[actor_id] + for orig_id, new_id in remap_ids.items(): + self._my_actors[new_id] = self._my_actors[orig_id] + del self._my_actors[orig_id] return self._provider_state diff --git a/smarts/core/sumo_road_network.py b/smarts/core/sumo_road_network.py index 119bb9c829..d74c16a1e0 100644 --- a/smarts/core/sumo_road_network.py +++ b/smarts/core/sumo_road_network.py @@ -17,19 +17,24 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. +from __future__ import annotations + +import itertools import logging +import math import os import random from functools import cached_property, lru_cache from pathlib import Path from subprocess import check_output -from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union +from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union, overload import numpy as np from shapely.geometry import Point as shPoint from shapely.geometry import Polygon from shapely.ops import nearest_points, snap +from smarts.core.utils.core_logging import timeit from smarts.sstudio.sstypes import MapSpec from .coordinates import BoundingBox, Heading, Point, Pose, RefLinePoint @@ -40,8 +45,17 @@ from .utils.geometry import buffered_shape from .utils.glb import make_map_glb, make_road_line_glb -from smarts.core.utils.sumo import sumolib # isort:skip -from sumolib.net.edge import Edge # isort:skip +from smarts.core.utils.sumo_utils import sumolib # isort:skip + + +def pairwise(iterable): + """Generates pairs of neighboring elements. + >>> list(pairwise('ABCDEFG')) + [('A', 'B'), ('B', 'C'), ('C', 'D'), ('D', 'E'), ('E', 'F'), ('F', 'G')] + """ + a, b = itertools.tee(iterable) + next(b, None) + return zip(a, b) class SumoRoadNetwork(RoadMap): @@ -53,19 +67,81 @@ class SumoRoadNetwork(RoadMap): This corresponds on a 1:1 scale to lanes 3.2m wide, which is typical in North America (although US highway lanes are wider at ~3.7m).""" - def __init__(self, graph, net_file: str, map_spec: MapSpec): + def __init__(self, graph: sumolib.net.Net, net_file: str, map_spec: MapSpec): self._log = logging.getLogger(self.__class__.__name__) self._graph = graph self._net_file = net_file self._map_spec = map_spec self._default_lane_width = SumoRoadNetwork._spec_lane_width(map_spec) self._surfaces = dict() - self._lanes = dict() - self._roads = dict() + self._lanes: Dict[str, SumoRoadNetwork.Lane] = dict() + self._roads: Dict[str, SumoRoadNetwork.Road] = dict() self._features = dict() self._waypoints_cache = SumoRoadNetwork._WaypointsCache() + self._rtree_roads = None self._load_traffic_lights() + def _init_rtree( + self, shapeList: List[sumolib.net.edge.Edge], includeJunctions=True + ): + import rtree + + result = rtree.index.Index() + result.interleaved = True + MAX_VAL = 1e100 + for ri, shape in enumerate(shapeList): + sumo_lanes: List[sumolib.net.lane.Lane] = shape.getLanes() + lane_bbs = list( + lane.getBoundingBox(includeJunctions) for lane in sumo_lanes + ) + cxmin, cymin, cxmax, cymax = MAX_VAL, MAX_VAL, -MAX_VAL, -MAX_VAL + for xmin, ymin, xmax, ymax in lane_bbs: + cxmin = min(cxmin, xmin) + cxmax = max(cxmax, xmax) + cymin = min(cymin, ymin) + cymax = max(cymax, ymax) + + bb = (cxmin, cymin, cxmax, cymax) + result.add(ri, bb) + return result + + def _update_rtree( + self, rtree_, shapeList: List[sumolib.net.edge.Edge], includeJunctions=True + ): + import rtree + + rtree_: rtree.index.Index + MAX_VAL = 1e100 + for ri, shape in enumerate(shapeList): + sumo_lanes: List[sumolib.net.lane.Lane] = shape.getLanes() + lane_bbs = list( + lane.getBoundingBox(includeJunctions) for lane in sumo_lanes + ) + cxmin, cymin, cxmax, cymax = MAX_VAL, MAX_VAL, -MAX_VAL, -MAX_VAL + for xmin, ymin, xmax, ymax in lane_bbs: + cxmin = min(cxmin, xmin) + cxmax = max(cxmax, xmax) + cymin = min(cymin, ymin) + cymax = max(cymax, ymax) + + bb = (cxmin, cymin, cxmax, cymax) + rtree_.add(ri, bb) + + def nearest_roads(self, point: Point, radius: float): + """Finds the nearest roads to the given point within the given radius.""" + x = point[0] + y = point[1] + r = radius + edges: List[sumolib.net.edge.Edge] = sorted( + self._graph.getEdges(), key=lambda e: e.getID() + ) + if self._rtree_roads is None: + self._rtree_roads = self._init_rtree(edges) + near_roads: List[RoadMap.Road] = [] + for i in self._rtree_roads.intersection((x - r, y - r, x + r, y + r)): + near_roads.append(self.road_by_id(edges[i].getID())) + return near_roads + @staticmethod def _check_net_origin(bbox): assert len(bbox) == 4 @@ -81,7 +157,7 @@ def shifted_net_file_path(cls, net_file_path): @classmethod @lru_cache(maxsize=1) - def _shift_coordinates(cls, net_file_path, shifted_path): + def _shift_coordinates(cls, net_file_path: str, shifted_path: str): assert shifted_path != net_file_path logger = logging.getLogger(cls.__name__) logger.info(f"normalizing net coordinates into {shifted_path}...") @@ -310,19 +386,146 @@ def surface_by_id(self, surface_id: str) -> Optional[RoadMap.Surface]: class Lane(RoadMap.Lane, Surface): """Describes a Sumo lane surface.""" - def __init__(self, lane_id: str, sumo_lane, road_map): + def __init__( + self, + lane_id: str, + sumo_lane: sumolib.net.lane.Lane, + road_map: SumoRoadNetwork, + ): super().__init__(lane_id, road_map) self._lane_id = lane_id self._sumo_lane = sumo_lane self._road = road_map.road_by_id(sumo_lane.getEdge().getID()) assert self._road + self._rtree_lane_fragments = None + self._lane_shape_for_rtree: Optional[List[Tuple[float, float]]] = None + def __hash__(self) -> int: return hash(self.lane_id) ^ hash(self._map) - @property + def _init_rtree(self, lines): + import rtree + + rtree.index.Property() + result = rtree.index.Index() + result.interleaved = True + for ri, (s, e) in enumerate(lines): + result.add( + ri, + ( + min(e[0], s[0]), + min(e[1], s[1]), + max(e[0], s[0]), + max(e[1], s[1]), + ), + ) + return result + + def _ensure_rtree(self): + if self._rtree_lane_fragments is None: + self._lane_shape_for_rtree = self._sumo_lane.getShape(False) + lane_fragments = list(pairwise(self._sumo_lane.getShape(False))) + self._rtree_lane_fragments = self._init_rtree(lane_fragments) + + @lru_cache(maxsize=128) + def _segment_offset(self, end_index: int, start_index: int = 0) -> float: + dist = 0.0 + for index in range(start_index, end_index): + dist += np.linalg.norm( + np.subtract( + self._lane_shape_for_rtree[index + 1], + self._lane_shape_for_rtree[index], + ) + ) + return dist + + @overload + def get_distance(self, point: Point, radius: float) -> float: + ... + + @overload + def get_distance( + self, point: Point, radius: float, *, get_offset: bool + ) -> Tuple[float, Optional[float]]: + ... + + @overload + def get_distance( + self, point: Point, radius: float, *, perpendicular: bool + ) -> float: + ... + + @overload + def get_distance( + self, point: Point, radius: float, /, get_offset: bool, perpendicular: bool + ) -> Tuple[float, Optional[float]]: + ... + + def get_distance( + self, + point: Point, + radius: float, + get_offset=..., + perpendicular: bool = False, + ) -> Union[float, Tuple[float, Optional[float]]]: + """Get the distance on the lane from the given point within the given radius. + Specifying to get the offset returns the offset value. + """ + x = point[0] + y = point[1] + r = radius + self._ensure_rtree() + + dist = math.inf + INVALID_DISTANCE = -1 + INVALID_INDEX = -1 + found_index = INVALID_INDEX + for i in self._rtree_lane_fragments.intersection( + (x - r, y - r, x + r, y + r) + ): + d = sumolib.geomhelper.distancePointToLine( + point, + self._lane_shape_for_rtree[i], + self._lane_shape_for_rtree[i + 1], + perpendicular=perpendicular, + ) + + if d == INVALID_DISTANCE and i != 0 and dist == math.inf: + # distance to inner corner + dist = min( + sumolib.geomhelper.distance( + point, self._lane_shape_for_rtree[i] + ), + sumolib.geomhelper.distance( + point, self._lane_shape_for_rtree[i + 1] + ), + ) + found_index = i + elif d != INVALID_DISTANCE and (dist is None or d < dist): + dist = d + found_index = i + + if get_offset is not ...: + if get_offset is False: + return dist, None + offset = 0.0 + if found_index != INVALID_INDEX: + offset = self._segment_offset(found_index) + offset += sumolib.geomhelper.lineOffsetWithMinimumDistanceToPoint( + point, + self._lane_shape_for_rtree[found_index], + self._lane_shape_for_rtree[found_index + 1], + False, + ) + assert isinstance(offset, float) + return dist, offset + return dist + + @cached_property def bounding_box(self): - raise NotImplementedError() + xmin, ymin, xmax, ymax = self._sumo_lane.getBoundingBox(False) + return BoundingBox(Point(xmin, ymin), Point(xmax, ymax)) @property def lane_id(self) -> str: @@ -552,9 +755,15 @@ def offset_along_lane(self, world_point: Point) -> float: shape = self._sumo_lane.getShape(False) point = world_point[:2] if point not in shape: - return sumolib.geomhelper.polygonOffsetWithMinimumDistanceToPoint( - point, shape, perpendicular=False - ) + if self._lane_shape_for_rtree is None and len(shape) < 5: + offset = sumolib.geomhelper.polygonOffsetWithMinimumDistanceToPoint( + point, shape, perpendicular=False + ) + else: + _, offset = self.get_distance( + world_point, 8, get_offset=True, perpendicular=False + ) + return offset # SUMO geomhelper.polygonOffset asserts when the point is part of the shape. # We get around the assertion with a check if the point is part of the shape. offset = 0 @@ -577,7 +786,7 @@ def project_along( def from_lane_coord(self, lane_point: RefLinePoint) -> Point: shape = self._sumo_lane.getShape(False) x, y = sumolib.geomhelper.positionAtShapeOffset(shape, lane_point.s) - if lane_point.t != 0: + if lane_point.t != 0 and lane_point.t is not None: dv = 1 if lane_point.s < self.length else -1 x2, y2 = sumolib.geomhelper.positionAtShapeOffset( shape, lane_point.s + dv @@ -645,7 +854,12 @@ class Road(RoadMap.Road, Surface): """This is akin to a 'road segment' in real life. Many of these might correspond to a single named road in reality.""" - def __init__(self, road_id: str, sumo_edge: Edge, road_map): + def __init__( + self, + road_id: str, + sumo_edge: sumolib.net.edge.Edge, + road_map: SumoRoadNetwork, + ): super().__init__(road_id, road_map) self._road_id = road_id self._sumo_edge = sumo_edge @@ -771,7 +985,7 @@ def shape( bline = buffered_shape(line, 0.0) return line if bline.is_empty else bline - def road_by_id(self, road_id: str) -> RoadMap.Road: + def road_by_id(self, road_id: str) -> SumoRoadNetwork.Road: road = self._roads.get(road_id) if road: return road @@ -781,6 +995,8 @@ def road_by_id(self, road_id: str) -> RoadMap.Road: ), f"SumoRoadNetwork got request for unknown road_id: '{road_id}'" road = SumoRoadNetwork.Road(road_id, sumo_edge, self) self._roads[road_id] = road + if self._rtree_roads is not None: + self._update_rtree(self._rtree_roads, [road._sumo_edge], False) assert road_id not in self._surfaces self._surfaces[road_id] = road return road @@ -797,22 +1013,30 @@ def nearest_lanes( ) -> List[Tuple[RoadMap.Lane, float]]: if radius is None: radius = self._default_lane_width - # XXX: note that this getNeighboringLanes() call is fairly heavy/expensive (as revealed by profiling) - # The includeJunctions parameter is the opposite of include_junctions because - # what it does in the Sumo query is attach the "node" that is the junction (node) - # shape to the shape of the non-special lanes that connect to it. So if - # includeJunctions is True, we are more likely to hit "normal" lanes - # even when in an intersection where we want to hit "special" - # lanes when we specify include_junctions=True. Note that "special" - # lanes are always candidates to be returned, no matter what. - # See: https://github.com/eclipse/sumo/issues/5854 - candidate_lanes = self._graph.getNeighboringLanes( - point[0], - point[1], - r=radius, - includeJunctions=not include_junctions, - allowFallback=False, # makes this call fail if rtree is not installed - ) + # # XXX: note that this getNeighboringLanes() call is fairly heavy/expensive (as revealed by profiling) + # # The includeJunctions parameter is the opposite of include_junctions because + # # what it does in the Sumo query is attach the "node" that is the junction (node) + # # shape to the shape of the non-special lanes that connect to it. So if + # # includeJunctions is True, we are more likely to hit "normal" lanes + # # even when in an intersection where we want to hit "special" + # # lanes when we specify include_junctions=True. Note that "special" + # # lanes are always candidates to be returned, no matter what. + # # See: https://github.com/eclipse/sumo/issues/5854 + # with timeit("Old sumo lane distance check", print): + # candidate_lanes = self._graph.getNeighboringLanes( + # point[0], + # point[1], + # r=radius, + # includeJunctions=not include_junctions, + # allowFallback=False, # makes this call fail if rtree is not installed + # ) + candidate_lanes = [] + for r in self.nearest_roads(point, radius): + for l in r.lanes: + l: SumoRoadNetwork.Lane + if (distance := l.get_distance(point, radius)) < math.inf: + candidate_lanes.append((l._sumo_lane, distance)) + if not include_junctions: candidate_lanes = [ lane for lane in candidate_lanes if not lane[0].getEdge().isSpecial() @@ -910,8 +1134,8 @@ def _generate_routes( ] def _internal_routes_between( - self, start_edge: Edge, end_edge: Edge - ) -> List[List[Edge]]: + self, start_edge: sumolib.net.edge.Edge, end_edge: sumolib.net.edge.Edge + ) -> List[List[sumolib.net.edge.Edge]]: if start_edge.isSpecial() or end_edge.isSpecial(): return [[start_edge, end_edge]] routes = [] diff --git a/smarts/core/sumo_traffic_simulation.py b/smarts/core/sumo_traffic_simulation.py index e4d5fe9d49..1cac65c3b4 100644 --- a/smarts/core/sumo_traffic_simulation.py +++ b/smarts/core/sumo_traffic_simulation.py @@ -21,10 +21,10 @@ import logging import random -import time import weakref +from functools import partial from pathlib import Path -from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Tuple +from typing import TYPE_CHECKING, Final, Iterable, List, Optional, Sequence, Tuple import numpy as np from shapely.affinity import rotate as shapely_rotate @@ -46,10 +46,20 @@ from smarts.core.signals import SignalLightState, SignalState from smarts.core.sumo_road_network import SumoRoadNetwork from smarts.core.traffic_provider import TrafficProvider +from smarts.core.utils.centralized_traci_server import spawn_if_not from smarts.core.utils.core_logging import suppress_output from smarts.core.vehicle import VEHICLE_CONFIGS, VehicleState -from smarts.core.utils.sumo import traci, TraciConn # isort:skip +NO_CHECKS: Final = 0b00000 + +# isort:skip +from smarts.core.utils.sumo_utils import ( + LocalSumoProcess, + RemoteSumoProcess, + TraciConn, + traci, +) + import traci.constants as tc # isort:skip if TYPE_CHECKING: @@ -129,6 +139,23 @@ def __init__( self._sim = None self._handling_error = False self._traci_retries = traci_retries + # XXX: This is used to try to avoid interrupting other instances in race condition (see GH #2139) + self._foreign_traci_servers: List[TraciConn] = [] + + if ( + self._sumo_port is not None + or (traci_serve_mode := config()("sumo", "traci_serve_mode")) == "local" + ): + self._process_factory = partial(LocalSumoProcess, self._sumo_port) + elif traci_serve_mode == "central": + remote_host = config()("sumo", "central_host") + remote_port = config()("sumo", "central_port", cast=int) + spawn_if_not(remote_host, remote_port) + self._process_factory = partial( + RemoteSumoProcess, + remote_host=remote_host, + remote_port=remote_port, + ) # start with the default recovery flags... self._recovery_flags = super().recovery_flags @@ -203,43 +230,46 @@ def _initialize_traci_conn(self, num_retries=5): self._traci_conn.close_traci_and_pipes() self._traci_conn = None - sumo_port = self._sumo_port sumo_binary = "sumo" if self._headless else "sumo-gui" + sumo_process = self._process_factory() + sumo_process.generate( + base_params=self._base_sumo_load_params(), sumo_binary=sumo_binary + ) self._traci_conn = TraciConn( - sumo_port=sumo_port, - base_params=self._base_sumo_load_params(), - sumo_binary=sumo_binary, + sumo_process=sumo_process, ) try: - while self._traci_conn.viable and not self._traci_conn.connected: - try: - self._traci_conn.connect( - timeout=5, - minimum_traci_version=20, - minimum_sumo_version=(1, 10, 0), - debug=self._debug, - ) - except traci.exceptions.FatalTraCIError: - # Could not connect in time just retry connection - pass + self._traci_conn.connect( + timeout=5, + minimum_traci_version=20, + minimum_sumo_version=(1, 10, 0), + debug=self._debug, + ) + + if not self._traci_conn.connected: + # Save the connection to try to avoid closing it for the other client. + self._foreign_traci_servers.append(self._traci_conn) + self._traci_conn = None + raise traci.exceptions.TraCIException( + "TraCI server was likely taken by other client." + ) + except traci.exceptions.FatalTraCIError: + # Could not connect in time just retry connection + current_retries += 1 + continue except traci.exceptions.TraCIException: # SUMO process died... unsure why this is not a fatal traci error current_retries += 1 - - self._traci_conn.close_traci_and_pipes() continue except ConnectionRefusedError: # Some other process somehow owns the port... sumo needs to be restarted. continue - except OSError: - # TraCI or SUMO version are not at the minimum required version. - raise except KeyboardInterrupt: self._log.debug("Keyboard interrupted TraCI connection.") - self._traci_conn.close_traci_and_pipes() + self._traci_conn.close_traci_and_pipes(wait=False) raise break else: @@ -266,7 +296,7 @@ def _base_sumo_load_params(self): "--net-file=%s" % self._scenario.road_map.source, "--quit-on-end", "--log=%s" % self._log_file, - "--error-log=%s" % self._log_file, + "--error-log=%s.err" % self._log_file, "--no-step-log", "--no-warnings=1", "--seed=%s" % random.randint(0, 2147483648), @@ -302,6 +332,13 @@ def _base_sumo_load_params(self): return load_params + def _restart_sumo(self): + engine_config = config() + traci_retries = self._traci_retries or engine_config( + "sumo", "traci_retries", default=5, cast=int + ) + self._initialize_traci_conn(num_retries=traci_retries) + def setup(self, scenario) -> ProviderState: """Initialize the simulation with a new scenario.""" self._log.debug("Setting up SumoTrafficSim %s", self) @@ -325,24 +362,19 @@ def setup(self, scenario) -> ProviderState: ), "SumoTrafficSimulation requires a SumoRoadNetwork" self._log_file = scenario.unique_sumo_log_file() - if restart_sumo: - try: - engine_config = config() - traci_retries = self._traci_retries or engine_config( - "sumo", "traci_retries", default=5, cast=int - ) - self._initialize_traci_conn(num_retries=traci_retries) - except traci.exceptions.FatalTraCIError: - return ProviderState() - elif self._allow_reload: - assert ( - self._traci_conn is not None - ), "TraCI should be connected at this point." - try: - self._traci_conn.load(self._base_sumo_load_params()) - except traci.exceptions.FatalTraCIError as err: - self._handle_traci_exception(err, actors_relinquishable=False) - return ProviderState() + try: + if restart_sumo: + self._restart_sumo() + elif self._allow_reload: + assert ( + self._traci_conn is not None + ), "TraCI should be connected at this point." + try: + self._traci_conn.load(self._base_sumo_load_params()) + except traci.exceptions.FatalTraCIError: + self._restart_sumo() + except traci.exceptions.FatalTraCIError: + return ProviderState() assert self._traci_conn is not None, "No active TraCI connection" @@ -382,7 +414,7 @@ def _handle_traci_exception( self._handling_error = True if isinstance(error, traci.exceptions.TraCIException): # XXX: Needs further investigation whenever this happens. - self._log.warning("TraCI has provided a warning %s", error) + self._log.debug("TraCI has provided a warning %s", error) return if isinstance(error, traci.exceptions.FatalTraCIError): self._log.error( @@ -439,6 +471,18 @@ def teardown(self): self._remove_vehicles() except traci.exceptions.FatalTraCIError: pass + if not self._allow_reload and self._traci_conn is not None: + self._traci_conn.close_traci_and_pipes() + + for i, trc in reversed( + [ + (j, trc) + for j, trc in enumerate(self._foreign_traci_servers) + if not trc.viable + ] + ): + self._foreign_traci_servers.pop(i) + trc.close_traci_and_pipes(wait=False) self._cumulative_sim_seconds = 0 self._non_sumo_vehicle_ids = set() @@ -573,7 +617,7 @@ def _sync(self, provider_state: ProviderState): VEHICLE_CONFIGS[vehicle_state.vehicle_config_type].dimensions, ) self._create_vehicle(vehicle_id, dimensions, vehicle_state.role) - no_checks = 0b00000 + no_checks = NO_CHECKS self._traci_conn.vehicle.setSpeedMode(vehicle_id, no_checks) # update the state of all current managed vehicles @@ -618,7 +662,7 @@ def _sync(self, provider_state: ProviderState): ) for vehicle_id in vehicles_that_have_become_external: - no_checks = 0b00000 + no_checks = NO_CHECKS self._traci_conn.vehicle.setSpeedMode(vehicle_id, no_checks) self._traci_conn.vehicle.setColor( vehicle_id, SumoTrafficSimulation._color_for_role(ActorRole.SocialAgent) diff --git a/smarts/core/tests/test_sumo_version.py b/smarts/core/tests/test_sumo_version.py index 222c779719..4dddc346af 100644 --- a/smarts/core/tests/test_sumo_version.py +++ b/smarts/core/tests/test_sumo_version.py @@ -25,12 +25,12 @@ def test_sumo_lib(): # import does runtime check by necessity - from smarts.core.utils.sumo import sumolib + from smarts.core.utils.sumo_utils import sumolib def test_sumo_version(): from smarts.core.utils import networking - from smarts.core.utils.sumo import SUMO_PATH, traci + from smarts.core.utils.sumo_utils import SUMO_PATH, traci load_params = [ "--start", diff --git a/smarts/core/tests/test_traffic_simulation.py b/smarts/core/tests/test_traffic_simulation.py index 7b17c7566f..f73ababdae 100644 --- a/smarts/core/tests/test_traffic_simulation.py +++ b/smarts/core/tests/test_traffic_simulation.py @@ -33,7 +33,7 @@ from smarts.core.scenario import Scenario from smarts.core.smarts import SMARTS from smarts.core.sumo_traffic_simulation import SumoTrafficSimulation -from smarts.core.utils.sumo import traci +from smarts.core.utils.sumo_utils import traci SUMO_PORT = 8082 diff --git a/smarts/core/utils/centralized_traci_server.py b/smarts/core/utils/centralized_traci_server.py new file mode 100644 index 0000000000..814694b902 --- /dev/null +++ b/smarts/core/utils/centralized_traci_server.py @@ -0,0 +1,224 @@ +# MIT License +# +# Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from __future__ import annotations + +import argparse +import asyncio +import asyncio.streams +import json +import os +import socket +import subprocess +import time +from typing import Optional, Set + +from smarts.core import config +from smarts.core.utils.networking import find_free_port +from smarts.core.utils.sumo_utils import SUMO_PATH + + +class CentralizedTraCIServer: + """A centralized server for handling SUMO instances to prevent race conditions.""" + + def __init__(self, host, port) -> None: + self._host = host + self._port = port + + self._used_ports: Set[int] = set() + self._last_client: float = time.time() + + async def start(self, timeout: Optional[float] = 60.0 * 60.0): + """Start the server.""" + # Create a socket object + server = await asyncio.start_server(self.handle_client, self._host, self._port) + + address = server.sockets[0].getsockname() + print(f"Server listening on `{address = }`") + async with server: + waitable = server.serve_forever() + if timeout is not None: + _timeout_watcher = asyncio.create_task(self._timeout_watcher(timeout)) + waitable = asyncio.gather(waitable, _timeout_watcher) + await waitable + + async def _timeout_watcher(self, timeout: float): + """Closes the server if it is not in use for `timeout` length of time.""" + + while True: + await asyncio.sleep(60) + if time.time() - self._last_client > timeout and len(self._used_ports) == 0: + print(f"Closing because `{timeout=}` was reached.") + loop = asyncio.get_event_loop() + loop.stop() + + async def _process_manager(self, binary, args, f: asyncio.Future): + """Manages the lifecycle of the TraCI server.""" + _sumo_proc = None + _port = None + try: + # Create a new Future object. + while (_port := find_free_port()) in self._used_ports: + pass + self._used_ports.add(_port) + _sumo_proc = await asyncio.create_subprocess_exec( + *[ + os.path.join(SUMO_PATH, "bin", binary), + f"--remote-port={_port}", + ], + *args, + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + close_fds=True, + ) + loop = asyncio.get_event_loop() + future = loop.create_future() + f.set_result( + { + "port": _port, + "future": future, + } + ) + + result = await asyncio.wait_for(future, None) + finally: + if _port is not None: + self._used_ports.discard(_port) + if _sumo_proc is not None and _sumo_proc.returncode is None: + _sumo_proc.kill() + self._last_client = time.time() + + async def handle_client(self, reader, writer): + """Read data from the client.""" + address = writer.get_extra_info("peername") + print(f"Received connection from {address}") + loop = asyncio.get_event_loop() + try: + port = None + _traci_server_info = None + while True: + data = await reader.readline() + message = data.decode("utf-8") + # print(f"Received {message!r} from {addr}") + if message.startswith("sumo"): + kill_f = loop.create_future() + sumo_binary, _, sumo_cmd = message.partition(":") + response_list = json.loads(sumo_cmd) + command_args = [ + *response_list, + ] + asyncio.create_task( + self._process_manager(sumo_binary, command_args, kill_f) + ) + if _traci_server_info is not None: + print("Duplicate start request received.") + continue + _traci_server_info = await asyncio.wait_for(kill_f, None) + port = _traci_server_info["port"] + + response = f"{self._host}:{port}" + print(f"Send TraCI address: {response!r}") + writer.write(response.encode("utf-8")) + + if message.startswith("e:"): + if _traci_server_info is None: + print("Kill received for uninitialized process.") + else: + _traci_server_info["future"].set_result("kill") + break + if len(message) == 0: + break + + # Close the connection + await writer.drain() + writer.close() + except asyncio.CancelledError: + # Handle client disconnect + pass + + +def spawn_if_not(remote_host: str, remote_port: int): + """Create a new server if it does not already exist. + + Args: + remote_host (str): The host name. + remote_port (int): The host port. + """ + client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + client_socket.connect((remote_host, remote_port)) + except (OSError): + if remote_host in ("localhost", "127.0.0.1"): + command = [ + "python", + "-m", + __name__, + "--timeout", + "600", + "--port", + remote_port, + ] + + # Use subprocess.Popen to start the process in the background + _ = subprocess.Popen( + command, + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + shell=False, + close_fds=True, + ) + + else: + client_socket.close() + + +def main(*_, _host=None, _port=None, timeout: Optional[float] = 10 * 60): + """The program entrypoint.""" + # Define the host and port on which the server will listen + _host = _host or config()( + "sumo", "central_host" + ) # Use '0.0.0.0' to listen on all available interfaces + _port = _port or config()("sumo", "central_port") + + ss = CentralizedTraCIServer(_host, _port) + asyncio.run(ss.start(timeout=timeout)) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(__name__) + parser.add_argument( + "--timeout", + help="Duration of time until server shuts down if not in use.", + type=float, + default=None, + ) + parser.add_argument( + "--port", + help="The port to host on.", + type=int, + default=None, + ) + args = parser.parse_args() + + main(_port=args.port, timeout=args.timeout) diff --git a/smarts/core/utils/string.py b/smarts/core/utils/strings.py similarity index 100% rename from smarts/core/utils/string.py rename to smarts/core/utils/strings.py diff --git a/smarts/core/utils/sumo.py b/smarts/core/utils/sumo.py deleted file mode 100644 index 9ad042af59..0000000000 --- a/smarts/core/utils/sumo.py +++ /dev/null @@ -1,246 +0,0 @@ -# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -"""Importing this module "redirects" the import to the "real" sumolib. This is available -for convenience and to reduce code duplication as sumolib lives under SUMO_HOME. -""" -from __future__ import annotations - -import functools -import inspect -import logging -import multiprocessing -import os -import subprocess -import sys -from typing import Any, List, Optional, Tuple - -from smarts.core.utils import networking -from smarts.core.utils.core_logging import suppress_output - -try: - import sumo - - SUMO_PATH = sumo.SUMO_HOME - os.environ["SUMO_HOME"] = sumo.SUMO_HOME -except ImportError: - if "SUMO_HOME" not in os.environ: - raise ImportError("SUMO_HOME not set, can't import sumolib") - SUMO_PATH = os.environ["SUMO_HOME"] - -tools_path = os.path.join(SUMO_PATH, "tools") -if tools_path not in sys.path: - sys.path.append(tools_path) - -try: - import sumo.tools.sumolib as sumolib - import sumo.tools.traci as traci -except ModuleNotFoundError as e: - raise ImportError( - "Missing dependencies for SUMO. Install them using the command `pip install -e .[sumo]` at the source directory." - ) from e - - -class DomainWrapper: - """Wraps `traci.Domain` type for the `TraciConn` utility""" - - def __init__(self, sumo_proc, domain: traci.domain.Domain) -> None: - self._domain = domain - self._sumo_proc = sumo_proc - - def __getattr__(self, name: str) -> Any: - attribute = getattr(self._domain, name) - - if inspect.isbuiltin(attribute) or inspect.ismethod(attribute): - attribute = functools.partial( - _wrap_traci_method, method=attribute, sumo_process=self._sumo_proc - ) - - return attribute - - -class TraciConn: - """A simplified utility for connecting to a SUMO process.""" - - def __init__( - self, - sumo_port: Optional[int], - base_params: List[str], - sumo_binary: str = "sumo", # Literal["sumo", "sumo-gui"] - ): - self._sumo_proc = None - self._traci_conn = None - self._sumo_port = None - self._sumo_version: Tuple[int, ...] = tuple() - - if sumo_port is None: - sumo_port = networking.find_free_port() - self._sumo_port = sumo_port - sumo_cmd = [ - os.path.join(SUMO_PATH, "bin", sumo_binary), - f"--remote-port={sumo_port}", - *base_params, - ] - - logging.debug("Starting sumo process:\n\t %s", sumo_cmd) - self._sumo_proc = subprocess.Popen( - sumo_cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=True, - ) - - def __del__(self) -> None: - # We should not raise in delete. - try: - self.close_traci_and_pipes() - except Exception: - pass - - def connect( - self, - timeout: float, - minimum_traci_version: int, - minimum_sumo_version: Tuple[int, ...], - debug: bool = False, - ): - """Attempt a connection with the SUMO process.""" - traci_conn = None - try: - with suppress_output(stderr=not debug, stdout=False): - traci_conn = traci.connect( - self._sumo_port, - numRetries=max(0, int(20 * timeout)), - proc=self._sumo_proc, - waitBetweenRetries=0.05, - ) # SUMO must be ready within timeout seconds - # We will retry since this is our first sumo command - except traci.exceptions.FatalTraCIError: - logging.debug("TraCI could not connect in time.") - raise - except traci.exceptions.TraCIException: - logging.error("SUMO process died.") - raise - except ConnectionRefusedError: - logging.error( - "Connection refused. Tried to connect to an unpaired TraCI client." - ) - raise - - try: - vers, vers_str = traci_conn.getVersion() - if vers < minimum_traci_version: - raise OSError( - f"TraCI API version must be >= {minimum_traci_version}. Got version ({vers})" - ) - self._sumo_version = tuple( - int(v) for v in vers_str.partition(" ")[2].split(".") - ) # e.g. "SUMO 1.11.0" -> (1, 11, 0) - if self._sumo_version < minimum_sumo_version: - raise OSError(f"SUMO version must be >= SUMO {minimum_sumo_version}") - except traci.exceptions.FatalTraCIError as err: - logging.debug("TraCI disconnected, process may have died.") - # XXX: the error type is changed to TraCIException to make it consistent with the - # process died case of `traci.connect`. - raise traci.exceptions.TraCIException(err) - except OSError: - self.close_traci_and_pipes() - raise - self._traci_conn = traci_conn - - @property - def connected(self) -> bool: - """Check if the connection is still valid.""" - return self._sumo_proc is not None and self._traci_conn is not None - - @property - def viable(self) -> bool: - """If making a connection to the sumo process is still viable.""" - return self._sumo_proc is not None and self._sumo_proc.poll() is None - - @property - def sumo_version(self) -> Tuple[int, ...]: - """Get the current SUMO version as a tuple.""" - return self._sumo_version - - def __getattr__(self, name: str) -> Any: - if not self.connected: - return None - - attribute = getattr(self._traci_conn, name) - - if inspect.isbuiltin(attribute) or inspect.ismethod(attribute): - attribute = functools.partial( - _wrap_traci_method, method=attribute, sumo_process=self - ) - - if isinstance(attribute, traci.domain.Domain): - attribute = DomainWrapper(sumo_proc=self, domain=attribute) - - return attribute - - def must_reset(self): - """If the version of sumo will have errors if just reloading such that it must be reset.""" - return self._sumo_version > (1, 12, 0) - - def close_traci_and_pipes(self): - """Safely closes all connections. We should expect this method to always work without throwing""" - - def __safe_close(conn): - try: - conn.close() - except (subprocess.SubprocessError, multiprocessing.ProcessError): - # Subprocess or process failed - pass - except traci.exceptions.FatalTraCIError: - # TraCI connection is already dead. - pass - except AttributeError: - # Socket was destroyed internally, likely due to an error. - pass - - if self._traci_conn: - __safe_close(self._traci_conn) - - if self._sumo_proc: - __safe_close(self._sumo_proc.stdin) - __safe_close(self._sumo_proc.stdout) - __safe_close(self._sumo_proc.stderr) - self._sumo_proc.kill() - - self._sumo_proc = None - self._traci_conn = None - - def teardown(self): - """Clean up all resources.""" - self.close_traci_and_pipes() - - -def _wrap_traci_method(*args, method, sumo_process: TraciConn, **kwargs): - # Argument order must be `*args` first so `method` and `sumo_process` are keyword only arguments. - try: - return method(*args, **kwargs) - except traci.exceptions.FatalTraCIError: - # TraCI cannot continue - sumo_process.close_traci_and_pipes() - raise - except traci.exceptions.TraCIException: - # Case where TraCI/SUMO can theoretically continue - raise diff --git a/smarts/core/utils/sumo_utils.py b/smarts/core/utils/sumo_utils.py new file mode 100644 index 0000000000..1f806193ea --- /dev/null +++ b/smarts/core/utils/sumo_utils.py @@ -0,0 +1,474 @@ +# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +"""Importing this module "redirects" the import to the "real" sumolib. This is available +for convenience and to reduce code duplication as sumolib lives under SUMO_HOME. +""" +from __future__ import annotations + +import abc +import functools +import inspect +import json +import logging +import multiprocessing +import os +import socket +import subprocess +import sys +import time +from typing import Any, List, Literal, Optional, Tuple + +from smarts.core.utils import networking +from smarts.core.utils.core_logging import suppress_output + +try: + import sumo + + SUMO_PATH = sumo.SUMO_HOME + os.environ["SUMO_HOME"] = sumo.SUMO_HOME +except ImportError: + if "SUMO_HOME" not in os.environ: + raise ImportError("SUMO_HOME not set, can't import sumolib") + SUMO_PATH = os.environ["SUMO_HOME"] + +tools_path = os.path.join(SUMO_PATH, "tools") +if tools_path not in sys.path: + sys.path.append(tools_path) + +try: + import sumo.tools.sumolib as sumolib + import sumo.tools.traci as traci +except ModuleNotFoundError as e: + raise ImportError( + "Missing dependencies for SUMO. Install them using the command `pip install -e .[sumo]` at the source directory." + ) from e + + +def _safe_close(conn, **kwargs): + try: + conn.close(**kwargs) + except (subprocess.SubprocessError, multiprocessing.ProcessError): + # Subprocess or process failed + pass + except traci.exceptions.FatalTraCIError: + # TraCI connection is already dead. + pass + except AttributeError: + # Socket was destroyed internally, likely due to an error. + pass + except Exception as err: + pass + + +class DomainWrapper: + """Wraps `traci.Domain` type for the `TraciConn` utility""" + + def __init__(self, traci_conn, domain: traci.domain.Domain, attribute_name) -> None: + self._domain = domain + self._traci_conn = traci_conn + self._attribute_name = attribute_name + + def __getattr__(self, name: str) -> Any: + attribute = getattr(self._domain, name) + + if inspect.isbuiltin(attribute) or inspect.ismethod(attribute): + attribute = functools.partial( + _wrap_traci_method, + method=attribute, + traci_conn=self._traci_conn, + attribute_name=self._attribute_name, + ) + + return attribute + + +class SumoProcess(metaclass=abc.ABCMeta): + """A simplified utility representing a SUMO process.""" + + @abc.abstractmethod + def generate( + self, base_params: List[str], sumo_binary: Literal["sumo", "sumo-gui"] = "sumo" + ): + """Generate the process.""" + raise NotImplementedError + + @abc.abstractmethod + def terminate(self, kill: bool): + """Terminate this process.""" + raise NotImplementedError + + @abc.abstractmethod + def poll(self) -> Optional[int]: + """Poll the underlying process.""" + raise NotImplementedError + + @abc.abstractmethod + def wait(self, timeout: Optional[float] = None) -> int: + """Wait on the underlying process.""" + raise NotImplementedError + + @property + @abc.abstractmethod + def port(self) -> int: + """The port this process is associated with.""" + raise NotImplementedError + + @property + @abc.abstractmethod + def host(self) -> str: + """The port this process is associated with.""" + raise NotImplementedError + + +class RemoteSumoProcess(SumoProcess): + """Connects to a sumo server.""" + + def __init__(self, remote_host, remote_port) -> None: + self._remote_host = remote_host + self._remote_port = remote_port + self._port = None + self._host = None + self._client_socket = None + + def generate( + self, base_params: List[str], sumo_binary: Literal["sumo", "sumo-gui"] = "sumo" + ): + client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + # Wait on server to start if it needs to. + error = None + for _ in range(5): + try: + client_socket.connect((self._remote_host, self._remote_port)) + except OSError as err: + time.sleep(1) + error = err + continue + break + else: + raise OSError( + f"Unable to connect to server {self._remote_host}:{self._remote_port}. Try running again or running the server using `python -m smarts.core.utils.sumo_server`." + ) from error + + client_socket.send(f"{sumo_binary}:{json.dumps(base_params)}\n".encode("utf-8")) + + self._client_socket = client_socket + + response = client_socket.recv(1024) + self._host, _, port = response.decode("utf-8").partition(":") + self._port = int(port) + + def terminate(self, kill: bool): + self._client_socket.send("e:".encode("utf-8")) + self._client_socket.close() + + @property + def port(self) -> int: + return self._port or 0 + + @property + def host(self) -> str: + return self._host or "-1" + + def poll(self) -> Optional[int]: + return None + + def wait(self, timeout: Optional[float] = None) -> int: + return 0 + + +class LocalSumoProcess(SumoProcess): + """Connects to a local sumo process.""" + + def __init__(self, sumo_port) -> None: + self._sumo_proc = None + self._sumo_port = sumo_port + + def generate( + self, base_params: List[str], sumo_binary: Literal["sumo", "sumo-gui"] = "sumo" + ): + if self._sumo_port is None: + self._sumo_port = networking.find_free_port() + sumo_cmd = [ + os.path.join(SUMO_PATH, "bin", sumo_binary), + f"--remote-port={self._sumo_port}", + *base_params, + ] + self._sumo_proc = subprocess.Popen( + sumo_cmd, + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + close_fds=True, + ) + + @property + def port(self) -> int: + assert self._sumo_port is not None + return self._sumo_port + + @property + def host(self) -> str: + return "localhost" + + def terminate(self, kill): + if self._sumo_proc: + _safe_close(self._sumo_proc.stdin) + _safe_close(self._sumo_proc.stdout) + _safe_close(self._sumo_proc.stderr) + if kill: + self._sumo_proc.kill() + self._sumo_proc = None + + def poll(self) -> Optional[int]: + return self._sumo_proc.poll() + + def wait(self, timeout=None): + return self._sumo_proc.wait(timeout=timeout) + + +class TraciConn: + """A simplified utility for connecting to a SUMO process.""" + + def __init__( + self, + sumo_process: SumoProcess, + host: str = "localhost", + name: str = "", + ): + self._traci_conn = None + self._sumo_port = None + self._sumo_version: Tuple[int, ...] = tuple() + self._host = host + self._name = name + self._log = logging.Logger(self.__class__.__name__) + self._log = logging + self._connected = False + + self._sumo_process = sumo_process + + def __del__(self) -> None: + # We should not raise in delete. + try: + self.close_traci_and_pipes() + except Exception: + pass + + def connect( + self, + timeout: float, + minimum_traci_version: int, + minimum_sumo_version: Tuple[int, ...], + debug: bool = False, + ): + """Attempt a connection with the SUMO process.""" + traci_conn = None + self._host = self._sumo_process.host + self._sumo_port = self._sumo_process.port + try: + # See if the process is still alive before attempting a connection. + with suppress_output(stderr=not debug, stdout=True): + traci_conn = traci.connect( + self._sumo_process.port, + host=self._sumo_process.host, + numRetries=max(0, int(20 * timeout)), + proc=self._sumo_process, + waitBetweenRetries=0.05, + ) # SUMO must be ready within timeout seconds + # We will retry since this is our first sumo command + except traci.exceptions.FatalTraCIError as err: + self._log.error( + "[%s] TraCI could not connect in time to '%s:%s' [%s]", + self._name, + self._host, + self._sumo_port, + err, + ) + # XXX: Actually not fatal... + raise + except traci.exceptions.TraCIException as err: + self._log.error( + "[%s] SUMO process died while trying to connect to '%s:%s' [%s]", + self._name, + self._host, + self._sumo_port, + err, + ) + self.close_traci_and_pipes() + raise + except ConnectionRefusedError: + self._log.error( + "[%s] Intended TraCI server '%s:%s' refused connection.", + self._name, + self._host, + self._sumo_port, + ) + self.close_traci_and_pipes() + raise + + self._connected = True + self._traci_conn = traci_conn + try: + if not self.viable: + raise traci.exceptions.TraCIException("TraCI server already finished!?") + vers, vers_str = traci_conn.getVersion() + if vers < minimum_traci_version: + raise ValueError( + f"TraCI API version must be >= {minimum_traci_version}. Got version ({vers})" + ) + self._sumo_version = tuple( + int(v) for v in vers_str.partition(" ")[2].split(".") + ) # e.g. "SUMO 1.11.0" -> (1, 11, 0) + if self._sumo_version < minimum_sumo_version: + raise ValueError(f"SUMO version must be >= SUMO {minimum_sumo_version}") + except traci.exceptions.FatalTraCIError as err: + self._log.error( + "[%s] TraCI disconnected for connection attempt '%s:%s': [%s]", + self._name, + self._host, + self._sumo_port, + err, + ) + # XXX: the error type is changed to TraCIException to make it consistent with the + # process died case of `traci.connect`. Since TraCIException is fatal just in this case... + self.close_traci_and_pipes() + raise traci.exceptions.TraCIException(err) + except OSError as err: + self._log.error( + "[%s] OS error occurred for TraCI connection attempt '%s:%s': [%s]", + self._name, + self._host, + self._sumo_port, + err, + ) + self.close_traci_and_pipes() + raise traci.exceptions.TraCIException(err) + except ValueError: + self.close_traci_and_pipes() + raise + + @property + def connected(self) -> bool: + """Check if the connection is still valid.""" + return self._sumo_process is not None and self._connected + + @property + def viable(self) -> bool: + """If making a connection to the sumo process is still viable.""" + return self._sumo_process is not None and self._sumo_process.poll() is None + + @property + def sumo_version(self) -> Tuple[int, ...]: + """Get the current SUMO version as a tuple.""" + return self._sumo_version + + @property + def port(self) -> Optional[int]: + """Get the used TraCI port.""" + return self._sumo_port + + @property + def hostname(self) -> str: + """Get the used TraCI port.""" + return self._host + + def __getattr__(self, name: str) -> Any: + if not self.connected: + raise traci.exceptions.FatalTraCIError("TraCI died.") + + attribute = getattr(self._traci_conn, name) + + if inspect.isbuiltin(attribute) or inspect.ismethod(attribute): + attribute = functools.partial( + _wrap_traci_method, + method=attribute, + attribute_name=name, + traci_conn=self, + ) + elif isinstance(attribute, traci.domain.Domain): + attribute = DomainWrapper( + traci_conn=self, domain=attribute, attribute_name=name + ) + else: + raise NotImplementedError() + + return attribute + + def must_reset(self): + """If the version of sumo will have errors if just reloading such that it must be reset.""" + return self._sumo_version > (1, 12, 0) + + def close_traci_and_pipes(self, wait: bool = True, kill: bool = True): + """Safely closes all connections. We should expect this method to always work without throwing""" + + if self._connected: + self._log.debug("Closing TraCI connection to %s", self._sumo_port) + _safe_close(self._traci_conn, wait=wait) + + if self._sumo_process: + self._sumo_process.terminate(kill=kill) + self._log.info( + "Killed TraCI server process '%s:%s'", self._host, self._sumo_port + ) + self._sumo_process = None + + self._connected = False + + def teardown(self): + """Clean up all resources.""" + self.close_traci_and_pipes(True) + self._traci_conn = None + + +def _wrap_traci_method( + *args, method, traci_conn: TraciConn, attribute_name: str, **kwargs +): + # Argument order must be `*args` first so `method` and `sumo_process` are keyword only arguments. + try: + return method(*args, **kwargs) + except traci.exceptions.FatalTraCIError as err: + logging.error( + "[%s] TraCI '%s:%s' disconnected for call '%s', process may have died: [%s]", + traci_conn._name, + traci_conn.hostname, + traci_conn.port, + attribute_name, + err, + ) + # TraCI cannot continue + traci_conn.close_traci_and_pipes() + raise traci.exceptions.FatalTraCIError("TraCI died.") from err + except OSError as err: + logging.error( + "[%s] OS error occurred for TraCI '%s:%s' call '%s': [%s]", + traci_conn._name, + traci_conn.hostname, + traci_conn.port, + attribute_name, + err, + ) + traci_conn.close_traci_and_pipes() + raise OSError("Connection dropped.") from err + except traci.exceptions.TraCIException as err: + # Case where TraCI/SUMO can theoretically continue + raise traci.exceptions.TraCIException("TraCI can continue.") from err + except KeyboardInterrupt: + traci_conn.close_traci_and_pipes(wait=False) + raise diff --git a/smarts/core/utils/tests/test_traci_port_acquisition.py b/smarts/core/utils/tests/test_traci_port_acquisition.py new file mode 100644 index 0000000000..a541a9c5c3 --- /dev/null +++ b/smarts/core/utils/tests/test_traci_port_acquisition.py @@ -0,0 +1,141 @@ +# MIT License +# +# Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +from __future__ import annotations + +import logging +import multiprocessing +import os +import random +import time +from multiprocessing.pool import AsyncResult +from typing import List, Tuple + +from smarts.core import config +from smarts.core.utils.file import make_dir_in_smarts_log_dir +from smarts.core.utils.sumo_utils import RemoteSumoProcess, TraciConn + +load_params = [ + "--net-file=%s" % "./scenarios/sumo/loop/map.net.xml", + "--quit-on-end", + "--no-step-log", + "--no-warnings=1", + "--seed=%s" % random.randint(0, 2147483648), + "--time-to-teleport=%s" % -1, + "--collision.check-junctions=true", + "--collision.action=none", + "--lanechange.duration=3.0", + # TODO: 1--lanechange.duration1 or 1--lateral-resolution`, in combination with `route_id`, + # causes lane change crashes as of SUMO 1.6.0. + # Controlling vehicles that have been added to the simulation with a route causes + # lane change related crashes. + # "--lateral-resolution=100", # smooth lane changes + "--step-length=%f" % 0.1, + "--default.action-step-length=%f" % 0.1, + "--begin=0", # start simulation at time=0 + "--end=31536000", # keep the simulation running for a year + "--start", +] + +MAX_PARALLEL = 32 +ITERATIONS = 60000 # 64512 ports available by Ubuntu standard +LOGGING_STEP = 1000 + + +def run_client(t): + conn = None + try: + f = os.path.abspath(make_dir_in_smarts_log_dir("_sumo_run_logs")) + f"/{t}" + lsp = RemoteSumoProcess( + remote_host=config()("sumo", "central_host"), + remote_port=config()("sumo", "central_port", cast=int), + ) + lsp.generate( + base_params=load_params + + [ + "--log=%s.log" % f, + "--message-log=%s" % f, + "--error-log=%s.err" % f, + ], + sumo_binary="sumo", + ) + conn = TraciConn( + sumo_process=lsp, + name=f"Client@{t}", + ) + conn.connect( + timeout=5, + minimum_traci_version=20, + minimum_sumo_version=(1, 10, 0), + ) + time.sleep(0.1) + conn.getVersion() + except KeyboardInterrupt: + if conn is not None: + conn.close_traci_and_pipes(False) + raise + except Exception as err: + logging.error("Primary occurred. [%s]", err) + logging.exception(err) + raise + finally: + # try: + # conn.close_traci_and_pipes() + # except Exception as err: + # logging.error("Secondary occurred. [%s]", err) + diff = time.time() - t + if diff > 9: + logging.error("Client took %ss to close", diff) + if conn is not None: + conn.teardown() + + +def test_traffic_sim_with_multi_client(): + with multiprocessing.Pool(processes=MAX_PARALLEL) as pool: + clients: List[Tuple[AsyncResult, float]] = [] + start = time.time() + # Attempt to run out of ports. + for i in range(ITERATIONS): + while len(clients) > MAX_PARALLEL: + for j, (c, t) in reversed( + [(j, (c, t)) for j, (c, t) in enumerate(clients) if c.ready()] + ): + clients.pop(j) + current = time.time() + if i % LOGGING_STEP == 0: + logging.error("Working on %s at %ss", i, current - start) + clients.append((pool.apply_async(run_client, args=(current,)), current)) + + for j, (c, t) in reversed( + [(j, (c, t)) for j, (c, t) in enumerate(clients) if c.ready()] + ): + clients.pop(j) + logging.error("Popping remaining ready clients %s", t) + + for (c, t) in clients: + if time.time() - t > 0.2: + logging.error("Stuck clients %s", t) + + logging.error("Finished") + pool.close() + logging.error("Closed") + pool.join() + logging.error("Joined") diff --git a/smarts/core/vehicle_index.py b/smarts/core/vehicle_index.py index 6181f1f853..05d3a01020 100644 --- a/smarts/core/vehicle_index.py +++ b/smarts/core/vehicle_index.py @@ -54,7 +54,7 @@ from smarts.core.coordinates import Dimensions, Pose from smarts.core.utils import resources from smarts.core.utils.cache import cache, clear_cache -from smarts.core.utils.string import truncate +from smarts.core.utils.strings import truncate from smarts.core.vehicle_state import VEHICLE_CONFIGS from .actor import ActorRole diff --git a/smarts/env/tests/test_determinism.py b/smarts/env/tests/test_determinism.py index 13a85e1e9b..323aeef986 100644 --- a/smarts/env/tests/test_determinism.py +++ b/smarts/env/tests/test_determinism.py @@ -139,8 +139,8 @@ def test_short_determinism(): def test_long_determinism(): max_steps_per_episode = 55000 - episode_count = 1 - capture_step = 13750 + episode_count = 10 + capture_step = 100 scenarios = "scenarios/sumo/intersections/2lane" determinism( agent_spec(max_steps_per_episode), scenarios, episode_count, capture_step diff --git a/smarts/sstudio/generators.py b/smarts/sstudio/generators.py index 8adbe21269..f022ed4008 100644 --- a/smarts/sstudio/generators.py +++ b/smarts/sstudio/generators.py @@ -184,7 +184,7 @@ def plan_and_save( log_path = f"{self._log_dir}/{scenario_name}" os.makedirs(log_path, exist_ok=True) - from smarts.core.utils.sumo import sumolib + from smarts.core.utils.sumo_utils import sumolib int32_limits = np.iinfo(np.int32) duarouter_path = sumolib.checkBinary("duarouter") diff --git a/utils/setup/mac_requirements.txt b/utils/setup/mac_requirements.txt index 7ea06ab0f6..244c0c965d 100644 --- a/utils/setup/mac_requirements.txt +++ b/utils/setup/mac_requirements.txt @@ -1,4 +1,4 @@ -absl-py==2.0.0 +absl-py==2.1.0 aiosignal==1.3.1 astunparse==1.6.3 attrs==23.2.0 @@ -9,7 +9,7 @@ charset-normalizer==3.3.2 click==8.1.7 cloudpickle==3.0.0 constantly==23.10.4 -coverage==7.4.0 +coverage==7.4.1 decorator==5.1.1 dm-tree==0.1.8 exceptiongroup==1.2.0 @@ -19,29 +19,29 @@ flatbuffers==23.5.26 frozenlist==1.4.1 fsspec==2023.12.2 future==0.18.3 -gast==0.4.0 -google-auth==2.26.1 -google-auth-oauthlib==1.0.0 +gast==0.5.4 +google-auth==2.27.0 +google-auth-oauthlib==1.2.0 google-pasta==0.2.0 grpcio==1.60.0 h5py==3.10.0 hyperlink==21.0.0 idna==3.6 importlib-metadata==7.0.1 -importlib-resources==6.1.1 incremental==22.10.0 iniconfig==2.0.0 -Jinja2==3.1.2 -jsonschema==4.20.0 +Jinja2==3.1.3 +jsonschema==4.21.1 jsonschema-specifications==2023.12.1 -keras==2.13.1 +keras==2.15.0 libclang==16.0.6 -Markdown==3.5.1 -MarkupSafe==2.1.3 +Markdown==3.5.2 +MarkupSafe==2.1.4 +ml-dtypes==0.2.0 mpmath==1.3.0 msgpack==1.0.7 -networkx==3.1 -numpy==1.24.3 +networkx==3.2.1 +numpy==1.26.3 oauthlib==3.2.2 opencv-python==4.9.0.80 opencv-python-headless==4.9.0.80 @@ -49,51 +49,50 @@ opt-einsum==3.3.0 packaging==23.2 Panda3D==1.10.14 panda3d-gltf==0.13 -panda3d-simplepbr==0.10 +panda3d-simplepbr==0.11.2 pillow==10.2.0 -pkgutil_resolve_name==1.3.10 -pluggy==1.3.0 -protobuf==4.25.1 -psutil==5.9.7 +pluggy==1.4.0 +protobuf==4.23.4 +psutil==5.9.8 py==1.11.0 py-cpuinfo==9.0.0 pyasn1==0.5.1 pyasn1-modules==0.3.0 pybullet==3.2.6 -pytest==7.4.4 +pytest==8.0.0 pytest-benchmark==4.0.0 pytest-cov==4.1.0 pytest-forked==1.6.0 pytest-xdist==3.5.0 PyYAML==6.0.1 ray==2.9.0 -referencing==0.32.1 +referencing==0.33.0 requests==2.31.0 requests-oauthlib==1.3.1 -rpds-py==0.16.2 +rpds-py==0.17.1 rsa==4.9 -scipy==1.10.1 +scipy==1.12.0 shapely==2.0.2 six==1.16.0 sympy==1.12 tableprint==0.9.1 -tensorboard==2.13.0 +tensorboard==2.15.1 tensorboard-data-server==0.7.2 -tensorflow==2.13.1 -tensorflow-estimator==2.13.0 -tensorflow-io-gcs-filesystem==0.34.0 -tensorflow-probability==0.21.0 +tensorflow==2.15.0 +tensorflow-estimator==2.15.0 +tensorflow-io-gcs-filesystem==0.35.0 +tensorflow-probability==0.23.0 termcolor==2.4.0 tomli==2.0.1 torch==2.1.2 torchvision==0.16.2 -trimesh==4.0.8 +trimesh==4.1.0 Twisted==23.10.0 -typing_extensions==4.5.0 +typing_extensions==4.9.0 urllib3==2.1.0 wcwidth==0.2.13 Werkzeug==3.0.1 -wrapt==1.16.0 +wrapt==1.14.1 yattag==1.15.2 zipp==3.17.0 zope.interface==6.1