diff --git a/interlocking/interlockingcontroller/pointcontroller.py b/interlocking/interlockingcontroller/pointcontroller.py index 73c3434..03018d0 100644 --- a/interlocking/interlockingcontroller/pointcontroller.py +++ b/interlocking/interlockingcontroller/pointcontroller.py @@ -49,6 +49,27 @@ def can_route_be_set(self, route, train_id: str): return False return True + def is_route_set(self, route, train_id: str): + for point in route.get_points_of_route(): + if point.state == OccupancyState.FREE: + return False + if train_id not in point.used_by: + return False + + # This is the redundancy, iff the method get_points_of_route fails + # (and return nothing but covers more than one track) + if len(route.tracks) > 1: + any_point_reserved = False + any_point_has_train_id = False + for point in self.points.values(): + if point.state != OccupancyState.FREE: + any_point_reserved = True + if train_id in point.used_by: + any_point_has_train_id = True + + return any_point_reserved and any_point_has_train_id + return True + def do_two_routes_collide(self, route_1, route_2): points_of_route_1 = route_1.get_points_of_route() points_of_route_2 = route_2.get_points_of_route() diff --git a/interlocking/interlockingcontroller/signalcontroller.py b/interlocking/interlockingcontroller/signalcontroller.py index 7eb5def..756e91a 100644 --- a/interlocking/interlockingcontroller/signalcontroller.py +++ b/interlocking/interlockingcontroller/signalcontroller.py @@ -24,6 +24,9 @@ async def set_route(self, route, train_id: str): route.start_signal.used_by.add(train_id) return result + def is_route_set(self, route, train_id: str): + return route.start_signal.state == OccupancyState.RESERVED and train_id in route.start_signal.used_by + async def set_signal_halt(self, signal): return await self.set_signal_aspect(signal, "halt") diff --git a/interlocking/interlockingcontroller/trackcontroller.py b/interlocking/interlockingcontroller/trackcontroller.py index 6dae879..03750b2 100644 --- a/interlocking/interlockingcontroller/trackcontroller.py +++ b/interlocking/interlockingcontroller/trackcontroller.py @@ -33,6 +33,30 @@ def can_route_be_set(self, route, train_id: str): return False return self.overlap_controller.can_any_overlap_be_reserved(route, train_id) + def is_route_set(self, route, train_id: str): + for segment in route.get_segments_of_route(): + if segment.state == OccupancyState.FREE: + return False + if train_id not in segment.used_by: + return False + + # This is the redundancy, iff the method get_segments_of_route fails + # (e.g. returns nothing) + any_segment_reserved = False + any_segment_has_train_id = False + for base_track_id in self.tracks: + track = self.tracks[base_track_id] + for segment in track.segments: + if segment.state != OccupancyState.FREE: + any_segment_reserved = True + if train_id in segment.used_by: + any_segment_has_train_id = True + if not any_segment_reserved or not any_segment_has_train_id: + return False + + # TODO: Overlap + return True + def do_two_routes_collide(self, route_1: Route, route_2: Route): segments_of_route_1 = set(map(lambda seg: seg.segment_id, route_1.get_segments_of_route())) segments_of_route_2 = set(map(lambda seg: seg.segment_id, route_2.get_segments_of_route())) diff --git a/interlocking/interlockinginterface.py b/interlocking/interlockinginterface.py index 0d6925e..b818309 100644 --- a/interlocking/interlockinginterface.py +++ b/interlocking/interlockinginterface.py @@ -173,6 +173,19 @@ def can_route_be_set(self, yaramo_route, train_id: str): return can_be_set def is_route_set(self, yaramo_route, train_id: str) -> IsRouteSetResult: + route: Route = self.get_route_from_yaramo_route(yaramo_route) + if route not in self.active_routes: + return IsRouteSetResult.ROUTE_NOT_SET + if route.used_by != train_id: + return IsRouteSetResult.ROUTE_SET_FOR_WRONG_TRAIN + + if not self.point_controller.is_route_set(route, train_id): + return IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY + if not self.signal_controller.is_route_set(route, train_id): + return IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY + if not self.track_controller.is_route_set(route, train_id): + return IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY + return IsRouteSetResult.ROUTE_SET_CORRECTLY def do_two_routes_collide(self, yaramo_route_1, yaramo_route_2): diff --git a/interlocking/model/helper/isroutesetresult.py b/interlocking/model/helper/isroutesetresult.py index cdeecdb..ae73878 100644 --- a/interlocking/model/helper/isroutesetresult.py +++ b/interlocking/model/helper/isroutesetresult.py @@ -3,5 +3,6 @@ class IsRouteSetResult(Enum): ROUTE_SET_CORRECTLY = 0 - ROUTE_NOT_SET = 1 - ROUTE_SET_FOR_WRONG_TRAIN = 2 \ No newline at end of file + ROUTE_NOT_SET = 1 # The route wasn't set at all + ROUTE_NOT_SET_CORRECTLY = 2 # Route should be set but state is corrupted + ROUTE_SET_FOR_WRONG_TRAIN = 3 diff --git a/test/interlocking_test.py b/test/interlocking_test.py index a0fb1c7..a6d6318 100644 --- a/test/interlocking_test.py +++ b/test/interlocking_test.py @@ -131,6 +131,45 @@ def test_is_route_set(): # Route set with wrong train assert interlocking.is_route_set(route_1, "IC1234") == IsRouteSetResult.ROUTE_SET_FOR_WRONG_TRAIN + # Manipulate state of a point on the route + asyncio.run(interlocking.reset()) + route_1 = topologyhelper.get_route_by_signal_names(topology, "60BS1", "60BS2") + asyncio.run(interlockinghelper.set_route(interlocking, route_1, True, "RB101")) + interlocking.point_controller.points["d43f9"].state = OccupancyState.FREE + assert interlocking.is_route_set(route_1, "RB101") == IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY + + asyncio.run(interlocking.reset()) + route_1 = topologyhelper.get_route_by_signal_names(topology, "60BS1", "60BS2") + asyncio.run(interlockinghelper.set_route(interlocking, route_1, True, "RB101")) + interlocking.point_controller.points["d43f9"].used_by = set() + assert interlocking.is_route_set(route_1, "RB101") == IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY + + # Manipulate state of the start signal + asyncio.run(interlocking.reset()) + route_1 = topologyhelper.get_route_by_signal_names(topology, "60BS1", "60BS2") + asyncio.run(interlockinghelper.set_route(interlocking, route_1, True, "RB101")) + interlocking.signal_controller.signals["0ab8c048-f4ea-4d4d-97cd-510d0b05651f"].state = OccupancyState.FREE + assert interlocking.is_route_set(route_1, "RB101") == IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY + + asyncio.run(interlocking.reset()) + route_1 = topologyhelper.get_route_by_signal_names(topology, "60BS1", "60BS2") + asyncio.run(interlockinghelper.set_route(interlocking, route_1, True, "RB101")) + interlocking.signal_controller.signals["0ab8c048-f4ea-4d4d-97cd-510d0b05651f"].used_by = set() + assert interlocking.is_route_set(route_1, "RB101") == IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY + + # Manipulate state of a segment + asyncio.run(interlocking.reset()) + route_1 = topologyhelper.get_route_by_signal_names(topology, "60BS1", "60BS2") + asyncio.run(interlockinghelper.set_route(interlocking, route_1, True, "RB101")) + interlocking.track_controller.tracks["94742"].segments[0].state = OccupancyState.FREE + assert interlocking.is_route_set(route_1, "RB101") == IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY + + asyncio.run(interlocking.reset()) + route_1 = topologyhelper.get_route_by_signal_names(topology, "60BS1", "60BS2") + asyncio.run(interlockinghelper.set_route(interlocking, route_1, True, "RB101")) + interlocking.track_controller.tracks["94742"].segments[0].used_by = set() + assert interlocking.is_route_set(route_1, "RB101") == IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY + def test_reset_route(): topology = topologyhelper.get_topology_from_planpro_file("./complex-example.ppxml") interlocking = interlockinghelper.get_interlocking(topology)