Skip to content

Commit

Permalink
Check if route is set for the signal, point and track controller
Browse files Browse the repository at this point in the history
  • Loading branch information
arneboockmeyer committed Sep 30, 2024
1 parent be99638 commit 59de8b6
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 2 deletions.
21 changes: 21 additions & 0 deletions interlocking/interlockingcontroller/pointcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,27 @@ def can_route_be_set(self, route, train_id: str):
return False
return True

def is_route_set(self, route, train_id: str):
for point in route.get_points_of_route():
if point.state == OccupancyState.FREE:
return False
if train_id not in point.used_by:
return False

# This is the redundancy, iff the method get_points_of_route fails
# (and return nothing but covers more than one track)
if len(route.tracks) > 1:
any_point_reserved = False
any_point_has_train_id = False
for point in self.points.values():
if point.state != OccupancyState.FREE:
any_point_reserved = True
if train_id in point.used_by:
any_point_has_train_id = True

return any_point_reserved and any_point_has_train_id
return True

def do_two_routes_collide(self, route_1, route_2):
points_of_route_1 = route_1.get_points_of_route()
points_of_route_2 = route_2.get_points_of_route()
Expand Down
3 changes: 3 additions & 0 deletions interlocking/interlockingcontroller/signalcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ async def set_route(self, route, train_id: str):
route.start_signal.used_by.add(train_id)
return result

def is_route_set(self, route, train_id: str):
return route.start_signal.state == OccupancyState.RESERVED and train_id in route.start_signal.used_by

async def set_signal_halt(self, signal):
return await self.set_signal_aspect(signal, "halt")

Expand Down
24 changes: 24 additions & 0 deletions interlocking/interlockingcontroller/trackcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,30 @@ def can_route_be_set(self, route, train_id: str):
return False
return self.overlap_controller.can_any_overlap_be_reserved(route, train_id)

def is_route_set(self, route, train_id: str):
for segment in route.get_segments_of_route():
if segment.state == OccupancyState.FREE:
return False
if train_id not in segment.used_by:
return False

# This is the redundancy, iff the method get_segments_of_route fails
# (e.g. returns nothing)
any_segment_reserved = False
any_segment_has_train_id = False
for base_track_id in self.tracks:
track = self.tracks[base_track_id]
for segment in track.segments:
if segment.state != OccupancyState.FREE:
any_segment_reserved = True
if train_id in segment.used_by:
any_segment_has_train_id = True
if not any_segment_reserved or not any_segment_has_train_id:
return False

# TODO: Overlap
return True

def do_two_routes_collide(self, route_1: Route, route_2: Route):
segments_of_route_1 = set(map(lambda seg: seg.segment_id, route_1.get_segments_of_route()))
segments_of_route_2 = set(map(lambda seg: seg.segment_id, route_2.get_segments_of_route()))
Expand Down
13 changes: 13 additions & 0 deletions interlocking/interlockinginterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,19 @@ def can_route_be_set(self, yaramo_route, train_id: str):
return can_be_set

def is_route_set(self, yaramo_route, train_id: str) -> IsRouteSetResult:
route: Route = self.get_route_from_yaramo_route(yaramo_route)
if route not in self.active_routes:
return IsRouteSetResult.ROUTE_NOT_SET
if route.used_by != train_id:
return IsRouteSetResult.ROUTE_SET_FOR_WRONG_TRAIN

if not self.point_controller.is_route_set(route, train_id):
return IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY
if not self.signal_controller.is_route_set(route, train_id):
return IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY
if not self.track_controller.is_route_set(route, train_id):
return IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY

return IsRouteSetResult.ROUTE_SET_CORRECTLY

def do_two_routes_collide(self, yaramo_route_1, yaramo_route_2):
Expand Down
5 changes: 3 additions & 2 deletions interlocking/model/helper/isroutesetresult.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@

class IsRouteSetResult(Enum):
ROUTE_SET_CORRECTLY = 0
ROUTE_NOT_SET = 1
ROUTE_SET_FOR_WRONG_TRAIN = 2
ROUTE_NOT_SET = 1 # The route wasn't set at all
ROUTE_NOT_SET_CORRECTLY = 2 # Route should be set but state is corrupted
ROUTE_SET_FOR_WRONG_TRAIN = 3
39 changes: 39 additions & 0 deletions test/interlocking_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,45 @@ def test_is_route_set():
# Route set with wrong train
assert interlocking.is_route_set(route_1, "IC1234") == IsRouteSetResult.ROUTE_SET_FOR_WRONG_TRAIN

# Manipulate state of a point on the route
asyncio.run(interlocking.reset())
route_1 = topologyhelper.get_route_by_signal_names(topology, "60BS1", "60BS2")
asyncio.run(interlockinghelper.set_route(interlocking, route_1, True, "RB101"))
interlocking.point_controller.points["d43f9"].state = OccupancyState.FREE
assert interlocking.is_route_set(route_1, "RB101") == IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY

asyncio.run(interlocking.reset())
route_1 = topologyhelper.get_route_by_signal_names(topology, "60BS1", "60BS2")
asyncio.run(interlockinghelper.set_route(interlocking, route_1, True, "RB101"))
interlocking.point_controller.points["d43f9"].used_by = set()
assert interlocking.is_route_set(route_1, "RB101") == IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY

# Manipulate state of the start signal
asyncio.run(interlocking.reset())
route_1 = topologyhelper.get_route_by_signal_names(topology, "60BS1", "60BS2")
asyncio.run(interlockinghelper.set_route(interlocking, route_1, True, "RB101"))
interlocking.signal_controller.signals["0ab8c048-f4ea-4d4d-97cd-510d0b05651f"].state = OccupancyState.FREE
assert interlocking.is_route_set(route_1, "RB101") == IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY

asyncio.run(interlocking.reset())
route_1 = topologyhelper.get_route_by_signal_names(topology, "60BS1", "60BS2")
asyncio.run(interlockinghelper.set_route(interlocking, route_1, True, "RB101"))
interlocking.signal_controller.signals["0ab8c048-f4ea-4d4d-97cd-510d0b05651f"].used_by = set()
assert interlocking.is_route_set(route_1, "RB101") == IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY

# Manipulate state of a segment
asyncio.run(interlocking.reset())
route_1 = topologyhelper.get_route_by_signal_names(topology, "60BS1", "60BS2")
asyncio.run(interlockinghelper.set_route(interlocking, route_1, True, "RB101"))
interlocking.track_controller.tracks["94742"].segments[0].state = OccupancyState.FREE
assert interlocking.is_route_set(route_1, "RB101") == IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY

asyncio.run(interlocking.reset())
route_1 = topologyhelper.get_route_by_signal_names(topology, "60BS1", "60BS2")
asyncio.run(interlockinghelper.set_route(interlocking, route_1, True, "RB101"))
interlocking.track_controller.tracks["94742"].segments[0].used_by = set()
assert interlocking.is_route_set(route_1, "RB101") == IsRouteSetResult.ROUTE_NOT_SET_CORRECTLY

def test_reset_route():
topology = topologyhelper.get_topology_from_planpro_file("./complex-example.ppxml")
interlocking = interlockinghelper.get_interlocking(topology)
Expand Down

0 comments on commit 59de8b6

Please sign in to comment.