Skip to content
This repository has been archived by the owner on Oct 28, 2023. It is now read-only.

Commit

Permalink
停止位置の探索を一般化
Browse files Browse the repository at this point in the history
  • Loading branch information
n4o847 committed Oct 8, 2023
1 parent 7506392 commit 71824f3
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 58 deletions.
119 changes: 117 additions & 2 deletions ptcs/ptcs_control/components/train.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
from __future__ import annotations

import math
from collections.abc import Iterable
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, TypeVar

from .base import BaseComponent
from .position import DirectedPosition, UndirectedPosition
from .section import SectionConnection

if TYPE_CHECKING:
from .junction import Junction
from .position import DirectedPosition
from .section import Section
from .sensor_position import SensorPosition
from .stop import Stop


T = TypeVar("T")


@dataclass
class Train(BaseComponent):
"""列車"""
Expand Down Expand Up @@ -54,6 +58,9 @@ def calc_input(self, speed: float) -> int:
else:
return math.floor(self.min_input + (self.max_input - self.min_input) * speed / self.max_speed)

def compute_tail_position(self) -> DirectedPosition:
return self.head_position.get_retracted_position(self.length)

def move_forward_mr(self, motor_rotation: int) -> None:
"""
列車をモータ motor_rotation 回転分だけ進める。
Expand Down Expand Up @@ -245,3 +252,111 @@ def find_forward_stop(self) -> tuple[Stop, float] | None:
return forward_stop, forward_stop_distance
else:
return None

def find_forward_object(
self,
object_position_pairs: Iterable[tuple[T, UndirectedPosition | DirectedPosition]],
) -> tuple[T, float] | None:
"""
指定された列車が次にたどり着く停止位置とそこまでの距離を取得する。
停止位置に到達できない場合は None を返す。
NOTE: `find_forward_train` を一般化したアルゴリズム
"""

section = self.head_position.section

forward_object: T | None = None
forward_object_distance: float = math.inf

# 指定された列車と同一セクションに存在する、
# 指定された列車の前方にある停止位置のうち、最も近いもの`forward_object`を取得

for object, object_position in object_position_pairs:
match object_position:
case UndirectedPosition():
if not (object_position.section == self.head_position.section):
continue
case DirectedPosition():
if not (
object_position.section == self.head_position.section
and object_position.target_junction == self.head_position.target_junction
):
continue

if (
# 端点A(target_junction)<---|object|--train---<端点B
self.head_position.target_junction == section.connected_junctions[SectionConnection.A]
and object_position.mileage <= self.head_position.mileage
) or (
# 端点A>---train--|object|--->端点B(target_junction)
self.head_position.target_junction == section.connected_junctions[SectionConnection.B]
and self.head_position.mileage <= object_position.mileage
):
distance = abs(self.head_position.mileage - object_position.mileage)
if distance < forward_object_distance:
forward_object = object
forward_object_distance = distance

# 指定された列車と同一セクションに停止位置が存在しなければ次のセクションに移り、
# 停止位置が見つかるまで繰り返す

section = self.head_position.section
target_junction = self.head_position.target_junction

if self.head_position.target_junction == section.connected_junctions[SectionConnection.A]:
distance = self.head_position.mileage
elif self.head_position.target_junction == section.connected_junctions[SectionConnection.B]:
distance = section.length - self.head_position.mileage
else:
raise

# 無限ループ検出用
visited: set[tuple[Section, Junction]] = set()

while forward_object is None:
next_section_and_junction = section.get_next_section_and_target_junction_strict(target_junction)

if next_section_and_junction:
# 無限ループを検出したら None を返す
if next_section_and_junction in visited:
return None

visited.add(next_section_and_junction)

section, target_junction = next_section_and_junction

for object, object_position in object_position_pairs:
match object_position:
case UndirectedPosition():
if not (object_position.section == section):
continue
case DirectedPosition():
if not (
object_position.section == section
and object_position.target_junction == target_junction
):
continue

# 端点A(target_junction)<---|object|-----<端点B
if target_junction == section.connected_junctions[SectionConnection.A]:
new_distance = distance + section.length - object_position.mileage
# 端点A>-----|object|--->端点B(target_junction)
elif target_junction == section.connected_junctions[SectionConnection.B]:
new_distance = distance + object_position.mileage
else:
raise

if new_distance < forward_object_distance:
forward_object = object
forward_object_distance = new_distance

else:
break

distance += section.length

# 停止目標を発見できたら、そこまでの距離とともに返す
if forward_object:
return forward_object, forward_object_distance
else:
return None
122 changes: 66 additions & 56 deletions ptcs/ptcs_control/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from .components.junction import Junction, JunctionConnection, PointDirection
from .components.obstacle import Obstacle
from .components.position import DirectedPosition, UndirectedPosition
from .components.section import Section, SectionConnection
from .components.sensor_position import SensorPosition
from .components.station import Station
Expand Down Expand Up @@ -224,6 +225,11 @@ def _calc_speed(self) -> None:
MAX_SPEED: float = 40 # 最高速度[cm/s] NOTE:将来的には車両のパラメータとしてとして定義
MERGIN: float = 10 # 停止余裕距離[cm]

objects: list[tuple[Train, DirectedPosition] | tuple[Obstacle, UndirectedPosition]] = [
*((train, train.compute_tail_position()) for train in self.trains.values()),
*((obstacle, obstacle.position) for obstacle in self.obstacles.values()),
]

for train in self.trains.values():
# [ATP]停止位置までの距離`distance`を、先行列車の位置と、ジャンクションの状態をもとに計算する

Expand All @@ -232,68 +238,72 @@ def _calc_speed(self) -> None:
current_section = train.head_position.section
target_junction = train.head_position.target_junction

while True:
next_section, next_junction = current_section.get_next_section_and_target_junction(target_junction)

# いま見ているセクションが閉鎖 -> 即時停止
# ただしすでに列車が閉鎖セクションに入ってしまった場合は、駅まで動かしたいので、止めない
if current_section != train.head_position.section and current_section.is_blocked is True:
distance += 0
break

# 先行列車に到達できる -> 先行列車の手前で停止
elif forward_train_and_distance := train.find_forward_train():
distance += forward_train_and_distance[1] - MERGIN
break

# 目指すジャンクションが自列車側に開通していない or 次のセクションが閉鎖
# -> 目指すジャンクションの手前で停止
elif (
current_section.get_next_section_and_target_junction_strict(target_junction) is None
or next_section.is_blocked is True
):
if current_section == train.head_position.section:
forward_object_and_distance = train.find_forward_object(objects)
if forward_object_and_distance:
distance += forward_object_and_distance[1] - MERGIN
else:
while True:
next_section, next_junction = current_section.get_next_section_and_target_junction(target_junction)

# いま見ているセクションが閉鎖 -> 即時停止
# ただしすでに列車が閉鎖セクションに入ってしまった場合は、駅まで動かしたいので、止めない
if current_section != train.head_position.section and current_section.is_blocked is True:
distance += 0
break

# 先行列車に到達できる -> 先行列車の手前で停止
elif forward_train_and_distance := train.find_forward_train():
distance += forward_train_and_distance[1] - MERGIN
break

# 目指すジャンクションが自列車側に開通していない or 次のセクションが閉鎖
# -> 目指すジャンクションの手前で停止
elif (
current_section.get_next_section_and_target_junction_strict(target_junction) is None
or next_section.is_blocked is True
):
if current_section == train.head_position.section:
if target_junction == current_section.connected_junctions[SectionConnection.A]:
distance += train.head_position.mileage - MERGIN
elif target_junction == current_section.connected_junctions[SectionConnection.B]:
distance += current_section.length - train.head_position.mileage - MERGIN
else:
raise
else:
distance += current_section.length - MERGIN
break

# 次のセクションが閉鎖 -> 目指すジャンクションの手前で停止
elif next_section.is_blocked is True:
if target_junction == current_section.connected_junctions[SectionConnection.A]:
distance += train.head_position.mileage - MERGIN
distance += train.mileage - MERGIN
elif target_junction == current_section.connected_junctions[SectionConnection.B]:
distance += current_section.length - train.head_position.mileage - MERGIN
distance += current_section.length - train.mileage - MERGIN
else:
raise
else:
distance += current_section.length - MERGIN
break

# 次のセクションが閉鎖 -> 目指すジャンクションの手前で停止
elif next_section.is_blocked is True:
if target_junction == current_section.connected_junctions[SectionConnection.A]:
distance += train.mileage - MERGIN
elif target_junction == current_section.connected_junctions[SectionConnection.B]:
distance += current_section.length - train.mileage - MERGIN
else:
raise
break
break

# 停止条件を満たさなければ次に移る
else:
if current_section == train.head_position.section:
if (
train.head_position.target_junction
== current_section.connected_junctions[SectionConnection.A]
):
distance += train.head_position.mileage
elif (
train.head_position.target_junction
== current_section.connected_junctions[SectionConnection.B]
):
distance += current_section.length - train.head_position.mileage
else:
raise
# 停止条件を満たさなければ次に移る
else:
distance += current_section.length
(
current_section,
target_junction,
) = current_section.get_next_section_and_target_junction(target_junction)
if current_section == train.head_position.section:
if (
train.head_position.target_junction
== current_section.connected_junctions[SectionConnection.A]
):
distance += train.head_position.mileage
elif (
train.head_position.target_junction
== current_section.connected_junctions[SectionConnection.B]
):
distance += current_section.length - train.head_position.mileage
else:
raise
else:
distance += current_section.length
(
current_section,
target_junction,
) = current_section.get_next_section_and_target_junction(target_junction)

if distance < 0:
distance = 0
Expand Down

0 comments on commit 71824f3

Please sign in to comment.